mirror of
https://gitee.com/dotnetchina/SqlSugar.git
synced 2026-01-28 11:11:32 +08:00
TDenine Support BulkCopy
This commit is contained in:
@@ -4,6 +4,7 @@ using System;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Data;
|
using System.Data;
|
||||||
using System.Data.Common;
|
using System.Data.Common;
|
||||||
|
using System.Globalization;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Net.Http.Headers;
|
using System.Net.Http.Headers;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
@@ -14,6 +15,29 @@ namespace SqlSugar.TDengine
|
|||||||
public class TDengineFastBuilder : FastBuilder, IFastBuilder
|
public class TDengineFastBuilder : FastBuilder, IFastBuilder
|
||||||
{
|
{
|
||||||
public const string TagKey = "TDengineFastBuilderTagNames";
|
public const string TagKey = "TDengineFastBuilderTagNames";
|
||||||
|
|
||||||
|
|
||||||
|
public async Task<int> ExecuteBulkCopyAsync(DataTable dt)
|
||||||
|
{
|
||||||
|
// 移除自增列
|
||||||
|
var identities = this.FastEntityInfo.Columns.Where(it => it.IsIdentity).Select(it => it.DbColumnName).ToList();
|
||||||
|
foreach (var identity in identities)
|
||||||
|
{
|
||||||
|
if (dt.Columns.Contains(identity))
|
||||||
|
{
|
||||||
|
dt.Columns.Remove(identity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var db = this.Context;
|
||||||
|
string[] tagNames = null;
|
||||||
|
if (db.TempItems!=null&&db.TempItems.ContainsKey(TagKey))
|
||||||
|
{
|
||||||
|
tagNames = db.TempItems[TagKey]as string[];
|
||||||
|
}
|
||||||
|
await BulkInsertToTDengine((TDengineConnection)this.Context.Ado.Connection, dt.TableName, dt, this.Context.Ado.IsNoTran(), tagNames);
|
||||||
|
return dt.Rows.Count;
|
||||||
|
}
|
||||||
|
|
||||||
public static void SetTags(ISqlSugarClient db,params string [] tagNames)
|
public static void SetTags(ISqlSugarClient db,params string [] tagNames)
|
||||||
{
|
{
|
||||||
if (db.TempItems == null)
|
if (db.TempItems == null)
|
||||||
@@ -22,127 +46,57 @@ namespace SqlSugar.TDengine
|
|||||||
}
|
}
|
||||||
db.TempItems.Add(TagKey, tagNames);
|
db.TempItems.Add(TagKey, tagNames);
|
||||||
}
|
}
|
||||||
public async Task<int> ExecuteBulkCopyAsync(DataTable dt)
|
|
||||||
{
|
|
||||||
var db = this.Context;
|
|
||||||
if (db.TempItems == null)
|
|
||||||
{
|
|
||||||
db.TempItems = new Dictionary<string, object>();
|
|
||||||
}
|
|
||||||
string[] tagNames = new string[] { };
|
|
||||||
if (db.TempItems.ContainsKey("TDengineFastBuilderTagNames"))
|
|
||||||
{
|
|
||||||
tagNames = (string[])db.TempItems[TagKey];
|
|
||||||
}
|
|
||||||
db.TempItems.Add("TDengineFastBuilderTagNames", tagNames);
|
|
||||||
await BulkInsertToTDengine((TDengineConnection)this.Context.Ado.Connection,dt.TableName,dt, this.Context.Ado.IsNoTran(), tagNames);
|
|
||||||
return dt.Rows.Count;
|
|
||||||
}
|
|
||||||
public async Task BulkInsertToTDengine(TDengineConnection conn, string tableName, DataTable table, bool isTran, string[] tagColumns)
|
public async Task BulkInsertToTDengine(TDengineConnection conn, string tableName, DataTable table, bool isTran, string[] tagColumns)
|
||||||
{
|
{
|
||||||
DbTransaction? transaction = null;
|
|
||||||
var isAutoClose = this.Context.CurrentConnectionConfig.IsAutoCloseConnection;
|
|
||||||
if (isAutoClose)
|
|
||||||
{
|
|
||||||
this.Context.CurrentConnectionConfig.IsAutoCloseConnection = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (conn.State != ConnectionState.Open)
|
// Build the column names and value placeholders
|
||||||
|
var valuePlaceholdersList = table.Rows.Cast<DataRow>().Select(row =>
|
||||||
{
|
{
|
||||||
await conn.OpenAsync();
|
var values = row.ItemArray.Select(item => FormatValue(item)).ToList();
|
||||||
|
|
||||||
|
// If there are tags, move them to the beginning of the values list and adjust insert syntax
|
||||||
|
if (tagColumns != null && tagColumns.Length > 0)
|
||||||
|
{
|
||||||
|
foreach (var tag in tagColumns)
|
||||||
|
{
|
||||||
|
int index = table.Columns.IndexOf(tag);
|
||||||
|
values.Insert(0, FormatValue(row[index]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return $"({string.Join(", ", values)})";
|
||||||
|
}).ToList();
|
||||||
|
|
||||||
|
var valuePlaceholders = string.Join(", ", valuePlaceholdersList);
|
||||||
|
|
||||||
|
// Check if tagColumns is provided and adjust the SQL statement accordingly
|
||||||
|
string insertSql;
|
||||||
|
if (tagColumns != null && tagColumns.Length > 0)
|
||||||
|
{
|
||||||
|
// Construct SQL with tags included
|
||||||
|
var columnNames = string.Join(", ", tagColumns.Concat(table.Columns.Cast<DataColumn>().Where(c => !tagColumns.Contains(c.ColumnName)).Select(c => c.ColumnName)));
|
||||||
|
insertSql = $"INSERT INTO {tableName} ({columnNames}) VALUES {valuePlaceholders}";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Construct SQL without tags
|
||||||
|
var columnNames = string.Join(", ", table.Columns.Cast<DataColumn>().Select(c => c.ColumnName));
|
||||||
|
insertSql = $"INSERT INTO {tableName} ({columnNames}) VALUES {valuePlaceholders}";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isTran)
|
// Execute the command asynchronously
|
||||||
{
|
await this.Context.Ado.ExecuteCommandAsync(insertSql);
|
||||||
transaction = await conn.BeginTransactionAsync();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 自动建表
|
|
||||||
var createTableSql = BuildCreateTableSql(tableName, table, tagColumns);
|
|
||||||
await ExecuteSqlAsync(conn, createTableSql);
|
|
||||||
|
|
||||||
// 构造 INSERT 语句
|
|
||||||
var columnNames = string.Join(", ", table.Columns.Cast<DataColumn>().Select(c => c.ColumnName));
|
|
||||||
var valuePlaceholders = string.Join(", ", table.Rows.Cast<DataRow>().Select(row =>
|
|
||||||
$"({string.Join(", ", row.ItemArray.Select(item => FormatValue(item)))})"
|
|
||||||
));
|
|
||||||
|
|
||||||
var insertSql = $"INSERT INTO {tableName} ({columnNames}) TAGS ({string.Join(", ", tagColumns)}) VALUES {valuePlaceholders}";
|
|
||||||
|
|
||||||
using var cmd = conn.CreateCommand();
|
|
||||||
cmd.CommandText = insertSql;
|
|
||||||
|
|
||||||
// 执行批量插入
|
|
||||||
await cmd.ExecuteNonQueryAsync();
|
|
||||||
|
|
||||||
if (isTran)
|
|
||||||
{
|
|
||||||
await transaction!.CommitAsync();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch
|
catch
|
||||||
{
|
{
|
||||||
if (isTran && transaction != null)
|
|
||||||
{
|
|
||||||
await transaction.RollbackAsync();
|
|
||||||
}
|
|
||||||
|
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
if (isTran && transaction != null)
|
|
||||||
{
|
|
||||||
transaction?.Dispose();
|
|
||||||
}
|
|
||||||
this.Context.CurrentConnectionConfig.IsAutoCloseConnection = isAutoClose;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private string BuildCreateTableSql(string tableName, DataTable table, string[] tagColumns)
|
|
||||||
{
|
|
||||||
// 自动建表语句
|
|
||||||
var columnDefinitions = table.Columns.Cast<DataColumn>()
|
|
||||||
.Select(c => $"{c.ColumnName} {GetTDengineColumnType(c.DataType)}");
|
|
||||||
|
|
||||||
var tagDefinitions = tagColumns.Select(tag => $"{tag} STRING");
|
|
||||||
|
|
||||||
var createTableSql = $@"
|
|
||||||
CREATE TABLE IF NOT EXISTS {tableName} (
|
|
||||||
{string.Join(", ", columnDefinitions)},
|
|
||||||
{string.Join(", ", tagDefinitions)},
|
|
||||||
ts TIMESTAMP
|
|
||||||
) TAGS ({string.Join(", ", tagColumns)});
|
|
||||||
";
|
|
||||||
|
|
||||||
return createTableSql;
|
|
||||||
}
|
|
||||||
|
|
||||||
private string GetTDengineColumnType(Type type)
|
|
||||||
{
|
|
||||||
if (type == typeof(int) || type == typeof(long) || type == typeof(float) || type == typeof(double))
|
|
||||||
{
|
|
||||||
return "FLOAT";
|
|
||||||
}
|
|
||||||
else if (type == typeof(DateTime))
|
|
||||||
{
|
|
||||||
return "TIMESTAMP";
|
|
||||||
}
|
|
||||||
else if (type == typeof(bool))
|
|
||||||
{
|
|
||||||
return "BOOLEAN";
|
|
||||||
}
|
|
||||||
else if (type == typeof(string))
|
|
||||||
{
|
|
||||||
return "STRING";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return "STRING"; // 默认类型为 STRING
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public object FormatValue(object value)
|
public object FormatValue(object value)
|
||||||
{
|
{
|
||||||
@@ -153,42 +107,44 @@ namespace SqlSugar.TDengine
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
var type = value.GetType();
|
var type = value.GetType();
|
||||||
if (type == typeof(DateTime))
|
if (type == UtilConstants.DateType)
|
||||||
{
|
{
|
||||||
return $"'{Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss.ms")}'";
|
return Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss.ms").ToSqlValue();
|
||||||
}
|
}
|
||||||
else if (type == typeof(byte[]))
|
else if (type == UtilConstants.ByteArrayType)
|
||||||
{
|
{
|
||||||
return $"0x{BitConverter.ToString((byte[])value).Replace("-", "")}";
|
string bytesString = "0x" + BitConverter.ToString((byte[])value);
|
||||||
|
return bytesString;
|
||||||
}
|
}
|
||||||
else if (type.IsEnum)
|
else if (type.IsEnum())
|
||||||
{
|
{
|
||||||
return Convert.ToInt64(value);
|
if (this.Context.CurrentConnectionConfig.MoreSettings?.TableEnumIsString == true)
|
||||||
|
{
|
||||||
|
return value.ToSqlValue();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return Convert.ToInt64(value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (type == typeof(bool))
|
else if (type == UtilConstants.BoolType)
|
||||||
{
|
{
|
||||||
return value.ObjToBool() ? "1" : "0";
|
return value.ObjToBool() ? "1" : "0";
|
||||||
}
|
}
|
||||||
else if (type == typeof(string))
|
else if (type == UtilConstants.StringType || type == UtilConstants.ObjType)
|
||||||
{
|
{
|
||||||
return $"'{value.ToString().ToSqlFilter()}'";
|
return "'" + value.ToString().ToSqlFilter() + "'";
|
||||||
}
|
}
|
||||||
else if (value is decimal)
|
else if (value is decimal v)
|
||||||
{
|
{
|
||||||
return value.ToString();
|
return v.ToString(CultureInfo.InvariantCulture);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return $"'{value.ToString()}'";
|
return "'" + value.ToString() + "'";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task ExecuteSqlAsync(TDengineConnection conn, string sql)
|
|
||||||
{
|
|
||||||
using var cmd = conn.CreateCommand();
|
|
||||||
cmd.CommandText = sql;
|
|
||||||
await cmd.ExecuteNonQueryAsync();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user