chrislu
2025-09-01 20:19:59 -07:00
parent 61ad3c39ac
commit e385f0ce7d
6 changed files with 1036 additions and 170 deletions

View File

@@ -0,0 +1,551 @@
package engine
import (
"context"
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
"github.com/xwb1989/sqlparser"
)
// AggregationSpec defines an aggregation function to be computed
type AggregationSpec struct {
Function string // COUNT, SUM, AVG, MIN, MAX
Column string // Column name, or "*" for COUNT(*)
Alias string // Optional alias for the result column
Distinct bool // Support for DISTINCT keyword
}
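// Illustrative example (assumed mapping, not taken from the parser): a query like
//
//	SELECT COUNT(*) AS total, MAX(_timestamp_ns) FROM my_topic
//
// could be represented by specs roughly like
//
//	[]AggregationSpec{
//		{Function: "COUNT", Column: "*", Alias: "total"},
//		{Function: "MAX", Column: "_timestamp_ns"}, // the default Alias the parser assigns is not shown here
//	}
//
// where "my_topic" is a hypothetical table name.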
// AggregationResult holds the computed result of an aggregation
type AggregationResult struct {
Count int64
Sum float64
Min interface{}
Max interface{}
}
// AggregationStrategy represents the strategy for executing aggregations
type AggregationStrategy struct {
CanUseFastPath bool
Reason string
UnsupportedSpecs []AggregationSpec
}
// TopicDataSources represents the data sources available for a topic
type TopicDataSources struct {
ParquetFiles map[string][]*ParquetFileStats // partitionPath -> parquet file stats
ParquetRowCount int64
LiveLogRowCount int64
PartitionsCount int
}
// FastPathOptimizer handles fast path aggregation optimization decisions
type FastPathOptimizer struct {
engine *SQLEngine
}
// NewFastPathOptimizer creates a new fast path optimizer
func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
return &FastPathOptimizer{engine: engine}
}
// DetermineStrategy analyzes aggregations and determines if fast path can be used
func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
strategy := AggregationStrategy{
CanUseFastPath: true,
Reason: "all_aggregations_supported",
UnsupportedSpecs: []AggregationSpec{},
}
for _, spec := range aggregations {
if !opt.engine.canUseParquetStatsForAggregation(spec) {
strategy.CanUseFastPath = false
strategy.Reason = "unsupported_aggregation_functions"
strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec)
}
}
return strategy
}
// CollectDataSources gathers information about available data sources for a topic
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 0,
LiveLogRowCount: 0,
PartitionsCount: 0,
}
// Discover partitions for the topic
relativePartitions, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
if err != nil {
return dataSources, DataSourceError{
Source: "partition_discovery",
Cause: err,
}
}
topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
// Collect stats from each partition
for _, relPartition := range relativePartitions {
partitionPath := fmt.Sprintf("%s/%s", topicBasePath, relPartition)
// Read parquet file statistics
parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
if err == nil && len(parquetStats) > 0 {
dataSources.ParquetFiles[partitionPath] = parquetStats
for _, stat := range parquetStats {
dataSources.ParquetRowCount += stat.RowCount
}
}
// Count live log files
liveLogCount, _ := opt.engine.countLiveLogFiles(partitionPath, dataSources.ParquetFiles[partitionPath])
dataSources.LiveLogRowCount += liveLogCount
}
dataSources.PartitionsCount = len(relativePartitions)
return dataSources, nil
}
// AggregationComputer handles the computation of aggregations using fast path
type AggregationComputer struct {
engine *SQLEngine
}
// NewAggregationComputer creates a new aggregation computer
func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
return &AggregationComputer{engine: engine}
}
// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data
func (comp *AggregationComputer) ComputeFastPathAggregations(
ctx context.Context,
aggregations []AggregationSpec,
dataSources *TopicDataSources,
partitions []string,
) ([]AggregationResult, error) {
aggResults := make([]AggregationResult, len(aggregations))
for i, spec := range aggregations {
switch spec.Function {
case "COUNT":
if spec.Column == "*" {
aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
} else {
// For specific columns, we might need to account for NULLs in the future
aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
}
case "MIN":
globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions)
if err != nil {
return nil, AggregationError{
Operation: spec.Function,
Column: spec.Column,
Cause: err,
}
}
aggResults[i].Min = globalMin
case "MAX":
globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions)
if err != nil {
return nil, AggregationError{
Operation: spec.Function,
Column: spec.Column,
Cause: err,
}
}
aggResults[i].Max = globalMax
default:
return nil, OptimizationError{
Strategy: "fast_path_aggregation",
Reason: fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
}
}
}
return aggResults, nil
}
// computeGlobalMin computes the global minimum value across all data sources
func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
var globalMin interface{}
var globalMinValue *schema_pb.Value
hasParquetStats := false
// Step 1: Get minimum from parquet statistics
for _, fileStats := range dataSources.ParquetFiles {
for _, fileStat := range fileStats {
// Try case-insensitive column lookup
var colStats *ParquetColumnStats
var found bool
// First try exact match
if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
colStats = stats
found = true
} else {
// Try case-insensitive lookup
for colName, stats := range fileStat.ColumnStats {
if strings.EqualFold(colName, spec.Column) {
colStats = stats
found = true
break
}
}
}
if found && colStats != nil && colStats.MinValue != nil {
if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
globalMinValue = colStats.MinValue
extractedValue := comp.engine.extractRawValue(colStats.MinValue)
if extractedValue != nil {
globalMin = extractedValue
hasParquetStats = true
}
}
}
}
}
// Step 2: Get minimum from live log data (only if live logs exist)
if dataSources.LiveLogRowCount > 0 {
for _, partition := range partitions {
partitionParquetSources := make(map[string]bool)
if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
}
liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
if err != nil {
continue // Skip partitions with errors
}
if liveLogMin != nil {
if globalMin == nil {
globalMin = liveLogMin
globalMinValue = comp.engine.convertRawValueToSchemaValue(liveLogMin) // keep the schema value in sync so later comparisons have a baseline
} else {
liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
globalMin = liveLogMin
globalMinValue = liveLogSchemaValue
}
}
}
}
}
// Step 3: Handle system columns if no regular data found
if globalMin == nil && !hasParquetStats {
globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
}
return globalMin, nil
}
// computeGlobalMax computes the global maximum value across all data sources
func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
var globalMax interface{}
var globalMaxValue *schema_pb.Value
hasParquetStats := false
// Step 1: Get maximum from parquet statistics
for _, fileStats := range dataSources.ParquetFiles {
for _, fileStat := range fileStats {
// Try case-insensitive column lookup
var colStats *ParquetColumnStats
var found bool
// First try exact match
if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
colStats = stats
found = true
} else {
// Try case-insensitive lookup
for colName, stats := range fileStat.ColumnStats {
if strings.EqualFold(colName, spec.Column) {
colStats = stats
found = true
break
}
}
}
if found && colStats != nil && colStats.MaxValue != nil {
if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
globalMaxValue = colStats.MaxValue
extractedValue := comp.engine.extractRawValue(colStats.MaxValue)
if extractedValue != nil {
globalMax = extractedValue
hasParquetStats = true
}
}
}
}
}
// Step 2: Get maximum from live log data (only if live logs exist)
if dataSources.LiveLogRowCount > 0 {
for _, partition := range partitions {
partitionParquetSources := make(map[string]bool)
if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
}
_, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
if err != nil {
continue // Skip partitions with errors
}
if liveLogMax != nil {
if globalMax == nil {
globalMax = liveLogMax
globalMaxValue = comp.engine.convertRawValueToSchemaValue(liveLogMax) // keep the schema value in sync so later comparisons have a baseline
} else {
liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
globalMax = liveLogMax
globalMaxValue = liveLogSchemaValue
}
}
}
}
}
// Step 3: Handle system columns if no regular data found
if globalMax == nil && !hasParquetStats {
globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
}
return globalMax, nil
}
// executeAggregationQuery handles SELECT queries with aggregation functions
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select) (*QueryResult, error) {
// Parse WHERE clause for filtering
var predicate func(*schema_pb.RecordValue) bool
var err error
if stmt.Where != nil {
predicate, err = e.buildPredicate(stmt.Where.Expr)
if err != nil {
return &QueryResult{Error: err}, err
}
}
// Extract time filters for optimization
startTimeNs, stopTimeNs := int64(0), int64(0)
if stmt.Where != nil {
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
}
// FAST PATH: Try to use parquet statistics for optimization
// This can be ~130x faster than scanning all data
if stmt.Where == nil { // Only optimize when there is no WHERE clause
fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
if canOptimize {
fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n")
return fastResult, nil
}
}
// SLOW PATH: Fall back to full table scan
fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
// Build scan options for full table scan (aggregations need all data)
hybridScanOptions := HybridScanOptions{
StartTimeNs: startTimeNs,
StopTimeNs: stopTimeNs,
Limit: 0, // No limit for aggregations - need all data
Predicate: predicate,
}
// Execute the hybrid scan to get all matching records
results, err := hybridScanner.Scan(ctx, hybridScanOptions)
if err != nil {
return &QueryResult{Error: err}, err
}
// Compute aggregations
aggResults := e.computeAggregations(results, aggregations)
// Build result set
columns := make([]string, len(aggregations))
row := make([]sqltypes.Value, len(aggregations))
for i, spec := range aggregations {
columns[i] = spec.Alias
row[i] = e.formatAggregationResult(spec, aggResults[i])
}
return &QueryResult{
Columns: columns,
Rows: [][]sqltypes.Value{row},
}, nil
}
// tryFastParquetAggregation attempts to compute aggregations using a hybrid approach:
// - Use parquet metadata for parquet files
// - Count live log files for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
// Use the new modular components
optimizer := NewFastPathOptimizer(e)
computer := NewAggregationComputer(e)
// Step 1: Determine strategy
strategy := optimizer.DetermineStrategy(aggregations)
if !strategy.CanUseFastPath {
return nil, false
}
// Step 2: Collect data sources
dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
if err != nil {
return nil, false
}
// Build partition list for aggregation computer
relativePartitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
if err != nil {
return nil, false
}
topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
partitions := make([]string, len(relativePartitions))
for i, relPartition := range relativePartitions {
partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
}
// Debug: Show the hybrid optimization results
if dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 {
partitionsWithLiveLogs := 0
if dataSources.LiveLogRowCount > 0 {
partitionsWithLiveLogs = 1 // Simplified for now
}
fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows from %d partitions\n",
dataSources.ParquetRowCount, dataSources.LiveLogRowCount, partitionsWithLiveLogs)
}
// Step 3: Compute aggregations using fast path
aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
if err != nil {
return nil, false
}
// Step 4: Build final query result
columns := make([]string, len(aggregations))
row := make([]sqltypes.Value, len(aggregations))
for i, spec := range aggregations {
columns[i] = spec.Alias
row[i] = e.formatAggregationResult(spec, aggResults[i])
}
result := &QueryResult{
Columns: columns,
Rows: [][]sqltypes.Value{row},
}
return result, true
}
// computeAggregations computes aggregation results from a full table scan
func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
aggResults := make([]AggregationResult, len(aggregations))
for i, spec := range aggregations {
switch spec.Function {
case "COUNT":
if spec.Column == "*" {
aggResults[i].Count = int64(len(results))
} else {
count := int64(0)
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) {
count++
}
}
aggResults[i].Count = count
}
case "SUM":
sum := float64(0)
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if numValue := e.convertToNumber(value); numValue != nil {
sum += *numValue
}
}
}
aggResults[i].Sum = sum
case "AVG":
sum := float64(0)
count := int64(0)
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if numValue := e.convertToNumber(value); numValue != nil {
sum += *numValue
count++
}
}
}
if count > 0 {
aggResults[i].Sum = sum / float64(count) // Store average in Sum field
aggResults[i].Count = count
}
case "MIN":
var min interface{}
var minValue *schema_pb.Value
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if minValue == nil || e.compareValues(value, minValue) < 0 {
minValue = value
min = e.extractRawValue(value)
}
}
}
aggResults[i].Min = min
case "MAX":
var max interface{}
var maxValue *schema_pb.Value
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if maxValue == nil || e.compareValues(value, maxValue) > 0 {
maxValue = value
max = e.extractRawValue(value)
}
}
}
aggResults[i].Max = max
}
}
return aggResults
}
// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
switch spec.Function {
case "COUNT":
return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
case "MIN", "MAX":
return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
case "SUM", "AVG":
// These require scanning actual values, not just min/max
return false
default:
return false
}
}
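For illustration, a minimal test-style sketch of the strategy decision in isolation. It assumes the engine package and the standard testing import, and that a zero-value SQLEngine is enough, which holds today because the decision only inspects the function name and column name; the column name "amount" is a hypothetical example.

func TestFastPathStrategySketch(t *testing.T) {
    opt := NewFastPathOptimizer(&SQLEngine{}) // decision logic does not touch engine state

    fast := opt.DetermineStrategy([]AggregationSpec{
        {Function: "COUNT", Column: "*"},
        {Function: "MAX", Column: "_timestamp_ns"},
    })
    if !fast.CanUseFastPath {
        t.Fatalf("expected fast path, got reason %q", fast.Reason)
    }

    // SUM/AVG need every value, so a single AVG forces the full-scan path
    slow := opt.DetermineStrategy([]AggregationSpec{{Function: "AVG", Column: "amount"}})
    if slow.CanUseFastPath || len(slow.UnsupportedSpecs) != 1 {
        t.Fatalf("expected AVG to disable the fast path")
    }
}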

View File

@@ -0,0 +1,217 @@
package engine
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)
// formatAggregationResult formats an aggregation result into a SQL value
func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value {
switch spec.Function {
case "COUNT":
return sqltypes.NewInt64(result.Count)
case "SUM":
return sqltypes.NewFloat64(result.Sum)
case "AVG":
return sqltypes.NewFloat64(result.Sum) // Sum contains the average for AVG
case "MIN":
if result.Min != nil {
return e.convertRawValueToSQL(result.Min)
}
return sqltypes.NULL
case "MAX":
if result.Max != nil {
return e.convertRawValueToSQL(result.Max)
}
return sqltypes.NULL
}
return sqltypes.NULL
}
// convertRawValueToSQL converts a raw Go value to a SQL value
func (e *SQLEngine) convertRawValueToSQL(value interface{}) sqltypes.Value {
switch v := value.(type) {
case int32:
return sqltypes.NewInt32(v)
case int64:
return sqltypes.NewInt64(v)
case float32:
return sqltypes.NewFloat32(v)
case float64:
return sqltypes.NewFloat64(v)
case string:
return sqltypes.NewVarChar(v)
case bool:
if v {
return sqltypes.NewVarChar("1")
}
return sqltypes.NewVarChar("0")
}
return sqltypes.NULL
}
// extractRawValue extracts the raw Go value from a schema_pb.Value
func (e *SQLEngine) extractRawValue(value *schema_pb.Value) interface{} {
switch v := value.Kind.(type) {
case *schema_pb.Value_Int32Value:
return v.Int32Value
case *schema_pb.Value_Int64Value:
return v.Int64Value
case *schema_pb.Value_FloatValue:
return v.FloatValue
case *schema_pb.Value_DoubleValue:
return v.DoubleValue
case *schema_pb.Value_StringValue:
return v.StringValue
case *schema_pb.Value_BoolValue:
return v.BoolValue
case *schema_pb.Value_BytesValue:
return string(v.BytesValue) // Convert bytes to string for comparison
}
return nil
}
// compareValues compares two schema_pb.Value objects
func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Value) int {
if value2 == nil {
return 1 // value1 > nil
}
raw1 := e.extractRawValue(value1)
raw2 := e.extractRawValue(value2)
if raw1 == nil {
return -1
}
if raw2 == nil {
return 1
}
// Simple comparison - in a full implementation this would handle type coercion
switch v1 := raw1.(type) {
case int32:
if v2, ok := raw2.(int32); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
return 1
}
return 0
}
case int64:
if v2, ok := raw2.(int64); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
return 1
}
return 0
}
case float32:
if v2, ok := raw2.(float32); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
return 1
}
return 0
}
case float64:
if v2, ok := raw2.(float64); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
return 1
}
return 0
}
case string:
if v2, ok := raw2.(string); ok {
if v1 < v2 {
return -1
} else if v1 > v2 {
return 1
}
return 0
}
case bool:
if v2, ok := raw2.(bool); ok {
if v1 == v2 {
return 0
} else if v1 && !v2 {
return 1
}
return -1
}
}
return 0
}
// convertRawValueToSchemaValue converts raw Go values back to schema_pb.Value for comparison
func (e *SQLEngine) convertRawValueToSchemaValue(rawValue interface{}) *schema_pb.Value {
switch v := rawValue.(type) {
case int32:
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}}
case int64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}}
case float32:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}}
case float64:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
case string:
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
case bool:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
case []byte:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: v}}
default:
// Convert other types to string as fallback
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
}
}
// convertJSONValueToSchemaValue converts JSON values to schema_pb.Value
func (e *SQLEngine) convertJSONValueToSchemaValue(jsonValue interface{}) *schema_pb.Value {
switch v := jsonValue.(type) {
case string:
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
case float64:
// JSON numbers are always float64, try to detect if it's actually an integer
if v == float64(int64(v)) {
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}}
}
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
case bool:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
case nil:
return nil
default:
// Convert other types to string
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
}
}
// Helper functions for aggregation processing
// isNullValue checks if a schema_pb.Value is null or empty
func (e *SQLEngine) isNullValue(value *schema_pb.Value) bool {
return value == nil || value.Kind == nil
}
// convertToNumber converts a schema_pb.Value to a float64 for numeric operations
func (e *SQLEngine) convertToNumber(value *schema_pb.Value) *float64 {
switch v := value.Kind.(type) {
case *schema_pb.Value_Int32Value:
result := float64(v.Int32Value)
return &result
case *schema_pb.Value_Int64Value:
result := float64(v.Int64Value)
return &result
case *schema_pb.Value_FloatValue:
result := float64(v.FloatValue)
return &result
case *schema_pb.Value_DoubleValue:
return &v.DoubleValue
}
return nil
}
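For illustration, a small sketch exercising the comparison and conversion helpers above; it assumes the engine package, the standard testing import, and a zero-value SQLEngine, which works because these helpers are stateless.

func TestValueHelpersSketch(t *testing.T) {
    e := &SQLEngine{}

    a := &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}}
    b := &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}}
    if e.compareValues(a, b) >= 0 {
        t.Fatalf("expected 3 to compare below 7")
    }

    // JSON numbers arrive as float64; whole numbers are stored back as int64
    v := e.convertJSONValueToSchemaValue(float64(42))
    if iv, ok := v.Kind.(*schema_pb.Value_Int64Value); !ok || iv.Int64Value != 42 {
        t.Fatalf("expected int64 42, got %v", v)
    }

    // convertToNumber normalizes numeric kinds to float64 for SUM/AVG
    if n := e.convertToNumber(a); n == nil || *n != 3 {
        t.Fatalf("expected 3.0")
    }
}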

View File

@@ -31,30 +31,6 @@ type SQLEngine struct {
catalog *SchemaCatalog
}
// QueryExecutionPlan contains information about how a query was executed
type QueryExecutionPlan struct {
QueryType string `json:"query_type"` // SELECT, INSERT, etc.
ExecutionStrategy string `json:"execution_strategy"` // fast_path, full_scan, hybrid
DataSources []string `json:"data_sources"` // parquet_files, live_logs
PartitionsScanned int `json:"partitions_scanned"`
ParquetFilesScanned int `json:"parquet_files_scanned"`
LiveLogFilesScanned int `json:"live_log_files_scanned"`
TotalRowsProcessed int64 `json:"total_rows_processed"`
OptimizationsUsed []string `json:"optimizations_used"` // parquet_stats, predicate_pushdown, etc.
TimeRangeFilters map[string]interface{} `json:"time_range_filters,omitempty"`
Aggregations []string `json:"aggregations,omitempty"`
ExecutionTimeMs float64 `json:"execution_time_ms"`
Details map[string]interface{} `json:"details,omitempty"`
}
// QueryResult represents the result of a SQL query execution
type QueryResult struct {
Columns []string `json:"columns"`
Rows [][]sqltypes.Value `json:"rows"`
Error error `json:"error,omitempty"`
ExecutionPlan *QueryExecutionPlan `json:"execution_plan,omitempty"`
}
// NewSQLEngine creates a new SQL execution engine
// Uses master address for service discovery and initialization
func NewSQLEngine(masterAddress string) *SQLEngine {
@@ -1515,152 +1491,6 @@ func (e *SQLEngine) dropTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryR
return result, nil
}
// AggregationSpec defines an aggregation function to be computed
type AggregationSpec struct {
Function string // COUNT, SUM, AVG, MIN, MAX
Column string // Column name, or "*" for COUNT(*)
Alias string // Optional alias for the result column
Distinct bool // Support for DISTINCT keyword
}
// AggregationResult holds the computed result of an aggregation
type AggregationResult struct {
Count int64
Sum float64
Min interface{}
Max interface{}
}
// AggregationStrategy represents the strategy for executing aggregations
type AggregationStrategy struct {
CanUseFastPath bool
Reason string
UnsupportedSpecs []AggregationSpec
}
// TopicDataSources represents the data sources available for a topic
type TopicDataSources struct {
ParquetFiles map[string][]*ParquetFileStats // partitionPath -> parquet file stats
ParquetRowCount int64
LiveLogRowCount int64
PartitionsCount int
}
// FastPathOptimizer handles fast path aggregation optimization decisions
type FastPathOptimizer struct {
engine *SQLEngine
}
// Error types for better error handling and testing
type AggregationError struct {
Operation string
Column string
Cause error
}
func (e AggregationError) Error() string {
return fmt.Sprintf("aggregation error in %s(%s): %v", e.Operation, e.Column, e.Cause)
}
type DataSourceError struct {
Source string
Cause error
}
func (e DataSourceError) Error() string {
return fmt.Sprintf("data source error in %s: %v", e.Source, e.Cause)
}
type OptimizationError struct {
Strategy string
Reason string
}
func (e OptimizationError) Error() string {
return fmt.Sprintf("optimization failed for %s: %s", e.Strategy, e.Reason)
}
// NewFastPathOptimizer creates a new fast path optimizer
func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
return &FastPathOptimizer{engine: engine}
}
// DetermineStrategy analyzes aggregations and determines if fast path can be used
func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
strategy := AggregationStrategy{
CanUseFastPath: true,
Reason: "all_aggregations_supported",
UnsupportedSpecs: []AggregationSpec{},
}
for _, spec := range aggregations {
if !opt.engine.canUseParquetStatsForAggregation(spec) {
strategy.CanUseFastPath = false
strategy.Reason = "unsupported_aggregation_functions"
strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec)
}
}
return strategy
}
// CollectDataSources gathers information about available data sources for a topic
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
// Get all partitions for this topic
relativePartitions, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
if err != nil {
return nil, DataSourceError{
Source: fmt.Sprintf("partition_discovery:%s.%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name),
Cause: err,
}
}
// Convert relative partition paths to full paths
topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
partitions := make([]string, len(relativePartitions))
for i, relPartition := range relativePartitions {
partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
}
// Collect statistics from all partitions
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 0,
LiveLogRowCount: 0,
PartitionsCount: len(partitions),
}
for _, partition := range partitions {
partitionPath := partition
// Get parquet file statistics
fileStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
if err != nil {
fileStats = []*ParquetFileStats{} // Empty stats, but continue
}
if len(fileStats) > 0 {
dataSources.ParquetFiles[partitionPath] = fileStats
for _, fileStat := range fileStats {
dataSources.ParquetRowCount += fileStat.RowCount
}
}
// Get parquet source files for deduplication
parquetSourceFiles := opt.engine.extractParquetSourceFiles(fileStats)
// Count live log rows (excluding parquet-converted files)
liveLogRowCount, err := opt.engine.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSourceFiles)
if err != nil {
liveLogRowCount = 0 // No live logs is acceptable
}
dataSources.LiveLogRowCount += liveLogRowCount
}
return dataSources, nil
}
// AggregationComputer handles the computation of aggregations using fast path
type AggregationComputer struct {
engine *SQLEngine

View File

@@ -0,0 +1,36 @@
package engine
import "fmt"
// Error types for better error handling and testing
// AggregationError represents errors that occur during aggregation computation
type AggregationError struct {
Operation string
Column string
Cause error
}
func (e AggregationError) Error() string {
return fmt.Sprintf("aggregation error in %s(%s): %v", e.Operation, e.Column, e.Cause)
}
// DataSourceError represents errors that occur when accessing data sources
type DataSourceError struct {
Source string
Cause error
}
func (e DataSourceError) Error() string {
return fmt.Sprintf("data source error in %s: %v", e.Source, e.Cause)
}
// OptimizationError represents errors that occur during query optimization
type OptimizationError struct {
Strategy string
Reason string
}
func (e OptimizationError) Error() string {
return fmt.Sprintf("optimization failed for %s: %s", e.Strategy, e.Reason)
}
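For illustration, a sketch of how a caller could branch on these typed errors; it assumes an "errors" import and that the errors are returned by value, as the code above does. Note that Cause is not exposed through an Unwrap method, so errors.Is/errors.As will not see the wrapped cause itself.

func describeFailure(err error) string {
    var dsErr DataSourceError
    if errors.As(err, &dsErr) {
        return "data source failed: " + dsErr.Source
    }
    var aggErr AggregationError
    if errors.As(err, &aggErr) {
        return fmt.Sprintf("aggregation failed: %s(%s)", aggErr.Operation, aggErr.Column)
    }
    var optErr OptimizationError
    if errors.As(err, &optErr) {
        return "optimization skipped: " + optErr.Strategy
    }
    return err.Error()
}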

View File

@@ -0,0 +1,170 @@
package engine
import (
"regexp"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source)
func (e *SQLEngine) isSystemColumn(columnName string) bool {
lowerName := strings.ToLower(columnName)
return lowerName == "_timestamp_ns" || lowerName == "timestamp_ns" ||
lowerName == "_key" || lowerName == "key" ||
lowerName == "_source" || lowerName == "source"
}
// isRegularColumn checks if a column might be a regular data column (placeholder)
func (e *SQLEngine) isRegularColumn(columnName string) bool {
// For now, assume any non-system column is a regular column
return !e.isSystemColumn(columnName)
}
// getSystemColumnGlobalMin computes global min for system columns using file metadata
func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
lowerName := strings.ToLower(columnName)
switch lowerName {
case "_timestamp_ns", "timestamp_ns":
// For timestamps, find the earliest timestamp across all files
// This should match what's in the Extended["min"] metadata
var minTimestamp *int64
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
timestamp := e.extractTimestampFromFilename(fileStat.FileName)
if timestamp != 0 {
if minTimestamp == nil || timestamp < *minTimestamp {
minTimestamp = &timestamp
}
}
}
}
if minTimestamp != nil {
return *minTimestamp
}
case "_key", "key":
// For keys, we'd need to read the actual parquet column stats
// Fall back to scanning if not available in our current stats
return nil
case "_source", "source":
// Source is always "parquet_archive" for parquet files
return "parquet_archive"
}
return nil
}
// getSystemColumnGlobalMax computes global max for system columns using file metadata
func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
lowerName := strings.ToLower(columnName)
switch lowerName {
case "_timestamp_ns", "timestamp_ns":
// For timestamps, find the latest timestamp across all files
// This should match what's in the Extended["max"] metadata
var maxTimestamp *int64
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
timestamp := e.extractTimestampFromFilename(fileStat.FileName)
if timestamp != 0 {
if maxTimestamp == nil || timestamp > *maxTimestamp {
maxTimestamp = &timestamp
}
}
}
}
if maxTimestamp != nil {
return *maxTimestamp
}
case "_key", "key":
// For keys, we'd need to read the actual parquet column stats
// Fall back to scanning if not available in our current stats
return nil
case "_source", "source":
// Source is always "parquet_archive" for parquet files
return "parquet_archive"
}
return nil
}
// extractTimestampFromFilename extracts timestamp from parquet filename
func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
// Expected format: YYYY-MM-DD-HH-MM-SS.parquet or similar
// Try to parse timestamp from filename
re := regexp.MustCompile(`(\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2})`)
matches := re.FindStringSubmatch(filename)
if len(matches) > 1 {
timestampStr := matches[1]
// Convert to time and then to nanoseconds
t, err := time.Parse("2006-01-02-15-04-05", timestampStr)
if err == nil {
return t.UnixNano()
}
}
// Fallback: try to parse as unix timestamp if filename is numeric
if timestampStr := strings.TrimSuffix(filename, ".parquet"); timestampStr != filename {
if timestamp, err := strconv.ParseInt(timestampStr, 10, 64); err == nil {
// Assume it's already in nanoseconds
return timestamp
}
}
return 0
}
// findColumnValue performs case-insensitive lookup of column values
// Now includes support for system columns stored in HybridScanResult
func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value {
lowerName := strings.ToLower(columnName)
// Check system columns first
switch lowerName {
case "_timestamp_ns", "timestamp_ns":
return &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp},
}
case "_key", "key":
return &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key},
}
case "_source", "source":
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: result.Source},
}
}
// Check regular columns in the record data
if result.RecordValue != nil {
recordValue, ok := result.RecordValue.Kind.(*schema_pb.Value_RecordValue)
if !ok {
return nil
}
if recordValue.RecordValue.Fields != nil {
// Try exact match first
if value, exists := recordValue.RecordValue.Fields[columnName]; exists {
return value
}
// Try case-insensitive match
for fieldName, value := range recordValue.RecordValue.Fields {
if strings.EqualFold(fieldName, columnName) {
return value
}
}
}
}
return nil
}
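For illustration, a test-style sketch of the system-column helpers above; it assumes the engine package, the standard testing import, and that a zero-value SQLEngine suffices (these helpers are stateless).

func TestSystemColumnHelpersSketch(t *testing.T) {
    e := &SQLEngine{}

    // filenames in the YYYY-MM-DD-HH-MM-SS.parquet form parse to a UTC timestamp in nanoseconds
    ts := e.extractTimestampFromFilename("2025-09-01-20-19-59.parquet")
    if ts == 0 {
        t.Fatalf("expected a non-zero timestamp")
    }

    // system columns are served straight from the scan result, not from record fields
    res := HybridScanResult{Source: "parquet_archive", Timestamp: ts, Key: []byte("k1")}
    if v := e.findColumnValue(res, "_KEY"); v == nil { // lookup is case-insensitive
        t.Fatalf("expected _key to resolve from the scan result")
    }
}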

View File

@@ -0,0 +1,62 @@
package engine
import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)
// QueryExecutionPlan contains information about how a query was executed
type QueryExecutionPlan struct {
QueryType string `json:"query_type"` // SELECT, INSERT, etc.
ExecutionStrategy string `json:"execution_strategy"` // fast_path, full_scan, hybrid
DataSources []string `json:"data_sources"` // parquet_files, live_logs
PartitionsScanned int `json:"partitions_scanned"`
ParquetFilesScanned int `json:"parquet_files_scanned"`
LiveLogFilesScanned int `json:"live_log_files_scanned"`
TotalRowsProcessed int64 `json:"total_rows_processed"`
OptimizationsUsed []string `json:"optimizations_used"` // parquet_stats, predicate_pushdown, etc.
TimeRangeFilters map[string]interface{} `json:"time_range_filters,omitempty"`
Aggregations []string `json:"aggregations,omitempty"`
ExecutionTimeMs float64 `json:"execution_time_ms"`
Details map[string]interface{} `json:"details,omitempty"`
}
// QueryResult represents the result of a SQL query execution
type QueryResult struct {
Columns []string `json:"columns"`
Rows [][]sqltypes.Value `json:"rows"`
Error error `json:"error,omitempty"`
ExecutionPlan *QueryExecutionPlan `json:"execution_plan,omitempty"`
}
// ParquetColumnStats holds statistics for a single column in a Parquet file
type ParquetColumnStats struct {
ColumnName string
MinValue *schema_pb.Value
MaxValue *schema_pb.Value
NullCount int64
RowCount int64
}
// ParquetFileStats holds statistics for a single Parquet file
type ParquetFileStats struct {
FileName string
RowCount int64
ColumnStats map[string]*ParquetColumnStats
}
// HybridScanResult represents a single record from hybrid scanning
type HybridScanResult struct {
RecordValue *schema_pb.Value
Source string // "live_log", "parquet_archive"
Timestamp int64
Key []byte
}
// HybridScanOptions configures how the hybrid scanner operates
type HybridScanOptions struct {
StartTimeNs int64
StopTimeNs int64
Limit int
Predicate func(*schema_pb.RecordValue) bool
}
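For illustration, a sketch of building HybridScanOptions with a row-level predicate, much as executeAggregationQuery does for the slow path; the "status" column and the helper name are hypothetical.

func exampleScanOptions(nowNs int64) HybridScanOptions {
    return HybridScanOptions{
        StartTimeNs: 0,     // from the beginning of retained data
        StopTimeNs:  nowNs, // up to the caller-supplied cutoff
        Limit:       0,     // 0 = no limit; aggregations need every matching row
        Predicate: func(rec *schema_pb.RecordValue) bool {
            v, ok := rec.Fields["status"] // hypothetical column
            if !ok {
                return false
            }
            s, isStr := v.Kind.(*schema_pb.Value_StringValue)
            return isStr && s.StringValue == "active"
        },
    }
}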