Update mongodb

This commit is contained in:
sunkaixuan 2025-07-11 17:35:57 +08:00
parent ce87e40d41
commit 14751c5003
13 changed files with 33 additions and 16 deletions

View File

@ -17,7 +17,7 @@ namespace MongoDb.Ado.data
{ "find", new QueryFindHandler() }, { "find", new QueryFindHandler() },
{ "aggregate", new QueryAggregateHandler() }, { "aggregate", new QueryAggregateHandler() },
}; };
public DbDataReader Handle(string operation, IMongoCollection<BsonDocument> collection, string json) public DbDataReader Handle(string operation, IMongoCollection<BsonDocument> collection, string json, HandlerContext context)
{ {
MongoDbMethodUtils.ValidateOperation(operation); MongoDbMethodUtils.ValidateOperation(operation);
var doc = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonValue>(json); var doc = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonValue>(json);
@ -27,6 +27,7 @@ namespace MongoDb.Ado.data
ExecuteHandlerFactory.Handler(operation, json, collection,new HandlerContext()); ExecuteHandlerFactory.Handler(operation, json, collection,new HandlerContext());
return new DataTable().CreateDataReader(); return new DataTable().CreateDataReader();
} }
handler.Context = context;
return handler.Handler(collection, doc); return handler.Handler(collection, doc);
} }

View File

@ -6,6 +6,8 @@ namespace MongoDb.Ado.data
{ {
public interface IQueryHandler public interface IQueryHandler
{ {
HandlerContext Context { get; set; }
DbDataReader Handler(IMongoCollection<BsonDocument> collection, BsonValue doc); DbDataReader Handler(IMongoCollection<BsonDocument> collection, BsonValue doc);
} }
} }

View File

@ -12,13 +12,16 @@ namespace MongoDb.Ado.data
{ {
public class QueryAggregateHandler : IQueryHandler public class QueryAggregateHandler : IQueryHandler
{ {
public HandlerContext Context { get; set; }
public DbDataReader Handler(IMongoCollection<BsonDocument> collection, BsonValue doc) public DbDataReader Handler(IMongoCollection<BsonDocument> collection, BsonValue doc)
{ {
// 解析 JSON 字符串为 BsonArray // 解析 JSON 字符串为 BsonArray
var pipeline = doc.AsBsonArray; ; var pipeline = doc.AsBsonArray; ;
// 构建聚合管道 // 构建聚合管道
var aggregateFluent = collection.Aggregate<BsonDocument>(pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray()); var aggregateFluent = Context?.IsAnyServerSession == true ?
collection.Aggregate<BsonDocument>(Context.ServerSession,pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray()):
collection.Aggregate<BsonDocument>(pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray());
// 执行聚合查询并返回 DbDataReader // 执行聚合查询并返回 DbDataReader
var cursor = aggregateFluent.ToList(); var cursor = aggregateFluent.ToList();

View File

@ -10,6 +10,7 @@ namespace MongoDb.Ado.data
{ {
public class QueryFindHandler : IQueryHandler public class QueryFindHandler : IQueryHandler
{ {
public HandlerContext Context { get; set; }
public DbDataReader Handler(IMongoCollection<BsonDocument> collection, BsonValue doc) public DbDataReader Handler(IMongoCollection<BsonDocument> collection, BsonValue doc)
{ {
BsonDocument filter; BsonDocument filter;
@ -31,7 +32,7 @@ namespace MongoDb.Ado.data
throw new ArgumentException("Invalid JSON format for MongoDB find operation."); throw new ArgumentException("Invalid JSON format for MongoDB find operation.");
} }
var findFluent = collection.Find(filter); var findFluent =Context?.IsAnyServerSession==true? collection.Find(Context.ServerSession,filter): collection.Find(filter);
if (projection != null) if (projection != null)
findFluent = findFluent.Project<BsonDocument>(projection); findFluent = findFluent.Project<BsonDocument>(projection);

View File

@ -20,7 +20,7 @@ namespace MongoDb.Ado.data
{ "find", new QueryFindHandlerAsync() }, { "find", new QueryFindHandlerAsync() },
{ "aggregate", new QueryAggregateHandlerAsync() }, { "aggregate", new QueryAggregateHandlerAsync() },
}; };
public async Task<DbDataReader> HandleAsync(string operation, IMongoCollection<BsonDocument> collection, string json, CancellationToken cancellationToken) public async Task<DbDataReader> HandleAsync(string operation, IMongoCollection<BsonDocument> collection, string json, CancellationToken cancellationToken, HandlerContext context)
{ {
MongoDbMethodUtils.ValidateOperation(operation); MongoDbMethodUtils.ValidateOperation(operation);
var doc = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonValue>(json); var doc = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonValue>(json);
@ -31,6 +31,7 @@ namespace MongoDb.Ado.data
return new DataTable().CreateDataReader(); return new DataTable().CreateDataReader();
} }
handler.token = cancellationToken; handler.token = cancellationToken;
handler.Context = context;
return await handler.HandlerAsync(collection, doc); return await handler.HandlerAsync(collection, doc);
} }

View File

@ -9,6 +9,7 @@ namespace MongoDb.Ado.data
public interface IQueryHandlerAsync public interface IQueryHandlerAsync
{ {
CancellationToken token { get; set; } CancellationToken token { get; set; }
HandlerContext Context { get; set; }
Task<DbDataReader> HandlerAsync(IMongoCollection<BsonDocument> collection, BsonValue doc); Task<DbDataReader> HandlerAsync(IMongoCollection<BsonDocument> collection, BsonValue doc);
} }

View File

@ -13,6 +13,7 @@ namespace MongoDb.Ado.data
{ {
public class QueryAggregateHandlerAsync : IQueryHandlerAsync public class QueryAggregateHandlerAsync : IQueryHandlerAsync
{ {
public HandlerContext Context { get; set; }
public CancellationToken token { get; set; } public CancellationToken token { get; set; }
public async Task<DbDataReader> HandlerAsync(IMongoCollection<BsonDocument> collection, BsonValue doc) public async Task<DbDataReader> HandlerAsync(IMongoCollection<BsonDocument> collection, BsonValue doc)
{ {
@ -20,7 +21,9 @@ namespace MongoDb.Ado.data
var pipeline = doc.AsBsonArray; ; var pipeline = doc.AsBsonArray; ;
// 构建聚合管道 // 构建聚合管道
var aggregateFluent = collection.Aggregate<BsonDocument>(pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray()); var aggregateFluent = Context?.IsAnyServerSession == true?
collection.Aggregate<BsonDocument>(Context.ServerSession,pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray()):
collection.Aggregate<BsonDocument>(pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray());
// 执行聚合查询并返回 DbDataReader // 执行聚合查询并返回 DbDataReader
var cursor =await aggregateFluent.ToListAsync(token); var cursor =await aggregateFluent.ToListAsync(token);

View File

@ -12,6 +12,7 @@ namespace MongoDb.Ado.data
{ {
public class QueryFindHandlerAsync : IQueryHandlerAsync public class QueryFindHandlerAsync : IQueryHandlerAsync
{ {
public HandlerContext Context { get; set; }
public CancellationToken token { get; set; } public CancellationToken token { get; set; }
public async Task<DbDataReader> HandlerAsync(IMongoCollection<BsonDocument> collection, BsonValue doc) public async Task<DbDataReader> HandlerAsync(IMongoCollection<BsonDocument> collection, BsonValue doc)
{ {
@ -34,7 +35,7 @@ namespace MongoDb.Ado.data
throw new ArgumentException("Invalid JSON format for MongoDB find operation."); throw new ArgumentException("Invalid JSON format for MongoDB find operation.");
} }
var findFluent = collection.Find(filter); var findFluent =Context?.IsAnyServerSession==true? collection.Find(Context.ServerSession,filter) : collection.Find(filter);
if (projection != null) if (projection != null)
findFluent = findFluent.Project<BsonDocument>(projection); findFluent = findFluent.Project<BsonDocument>(projection);

View File

@ -12,7 +12,7 @@ namespace MongoDb.Ado.data
public string operation { get; set; } public string operation { get; set; }
public int Handle(IMongoCollection<BsonDocument> collection, string json) public int Handle(IMongoCollection<BsonDocument> collection, string json)
{ {
using (var dr = new DbDataReaderFactory().Handle(operation, collection, json)) using (var dr = new DbDataReaderFactory().Handle(operation, collection, json,null))
{ {
if (dr.Read()) if (dr.Read())
{ {

View File

@ -16,7 +16,7 @@ namespace MongoDb.Ado.data
public async Task<int> HandleAsync(IMongoCollection<BsonDocument> collection, string json) public async Task<int> HandleAsync(IMongoCollection<BsonDocument> collection, string json)
{ {
using (var dr = await new DbDataReaderFactoryAsync().HandleAsync(operation, collection, json,token)) using (var dr = await new DbDataReaderFactoryAsync().HandleAsync(operation, collection, json,token,context))
{ {
if (dr.Read()) if (dr.Read())
{ {

View File

@ -9,9 +9,9 @@ namespace MongoDb.Ado.data
{ {
public class ExecuteScalarHandler public class ExecuteScalarHandler
{ {
public object Handle(string operation, IMongoCollection<BsonDocument> collection, string json) public object Handle(string operation, IMongoCollection<BsonDocument> collection, string json, HandlerContext context)
{ {
using (var dbReader = new DbDataReaderFactory().Handle(operation, collection, json)) using (var dbReader = new DbDataReaderFactory().Handle(operation, collection, json,context))
{ {
if (dbReader.Read()) if (dbReader.Read())
{ {

View File

@ -11,9 +11,9 @@ namespace MongoDb.Ado.data
{ {
public class ExecuteScalarHandlerAsync public class ExecuteScalarHandlerAsync
{ {
public async Task<object> HandleAsync(string operation, IMongoCollection<BsonDocument> collection, string json,CancellationToken cancellationToken) public async Task<object> HandleAsync(string operation, IMongoCollection<BsonDocument> collection, string json,CancellationToken cancellationToken, HandlerContext context)
{ {
using (var dbReader = await new DbDataReaderFactoryAsync().HandleAsync(operation, collection, json,cancellationToken)) using (var dbReader = await new DbDataReaderFactoryAsync().HandleAsync(operation, collection, json,cancellationToken,context))
{ {
if (dbReader.Read()) if (dbReader.Read())
{ {

View File

@ -70,13 +70,15 @@ namespace MongoDb.Ado.data
{ {
var (operation, collectionName, json) = ParseCommand(_commandText); var (operation, collectionName, json) = ParseCommand(_commandText);
var collection = GetCollection(collectionName); var collection = GetCollection(collectionName);
return new ExecuteScalarHandler().Handle(operation,collection, json); var context = new HandlerContext() { Connection = this.Connection };
return new ExecuteScalarHandler().Handle(operation,collection, json,context);
} }
protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
{ {
var (operation, collectionName, json) = ParseCommand(_commandText); var (operation, collectionName, json) = ParseCommand(_commandText);
var collection = GetCollection(collectionName); var collection = GetCollection(collectionName);
return new DbDataReaderFactory().Handle(operation, collection, json); var context = new HandlerContext() { Connection = this.Connection };
return new DbDataReaderFactory().Handle(operation, collection, json,context);
} }
public async override Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken) public async override Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken)
@ -92,13 +94,15 @@ namespace MongoDb.Ado.data
{ {
var (operation, collectionName, json) = ParseCommand(_commandText); var (operation, collectionName, json) = ParseCommand(_commandText);
var collection = GetCollection(collectionName); var collection = GetCollection(collectionName);
return new ExecuteScalarHandlerAsync().HandleAsync(operation, collection, json, cancellationToken); var context = new HandlerContext() { Connection = this.Connection };
return new ExecuteScalarHandlerAsync().HandleAsync(operation, collection, json, cancellationToken,context);
} }
protected override Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior,CancellationToken cancellationToken) protected override Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior,CancellationToken cancellationToken)
{ {
var (operation, collectionName, json) = ParseCommand(_commandText); var (operation, collectionName, json) = ParseCommand(_commandText);
var collection = GetCollection(collectionName); var collection = GetCollection(collectionName);
return new DbDataReaderFactoryAsync().HandleAsync(operation, collection, json, cancellationToken); var context = new HandlerContext() { Connection = this.Connection };
return new DbDataReaderFactoryAsync().HandleAsync(operation, collection, json, cancellationToken,context);
} }