diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 782fdb268..e1bab2eb7 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -1376,11 +1376,18 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 	}
 
 	// Get all partitions for this topic
-	partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
+	relativePartitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
 	if err != nil {
 		return nil, false
 	}
 
+	// Convert relative partition paths to full paths
+	topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
+	partitions := make([]string, len(relativePartitions))
+	for i, relPartition := range relativePartitions {
+		partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
+	}
+
 	// Collect statistics from all partitions (both parquet and live logs)
 	allFileStats := make(map[string][]*ParquetFileStats) // partitionPath -> parquet file stats
 	totalParquetRowCount := int64(0)
@@ -1448,10 +1455,12 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 			}
 
 		case "MIN":
-			// Find global minimum across all files and partitions
+			// Hybrid approach: combine parquet statistics with live log scanning
 			var globalMin interface{}
 			var globalMinValue *schema_pb.Value
+			hasParquetStats := false
 
+			// Step 1: Get minimum from parquet statistics
 			for _, fileStats := range allFileStats {
 				for _, fileStat := range fileStats {
 					if colStats, exists := fileStat.ColumnStats[spec.Column]; exists {
@@ -1459,22 +1468,55 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 							globalMinValue = colStats.MinValue
 							globalMin = e.extractRawValue(colStats.MinValue)
 						}
+						hasParquetStats = true
 					}
 				}
 			}
 
-			// Handle system columns that aren't in parquet column stats
-			if globalMin == nil {
+			// Step 2: Get minimum from live log data in each partition
+			for _, partition := range partitions {
+				// Get parquet source files for this partition (for deduplication)
+				partitionParquetSources := make(map[string]bool)
+				if partitionFileStats, exists := allFileStats[partition]; exists {
+					partitionParquetSources = e.extractParquetSourceFiles(partitionFileStats)
+				}
+
+				// Scan live log files for MIN value
+				liveLogMin, _, err := e.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
+				if err != nil {
+					fmt.Printf("Warning: failed to compute live log min for partition %s: %v\n", partition, err)
+					continue
+				}
+
+				// Update global minimum if live log has a smaller value
+				if liveLogMin != nil {
+					if globalMin == nil {
+						globalMin = liveLogMin
+					} else {
+						// Compare live log min with current global min
+						liveLogSchemaValue := e.convertRawValueToSchemaValue(liveLogMin)
+						if e.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
+							globalMin = liveLogMin
+							globalMinValue = liveLogSchemaValue
+						}
+					}
+				}
+			}
+
+			// Step 3: Handle system columns that aren't in parquet column stats
+			if globalMin == nil && !hasParquetStats {
 				globalMin = e.getSystemColumnGlobalMin(spec.Column, allFileStats)
 			}
 
 			aggResults[i].Min = globalMin
 
 		case "MAX":
-			// Find global maximum across all files and partitions
+			// Hybrid approach: combine parquet statistics with live log scanning
 			var globalMax interface{}
 			var globalMaxValue *schema_pb.Value
+			hasParquetStats := false
 
+			// Step 1: Get maximum from parquet statistics
 			for _, fileStats := range allFileStats {
 				for _, fileStat := range fileStats {
 					if colStats, exists := fileStat.ColumnStats[spec.Column]; exists {
@@ -1482,12 +1524,43 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 							globalMaxValue = colStats.MaxValue
 							globalMax = e.extractRawValue(colStats.MaxValue)
 						}
+						hasParquetStats = true
 					}
 				}
 			}
 
-			// Handle system columns that aren't in parquet column stats
-			if globalMax == nil {
+			// Step 2: Get maximum from live log data in each partition
+			for _, partition := range partitions {
+				// Get parquet source files for this partition (for deduplication)
+				partitionParquetSources := make(map[string]bool)
+				if partitionFileStats, exists := allFileStats[partition]; exists {
+					partitionParquetSources = e.extractParquetSourceFiles(partitionFileStats)
+				}
+
+				// Scan live log files for MAX value
+				_, liveLogMax, err := e.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
+				if err != nil {
+					fmt.Printf("Warning: failed to compute live log max for partition %s: %v\n", partition, err)
+					continue
+				}
+
+				// Update global maximum if live log has a larger value
+				if liveLogMax != nil {
+					if globalMax == nil {
+						globalMax = liveLogMax
+					} else {
+						// Compare live log max with current global max
+						liveLogSchemaValue := e.convertRawValueToSchemaValue(liveLogMax)
+						if e.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
+							globalMax = liveLogMax
+							globalMaxValue = liveLogSchemaValue
+						}
+					}
+				}
+			}
+
+			// Step 3: Handle system columns that aren't in parquet column stats
+			if globalMax == nil && !hasParquetStats {
 				globalMax = e.getSystemColumnGlobalMax(spec.Column, allFileStats)
 			}
 
@@ -1516,6 +1589,283 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 	return result, true
 }
 
+// computeLiveLogMinMax scans live log files to find MIN/MAX values for a specific column
+func (e *SQLEngine) computeLiveLogMinMax(partitionPath string, columnName string, parquetSourceFiles map[string]bool) (interface{}, interface{}, error) {
+	if e.catalog.brokerClient == nil {
+		return nil, nil, fmt.Errorf("no broker client available")
+	}
+
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to get filer client: %v", err)
+	}
+
+	var minValue, maxValue interface{}
+	var minSchemaValue, maxSchemaValue *schema_pb.Value
+
+	// Process each live log file
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		// Skip parquet files and directories
+		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
+			return nil
+		}
+		// Skip files that have been converted to parquet (deduplication)
+		if parquetSourceFiles[entry.Name] {
+			return nil
+		}
+
+		filePath := partitionPath + "/" + entry.Name
+
+		// Scan this log file for MIN/MAX values
+		fileMin, fileMax, err := e.computeFileMinMax(filerClient, filePath, columnName)
+		if err != nil {
+			fmt.Printf("Warning: failed to compute min/max for file %s: %v\n", filePath, err)
+			return nil // Continue with other files
+		}
+
+		// Update global min/max
+		if fileMin != nil {
+			if minSchemaValue == nil || e.compareValues(fileMin, minSchemaValue) < 0 {
+				minSchemaValue = fileMin
+				minValue = e.extractRawValue(fileMin)
+			}
+		}
+
+		if fileMax != nil {
+			if maxSchemaValue == nil || e.compareValues(fileMax, maxSchemaValue) > 0 {
+				maxSchemaValue = fileMax
+				maxValue = e.extractRawValue(fileMax)
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to process partition directory %s: %v", partitionPath, err)
+	}
+
+	return minValue, maxValue, nil
+}
+
+// computeFileMinMax scans a single log file to find MIN/MAX values for a specific column
+func (e *SQLEngine) computeFileMinMax(filerClient filer_pb.FilerClient, filePath string, columnName string) (*schema_pb.Value, *schema_pb.Value, error) {
+	var minValue, maxValue *schema_pb.Value
+
+	err := e.eachLogEntryInFile(filerClient, filePath, func(logEntry *filer_pb.LogEntry) error {
+		// Convert log entry to record value
+		recordValue, _, err := e.convertLogEntryToRecordValue(logEntry)
+		if err != nil {
+			return err // This will stop processing this file but not fail the overall query
+		}
+
+		// Extract the requested column value
+		var columnValue *schema_pb.Value
+		if e.isSystemColumn(columnName) {
+			// Handle system columns
+			switch strings.ToLower(columnName) {
+			case "_timestamp_ns", "timestamp_ns":
+				columnValue = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}}
+			case "_key", "key":
+				columnValue = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}}
+			case "_source", "source":
+				columnValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "live_log"}}
+			}
+		} else {
+			// Handle regular data columns
+			if value, exists := recordValue.Fields[columnName]; exists {
+				columnValue = value
+			}
+		}
+
+		if columnValue == nil {
+			return nil // Skip this record
+		}
+
+		// Update min/max
+		if minValue == nil || e.compareValues(columnValue, minValue) < 0 {
+			minValue = columnValue
+		}
+		if maxValue == nil || e.compareValues(columnValue, maxValue) > 0 {
+			maxValue = columnValue
+		}
+
+		return nil
+	})
+
+	return minValue, maxValue, err
+}
+
+// eachLogEntryInFile reads a log file and calls the provided function for each log entry
+func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePath string, fn func(*filer_pb.LogEntry) error) error {
+	// Extract directory and filename
+	// filePath is like "partitionPath/filename"
+	lastSlash := strings.LastIndex(filePath, "/")
+	if lastSlash == -1 {
+		return fmt.Errorf("invalid file path: %s", filePath)
+	}
+
+	dirPath := filePath[:lastSlash]
+	fileName := filePath[lastSlash+1:]
+
+	// Get file entry
+	var fileEntry *filer_pb.Entry
+	err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.Name == fileName {
+			fileEntry = entry
+		}
+		return nil
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to find file %s: %v", filePath, err)
+	}
+
+	if fileEntry == nil {
+		return fmt.Errorf("file not found: %s", filePath)
+	}
+
+	lookupFileIdFn := filer.LookupFn(filerClient)
+
+	// eachChunkFn processes each chunk's data (pattern from countRowsInLogFile)
+	eachChunkFn := func(buf []byte) error {
+		for pos := 0; pos+4 < len(buf); {
+			size := util.BytesToUint32(buf[pos : pos+4])
+			if pos+4+int(size) > len(buf) {
+				break
+			}
+
+			entryData := buf[pos+4 : pos+4+int(size)]
+
+			logEntry := &filer_pb.LogEntry{}
+			if err := proto.Unmarshal(entryData, logEntry); err != nil {
+				pos += 4 + int(size)
+				continue // Skip corrupted entries
+			}
+
+			// Call the provided function for each log entry
+			if err := fn(logEntry); err != nil {
+				return err
+			}
+
+			pos += 4 + int(size)
+		}
+		return nil
+	}
+
+	// Read file chunks and process them (pattern from countRowsInLogFile)
+	fileSize := filer.FileSize(fileEntry)
+	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, fileEntry.Chunks, 0, int64(fileSize))
+	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
+
+	for x := chunkViews.Front(); x != nil; x = x.Next {
+		chunk := x.Value
+		urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
+		if err != nil {
+			fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
+			continue
+		}
+
+		if len(urlStrings) == 0 {
+			continue
+		}
+
+		// Read chunk data
+		// urlStrings[0] is already a complete URL (http://server:port/fileId)
+		data, _, err := util_http.Get(urlStrings[0])
+		if err != nil {
+			fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
+			continue
+		}
+
+		// Process this chunk
+		if err := eachChunkFn(data); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// convertLogEntryToRecordValue helper method (reuse existing logic)
+func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
+	// Parse the log entry data as JSON
+	var jsonData map[string]interface{}
+	if err := json.Unmarshal(logEntry.Data, &jsonData); err != nil {
+		return nil, "", fmt.Errorf("failed to parse log entry JSON: %v", err)
+	}
+
+	// Create record value with system and user fields
+	recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
+
+	// Add system columns
+	recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
+		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+	}
+	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+		Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+	}
+
+	// Add user data fields
+	for fieldName, jsonValue := range jsonData {
+		if fieldName == SW_COLUMN_NAME_TS || fieldName == SW_COLUMN_NAME_KEY {
+			continue // Skip system fields in user data
+		}
+
+		// Convert JSON value to schema value (basic conversion)
+		schemaValue := e.convertJSONValueToSchemaValue(jsonValue)
+		if schemaValue != nil {
+			recordValue.Fields[fieldName] = schemaValue
+		}
+	}
+
+	return recordValue, "live_log", nil
+}
+
+// convertJSONValueToSchemaValue converts JSON values to schema_pb.Value
+func (e *SQLEngine) convertJSONValueToSchemaValue(jsonValue interface{}) *schema_pb.Value {
+	switch v := jsonValue.(type) {
+	case string:
+		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
+	case float64:
+		// JSON numbers are always float64, try to detect if it's actually an integer
+		if v == float64(int64(v)) {
+			return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}}
+		}
+		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
+	case bool:
+		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
+	case nil:
+		return nil
+	default:
+		// Convert other types to string
+		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
+	}
+}
+
+// convertRawValueToSchemaValue converts raw Go values back to schema_pb.Value for comparison
+func (e *SQLEngine) convertRawValueToSchemaValue(rawValue interface{}) *schema_pb.Value {
+	switch v := rawValue.(type) {
+	case int32:
+		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}}
+	case int64:
+		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}}
+	case float32:
+		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}}
+	case float64:
+		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
+	case string:
+		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
+	case bool:
+		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
+	case []byte:
+		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: v}}
+	default:
+		// Convert other types to string as fallback
+		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
+	}
+}
+
 // canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
 func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
 	switch spec.Function {