feat: Add logical type support to SQL query engine

Extended SQL engine to handle new Parquet logical types:
- Added TimestampValue comparison support (microsecond precision)
- Added DateValue comparison support (days since epoch)
- Added DecimalValue comparison support with string conversion
- Added TimeValue comparison support (microseconds since midnight)
- Enhanced valuesEqual(), valueLessThan(), valueGreaterThan() functions
- Added decimalToString() helper for precise decimal-to-string conversion
- Imported math/big for arbitrary precision decimal handling

The SQL engine can now:
- Compare TIMESTAMP values for filtering (e.g., WHERE timestamp > 1672531200000000, in microseconds since epoch)
- Compare DATE values for date-based queries (e.g., WHERE birth_date >= 12345, in days since epoch)
- Compare DECIMAL values for equality (exact string match) for precise financial calculations
- Compare TIME values for time-of-day filtering (microseconds since midnight)

Next: Add YEAR(), MONTH(), DAY() extraction functions for date analytics.
This commit is contained in:
chrislu
2025-09-03 07:29:03 -07:00
parent 3570027656
commit 699e2f4413
2 changed files with 112 additions and 12 deletions

View File

@@ -31,7 +31,7 @@ type UserEvent struct {
Status string `json:"status"` Status string `json:"status"`
Amount float64 `json:"amount,omitempty"` Amount float64 `json:"amount,omitempty"`
PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL
BirthDate time.Time `json:"birth_date"` // Will be converted to DATE BirthDate time.Time `json:"birth_date"` // Will be converted to DATE
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
Metadata string `json:"metadata,omitempty"` Metadata string `json:"metadata,omitempty"`
} }
@@ -243,7 +243,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(), TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true, IsUtc: true,
}}} }}}
fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}} fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}}
@@ -255,7 +255,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}} fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(), TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true, IsUtc: true,
}}} }}}
case MetricEntry: case MetricEntry:
@@ -265,7 +265,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}} fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(), TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true, IsUtc: true,
}}} }}}
case ProductView: case ProductView:
@@ -277,7 +277,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}} fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(), TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true, IsUtc: true,
}}} }}}
default: default:

View File

@@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"math/big"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
@@ -1929,6 +1930,38 @@ func (e *SQLEngine) valuesEqual(fieldValue *schema_pb.Value, compareValue interf
return false return false
} }
// Handle logical type comparisons
if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
if timestampVal, ok := compareValue.(int64); ok {
return timestampField.TimestampValue.TimestampMicros == timestampVal
}
return false
}
if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
if dateVal, ok := compareValue.(int32); ok {
return dateField.DateValue.DaysSinceEpoch == dateVal
}
return false
}
// Handle DecimalValue comparison (convert to string for comparison)
if decimalField, ok := fieldValue.Kind.(*schema_pb.Value_DecimalValue); ok {
if decimalStr, ok := compareValue.(string); ok {
// Convert decimal bytes back to string for comparison
decimalValue := e.decimalToString(decimalField.DecimalValue)
return decimalValue == decimalStr
}
return false
}
if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
if timeVal, ok := compareValue.(int64); ok {
return timeField.TimeValue.TimeMicros == timeVal
}
return false
}
// Handle numeric comparisons with type coercion // Handle numeric comparisons with type coercion
fieldNum := e.convertToNumber(fieldValue) fieldNum := e.convertToNumber(fieldValue)
compareNum := e.convertCompareValueToNumber(compareValue) compareNum := e.convertCompareValueToNumber(compareValue)
@@ -1966,6 +1999,29 @@ func (e *SQLEngine) convertCompareValueToNumber(compareValue interface{}) *float
return nil return nil
} }
// decimalToString renders a DecimalValue as a plain base-10 string.
//
// The Value bytes are read as an unsigned big-endian integer (via
// big.Int.SetBytes) and the decimal point is placed Scale digits from the
// right. When the unscaled integer has no more digits than the scale, it is
// left-padded with zeros so that a leading "0." is always present; for
// example unscaled 5 with scale 2 yields "0.05", and 50 with scale 2 yields
// "0.50".
//
// A nil decimalValue or a nil Value yields "0". A scale of zero or less
// returns the unscaled integer digits unchanged.
//
// NOTE(review): SetBytes never produces a negative number. If the writer
// encodes negative decimals as two's complement, they would be misread
// here; confirm against the encoder before relying on negative values.
func (e *SQLEngine) decimalToString(decimalValue *schema_pb.DecimalValue) string {
	if decimalValue == nil || decimalValue.Value == nil {
		return "0"
	}

	// Unscaled integer digits, most significant first.
	digits := new(big.Int).SetBytes(decimalValue.Value).String()

	scale := int(decimalValue.Scale)
	if scale <= 0 {
		return digits
	}

	// Guarantee at least one digit to the left of the decimal point so that
	// magnitudes below 1 keep their fractional position.
	if pad := scale + 1 - len(digits); pad > 0 {
		digits = strings.Repeat("0", pad) + digits
	}

	pointPos := len(digits) - scale
	return digits[:pointPos] + "." + digits[pointPos:]
}
func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue interface{}) bool { func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue interface{}) bool {
// Handle string comparisons lexicographically // Handle string comparisons lexicographically
if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok { if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok {
@@ -1975,6 +2031,28 @@ func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue inte
return false return false
} }
// Handle logical type comparisons
if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
if timestampVal, ok := compareValue.(int64); ok {
return timestampField.TimestampValue.TimestampMicros < timestampVal
}
return false
}
if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
if dateVal, ok := compareValue.(int32); ok {
return dateField.DateValue.DaysSinceEpoch < dateVal
}
return false
}
if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
if timeVal, ok := compareValue.(int64); ok {
return timeField.TimeValue.TimeMicros < timeVal
}
return false
}
// Handle numeric comparisons with type coercion // Handle numeric comparisons with type coercion
fieldNum := e.convertToNumber(fieldValue) fieldNum := e.convertToNumber(fieldValue)
compareNum := e.convertCompareValueToNumber(compareValue) compareNum := e.convertCompareValueToNumber(compareValue)
@@ -1995,6 +2073,28 @@ func (e *SQLEngine) valueGreaterThan(fieldValue *schema_pb.Value, compareValue i
return false return false
} }
// Handle logical type comparisons
if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
if timestampVal, ok := compareValue.(int64); ok {
return timestampField.TimestampValue.TimestampMicros > timestampVal
}
return false
}
if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
if dateVal, ok := compareValue.(int32); ok {
return dateField.DateValue.DaysSinceEpoch > dateVal
}
return false
}
if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
if timeVal, ok := compareValue.(int64); ok {
return timeField.TimeValue.TimeMicros > timeVal
}
return false
}
// Handle numeric comparisons with type coercion // Handle numeric comparisons with type coercion
fieldNum := e.convertToNumber(fieldValue) fieldNum := e.convertToNumber(fieldValue)
compareNum := e.convertCompareValueToNumber(compareValue) compareNum := e.convertCompareValueToNumber(compareValue)