From c7a0b89067c15d7a2faa734eb315d65dbb060199 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 21:40:24 -0700 Subject: [PATCH] fix after refactoring --- weed/query/engine/aggregations.go | 5 +- weed/query/engine/engine.go | 747 ---------------------------- weed/query/engine/system_columns.go | 77 --- weed/query/engine/types.go | 33 -- 4 files changed, 3 insertions(+), 859 deletions(-) diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go index a4a9a70f4..faa139c04 100644 --- a/weed/query/engine/aggregations.go +++ b/weed/query/engine/aggregations.go @@ -103,8 +103,9 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan } } - // Count live log files - liveLogCount, _ := opt.engine.countLiveLogFiles(partitionPath, dataSources.ParquetFiles[partitionPath]) + // Count live log files (excluding those converted to parquet) + parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath]) + liveLogCount, _ := opt.engine.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSources) dataSources.LiveLogRowCount += liveLogCount } diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index f0a8e8f2a..8be8e2b2d 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -1491,221 +1491,6 @@ func (e *SQLEngine) dropTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryR return result, nil } -// AggregationComputer handles the computation of aggregations using fast path -type AggregationComputer struct { - engine *SQLEngine -} - -// NewAggregationComputer creates a new aggregation computer -func NewAggregationComputer(engine *SQLEngine) *AggregationComputer { - return &AggregationComputer{engine: engine} -} - -// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data -func (comp *AggregationComputer) ComputeFastPathAggregations( - ctx context.Context, - aggregations []AggregationSpec, - dataSources *TopicDataSources, - partitions []string, -) ([]AggregationResult, error) { - - aggResults := make([]AggregationResult, len(aggregations)) - - for i, spec := range aggregations { - switch spec.Function { - case "COUNT": - if spec.Column == "*" { - aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount - } else { - // For specific columns, we might need to account for NULLs in the future - aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount - } - - case "MIN": - globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions) - if err != nil { - return nil, AggregationError{ - Operation: spec.Function, - Column: spec.Column, - Cause: err, - } - } - aggResults[i].Min = globalMin - - case "MAX": - globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions) - if err != nil { - return nil, AggregationError{ - Operation: spec.Function, - Column: spec.Column, - Cause: err, - } - } - aggResults[i].Max = globalMax - - default: - return nil, OptimizationError{ - Strategy: "fast_path_aggregation", - Reason: fmt.Sprintf("unsupported aggregation function: %s", spec.Function), - } - } - } - - return aggResults, nil -} - -// computeGlobalMin computes the global minimum value across all data sources -func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) { - var globalMin interface{} - var globalMinValue *schema_pb.Value - hasParquetStats := false - - // Step 1: Get minimum from parquet 
statistics - for _, fileStats := range dataSources.ParquetFiles { - for _, fileStat := range fileStats { - // Try case-insensitive column lookup - var colStats *ParquetColumnStats - var found bool - - // First try exact match - if stats, exists := fileStat.ColumnStats[spec.Column]; exists { - colStats = stats - found = true - } else { - // Try case-insensitive lookup - for colName, stats := range fileStat.ColumnStats { - if strings.EqualFold(colName, spec.Column) { - colStats = stats - found = true - break - } - } - } - - if found && colStats != nil && colStats.MinValue != nil { - if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 { - globalMinValue = colStats.MinValue - extractedValue := comp.engine.extractRawValue(colStats.MinValue) - if extractedValue != nil { - globalMin = extractedValue - hasParquetStats = true - } - } - } - } - } - - // Step 2: Get minimum from live log data (only if no live logs or if we need to compare) - if dataSources.LiveLogRowCount > 0 { - for _, partition := range partitions { - partitionParquetSources := make(map[string]bool) - if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { - partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) - } - - liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) - if err != nil { - continue // Skip partitions with errors - } - - if liveLogMin != nil { - if globalMin == nil { - globalMin = liveLogMin - } else { - liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin) - if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 { - globalMin = liveLogMin - globalMinValue = liveLogSchemaValue - } - } - } - } - } - - // Step 3: Handle system columns if no regular data found - if globalMin == nil && !hasParquetStats { - globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles) - } - - return globalMin, nil -} - -// computeGlobalMax computes the global maximum value across all data sources -func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) { - var globalMax interface{} - var globalMaxValue *schema_pb.Value - hasParquetStats := false - - // Step 1: Get maximum from parquet statistics - for _, fileStats := range dataSources.ParquetFiles { - for _, fileStat := range fileStats { - // Try case-insensitive column lookup - var colStats *ParquetColumnStats - var found bool - - // First try exact match - if stats, exists := fileStat.ColumnStats[spec.Column]; exists { - colStats = stats - found = true - } else { - // Try case-insensitive lookup - for colName, stats := range fileStat.ColumnStats { - if strings.EqualFold(colName, spec.Column) { - colStats = stats - found = true - break - } - } - } - - if found && colStats != nil && colStats.MaxValue != nil { - if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 { - globalMaxValue = colStats.MaxValue - extractedValue := comp.engine.extractRawValue(colStats.MaxValue) - if extractedValue != nil { - globalMax = extractedValue - hasParquetStats = true - } - } - } - } - } - - // Step 2: Get maximum from live log data (only if live logs exist) - if dataSources.LiveLogRowCount > 0 { - for _, partition := range partitions { - partitionParquetSources := make(map[string]bool) - if partitionFileStats, exists := 
dataSources.ParquetFiles[partition]; exists { - partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) - } - - _, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) - if err != nil { - continue // Skip partitions with errors - } - - if liveLogMax != nil { - if globalMax == nil { - globalMax = liveLogMax - } else { - liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax) - if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 { - globalMax = liveLogMax - globalMaxValue = liveLogSchemaValue - } - } - } - } - } - - // Step 3: Handle system columns if no regular data found - if globalMax == nil && !hasParquetStats { - globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles) - } - - return globalMax, nil -} - // ExecutionPlanBuilder handles building execution plans for queries type ExecutionPlanBuilder struct { engine *SQLEngine @@ -1879,350 +1664,6 @@ func (e *SQLEngine) parseAggregationFunction(funcExpr *sqlparser.FuncExpr, alias return spec, nil } -// executeAggregationQuery handles SELECT queries with aggregation functions -func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select) (*QueryResult, error) { - // Parse WHERE clause for filtering - var predicate func(*schema_pb.RecordValue) bool - var err error - if stmt.Where != nil { - predicate, err = e.buildPredicate(stmt.Where.Expr) - if err != nil { - return &QueryResult{Error: err}, err - } - } - - // Extract time filters for optimization - startTimeNs, stopTimeNs := int64(0), int64(0) - if stmt.Where != nil { - startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) - } - - // FAST PATH: Try to use parquet statistics for optimization - // This can be ~130x faster than scanning all data - if stmt.Where == nil { // Only optimize when no complex WHERE clause - fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations) - if canOptimize { - fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n") - return fastResult, nil - } - } - - // SLOW PATH: Fall back to full table scan - fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n") - - // Build scan options for full table scan (aggregations need all data) - hybridScanOptions := HybridScanOptions{ - StartTimeNs: startTimeNs, - StopTimeNs: stopTimeNs, - Limit: 0, // No limit for aggregations - need all data - Predicate: predicate, - } - - // Execute the hybrid scan to get all matching records - results, err := hybridScanner.Scan(ctx, hybridScanOptions) - if err != nil { - return &QueryResult{Error: err}, err - } - - // Compute aggregations - aggResults := e.computeAggregations(results, aggregations) - - // Build result set - columns := make([]string, len(aggregations)) - row := make([]sqltypes.Value, len(aggregations)) - - for i, spec := range aggregations { - columns[i] = spec.Alias - row[i] = e.formatAggregationResult(spec, aggResults[i]) - } - - return &QueryResult{ - Columns: columns, - Rows: [][]sqltypes.Value{row}, - }, nil -} - -// computeAggregations computes aggregation functions over the scan results -func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult { - aggResults := make([]AggregationResult, len(aggregations)) - - for i, spec := range 
aggregations { - switch spec.Function { - case "COUNT": - if spec.Column == "*" { - // COUNT(*) counts all rows - aggResults[i].Count = int64(len(results)) - } else if spec.Distinct { - // COUNT(DISTINCT column) counts unique non-null values - uniqueValues := make(map[string]bool) - for _, result := range results { - if value := e.findColumnValue(result, spec.Column); value != nil { - if !e.isNullValue(value) { - // Use string representation for uniqueness check - rawValue := e.extractRawValue(value) - if rawValue != nil { - uniqueValues[fmt.Sprintf("%v", rawValue)] = true - } - } - } - } - aggResults[i].Count = int64(len(uniqueValues)) - } else { - // COUNT(column) counts non-null values - count := int64(0) - for _, result := range results { - if value := e.findColumnValue(result, spec.Column); value != nil { - if !e.isNullValue(value) { - count++ - } - } - } - aggResults[i].Count = count - } - - case "SUM": - sum := float64(0) - for _, result := range results { - if value := e.findColumnValue(result, spec.Column); value != nil { - if numValue := e.convertToNumber(value); numValue != nil { - sum += *numValue - } - } - } - aggResults[i].Sum = sum - - case "AVG": - sum := float64(0) - count := int64(0) - for _, result := range results { - if value := e.findColumnValue(result, spec.Column); value != nil { - if numValue := e.convertToNumber(value); numValue != nil { - sum += *numValue - count++ - } - } - } - if count > 0 { - aggResults[i].Sum = sum / float64(count) // Store average in Sum field - aggResults[i].Count = count - } - - case "MIN": - var min interface{} - var minValue *schema_pb.Value - for _, result := range results { - if value := e.findColumnValue(result, spec.Column); value != nil { - if minValue == nil || e.compareValues(value, minValue) < 0 { - minValue = value - min = e.extractRawValue(value) - } - } - } - aggResults[i].Min = min - - case "MAX": - var max interface{} - var maxValue *schema_pb.Value - for _, result := range results { - if value := e.findColumnValue(result, spec.Column); value != nil { - if maxValue == nil || e.compareValues(value, maxValue) > 0 { - maxValue = value - max = e.extractRawValue(value) - } - } - } - aggResults[i].Max = max - } - } - - return aggResults -} - -// Helper functions for aggregation processing - -func (e *SQLEngine) isNullValue(value *schema_pb.Value) bool { - return value == nil || value.Kind == nil -} - -func (e *SQLEngine) convertToNumber(value *schema_pb.Value) *float64 { - switch v := value.Kind.(type) { - case *schema_pb.Value_Int32Value: - result := float64(v.Int32Value) - return &result - case *schema_pb.Value_Int64Value: - result := float64(v.Int64Value) - return &result - case *schema_pb.Value_FloatValue: - result := float64(v.FloatValue) - return &result - case *schema_pb.Value_DoubleValue: - return &v.DoubleValue - } - return nil -} - -func (e *SQLEngine) extractRawValue(value *schema_pb.Value) interface{} { - switch v := value.Kind.(type) { - case *schema_pb.Value_Int32Value: - return v.Int32Value - case *schema_pb.Value_Int64Value: - return v.Int64Value - case *schema_pb.Value_FloatValue: - return v.FloatValue - case *schema_pb.Value_DoubleValue: - return v.DoubleValue - case *schema_pb.Value_StringValue: - return v.StringValue - case *schema_pb.Value_BoolValue: - return v.BoolValue - case *schema_pb.Value_BytesValue: - return string(v.BytesValue) // Convert bytes to string for comparison - } - return nil -} - -func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Value) int { - if value2 == nil { - 
return 1 // value1 > nil - } - raw1 := e.extractRawValue(value1) - raw2 := e.extractRawValue(value2) - if raw1 == nil { - return -1 - } - if raw2 == nil { - return 1 - } - - // Simple comparison - in a full implementation this would handle type coercion - switch v1 := raw1.(type) { - case int32: - if v2, ok := raw2.(int32); ok { - if v1 < v2 { - return -1 - } else if v1 > v2 { - return 1 - } - return 0 - } - case int64: - if v2, ok := raw2.(int64); ok { - if v1 < v2 { - return -1 - } else if v1 > v2 { - return 1 - } - return 0 - } - case float32: - if v2, ok := raw2.(float32); ok { - if v1 < v2 { - return -1 - } else if v1 > v2 { - return 1 - } - return 0 - } - case float64: - if v2, ok := raw2.(float64); ok { - if v1 < v2 { - return -1 - } else if v1 > v2 { - return 1 - } - return 0 - } - case string: - if v2, ok := raw2.(string); ok { - if v1 < v2 { - return -1 - } else if v1 > v2 { - return 1 - } - return 0 - } - case bool: - if v2, ok := raw2.(bool); ok { - if v1 == v2 { - return 0 - } else if v1 && !v2 { - return 1 - } - return -1 - } - } - return 0 -} - -// tryFastParquetAggregation attempts to compute aggregations using hybrid approach: -// - Use parquet metadata for parquet files -// - Count live log files for live data -// - Combine both for accurate results per partition -// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used -func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) { - // Use the new modular components - optimizer := NewFastPathOptimizer(e) - computer := NewAggregationComputer(e) - - // Step 1: Determine strategy - strategy := optimizer.DetermineStrategy(aggregations) - if !strategy.CanUseFastPath { - return nil, false - } - - // Step 2: Collect data sources - dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner) - if err != nil { - return nil, false - } - - // Build partition list for aggregation computer - relativePartitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name) - if err != nil { - return nil, false - } - - topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name) - partitions := make([]string, len(relativePartitions)) - for i, relPartition := range relativePartitions { - partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition) - } - - // Debug: Show the hybrid optimization results - if dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 { - partitionsWithLiveLogs := 0 - if dataSources.LiveLogRowCount > 0 { - partitionsWithLiveLogs = 1 // Simplified for now - } - fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows from %d partitions\n", - dataSources.ParquetRowCount, dataSources.LiveLogRowCount, partitionsWithLiveLogs) - } - - // Step 3: Compute aggregations using fast path - aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions) - if err != nil { - return nil, false - } - - // Step 4: Build final query result - - // Build result using fast parquet statistics - columns := make([]string, len(aggregations)) - row := make([]sqltypes.Value, len(aggregations)) - - for i, spec := range aggregations { - columns[i] = spec.Alias - row[i] = e.formatAggregationResult(spec, aggResults[i]) - } - - result := &QueryResult{ - Columns: columns, - Rows: [][]sqltypes.Value{row}, - } - - return result, true -} - // 
computeLiveLogMinMax scans live log files to find MIN/MAX values for a specific column func (e *SQLEngine) computeLiveLogMinMax(partitionPath string, columnName string, parquetSourceFiles map[string]bool) (interface{}, interface{}, error) { if e.catalog.brokerClient == nil { @@ -2456,151 +1897,6 @@ func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (* return recordValue, "live_log", nil } -// convertJSONValueToSchemaValue converts JSON values to schema_pb.Value -func (e *SQLEngine) convertJSONValueToSchemaValue(jsonValue interface{}) *schema_pb.Value { - switch v := jsonValue.(type) { - case string: - return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}} - case float64: - // JSON numbers are always float64, try to detect if it's actually an integer - if v == float64(int64(v)) { - return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}} - } - return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}} - case bool: - return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}} - case nil: - return nil - default: - // Convert other types to string - return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}} - } -} - -// convertRawValueToSchemaValue converts raw Go values back to schema_pb.Value for comparison -func (e *SQLEngine) convertRawValueToSchemaValue(rawValue interface{}) *schema_pb.Value { - switch v := rawValue.(type) { - case int32: - return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}} - case int64: - return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}} - case float32: - return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}} - case float64: - return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}} - case string: - return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}} - case bool: - return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}} - case []byte: - return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: v}} - default: - // Convert other types to string as fallback - return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}} - } -} - -// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats -func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool { - switch spec.Function { - case "COUNT": - return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column) - case "MIN", "MAX": - return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column) - case "SUM", "AVG": - // These require scanning actual values, not just min/max - return false - default: - return false - } -} - -// isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source) -func (e *SQLEngine) isSystemColumn(columnName string) bool { - lowerName := strings.ToLower(columnName) - return lowerName == "_timestamp_ns" || lowerName == "timestamp_ns" || - lowerName == "_key" || lowerName == "key" || - lowerName == "_source" || lowerName == "source" -} - -// isRegularColumn checks if a column might be a regular data column (placeholder) -func (e *SQLEngine) isRegularColumn(columnName string) bool { - // For now, assume any non-system column is a regular column - return !e.isSystemColumn(columnName) -} - -// getSystemColumnGlobalMin computes global min for system columns using file metadata -func (e 
*SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} { - lowerName := strings.ToLower(columnName) - - switch lowerName { - case "_timestamp_ns", "timestamp_ns": - // For timestamps, find the earliest timestamp across all files - // This should match what's in the Extended["min"] metadata - var minTimestamp *int64 - for _, fileStats := range allFileStats { - for _, fileStat := range fileStats { - // Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet) - timestamp := e.extractTimestampFromFilename(fileStat.FileName) - if timestamp != 0 { - if minTimestamp == nil || timestamp < *minTimestamp { - minTimestamp = ×tamp - } - } - } - } - if minTimestamp != nil { - return *minTimestamp - } - - case "_key", "key": - // For keys, we'd need to read the actual parquet column stats - // Fall back to scanning if not available in our current stats - return nil - - case "_source", "source": - // Source is always "parquet_archive" for parquet files - return "parquet_archive" - } - - return nil -} - -// getSystemColumnGlobalMax computes global max for system columns using file metadata -func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} { - lowerName := strings.ToLower(columnName) - - switch lowerName { - case "_timestamp_ns", "timestamp_ns": - // For timestamps, find the latest timestamp across all files - var maxTimestamp *int64 - for _, fileStats := range allFileStats { - for _, fileStat := range fileStats { - // Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet) - timestamp := e.extractTimestampFromFilename(fileStat.FileName) - if timestamp != 0 { - if maxTimestamp == nil || timestamp > *maxTimestamp { - maxTimestamp = ×tamp - } - } - } - } - if maxTimestamp != nil { - return *maxTimestamp - } - - case "_key", "key": - // For keys, we'd need to read the actual parquet column stats - return nil - - case "_source", "source": - // Source is always "parquet_archive" for parquet files - return "parquet_archive" - } - - return nil -} - // extractTimestampFromFilename extracts timestamp from parquet filename // Format: YYYY-MM-DD-HH-MM-SS.parquet func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 { @@ -2979,49 +2275,6 @@ func (e *SQLEngine) getActualRowsScannedForFastPath(namespace, topicName string) return totalScannedRows, nil } -func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value { - switch spec.Function { - case "COUNT": - return sqltypes.NewInt64(result.Count) - case "SUM": - return sqltypes.NewFloat64(result.Sum) - case "AVG": - return sqltypes.NewFloat64(result.Sum) // Sum contains the average for AVG - case "MIN": - if result.Min != nil { - return e.convertRawValueToSQL(result.Min) - } - return sqltypes.NULL - case "MAX": - if result.Max != nil { - return e.convertRawValueToSQL(result.Max) - } - return sqltypes.NULL - } - return sqltypes.NULL -} - -func (e *SQLEngine) convertRawValueToSQL(value interface{}) sqltypes.Value { - switch v := value.(type) { - case int32: - return sqltypes.NewInt32(v) - case int64: - return sqltypes.NewInt64(v) - case float32: - return sqltypes.NewFloat32(v) - case float64: - return sqltypes.NewFloat64(v) - case string: - return sqltypes.NewVarChar(v) - case bool: - if v { - return sqltypes.NewVarChar("1") - } - return sqltypes.NewVarChar("0") - } - return sqltypes.NULL -} - // findColumnValue performs case-insensitive lookup of column 
values // Now includes support for system columns stored in HybridScanResult func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value { diff --git a/weed/query/engine/system_columns.go b/weed/query/engine/system_columns.go index c675e73c0..e1374ab62 100644 --- a/weed/query/engine/system_columns.go +++ b/weed/query/engine/system_columns.go @@ -1,12 +1,7 @@ package engine import ( - "regexp" - "strconv" "strings" - "time" - - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) // isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source) @@ -96,75 +91,3 @@ func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map return nil } - -// extractTimestampFromFilename extracts timestamp from parquet filename -func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 { - // Expected format: YYYY-MM-DD-HH-MM-SS.parquet or similar - // Try to parse timestamp from filename - re := regexp.MustCompile(`(\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2})`) - matches := re.FindStringSubmatch(filename) - if len(matches) > 1 { - timestampStr := matches[1] - // Convert to time and then to nanoseconds - t, err := time.Parse("2006-01-02-15-04-05", timestampStr) - if err == nil { - return t.UnixNano() - } - } - - // Fallback: try to parse as unix timestamp if filename is numeric - if timestampStr := strings.TrimSuffix(filename, ".parquet"); timestampStr != filename { - if timestamp, err := strconv.ParseInt(timestampStr, 10, 64); err == nil { - // Assume it's already in nanoseconds - return timestamp - } - } - - return 0 -} - -// findColumnValue performs case-insensitive lookup of column values -// Now includes support for system columns stored in HybridScanResult -func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value { - lowerName := strings.ToLower(columnName) - - // Check system columns first - switch lowerName { - case "_timestamp_ns", "timestamp_ns": - return &schema_pb.Value{ - Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}, - } - case "_key", "key": - return &schema_pb.Value{ - Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}, - } - case "_source", "source": - return &schema_pb.Value{ - Kind: &schema_pb.Value_StringValue{StringValue: result.Source}, - } - } - - // Check regular columns in the record data - if result.RecordValue != nil { - recordValue, ok := result.RecordValue.Kind.(*schema_pb.Value_RecordValue) - if !ok { - return nil - } - - if recordValue.RecordValue.Fields != nil { - // Try exact match first - if value, exists := recordValue.RecordValue.Fields[columnName]; exists { - return value - } - - // Try case-insensitive match - for fieldName, value := range recordValue.RecordValue.Fields { - if strings.EqualFold(fieldName, columnName) { - return value - } - } - } - } - - return nil -} diff --git a/weed/query/engine/types.go b/weed/query/engine/types.go index f0be49fb9..96aad7582 100644 --- a/weed/query/engine/types.go +++ b/weed/query/engine/types.go @@ -1,7 +1,6 @@ package engine import ( - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" ) @@ -28,35 +27,3 @@ type QueryResult struct { Error error `json:"error,omitempty"` ExecutionPlan *QueryExecutionPlan `json:"execution_plan,omitempty"` } - -// ParquetColumnStats holds statistics for a single column in a Parquet file -type ParquetColumnStats struct { - ColumnName string - MinValue *schema_pb.Value - MaxValue *schema_pb.Value - 
NullCount int64 - RowCount int64 -} - -// ParquetFileStats holds statistics for a single Parquet file -type ParquetFileStats struct { - FileName string - RowCount int64 - ColumnStats map[string]*ParquetColumnStats -} - -// HybridScanResult represents a single record from hybrid scanning -type HybridScanResult struct { - RecordValue *schema_pb.Value - Source string // "live_log", "parquet_archive" - Timestamp int64 - Key []byte -} - -// HybridScanOptions configures how the hybrid scanner operates -type HybridScanOptions struct { - StartTimeNs int64 - StopTimeNs int64 - Limit int - Predicate func(*schema_pb.RecordValue) bool -}
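
For context on the aggregations.go hunk above: the fast aggregation path now counts live log rows while excluding log files that were already converted to parquet, so those rows are not counted twice (once via parquet statistics and once via the live logs) — this matches the "Hybrid fast aggregation with deduplication" behavior referenced in engine.go. Below is a minimal, illustrative Go sketch of that exclusion logic only; the liveLogFile type and the in-memory row counts are hypothetical stand-ins, whereas the real countLiveLogRowsExcludingParquetSources reads partition log files through the filer/broker client.

package main

import "fmt"

// liveLogFile is a hypothetical stand-in for a log file entry in a partition
// directory: its name plus a row count taken from its metadata.
type liveLogFile struct {
	Name string
	Rows int64
}

// countLiveLogRowsExcludingParquetSources sketches the deduplicated count the
// patch switches to: a live log file contributes rows only when it is not
// already listed as a source of some parquet file in the same partition.
func countLiveLogRowsExcludingParquetSources(files []liveLogFile, parquetSources map[string]bool) int64 {
	var total int64
	for _, f := range files {
		if parquetSources[f.Name] {
			continue // already compacted into parquet; its rows are covered by parquet stats
		}
		total += f.Rows
	}
	return total
}

func main() {
	// Two live log files; the first was already compacted into a parquet file.
	files := []liveLogFile{
		{Name: "2025-09-01-10-00-00", Rows: 120},
		{Name: "2025-09-01-11-00-00", Rows: 80},
	}
	parquetSources := map[string]bool{"2025-09-01-10-00-00": true}
	fmt.Println(countLiveLogRowsExcludingParquetSources(files, parquetSources)) // prints 80
}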