explain with broker buffer

This commit is contained in:
chrislu
2025-09-02 00:35:52 -07:00
parent de866bfd09
commit db75742e37
4 changed files with 179 additions and 17 deletions

View File

@@ -331,6 +331,11 @@ func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSour
// executeAggregationQuery handles SELECT queries with aggregation functions
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select) (*QueryResult, error) {
return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil)
}
// executeAggregationQueryWithPlan handles SELECT queries with aggregation functions and populates execution plan
func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select, plan *QueryExecutionPlan) (*QueryResult, error) {
// Parse WHERE clause for filtering
var predicate func(*schema_pb.RecordValue) bool
var err error
@@ -373,9 +378,42 @@ func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *
}
// Execute the hybrid scan to get all matching records
results, err := hybridScanner.Scan(ctx, hybridScanOptions)
if err != nil {
return &QueryResult{Error: err}, err
var results []HybridScanResult
if plan != nil {
// EXPLAIN mode - capture broker buffer stats
var stats *HybridScanStats
results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
if err != nil {
return &QueryResult{Error: err}, err
}
// Populate plan with broker buffer information
if stats != nil {
plan.BrokerBufferQueried = stats.BrokerBufferQueried
plan.BrokerBufferMessages = stats.BrokerBufferMessages
plan.BufferStartIndex = stats.BufferStartIndex
// Add broker_buffer to data sources if buffer was queried
if stats.BrokerBufferQueried {
// Check if broker_buffer is already in data sources
hasBrokerBuffer := false
for _, source := range plan.DataSources {
if source == "broker_buffer" {
hasBrokerBuffer = true
break
}
}
if !hasBrokerBuffer {
plan.DataSources = append(plan.DataSources, "broker_buffer")
}
}
}
} else {
// Normal mode - just get results
results, err = hybridScanner.Scan(ctx, hybridScanOptions)
if err != nil {
return &QueryResult{Error: err}, err
}
}
// Compute aggregations

View File

@@ -306,6 +306,14 @@ func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) [
stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed))
}
// Broker buffer information
if plan.BrokerBufferQueried {
stats = append(stats, fmt.Sprintf("Broker Buffer Queried: Yes (%d messages)", plan.BrokerBufferMessages))
if plan.BufferStartIndex > 0 {
stats = append(stats, fmt.Sprintf("Buffer Start Index: %d (deduplication enabled)", plan.BufferStartIndex))
}
}
for i, stat := range stats {
if hasMoreAfterStats {
// More sections after Statistics, so use │ prefix
@@ -375,6 +383,8 @@ func (e *SQLEngine) formatDataSource(source string) string {
return "Parquet Files (full scan)"
case "live_logs":
return "Live Log Files"
case "broker_buffer":
return "Broker Buffer (real-time)"
default:
return source
}
@@ -441,8 +451,52 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sq
}
}
// Execute the query
result, err := e.executeSelectStatement(ctx, stmt)
// Execute the query (handle aggregations specially for plan tracking)
var result *QueryResult
var err error
if hasAggregations {
// Extract table information for aggregation execution
var database, tableName string
if len(stmt.From) == 1 {
if table, ok := stmt.From[0].(*sqlparser.AliasedTableExpr); ok {
if tableExpr, ok := table.Expr.(sqlparser.TableName); ok {
tableName = tableExpr.Name.String()
if tableExpr.Qualifier.String() != "" {
database = tableExpr.Qualifier.String()
}
}
}
}
// Use current database if not specified
if database == "" {
database = e.catalog.currentDatabase
if database == "" {
database = "default"
}
}
// Create hybrid scanner for aggregation execution
var filerClient filer_pb.FilerClient
if e.catalog.brokerClient != nil {
filerClient, err = e.catalog.brokerClient.GetFilerClient()
if err != nil {
return &QueryResult{Error: err}, err
}
}
hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName)
if err != nil {
return &QueryResult{Error: err}, err
}
// Execute aggregation query with plan tracking
result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
} else {
// Regular SELECT query
result, err = e.executeSelectStatement(ctx, stmt)
}
if err == nil && result != nil {
// Extract table name for use in execution strategy determination

View File

@@ -111,7 +111,15 @@ type HybridScanResult struct {
Values map[string]*schema_pb.Value // Column name -> value
Timestamp int64 // Message timestamp (_ts_ns)
Key []byte // Message key (_key)
Source string // "live_log" or "parquet_archive"
Source string // "live_log" or "parquet_archive" or "in_memory_broker"
}
// HybridScanStats contains statistics about data sources scanned
type HybridScanStats struct {
BrokerBufferQueried bool
BrokerBufferMessages int
BufferStartIndex int64
PartitionsScanned int
}
// ParquetColumnStats holds statistics for a single column from parquet metadata
@@ -137,7 +145,14 @@ type ParquetFileStats struct {
// 2. Applies filtering at the lowest level for efficiency
// 3. Handles schema evolution transparently
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.ScanWithStats(ctx, options)
return results, err
}
// ScanWithStats reads messages and returns scan statistics for execution plans
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
var results []HybridScanResult
stats := &HybridScanStats{}
// Get all partitions for this topic
// RESOLVED TODO: Implement proper partition discovery via MQ broker
@@ -147,14 +162,27 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt
partitions = []topic.Partition{{RangeStart: 0, RangeStop: 1000}}
}
stats.PartitionsScanned = len(partitions)
for _, partition := range partitions {
partitionResults, err := hms.scanPartitionHybrid(ctx, partition, options)
partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
if err != nil {
return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err)
return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
}
results = append(results, partitionResults...)
// Aggregate broker buffer stats
if partitionStats != nil {
if partitionStats.BrokerBufferQueried {
stats.BrokerBufferQueried = true
}
stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
stats.BufferStartIndex = partitionStats.BufferStartIndex
}
}
// Apply global limit across all partitions
if options.Limit > 0 && len(results) >= options.Limit {
results = results[:options.Limit]
@@ -162,18 +190,28 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt
}
}
return results, nil
return results, stats, nil
}
// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
return results, err
}
// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
var results []HybridScanResult
stats := &HybridScanStats{}
// Skip if no broker client available
if hms.brokerClient == nil {
return results, nil
return results, stats, nil
}
// Mark that we attempted to query broker buffer
stats.BrokerBufferQueried = true
// Step 1: Get unflushed data from broker using buffer_start-based method
// This method uses buffer_start metadata to avoid double-counting with exact precision
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
@@ -182,7 +220,20 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio
if isDebugMode(ctx) {
fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
}
return results, nil
// Reset queried flag on error
stats.BrokerBufferQueried = false
return results, stats, nil
}
// Capture stats for EXPLAIN
stats.BrokerBufferMessages = len(unflushedEntries)
// Debug logging for EXPLAIN mode
if isDebugMode(ctx) {
fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
if len(unflushedEntries) > 0 {
fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
}
}
// Step 2: Process unflushed entries (already deduplicated by broker)
@@ -251,7 +302,7 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio
fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
}
return results, nil
return results, stats, nil
}
// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
@@ -344,15 +395,29 @@ func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([
// 1. Unflushed in-memory data from brokers (REAL-TIME)
// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
return results, err
}
// scanPartitionHybridWithStats scans a specific partition and returns statistics
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
var results []HybridScanResult
stats := &HybridScanStats{}
// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
unflushedResults, err := hms.scanUnflushedData(ctx, partition, options)
unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
if err != nil {
// Don't fail the query if broker scanning fails - just log and continue with disk data
fmt.Printf("Warning: Failed to scan unflushed data from broker: %v\n", err)
if !isDebugMode(ctx) {
fmt.Printf("Warning: Failed to scan unflushed data from broker: %v\n", err)
}
} else {
results = append(results, unflushedResults...)
if unflushedStats != nil {
stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
stats.BufferStartIndex = unflushedStats.BufferStartIndex
}
}
// STEP 2: Scan flushed data from disk (live logs + Parquet files)
@@ -436,7 +501,7 @@ func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partit
_, _, err = mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
if err != nil {
return nil, fmt.Errorf("flushed data scan failed: %v", err)
return nil, stats, fmt.Errorf("flushed data scan failed: %v", err)
}
}
@@ -458,7 +523,7 @@ func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partit
results = results[:options.Limit]
}
return results, nil
return results, stats, nil
}
// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue

View File

@@ -8,7 +8,7 @@ import (
type QueryExecutionPlan struct {
QueryType string
ExecutionStrategy string `json:"execution_strategy"` // fast_path, full_scan, hybrid
DataSources []string `json:"data_sources"` // parquet_files, live_logs
DataSources []string `json:"data_sources"` // parquet_files, live_logs, broker_buffer
PartitionsScanned int `json:"partitions_scanned"`
ParquetFilesScanned int `json:"parquet_files_scanned"`
LiveLogFilesScanned int `json:"live_log_files_scanned"`
@@ -18,6 +18,11 @@ type QueryExecutionPlan struct {
Aggregations []string `json:"aggregations,omitempty"`
ExecutionTimeMs float64 `json:"execution_time_ms"`
Details map[string]interface{} `json:"details,omitempty"`
// Broker buffer information
BrokerBufferQueried bool `json:"broker_buffer_queried"`
BrokerBufferMessages int `json:"broker_buffer_messages"`
BufferStartIndex int64 `json:"buffer_start_index,omitempty"`
}
// QueryResult represents the result of a SQL query execution