diff --git a/weed/query/engine/describe.go b/weed/query/engine/describe.go
index 5e21fe87b..d0ad4d163 100644
--- a/weed/query/engine/describe.go
+++ b/weed/query/engine/describe.go
@@ -19,18 +19,40 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri
 		}
 	}
 
+	// Auto-discover and register topic if not already in catalog (same logic as SELECT)
+	if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
+		// Topic not in catalog, try to discover and register it
+		if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
+			fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr)
+			return &QueryResult{Error: fmt.Errorf("topic %s.%s not found and auto-discovery failed: %v", database, tableName, regErr)}, regErr
+		}
+	}
+
 	// Get topic schema from broker
 	recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
 	if err != nil {
 		return &QueryResult{Error: err}, err
 	}
 
-	// Format schema as DESCRIBE output
-	result := &QueryResult{
-		Columns: []string{"Field", "Type", "Null", "Key", "Default", "Extra"},
-		Rows:    make([][]sqltypes.Value, len(recordType.Fields)),
+	// System columns to include in DESCRIBE output
+	systemColumns := []struct {
+		Name  string
+		Type  string
+		Extra string
+	}{
+		{"_timestamp_ns", "BIGINT", "System column: Message timestamp in nanoseconds"},
+		{"_key", "VARBINARY", "System column: Message key"},
+		{"_source", "VARCHAR(255)", "System column: Data source (parquet/log)"},
 	}
 
+	// Format schema as DESCRIBE output (regular fields + system columns)
+	totalRows := len(recordType.Fields) + len(systemColumns)
+	result := &QueryResult{
+		Columns: []string{"Field", "Type", "Null", "Key", "Default", "Extra"},
+		Rows:    make([][]sqltypes.Value, totalRows),
+	}
+
+	// Add regular fields
 	for i, field := range recordType.Fields {
 		sqlType := e.convertMQTypeToSQL(field.Type)
@@ -44,6 +66,19 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri
 		}
 	}
 
+	// Add system columns
+	for i, sysCol := range systemColumns {
+		rowIndex := len(recordType.Fields) + i
+		result.Rows[rowIndex] = []sqltypes.Value{
+			sqltypes.NewVarChar(sysCol.Name),  // Field
+			sqltypes.NewVarChar(sysCol.Type),  // Type
+			sqltypes.NewVarChar("YES"),        // Null
+			sqltypes.NewVarChar(""),           // Key
+			sqltypes.NewVarChar("NULL"),       // Default
+			sqltypes.NewVarChar(sysCol.Extra), // Extra - description
+		}
+	}
+
 	return result, nil
 }
 
@@ -80,14 +115,14 @@ func (e *SQLEngine) executeShowStatementWithDescribe(ctx context.Context, stmt *
 	}
 }
 
-// Add support for DESCRIBE as a separate statement type
-// This would be called from ExecuteSQL if we detect a DESCRIBE statement
+// Add support for DESCRIBE/DESC as a separate statement type
+// This would be called from ExecuteSQL if we detect a DESCRIBE/DESC statement
 func (e *SQLEngine) handleDescribeCommand(ctx context.Context, sql string) (*QueryResult, error) {
-	// Simple parsing for "DESCRIBE [TABLE] table_name" format
-	// Handle both "DESCRIBE table_name" and "DESCRIBE TABLE table_name"
+	// Simple parsing for "DESCRIBE/DESC [TABLE] table_name" format
+	// Handle both "DESCRIBE table_name", "DESC table_name", "DESCRIBE TABLE table_name", "DESC TABLE table_name"
 	parts := strings.Fields(strings.TrimSpace(sql))
 	if len(parts) < 2 {
-		err := fmt.Errorf("DESCRIBE requires a table name")
+		err := fmt.Errorf("DESCRIBE/DESC requires a table name")
 		return &QueryResult{Error: err}, err
 	}
 
@@ -101,13 +136,16 @@ func (e *SQLEngine) handleDescribeCommand(ctx context.Context, sql string) (*Que
 		tableName = parts[1]
 	}
 
+	// Remove backticks from table name if present (same as SQL parser does)
+	tableName = strings.Trim(tableName, "`")
+
 	database := ""
 	// Handle database.table format
 	if strings.Contains(tableName, ".") {
 		dbTableParts := strings.SplitN(tableName, ".", 2)
-		database = dbTableParts[0]
-		tableName = dbTableParts[1]
+		database = strings.Trim(dbTableParts[0], "`") // Also strip backticks from database name
+		tableName = strings.Trim(dbTableParts[1], "`")
 	}
 
 	return e.executeDescribeStatement(ctx, tableName, database)
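A minimal sketch of how the new DESCRIBE/DESC handling could be exercised end to end. The master address, namespace, and topic name are illustrative assumptions, not values taken from this diff:

```go
// Sketch: exercising DESCRIBE/DESC through the SQL engine (illustrative only;
// assumes a running SeaweedFS cluster and an existing topic test.my_topic).
package main

import (
	"context"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/query/engine"
)

func main() {
	e := engine.NewSQLEngine("localhost:9333") // assumed master address

	// After this change, the short DESC form and backticked identifiers
	// both reach handleDescribeCommand.
	result, err := e.ExecuteSQL(context.Background(), "DESC `test`.`my_topic`")
	if err != nil {
		fmt.Println("describe failed:", err)
		return
	}

	// The last three rows should be the system columns added above:
	// _timestamp_ns (BIGINT), _key (VARBINARY), _source (VARCHAR(255)).
	for _, row := range result.Rows {
		fmt.Println(row)
	}
}
```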
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index afff1a889..3e78e2208 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -9,13 +9,15 @@ import (
 	"strings"
 	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
 	"github.com/seaweedfs/seaweedfs/weed/util"
-	"github.com/seaweedfs/seaweedfs/weed/util/http"
+	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/xwb1989/sqlparser"
+	"google.golang.org/protobuf/proto"
 )
 
 // SQLEngine provides SQL query execution capabilities for SeaweedFS
@@ -40,8 +42,8 @@ type QueryResult struct {
 func NewSQLEngine(masterAddress string) *SQLEngine {
 	// Initialize global HTTP client if not already done
 	// This is needed for reading partition data from the filer
-	if http.GetGlobalHttpClient() == nil {
-		http.InitGlobalHttpClient()
+	if util_http.GetGlobalHttpClient() == nil {
+		util_http.InitGlobalHttpClient()
 	}
 
 	return &SQLEngine{
@@ -61,8 +63,9 @@ func (e *SQLEngine) GetCatalog() *SchemaCatalog {
 // 3. DML operations (SELECT) query Parquet files directly
 // 4. Error handling follows MySQL conventions
 func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
-	// Handle DESCRIBE as a special case since it's not parsed as a standard statement
-	if strings.HasPrefix(strings.ToUpper(strings.TrimSpace(sql)), "DESCRIBE") {
+	// Handle DESCRIBE/DESC as a special case since it's not parsed as a standard statement
+	sqlUpper := strings.ToUpper(strings.TrimSpace(sql))
+	if strings.HasPrefix(sqlUpper, "DESCRIBE") || strings.HasPrefix(sqlUpper, "DESC") {
 		return e.handleDescribeCommand(ctx, sql)
 	}
 
@@ -1102,7 +1105,7 @@ func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *
 	if stmt.Where == nil { // Only optimize when no complex WHERE clause
 		fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
 		if canOptimize {
-			fmt.Printf("Using fast parquet statistics for aggregation (skipped full scan)\n")
+			fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n")
 			return fastResult, nil
 		}
 	}
@@ -1354,8 +1357,11 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Val
 	return 0
 }
 
-// tryFastParquetAggregation attempts to compute aggregations using parquet metadata instead of full scan
-// Returns (result, canOptimize) where canOptimize=true means the fast path was used
+// tryFastParquetAggregation attempts to compute aggregations using a hybrid approach:
+// - Use parquet metadata for parquet files
+// - Count live log files for live data
+// - Combine both for accurate results per partition
+// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
 func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
 	// Check if all aggregations are optimizable with parquet statistics
 	for _, spec := range aggregations {
@@ -1370,13 +1376,16 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 		return nil, false
 	}
 
-	// Collect parquet statistics from all partitions
-	allFileStats := make(map[string][]*ParquetFileStats) // partitionPath -> file stats
-	totalRowCount := int64(0)
+	// Collect statistics from all partitions (both parquet and live logs)
+	allFileStats := make(map[string][]*ParquetFileStats) // partitionPath -> parquet file stats
+	totalParquetRowCount := int64(0)
+	totalLiveLogRowCount := int64(0)
+	partitionsWithLiveLogs := 0
 
 	for _, partition := range partitions {
 		partitionPath := fmt.Sprintf("/topics/%s/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition)
 
+		// Get parquet file statistics (always try this)
 		fileStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
 		if err != nil {
 			// If we can't read stats from any partition, fall back to full scan
@@ -1386,13 +1395,32 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 		if len(fileStats) > 0 {
 			allFileStats[partitionPath] = fileStats
 			for _, fileStat := range fileStats {
-				totalRowCount += fileStat.RowCount
+				totalParquetRowCount += fileStat.RowCount
 			}
 		}
+
+		// Check if there are live log files and count their rows
+		liveLogRowCount, err := e.countLiveLogRows(partitionPath)
+		if err != nil {
+			// If we can't count live logs, fall back to full scan
+			return nil, false
+		}
+		if liveLogRowCount > 0 {
+			totalLiveLogRowCount += liveLogRowCount
+			partitionsWithLiveLogs++
+		}
 	}
 
-	// If no parquet files found, can't optimize
-	if len(allFileStats) == 0 || totalRowCount == 0 {
+	totalRowCount := totalParquetRowCount + totalLiveLogRowCount
+
+	// Debug: Show the hybrid optimization results
+	if totalParquetRowCount > 0 || totalLiveLogRowCount > 0 {
+		fmt.Printf("Hybrid fast aggregation: %d parquet rows + %d live log rows from %d partitions with live logs\n",
+			totalParquetRowCount, totalLiveLogRowCount, partitionsWithLiveLogs)
+	}
+
+	// If no data found, can't optimize
+	if totalRowCount == 0 {
 		return nil, false
 	}
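The fast path is deliberately all-or-nothing: any failure to read parquet statistics or count live log rows returns (nil, false), and the caller falls back to a full scan rather than serving partial counts. A condensed restatement of that contract; the helper name and signature are invented for illustration:

```go
// hybridRowCount condenses the fast-path contract sketched above: sum
// parquet-statistics rows and live-log rows per partition, and refuse to
// optimize (ok=false) the moment any partition cannot be counted exactly.
func hybridRowCount(partitions []string, parquetRows, liveLogRows func(partition string) (int64, error)) (total int64, ok bool) {
	for _, p := range partitions {
		pc, err := parquetRows(p)
		if err != nil {
			return 0, false // fall back to full scan
		}
		lc, err := liveLogRows(p)
		if err != nil {
			return 0, false // fall back to full scan
		}
		total += pc + lc
	}
	return total, total > 0 // no data at all also means: don't optimize
}
```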
@@ -1598,6 +1626,118 @@ func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
 	return t.UnixNano()
 }
 
+// hasLiveLogFiles checks if there are any live log files (non-parquet files) in a partition
+func (e *SQLEngine) hasLiveLogFiles(partitionPath string) (bool, error) {
+	// Get FilerClient from BrokerClient
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return false, err
+	}
+
+	hasLiveLogs := false
+
+	// Read all files in the partition directory
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		// Skip directories and parquet files
+		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
+			return nil
+		}
+
+		// Found a non-parquet file (live log)
+		hasLiveLogs = true
+		return nil // could stop early here, but continuing is harmless for an existence check
+	})
+
+	return hasLiveLogs, err
+}
+
+// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition
+func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return 0, err
+	}
+
+	totalRows := int64(0)
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
+			return nil // Skip directories and parquet files
+		}
+
+		// Count rows in live log file
+		rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
+		if err != nil {
+			fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
+			return nil // Continue with other files
+		}
+		totalRows += rowCount
+		return nil
+	})
+	return totalRows, err
+}
+
+// countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
+func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
+	lookupFileIdFn := filer.LookupFn(filerClient)
+
+	rowCount := int64(0)
+
+	// eachChunkFn processes each chunk's data (pattern from read_log_from_disk.go)
+	eachChunkFn := func(buf []byte) error {
+		for pos := 0; pos+4 < len(buf); {
+			size := util.BytesToUint32(buf[pos : pos+4])
+			if pos+4+int(size) > len(buf) {
+				break
+			}
+
+			entryData := buf[pos+4 : pos+4+int(size)]
+
+			logEntry := &filer_pb.LogEntry{}
+			if err := proto.Unmarshal(entryData, logEntry); err != nil {
+				pos += 4 + int(size)
+				continue // Skip corrupted entries
+			}
+
+			rowCount++
+			pos += 4 + int(size)
+		}
+		return nil
+	}
+
+	// Read file chunks and process them (pattern from read_log_from_disk.go)
+	fileSize := filer.FileSize(entry)
+	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
+	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
+
+	for x := chunkViews.Front(); x != nil; x = x.Next {
+		chunk := x.Value
+		urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
+		if err != nil {
+			fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
+			continue
+		}
+
+		if len(urlStrings) == 0 {
+			continue
+		}
+
+		// Read chunk data
+		// urlStrings[0] is already a complete URL (http://server:port/fileId)
+		data, _, err := util_http.Get(urlStrings[0])
+		if err != nil {
+			fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
+			continue
+		}
+
+		// Process this chunk
+		if err := eachChunkFn(data); err != nil {
+			return rowCount, err
+		}
+	}
+
+	return rowCount, nil
+}
+
 // discoverTopicPartitions discovers all partitions for a given topic
 func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
 	// Use the same discovery logic as in hybrid_message_scanner.go
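For reference, countRowsInLogFile relies on the live log framing used elsewhere in SeaweedFS (see read_log_from_disk.go): each record is a 4-byte big-endian length followed by a marshaled filer_pb.LogEntry. A self-contained sketch of that framing; the writer side here is an assumption for illustration, only the reader pattern appears in this diff:

```go
// Sketch: build a buffer in the live-log wire format, then count entries
// back with the same loop shape as eachChunkFn above.
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/protobuf/proto"
)

func main() {
	// Writer side (assumed for illustration): 4-byte big-endian size prefix,
	// then the marshaled LogEntry bytes.
	var buf []byte
	for _, ts := range []int64{1, 2, 3} {
		data, _ := proto.Marshal(&filer_pb.LogEntry{TsNs: ts, Data: []byte("payload")})
		sizeBuf := make([]byte, 4)
		util.Uint32toBytes(sizeBuf, uint32(len(data)))
		buf = append(buf, sizeBuf...)
		buf = append(buf, data...)
	}

	// Reader side: the counting loop from countRowsInLogFile.
	rows := 0
	for pos := 0; pos+4 < len(buf); {
		size := util.BytesToUint32(buf[pos : pos+4])
		if pos+4+int(size) > len(buf) {
			break // truncated trailing entry
		}
		rows++
		pos += 4 + int(size)
	}
	fmt.Println("rows:", rows) // rows: 3
}
```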