mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-23 01:03:32 +08:00
read broker, logs, and parquet files
This commit is contained in:
@@ -9,17 +9,19 @@ import (
|
||||
func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
|
||||
fromParquetFn := GenParquetReadFunc(filerClient, t, p)
|
||||
readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
|
||||
return mergeReadFuncs(fromParquetFn, readLogDirectFn)
|
||||
// Reversed order: live logs first (recent), then Parquet files (historical)
|
||||
// This provides better performance for real-time analytics queries
|
||||
return mergeReadFuncs(readLogDirectFn, fromParquetFn)
|
||||
}
|
||||
|
||||
func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
|
||||
var exhaustedParquet bool
|
||||
func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
|
||||
var exhaustedLiveLogs bool
|
||||
var lastProcessedPosition log_buffer.MessagePosition
|
||||
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
|
||||
if !exhaustedParquet {
|
||||
// glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
|
||||
lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
|
||||
// glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
|
||||
if !exhaustedLiveLogs {
|
||||
// glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
|
||||
lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
|
||||
// glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
|
||||
if isDone {
|
||||
isDone = false
|
||||
}
|
||||
@@ -28,14 +30,14 @@ func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFun
|
||||
}
|
||||
lastProcessedPosition = lastReadPosition
|
||||
}
|
||||
exhaustedParquet = true
|
||||
exhaustedLiveLogs = true
|
||||
|
||||
if startPosition.Before(lastProcessedPosition.Time) {
|
||||
startPosition = lastProcessedPosition
|
||||
}
|
||||
|
||||
// glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC())
|
||||
lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
|
||||
// glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
|
||||
lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@@ -500,8 +500,8 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sq
|
||||
// Execute aggregation query with plan tracking
|
||||
result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
|
||||
} else {
|
||||
// Regular SELECT query
|
||||
result, err = e.executeSelectStatement(ctx, stmt)
|
||||
// Regular SELECT query with plan tracking
|
||||
result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
|
||||
}
|
||||
|
||||
if err == nil && result != nil {
|
||||
@@ -834,6 +834,198 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
|
||||
return hybridScanner.ConvertToSQLResult(results, columns), nil
|
||||
}
|
||||
|
||||
// executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture
|
||||
// This is used by EXPLAIN queries to capture complete data source information including broker memory
|
||||
func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *sqlparser.Select, plan *QueryExecutionPlan) (*QueryResult, error) {
|
||||
// Parse FROM clause to get table (topic) information
|
||||
if len(stmt.From) != 1 {
|
||||
err := fmt.Errorf("SELECT supports single table queries only")
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
|
||||
// Extract table reference
|
||||
var database, tableName string
|
||||
switch table := stmt.From[0].(type) {
|
||||
case *sqlparser.AliasedTableExpr:
|
||||
switch tableExpr := table.Expr.(type) {
|
||||
case sqlparser.TableName:
|
||||
tableName = tableExpr.Name.String()
|
||||
if tableExpr.Qualifier.String() != "" {
|
||||
database = tableExpr.Qualifier.String()
|
||||
}
|
||||
default:
|
||||
err := fmt.Errorf("unsupported table expression: %T", tableExpr)
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
default:
|
||||
err := fmt.Errorf("unsupported FROM clause: %T", table)
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
|
||||
// Use current database context if not specified
|
||||
if database == "" {
|
||||
database = e.catalog.GetCurrentDatabase()
|
||||
if database == "" {
|
||||
database = "default"
|
||||
}
|
||||
}
|
||||
|
||||
// Auto-discover and register topic if not already in catalog
|
||||
if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
|
||||
// Topic not in catalog, try to discover and register it
|
||||
if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
|
||||
// Return error immediately for non-existent topics instead of falling back to sample data
|
||||
return &QueryResult{Error: regErr}, regErr
|
||||
}
|
||||
}
|
||||
|
||||
// Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
|
||||
// Get filerClient from broker connection (works with both real and mock brokers)
|
||||
var filerClient filer_pb.FilerClient
|
||||
var filerClientErr error
|
||||
filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
|
||||
if filerClientErr != nil {
|
||||
// Return error if filer client is not available for topic access
|
||||
return &QueryResult{Error: filerClientErr}, filerClientErr
|
||||
}
|
||||
|
||||
hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName)
|
||||
if err != nil {
|
||||
// Return error for topic access issues instead of misleading sample data
|
||||
topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err)
|
||||
return &QueryResult{Error: topicErr}, topicErr
|
||||
}
|
||||
|
||||
// Parse SELECT columns and detect aggregation functions
|
||||
var columns []string
|
||||
var aggregations []AggregationSpec
|
||||
selectAll := false
|
||||
hasAggregations := false
|
||||
|
||||
for _, selectExpr := range stmt.SelectExprs {
|
||||
switch expr := selectExpr.(type) {
|
||||
case *sqlparser.StarExpr:
|
||||
selectAll = true
|
||||
case *sqlparser.AliasedExpr:
|
||||
switch col := expr.Expr.(type) {
|
||||
case *sqlparser.ColName:
|
||||
columns = append(columns, col.Name.String())
|
||||
case *sqlparser.FuncExpr:
|
||||
// Handle aggregation functions
|
||||
aggSpec, err := e.parseAggregationFunction(col, expr)
|
||||
if err != nil {
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
aggregations = append(aggregations, *aggSpec)
|
||||
hasAggregations = true
|
||||
default:
|
||||
err := fmt.Errorf("unsupported SELECT expression: %T", col)
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
default:
|
||||
err := fmt.Errorf("unsupported SELECT expression: %T", expr)
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
}
|
||||
|
||||
// If we have aggregations, use aggregation query path
|
||||
if hasAggregations {
|
||||
return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
|
||||
}
|
||||
|
||||
// Parse WHERE clause for predicate pushdown
|
||||
var predicate func(*schema_pb.RecordValue) bool
|
||||
if stmt.Where != nil {
|
||||
predicate, err = e.buildPredicate(stmt.Where.Expr)
|
||||
if err != nil {
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Parse LIMIT clause
|
||||
limit := 0
|
||||
if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
|
||||
switch limitExpr := stmt.Limit.Rowcount.(type) {
|
||||
case *sqlparser.SQLVal:
|
||||
if limitExpr.Type == sqlparser.IntVal {
|
||||
var parseErr error
|
||||
limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
|
||||
if parseErr != nil {
|
||||
return &QueryResult{Error: parseErr}, parseErr
|
||||
}
|
||||
if limit64 > math.MaxInt32 || limit64 < 0 {
|
||||
return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
|
||||
}
|
||||
limit = int(limit64)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build hybrid scan options
|
||||
// RESOLVED TODO: Extract from WHERE clause time filters
|
||||
startTimeNs, stopTimeNs := int64(0), int64(0)
|
||||
if stmt.Where != nil {
|
||||
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
|
||||
}
|
||||
|
||||
hybridScanOptions := HybridScanOptions{
|
||||
StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
|
||||
StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons
|
||||
Limit: limit,
|
||||
Predicate: predicate,
|
||||
}
|
||||
|
||||
if !selectAll {
|
||||
hybridScanOptions.Columns = columns
|
||||
}
|
||||
|
||||
// Execute the hybrid scan with stats capture for EXPLAIN
|
||||
var results []HybridScanResult
|
||||
if plan != nil {
|
||||
// EXPLAIN mode - capture broker buffer stats
|
||||
var stats *HybridScanStats
|
||||
results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
|
||||
if err != nil {
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
|
||||
// Populate plan with broker buffer information
|
||||
if stats != nil {
|
||||
plan.BrokerBufferQueried = stats.BrokerBufferQueried
|
||||
plan.BrokerBufferMessages = stats.BrokerBufferMessages
|
||||
plan.BufferStartIndex = stats.BufferStartIndex
|
||||
|
||||
// Add broker_buffer to data sources if buffer was queried
|
||||
if stats.BrokerBufferQueried {
|
||||
// Check if broker_buffer is already in data sources
|
||||
hasBrokerBuffer := false
|
||||
for _, source := range plan.DataSources {
|
||||
if source == "broker_buffer" {
|
||||
hasBrokerBuffer = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasBrokerBuffer {
|
||||
plan.DataSources = append(plan.DataSources, "broker_buffer")
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Normal mode - just get results
|
||||
results, err = hybridScanner.Scan(ctx, hybridScanOptions)
|
||||
if err != nil {
|
||||
return &QueryResult{Error: err}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to SQL result format
|
||||
if selectAll {
|
||||
columns = nil // Let converter determine all columns
|
||||
}
|
||||
|
||||
return hybridScanner.ConvertToSQLResult(results, columns), nil
|
||||
}
|
||||
|
||||
// extractTimeFilters extracts time range filters from WHERE clause for optimization
|
||||
// This allows push-down of time-based queries to improve scan performance
|
||||
// Returns (startTimeNs, stopTimeNs) where 0 means unbounded
|
||||
@@ -1469,6 +1661,9 @@ func (builder *ExecutionPlanBuilder) buildDataSourcesList(strategy AggregationSt
|
||||
sources = append(sources, "live_logs", "parquet_files")
|
||||
}
|
||||
|
||||
// Note: broker_buffer is added dynamically during execution when broker is queried
|
||||
// See aggregations.go lines 397-409 for the broker buffer data source addition logic
|
||||
|
||||
return sources
|
||||
}
|
||||
|
||||
@@ -1803,9 +1998,7 @@ func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*
|
||||
// Format: YYYY-MM-DD-HH-MM-SS.parquet
|
||||
func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
|
||||
// Remove .parquet extension
|
||||
if strings.HasSuffix(filename, ".parquet") {
|
||||
filename = filename[:len(filename)-8]
|
||||
}
|
||||
filename = strings.TrimSuffix(filename, ".parquet")
|
||||
|
||||
// Parse timestamp format: 2006-01-02-15-04-05
|
||||
t, err := time.Parse("2006-01-02-15-04-05", filename)
|
||||
|
Reference in New Issue
Block a user