您现在的位置是:首页 >技术杂谈 >C# MySQL 插入大批量数据网站首页技术杂谈
C# MySQL 插入大批量数据
一、定义DataTable的数据类型
/// <summary>
/// Builds an empty in-memory <see cref="DataTable"/> whose columns match the
/// detail table that receives the bulk insert.
/// </summary>
/// <param name="tablename">Value assigned to <see cref="DataTable.TableName"/>.</param>
/// <returns>
/// A table with no rows and eight columns: Id, ChannelName, ChannelDescribe,
/// MessageTime, MessageMicrosecond, MicrosecondCount, ChannelValue, CreateTime.
/// </returns>
private DataTable GetDataTableFromExternalSource(string tablename)
{
    DataTable table = new DataTable();
    table.TableName = tablename;

    // Client-side surrogate key: non-null, starts at 1, grows by 1 per added row.
    DataColumn keyColumn = table.Columns.Add("Id", typeof(long));
    keyColumn.AutoIncrement = true;
    keyColumn.AllowDBNull = false;
    keyColumn.AutoIncrementSeed = 1;
    keyColumn.AutoIncrementStep = 1;

    // Payload columns, in the same order as the original definition.
    table.Columns.Add("ChannelName", typeof(string));
    table.Columns.Add("ChannelDescribe", typeof(string));
    table.Columns.Add("MessageTime", typeof(DateTime));
    table.Columns.Add("MessageMicrosecond", typeof(int));
    table.Columns.Add("MicrosecondCount", typeof(int));
    table.Columns.Add("ChannelValue", typeof(string));
    table.Columns.Add("CreateTime", typeof(DateTime));

    return table;
}
二、创建 MySQL 数据库中对应的数据表
/// <summary>
/// Ensures the detail table named <c>{computer_no}_data_{dayStr}</c> exists,
/// creating it when information_schema reports no such table in <paramref name="dbName"/>.
/// </summary>
/// <param name="conn">Connection to use. It is opened here when not already open and is left open on return.</param>
/// <param name="computer_no">Prefix of the table name.</param>
/// <param name="dayStr">Suffix of the table name.</param>
/// <param name="dbName">Schema name compared against <c>information_schema.TABLES.TABLE_SCHEMA</c>.</param>
/// <returns><c>true</c> when this call created the table; <c>false</c> when it already existed.</returns>
/// <exception cref="ArgumentNullException"><paramref name="conn"/> is <c>null</c>.</exception>
private bool CreateDetailDataTable(MySqlConnection conn, string computer_no, string dayStr, string dbName)
{
    // Fail fast: previously a null connection fell through an empty branch and
    // only failed later, when the command was executed.
    if (conn == null)
    {
        throw new ArgumentNullException(nameof(conn));
    }

    string tableName = computer_no + "_data_" + dayStr;

    if (conn.State != ConnectionState.Open)
    {
        conn.Open();
    }

    // Values are bound as parameters instead of being formatted into the SQL text,
    // so quotes inside the table or schema name cannot alter the statement.
    const string tableExistsSql =
        "SELECT TABLE_NAME FROM information_schema.TABLES where table_name = @tableName and TABLE_SCHEMA = @dbName";

    using (MySqlCommand existsCommand = new MySqlCommand(tableExistsSql, conn))
    {
        existsCommand.Parameters.AddWithValue("@tableName", tableName);
        existsCommand.Parameters.AddWithValue("@dbName", dbName);

        if (existsCommand.ExecuteScalar() != null)
        {
            return false;
        }
    }

    // NOTE(review): the CREATE TABLE statement is not schema-qualified, so the table is
    // created in the connection's current database, which is presumably dbName - confirm.
    using (MySqlCommand createCommand = new MySqlCommand(GetCreateTableSql(computer_no, dayStr, tableName), conn))
    {
        createCommand.ExecuteNonQuery();
    }

    return true;
}
MySQL 数据库表结构
/// <summary>
/// Builds the CREATE TABLE statement for a detail table.
/// </summary>
/// <param name="computer_no">Not used by this method; kept so existing callers compile unchanged.</param>
/// <param name="dayStr">Not used by this method; kept so existing callers compile unchanged.</param>
/// <param name="tableName">Name of the table to create. It is emitted as a backtick-quoted identifier.</param>
/// <returns>The complete CREATE TABLE statement, terminated with a semicolon.</returns>
/// <exception cref="ArgumentException"><paramref name="tableName"/> is <c>null</c> or empty.</exception>
private string GetCreateTableSql(string computer_no, string dayStr, string tableName)
{
    if (string.IsNullOrEmpty(tableName))
    {
        throw new ArgumentException("Table name must not be null or empty.", nameof(tableName));
    }

    // Quote the identifier and double any embedded backtick. Names that start with digits
    // (e.g. "1e5_data_...") or contain special characters then parse unambiguously, and the
    // name cannot break out of the identifier position.
    string quotedTableName = "`" + tableName.Replace("`", "``") + "`";

    // Append (not AppendFormat) for literal fragments: they contain no placeholders, and
    // AppendFormat would throw if a fragment ever gained a brace character.
    StringBuilder createTableSql = new StringBuilder();
    createTableSql.Append(" Create Table ").Append(quotedTableName);
    createTableSql.Append("(");
    createTableSql.Append(" `Id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',");
    createTableSql.Append(" `ChannelName` varchar(255) NOT NULL COMMENT '通道名称',");
    createTableSql.Append(" `ChannelDescribe` varchar(255) DEFAULT NULL COMMENT '测点描述',");
    createTableSql.Append(" `MessageTime` datetime NOT NULL COMMENT '报文时间',");
    createTableSql.Append(" `MessageMicrosecond` int(11) NOT NULL COMMENT '报文微妙',");
    createTableSql.Append(" `MicrosecondCount` int(11) NOT NULL COMMENT '报文微妙计算',");
    createTableSql.Append(" `ChannelValue` varchar(255) NOT NULL DEFAULT '' COMMENT '通道值',");
    createTableSql.Append(" `CreateTime` datetime(6) NOT NULL COMMENT '创建时间',");
    createTableSql.Append(" PRIMARY KEY(`Id`),");
    createTableSql.Append(" UNIQUE KEY `tbrDataIndex` (`ChannelName`,`ChannelDescribe`,`MessageTime`,`MessageMicrosecond`,`CreateTime`)");
    createTableSql.Append(") ENGINE = InnoDB DEFAULT CHARSET = utf8;");
    return createTableSql.ToString();
}
三、定义大批量插入数据库的方法
/// <summary>
/// Bulk-copies all rows of <paramref name="dataTable"/> into the MySQL table named by
/// <see cref="DataTable.TableName"/>, then closes <paramref name="connection"/>.
/// </summary>
/// <remarks>
/// Despite its name this method runs synchronously. A failure of the bulk copy itself is
/// logged through <see cref="System.Diagnostics.Trace"/> and not rethrown, which keeps the
/// original non-throwing behaviour for callers while making the failure visible.
/// </remarks>
/// <param name="connection">
/// Connection used for the bulk copy. It is closed before this method returns.
/// The sample connection string in the original notes sets AllowLoadLocalInfile=True.
/// </param>
/// <param name="dataTable">Rows to insert; its columns are mapped to same-named destination columns.</param>
/// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
private void InsertAsync(MySqlConnection connection, DataTable dataTable)
{
    if (connection == null)
    {
        throw new ArgumentNullException(nameof(connection));
    }

    if (dataTable == null)
    {
        throw new ArgumentNullException(nameof(dataTable));
    }

    try
    {
        // Set up the bulk copy: source columns are mapped by ordinal to destination column names.
        var bulkCopy = new MySqlBulkCopy(connection);
        bulkCopy.DestinationTableName = dataTable.TableName;
        bulkCopy.ColumnMappings.AddRange(GetMySqlColumnMapping(dataTable));

        try
        {
            MySqlBulkCopyResult result = bulkCopy.WriteToServer(dataTable);

            // Check for problems: warnings indicate potential data loss during the copy.
            if (result.Warnings.Count != 0)
            {
                System.Diagnostics.Trace.TraceWarning(
                    "Bulk copy into '{0}' completed with {1} warning(s).",
                    dataTable.TableName,
                    result.Warnings.Count);
            }
        }
        catch (Exception ex)
        {
            // Previously swallowed without a trace; record it so lost inserts can be diagnosed.
            System.Diagnostics.Trace.TraceError(
                "Bulk copy into '{0}' failed: {1}",
                dataTable.TableName,
                ex);
        }
    }
    finally
    {
        // Always release the connection, even when setting up the bulk copy throws.
        connection.Close();
    }
}
/// <summary>
/// Produces one bulk-copy column mapping per column of <paramref name="dataTable"/>,
/// pairing each column's ordinal position with its own column name.
/// </summary>
/// <param name="dataTable">Table whose columns are mapped.</param>
/// <returns>Mappings in column order; empty when the table has no columns.</returns>
private List<MySqlBulkCopyColumnMapping> GetMySqlColumnMapping(DataTable dataTable)
{
    DataColumnCollection columns = dataTable.Columns;
    var mappings = new List<MySqlBulkCopyColumnMapping>();

    // The source ordinal is the column's position; the destination is addressed by name.
    for (int ordinal = 0; ordinal < columns.Count; ordinal++)
    {
        string destinationName = columns[ordinal].ColumnName;
        mappings.Add(new MySqlBulkCopyColumnMapping(ordinal, destinationName));
    }

    return mappings;
}
四、调用大批量插入InsertAsync
// Calling sequence: open a connection, make sure the per-day detail table exists, read the
// sampling frequency, then read the packed message rows and expand each one into
// individual DataTable rows (one per channel per sample group).
// _connstring, computer_No, dayStr, startTime, endTime and dataTable are defined outside this snippet.
// NOTE(review): conn, command and reader are not disposed anywhere in the visible code - confirm
// they are released further down or wrap them in using blocks.
MySqlConnection conn = new MySqlConnection(_connstring);
conn.Open();
CreateDetailDataTable(conn, computer_No, dayStr, "donghuadatadb");
MySqlCommand command = new MySqlCommand(GetSamplingFrequencyBycomputerNo(computer_No), conn);
object obj = command.ExecuteScalar();
// Falls back to "50" when the scalar query returns null.
string cypl = (obj != null ? obj.ToString() : "50");
command.CommandText = GetMdataNo(computer_No, dayStr, startTime, endTime);
MySqlDataReader reader = command.ExecuteReader();
// (1 / cypl) * 1,000,000; presumably the interval between samples in microseconds when
// cypl is a rate in Hz - TODO confirm.
float avgtime = (float.Parse(1.ToString()) / float.Parse(cypl)) * 1000000;
float millisecond = 0;// milliseconds
int second = 0;// seconds
string valuestr = string.Empty;// all channel values
string ChannelNames = string.Empty;// channel names
string MessageTime_Millisecond = string.Empty;// message time with a fractional part appended
int PackageDataCount = 0;// number of data groups in the row
int MessageMicrosecond = 0;// microseconds
while (reader.Read())
{
// Reset the per-row working variables before reading the next record.
valuestr = string.Empty;
ChannelNames = string.Empty;
MessageTime_Millisecond = string.Empty;
PackageDataCount = 0;
MessageMicrosecond = 0;
valuestr = reader["ChannelValues"].ToString();
ChannelNames = reader["ChannelNames"].ToString();
DateTime messageTime = DateTime.Parse(reader["MessageTime"].ToString());// message time, accurate to the second
PackageDataCount = int.Parse(reader["PackageDataCount"].ToString());
MessageMicrosecond = int.Parse(reader["MessageMicrosecond"].ToString());
// Values are '@'-separated and channel names are '|'-separated.
string[] valueArray = valuestr.Split("@");
string[] tongdaonameArray = ChannelNames.Split("|");
float MicrosecondCountTemp = 0;
for (int i = 0; i < PackageDataCount; i++)
{
// The first group uses the row's own microsecond offset; each later group adds avgtime.
if (i == 0)
{
MicrosecondCountTemp = MessageMicrosecond;
}
else
{
MicrosecondCountTemp = MicrosecondCountTemp + avgtime;
}
millisecond = 0;
second = 0;
// Convert the running microsecond offset to milliseconds.
millisecond = (MicrosecondCountTemp / float.Parse(1000.ToString()));
// When the offset reaches a full second, carry whole seconds into the timestamp text.
// NOTE(review): MessageTime_Millisecond is assigned here but not read in the visible code.
if (millisecond >= 1000)
{
second = (int)(millisecond / 1000);
MessageTime_Millisecond = messageTime.AddSeconds(second).ToString("yyyy-MM-dd HH:mm:ss") + "." + (Math.Round(millisecond) - (second * 1000));
}
else
{
MessageTime_Millisecond = messageTime.ToString("yyyy-MM-dd HH:mm:ss") + "." + Math.Round(millisecond);
}
// n indexes the channel name within the group; j indexes the flat value array and runs
// over [i * channelCount, i * channelCount + channelCount).
// NOTE(review): valueArray[j] throws if the row holds fewer than
// PackageDataCount * channelCount values - confirm the data guarantees this.
int n = 0;
for (int j = (i == 0 ? 0 : i * tongdaonameArray.Length); j < (i == 0 ? tongdaonameArray.Length : (((i * tongdaonameArray.Length) + tongdaonameArray.Length))); j++)
{
DataRow newRow = dataTable.NewRow();
newRow["ChannelName"] = tongdaonameArray[n];
newRow["ChannelDescribe"] = "ChannelDescribe";
newRow["MessageTime"] = messageTime;
newRow["MessageMicrosecond"] = MessageMicrosecond;
// NOTE(review): MicrosecondCountTemp is a float; if dataTable comes from
// GetDataTableFromExternalSource this column is System.Int32, so the value is
// converted on assignment - confirm the rounding is acceptable.
newRow["MicrosecondCount"] = MicrosecondCountTemp;
newRow["ChannelValue"] = valueArray[j];
newRow["CreateTime"] = DateTime.Now;
dataTable.Rows.Add(newRow);
n++;
}
}
}
reader.Close();