mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-24 02:13:35 +08:00
remove emoji
This commit is contained in:
@@ -172,7 +172,7 @@ func (f *filerClientImpl) GetDataCenter() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListNamespaces retrieves all MQ namespaces (databases) from the filer
|
// ListNamespaces retrieves all MQ namespaces (databases) from the filer
|
||||||
// ✅ RESOLVED: Now queries actual topic directories instead of hardcoded values
|
// RESOLVED: Now queries actual topic directories instead of hardcoded values
|
||||||
func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
|
func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
|
||||||
// Get filer client to list directories under /topics
|
// Get filer client to list directories under /topics
|
||||||
filerClient, err := c.GetFilerClient()
|
filerClient, err := c.GetFilerClient()
|
||||||
@@ -219,7 +219,7 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListTopics retrieves all topics in a namespace from the filer
|
// ListTopics retrieves all topics in a namespace from the filer
|
||||||
// ✅ RESOLVED: Now queries actual topic directories instead of hardcoded values
|
// RESOLVED: Now queries actual topic directories instead of hardcoded values
|
||||||
func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) {
|
func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) {
|
||||||
// Get filer client to list directories under /topics/{namespace}
|
// Get filer client to list directories under /topics/{namespace}
|
||||||
filerClient, err := c.GetFilerClient()
|
filerClient, err := c.GetFilerClient()
|
||||||
|
@@ -152,7 +152,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
|
// Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
|
||||||
// ✅ RESOLVED TODO: Get real filerClient from broker connection
|
// RESOLVED TODO: Get real filerClient from broker connection
|
||||||
var filerClient filer_pb.FilerClient
|
var filerClient filer_pb.FilerClient
|
||||||
if e.catalog.brokerClient != nil {
|
if e.catalog.brokerClient != nil {
|
||||||
var filerClientErr error
|
var filerClientErr error
|
||||||
@@ -235,7 +235,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Build hybrid scan options
|
// Build hybrid scan options
|
||||||
// ✅ RESOLVED TODO: Extract from WHERE clause time filters
|
// RESOLVED TODO: Extract from WHERE clause time filters
|
||||||
startTimeNs, stopTimeNs := int64(0), int64(0)
|
startTimeNs, stopTimeNs := int64(0), int64(0)
|
||||||
if stmt.Where != nil {
|
if stmt.Where != nil {
|
||||||
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
|
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
|
||||||
@@ -1097,18 +1097,18 @@ func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *
|
|||||||
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
|
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 🚀 FAST PATH: Try to use parquet statistics for optimization
|
// FAST PATH: Try to use parquet statistics for optimization
|
||||||
// This can be ~130x faster than scanning all data
|
// This can be ~130x faster than scanning all data
|
||||||
if stmt.Where == nil { // Only optimize when no complex WHERE clause
|
if stmt.Where == nil { // Only optimize when no complex WHERE clause
|
||||||
fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
|
fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
|
||||||
if canOptimize {
|
if canOptimize {
|
||||||
fmt.Printf("✅ Using fast parquet statistics for aggregation (skipped full scan)\n")
|
fmt.Printf("Using fast parquet statistics for aggregation (skipped full scan)\n")
|
||||||
return fastResult, nil
|
return fastResult, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SLOW PATH: Fall back to full table scan
|
// SLOW PATH: Fall back to full table scan
|
||||||
fmt.Printf("⚠️ Using full table scan for aggregation (parquet optimization not applicable)\n")
|
fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
|
||||||
|
|
||||||
// Build scan options for full table scan (aggregations need all data)
|
// Build scan options for full table scan (aggregations need all data)
|
||||||
hybridScanOptions := HybridScanOptions{
|
hybridScanOptions := HybridScanOptions{
|
||||||
|
@@ -137,7 +137,7 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt
|
|||||||
var results []HybridScanResult
|
var results []HybridScanResult
|
||||||
|
|
||||||
// Get all partitions for this topic
|
// Get all partitions for this topic
|
||||||
// ✅ RESOLVED TODO: Implement proper partition discovery via MQ broker
|
// RESOLVED TODO: Implement proper partition discovery via MQ broker
|
||||||
partitions, err := hms.discoverTopicPartitions(ctx)
|
partitions, err := hms.discoverTopicPartitions(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Fallback to default partition if discovery fails
|
// Fallback to default partition if discovery fails
|
||||||
@@ -316,7 +316,7 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb
|
|||||||
recordValue := &schema_pb.RecordValue{}
|
recordValue := &schema_pb.RecordValue{}
|
||||||
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
|
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
|
||||||
// This is an archived message from Parquet files
|
// This is an archived message from Parquet files
|
||||||
// ✅ FIX: Add system columns from LogEntry to RecordValue
|
// FIX: Add system columns from LogEntry to RecordValue
|
||||||
if recordValue.Fields == nil {
|
if recordValue.Fields == nil {
|
||||||
recordValue.Fields = make(map[string]*schema_pb.Value)
|
recordValue.Fields = make(map[string]*schema_pb.Value)
|
||||||
}
|
}
|
||||||
@@ -333,7 +333,7 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb
|
|||||||
}
|
}
|
||||||
|
|
||||||
// If not a RecordValue, this is raw live message data
|
// If not a RecordValue, this is raw live message data
|
||||||
// ✅ RESOLVED TODO: Implement proper schema-aware parsing based on topic schema
|
// RESOLVED TODO: Implement proper schema-aware parsing based on topic schema
|
||||||
return hms.parseRawMessageWithSchema(logEntry)
|
return hms.parseRawMessageWithSchema(logEntry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user