Add TDengine BulkCopy

This commit is contained in:
sunkaixuan 2025-04-22 18:35:38 +08:00
parent d4560619d0
commit 11f9799e21

View File

@ -38,13 +38,14 @@ namespace SqlSugar.TDengine
return dt.Rows.Count; return dt.Rows.Count;
} }
public static void SetTags(ISqlSugarClient db,params string [] tagNames) public static void SetTags(ISqlSugarClient db, Func<string, string> action, params string [] tagNames)
{ {
if (db.TempItems == null) if (db.TempItems == null)
{ {
db.TempItems = new Dictionary<string, object>(); db.TempItems = new Dictionary<string, object>();
} }
db.TempItems.Add(TagKey, tagNames); db.TempItems.Add(TagKey, tagNames);
db.TempItems.Add(TagKey+"action", action);
} }
public async Task BulkInsertToTDengine(TDengineConnection conn, string tableName, DataTable table, bool isTran, string[] tagColumns) public async Task BulkInsertToTDengine(TDengineConnection conn, string tableName, DataTable table, bool isTran, string[] tagColumns)
{ {
@ -55,24 +56,40 @@ namespace SqlSugar.TDengine
if (tagColumns != null && tagColumns.Length > 0) if (tagColumns != null && tagColumns.Length > 0)
{ {
StringBuilder sb = new StringBuilder();
StringBuilder sbTables = new StringBuilder();
// 创建一个列名映射(忽略大小写)
var columnMap = table.Columns.Cast<DataColumn>()
.ToDictionary(c => c.ColumnName, c => c, StringComparer.OrdinalIgnoreCase);
string tags = string.Join(", ", tagColumns.Select(tag => FormatValue(tag))); // 检查所有 tagColumns 是否在 DataTable 中存在
foreach (var item in tagColumns) foreach (var col in tagColumns)
{ {
table.Columns.Remove(item); if (!columnMap.ContainsKey(col))
throw new Exception($"Column '{col}' not found in DataTable.");
} }
// Build the column names and value placeholders
var valuePlaceholdersList = table.Rows.Cast<DataRow>().Select(row => // 用 LINQ 分组
var groups = table.AsEnumerable()
.GroupBy(row => string.Join("||", tagColumns.Select(tc => row[columnMap[tc]].ToString())));
foreach (var group in groups)
{ {
var values = row.ItemArray.Select(item => FormatValue(item)).ToList(); // 构建一个新的子表(结构与原表一致)
return $"({string.Join(", ", values)})"; DataTable childTable = table.Clone();
});
var valuePlaceholders = string.Join(", ", valuePlaceholdersList); // 将分组行复制到子表中
// 排除标签列,确保数据列不包含标签列 foreach (var row in group)
var columnNames = string.Join(", ", table.Columns.Cast<DataColumn>() {
.Select(c => c.ColumnName)); childTable.ImportRow(row);
// 生成插入语句USING 部分包含标签列VALUES 部分只包含数据列 }
insertSql = $"INSERT INTO {tableName} USING {tableName} TAGS({tags}) ({columnNames}) VALUES {valuePlaceholders}";
// 调用 InsertChildTable
InsertChildTable(tableName, childTable, tagColumns,sb, sbTables);
}
await this.Context.Ado.ExecuteCommandAsync(sbTables.ToString());
await this.Context.Ado.ExecuteCommandAsync(sb.ToString());
} }
else else
{ {
@ -87,10 +104,10 @@ namespace SqlSugar.TDengine
// Construct SQL without tags // Construct SQL without tags
var columnNames = string.Join(", ", table.Columns.Cast<DataColumn>().Select(c => c.ColumnName)); var columnNames = string.Join(", ", table.Columns.Cast<DataColumn>().Select(c => c.ColumnName));
insertSql = $"INSERT INTO {tableName} ({columnNames}) VALUES {valuePlaceholders}"; insertSql = $"INSERT INTO {tableName} ({columnNames}) VALUES {valuePlaceholders}";
}
// Execute the command asynchronously // Execute the command asynchronously
await this.Context.Ado.ExecuteCommandAsync(insertSql); await this.Context.Ado.ExecuteCommandAsync(insertSql);
}
} }
catch(Exception ex) catch(Exception ex)
{ {
@ -101,6 +118,42 @@ namespace SqlSugar.TDengine
} }
} }
private StringBuilder InsertChildTable(string tableName, DataTable table, string[] tagColumns,StringBuilder sb, StringBuilder sbtables)
{
var builder = InstanceFactory.GetSqlBuilderWithContext(this.Context);
var columnMap = table.Columns.Cast<DataColumn>()
.ToDictionary(c => c.ColumnName, c => c.ColumnName, StringComparer.OrdinalIgnoreCase);
var firstRow = table.Rows[0];
string tags = string.Join(", ", tagColumns.Select(tag => FormatValue(firstRow[columnMap[tag]])));
string tagsValues = string.Join("_", tagColumns.Select(tag => firstRow[columnMap[tag]].ToString()));
// 移除标签列,只留下数据列
foreach (var item in tagColumns)
{
table.Columns.Remove(item);
}
var columnNames = string.Join(", ", table.Columns.Cast<DataColumn>()
.Select(c => builder.GetTranslationColumnName(c.ColumnName)));
var action = this.Context.TempItems[TagKey + "action"] as Func<string, string>;
var subTableName = builder.GetTranslationColumnName(action(tagsValues));
sbtables.AppendLine($"CREATE TABLE {subTableName} USING {tableName} TAGS({tags});");
var sqlBuilder =sb;
foreach (DataRow row in table.Rows)
{
var values = row.ItemArray.Select(item => FormatValue(item)).ToList();
var valuePart = string.Join(", ", values);
string insertSql = $"INSERT INTO {subTableName} USING {tableName} TAGS({tags}) ({columnNames}) VALUES ({valuePart});";
sqlBuilder.AppendLine(insertSql);
}
return sqlBuilder;
}
public object FormatValue(object value) public object FormatValue(object value)
{ {