From db75742e37c32e8ddfffac4b85462ee40105626e Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 2 Sep 2025 00:35:52 -0700 Subject: [PATCH] explain with broker buffer --- weed/query/engine/aggregations.go | 44 ++++++++++- weed/query/engine/engine.go | 58 +++++++++++++- weed/query/engine/hybrid_message_scanner.go | 87 ++++++++++++++++++--- weed/query/engine/types.go | 7 +- 4 files changed, 179 insertions(+), 17 deletions(-) diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go index 7db7cd649..de9706683 100644 --- a/weed/query/engine/aggregations.go +++ b/weed/query/engine/aggregations.go @@ -331,6 +331,11 @@ func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSour // executeAggregationQuery handles SELECT queries with aggregation functions func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select) (*QueryResult, error) { + return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil) +} + +// executeAggregationQueryWithPlan handles SELECT queries with aggregation functions and populates execution plan +func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select, plan *QueryExecutionPlan) (*QueryResult, error) { // Parse WHERE clause for filtering var predicate func(*schema_pb.RecordValue) bool var err error @@ -373,9 +378,42 @@ func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner * } // Execute the hybrid scan to get all matching records - results, err := hybridScanner.Scan(ctx, hybridScanOptions) - if err != nil { - return &QueryResult{Error: err}, err + var results []HybridScanResult + if plan != nil { + // EXPLAIN mode - capture broker buffer stats + var stats *HybridScanStats + results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions) + if err 
!= nil { + return &QueryResult{Error: err}, err + } + + // Populate plan with broker buffer information + if stats != nil { + plan.BrokerBufferQueried = stats.BrokerBufferQueried + plan.BrokerBufferMessages = stats.BrokerBufferMessages + plan.BufferStartIndex = stats.BufferStartIndex + + // Add broker_buffer to data sources if buffer was queried + if stats.BrokerBufferQueried { + // Check if broker_buffer is already in data sources + hasBrokerBuffer := false + for _, source := range plan.DataSources { + if source == "broker_buffer" { + hasBrokerBuffer = true + break + } + } + if !hasBrokerBuffer { + plan.DataSources = append(plan.DataSources, "broker_buffer") + } + } + } + } else { + // Normal mode - just get results + results, err = hybridScanner.Scan(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } } // Compute aggregations diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index c009d8a5d..1d9eac0dd 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -306,6 +306,14 @@ func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) [ stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed)) } + // Broker buffer information + if plan.BrokerBufferQueried { + stats = append(stats, fmt.Sprintf("Broker Buffer Queried: Yes (%d messages)", plan.BrokerBufferMessages)) + if plan.BufferStartIndex > 0 { + stats = append(stats, fmt.Sprintf("Buffer Start Index: %d (deduplication enabled)", plan.BufferStartIndex)) + } + } + for i, stat := range stats { if hasMoreAfterStats { // More sections after Statistics, so use │ prefix @@ -375,6 +383,8 @@ func (e *SQLEngine) formatDataSource(source string) string { return "Parquet Files (full scan)" case "live_logs": return "Live Log Files" + case "broker_buffer": + return "Broker Buffer (real-time)" default: return source } @@ -441,8 +451,52 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, 
stmt *sq } } - // Execute the query - result, err := e.executeSelectStatement(ctx, stmt) + // Execute the query (handle aggregations specially for plan tracking) + var result *QueryResult + var err error + + if hasAggregations { + // Extract table information for aggregation execution + var database, tableName string + if len(stmt.From) == 1 { + if table, ok := stmt.From[0].(*sqlparser.AliasedTableExpr); ok { + if tableExpr, ok := table.Expr.(sqlparser.TableName); ok { + tableName = tableExpr.Name.String() + if tableExpr.Qualifier.String() != "" { + database = tableExpr.Qualifier.String() + } + } + } + } + + // Use current database if not specified + if database == "" { + database = e.catalog.currentDatabase + if database == "" { + database = "default" + } + } + + // Create hybrid scanner for aggregation execution + var filerClient filer_pb.FilerClient + if e.catalog.brokerClient != nil { + filerClient, err = e.catalog.brokerClient.GetFilerClient() + if err != nil { + return &QueryResult{Error: err}, err + } + } + + hybridScanner, scanErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName) + if scanErr != nil { + return &QueryResult{Error: scanErr}, scanErr + } + + // Execute aggregation query with plan tracking; assigns the outer err (scanErr above avoids shadowing it with ":=") + result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan) + } else { + // Regular SELECT query + result, err = e.executeSelectStatement(ctx, stmt) + } if err == nil && result != nil { // Extract table name for use in execution strategy determination diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index b225e1cc0..9d33e399b 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -111,7 +111,15 @@ type HybridScanResult struct { Values map[string]*schema_pb.Value // Column name -> value Timestamp int64 // Message timestamp (_ts_ns) Key []byte // Message key (_key) - Source string // "live_log" or 
"parquet_archive" + Source string // "live_log" or "parquet_archive" or "in_memory_broker" +} + +// HybridScanStats contains statistics about data sources scanned +type HybridScanStats struct { + BrokerBufferQueried bool + BrokerBufferMessages int + BufferStartIndex int64 + PartitionsScanned int } // ParquetColumnStats holds statistics for a single column from parquet metadata @@ -137,7 +145,14 @@ type ParquetFileStats struct { // 2. Applies filtering at the lowest level for efficiency // 3. Handles schema evolution transparently func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) { + results, _, err := hms.ScanWithStats(ctx, options) + return results, err +} + +// ScanWithStats reads messages and returns scan statistics for execution plans +func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { var results []HybridScanResult + stats := &HybridScanStats{} // Get all partitions for this topic // RESOLVED TODO: Implement proper partition discovery via MQ broker @@ -147,14 +162,27 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt partitions = []topic.Partition{{RangeStart: 0, RangeStop: 1000}} } + stats.PartitionsScanned = len(partitions) + for _, partition := range partitions { - partitionResults, err := hms.scanPartitionHybrid(ctx, partition, options) + partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options) if err != nil { - return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err) + return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err) } results = append(results, partitionResults...) 
+ // Aggregate broker buffer stats + if partitionStats != nil { + if partitionStats.BrokerBufferQueried { + stats.BrokerBufferQueried = true + } + stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages + if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) { + stats.BufferStartIndex = partitionStats.BufferStartIndex + } + } + // Apply global limit across all partitions if options.Limit > 0 && len(results) >= options.Limit { results = results[:options.Limit] @@ -162,18 +190,28 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt } } - return results, nil + return results, stats, nil } // scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { + results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options) + return results, err +} + +// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics +func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { var results []HybridScanResult + stats := &HybridScanStats{} // Skip if no broker client available if hms.brokerClient == nil { - return results, nil + return results, stats, nil } + // Mark that we attempted to query broker buffer + stats.BrokerBufferQueried = true + // Step 1: Get unflushed data from broker using buffer_start-based method // This method uses buffer_start metadata to avoid double-counting with exact precision unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs) @@ -182,7 +220,20 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx 
context.Context, partitio if isDebugMode(ctx) { fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err) } - return results, nil + // Reset queried flag on error + stats.BrokerBufferQueried = false + return results, stats, nil + } + + // Capture stats for EXPLAIN + stats.BrokerBufferMessages = len(unflushedEntries) + + // Debug logging for EXPLAIN mode + if isDebugMode(ctx) { + fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries)) + if len(unflushedEntries) > 0 { + fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n") + } } // Step 2: Process unflushed entries (already deduplicated by broker) @@ -251,7 +302,7 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results)) } - return results, nil + return results, stats, nil } // convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue @@ -344,15 +395,29 @@ func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([ // 1. Unflushed in-memory data from brokers (REAL-TIME) // 2. 
Live logs + Parquet files from disk (FLUSHED/ARCHIVED) func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { + results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options) + return results, err +} + +// scanPartitionHybridWithStats scans a specific partition and returns statistics +func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { var results []HybridScanResult + stats := &HybridScanStats{} // STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME) - unflushedResults, err := hms.scanUnflushedData(ctx, partition, options) + unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options) if err != nil { // Don't fail the query if broker scanning fails - just log and continue with disk data - fmt.Printf("Warning: Failed to scan unflushed data from broker: %v\n", err) + if !isDebugMode(ctx) { + fmt.Printf("Warning: Failed to scan unflushed data from broker: %v\n", err) + } } else { results = append(results, unflushedResults...) 
+ if unflushedStats != nil { + stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried + stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages + stats.BufferStartIndex = unflushedStats.BufferStartIndex + } } // STEP 2: Scan flushed data from disk (live logs + Parquet files) @@ -436,7 +501,7 @@ func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partit _, _, err = mergedReadFn(startPosition, stopTsNs, eachLogEntryFn) if err != nil { - return nil, fmt.Errorf("flushed data scan failed: %v", err) + return nil, stats, fmt.Errorf("flushed data scan failed: %v", err) } } @@ -458,7 +523,7 @@ func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partit results = results[:options.Limit] } - return results, nil + return results, stats, nil } // convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue diff --git a/weed/query/engine/types.go b/weed/query/engine/types.go index 96aad7582..3b72ca7da 100644 --- a/weed/query/engine/types.go +++ b/weed/query/engine/types.go @@ -8,7 +8,7 @@ import ( type QueryExecutionPlan struct { QueryType string ExecutionStrategy string `json:"execution_strategy"` // fast_path, full_scan, hybrid - DataSources []string `json:"data_sources"` // parquet_files, live_logs + DataSources []string `json:"data_sources"` // parquet_files, live_logs, broker_buffer PartitionsScanned int `json:"partitions_scanned"` ParquetFilesScanned int `json:"parquet_files_scanned"` LiveLogFilesScanned int `json:"live_log_files_scanned"` @@ -18,6 +18,11 @@ type QueryExecutionPlan struct { Aggregations []string `json:"aggregations,omitempty"` ExecutionTimeMs float64 `json:"execution_time_ms"` Details map[string]interface{} `json:"details,omitempty"` + + // Broker buffer information + BrokerBufferQueried bool `json:"broker_buffer_queried"` + BrokerBufferMessages int `json:"broker_buffer_messages"` + BufferStartIndex int64 `json:"buffer_start_index,omitempty"` } // QueryResult represents the 
result of a SQL query execution