diff --git a/Src/Asp.NetCore2/SqlSugar.TDengineCore/TDengine/SqlBuilder/TDengineFastBuilder.cs b/Src/Asp.NetCore2/SqlSugar.TDengineCore/TDengine/SqlBuilder/TDengineFastBuilder.cs index 9260b11bd..a867c1f69 100644 --- a/Src/Asp.NetCore2/SqlSugar.TDengineCore/TDengine/SqlBuilder/TDengineFastBuilder.cs +++ b/Src/Asp.NetCore2/SqlSugar.TDengineCore/TDengine/SqlBuilder/TDengineFastBuilder.cs @@ -3,7 +3,9 @@ using SqlSugar.TDengineAdo; using System; using System.Collections.Generic; using System.Data; +using System.Data.Common; using System.Linq; +using System.Net.Http.Headers; using System.Text; using System.Threading.Tasks; @@ -11,44 +13,182 @@ namespace SqlSugar.TDengine { public class TDengineFastBuilder : FastBuilder, IFastBuilder { - - private EntityInfo entityInfo; - - public TDengineFastBuilder(EntityInfo entityInfo) + public const string TagKey = "TDengineFastBuilderTagNames"; + public static void SetTags(ISqlSugarClient db,params string [] tagNames) { - this.entityInfo = entityInfo; + if (db.TempItems == null) + { + db.TempItems = new Dictionary(); + } + db.TempItems.Add(TagKey, tagNames); } - - public override string UpdateSql { get; set; } = @"UPDATE {1} SET {0} FROM {2} AS TE WHERE {3} -"; - - //public virtual async Task UpdateByTempAsync(string tableName, string tempName, string[] updateColumns, string[] whereColumns) - //{ - // Check.ArgumentNullException(!updateColumns.Any(), "update columns count is 0"); - // Check.ArgumentNullException(!whereColumns.Any(), "where columns count is 0"); - // var sets = string.Join(",", updateColumns.Select(it => $"TM.{it}=TE.{it}")); - // var wheres = string.Join(",", whereColumns.Select(it => $"TM.{it}=TE.{it}")); - // string sql = string.Format(UpdateSql, sets, tableName, tempName, wheres); - // return await this.Context.Ado.ExecuteCommandAsync(sql); - //} public async Task ExecuteBulkCopyAsync(DataTable dt) { - return 0; + var db = this.Context; + if (db.TempItems == null) + { + db.TempItems = new Dictionary(); + } + string[] tagNames = new string[] { }; + if (db.TempItems.ContainsKey("TDengineFastBuilderTagNames")) + { + tagNames = (string[])db.TempItems[TagKey]; + } + db.TempItems.Add("TDengineFastBuilderTagNames", tagNames); + await BulkInsertToTDengine((TDengineConnection)this.Context.Ado.Connection,dt.TableName,dt, this.Context.Ado.IsNoTran(), tagNames); + return dt.Rows.Count; + } + public async Task BulkInsertToTDengine(TDengineConnection conn, string tableName, DataTable table, bool isTran, string[] tagColumns) + { + DbTransaction? transaction = null; + var isAutoClose = this.Context.CurrentConnectionConfig.IsAutoCloseConnection; + if (isAutoClose) + { + this.Context.CurrentConnectionConfig.IsAutoCloseConnection = false; + } + + try + { + if (conn.State != ConnectionState.Open) + { + await conn.OpenAsync(); + } + + if (isTran) + { + transaction = await conn.BeginTransactionAsync(); + } + + // 自动建表 + var createTableSql = BuildCreateTableSql(tableName, table, tagColumns); + await ExecuteSqlAsync(conn, createTableSql); + + // 构造 INSERT 语句 + var columnNames = string.Join(", ", table.Columns.Cast().Select(c => c.ColumnName)); + var valuePlaceholders = string.Join(", ", table.Rows.Cast().Select(row => + $"({string.Join(", ", row.ItemArray.Select(item => FormatValue(item)))})" + )); + + var insertSql = $"INSERT INTO {tableName} ({columnNames}) TAGS ({string.Join(", ", tagColumns)}) VALUES {valuePlaceholders}"; + + using var cmd = conn.CreateCommand(); + cmd.CommandText = insertSql; + + // 执行批量插入 + await cmd.ExecuteNonQueryAsync(); + + if (isTran) + { + await transaction!.CommitAsync(); + } + } + catch + { + if (isTran && transaction != null) + { + await transaction.RollbackAsync(); + } + + throw; + } + finally + { + if (isTran && transaction != null) + { + transaction?.Dispose(); + } + this.Context.CurrentConnectionConfig.IsAutoCloseConnection = isAutoClose; + } } - private void BulkCopy(DataTable dt, string copyString, TDengineConnection conn, List columns) + private string BuildCreateTableSql(string tableName, DataTable table, string[] tagColumns) { - throw new NotSupportedException(); + // 自动建表语句 + var columnDefinitions = table.Columns.Cast() + .Select(c => $"{c.ColumnName} {GetTDengineColumnType(c.DataType)}"); + + var tagDefinitions = tagColumns.Select(tag => $"{tag} STRING"); + + var createTableSql = $@" + CREATE TABLE IF NOT EXISTS {tableName} ( + {string.Join(", ", columnDefinitions)}, + {string.Join(", ", tagDefinitions)}, + ts TIMESTAMP + ) TAGS ({string.Join(", ", tagColumns)}); + "; + + return createTableSql; } - - public override async Task UpdateByTempAsync(string tableName, string tempName, string[] updateColumns, string[] whereColumns) + private string GetTDengineColumnType(Type type) { - throw new NotSupportedException(); + if (type == typeof(int) || type == typeof(long) || type == typeof(float) || type == typeof(double)) + { + return "FLOAT"; + } + else if (type == typeof(DateTime)) + { + return "TIMESTAMP"; + } + else if (type == typeof(bool)) + { + return "BOOLEAN"; + } + else if (type == typeof(string)) + { + return "STRING"; + } + else + { + return "STRING"; // 默认类型为 STRING + } } - public override async Task CreateTempAsync(DataTable dt) + + public object FormatValue(object value) { - throw new NotSupportedException(); - } + if (value == null || value == DBNull.Value) + { + return "NULL"; + } + else + { + var type = value.GetType(); + if (type == typeof(DateTime)) + { + return $"'{Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss.ms")}'"; + } + else if (type == typeof(byte[])) + { + return $"0x{BitConverter.ToString((byte[])value).Replace("-", "")}"; + } + else if (type.IsEnum) + { + return Convert.ToInt64(value); + } + else if (type == typeof(bool)) + { + return value.ObjToBool() ? "1" : "0"; + } + else if (type == typeof(string)) + { + return $"'{value.ToString().ToSqlFilter()}'"; + } + else if (value is decimal) + { + return value.ToString(); + } + else + { + return $"'{value.ToString()}'"; + } + } + } + + private async Task ExecuteSqlAsync(TDengineConnection conn, string sql) + { + using var cmd = conn.CreateCommand(); + cmd.CommandText = sql; + await cmd.ExecuteNonQueryAsync(); + } } }