diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index b122e1781..9c7079f2b 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -1733,42 +1733,66 @@ func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSour // Step 1: Get minimum from parquet statistics for _, fileStats := range dataSources.ParquetFiles { for _, fileStat := range fileStats { - if colStats, exists := fileStat.ColumnStats[spec.Column]; exists { + // Try case-insensitive column lookup + var colStats *ParquetColumnStats + var found bool + + // First try exact match + if stats, exists := fileStat.ColumnStats[spec.Column]; exists { + colStats = stats + found = true + } else { + // Try case-insensitive lookup + for colName, stats := range fileStat.ColumnStats { + if strings.EqualFold(colName, spec.Column) { + colStats = stats + found = true + break + } + } + } + + if found && colStats != nil && colStats.MinValue != nil { if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 { globalMinValue = colStats.MinValue - globalMin = comp.engine.extractRawValue(colStats.MinValue) + extractedValue := comp.engine.extractRawValue(colStats.MinValue) + if extractedValue != nil { + globalMin = extractedValue + hasParquetStats = true + } } - hasParquetStats = true } } } - // Step 2: Get minimum from live log data - for _, partition := range partitions { - partitionParquetSources := make(map[string]bool) - if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { - partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) - } + // Step 2: Get minimum from live log data (only if live logs exist) + if dataSources.LiveLogRowCount > 0 { + for _, partition := range partitions { + partitionParquetSources := make(map[string]bool) + if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { + partitionParquetSources = 
comp.engine.extractParquetSourceFiles(partitionFileStats) + } - liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) - if err != nil { - continue // Skip partitions with errors - } + liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) + if err != nil { + continue // Skip partitions with errors + } - if liveLogMin != nil { - if globalMin == nil { - globalMin = liveLogMin - } else { - liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin) - if comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 { + if liveLogMin != nil { + if globalMin == nil { globalMin = liveLogMin - globalMinValue = liveLogSchemaValue + } else { + liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin) + if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 { + globalMin = liveLogMin + globalMinValue = liveLogSchemaValue + } } } } } - // Step 3: Handle system columns + // Step 3: Handle system columns if no regular data found if globalMin == nil && !hasParquetStats { globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles) } @@ -1785,42 +1809,66 @@ func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSour // Step 1: Get maximum from parquet statistics for _, fileStats := range dataSources.ParquetFiles { for _, fileStat := range fileStats { - if colStats, exists := fileStat.ColumnStats[spec.Column]; exists { + // Try case-insensitive column lookup + var colStats *ParquetColumnStats + var found bool + + // First try exact match + if stats, exists := fileStat.ColumnStats[spec.Column]; exists { + colStats = stats + found = true + } else { + // Try case-insensitive lookup + for colName, stats := range fileStat.ColumnStats { + if strings.EqualFold(colName, spec.Column) { + colStats = stats + found = true + break + } + } + } + + if found && colStats != 
nil && colStats.MaxValue != nil { if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 { globalMaxValue = colStats.MaxValue - globalMax = comp.engine.extractRawValue(colStats.MaxValue) + extractedValue := comp.engine.extractRawValue(colStats.MaxValue) + if extractedValue != nil { + globalMax = extractedValue + hasParquetStats = true + } } - hasParquetStats = true } } } - // Step 2: Get maximum from live log data - for _, partition := range partitions { - partitionParquetSources := make(map[string]bool) - if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { - partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) - } + // Step 2: Get maximum from live log data (only if live logs exist) + if dataSources.LiveLogRowCount > 0 { + for _, partition := range partitions { + partitionParquetSources := make(map[string]bool) + if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { + partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) + } - _, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) - if err != nil { - continue // Skip partitions with errors - } + _, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) + if err != nil { + continue // Skip partitions with errors + } - if liveLogMax != nil { - if globalMax == nil { - globalMax = liveLogMax - } else { - liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax) - if comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 { + if liveLogMax != nil { + if globalMax == nil { globalMax = liveLogMax - globalMaxValue = liveLogSchemaValue + } else { + liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax) + if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 { + globalMax = liveLogMax + 
globalMaxValue = liveLogSchemaValue + } } } } } - // Step 3: Handle system columns + // Step 3: Handle system columns if no regular data found if globalMax == nil && !hasParquetStats { globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles) } diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go index d29799803..c2f40ca5a 100644 --- a/weed/query/engine/engine_test.go +++ b/weed/query/engine/engine_test.go @@ -239,6 +239,483 @@ func TestAggregationComputer_ComputeFastPathAggregations(t *testing.T) { } } +// Test case-insensitive column lookup and null handling for MIN/MAX aggregations +func TestAggregationComputer_MinMaxEdgeCases(t *testing.T) { + engine := NewMockSQLEngine() + computer := NewAggregationComputer(engine.SQLEngine) + + tests := []struct { + name string + dataSources *TopicDataSources + aggregations []AggregationSpec + validate func(t *testing.T, results []AggregationResult, err error) + }{ + { + name: "Case insensitive column lookup", + dataSources: &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/partition-1": { + { + RowCount: 50, + ColumnStats: map[string]*ParquetColumnStats{ + "ID": createMockColumnStats("ID", int64(5), int64(95)), // Uppercase column name + }, + }, + }, + }, + ParquetRowCount: 50, + LiveLogRowCount: 0, + PartitionsCount: 1, + }, + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "id"}, // lowercase column name + {Function: "MAX", Column: "id"}, + }, + validate: func(t *testing.T, results []AggregationResult, err error) { + assert.NoError(t, err) + assert.Len(t, results, 2) + assert.Equal(t, int64(5), results[0].Min, "MIN should work with case-insensitive lookup") + assert.Equal(t, int64(95), results[1].Max, "MAX should work with case-insensitive lookup") + }, + }, + { + name: "Null column stats handling", + dataSources: &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/partition-1": { + { + 
RowCount: 50, + ColumnStats: map[string]*ParquetColumnStats{ + "id": { + ColumnName: "id", + MinValue: nil, // Null min value + MaxValue: nil, // Null max value + NullCount: 50, + RowCount: 50, + }, + }, + }, + }, + }, + ParquetRowCount: 50, + LiveLogRowCount: 0, + PartitionsCount: 1, + }, + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "id"}, + {Function: "MAX", Column: "id"}, + }, + validate: func(t *testing.T, results []AggregationResult, err error) { + assert.NoError(t, err) + assert.Len(t, results, 2) + // When stats are null, should fall back to system column or return nil + // This tests that we don't crash on null stats + }, + }, + { + name: "Mixed data types - string column", + dataSources: &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/partition-1": { + { + RowCount: 30, + ColumnStats: map[string]*ParquetColumnStats{ + "name": createMockColumnStats("name", "Alice", "Zoe"), + }, + }, + }, + }, + ParquetRowCount: 30, + LiveLogRowCount: 0, + PartitionsCount: 1, + }, + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "name"}, + {Function: "MAX", Column: "name"}, + }, + validate: func(t *testing.T, results []AggregationResult, err error) { + assert.NoError(t, err) + assert.Len(t, results, 2) + assert.Equal(t, "Alice", results[0].Min) + assert.Equal(t, "Zoe", results[1].Max) + }, + }, + { + name: "Mixed data types - float column", + dataSources: &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/partition-1": { + { + RowCount: 25, + ColumnStats: map[string]*ParquetColumnStats{ + "price": createMockColumnStats("price", float64(19.99), float64(299.50)), + }, + }, + }, + }, + ParquetRowCount: 25, + LiveLogRowCount: 0, + PartitionsCount: 1, + }, + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "price"}, + {Function: "MAX", Column: "price"}, + }, + validate: func(t *testing.T, results []AggregationResult, err error) { + assert.NoError(t, err) + assert.Len(t, 
results, 2) + assert.Equal(t, float64(19.99), results[0].Min) + assert.Equal(t, float64(299.50), results[1].Max) + }, + }, + { + name: "Column not found in parquet stats", + dataSources: &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/partition-1": { + { + RowCount: 20, + ColumnStats: map[string]*ParquetColumnStats{ + "id": createMockColumnStats("id", int64(1), int64(100)), + // Note: "nonexistent_column" is not in stats + }, + }, + }, + }, + ParquetRowCount: 20, + LiveLogRowCount: 10, // Has live logs to fall back to + PartitionsCount: 1, + }, + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "nonexistent_column"}, + {Function: "MAX", Column: "nonexistent_column"}, + }, + validate: func(t *testing.T, results []AggregationResult, err error) { + assert.NoError(t, err) + assert.Len(t, results, 2) + // Should fall back to live log processing or return nil + // The key is that it shouldn't crash + }, + }, + { + name: "Multiple parquet files with different ranges", + dataSources: &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/partition-1": { + { + RowCount: 30, + ColumnStats: map[string]*ParquetColumnStats{ + "score": createMockColumnStats("score", int64(10), int64(50)), + }, + }, + { + RowCount: 40, + ColumnStats: map[string]*ParquetColumnStats{ + "score": createMockColumnStats("score", int64(5), int64(75)), // Lower min, higher max + }, + }, + }, + }, + ParquetRowCount: 70, + LiveLogRowCount: 0, + PartitionsCount: 1, + }, + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "score"}, + {Function: "MAX", Column: "score"}, + }, + validate: func(t *testing.T, results []AggregationResult, err error) { + assert.NoError(t, err) + assert.Len(t, results, 2) + assert.Equal(t, int64(5), results[0].Min, "Should find global minimum across all files") + assert.Equal(t, int64(75), results[1].Max, "Should find global maximum across all files") + }, + }, + } + + partitions := 
[]string{"/topics/test/partition-1"} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, tt.dataSources, partitions) + tt.validate(t, results, err) + }) + } +} + +// Test the specific bug where MIN/MAX was returning empty values +func TestAggregationComputer_MinMaxEmptyValuesBugFix(t *testing.T) { + engine := NewMockSQLEngine() + computer := NewAggregationComputer(engine.SQLEngine) + + // This test specifically addresses the bug where MIN/MAX returned empty + // due to improper null checking and extraction logic + dataSources := &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/test-topic/partition1": { + { + RowCount: 100, + ColumnStats: map[string]*ParquetColumnStats{ + "id": { + ColumnName: "id", + MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 0}}, // Min should be 0 + MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 99}}, // Max should be 99 + NullCount: 0, + RowCount: 100, + }, + }, + }, + }, + }, + ParquetRowCount: 100, + LiveLogRowCount: 0, // No live logs, pure parquet stats + PartitionsCount: 1, + } + + partitions := []string{"/topics/test/test-topic/partition1"} + + tests := []struct { + name string + aggregSpec AggregationSpec + expected interface{} + }{ + { + name: "MIN should return 0 not empty", + aggregSpec: AggregationSpec{Function: "MIN", Column: "id"}, + expected: int32(0), // Should extract the actual minimum value + }, + { + name: "MAX should return 99 not empty", + aggregSpec: AggregationSpec{Function: "MAX", Column: "id"}, + expected: int32(99), // Should extract the actual maximum value + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + results, err := computer.ComputeFastPathAggregations(ctx, []AggregationSpec{tt.aggregSpec}, dataSources, partitions) + + assert.NoError(t, err) + 
assert.Len(t, results, 1) + + // Verify the result is not nil/empty + if tt.aggregSpec.Function == "MIN" { + assert.NotNil(t, results[0].Min, "MIN result should not be nil") + assert.Equal(t, tt.expected, results[0].Min) + } else if tt.aggregSpec.Function == "MAX" { + assert.NotNil(t, results[0].Max, "MAX result should not be nil") + assert.Equal(t, tt.expected, results[0].Max) + } + }) + } +} + +// Test the formatAggregationResult function with MIN/MAX edge cases +func TestSQLEngine_FormatAggregationResult_MinMax(t *testing.T) { + engine := NewTestSQLEngine() + + tests := []struct { + name string + spec AggregationSpec + result AggregationResult + expected string + }{ + { + name: "MIN with zero value should not be empty", + spec: AggregationSpec{Function: "MIN", Column: "id"}, + result: AggregationResult{Min: int32(0)}, + expected: "0", + }, + { + name: "MAX with large value", + spec: AggregationSpec{Function: "MAX", Column: "id"}, + result: AggregationResult{Max: int32(99)}, + expected: "99", + }, + { + name: "MIN with negative value", + spec: AggregationSpec{Function: "MIN", Column: "score"}, + result: AggregationResult{Min: int64(-50)}, + expected: "-50", + }, + { + name: "MAX with float value", + spec: AggregationSpec{Function: "MAX", Column: "price"}, + result: AggregationResult{Max: float64(299.99)}, + expected: "299.99", + }, + { + name: "MIN with string value", + spec: AggregationSpec{Function: "MIN", Column: "name"}, + result: AggregationResult{Min: "Alice"}, + expected: "Alice", + }, + { + name: "MIN with nil should return NULL", + spec: AggregationSpec{Function: "MIN", Column: "missing"}, + result: AggregationResult{Min: nil}, + expected: "", // NULL values display as empty + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sqlValue := engine.formatAggregationResult(tt.spec, tt.result) + assert.Equal(t, tt.expected, sqlValue.String()) + }) + } +} + +// Test the direct formatAggregationResult scenario that was originally 
broken +func TestSQLEngine_MinMaxBugFixIntegration(t *testing.T) { + // This test focuses on the core bug fix without the complexity of table discovery + // It directly tests the scenario where MIN/MAX returned empty due to the bug + + engine := NewTestSQLEngine() + + // Test the direct formatting path that was failing + tests := []struct { + name string + aggregSpec AggregationSpec + aggResult AggregationResult + expectedEmpty bool + expectedValue string + }{ + { + name: "MIN with zero should not be empty (the original bug)", + aggregSpec: AggregationSpec{Function: "MIN", Column: "id", Alias: "MIN(id)"}, + aggResult: AggregationResult{Min: int32(0)}, // This was returning empty before fix + expectedEmpty: false, + expectedValue: "0", + }, + { + name: "MAX with valid value should not be empty", + aggregSpec: AggregationSpec{Function: "MAX", Column: "id", Alias: "MAX(id)"}, + aggResult: AggregationResult{Max: int32(99)}, + expectedEmpty: false, + expectedValue: "99", + }, + { + name: "MIN with negative value should work", + aggregSpec: AggregationSpec{Function: "MIN", Column: "score", Alias: "MIN(score)"}, + aggResult: AggregationResult{Min: int64(-10)}, + expectedEmpty: false, + expectedValue: "-10", + }, + { + name: "MIN with nil should be empty (expected behavior)", + aggregSpec: AggregationSpec{Function: "MIN", Column: "missing", Alias: "MIN(missing)"}, + aggResult: AggregationResult{Min: nil}, + expectedEmpty: true, + expectedValue: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test the formatAggregationResult function directly + sqlValue := engine.formatAggregationResult(tt.aggregSpec, tt.aggResult) + result := sqlValue.String() + + if tt.expectedEmpty { + assert.Empty(t, result, "Result should be empty for nil values") + } else { + assert.NotEmpty(t, result, "Result should not be empty") + assert.Equal(t, tt.expectedValue, result) + } + }) + } +} + +// Test the tryFastParquetAggregation method specifically for the bug 
+func TestSQLEngine_FastParquetAggregationBugFix(t *testing.T) { + // This test verifies that the fast path aggregation logic works correctly + // and doesn't return nil/empty values when it should return actual data + + engine := NewMockSQLEngine() + computer := NewAggregationComputer(engine.SQLEngine) + + // Create realistic data sources that mimic the user's scenario + dataSources := &TopicDataSources{ + ParquetFiles: map[string][]*ParquetFileStats{ + "/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630": { + { + RowCount: 100, + ColumnStats: map[string]*ParquetColumnStats{ + "id": { + ColumnName: "id", + MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 0}}, + MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 99}}, + NullCount: 0, + RowCount: 100, + }, + }, + }, + }, + }, + ParquetRowCount: 100, + LiveLogRowCount: 0, // Pure parquet scenario + PartitionsCount: 1, + } + + partitions := []string{"/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630"} + + tests := []struct { + name string + aggregations []AggregationSpec + validateResults func(t *testing.T, results []AggregationResult) + }{ + { + name: "Single MIN aggregation should return value not nil", + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "id", Alias: "MIN(id)"}, + }, + validateResults: func(t *testing.T, results []AggregationResult) { + assert.Len(t, results, 1) + assert.NotNil(t, results[0].Min, "MIN result should not be nil") + assert.Equal(t, int32(0), results[0].Min, "MIN should return the correct minimum value") + }, + }, + { + name: "Single MAX aggregation should return value not nil", + aggregations: []AggregationSpec{ + {Function: "MAX", Column: "id", Alias: "MAX(id)"}, + }, + validateResults: func(t *testing.T, results []AggregationResult) { + assert.Len(t, results, 1) + assert.NotNil(t, results[0].Max, "MAX result should not be nil") + assert.Equal(t, int32(99), results[0].Max, "MAX should return the correct maximum value") 
+ }, + }, + { + name: "Combined MIN/MAX should both return values", + aggregations: []AggregationSpec{ + {Function: "MIN", Column: "id", Alias: "MIN(id)"}, + {Function: "MAX", Column: "id", Alias: "MAX(id)"}, + }, + validateResults: func(t *testing.T, results []AggregationResult) { + assert.Len(t, results, 2) + assert.NotNil(t, results[0].Min, "MIN result should not be nil") + assert.NotNil(t, results[1].Max, "MAX result should not be nil") + assert.Equal(t, int32(0), results[0].Min) + assert.Equal(t, int32(99), results[1].Max) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, dataSources, partitions) + + assert.NoError(t, err, "ComputeFastPathAggregations should not error") + tt.validateResults(t, results) + }) + } +} + // Test ExecutionPlanBuilder func TestExecutionPlanBuilder_BuildAggregationPlan(t *testing.T) { engine := NewMockSQLEngine()