Update mongodb

This commit is contained in:
sunkaixuan 2025-05-02 11:39:41 +08:00
parent 5ec98af257
commit 8b1710b09e
9 changed files with 286 additions and 15 deletions

View File

@ -0,0 +1,33 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
namespace MongoDb.Ado.data
{
public class DbDataReaderFactory
{
public DbDataReader Handle(string operation, IMongoCollection<BsonDocument> collection, string json)
{
var doc = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonValue>(json);
IQueryHandler queryHandler = null;
if (operation == "find")
{
queryHandler = new QueryFindHandler();
}
else if (operation == "aggregate")
{
queryHandler = new QueryAggregateHandler();
}
else
{
throw new NotSupportedException($" NotSupportedException: {operation} ");
}
return queryHandler.Find(collection, doc);
}
}
}

View File

@ -0,0 +1,11 @@
using MongoDB.Bson;
using MongoDB.Driver;
using System.Data.Common;
namespace MongoDb.Ado.data
{
public interface IQueryHandler
{
DbDataReader Find(IMongoCollection<BsonDocument> collection, BsonValue doc);
}
}

View File

@ -0,0 +1,27 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Text;
namespace MongoDb.Ado.data
{
public class QueryAggregateHandler : IQueryHandler
{
public DbDataReader Find(IMongoCollection<BsonDocument> collection, BsonValue doc)
{
// 解析 JSON 字符串为 BsonArray
var pipeline = doc.AsBsonArray; ;
// 构建聚合管道
var aggregateFluent = collection.Aggregate<BsonDocument>(pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray());
// 执行聚合查询并返回 DbDataReader
var cursor = aggregateFluent.ToList();
return new MongoDbBsonDocumentDataReader(cursor);
}
}
}

View File

@ -0,0 +1,43 @@
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
namespace MongoDb.Ado.data
{
public class QueryFindHandler : IQueryHandler
{
public DbDataReader Find(IMongoCollection<BsonDocument> collection, BsonValue doc)
{
BsonDocument filter;
BsonDocument projection = null;
if (doc.IsBsonArray)
{
var array = doc.AsBsonArray;
filter = array.Count > 0 ? array[0].AsBsonDocument : new BsonDocument();
if (array.Count > 1)
projection = array[1].AsBsonDocument;
}
else if (doc.IsBsonDocument)
{
filter = doc.AsBsonDocument;
}
else
{
throw new ArgumentException("Invalid JSON format for MongoDB find operation.");
}
var findFluent = collection.Find(filter);
if (projection != null)
findFluent = findFluent.Project<BsonDocument>(projection);
var cursor = findFluent.ToCursor(); // 已包含 filter + projection 的结果
return new MongoDbIAsyncCursorDataReader(cursor); // 你要确保这个类支持逐行读取 BsonDocument
}
}
}

View File

@ -15,7 +15,7 @@ namespace MongoDb.Ado.data
{ "updatemany", new UpdateManyHandler() },
{ "delete", new DeleteHandler() },
{ "deletemany", new DeleteManyHandler() },
{ "find", new FindHandler() }
{ "find", new NonFindHandler() }
};
}
}

View File

@ -6,7 +6,7 @@ using System.Text;
namespace MongoDb.Ado.data
{
public class FindHandler : IMongoOperationHandler
public class NonFindHandler : IMongoOperationHandler
{
public int Handle(IMongoCollection<BsonDocument> collection, string json)
{

View File

@ -6,6 +6,7 @@ using MongoDB.Driver;
using System.Linq;
using MongoDB.Bson.Serialization;
using System.Collections.Generic;
using System.Xml.Linq;
namespace MongoDb.Ado.data
{
@ -98,16 +99,7 @@ namespace MongoDb.Ado.data
{
var (operation, collectionName, json) = ParseCommand(_commandText);
var collection = GetCollection(collectionName);
if (operation == "find")
{
var filter = string.IsNullOrWhiteSpace(json) ? FilterDefinition<BsonDocument>.Empty : BsonDocument.Parse(json);
//var filter = new BsonDocument { { "age", new BsonDocument { { "$gt", 25 } } } };
var cursor = collection.Find(filter).ToCursor();
return new MongoDbDataReader(cursor);
}
throw new NotSupportedException("只支持 find 操作。");
return new DbDataReaderFactory().Handle(operation, collection, json);
}
public override void Prepare() { }

View File

@ -9,13 +9,13 @@ using System.Collections;
namespace MongoDb.Ado.data
{
public class MongoDbDataReader : DbDataReader
public class MongoDbIAsyncCursorDataReader : DbDataReader
{
private readonly IAsyncCursor<BsonDocument> _cursor;
private IEnumerator<BsonDocument> _enumerator;
private BsonDocument _current;
public MongoDbDataReader(IAsyncCursor<BsonDocument> cursor)
public MongoDbIAsyncCursorDataReader(IAsyncCursor<BsonDocument> cursor)
{
_cursor = cursor;
_enumerator = cursor.ToEnumerable().GetEnumerator();
@ -119,4 +119,111 @@ namespace MongoDb.Ado.data
public override int Depth => 0;
}
public class MongoDbBsonDocumentDataReader : DbDataReader
{
private readonly IEnumerator<BsonDocument> _enumerator;
private BsonDocument _current;
public MongoDbBsonDocumentDataReader(IEnumerable<BsonDocument> documents)
{
_enumerator = documents.GetEnumerator();
}
public override bool Read()
{
if (_enumerator.MoveNext())
{
_current = _enumerator.Current;
return true;
}
return false;
}
public override int FieldCount => _current?.ElementCount ?? 0;
public override int Depth => throw new NotImplementedException();
public override bool HasRows => true;
public override bool IsClosed => false;
public override int RecordsAffected => -1;
public override bool NextResult() => false;
public override object this[int ordinal] => GetValue(ordinal);
public override object this[string name] => GetValue(GetOrdinal(name));
// 下面这些可以根据需要进一步实现或抛异常
public override bool GetBoolean(int ordinal) => (bool)GetValue(ordinal);
public override byte GetByte(int ordinal) => (byte)GetValue(ordinal);
public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
public override char GetChar(int ordinal) => (char)GetValue(ordinal);
public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
public override string GetDataTypeName(int ordinal) => GetFieldType(ordinal).Name;
public override DateTime GetDateTime(int ordinal) => (DateTime)GetValue(ordinal);
public override decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal);
public override double GetDouble(int ordinal) => (double)GetValue(ordinal);
public override Type GetFieldType(int ordinal) => GetValue(ordinal)?.GetType() ?? typeof(object);
public override float GetFloat(int ordinal) => (float)GetValue(ordinal);
public override Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal);
public override short GetInt16(int ordinal) => (short)GetValue(ordinal);
public override int GetInt32(int ordinal) => (int)GetValue(ordinal);
public override long GetInt64(int ordinal) => (long)GetValue(ordinal);
public override string GetString(int ordinal) => (string)GetValue(ordinal);
public override IEnumerator GetEnumerator()
{
throw new NotImplementedException();
}
public override int GetValues(object[] values)
{
throw new NotImplementedException();
}
public override bool IsDBNull(int ordinal)
{
throw new NotImplementedException();
}
public override string GetName(int ordinal)
{
var element = GetElementByOrdinal(ordinal);
return element.Name;
}
public override int GetOrdinal(string name)
{
int i = 0;
foreach (var elem in _current.Elements)
{
if (elem.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
return i;
i++;
}
throw new IndexOutOfRangeException($"Field '{name}' not found.");
}
public override object GetValue(int ordinal)
{
if (_current == null)
throw new InvalidOperationException("No current document.");
var element = GetElementByOrdinal(ordinal);
return BsonTypeMapper.MapToDotNetValue(element.Value);
}
private BsonElement GetElementByOrdinal(int ordinal)
{
if (_current == null)
throw new InvalidOperationException("No current document.");
int i = 0;
foreach (var elem in _current.Elements)
{
if (i == ordinal)
return elem;
i++;
}
throw new IndexOutOfRangeException();
}
}
}

View File

@ -21,8 +21,56 @@ namespace MongoDbTest
private static void MongoDbCommandTest()
{
//ExecuteReader
//ExecuteReader single query 1
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//对应的SQL: SELECT * FROM b WHERE age > 18;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b { age: { $gt: 32 } } ", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader single query 2
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//对应的SQL: SELECT name, age, _id FROM b WHERE age > 18;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b [{ age: { $gt: 18 } }, { name: 1, age: 1 }]", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader single query 3
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//对应的SQL: SELECT * FROM B LIMIT 2;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" aggregate b [ { $limit:2 } ]", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader join query
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b { age: { $gt: 31 } }", connection);
@ -118,6 +166,16 @@ namespace MongoDbTest
var value = mongoDbCommand.ExecuteNonQuery();
connection.Close();
}
//ExecuteNonQuery Find
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
MongoDbCommand mongoDbCommand = new MongoDbCommand(
" find b { age: { $gt: 31 } }",
connection);
var value = mongoDbCommand.ExecuteNonQuery();
connection.Close();
}
}
private static void MongoDbConnectionTest()