scan all files

chrislu
2025-09-01 14:51:23 -07:00
parent ada704ad80
commit 2fa8991a52
2 changed files with 203 additions and 25 deletions

View File

@@ -19,18 +19,40 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri
}
}
// Auto-discover and register topic if not already in catalog (same logic as SELECT)
if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
// Topic not in catalog, try to discover and register it
if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr)
return &QueryResult{Error: fmt.Errorf("topic %s.%s not found and auto-discovery failed: %v", database, tableName, regErr)}, regErr
}
}
// Get topic schema from broker
recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
if err != nil {
return &QueryResult{Error: err}, err
}
// System columns to include in DESCRIBE output
systemColumns := []struct {
Name string
Type string
Extra string
}{
{"_timestamp_ns", "BIGINT", "System column: Message timestamp in nanoseconds"},
{"_key", "VARBINARY", "System column: Message key"},
{"_source", "VARCHAR(255)", "System column: Data source (parquet/log)"},
}
// Format schema as DESCRIBE output (regular fields + system columns)
totalRows := len(recordType.Fields) + len(systemColumns)
result := &QueryResult{
Columns: []string{"Field", "Type", "Null", "Key", "Default", "Extra"},
Rows: make([][]sqltypes.Value, totalRows),
}
// Add regular fields
for i, field := range recordType.Fields {
sqlType := e.convertMQTypeToSQL(field.Type)
@@ -44,6 +66,19 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri
}
}
// Add system columns
for i, sysCol := range systemColumns {
rowIndex := len(recordType.Fields) + i
result.Rows[rowIndex] = []sqltypes.Value{
sqltypes.NewVarChar(sysCol.Name), // Field
sqltypes.NewVarChar(sysCol.Type), // Type
sqltypes.NewVarChar("YES"), // Null
sqltypes.NewVarChar(""), // Key
sqltypes.NewVarChar("NULL"), // Default
sqltypes.NewVarChar(sysCol.Extra), // Extra - description
}
}
return result, nil
}
@@ -80,14 +115,14 @@ func (e *SQLEngine) executeShowStatementWithDescribe(ctx context.Context, stmt *
}
}
// Add support for DESCRIBE/DESC as a separate statement type
// This would be called from ExecuteSQL if we detect a DESCRIBE/DESC statement
func (e *SQLEngine) handleDescribeCommand(ctx context.Context, sql string) (*QueryResult, error) {
// Simple parsing for "DESCRIBE/DESC [TABLE] table_name" format
// Handle both "DESCRIBE table_name", "DESC table_name", "DESCRIBE TABLE table_name", "DESC TABLE table_name"
parts := strings.Fields(strings.TrimSpace(sql))
if len(parts) < 2 {
err := fmt.Errorf("DESCRIBE/DESC requires a table name")
return &QueryResult{Error: err}, err
}
@@ -101,13 +136,16 @@ func (e *SQLEngine) handleDescribeCommand(ctx context.Context, sql string) (*Que
tableName = parts[1]
}
// Remove backticks from table name if present (same as SQL parser does)
tableName = strings.Trim(tableName, "`")
database := "" database := ""
// Handle database.table format // Handle database.table format
if strings.Contains(tableName, ".") { if strings.Contains(tableName, ".") {
dbTableParts := strings.SplitN(tableName, ".", 2) dbTableParts := strings.SplitN(tableName, ".", 2)
database = dbTableParts[0] database = strings.Trim(dbTableParts[0], "`") // Also strip backticks from database name
tableName = dbTableParts[1] tableName = strings.Trim(dbTableParts[1], "`")
} }
return e.executeDescribeStatement(ctx, tableName, database) return e.executeDescribeStatement(ctx, tableName, database)
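
For illustration, DESCRIBE output for a hypothetical topic test.user_events would now end with the three system columns. The two user fields and their Null/Default values below are assumed for this example; only the system-column rows mirror this commit:

Field | Type | Null | Key | Default | Extra
user_id | BIGINT | YES | | NULL |
status | VARCHAR(255) | YES | | NULL |
_timestamp_ns | BIGINT | YES | | NULL | System column: Message timestamp in nanoseconds
_key | VARBINARY | YES | | NULL | System column: Message key
_source | VARCHAR(255) | YES | | NULL | System column: Data source (parquet/log)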

View File

@@ -9,13 +9,15 @@ import (
"strings" "strings"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/http" util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/xwb1989/sqlparser" "github.com/xwb1989/sqlparser"
"google.golang.org/protobuf/proto"
)
// SQLEngine provides SQL query execution capabilities for SeaweedFS
@@ -40,8 +42,8 @@ type QueryResult struct {
func NewSQLEngine(masterAddress string) *SQLEngine {
// Initialize global HTTP client if not already done
// This is needed for reading partition data from the filer
if util_http.GetGlobalHttpClient() == nil {
util_http.InitGlobalHttpClient()
}
return &SQLEngine{
@@ -61,8 +63,9 @@ func (e *SQLEngine) GetCatalog() *SchemaCatalog {
// 3. DML operations (SELECT) query Parquet files directly
// 4. Error handling follows MySQL conventions
func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
// Handle DESCRIBE/DESC as a special case since it's not parsed as a standard statement
sqlUpper := strings.ToUpper(strings.TrimSpace(sql))
if strings.HasPrefix(sqlUpper, "DESCRIBE") || strings.HasPrefix(sqlUpper, "DESC") {
return e.handleDescribeCommand(ctx, sql)
}
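
A usage sketch of the new dispatch (the master address and topic name are assumptions for illustration, not part of this commit): each statement below now reaches handleDescribeCommand before standard SQL parsing is attempted.

engine := NewSQLEngine("localhost:9333") // hypothetical master address
for _, q := range []string{
	"DESCRIBE user_events",
	"DESC user_events",
	"DESCRIBE TABLE test.user_events",
	"DESC `test`.`user_events`",
} {
	if _, err := engine.ExecuteSQL(context.Background(), q); err != nil {
		fmt.Printf("describe failed for %q: %v\n", q, err)
	}
}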
@@ -1102,7 +1105,7 @@ func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *
if stmt.Where == nil { // Only optimize when no complex WHERE clause
fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
if canOptimize {
fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n")
return fastResult, nil
}
}
@@ -1354,8 +1357,11 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Val
return 0
}
// tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
// - Use parquet metadata for parquet files
// - Count live log files for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
// Check if all aggregations are optimizable with parquet statistics
for _, spec := range aggregations {
@@ -1370,13 +1376,16 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
return nil, false
}
// Collect statistics from all partitions (both parquet and live logs)
allFileStats := make(map[string][]*ParquetFileStats) // partitionPath -> parquet file stats
totalParquetRowCount := int64(0)
totalLiveLogRowCount := int64(0)
partitionsWithLiveLogs := 0
for _, partition := range partitions {
partitionPath := fmt.Sprintf("/topics/%s/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition)
// Get parquet file statistics (always try this)
fileStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
if err != nil {
// If we can't read stats from any partition, fall back to full scan
@@ -1386,13 +1395,32 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
if len(fileStats) > 0 {
allFileStats[partitionPath] = fileStats
for _, fileStat := range fileStats {
totalParquetRowCount += fileStat.RowCount
}
}
// Check if there are live log files and count their rows
liveLogRowCount, err := e.countLiveLogRows(partitionPath)
if err != nil {
// If we can't count live logs, fall back to full scan
return nil, false
}
if liveLogRowCount > 0 {
totalLiveLogRowCount += liveLogRowCount
partitionsWithLiveLogs++
}
}
totalRowCount := totalParquetRowCount + totalLiveLogRowCount
// Debug: Show the hybrid optimization results
if totalParquetRowCount > 0 || totalLiveLogRowCount > 0 {
fmt.Printf("Hybrid fast aggregation: %d parquet rows + %d live log rows from %d partitions with live logs\n",
totalParquetRowCount, totalLiveLogRowCount, partitionsWithLiveLogs)
}
// If no data found, can't optimize
if totalRowCount == 0 {
return nil, false
}
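
The combined total is what allows COUNT(*) to be answered without a full scan. A minimal sketch of how the result could be assembled from the hybrid totals; the helper name and the use of sqltypes.NewInt64 are assumptions for illustration, and this commit's actual result construction lies outside the hunks shown:

// buildFastCountResult is a hypothetical helper showing how a COUNT(*) result
// could be assembled from the hybrid totals computed above.
func buildFastCountResult(totalParquetRowCount, totalLiveLogRowCount int64) *QueryResult {
	total := totalParquetRowCount + totalLiveLogRowCount // parquet stats + live log rows
	return &QueryResult{
		Columns: []string{"COUNT(*)"},
		Rows: [][]sqltypes.Value{
			{sqltypes.NewInt64(total)},
		},
	}
}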
@@ -1598,6 +1626,118 @@ func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
return t.UnixNano()
}
// hasLiveLogFiles checks if there are any live log files (non-parquet files) in a partition
func (e *SQLEngine) hasLiveLogFiles(partitionPath string) (bool, error) {
// Get FilerClient from BrokerClient
filerClient, err := e.catalog.brokerClient.GetFilerClient()
if err != nil {
return false, err
}
hasLiveLogs := false
// Read all files in the partition directory
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
// Skip directories and parquet files
if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
return nil
}
// Found a non-parquet file (live log)
hasLiveLogs = true
return nil // Can continue or return early, doesn't matter for existence check
})
return hasLiveLogs, err
}
// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition
func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
filerClient, err := e.catalog.brokerClient.GetFilerClient()
if err != nil {
return 0, err
}
totalRows := int64(0)
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
return nil // Skip directories and parquet files
}
// Count rows in live log file
rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
if err != nil {
fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
return nil // Continue with other files
}
totalRows += rowCount
return nil
})
return totalRows, err
}
// countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
lookupFileIdFn := filer.LookupFn(filerClient)
rowCount := int64(0)
// eachChunkFn processes each chunk's data (pattern from read_log_from_disk.go)
eachChunkFn := func(buf []byte) error {
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
if pos+4+int(size) > len(buf) {
break
}
entryData := buf[pos+4 : pos+4+int(size)]
logEntry := &filer_pb.LogEntry{}
if err := proto.Unmarshal(entryData, logEntry); err != nil {
pos += 4 + int(size)
continue // Skip corrupted entries
}
rowCount++
pos += 4 + int(size)
}
return nil
}
// Read file chunks and process them (pattern from read_log_from_disk.go)
fileSize := filer.FileSize(entry)
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
for x := chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
if err != nil {
fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
continue
}
if len(urlStrings) == 0 {
continue
}
// Read chunk data
// urlStrings[0] is already a complete URL (http://server:port/fileId)
data, _, err := util_http.Get(urlStrings[0])
if err != nil {
fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
continue
}
// Process this chunk
if err := eachChunkFn(data); err != nil {
return rowCount, err
}
}
return rowCount, nil
}
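
countRowsInLogFile relies on the live-log framing in which each record is a 4-byte length prefix (big-endian, matching util.BytesToUint32) followed by a protobuf-encoded filer_pb.LogEntry. A minimal writer-side sketch of that layout, for illustration only and not code from this commit:

import (
	"encoding/binary"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/protobuf/proto"
)

// frameLogEntry appends one length-prefixed LogEntry to buf, producing the
// layout that the reader above walks with util.BytesToUint32 and proto.Unmarshal.
func frameLogEntry(buf []byte, entry *filer_pb.LogEntry) ([]byte, error) {
	data, err := proto.Marshal(entry)
	if err != nil {
		return buf, err
	}
	var size [4]byte
	binary.BigEndian.PutUint32(size[:], uint32(len(data))) // 4-byte size prefix
	buf = append(buf, size[:]...)
	return append(buf, data...), nil
}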
// discoverTopicPartitions discovers all partitions for a given topic
func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
// Use the same discovery logic as in hybrid_message_scanner.go