Update mongodb

This commit is contained in:
sunkaixuan 2025-05-02 14:30:13 +08:00
parent 1a4a5f6d02
commit df4460c5a9
5 changed files with 331 additions and 215 deletions

View File

@ -20,7 +20,7 @@ namespace MongoDb.Ado.data
var aggregateFluent = collection.Aggregate<BsonDocument>(pipeline.Select(stage => new BsonDocument(stage.AsBsonDocument)).ToArray());
// 执行聚合查询并返回 DbDataReader
var cursor = aggregateFluent.ToList();
var cursor = aggregateFluent.ToEnumerable();
return new MongoDbBsonDocumentDataReader(cursor);
}
}

View File

@ -1,6 +1,7 @@
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
@ -35,9 +36,8 @@ namespace MongoDb.Ado.data
if (projection != null)
findFluent = findFluent.Project<BsonDocument>(projection);
var cursor = findFluent.ToCursor(); // 已包含 filter + projection 的结果
return new MongoDbIAsyncCursorDataReader(cursor); // 你要确保这个类支持逐行读取 BsonDocument
var cursor = findFluent.ToEnumerable();
return new MongoDbBsonDocumentDataReader(cursor); // 你要确保这个类支持逐行读取 BsonDocument
}
}
}

View File

@ -0,0 +1,168 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Text;
namespace MongoDb.Ado.data
{
/// <summary>
/// 数据填充器
/// </summary>
public class MongoDbDataAdapter : IDataAdapter
{
private MongoDbCommand command;
private string sql;
private MongoDbConnection _sqlConnection;
/// <summary>
/// SqlDataAdapter
/// </summary>
/// <param name="command"></param>
public MongoDbDataAdapter(MongoDbCommand command)
{
this.command = command;
}
public MongoDbDataAdapter()
{
}
/// <summary>
/// SqlDataAdapter
/// </summary>
/// <param name="sql"></param>
/// <param name="_sqlConnection"></param>
public MongoDbDataAdapter(string sql, MongoDbConnection _sqlConnection)
{
this.sql = sql;
this._sqlConnection = _sqlConnection;
}
/// <summary>
/// SelectCommand
/// </summary>
public MongoDbCommand SelectCommand
{
get
{
if (this.command == null)
{
this.command = new MongoDbCommand(this.sql, this._sqlConnection);
}
return this.command;
}
set
{
this.command = value;
}
}
public MissingMappingAction MissingMappingAction { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public MissingSchemaAction MissingSchemaAction { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public ITableMappingCollection TableMappings => throw new NotImplementedException();
/// <summary>
/// Fill
/// </summary>
/// <param name="dt"></param>
public void Fill(DataTable dt)
{
if (dt == null)
{
dt = new DataTable();
}
var columns = dt.Columns;
var rows = dt.Rows;
using (DbDataReader dr = command.ExecuteReader())
{
for (int i = 0; i < dr.FieldCount; i++)
{
string name = dr.GetName(i).Trim();
if (!columns.Contains(name))
columns.Add(new DataColumn(name, dr.GetFieldType(i)));
else
{
columns.Add(new DataColumn(name + i, dr.GetFieldType(i)));
}
}
while (dr.Read())
{
DataRow daRow = dt.NewRow();
for (int i = 0; i < columns.Count; i++)
{
daRow[columns[i].ColumnName] = dr.GetValue(i);
}
dt.Rows.Add(daRow);
}
}
dt.AcceptChanges();
}
/// <summary>
/// Fill
/// </summary>
/// <param name="ds"></param>
public void Fill(DataSet ds)
{
if (ds == null)
{
ds = new DataSet();
}
using (DbDataReader dr = command.ExecuteReader())
{
do
{
var dt = new DataTable();
var columns = dt.Columns;
var rows = dt.Rows;
for (int i = 0; i < dr.FieldCount; i++)
{
string name = dr.GetName(i).Trim();
if (!columns.Contains(name))
columns.Add(new DataColumn(name, dr.GetFieldType(i)));
else
{
columns.Add(new DataColumn(name + i, dr.GetFieldType(i)));
}
}
while (dr.Read())
{
DataRow daRow = dt.NewRow();
for (int i = 0; i < columns.Count; i++)
{
daRow[columns[i].ColumnName] = dr.GetValue(i);
}
dt.Rows.Add(daRow);
}
dt.AcceptChanges();
ds.Tables.Add(dt);
} while (dr.NextResult());
}
}
int IDataAdapter.Fill(DataSet dataSet)
{
throw new NotImplementedException();
}
public DataTable[] FillSchema(DataSet dataSet, SchemaType schemaType)
{
throw new NotImplementedException();
}
public IDataParameter[] GetFillParameters()
{
throw new NotImplementedException();
}
public int Update(DataSet dataSet)
{
throw new NotImplementedException();
}
}
}

View File

@ -5,128 +5,44 @@ using MongoDB.Driver;
using MongoDB.Bson;
using System.Data;
using System.Collections;
using System.Linq;
namespace MongoDb.Ado.data
{
public class MongoDbIAsyncCursorDataReader : DbDataReader
{
private readonly IAsyncCursor<BsonDocument> _cursor;
private IEnumerator<BsonDocument> _enumerator;
private BsonDocument _current;
public MongoDbIAsyncCursorDataReader(IAsyncCursor<BsonDocument> cursor)
{
_cursor = cursor;
_enumerator = cursor.ToEnumerable().GetEnumerator();
}
public override bool Read()
{
if (_enumerator.MoveNext())
{
_current = _enumerator.Current;
return true;
}
return false;
}
public override int FieldCount => _current?.ElementCount ?? 0;
public override object GetValue(int ordinal)
{
if (_current == null)
throw new InvalidOperationException("No current document.");
var element = GetElementByOrdinal(ordinal);
return BsonTypeMapper.MapToDotNetValue(element.Value);
}
public override string GetName(int ordinal)
{
var element = GetElementByOrdinal(ordinal);
return element.Name;
}
public override int GetOrdinal(string name)
{
int i = 0;
foreach (var elem in _current.Elements)
{
if (elem.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
return i;
i++;
}
throw new IndexOutOfRangeException($"Field '{name}' not found.");
}
private BsonElement GetElementByOrdinal(int ordinal)
{
if (_current == null)
throw new InvalidOperationException("No current document.");
int i = 0;
foreach (var elem in _current.Elements)
{
if (i == ordinal)
return elem;
i++;
}
throw new IndexOutOfRangeException();
}
public override bool HasRows => true;
public override bool IsClosed => false;
public override int RecordsAffected => -1;
public override bool NextResult() => false;
public override object this[int ordinal] => GetValue(ordinal);
public override object this[string name] => GetValue(GetOrdinal(name));
// 下面这些可以根据需要进一步实现或抛异常
public override bool GetBoolean(int ordinal) => (bool)GetValue(ordinal);
public override byte GetByte(int ordinal) => (byte)GetValue(ordinal);
public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
public override char GetChar(int ordinal) => (char)GetValue(ordinal);
public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
public override string GetDataTypeName(int ordinal) => GetFieldType(ordinal).Name;
public override DateTime GetDateTime(int ordinal) => (DateTime)GetValue(ordinal);
public override decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal);
public override double GetDouble(int ordinal) => (double)GetValue(ordinal);
public override Type GetFieldType(int ordinal) => GetValue(ordinal)?.GetType() ?? typeof(object);
public override float GetFloat(int ordinal) => (float)GetValue(ordinal);
public override Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal);
public override short GetInt16(int ordinal) => (short)GetValue(ordinal);
public override int GetInt32(int ordinal) => (int)GetValue(ordinal);
public override long GetInt64(int ordinal) => (long)GetValue(ordinal);
public override string GetString(int ordinal) => (string)GetValue(ordinal);
public override IEnumerator GetEnumerator()
{
throw new NotImplementedException();
}
public override int GetValues(object[] values)
{
throw new NotImplementedException();
}
public override bool IsDBNull(int ordinal)
{
throw new NotImplementedException();
}
public override int Depth => 0;
}
public class MongoDbBsonDocumentDataReader : DbDataReader
{
private readonly IEnumerator<BsonDocument> _enumerator;
private BsonDocument _current;
private List<string> _fieldNames;
private List<Type> _fieldTypes;
public MongoDbBsonDocumentDataReader(IEnumerable<BsonDocument> documents)
{
_enumerator = documents.GetEnumerator();
var docList = documents.ToList();
_enumerator = docList.GetEnumerator();
if (docList.Any()==true)
{
_fieldNames = docList.Take(1).SelectMany(d => d.Names).Distinct(StringComparer.OrdinalIgnoreCase).ToList();
_fieldTypes = new List<Type>();
foreach (var fieldName in _fieldNames)
{
Type fieldType = typeof(object); // 默认类型
foreach (var doc in docList)
{
if (doc.TryGetValue(fieldName, out var value) && value != BsonNull.Value)
{
fieldType = BsonTypeMapper.MapToDotNetValue(value)?.GetType() ?? typeof(object);
}
break;
}
_fieldTypes.Add(fieldType);
}
}
else
{
_fieldNames = new List<string>();
_fieldTypes = new List<Type>();
}
}
public override bool Read()
@ -139,7 +55,7 @@ namespace MongoDb.Ado.data
return false;
}
public override int FieldCount => _current?.ElementCount ?? 0;
public override int FieldCount => _fieldNames.Count;
public override int Depth => throw new NotImplementedException();
@ -161,7 +77,7 @@ namespace MongoDb.Ado.data
public override DateTime GetDateTime(int ordinal) => (DateTime)GetValue(ordinal);
public override decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal);
public override double GetDouble(int ordinal) => (double)GetValue(ordinal);
public override Type GetFieldType(int ordinal) => GetValue(ordinal)?.GetType() ?? typeof(object);
public override Type GetFieldType(int ordinal) => _fieldTypes[ordinal] ?? typeof(object);
public override float GetFloat(int ordinal) => (float)GetValue(ordinal);
public override Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal);
public override short GetInt16(int ordinal) => (short)GetValue(ordinal);
@ -186,8 +102,9 @@ namespace MongoDb.Ado.data
public override string GetName(int ordinal)
{
var element = GetElementByOrdinal(ordinal);
return element.Name;
if (ordinal < 0 || ordinal >= _fieldNames.Count)
throw new IndexOutOfRangeException($"Invalid ordinal: {ordinal}");
return _fieldNames[ordinal];
}
public override int GetOrdinal(string name)

View File

@ -7,6 +7,7 @@ using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Net.Mime.MediaTypeNames;
namespace MongoDbTest
{
@ -21,107 +22,15 @@ namespace MongoDbTest
private static void MongoDbCommandTest()
{
//ExecuteReader single query 1
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//对应的SQL: SELECT * FROM b WHERE age > 18;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b { age: { $gt: 32 } } ", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
DataReaderTest();
DataTableTest();
ExecuteScalarTest();
ExecuteNonQueryTest();
}
}
connection.Close();
}
//ExecuteReader single query 2
private static void ExecuteNonQueryTest()
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//对应的SQL: SELECT name, age, _id FROM b WHERE age > 18;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b [{ age: { $gt: 18 } }, { name: 1, age: 1 }]", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader single query 3
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//SELECT * FROM b WHERE age > 18 ORDER BY age DESC LIMIT 2 OFFSET 1;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" aggregate b [\r\n { \"$sort\": { \"age\": -1 } },\r\n { \"$skip\": 1 },\r\n { \"$limit\": 2 }\r\n] ]", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader join query
{
//SELECT b1._id, b1.a_id, b2.some_field AS joined_field
// FROM b AS b1
//LEFT JOIN b AS b2 ON b1.a_id = b2._id
//WHERE b2.some_field > 100
//ORDER BY b2.some_field DESC
// LIMIT 20 OFFSET 10;
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
MongoDbCommand mongoDbCommand = new MongoDbCommand(@" aggregate b [
{
""$lookup"": {
""from"": ""b"",
""localField"": ""a_id"",
""foreignField"": ""_id"",
""as"": ""joined_docs""
}
},
{
""$unwind"": ""$joined_docs""
},
{
""$match"": {
""joined_docs.some_field"": { ""$gt"": 100 }
}
},
{
""$project"": {
""_id"": 1,
""a_id"": 1,
""joined_field"": ""$joined_docs.some_field""
}
}
])", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteScalar
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b { age: { $gt: 31 } }", connection);
var value=mongoDbCommand.ExecuteScalar();
connection.Close();
}
//ExecuteNonQuery insert
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
@ -208,6 +117,128 @@ namespace MongoDbTest
}
}
private static void DataTableTest()
{
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
////SELECT * FROM b ORDER BY age DESC OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" aggregate b [\r\n { \"$sort\": { \"age\": -1 } },\r\n { \"$skip\": 1 },\r\n { \"$limit\": 2 }\r\n] ]", connection);
MongoDbDataAdapter mongoDbDataAdapter = new MongoDbDataAdapter();
mongoDbDataAdapter.SelectCommand = mongoDbCommand;
DataTable dt = new DataTable();
mongoDbDataAdapter.Fill(dt);
connection.Close();
}
}
private static void ExecuteScalarTest()
{
//ExecuteScalar
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b { age: { $gt: 31 } }", connection);
var value = mongoDbCommand.ExecuteScalar();
connection.Close();
}
private static void DataReaderTest()
{
//ExecuteReader single query 1
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//对应的SQL: SELECT * FROM b WHERE age > 18;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b { age: { $gt: 32 } } ", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader single query 2
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//对应的SQL: SELECT name, age, _id FROM b WHERE age > 18;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" find b [{ age: { $gt: 18 } }, { name: 1, age: 1 }]", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader single query 3
{
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
//SELECT * FROM b ORDER BY age DESC OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY;
MongoDbCommand mongoDbCommand = new MongoDbCommand(" aggregate b [\r\n { \"$sort\": { \"age\": -1 } },\r\n { \"$skip\": 1 },\r\n { \"$limit\": 2 }\r\n] ]", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
//ExecuteReader join query
{
//SELECT b1._id, b1.a_id, b2.some_field AS joined_field
// FROM b AS b1
//LEFT JOIN b AS b2 ON b1.a_id = b2._id
//WHERE b2.some_field > 100
//ORDER BY b2.some_field DESC
// LIMIT 20 OFFSET 10;
var connection = new MongoDbConnection(DbHelper.SqlSugarConnectionString);
connection.Open();
MongoDbCommand mongoDbCommand = new MongoDbCommand(@" aggregate b [
{
""$lookup"": {
""from"": ""b"",
""localField"": ""a_id"",
""foreignField"": ""_id"",
""as"": ""joined_docs""
}
},
{
""$unwind"": ""$joined_docs""
},
{
""$match"": {
""joined_docs.some_field"": { ""$gt"": 100 }
}
},
{
""$project"": {
""_id"": 1,
""a_id"": 1,
""joined_field"": ""$joined_docs.some_field""
}
}
])", connection);
using (var reader = mongoDbCommand.ExecuteReader())
{
while (reader.Read())
{
var name = reader.GetString("name");
var age = reader.GetInt32("age");
}
}
connection.Close();
}
}
private static void MongoDbConnectionTest()
{
var db= new MongoDbConnection(DbHelper.SqlSugarConnectionString);