Files
SqlSugar/Src/Asp.NetCore2/MongoDb.Ado.data/ExecuteNonQueryItems/BulkWriteHandler.cs
2025-08-09 10:45:45 +08:00

102 lines
3.6 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MongoDB.Bson.Serialization;
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Text;
namespace MongoDb.Ado.data
{
/// <summary>
/// Executes update commands described as JSON. The payload is either a single JSON object
/// or a JSON array of objects; each object must carry a "filter" document and an "update" document.
/// </summary>
public class BulkWriteHandler : IMongoOperationHandler
{
    /// <summary>Execution context; supplies the optional server session used for the writes.</summary>
    public HandlerContext context { get; set; }

    /// <summary>Name of the operation being handled (not read by this handler itself).</summary>
    public string operation { get; set; }

    /// <summary>
    /// Applies the updates described by <paramref name="json"/> to <paramref name="collection"/>.
    /// </summary>
    /// <param name="collection">Target collection.</param>
    /// <param name="json">A JSON object, or a JSON array of objects, each with "filter" and "update".</param>
    /// <returns>The modified-document count reported by the driver, cast to <see cref="int"/>; 0 when there is nothing to write.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="json"/> is null.</exception>
    public int Handle(IMongoCollection<BsonDocument> collection, string json)
    {
        var documents = ParseJsonArray(json);

        // Special case: exactly one command whose update consists solely of "$set" is sent
        // as a pipeline-style update instead of a classic bulk write.
        if (documents.Count == 1)
        {
            var single = documents[0];
            var singleFilter = single["filter"].AsBsonDocument;
            var singleUpdate = single["update"].AsBsonDocument;
            if (IsSetOnlyUpdate(singleUpdate))
            {
                return HandlePipelineUpdate(collection, singleFilter, singleUpdate);
            }
        }

        var bulkOps = new List<WriteModel<BsonDocument>>(documents.Count);
        foreach (var doc in documents)
        {
            var filter = doc["filter"].AsBsonDocument;
            var update = doc["update"].AsBsonDocument;
            bulkOps.Add(new UpdateManyModel<BsonDocument>(filter, update));
        }

        // An empty JSON array yields no operations, so there is nothing to send.
        if (bulkOps.Count == 0)
        {
            return 0;
        }

        var result = context.IsAnyServerSession
            ? collection.BulkWrite(context.ServerSession, bulkOps)
            : collection.BulkWrite(bulkOps);
        return (int)result.ModifiedCount;
    }

    /// <summary>
    /// Runs a single UpdateMany whose update is a one-stage pipeline: <c>[{ "$set": ... }]</c>,
    /// built from the fields of the incoming "$set" document.
    /// </summary>
    /// <param name="collection">Target collection.</param>
    /// <param name="filter">Filter selecting the documents to update.</param>
    /// <param name="update">Update document; its "$set" member is read.</param>
    /// <returns>The modified-document count reported by the driver, cast to <see cref="int"/>.</returns>
    private int HandlePipelineUpdate(IMongoCollection<BsonDocument> collection, BsonDocument filter, BsonDocument update)
    {
        // Build the pipeline stage from whatever "$set" currently contains rather than hard-coding fields.
        // Every value is copied as-is. The previous code inspected the first element of nested
        // documents (to detect operators such as $add) but then performed the same assignment in
        // both branches, so the inspection only added a crash on empty nested documents.
        // NOTE(review): presumably the pipeline form is used so operator expressions inside "$set"
        // values are evaluated by the server - confirm against the SQL-to-Mongo translator.
        var setDoc = update["$set"].AsBsonDocument;
        var setPipelineDoc = new BsonDocument();
        foreach (var element in setDoc.Elements)
        {
            setPipelineDoc[element.Name] = element.Value;
        }

        var updatePipeline = new[]
        {
            new BsonDocument("$set", setPipelineDoc)
        };
        var pipelineUpdate = new PipelineUpdateDefinition<BsonDocument>(updatePipeline);

        var result = context.IsAnyServerSession
            ? collection.UpdateMany(context.ServerSession, filter, pipelineUpdate)
            : collection.UpdateMany(filter, pipelineUpdate);
        return (int)result.ModifiedCount;
    }

    /// <summary>
    /// Returns true when <paramref name="update"/> has exactly one top-level element and that element is "$set".
    /// </summary>
    private static bool IsSetOnlyUpdate(BsonDocument update)
    {
        return update.ElementCount == 1 && update.Contains("$set");
    }

    /// <summary>
    /// Parses <paramref name="json"/> into a list of documents: a payload starting with '['
    /// (after leading whitespace) is deserialized as an array, anything else as a single document.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="json"/> is null.</exception>
    private List<BsonDocument> ParseJsonArray(string json)
    {
        if (json == null)
        {
            throw new ArgumentNullException(nameof(json));
        }

        // Ordinal comparison: '[' is a structural character, not linguistic text.
        if (json.TrimStart().StartsWith("[", StringComparison.Ordinal))
        {
            return BsonSerializer.Deserialize<List<BsonDocument>>(json);
        }

        return new List<BsonDocument> { BsonDocument.Parse(json) };
    }
}
}