2011-09-30 51 views
0

我们注意到,我们的.NET应用程序的内部,我们有竞争,当涉及到使用SqlDataReader的。虽然我们知道SqlDataReader不是ThreadSafe,但它应该缩放。下面的代码是一个简单的例子来表明,由于存在对SqlDataReader的GetValue方法竞争,我们不能扩展我们的应用程序。我们不受CPU,磁盘或网络的限制;只是SqlDataReader的内部争用。我们可以使用1个线程运行应用程序10次,它可以线性缩放,但1个应用程序中的10个线程不会缩放。有关如何在单个c#应用程序中扩展从SQL Server读取的任何想法?c。与SqlDataReaders和SqlDataAdapters#线程争

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Diagnostics; 
using System.Globalization; 

namespace ThreadAndSQLTester 
{ 
    class Host 
    { 
     /// <summary> 
     /// Gets or sets the receive workers. 
     /// </summary> 
     /// <value>The receive workers.</value> 
     internal List<Worker> Workers { get; set; } 
     /// <summary> 
     /// Gets or sets the receive threads. 
     /// </summary> 
     /// <value>The receive threads.</value> 
     internal List<Thread> Threads { get; set; } 

     public int NumberOfThreads { get; set; } 
     public int Sleep { get; set; } 
     public int MinutesToRun { get; set; } 
     public bool IsRunning { get; set; } 
     private System.Timers.Timer runTime; 

     private object lockVar = new object(); 

     public Host() 
     { 
      Init(1, 0, 0); 
     } 

     public Host(int numberOfThreads, int sleep, int minutesToRun) 
     { 
      Init(numberOfThreads, sleep, minutesToRun); 
     } 

     private void Init(int numberOfThreads, int sleep, int minutesToRun) 
     { 
      this.Workers = new List<Worker>(); 
      this.Threads = new List<Thread>(); 

      this.NumberOfThreads = numberOfThreads; 
      this.Sleep = sleep; 
      this.MinutesToRun = minutesToRun; 

      SetUpTimer(); 
     } 

     private void SetUpTimer() 
     { 
      if (this.MinutesToRun > 0) 
      { 
       this.runTime = new System.Timers.Timer(); 
       this.runTime.Interval = TimeSpan.FromMinutes(this.MinutesToRun).TotalMilliseconds; 
       this.runTime.Elapsed += new System.Timers.ElapsedEventHandler(runTime_Elapsed); 
       this.runTime.Start(); 
      } 
     } 

     void runTime_Elapsed(object sender, System.Timers.ElapsedEventArgs e) 
     { 
      this.runTime.Stop(); 
      this.Stop(); 
      this.IsRunning = false; 
     } 

     public void Start() 
     { 
      this.IsRunning = true; 

      Random r = new Random(DateTime.Now.Millisecond); 

      for (int i = 0; i < this.NumberOfThreads; i++) 
      { 
       string threadPoolId = Math.Ceiling(r.NextDouble() * 10).ToString(); 

       Worker worker = new Worker("-" + threadPoolId); //i.ToString()); 
       worker.Sleep = this.Sleep; 

       this.Workers.Add(worker); 

       Thread thread = new Thread(worker.Work); 
       worker.Name = string.Format("WorkerThread-{0}", i); 

       thread.Name = worker.Name; 

       this.Threads.Add(thread); 
       thread.Start(); 

       Debug.WriteLine(string.Format(CultureInfo.InvariantCulture, "Started new Worker Thread. Total active: {0}", i + 1)); 
      } 
     } 

     public void Stop() 
     { 
      if (this.Workers != null) 
      { 
       lock (lockVar) 
       { 
        for (int i = 0; i < this.Workers.Count; i++) 
        { 
         //Thread thread = this.Threads[i]; 
         //thread.Interrupt(); 
         this.Workers[i].IsEnabled = false; 
        } 

        for (int i = this.Workers.Count - 1; i >= 0; i--) 
        { 
         Worker worker = this.Workers[i]; 
         while (worker.IsRunning) 
         { 
          Thread.Sleep(32); 
         } 
        } 

        foreach (Thread thread in this.Threads) 
        { 
         thread.Abort(); 
        } 

        this.Workers.Clear(); 
        this.Threads.Clear(); 
       } 
      } 
     } 

    } 
} 

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Data.SqlClient; 
using System.Data; 
using System.Threading; 
using System.ComponentModel; 
using System.Data.OleDb; 

namespace ThreadAndSQLTester 
{ 
    class Worker 
    { 
     public bool IsEnabled { get; set; } 
     public bool IsRunning { get; set; } 
     public string Name { get; set; } 
     public int Sleep { get; set; } 

     private string dataCnString { get; set; } 
     private string logCnString { get; set; } 

     private List<Log> Logs { get; set; } 

     public Worker(string threadPoolId) 
     { 
      this.Logs = new List<Log>(); 

      SqlConnectionStringBuilder cnBldr = new SqlConnectionStringBuilder(); 
      cnBldr.DataSource = @"trgcrmqa3"; 
      cnBldr.InitialCatalog = "Scratch"; 
      cnBldr.IntegratedSecurity = true; 
      cnBldr.MultipleActiveResultSets = true; 
      cnBldr.Pooling = true;    

      dataCnString = GetConnectionStringWithWorkStationId(cnBldr.ToString(), threadPoolId);    

      cnBldr = new SqlConnectionStringBuilder(); 
      cnBldr.DataSource = @"trgcrmqa3"; 
      cnBldr.InitialCatalog = "Scratch"; 
      cnBldr.IntegratedSecurity = true; 

      logCnString = GetConnectionStringWithWorkStationId(cnBldr.ToString(), string.Empty); 

      IsEnabled = true; 
     } 

     private string machineName { get; set; } 
     private string GetConnectionStringWithWorkStationId(string connectionString, string connectionPoolToken) 
     { 
      if (string.IsNullOrEmpty(machineName)) machineName = Environment.MachineName; 

      SqlConnectionStringBuilder cnbdlr; 
      try 
      { 
       cnbdlr = new SqlConnectionStringBuilder(connectionString); 
      } 
      catch 
      { 
       throw new ArgumentException("connection string was an invalid format"); 
      } 

      cnbdlr.WorkstationID = machineName + connectionPoolToken; 

      return cnbdlr.ConnectionString; 
     } 

     public void Work() 
     { 
      int i = 0; 

      while (this.IsEnabled) 
      { 
       this.IsRunning = true; 

       try 
       { 
        Log log = new Log(); 
        log.WorkItemId = Guid.NewGuid(); 
        log.StartTime = DateTime.Now; 
        List<object> lst = new List<object>(); 

        using (SqlConnection cn = new SqlConnection(this.dataCnString)) 
        { 
         try 
         { 
          cn.Open(); 

          using (SqlCommand cmd = new SqlCommand("Analysis.spSelectTestData", cn)) 
          { 
           cmd.CommandType = System.Data.CommandType.StoredProcedure; 

           using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) // DBHelper.ExecuteReader(cn, cmd)) 
           {          
            while (dr.Read()) 
            { 
             CreateClaimHeader2(dr, lst); 
            } 

            dr.Close(); 
           } 

           cmd.Cancel(); 
          } 
         } 
         catch { } 
         finally 
         { 
          cn.Close(); 
         } 
        } 

        log.StopTime = DateTime.Now; 
        log.RouteName = this.Name; 
        log.HostName = this.machineName; 

        this.Logs.Add(log); 
        i++; 

        if (i > 1000) 
        { 
         Console.WriteLine(string.Format("Thread: {0} executed {1} items.", this.Name, i)); 
         i = 0; 
        } 

        if (this.Sleep > 0) Thread.Sleep(this.Sleep); 
       } 
       catch { } 
      } 

      this.LogMessages(); 

      this.IsRunning = false; 
     }  

     private void CreateClaimHeader2(IDataReader reader, List<object> lst) 
     { 
      lst.Add(reader["ClaimHeaderID"]); 
      lst.Add(reader["ClientCode"]); 
      lst.Add(reader["MemberID"]); 
      lst.Add(reader["ProviderID"]); 
      lst.Add(reader["ClaimNumber"]); 
      lst.Add(reader["PatientAcctNumber"]); 
      lst.Add(reader["Source"]); 
      lst.Add(reader["SourceID"]); 
      lst.Add(reader["TotalPayAmount"]); 
      lst.Add(reader["TotalBillAmount"]); 
      lst.Add(reader["FirstDateOfService"]); 
      lst.Add(reader["LastDateOfService"]); 
      lst.Add(reader["MaxStartDateOfService"]); 
      lst.Add(reader["MaxValidStartDateOfService"]); 
      lst.Add(reader["LastUpdated"]); 
      lst.Add(reader["UpdatedBy"]); 
     } 

     /// <summary> 
     /// Toes the data table. 
     /// </summary> 
     /// <typeparam name="T"></typeparam> 
     /// <param name="data">The data.</param> 
     /// <returns></returns> 
     public DataTable ToDataTable<T>(IEnumerable<T> data) 
     { 
      PropertyDescriptorCollection props = 
       TypeDescriptor.GetProperties(typeof(T)); 

      if (props == null) throw new ArgumentNullException("Table properties."); 
      if (data == null) throw new ArgumentNullException("data"); 

      DataTable table = new DataTable(); 
      for (int i = 0; i < props.Count; i++) 
      { 
       PropertyDescriptor prop = props[i]; 
       table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType); 
      } 
      object[] values = new object[props.Count]; 
      foreach (T item in data) 
      { 
       for (int i = 0; i < values.Length; i++) 
       { 
        values[i] = props[i].GetValue(item) ?? DBNull.Value; 
       } 
       table.Rows.Add(values); 
      } 
      return table; 
     } 


     private void LogMessages() 
     { 
      using (SqlConnection cn = new SqlConnection(this.logCnString)) 
      { 
       try 
       { 
        cn.Open(); 
        DataTable dt = ToDataTable(this.Logs); 

        Console.WriteLine(string.Format("Logging {0} records for Thread: {1}", this.Logs.Count, this.Name)); 

        using (SqlCommand cmd = new SqlCommand("Analysis.spInsertWorkItemRouteLog", cn)) 
        { 
         cmd.CommandType = System.Data.CommandType.StoredProcedure; 

         cmd.Parameters.AddWithValue("@dt", dt); 

         cmd.ExecuteNonQuery(); 
        } 

        Console.WriteLine(string.Format("Logged {0} records for Thread: {1}", this.Logs.Count, this.Name)); 
       } 
       finally 
       { 
        cn.Close(); 
       } 
      } 
     } 
    } 
} 
+0

我不明白你想要做什么。当你说有“上SqlDataReader对象的GetValue争”,你的意思是有锁争用?或者你的意思是GetValue在你的性能测试中花费的时间最长? –

+0

存在锁争用。 – Sean

+0

好的。为什么我们要获取锁以便使用SqlDataReader?为什么每个线程只有一个读卡器? –

回答

-1

SqlDataAdapter或sqlDataReader之间的区别? 答:1.A DataReader的作品在连接环境, 而DataSet的工作在断开连接的环境。 2.A数据集表示由任何数目的相互关联的数据表对象中的数据的内存中缓存。 DataTable对象表示内存数据的表格块。

SqlDataAdapter or sqlDataReader

0
1.A DataReader works in a connected environment, 
whereas DataSet works in a disconnected environment. 

2.A DataSet represents an in-memory cache of data consisting of any number of inter related DataTable objects. A DataTable object represents a tabular block of in-memory data. 

SqlDataAdapter or sqlDataReader