From 58e0c1b3301f5c5c0723f71726b6f1053e9152f9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 10 Sep 2025 11:04:42 -0700 Subject: [PATCH] Fix sql bugs (#7219) * fix nil when explaining * add plain details when running full scan * skip files by timestamp * skip file by timestamp * refactor * handle filter by time * skip broker memory only if it has unflushed messages * refactoring * refactor * address comments * address comments * filter by parquet stats * simplify * refactor * prune old code * optimize * Update aggregations.go * ensure non-time predicates are properly detected * add stmt to populatePlanFileDetails This helper function is a great way to centralize logic for populating file details. However, it's missing an optimization that is present in executeSelectStatementWithBrokerStats: pruning Parquet files based on column statistics from the WHERE clause. Aggregation queries that fall back to the slow path could benefit from this optimization. Consider modifying the function signature to accept the *SelectStatement and adding the column statistics pruning logic here, similar to how it's done in executeSelectStatementWithBrokerStats. * refactoring to work with *schema_pb.Value directly after the initial conversion --- weed/query/engine/aggregations.go | 166 +++-- weed/query/engine/engine.go | 656 +++++++++++------- .../fast_path_predicate_validation_test.go | 272 ++++++++ weed/query/engine/hybrid_message_scanner.go | 50 ++ weed/query/engine/types.go | 6 + 5 files changed, 799 insertions(+), 351 deletions(-) create mode 100644 weed/query/engine/fast_path_predicate_validation_test.go diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go index 623e489dd..6b58517e1 100644 --- a/weed/query/engine/aggregations.go +++ b/weed/query/engine/aggregations.go @@ -8,10 +8,8 @@ import ( "strings" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" - "github.com/seaweedfs/seaweedfs/weed/util" ) // AggregationSpec defines an aggregation function to be computed @@ -78,6 +76,12 @@ func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) // CollectDataSources gathers information about available data sources for a topic func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) { + return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0) +} + +// CollectDataSourcesWithTimeFilter gathers information about available data sources for a topic +// with optional time filtering to skip irrelevant parquet files +func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) { dataSources := &TopicDataSources{ ParquetFiles: make(map[string][]*ParquetFileStats), ParquetRowCount: 0, @@ -125,14 +129,16 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan fmt.Printf(" No parquet files found in partition\n") } } else { - dataSources.ParquetFiles[partitionPath] = parquetStats + // Prune by time range using parquet column statistics + filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs) + dataSources.ParquetFiles[partitionPath] = filtered partitionParquetRows := int64(0) - for _, stat := range parquetStats { + for _, stat := range filtered { 
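+			// Accumulate row counts from the time-pruned file set only, so the
+			// fast path never counts rows in parquet files outside the query range.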
partitionParquetRows += stat.RowCount dataSources.ParquetRowCount += stat.RowCount } if isDebugMode(ctx) { - fmt.Printf(" Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows) + fmt.Printf(" Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows) } } @@ -452,20 +458,27 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS } } - // Extract time filters for optimization + // Extract time filters and validate that WHERE clause contains only time-based predicates startTimeNs, stopTimeNs := int64(0), int64(0) + onlyTimePredicates := true if stmt.Where != nil { - startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) + startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr) } - // FAST PATH RE-ENABLED WITH DEBUG LOGGING: - // Added comprehensive debug logging to identify data counting issues - // This will help us understand why fast path was returning 0 when slow path returns 1803 - if stmt.Where == nil { + // FAST PATH WITH TIME-BASED OPTIMIZATION: + // Allow fast path only for queries without WHERE clause or with time-only WHERE clauses + // This prevents incorrect results when non-time predicates are present + canAttemptFastPath := stmt.Where == nil || onlyTimePredicates + + if canAttemptFastPath { if isDebugMode(ctx) { - fmt.Printf("\nFast path optimization attempt...\n") + if stmt.Where == nil { + fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n") + } else { + fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n") + } } - fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan) + fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt) if canOptimize { if isDebugMode(ctx) { fmt.Printf("Fast path optimization succeeded!\n") @@ -478,7 +491,7 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS } } else { if isDebugMode(ctx) { - fmt.Printf("Fast path not applicable due to WHERE clause\n") + fmt.Printf("Fast path not applicable due to complex WHERE clause\n") } } @@ -605,23 +618,66 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS // Build execution tree for aggregation queries if plan is provided if plan != nil { + // Populate detailed plan information for full scan (similar to fast path) + e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt) plan.RootNode = e.buildExecutionTree(plan, stmt) } return result, nil } +// populateFullScanPlanDetails populates detailed plan information for full scan queries +// This provides consistency with fast path execution plan details +func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) { + // plan.Details is initialized at the start of the SELECT execution + + // Extract table information + var database, tableName string + if len(stmt.From) == 1 { + if table, ok := stmt.From[0].(*AliasedTableExpr); ok { + if tableExpr, ok := table.Expr.(TableName); ok { + tableName = tableExpr.Name.String() + if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { + database = tableExpr.Qualifier.String() + } + } + } + } + + // Use current database if not specified + if database == "" { + database = e.catalog.currentDatabase + if database == "" { + database = "default" + } + } + + // 
Discover partitions and populate file details + if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil { + // Add partition paths to execution plan details + plan.Details["partition_paths"] = partitions + + // Populate detailed file information using shared helper + e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt) + } else { + // Record discovery error to plan for better diagnostics + plan.Details["error_partition_discovery"] = discoverErr.Error() + } +} + // tryFastParquetAggregation attempts to compute aggregations using hybrid approach: // - Use parquet metadata for parquet files // - Count live log files for live data // - Combine both for accurate results per partition // Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) { - return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil) + return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil) } // tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided -func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) { +// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering) +// stmt: SELECT statement for column statistics pruning optimization (can be nil) +func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) { // Use the new modular components optimizer := NewFastPathOptimizer(e) computer := NewAggregationComputer(e) @@ -632,8 +688,8 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri return nil, false } - // Step 2: Collect data sources - dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner) + // Step 2: Collect data sources with time filtering for parquet file optimization + dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs) if err != nil { return nil, false } @@ -725,9 +781,6 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri } // Merge details while preserving existing ones - if plan.Details == nil { - plan.Details = make(map[string]interface{}) - } for key, value := range aggPlan.Details { plan.Details[key] = value } @@ -735,51 +788,17 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri // Add file path information from the data collection plan.Details["partition_paths"] = partitions - // Collect actual file information for each partition - var parquetFiles []string - var liveLogFiles []string - parquetSources := make(map[string]bool) + // Populate detailed file information using shared helper, including time filters for pruning + plan.Details[PlanDetailStartTimeNs] = startTimeNs + plan.Details[PlanDetailStopTimeNs] = stopTimeNs + e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt) - for _, partitionPath := range partitions { - // Get parquet files for this partition - if parquetStats, err := 
hybridScanner.ReadParquetStatistics(partitionPath); err == nil { - for _, stats := range parquetStats { - parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) - } - } - - // Merge accurate parquet sources from metadata (preferred over filename fallback) - if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil { - for src := range sources { - parquetSources[src] = true - } - } - - // Get live log files for this partition - if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil { - for _, fileName := range liveFiles { - // Exclude live log files that have been converted to parquet (deduplicated) - if parquetSources[fileName] { - continue - } - liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) - } - } + // Update counts to match discovered live log files + if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok { + dataSources.LiveLogFilesCount = len(liveLogFiles) + plan.LiveLogFilesScanned = len(liveLogFiles) } - if len(parquetFiles) > 0 { - plan.Details["parquet_files"] = parquetFiles - } - if len(liveLogFiles) > 0 { - plan.Details["live_log_files"] = liveLogFiles - } - - // Update the dataSources.LiveLogFilesCount to match the actual files found - dataSources.LiveLogFilesCount = len(liveLogFiles) - - // Also update the plan's LiveLogFilesScanned to match - plan.LiveLogFilesScanned = len(liveLogFiles) - // Ensure PartitionsScanned is set so Statistics section appears if plan.PartitionsScanned == 0 && len(partitions) > 0 { plan.PartitionsScanned = len(partitions) @@ -912,24 +931,3 @@ func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, quer fmt.Printf("==========================================\n") } } - -// collectLiveLogFileNames collects the names of live log files in a partition -func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) { - var fileNames []string - - err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { - // Skip directories and parquet files - if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") { - return nil - } - - // Only include files with actual content - if len(entry.Chunks) > 0 { - fileNames = append(fileNames, entry.Name) - } - - return nil - }) - - return fileNames, err -} diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 84c238583..ffed03f35 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -495,69 +495,6 @@ func ParseSQL(sql string) (Statement, error) { } } -// extractFunctionArguments extracts the arguments from a function call expression using CockroachDB parser -func extractFunctionArguments(expr string) ([]SelectExpr, error) { - // Find the parentheses - startParen := strings.Index(expr, "(") - endParen := strings.LastIndex(expr, ")") - - if startParen == -1 || endParen == -1 || endParen <= startParen { - return nil, fmt.Errorf("invalid function syntax") - } - - // Extract arguments string - argsStr := strings.TrimSpace(expr[startParen+1 : endParen]) - - // Handle empty arguments - if argsStr == "" { - return []SelectExpr{}, nil - } - - // Handle single * argument (for COUNT(*)) - if argsStr == "*" { - return []SelectExpr{&StarExpr{}}, nil - } - - // Parse multiple arguments separated by commas - args := []SelectExpr{} - argParts 
:= strings.Split(argsStr, ",") - - // Use CockroachDB parser to parse each argument as a SELECT expression - cockroachParser := NewCockroachSQLParser() - - for _, argPart := range argParts { - argPart = strings.TrimSpace(argPart) - if argPart == "*" { - args = append(args, &StarExpr{}) - } else { - // Create a dummy SELECT statement to parse the argument expression - dummySelect := fmt.Sprintf("SELECT %s", argPart) - - // Parse using CockroachDB parser - stmt, err := cockroachParser.ParseSQL(dummySelect) - if err != nil { - // If CockroachDB parser fails, fall back to simple column name - args = append(args, &AliasedExpr{ - Expr: &ColName{Name: stringValue(argPart)}, - }) - continue - } - - // Extract the expression from the parsed SELECT statement - if selectStmt, ok := stmt.(*SelectStatement); ok && len(selectStmt.SelectExprs) > 0 { - args = append(args, selectStmt.SelectExprs[0]) - } else { - // Fallback to column name if parsing fails - args = append(args, &AliasedExpr{ - Expr: &ColName{Name: stringValue(argPart)}, - }) - } - } - } - - return args, nil -} - // debugModeKey is used to store debug mode flag in context type debugModeKey struct{} @@ -1221,8 +1158,8 @@ func (e *SQLEngine) buildExecutionTree(plan *QueryExecutionPlan, stmt *SelectSta } } - // Create broker buffer node if queried - if plan.BrokerBufferQueried { + // Create broker buffer node only if queried AND has unflushed messages + if plan.BrokerBufferQueried && plan.BrokerBufferMessages > 0 { brokerBufferNodes = append(brokerBufferNodes, &FileSourceNode{ FilePath: "broker_memory_buffer", SourceType: "broker_buffer", @@ -1489,6 +1426,8 @@ func (e *SQLEngine) formatOptimization(opt string) string { return "Duplicate Data Avoidance" case "predicate_pushdown": return "WHERE Clause Pushdown" + case "column_statistics_pruning": + return "Column Statistics File Pruning" case "column_projection": return "Column Selection" case "limit_pushdown": @@ -1540,6 +1479,10 @@ func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement) // executeSelectStatementWithPlan handles SELECT queries with execution plan tracking func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) { + // Initialize plan details once + if plan != nil && plan.Details == nil { + plan.Details = make(map[string]interface{}) + } // Parse aggregations to populate plan var aggregations []AggregationSpec hasAggregations := false @@ -1577,7 +1520,7 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se if table, ok := stmt.From[0].(*AliasedTableExpr); ok { if tableExpr, ok := table.Expr.(TableName); ok { tableName = tableExpr.Name.String() - if tableExpr.Qualifier.String() != "" { + if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { database = tableExpr.Qualifier.String() } } @@ -2290,18 +2233,51 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil { // Add partition paths to execution plan details plan.Details["partition_paths"] = partitions + // Persist time filter details for downstream pruning/diagnostics + plan.Details[PlanDetailStartTimeNs] = startTimeNs + plan.Details[PlanDetailStopTimeNs] = stopTimeNs + + if isDebugMode(ctx) { + fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs) + } // Collect actual file information for each partition 
var parquetFiles []string var liveLogFiles []string parquetSources := make(map[string]bool) + var parquetReadErrors []string + var liveLogListErrors []string for _, partitionPath := range partitions { // Get parquet files for this partition if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil { - for _, stats := range parquetStats { + // Prune files by time range with debug logging + filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs) + + // Further prune by column statistics from WHERE clause + if stmt.Where != nil { + beforeColumnPrune := len(filteredStats) + filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr) + columnPrunedCount := beforeColumnPrune - len(filteredStats) + + if columnPrunedCount > 0 { + if isDebugMode(ctx) { + fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath) + } + // Track column statistics optimization + if !contains(plan.OptimizationsUsed, "column_statistics_pruning") { + plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning") + } + } + } + for _, stats := range filteredStats { parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) } + } else { + parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err) + } } // Merge accurate parquet sources from metadata @@ -2320,6 +2296,11 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s } liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) } + } else { + liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err) + } } } @@ -2329,11 +2310,20 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s if len(liveLogFiles) > 0 { plan.Details["live_log_files"] = liveLogFiles } + if len(parquetReadErrors) > 0 { + plan.Details["error_parquet_statistics"] = parquetReadErrors + } + if len(liveLogListErrors) > 0 { + plan.Details["error_live_log_listing"] = liveLogListErrors + } // Update scan statistics for execution plan display plan.PartitionsScanned = len(partitions) plan.ParquetFilesScanned = len(parquetFiles) plan.LiveLogFilesScanned = len(liveLogFiles) + } else { + // Handle partition discovery error + plan.Details["error_partition_discovery"] = discoverErr.Error() } } else { // Normal mode - just get results @@ -2377,6 +2367,23 @@ func (e *SQLEngine) extractTimeFilters(expr ExprNode) (int64, int64) { return startTimeNs, stopTimeNs } +// extractTimeFiltersWithValidation extracts time filters and validates that WHERE clause contains only time-based predicates +// Returns (startTimeNs, stopTimeNs, onlyTimePredicates) where onlyTimePredicates indicates if fast path is safe +func (e *SQLEngine) extractTimeFiltersWithValidation(expr ExprNode) (int64, int64, bool) { + startTimeNs, stopTimeNs := int64(0), int64(0) + onlyTimePredicates := true + + // Recursively extract time filters and validate predicates + e.extractTimeFiltersWithValidationRecursive(expr, &startTimeNs, &stopTimeNs, &onlyTimePredicates) + + // Special case: if startTimeNs == stopTimeNs, treat it like an equality query + if startTimeNs != 0 && startTimeNs == stopTimeNs { 
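+		// Drop the stop bound so the scan stays open-ended at the exact
+		// timestamp; the row-level predicate still enforces the equality,
+		// regardless of whether the scanner treats the stop bound as exclusive.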
+ stopTimeNs = 0 + } + + return startTimeNs, stopTimeNs, onlyTimePredicates +} + // extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) { switch exprType := expr.(type) { @@ -2396,6 +2403,39 @@ func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stop } } +// extractTimeFiltersWithValidationRecursive recursively processes WHERE expressions to find time comparisons and validate predicates +func (e *SQLEngine) extractTimeFiltersWithValidationRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64, onlyTimePredicates *bool) { + switch exprType := expr.(type) { + case *ComparisonExpr: + // Check if this is a time-based comparison + leftCol := e.getColumnName(exprType.Left) + rightCol := e.getColumnName(exprType.Right) + + isTimeComparison := e.isTimestampColumn(leftCol) || e.isTimestampColumn(rightCol) + if isTimeComparison { + // Extract time filter from this comparison + e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs) + } else { + // Non-time predicate found - fast path is not safe + *onlyTimePredicates = false + } + case *AndExpr: + // For AND expressions, both sides must be time-only for fast path to be safe + e.extractTimeFiltersWithValidationRecursive(exprType.Left, startTimeNs, stopTimeNs, onlyTimePredicates) + e.extractTimeFiltersWithValidationRecursive(exprType.Right, startTimeNs, stopTimeNs, onlyTimePredicates) + case *OrExpr: + // OR expressions are complex and not supported in fast path + *onlyTimePredicates = false + return + case *ParenExpr: + // Unwrap parentheses and continue + e.extractTimeFiltersWithValidationRecursive(exprType.Expr, startTimeNs, stopTimeNs, onlyTimePredicates) + default: + // Unknown expression type - not safe for fast path + *onlyTimePredicates = false + } +} + // extractTimeFromComparison extracts time bounds from comparison expressions // Handles comparisons against timestamp columns (system columns and schema-defined timestamp types) func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) { @@ -2465,7 +2505,7 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool { } // System timestamp columns are always time columns - if columnName == SW_COLUMN_NAME_TIMESTAMP { + if columnName == SW_COLUMN_NAME_TIMESTAMP || columnName == SW_DISPLAY_NAME_TIMESTAMP { return true } @@ -2495,6 +2535,280 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool { return false } +// getTimeFiltersFromPlan extracts time filter values from execution plan details +func getTimeFiltersFromPlan(plan *QueryExecutionPlan) (startTimeNs, stopTimeNs int64) { + if plan == nil || plan.Details == nil { + return 0, 0 + } + if startNsVal, ok := plan.Details[PlanDetailStartTimeNs]; ok { + if startNs, ok2 := startNsVal.(int64); ok2 { + startTimeNs = startNs + } + } + if stopNsVal, ok := plan.Details[PlanDetailStopTimeNs]; ok { + if stopNs, ok2 := stopNsVal.(int64); ok2 { + stopTimeNs = stopNs + } + } + return +} + +// pruneParquetFilesByTime filters parquet files based on timestamp ranges, with optional debug logging +func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileStats, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) []*ParquetFileStats { + if startTimeNs == 0 && stopTimeNs == 0 { + return parquetStats + } + + debugEnabled := ctx != nil && isDebugMode(ctx) + qStart := startTimeNs + qStop := stopTimeNs + if qStop 
== 0 { + qStop = math.MaxInt64 + } + + n := 0 + for _, fs := range parquetStats { + if debugEnabled { + fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName) + } + if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok { + if debugEnabled { + fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop) + } + if qStop < minNs || (qStart != 0 && qStart > maxNs) { + if debugEnabled { + fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName) + } + continue + } + } else if debugEnabled { + fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName) + } + parquetStats[n] = fs + n++ + } + return parquetStats[:n] +} + +// pruneParquetFilesByColumnStats filters parquet files based on column statistics and WHERE predicates +func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetStats []*ParquetFileStats, whereExpr ExprNode) []*ParquetFileStats { + if whereExpr == nil { + return parquetStats + } + + debugEnabled := ctx != nil && isDebugMode(ctx) + n := 0 + for _, fs := range parquetStats { + if e.canSkipParquetFile(ctx, fs, whereExpr) { + if debugEnabled { + fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName) + } + continue + } + parquetStats[n] = fs + n++ + } + return parquetStats[:n] +} + +// canSkipParquetFile determines if a parquet file can be skipped based on column statistics +func (e *SQLEngine) canSkipParquetFile(ctx context.Context, fileStats *ParquetFileStats, whereExpr ExprNode) bool { + switch expr := whereExpr.(type) { + case *ComparisonExpr: + return e.canSkipFileByComparison(ctx, fileStats, expr) + case *AndExpr: + // For AND: skip if ANY condition allows skipping (more aggressive pruning) + return e.canSkipParquetFile(ctx, fileStats, expr.Left) || e.canSkipParquetFile(ctx, fileStats, expr.Right) + case *OrExpr: + // For OR: skip only if ALL conditions allow skipping (conservative) + return e.canSkipParquetFile(ctx, fileStats, expr.Left) && e.canSkipParquetFile(ctx, fileStats, expr.Right) + default: + // Unknown expression type - don't skip + return false + } +} + +// canSkipFileByComparison checks if a file can be skipped based on a comparison predicate +func (e *SQLEngine) canSkipFileByComparison(ctx context.Context, fileStats *ParquetFileStats, expr *ComparisonExpr) bool { + // Extract column name and comparison value + var columnName string + var compareSchemaValue *schema_pb.Value + var operator string = expr.Operator + + // Determine which side is the column and which is the value + if colRef, ok := expr.Left.(*ColName); ok { + columnName = colRef.Name.String() + if sqlVal, ok := expr.Right.(*SQLVal); ok { + compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal) + } else { + return false // Can't optimize complex expressions + } + } else if colRef, ok := expr.Right.(*ColName); ok { + columnName = colRef.Name.String() + if sqlVal, ok := expr.Left.(*SQLVal); ok { + compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal) + // Flip operator for reversed comparison + operator = e.flipOperator(operator) + } else { + return false + } + } else { + return false // No column reference found + } + + // Validate comparison value + if compareSchemaValue == nil { + return false + } + + // Get column statistics + colStats, exists := fileStats.ColumnStats[columnName] + if !exists || colStats == nil { + // Try case-insensitive lookup + for colName, stats := range 
fileStats.ColumnStats { + if strings.EqualFold(colName, columnName) { + colStats = stats + exists = true + break + } + } + } + + if !exists || colStats == nil || colStats.MinValue == nil || colStats.MaxValue == nil { + return false // No statistics available + } + + // Apply pruning logic based on operator + switch operator { + case ">": + // Skip if max(column) <= compareValue + return e.compareValues(colStats.MaxValue, compareSchemaValue) <= 0 + case ">=": + // Skip if max(column) < compareValue + return e.compareValues(colStats.MaxValue, compareSchemaValue) < 0 + case "<": + // Skip if min(column) >= compareValue + return e.compareValues(colStats.MinValue, compareSchemaValue) >= 0 + case "<=": + // Skip if min(column) > compareValue + return e.compareValues(colStats.MinValue, compareSchemaValue) > 0 + case "=": + // Skip if compareValue is outside [min, max] range + return e.compareValues(compareSchemaValue, colStats.MinValue) < 0 || + e.compareValues(compareSchemaValue, colStats.MaxValue) > 0 + case "!=", "<>": + // Skip if min == max == compareValue (all values are the same and equal to compareValue) + return e.compareValues(colStats.MinValue, colStats.MaxValue) == 0 && + e.compareValues(colStats.MinValue, compareSchemaValue) == 0 + default: + return false // Unknown operator + } +} + +// flipOperator flips comparison operators when operands are swapped +func (e *SQLEngine) flipOperator(op string) string { + switch op { + case ">": + return "<" + case ">=": + return "<=" + case "<": + return ">" + case "<=": + return ">=" + case "=", "!=", "<>": + return op // These are symmetric + default: + return op + } +} + +// populatePlanFileDetails populates execution plan with detailed file information for partitions +// Includes column statistics pruning optimization when WHERE clause is provided +func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) { + debugEnabled := ctx != nil && isDebugMode(ctx) + // Collect actual file information for each partition + var parquetFiles []string + var liveLogFiles []string + parquetSources := make(map[string]bool) + var parquetReadErrors []string + var liveLogListErrors []string + + // Extract time filters from plan details + startTimeNs, stopTimeNs := getTimeFiltersFromPlan(plan) + + for _, partitionPath := range partitions { + // Get parquet files for this partition + if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil { + // Prune files by time range + filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs) + + // Further prune by column statistics from WHERE clause + if stmt != nil && stmt.Where != nil { + beforeColumnPrune := len(filteredStats) + filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr) + columnPrunedCount := beforeColumnPrune - len(filteredStats) + + if columnPrunedCount > 0 { + if debugEnabled { + fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath) + } + // Track column statistics optimization + if !contains(plan.OptimizationsUsed, "column_statistics_pruning") { + plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning") + } + } + } + + for _, stats := range filteredStats { + parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) + } + } else { + parquetReadErrors = 
append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if debugEnabled { + fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err) + } + } + + // Merge accurate parquet sources from metadata + if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil { + for src := range sources { + parquetSources[src] = true + } + } + + // Get live log files for this partition + if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil { + for _, fileName := range liveFiles { + // Exclude live log files that have been converted to parquet (deduplicated) + if parquetSources[fileName] { + continue + } + liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) + } + } else { + liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if debugEnabled { + fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err) + } + } + } + + // Add file lists to plan details + if len(parquetFiles) > 0 { + plan.Details["parquet_files"] = parquetFiles + } + if len(liveLogFiles) > 0 { + plan.Details["live_log_files"] = liveLogFiles + } + if len(parquetReadErrors) > 0 { + plan.Details["error_parquet_statistics"] = parquetReadErrors + } + if len(liveLogListErrors) > 0 { + plan.Details["error_live_log_listing"] = liveLogListErrors + } +} + // isSQLTypeTimestamp checks if a SQL type string represents a timestamp type func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool { upperType := strings.ToUpper(strings.TrimSpace(sqlType)) @@ -2664,56 +2978,6 @@ func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []Selec } } -// buildComparisonPredicateWithAliases creates a predicate for comparison operations with alias support -func (e *SQLEngine) buildComparisonPredicateWithAliases(expr *ComparisonExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - var columnName string - var compareValue interface{} - var operator string - - // Extract the comparison details, resolving aliases if needed - leftCol := e.getColumnNameWithAliases(expr.Left, aliases) - rightCol := e.getColumnNameWithAliases(expr.Right, aliases) - operator = e.normalizeOperator(expr.Operator) - - if leftCol != "" && rightCol == "" { - // Left side is column, right side is value - columnName = e.getSystemColumnInternalName(leftCol) - val, err := e.extractValueFromExpr(expr.Right) - if err != nil { - return nil, err - } - compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right) - } else if rightCol != "" && leftCol == "" { - // Right side is column, left side is value - columnName = e.getSystemColumnInternalName(rightCol) - val, err := e.extractValueFromExpr(expr.Left) - if err != nil { - return nil, err - } - compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left) - // Reverse the operator when column is on the right - operator = e.reverseOperator(operator) - } else if leftCol != "" && rightCol != "" { - return nil, fmt.Errorf("column-to-column comparisons not yet supported") - } else { - return nil, fmt.Errorf("at least one side of comparison must be a column") - } - - return func(record *schema_pb.RecordValue) bool { - fieldValue, exists := record.Fields[columnName] - if !exists { - return false - } - return e.evaluateComparison(fieldValue, operator, compareValue) - }, nil -} - -// buildComparisonPredicate creates a predicate for comparison operations (=, <, >, etc.) 
-// Handles column names on both left and right sides of the comparison -func (e *SQLEngine) buildComparisonPredicate(expr *ComparisonExpr) (func(*schema_pb.RecordValue) bool, error) { - return e.buildComparisonPredicateWithContext(expr, nil) -} - // buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { var columnName string @@ -2836,54 +3100,6 @@ func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectEx }, nil } -// buildBetweenPredicateWithAliases creates a predicate for BETWEEN operations with alias support -func (e *SQLEngine) buildBetweenPredicateWithAliases(expr *BetweenExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - var columnName string - var fromValue, toValue interface{} - - // Extract column name from left side with alias resolution - leftCol := e.getColumnNameWithAliases(expr.Left, aliases) - if leftCol == "" { - return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left) - } - columnName = e.getSystemColumnInternalName(leftCol) - - // Extract FROM value - fromVal, err := e.extractValueFromExpr(expr.From) - if err != nil { - return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err) - } - fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From) - - // Extract TO value - toVal, err := e.extractValueFromExpr(expr.To) - if err != nil { - return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err) - } - toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To) - - // Return the predicate function - return func(record *schema_pb.RecordValue) bool { - fieldValue, exists := record.Fields[columnName] - if !exists { - return false - } - - // Evaluate: fieldValue >= fromValue AND fieldValue <= toValue - greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue) - lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue) - - result := greaterThanOrEqualFrom && lessThanOrEqualTo - - // Handle NOT BETWEEN - if expr.Not { - result = !result - } - - return result - }, nil -} - // buildIsNullPredicateWithContext creates a predicate for IS NULL operations func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { // Check if the expression is a column name @@ -2936,50 +3152,6 @@ func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, sele } } -// buildIsNullPredicateWithAliases creates a predicate for IS NULL operations with alias support -func (e *SQLEngine) buildIsNullPredicateWithAliases(expr *IsNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - // Extract column name from expression with alias resolution - columnName := e.getColumnNameWithAliases(expr.Expr, aliases) - if columnName == "" { - return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr) - } - columnName = e.getSystemColumnInternalName(columnName) - - // Return the predicate function - return func(record *schema_pb.RecordValue) bool { - // Check if field exists and if it's null or missing - fieldValue, exists := record.Fields[columnName] - if !exists { - return true // Field doesn't exist = NULL - } - - // Check if the field value itself is null/empty - return e.isValueNull(fieldValue) - }, nil -} - -// 
buildIsNotNullPredicateWithAliases creates a predicate for IS NOT NULL operations with alias support -func (e *SQLEngine) buildIsNotNullPredicateWithAliases(expr *IsNotNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - // Extract column name from expression with alias resolution - columnName := e.getColumnNameWithAliases(expr.Expr, aliases) - if columnName == "" { - return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr) - } - columnName = e.getSystemColumnInternalName(columnName) - - // Return the predicate function - return func(record *schema_pb.RecordValue) bool { - // Check if field exists and if it's not null - fieldValue, exists := record.Fields[columnName] - if !exists { - return false // Field doesn't exist = NULL, so NOT NULL is false - } - - // Check if the field value itself is not null/empty - return !e.isValueNull(fieldValue) - }, nil -} - // isValueNull checks if a schema_pb.Value is null or represents a null value func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool { if value == nil { @@ -3019,33 +3191,6 @@ func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool { } } -// getColumnNameWithAliases extracts column name from expression, resolving aliases if needed -func (e *SQLEngine) getColumnNameWithAliases(expr ExprNode, aliases map[string]ExprNode) string { - switch exprType := expr.(type) { - case *ColName: - colName := exprType.Name.String() - // Check if this is an alias that should be resolved - if aliases != nil { - if actualExpr, exists := aliases[colName]; exists { - // Recursively resolve the aliased expression - return e.getColumnNameWithAliases(actualExpr, nil) // Don't recurse aliases - } - } - return colName - } - return "" -} - -// extractValueFromExpr extracts a value from an expression node (for alias support) -func (e *SQLEngine) extractValueFromExpr(expr ExprNode) (interface{}, error) { - return e.extractComparisonValue(expr) -} - -// normalizeOperator normalizes comparison operators -func (e *SQLEngine) normalizeOperator(op string) string { - return op // For now, just return as-is -} - // extractComparisonValue extracts the comparison value from a SQL expression func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) { switch val := expr.(type) { @@ -4178,31 +4323,6 @@ func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 { return t.UnixNano() } -// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition -func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) { - filerClient, err := e.catalog.brokerClient.GetFilerClient() - if err != nil { - return 0, err - } - - totalRows := int64(0) - err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { - if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { - return nil // Skip directories and parquet files - } - - // Count rows in live log file - rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry) - if err != nil { - fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err) - return nil // Continue with other files - } - totalRows += rowCount - return nil - }) - return totalRows, err -} - // extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) 
map[string]bool { sourceFiles := make(map[string]bool) @@ -4226,6 +4346,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map // countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) { + debugEnabled := ctx != nil && isDebugMode(ctx) filerClient, err := e.catalog.brokerClient.GetFilerClient() if err != nil { return 0, err @@ -4242,14 +4363,14 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, // Second, get duplicate files from log buffer metadata logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath) if err != nil { - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err) } logBufferDuplicates = make(map[string]bool) } // Debug: Show deduplication status (only in explain mode) - if isDebugMode(ctx) { + if debugEnabled { if len(actualSourceFiles) > 0 { fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath) } @@ -4266,7 +4387,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, // Skip files that have been converted to parquet if actualSourceFiles[entry.Name] { - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name) } return nil @@ -4274,7 +4395,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, // Skip files that are duplicated due to log buffer metadata if logBufferDuplicates[entry.Name] { - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name) } return nil @@ -4345,6 +4466,7 @@ func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBuffer // buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient) func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) { + debugEnabled := ctx != nil && isDebugMode(ctx) if e.catalog.brokerClient == nil { return make(map[string]bool), nil } @@ -4390,7 +4512,7 @@ func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitio if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start { // Ranges overlap - this file contains duplicate buffer indexes isDuplicate = true - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n", entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end) } diff --git a/weed/query/engine/fast_path_predicate_validation_test.go b/weed/query/engine/fast_path_predicate_validation_test.go new file mode 100644 index 000000000..3322ed51f --- /dev/null +++ b/weed/query/engine/fast_path_predicate_validation_test.go @@ -0,0 +1,272 @@ +package engine + +import ( + "testing" +) + +// TestFastPathPredicateValidation tests the critical fix for fast-path aggregation +// to ensure non-time predicates are properly detected and fast-path is blocked +func TestFastPathPredicateValidation(t *testing.T) { + engine := NewTestSQLEngine() + + testCases := []struct { + name string + whereClause string + expectedTimeOnly bool + expectedStartTimeNs int64 + expectedStopTimeNs 
int64 + description string + }{ + { + name: "No WHERE clause", + whereClause: "", + expectedTimeOnly: true, // No WHERE means time-only is true + description: "Queries without WHERE clause should allow fast path", + }, + { + name: "Time-only predicate (greater than)", + whereClause: "_ts > 1640995200000000000", + expectedTimeOnly: true, + expectedStartTimeNs: 1640995200000000000, + expectedStopTimeNs: 0, + description: "Pure time predicates should allow fast path", + }, + { + name: "Time-only predicate (less than)", + whereClause: "_ts < 1640995200000000000", + expectedTimeOnly: true, + expectedStartTimeNs: 0, + expectedStopTimeNs: 1640995200000000000, + description: "Pure time predicates should allow fast path", + }, + { + name: "Time-only predicate (range with AND)", + whereClause: "_ts > 1640995200000000000 AND _ts < 1641081600000000000", + expectedTimeOnly: true, + expectedStartTimeNs: 1640995200000000000, + expectedStopTimeNs: 1641081600000000000, + description: "Time range predicates should allow fast path", + }, + { + name: "Mixed predicate (time + non-time)", + whereClause: "_ts > 1640995200000000000 AND user_id = 'user123'", + expectedTimeOnly: false, + description: "CRITICAL: Mixed predicates must block fast path to prevent incorrect results", + }, + { + name: "Non-time predicate only", + whereClause: "user_id = 'user123'", + expectedTimeOnly: false, + description: "Non-time predicates must block fast path", + }, + { + name: "Multiple non-time predicates", + whereClause: "user_id = 'user123' AND status = 'active'", + expectedTimeOnly: false, + description: "Multiple non-time predicates must block fast path", + }, + { + name: "OR with time predicate (unsafe)", + whereClause: "_ts > 1640995200000000000 OR user_id = 'user123'", + expectedTimeOnly: false, + description: "OR expressions are complex and must block fast path", + }, + { + name: "OR with only time predicates (still unsafe)", + whereClause: "_ts > 1640995200000000000 OR _ts < 1640908800000000000", + expectedTimeOnly: false, + description: "Even time-only OR expressions must block fast path due to complexity", + }, + // Note: Parenthesized expressions are not supported by the current parser + // These test cases are commented out until parser support is added + { + name: "String column comparison", + whereClause: "event_type = 'click'", + expectedTimeOnly: false, + description: "String column comparisons must block fast path", + }, + { + name: "Numeric column comparison", + whereClause: "id > 1000", + expectedTimeOnly: false, + description: "Numeric column comparisons must block fast path", + }, + { + name: "Internal timestamp column", + whereClause: "_timestamp_ns > 1640995200000000000", + expectedTimeOnly: true, + expectedStartTimeNs: 1640995200000000000, + description: "Internal timestamp column should allow fast path", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Parse the WHERE clause if present + var whereExpr ExprNode + if tc.whereClause != "" { + sql := "SELECT COUNT(*) FROM test WHERE " + tc.whereClause + stmt, err := ParseSQL(sql) + if err != nil { + t.Fatalf("Failed to parse SQL: %v", err) + } + selectStmt := stmt.(*SelectStatement) + whereExpr = selectStmt.Where.Expr + } + + // Test the validation function + var startTimeNs, stopTimeNs int64 + var onlyTimePredicates bool + + if whereExpr == nil { + // No WHERE clause case + onlyTimePredicates = true + } else { + startTimeNs, stopTimeNs, onlyTimePredicates = engine.SQLEngine.extractTimeFiltersWithValidation(whereExpr) + } 
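+			// Zero time bounds mean that side of the range was absent from the
+			// WHERE clause; the assertions below only check non-zero expectations.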
+ + // Verify the results + if onlyTimePredicates != tc.expectedTimeOnly { + t.Errorf("Expected onlyTimePredicates=%v, got %v. %s", + tc.expectedTimeOnly, onlyTimePredicates, tc.description) + } + + // Check time filters if expected + if tc.expectedStartTimeNs != 0 && startTimeNs != tc.expectedStartTimeNs { + t.Errorf("Expected startTimeNs=%d, got %d", tc.expectedStartTimeNs, startTimeNs) + } + if tc.expectedStopTimeNs != 0 && stopTimeNs != tc.expectedStopTimeNs { + t.Errorf("Expected stopTimeNs=%d, got %d", tc.expectedStopTimeNs, stopTimeNs) + } + + t.Logf("✅ %s: onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d", + tc.name, onlyTimePredicates, startTimeNs, stopTimeNs) + }) + } +} + +// TestFastPathAggregationSafety tests that fast-path aggregation is only attempted +// when it's safe to do so (no non-time predicates) +func TestFastPathAggregationSafety(t *testing.T) { + engine := NewTestSQLEngine() + + testCases := []struct { + name string + sql string + shouldUseFastPath bool + description string + }{ + { + name: "No WHERE - should use fast path", + sql: "SELECT COUNT(*) FROM test", + shouldUseFastPath: true, + description: "Queries without WHERE should use fast path", + }, + { + name: "Time-only WHERE - should use fast path", + sql: "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000", + shouldUseFastPath: true, + description: "Time-only predicates should use fast path", + }, + { + name: "Mixed WHERE - should NOT use fast path", + sql: "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000 AND user_id = 'user123'", + shouldUseFastPath: false, + description: "CRITICAL: Mixed predicates must NOT use fast path to prevent wrong results", + }, + { + name: "Non-time WHERE - should NOT use fast path", + sql: "SELECT COUNT(*) FROM test WHERE user_id = 'user123'", + shouldUseFastPath: false, + description: "Non-time predicates must NOT use fast path", + }, + { + name: "OR expression - should NOT use fast path", + sql: "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000 OR user_id = 'user123'", + shouldUseFastPath: false, + description: "OR expressions must NOT use fast path due to complexity", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Parse the SQL + stmt, err := ParseSQL(tc.sql) + if err != nil { + t.Fatalf("Failed to parse SQL: %v", err) + } + selectStmt := stmt.(*SelectStatement) + + // Test the fast path decision logic + startTimeNs, stopTimeNs := int64(0), int64(0) + onlyTimePredicates := true + if selectStmt.Where != nil { + startTimeNs, stopTimeNs, onlyTimePredicates = engine.SQLEngine.extractTimeFiltersWithValidation(selectStmt.Where.Expr) + } + + canAttemptFastPath := selectStmt.Where == nil || onlyTimePredicates + + // Verify the decision + if canAttemptFastPath != tc.shouldUseFastPath { + t.Errorf("Expected canAttemptFastPath=%v, got %v. 
%s", + tc.shouldUseFastPath, canAttemptFastPath, tc.description) + } + + t.Logf("✅ %s: canAttemptFastPath=%v (onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d)", + tc.name, canAttemptFastPath, onlyTimePredicates, startTimeNs, stopTimeNs) + }) + } +} + +// TestTimestampColumnDetection tests that the engine correctly identifies timestamp columns +func TestTimestampColumnDetection(t *testing.T) { + engine := NewTestSQLEngine() + + testCases := []struct { + columnName string + isTimestamp bool + description string + }{ + { + columnName: "_ts", + isTimestamp: true, + description: "System timestamp display column should be detected", + }, + { + columnName: "_timestamp_ns", + isTimestamp: true, + description: "Internal timestamp column should be detected", + }, + { + columnName: "user_id", + isTimestamp: false, + description: "Non-timestamp column should not be detected as timestamp", + }, + { + columnName: "id", + isTimestamp: false, + description: "ID column should not be detected as timestamp", + }, + { + columnName: "status", + isTimestamp: false, + description: "Status column should not be detected as timestamp", + }, + { + columnName: "event_type", + isTimestamp: false, + description: "Event type column should not be detected as timestamp", + }, + } + + for _, tc := range testCases { + t.Run(tc.columnName, func(t *testing.T) { + isTimestamp := engine.SQLEngine.isTimestampColumn(tc.columnName) + if isTimestamp != tc.isTimestamp { + t.Errorf("Expected isTimestampColumn(%s)=%v, got %v. %s", + tc.columnName, tc.isTimestamp, isTimestamp, tc.description) + } + t.Logf("✅ Column '%s': isTimestamp=%v", tc.columnName, isTimestamp) + }) + } +} diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 2584b54a6..eee57bc23 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -3,6 +3,7 @@ package engine import ( "container/heap" "context" + "encoding/binary" "encoding/json" "fmt" "io" @@ -145,6 +146,46 @@ type ParquetFileStats struct { FileName string RowCount int64 ColumnStats map[string]*ParquetColumnStats + // Optional file-level timestamp range from filer extended attributes + MinTimestampNs int64 + MaxTimestampNs int64 +} + +// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns +func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) { + if fileStats == nil { + return 0, 0, false + } + // Prefer column stats for _ts_ns if present + if len(fileStats.ColumnStats) > 0 { + if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil { + if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin { + if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax { + return minNs, maxNs, true + } + } + } + } + // Fallback to file-level range if present in filer extended metadata + if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 { + return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true + } + return 0, 0, false +} + +// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns +func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) { + if v == nil { + return 0, false + } + switch k := v.Kind.(type) { + case *schema_pb.Value_Int64Value: + return k.Int64Value, true + case *schema_pb.Value_Int32Value: + return int64(k.Int32Value), true + default: + return 0, false + } } // 
StreamingDataSource provides a streaming interface for reading scan results @@ -1080,6 +1121,15 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo RowCount: fileView.NumRows(), ColumnStats: make(map[string]*ParquetColumnStats), } + // Populate optional min/max from filer extended attributes (writer stores ns timestamps) + if entry != nil && entry.Extended != nil { + if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 { + fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes)) + } + if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 { + fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes)) + } + } // Get schema information schema := fileView.Schema() diff --git a/weed/query/engine/types.go b/weed/query/engine/types.go index 08be17fc0..edcd5bd9a 100644 --- a/weed/query/engine/types.go +++ b/weed/query/engine/types.go @@ -87,6 +87,12 @@ type QueryExecutionPlan struct { BufferStartIndex int64 `json:"buffer_start_index,omitempty"` } +// Plan detail keys +const ( + PlanDetailStartTimeNs = "StartTimeNs" + PlanDetailStopTimeNs = "StopTimeNs" +) + // QueryResult represents the result of a SQL query execution type QueryResult struct { Columns []string `json:"columns"`
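
For reference, the zone-map pruning above reduces to a pure interval check against each file's per-column min/max statistics. Below is a minimal standalone sketch of that decision rule, with plain int64 values standing in for the engine's schema_pb.Value comparisons; the helper name is illustrative, not part of the patch.

package main

import "fmt"

// canSkip reports whether a parquet file whose column values all lie in
// [min, max] can be skipped for the predicate `col <op> v`. It mirrors the
// operator table in canSkipFileByComparison, with int64 stand-ins for the
// engine's schema_pb.Value statistics.
func canSkip(min, max, v int64, op string) bool {
	switch op {
	case ">":
		return max <= v // every value <= v, so none satisfies col > v
	case ">=":
		return max < v
	case "<":
		return min >= v
	case "<=":
		return min > v
	case "=":
		return v < min || v > max // v falls outside the file's range
	case "!=", "<>":
		return min == max && min == v // the file contains only v
	default:
		return false // unknown operator: never skip
	}
}

func main() {
	// A file with id in [1000, 2000]:
	fmt.Println(canSkip(1000, 2000, 5000, ">")) // true: no id exceeds 5000
	fmt.Println(canSkip(1000, 2000, 1500, ">")) // false: ids above 1500 may exist
	fmt.Println(canSkip(1000, 2000, 42, "="))   // true: 42 lies outside [1000, 2000]
}

For composite predicates, canSkipParquetFile composes this check: under AND a file is skipped when either branch alone justifies it, while under OR it is skipped only when every branch does.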