fix reading system fields

This commit is contained in:
chrislu
2025-09-01 13:28:38 -07:00
parent 8498240460
commit 471ba271dc
2 changed files with 88 additions and 16 deletions

View File

@@ -1008,6 +1008,7 @@ type AggregationSpec struct {
Function string // COUNT, SUM, AVG, MIN, MAX
Column string // Column name, or "*" for COUNT(*)
Alias string // Optional alias for the result column
Distinct bool // Support for DISTINCT keyword
}
// AggregationResult holds the computed result of an aggregation
@@ -1137,11 +1138,26 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations
if spec.Column == "*" {
// COUNT(*) counts all rows
aggResults[i].Count = int64(len(results))
} else if spec.Distinct {
// COUNT(DISTINCT column) counts unique non-null values
uniqueValues := make(map[string]bool)
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if !e.isNullValue(value) {
// Use string representation for uniqueness check
rawValue := e.extractRawValue(value)
if rawValue != nil {
uniqueValues[fmt.Sprintf("%v", rawValue)] = true
}
}
}
}
aggResults[i].Count = int64(len(uniqueValues))
} else {
// COUNT(column) counts non-null values
count := int64(0)
for _, result := range results {
if value := e.findColumnValue(result.Values, spec.Column); value != nil {
if value := e.findColumnValue(result, spec.Column); value != nil {
if !e.isNullValue(value) {
count++
}
@@ -1153,7 +1169,7 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations
case "SUM":
sum := float64(0)
for _, result := range results {
if value := e.findColumnValue(result.Values, spec.Column); value != nil {
if value := e.findColumnValue(result, spec.Column); value != nil {
if numValue := e.convertToNumber(value); numValue != nil {
sum += *numValue
}
@@ -1165,7 +1181,7 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations
sum := float64(0)
count := int64(0)
for _, result := range results {
if value := e.findColumnValue(result.Values, spec.Column); value != nil {
if value := e.findColumnValue(result, spec.Column); value != nil {
if numValue := e.convertToNumber(value); numValue != nil {
sum += *numValue
count++
@@ -1179,9 +1195,11 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations
case "MIN":
var min interface{}
var minValue *schema_pb.Value
for _, result := range results {
if value := e.findColumnValue(result.Values, spec.Column); value != nil {
if min == nil || e.compareValues(value, min) < 0 {
if value := e.findColumnValue(result, spec.Column); value != nil {
if minValue == nil || e.compareValues(value, minValue) < 0 {
minValue = value
min = e.extractRawValue(value)
}
}
@@ -1190,9 +1208,11 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations
case "MAX":
var max interface{}
var maxValue *schema_pb.Value
for _, result := range results {
if value := e.findColumnValue(result.Values, spec.Column); value != nil {
if max == nil || e.compareValues(value, max) > 0 {
if value := e.findColumnValue(result, spec.Column); value != nil {
if maxValue == nil || e.compareValues(value, maxValue) > 0 {
maxValue = value
max = e.extractRawValue(value)
}
}
@@ -1241,20 +1261,29 @@ func (e *SQLEngine) extractRawValue(value *schema_pb.Value) interface{} {
return v.StringValue
case *schema_pb.Value_BoolValue:
return v.BoolValue
case *schema_pb.Value_BytesValue:
return string(v.BytesValue) // Convert bytes to string for comparison
}
return nil
}
func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) int {
func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Value) int {
if value2 == nil {
return 1 // value1 > nil
}
raw1 := e.extractRawValue(value1)
raw2 := e.extractRawValue(value2)
if raw1 == nil {
return -1
}
if raw2 == nil {
return 1
}
// Simple comparison - in a full implementation this would handle type coercion
switch v1 := raw1.(type) {
case int32:
if v2, ok := value2.(int32); ok {
if v2, ok := raw2.(int32); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
@@ -1263,7 +1292,16 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i
return 0
}
case int64:
if v2, ok := value2.(int64); ok {
if v2, ok := raw2.(int64); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
return 1
}
return 0
}
case float32:
if v2, ok := raw2.(float32); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
@@ -1272,7 +1310,7 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i
return 0
}
case float64:
if v2, ok := value2.(float64); ok {
if v2, ok := raw2.(float64); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
@@ -1281,7 +1319,7 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i
return 0
}
case string:
if v2, ok := value2.(string); ok {
if v2, ok := raw2.(string); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
@@ -1289,6 +1327,15 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i
}
return 0
}
case bool:
if v2, ok := raw2.(bool); ok {
if v1 == v2 {
return 0
} else if v1 && !v2 {
return 1
}
return -1
}
}
return 0
}
@@ -1337,15 +1384,27 @@ func (e *SQLEngine) convertRawValueToSQL(value interface{}) sqltypes.Value {
}
// findColumnValue performs case-insensitive lookup of column values
func (e *SQLEngine) findColumnValue(values map[string]*schema_pb.Value, columnName string) *schema_pb.Value {
// Now includes support for system columns stored in HybridScanResult
func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value {
// Check system columns first (stored separately in HybridScanResult)
lowerColumnName := strings.ToLower(columnName)
switch lowerColumnName {
case "_timestamp_ns", "timestamp_ns":
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
case "_key", "key":
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}
case "_source", "source":
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: result.Source}}
}
// Then check regular columns in Values map
// First try exact match
if value, exists := values[columnName]; exists {
if value, exists := result.Values[columnName]; exists {
return value
}
// Then try case-insensitive match
lowerColumnName := strings.ToLower(columnName)
for key, value := range values {
for key, value := range result.Values {
if strings.ToLower(key) == lowerColumnName {
return value
}

View File

@@ -296,6 +296,19 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
// This is an archived message from Parquet files
// ✅ FIX: Add system columns from LogEntry to RecordValue
if recordValue.Fields == nil {
recordValue.Fields = make(map[string]*schema_pb.Value)
}
// Add system columns from LogEntry
recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
return recordValue, "parquet_archive", nil
}