combine parquet results with live logs

Author: chrislu
Date: 2025-09-01 16:26:35 -07:00
Parent: 352fad7172
Commit: 55cad6dc4a


@@ -1376,11 +1376,18 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
}
// Get all partitions for this topic
partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
relativePartitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
if err != nil {
return nil, false
}
// Convert relative partition paths to full paths
topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
partitions := make([]string, len(relativePartitions))
for i, relPartition := range relativePartitions {
partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
}
// Collect statistics from all partitions (both parquet and live logs)
allFileStats := make(map[string][]*ParquetFileStats) // partitionPath -> parquet file stats
totalParquetRowCount := int64(0)
@@ -1448,10 +1455,12 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
}
case "MIN":
// Find global minimum across all files and partitions
// Hybrid approach: combine parquet statistics with live log scanning
var globalMin interface{}
var globalMinValue *schema_pb.Value
hasParquetStats := false
// Step 1: Get minimum from parquet statistics
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
if colStats, exists := fileStat.ColumnStats[spec.Column]; exists {
@@ -1459,22 +1468,55 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
globalMinValue = colStats.MinValue
globalMin = e.extractRawValue(colStats.MinValue)
}
hasParquetStats = true
}
}
}
// Handle system columns that aren't in parquet column stats
if globalMin == nil {
// Step 2: Get minimum from live log data in each partition
for _, partition := range partitions {
// Get parquet source files for this partition (for deduplication)
partitionParquetSources := make(map[string]bool)
if partitionFileStats, exists := allFileStats[partition]; exists {
partitionParquetSources = e.extractParquetSourceFiles(partitionFileStats)
}
// Scan live log files for MIN value
liveLogMin, _, err := e.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
if err != nil {
fmt.Printf("Warning: failed to compute live log min for partition %s: %v\n", partition, err)
continue
}
// Update global minimum if live log has a smaller value
if liveLogMin != nil {
if globalMin == nil {
globalMin = liveLogMin
// Also record the schema value so comparisons in later partitions have a non-nil baseline
globalMinValue = e.convertRawValueToSchemaValue(liveLogMin)
} else {
// Compare live log min with current global min
liveLogSchemaValue := e.convertRawValueToSchemaValue(liveLogMin)
if e.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
globalMin = liveLogMin
globalMinValue = liveLogSchemaValue
}
}
}
}
// Step 3: Handle system columns that aren't in parquet column stats
if globalMin == nil && !hasParquetStats {
globalMin = e.getSystemColumnGlobalMin(spec.Column, allFileStats)
}
aggResults[i].Min = globalMin
case "MAX":
// Find global maximum across all files and partitions
// Hybrid approach: combine parquet statistics with live log scanning
var globalMax interface{}
var globalMaxValue *schema_pb.Value
hasParquetStats := false
// Step 1: Get maximum from parquet statistics
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
if colStats, exists := fileStat.ColumnStats[spec.Column]; exists {
@@ -1482,12 +1524,43 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
globalMaxValue = colStats.MaxValue
globalMax = e.extractRawValue(colStats.MaxValue)
}
hasParquetStats = true
}
}
}
// Handle system columns that aren't in parquet column stats
if globalMax == nil {
// Step 2: Get maximum from live log data in each partition
for _, partition := range partitions {
// Get parquet source files for this partition (for deduplication)
partitionParquetSources := make(map[string]bool)
if partitionFileStats, exists := allFileStats[partition]; exists {
partitionParquetSources = e.extractParquetSourceFiles(partitionFileStats)
}
// Scan live log files for MAX value
_, liveLogMax, err := e.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
if err != nil {
fmt.Printf("Warning: failed to compute live log max for partition %s: %v\n", partition, err)
continue
}
// Update global maximum if live log has a larger value
if liveLogMax != nil {
if globalMax == nil {
globalMax = liveLogMax
// Also record the schema value so comparisons in later partitions have a non-nil baseline
globalMaxValue = e.convertRawValueToSchemaValue(liveLogMax)
} else {
// Compare live log max with current global max
liveLogSchemaValue := e.convertRawValueToSchemaValue(liveLogMax)
if e.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
globalMax = liveLogMax
globalMaxValue = liveLogSchemaValue
}
}
}
}
// Step 3: Handle system columns that aren't in parquet column stats
if globalMax == nil && !hasParquetStats {
globalMax = e.getSystemColumnGlobalMax(spec.Column, allFileStats)
}
@@ -1516,6 +1589,283 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
return result, true
}
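The MIN and MAX branches above share one merge rule: start from the parquet-statistics extreme, let a live-log extreme override it only when it is strictly smaller (MIN) or larger (MAX), and fall back to system-column metadata only when neither source produced a value. A minimal, self-contained sketch of that rule (editor's illustration, not part of the commit), using plain int64 values in place of schema_pb.Value:

package main

import "fmt"

// mergeMin combines optional minima from two sources; nil means "no data from that source".
func mergeMin(parquetMin, liveLogMin *int64) *int64 {
	if parquetMin == nil {
		return liveLogMin
	}
	if liveLogMin == nil || *parquetMin <= *liveLogMin {
		return parquetMin
	}
	return liveLogMin
}

func main() {
	p, l := int64(42), int64(7) // parquet stats report 42, live logs still contain 7
	if m := mergeMin(&p, &l); m != nil {
		fmt.Println("global MIN:", *m) // 7: the live-log value wins
	}
}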
// computeLiveLogMinMax scans live log files to find MIN/MAX values for a specific column
func (e *SQLEngine) computeLiveLogMinMax(partitionPath string, columnName string, parquetSourceFiles map[string]bool) (interface{}, interface{}, error) {
if e.catalog.brokerClient == nil {
return nil, nil, fmt.Errorf("no broker client available")
}
filerClient, err := e.catalog.brokerClient.GetFilerClient()
if err != nil {
return nil, nil, fmt.Errorf("failed to get filer client: %v", err)
}
var minValue, maxValue interface{}
var minSchemaValue, maxSchemaValue *schema_pb.Value
// Process each live log file
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
// Skip parquet files and directories
if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
return nil
}
// Skip files that have been converted to parquet (deduplication)
if parquetSourceFiles[entry.Name] {
return nil
}
filePath := partitionPath + "/" + entry.Name
// Scan this log file for MIN/MAX values
fileMin, fileMax, err := e.computeFileMinMax(filerClient, filePath, columnName)
if err != nil {
fmt.Printf("Warning: failed to compute min/max for file %s: %v\n", filePath, err)
return nil // Continue with other files
}
// Update global min/max
if fileMin != nil {
if minSchemaValue == nil || e.compareValues(fileMin, minSchemaValue) < 0 {
minSchemaValue = fileMin
minValue = e.extractRawValue(fileMin)
}
}
if fileMax != nil {
if maxSchemaValue == nil || e.compareValues(fileMax, maxSchemaValue) > 0 {
maxSchemaValue = fileMax
maxValue = e.extractRawValue(fileMax)
}
}
return nil
})
if err != nil {
return nil, nil, fmt.Errorf("failed to process partition directory %s: %v", partitionPath, err)
}
return minValue, maxValue, nil
}
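The per-file skip rule inside the callback above can be read on its own: directories and *.parquet files are ignored (their data is served from column statistics), and any log file named in the parquet sources map is treated as already counted. A small sketch of that predicate (editor's illustration; file names are made up):

package main

import (
	"fmt"
	"strings"
)

// shouldScanLiveLog reports whether a partition entry still needs a live-log scan.
func shouldScanLiveLog(name string, isDir bool, parquetSources map[string]bool) bool {
	if isDir || strings.HasSuffix(name, ".parquet") {
		return false // parquet data is covered by its column statistics
	}
	if parquetSources[name] {
		return false // already compacted into parquet; scanning it would double-count rows
	}
	return true
}

func main() {
	sources := map[string]bool{"2025-09-01-15-30-00": true}
	for _, name := range []string{"2025-09-01-15-30-00", "2025-09-01-16-10-00", "data.parquet"} {
		fmt.Println(name, shouldScanLiveLog(name, false, sources))
	}
}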
// computeFileMinMax scans a single log file to find MIN/MAX values for a specific column
func (e *SQLEngine) computeFileMinMax(filerClient filer_pb.FilerClient, filePath string, columnName string) (*schema_pb.Value, *schema_pb.Value, error) {
var minValue, maxValue *schema_pb.Value
err := e.eachLogEntryInFile(filerClient, filePath, func(logEntry *filer_pb.LogEntry) error {
// Convert log entry to record value
recordValue, _, err := e.convertLogEntryToRecordValue(logEntry)
if err != nil {
return err // This will stop processing this file but not fail the overall query
}
// Extract the requested column value
var columnValue *schema_pb.Value
if e.isSystemColumn(columnName) {
// Handle system columns
switch strings.ToLower(columnName) {
case "_timestamp_ns", "timestamp_ns":
columnValue = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}}
case "_key", "key":
columnValue = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}}
case "_source", "source":
columnValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "live_log"}}
}
} else {
// Handle regular data columns
if value, exists := recordValue.Fields[columnName]; exists {
columnValue = value
}
}
if columnValue == nil {
return nil // Skip this record
}
// Update min/max
if minValue == nil || e.compareValues(columnValue, minValue) < 0 {
minValue = columnValue
}
if maxValue == nil || e.compareValues(columnValue, maxValue) > 0 {
maxValue = columnValue
}
return nil
})
return minValue, maxValue, err
}
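For system columns the value comes from the LogEntry envelope rather than from the decoded record fields. A stand-alone sketch of that mapping (editor's illustration; logEntry is a stand-in for filer_pb.LogEntry):

package main

import (
	"fmt"
	"strings"
)

// logEntry stands in for filer_pb.LogEntry in this sketch.
type logEntry struct {
	TsNs int64
	Key  []byte
}

// systemColumnValue mirrors the switch above: system columns are derived from the
// log entry envelope, not from the decoded user record.
func systemColumnValue(column string, e logEntry) (interface{}, bool) {
	switch strings.ToLower(column) {
	case "_timestamp_ns", "timestamp_ns":
		return e.TsNs, true
	case "_key", "key":
		return e.Key, true
	case "_source", "source":
		return "live_log", true
	}
	return nil, false
}

func main() {
	v, ok := systemColumnValue("_TIMESTAMP_NS", logEntry{TsNs: 1756769195000000000, Key: []byte("k1")})
	fmt.Println(v, ok) // 1756769195000000000 true
}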
// eachLogEntryInFile reads a log file and calls the provided function for each log entry
func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePath string, fn func(*filer_pb.LogEntry) error) error {
// Extract directory and filename
// filePath is like "partitionPath/filename"
lastSlash := strings.LastIndex(filePath, "/")
if lastSlash == -1 {
return fmt.Errorf("invalid file path: %s", filePath)
}
dirPath := filePath[:lastSlash]
fileName := filePath[lastSlash+1:]
// Get file entry
var fileEntry *filer_pb.Entry
err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error {
if entry.Name == fileName {
fileEntry = entry
}
return nil
})
if err != nil {
return fmt.Errorf("failed to find file %s: %v", filePath, err)
}
if fileEntry == nil {
return fmt.Errorf("file not found: %s", filePath)
}
lookupFileIdFn := filer.LookupFn(filerClient)
// eachChunkFn processes each chunk's data (pattern from countRowsInLogFile)
eachChunkFn := func(buf []byte) error {
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
if pos+4+int(size) > len(buf) {
break
}
entryData := buf[pos+4 : pos+4+int(size)]
logEntry := &filer_pb.LogEntry{}
if err := proto.Unmarshal(entryData, logEntry); err != nil {
pos += 4 + int(size)
continue // Skip corrupted entries
}
// Call the provided function for each log entry
if err := fn(logEntry); err != nil {
return err
}
pos += 4 + int(size)
}
return nil
}
// Read file chunks and process them (pattern from countRowsInLogFile)
fileSize := filer.FileSize(fileEntry)
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, fileEntry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
for x := chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
if err != nil {
fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
continue
}
if len(urlStrings) == 0 {
continue
}
// Read chunk data
// urlStrings[0] is already a complete URL (http://server:port/fileId)
data, _, err := util_http.Get(urlStrings[0])
if err != nil {
fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
continue
}
// Process this chunk
if err := eachChunkFn(data); err != nil {
return err
}
}
return nil
}
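eachChunkFn walks each chunk buffer as a sequence of length-prefixed records: a 4-byte size (big-endian, as util.BytesToUint32 reads it) followed by that many bytes of protobuf-encoded LogEntry. A self-contained sketch of that framing (editor's illustration), with plain byte payloads standing in for the protobuf messages:

package main

import (
	"encoding/binary"
	"fmt"
)

// frame prepends each payload with its 4-byte big-endian length.
func frame(payloads ...[]byte) []byte {
	var buf []byte
	for _, p := range payloads {
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(p)))
		buf = append(buf, size[:]...)
		buf = append(buf, p...)
	}
	return buf
}

// walk decodes the same framing, stopping at a truncated trailing record.
func walk(buf []byte, fn func([]byte)) {
	for pos := 0; pos+4 <= len(buf); {
		size := int(binary.BigEndian.Uint32(buf[pos : pos+4]))
		if pos+4+size > len(buf) {
			break
		}
		fn(buf[pos+4 : pos+4+size])
		pos += 4 + size
	}
}

func main() {
	buf := frame([]byte("entry-1"), []byte("entry-2"))
	walk(buf, func(p []byte) { fmt.Println(string(p)) })
}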
// convertLogEntryToRecordValue helper method (reuse existing logic)
func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
// Parse the log entry data as JSON
var jsonData map[string]interface{}
if err := json.Unmarshal(logEntry.Data, &jsonData); err != nil {
return nil, "", fmt.Errorf("failed to parse log entry JSON: %v", err)
}
// Create record value with system and user fields
recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
// Add system columns
recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
// Add user data fields
for fieldName, jsonValue := range jsonData {
if fieldName == SW_COLUMN_NAME_TS || fieldName == SW_COLUMN_NAME_KEY {
continue // Skip system fields in user data
}
// Convert JSON value to schema value (basic conversion)
schemaValue := e.convertJSONValueToSchemaValue(jsonValue)
if schemaValue != nil {
recordValue.Fields[fieldName] = schemaValue
}
}
return recordValue, "live_log", nil
}
// convertJSONValueToSchemaValue converts JSON values to schema_pb.Value
func (e *SQLEngine) convertJSONValueToSchemaValue(jsonValue interface{}) *schema_pb.Value {
switch v := jsonValue.(type) {
case string:
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
case float64:
// JSON numbers are always float64, try to detect if it's actually an integer
if v == float64(int64(v)) {
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}}
}
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
case bool:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
case nil:
return nil
default:
// Convert other types to string
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
}
}
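Because encoding/json decodes every JSON number as float64, the converter above re-detects integers by checking that the value survives a round trip through int64, and only then stores it as Int64Value. A tiny illustration (editor's sketch; values far outside the int64 range would need an extra bounds check before the conversion):

package main

import "fmt"

// classify mirrors the integer-detection check used for JSON numbers.
func classify(v float64) string {
	if v == float64(int64(v)) {
		return fmt.Sprintf("Int64Value(%d)", int64(v))
	}
	return fmt.Sprintf("DoubleValue(%v)", v)
}

func main() {
	fmt.Println(classify(3))   // Int64Value(3)
	fmt.Println(classify(3.5)) // DoubleValue(3.5)
}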
// convertRawValueToSchemaValue converts raw Go values back to schema_pb.Value for comparison
func (e *SQLEngine) convertRawValueToSchemaValue(rawValue interface{}) *schema_pb.Value {
switch v := rawValue.(type) {
case int32:
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}}
case int64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}}
case float32:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}}
case float64:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
case string:
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
case bool:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
case []byte:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: v}}
default:
// Convert other types to string as fallback
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
}
}
// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
switch spec.Function {