2013-04-04 95 views
0

我有一个大约5MB的CSV数据库，其中包含邮政编码、城市和州，我正尝试把它导入SQL Server CE数据库。从多个线程访问SQL Server CE是否安全？

使用单线程时，估计整个过程大约需要3小时才能完成。虽然这样也能完成任务，但我想尝试把任务拆分到多个线程中，以缩短这3小时的总耗时。如果我在每个线程上各创建一个SqlCeConnection对象，能否安全地在各个线程上同时执行命令？

我感觉可能会出现并发和死锁问题。这是我找到的CSV数据库：http://www.unitedstateszipcodes.org/zip-code-database/

这里是我的相关代码:

// Rows parsed by OpenCSV; iterated by ProcessData on the worker thread.
List<AddressSet> addressList;

/// <summary>
/// Parses the zip-code CSV file into <see cref="addressList"/> and then starts a
/// worker thread that runs ProcessData.
/// </summary>
/// <param name="file">Path of the CSV file. Its first line is treated as a header and skipped.</param>
/// <remarks>
/// Fields are split with a quote-aware splitter, so a quoted field that itself contains
/// commas no longer shifts the later columns (column 5 is read as the state).
/// Blank lines are skipped.
/// </remarks>
/// <exception cref="InvalidDataException">A non-blank data line has fewer than 6 columns.</exception>
public void OpenCSV(string file)
{
    string[] lines = File.ReadAllLines(file);
    List<AddressSet> parsed = new List<AddressSet>();

    // Start at 1 to skip the header line.
    for (int lineIndex = 1; lineIndex < lines.Length; lineIndex++)
    {
        string line = lines[lineIndex];
        if (string.IsNullOrWhiteSpace(line))
        {
            continue;
        }

        string[] columns = SplitQuotedCsvLine(line);
        if (columns.Length < 6)
        {
            throw new InvalidDataException(string.Format(
                "Line {0} of '{1}' has {2} columns; at least 6 are expected.",
                lineIndex + 1, file, columns.Length));
        }

        parsed.Add(new AddressSet
        {
            ZipCode = columns[0].Replace("\"", "").Trim(),
            City = columns[2].Replace("\"", "").Trim(),
            State = columns[5].Replace("\"", "").Trim()
        });
    }
    addressList = parsed;

    Thread worker = new Thread(new ThreadStart(ProcessData));
    worker.Start();
}

/// <summary>
/// Splits one CSV line on commas. A double-quoted section may contain commas, and a
/// doubled quote ("") inside it yields one literal quote.
/// </summary>
/// <param name="line">A single line of CSV text.</param>
/// <returns>One entry per field, including a trailing empty field.</returns>
private static string[] SplitQuotedCsvLine(string line)
{
    List<string> fields = new List<string>();
    System.Text.StringBuilder current = new System.Text.StringBuilder();
    bool inQuotes = false;

    for (int pos = 0; pos < line.Length; pos++)
    {
        char ch = line[pos];
        if (inQuotes)
        {
            if (ch != '"')
            {
                current.Append(ch);
            }
            else if (pos + 1 < line.Length && line[pos + 1] == '"')
            {
                current.Append('"');
                pos++;
            }
            else
            {
                inQuotes = false;
            }
        }
        else if (ch == '"')
        {
            inQuotes = true;
        }
        else if (ch == ',')
        {
            fields.Add(current.ToString());
            current.Length = 0;
        }
        else
        {
            current.Append(ch);
        }
    }

    fields.Add(current.ToString());
    return fields.ToArray();
}

/// <summary>
/// Worker-thread entry point. For each row of addressList it:
/// <list type="bullet">
/// <item>inserts or looks up the state, zip code and city;</item>
/// <item>links the city to the zip code;</item>
/// <item>posts a progress/ETA message to richTextBox1.</item>
/// </list>
/// </summary>
/// <remarks>
/// UI controls are only touched through BeginInvoke/Invoke. When all rows are done, or the
/// first exception occurs, a message box is shown and btnChooseCSV is enabled again.
/// </remarks>
private void ProcessData()
{
    try
    {
        int total = addressList.Count;
        System.Diagnostics.Stopwatch clock = System.Diagnostics.Stopwatch.StartNew();
        int processed = 0;

        foreach (AddressSet address in addressList)
        {
            int stateId = InsertState(address.State);
            int zipCodeId = InsertZipCode(address.ZipCode, stateId);
            int cityId = InsertCity(address.City, stateId);
            UpdateRelationships(zipCodeId, cityId);
            processed++;

            float pct = processed / (float)total * 100;
            TimeSpan elapsed = clock.Elapsed;
            // Linear extrapolation: total time = elapsed / fraction done.
            TimeSpan totalTime = TimeSpan.FromMilliseconds(elapsed.TotalMilliseconds / (pct / 100));
            TimeSpan timeLeft = totalTime - elapsed;

            // Build the message here, on the worker thread. The UI lambda runs later, and it
            // must not read the counter or the loop variable, which will have moved on by then.
            string status = pct.ToString("N2") + "% (" + processed + " of " + total + ") "
                + address.City + ", " + address.State + " " + address.ZipCode
                + "\nEstimated Total Time: " + totalTime.ToString("h'h 'm'm 's's'")
                + "\nTime Left: " + timeLeft.ToString("h'h 'm'm 's's'")
                + "\nRunning Time: " + elapsed.ToString("h'h 'm'm 's's'");

            // Use one posted message per row rather than three, to keep the UI queue from flooding.
            richTextBox1.BeginInvoke((MethodInvoker)(() =>
            {
                richTextBox1.Text = status;
                richTextBox1.SelectionStart = richTextBox1.Text.Length;
                richTextBox1.ScrollToCaret();
            }));
        }

        this.Invoke(new Action(() =>
        {
            MessageBox.Show("Done!");
            btnChooseCSV.Enabled = true;
        }));
    }
    catch (Exception ex)
    {
        this.Invoke(new Action(() =>
        {
            MessageBox.Show(ex.Message);
            // Re-enable the button here too; otherwise a failed import cannot be retried.
            btnChooseCSV.Enabled = true;
        }));
    }
}

/// <summary>
/// Returns the ZipCodeId of <paramref name="zipCode"/>. If the zip code is not in the
/// ZipCode table yet, a row linked to <paramref name="stateId"/> is inserted first.
/// </summary>
/// <param name="zipCode">Zip code text as read from the CSV.</param>
/// <param name="stateId">StateId stored on a newly inserted row; not used when the zip code already exists.</param>
/// <returns>The id of the existing row, or the identity value generated by the insert.</returns>
private int InsertZipCode(string zipCode, int stateId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        // Query the id directly instead of COUNT(*) followed by a second SELECT. This saves a
        // round-trip, and duplicate rows can no longer cause the count to be returned as an id.
        using (SqlCeCommand select = new SqlCeCommand("SELECT ZipCodeId FROM ZipCode WHERE ZipCode = @ZipCode", connection))
        {
            select.Parameters.AddWithValue("ZipCode", zipCode);
            object existing = select.ExecuteScalar();
            if (existing != null && existing != DBNull.Value)
            {
                return Convert.ToInt32(existing);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO ZipCode(ZipCode, StateId) VALUES(@ZipCode, @StateId)", connection))
        {
            insert.Parameters.AddWithValue("ZipCode", zipCode);
            insert.Parameters.AddWithValue("StateId", stateId);
            insert.ExecuteNonQuery();
        }

        // Read the identity generated by the INSERT above, on the same connection.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Returns the CityId of the city named <paramref name="city"/> in state <paramref name="stateId"/>.
/// If that (name, state) pair is not in the City table yet, a row is inserted first.
/// </summary>
/// <param name="city">City name as read from the CSV.</param>
/// <param name="stateId">StateId of the state the city belongs to.</param>
/// <returns>The id of the existing row, or the identity value generated by the insert.</returns>
/// <remarks>
/// The lookup includes StateId. Same-named cities in different states therefore get their
/// own rows, instead of reusing the first one's id and StateId.
/// </remarks>
private int InsertCity(string city, int stateId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        using (SqlCeCommand select = new SqlCeCommand("SELECT CityId FROM City WHERE CityName = @City AND StateId = @StateId", connection))
        {
            select.Parameters.AddWithValue("City", city);
            select.Parameters.AddWithValue("StateId", stateId);
            object existing = select.ExecuteScalar();
            if (existing != null && existing != DBNull.Value)
            {
                return Convert.ToInt32(existing);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO City(CityName, StateId) VALUES(@City, @StateId)", connection))
        {
            insert.Parameters.AddWithValue("City", city);
            insert.Parameters.AddWithValue("StateId", stateId);
            insert.ExecuteNonQuery();
        }

        // Read the identity generated by the INSERT above, on the same connection.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Returns the StateId of <paramref name="state"/>. If the state is not in the State table
/// yet, a row is inserted first.
/// </summary>
/// <param name="state">State value as read from the CSV.</param>
/// <returns>The id of the existing row, or the identity value generated by the insert.</returns>
private int InsertState(string state)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        // Query the id directly; duplicate rows can no longer cause a count to be returned as an id.
        using (SqlCeCommand select = new SqlCeCommand("SELECT StateId FROM State WHERE State = @State", connection))
        {
            select.Parameters.AddWithValue("State", state);
            object existing = select.ExecuteScalar();
            if (existing != null && existing != DBNull.Value)
            {
                return Convert.ToInt32(existing);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO State(State) VALUES(@State)", connection))
        {
            insert.Parameters.AddWithValue("State", state);
            insert.ExecuteNonQuery();
        }

        // Read the identity generated by the INSERT above, on the same connection.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Inserts one CityZipCode row that links <paramref name="cityId"/> to <paramref name="zipCodeId"/>.
/// </summary>
/// <param name="zipCodeId">Id returned by InsertZipCode.</param>
/// <param name="cityId">Id returned by InsertCity.</param>
/// <remarks>
/// No existence check is made. Calling this twice with the same pair attempts a second insert.
/// </remarks>
private void UpdateRelationships(int zipCodeId, int cityId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    // The using blocks release the connection and command even when ExecuteNonQuery throws.
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    using (SqlCeCommand command = new SqlCeCommand("INSERT INTO CityZipCode(CityId, ZipCodeId) VALUES(@CityId, @ZipCodeId)", connection))
    {
        connection.Open();
        command.Parameters.AddWithValue("CityId", cityId);
        command.Parameters.AddWithValue("ZipCodeId", zipCodeId);
        command.ExecuteNonQuery();
    }
}

编辑:

澄清一下：我并不是简单地把CSV文件中的每一行直接插入数据库。我在改变数据的组织方式：把每一项插入各自的表，并在各实体之间建立关系。

例如，一个城市可以有多个邮政编码，而一个邮政编码有时也会覆盖多个城市，因此用多对多关系来表示。每个城市和每个邮政编码都只属于一个州，所以它们与州之间是多对一关系。

我分别有城市表、邮政编码表和州表，另外还有一张把城市与邮政编码关联起来的表。我需要修改关系表的结构，使同名城市可以存在于多个州中：关系表应当同时包含城市、州和邮政编码，而不仅仅是城市和邮政编码。

我的最终目标是把一个带密码保护的SQL Server CE数据库随另一个应用程序一起分发，用于验证城市、州和邮政编码。我不想分发CSV数据库，因为任何人都可以修改它来绕过验证。

+0

可以这样做，但未必会更快。要提速，可以先把所有州（以及城市等）查询出来放进字典以便快速查找，并记得在更新数据库时同步更新这些字典。 – Casperah 2013-04-04 16:11:12

+0

我会试试看。谢谢你的提示。 – 2013-04-04 16:13:02

+0

另外，可以考虑在线程内复用同一个连接。通常我会赞成依赖内置的连接池等机制，但在你的场景中，线程上要执行大量（希望很快的）细粒度操作，每次都重新建立连接的开销是不必要的——尤其是这是一个持续进行的本地操作，不会像通过网络访问时那样出现间歇性的连接失败。 – 2013-04-04 18:14:04

回答

3

您必须为每个线程创建单独的连接对象，因为它不是线程安全的：

SqlCeConnection Class

编辑

SQL CE对象既不是线程安全的，也不具有线程关联性（thread affinity）。如果在线程之间共享SqlCeConnection或SqlCeTransaction的实例而没有确保线程安全，可能会导致访问冲突（Access Violation）异常。

建议每个线程使用单独的连接，而不是共享连接。如果确实需要在线程之间共享对象，应用程序应当对这些对象的访问进行串行化。

Multithreaded programming with SQL Server Compact

为什么不使用SQL Server Compact Toolbox？您可以用它根据CSV文件生成INSERT语句。

或使用Conversion of CSV to SQLCE database应用

+0

我在问题中已经说明，会为每个线程各创建一个SqlCeConnection对象，而不是让所有线程共用一个对象。感谢您提供的参考。 – 2013-04-04 16:12:04

+0

@Cameron Tinker:查看我的编辑 – KF2 2013-04-04 16:21:05

+0

我将您的答案标记为正确答案，因为它说明了在SQL Server CE中处理多线程的方法。谢谢您的帮助。 – 2013-04-05 18:00:10

0

只是一个建议：我做过类似的事情，下面是我的做法，与简单的逐行插入方案相比要快得多。

/// <summary>
/// Convenience overload: reads the CSV file <paramref name="name"/> from directory <paramref name="path"/>.
/// </summary>
/// <param name="path">Directory containing the file.</param>
/// <param name="name">File name, joined to <paramref name="path"/> with Path.Combine.</param>
/// <returns>The table produced by the single-path overload for the combined path.</returns>
public static DataTable CSVToDataTable(string path, string name)
{
    string fullPath = Path.Combine(path, name);
    return CSVToDataTable(fullPath);
}

/// <summary>
/// Reads a CSV file into a DataTable. Column names come from the first (header) line.
/// </summary>
/// <param name="path">Full path of the CSV file.</param>
/// <returns>
/// The populated table. The table is empty (no columns) when the file does not exist, is
/// empty, or has a blank first line.
/// <list type="bullet">
/// <item>Blank data lines are skipped.</item>
/// <item>Cells beyond the header count are ignored.</item>
/// <item>Missing trailing cells are left unset.</item>
/// </list>
/// </returns>
/// <remarks>Each physical line is one record; a quoted field that spans a line break is not supported.</remarks>
public static DataTable CSVToDataTable(string path)
{
    DataTable res = new DataTable();
    if (!File.Exists(path))
    {
        return res;
    }

    // FileShare.ReadWrite: do not fail if another handle has the file open for writing.
    using (FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    using (StreamReader reader = new StreamReader(stream))
    {
        // ReadLine returns null at end of stream, and string.IsNullOrWhiteSpace(null) is true.
        string line = reader.ReadLine();
        if (string.IsNullOrWhiteSpace(line))
        {
            return res;
        }

        string[] headers = LineToArray(line);
        foreach (string header in headers)
        {
            res.Columns.Add(header);
        }

        while ((line = reader.ReadLine()) != null)
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            string[] cells = LineToArray(line);
            DataRow row = res.NewRow();
            int usable = Math.Min(headers.Length, cells.Length);
            for (int i = 0; i < usable; i++)
            {
                row[i] = cells[i];
            }
            res.Rows.Add(row);
        }
    }
    return res;
}

/// <summary>
/// Splits one CSV line into cells.
/// </summary>
/// <param name="line">A single physical line of CSV text.</param>
/// <param name="delimiter">Cell separator; defaults to a comma.</param>
/// <returns>
/// One entry per cell, always including the last one. Quoting rules:
/// <list type="bullet">
/// <item>A quote at the start of a cell opens a quoted section, in which the delimiter is literal.</item>
/// <item>Inside a quoted section, a backslash-escaped quote (\") is kept verbatim and does not close it.</item>
/// <item>Inside a quoted section, a doubled quote ("") yields one quote.</item>
/// <item>Stray quotes elsewhere are dropped.</item>
/// </list>
/// </returns>
private static string[] LineToArray(string line, char delimiter = ',')
{
    if (!line.Contains("\""))
    {
        // Fast path: with no quotes, a plain split is exact.
        return line.Split(new String[] { delimiter.ToString() }, StringSplitOptions.None);
    }

    List<string> cells = new List<string>();
    System.Text.StringBuilder cell = new System.Text.StringBuilder();
    bool inQuotes = false;

    for (int i = 0; i < line.Length; i++)
    {
        char ch = line[i];
        if (inQuotes)
        {
            if (ch != '"')
            {
                cell.Append(ch);
            }
            else if (i > 0 && line[i - 1] == '\\')
            {
                // \" stays in the cell and does not end the quoted section.
                cell.Append(ch);
            }
            else if (i + 1 < line.Length && line[i + 1] == '"')
            {
                // RFC 4180 escape: "" inside a quoted cell is one literal quote.
                cell.Append('"');
                i++;
            }
            else
            {
                inQuotes = false;
            }
        }
        else if (ch == '"')
        {
            // A quote opens a quoted section only at the start of a cell; elsewhere it is dropped.
            if (cell.Length == 0)
            {
                inQuotes = true;
            }
        }
        else if (ch == delimiter)
        {
            cells.Add(cell.ToString());
            cell.Length = 0;
        }
        else
        {
            cell.Append(ch);
        }
    }

    // Emit the final cell; it has no trailing delimiter to trigger the Add above.
    cells.Add(cell.ToString());
    return cells.ToArray();
}

/// <summary>
/// Loads the CSV file <paramref name="name"/> from directory <paramref name="path"/> and
/// bulk-copies its rows into <paramref name="table"/> with SqlCeBulkCopy.
/// </summary>
/// <param name="path">Directory containing the CSV file.</param>
/// <param name="name">CSV file name.</param>
/// <param name="table">Destination table name.</param>
/// <param name="KeepNulls">When true, SqlCeBulkCopyOptions.KeepNulls is passed to the bulk copy.</param>
/// <param name="connectionString">
/// Target database. When null (the default), the DatabaseConnectionString application setting is used.
/// </param>
public void insert(string path, string name, string table, bool KeepNulls, string connectionString = null)
{
    DataTable data = CSVToDataTable(path, name);
    // Transform `data` here if the CSV layout differs from the destination table.

    SqlCeBulkCopyOptions options = new SqlCeBulkCopyOptions();
    if (KeepNulls)
    {
        options |= SqlCeBulkCopyOptions.KeepNulls;
    }

    string target = connectionString ?? Fastway_Remote_Agent.Properties.Settings.Default.DatabaseConnectionString;
    using (SqlCeBulkCopy bc = new SqlCeBulkCopy(target, options))
    {
        bc.DestinationTableName = table;
        bc.WriteToServer(data);
    }
}

使用此库:http://sqlcebulkcopy.codeplex.com/

另外，可以使用下面这个按线程管理连接的连接池（请根据需要修改）：

/// <summary>
/// Issues one DBCon per managed thread. It also disposes connections whose owning thread
/// is no longer alive.
/// </summary>
public abstract class SqlCeConnectionPool
{
    // Connection currently issued to each thread, keyed by ManagedThreadId.
    private static readonly Dictionary<int, DBCon> threadConnectionMap = new Dictionary<int, DBCon>();

    // Thread that owns each entry of threadConnectionMap; used to detect threads that have ended.
    private static readonly Dictionary<int, Thread> threadMap = new Dictionary<int, Thread>();

    /// <summary>
    /// The per-thread connection map.
    /// </summary>
    /// <remarks>
    /// NOTE(review): Connection locks on this dictionary while it reads and modifies it.
    /// Outside callers should do the same.
    /// </remarks>
    public static Dictionary<int, DBCon> ThreadConnectionMap
    {
        get { return threadConnectionMap; }
    }

    /// <summary>
    /// Connection string used for every new DBCon.
    /// </summary>
    /// <value>global::ConnectionString.Default.</value>
    public static ConnectionString ConnectionString
    {
        get { return global::ConnectionString.Default; }
    }

    /// <summary>
    /// Gets the calling thread's connection. A new DBCon is created when the thread has none,
    /// or when its cached one is disposed, broken or closed.
    /// </summary>
    /// <remarks>Don't do this with anything but SQL compact edition or you'll run out of connections - compact edition is not
    /// connection pooling friendly and unloads itself too often otherwise so that is why this class exists</remarks>
    /// <returns>The calling thread's connection.</returns>
    public static DBCon Connection
    {
        get
        {
            lock (threadConnectionMap)
            {
                DisposeConnectionsOfDeadThreads();

                int threadId = Thread.CurrentThread.ManagedThreadId;
                DBCon cached;
                if (threadConnectionMap.TryGetValue(threadId, out cached))
                {
                    if (!cached.Disposed)
                    {
                        ConnectionState state = cached.Connection.State;
                        bool usable = state != ConnectionState.Broken && state != ConnectionState.Closed;
                        if (usable)
                        {
                            return cached;
                        }
                        cached.Dispose();
                    }

                    // The cached entry is disposed, broken or closed: forget it.
                    threadConnectionMap.Remove(threadId);
                    threadMap.Remove(threadId);
                }

                // NOTE(review): nothing here calls Open(). The returned connection is only open
                // if DBCon's constructor opens it — confirm.
                DBCon fresh = new DBCon(ConnectionString);
                threadConnectionMap[threadId] = fresh;
                threadMap[threadId] = Thread.CurrentThread;
                return fresh;
            }
        }
    }

    /// <summary>
    /// Disposes (if not already disposed) and removes every map entry whose thread is unknown
    /// or no longer alive. Must be called while holding the lock on threadConnectionMap.
    /// </summary>
    private static void DisposeConnectionsOfDeadThreads()
    {
        List<int> staleIds = new List<int>();
        foreach (KeyValuePair<int, DBCon> entry in threadConnectionMap)
        {
            Thread owner;
            bool ownerAlive = threadMap.TryGetValue(entry.Key, out owner) && owner.IsAlive;
            if (ownerAlive)
            {
                continue;
            }

            if (!entry.Value.Disposed)
            {
                entry.Value.Dispose();
            }
            staleIds.Add(entry.Key);
        }

        foreach (int staleId in staleIds)
        {
            threadMap.Remove(staleId);
            threadConnectionMap.Remove(staleId);
        }
    }
}