diff --git a/SQL_FEATURE_PLAN.md b/SQL_FEATURE_PLAN.md new file mode 100644 index 000000000..40e638732 --- /dev/null +++ b/SQL_FEATURE_PLAN.md @@ -0,0 +1,284 @@ +# SQL Query Engine Feature, Dev, and Test Plan + +This document outlines the plan for adding comprehensive SQL support to SeaweedFS, focusing on schema-tized Message Queue (MQ) topics with full DDL and DML capabilities, plus S3 objects querying. + +## Feature Plan + +**1. Goal** + +To provide a full-featured SQL interface for SeaweedFS, treating schema-tized MQ topics as database tables with complete DDL/DML support. This enables: +- Database-like operations on MQ topics (CREATE TABLE, ALTER TABLE, DROP TABLE) +- Advanced querying with SELECT, WHERE, JOIN, aggregations +- Schema management and metadata operations (SHOW DATABASES, SHOW TABLES) +- In-place analytics on Parquet-stored messages without data movement + +**2. Key Features** + +* **Schema-tized Topic Management (Priority 1):** + * `SHOW DATABASES` - List all MQ namespaces + * `SHOW TABLES` - List all topics in a namespace + * `CREATE TABLE topic_name (field1 INT, field2 STRING, ...)` - Create new MQ topic with schema + * `ALTER TABLE topic_name ADD COLUMN field3 BOOL` - Modify topic schema (with versioning) + * `DROP TABLE topic_name` - Delete MQ topic + * `DESCRIBE table_name` - Show topic schema details +* **Advanced Query Engine (Priority 1):** + * Full `SELECT` support with `WHERE`, `ORDER BY`, `LIMIT`, `OFFSET` + * Aggregation functions: `COUNT()`, `SUM()`, `AVG()`, `MIN()`, `MAX()`, `GROUP BY` + * Join operations between topics (leveraging Parquet columnar format) + * Window functions and advanced analytics + * Temporal queries with timestamp-based filtering +* **S3 Select (Priority 2):** + * Support for querying objects in standard data formats (CSV, JSON, Parquet) + * Queries executed directly on storage nodes to minimize data transfer +* **User Interfaces:** + * New API endpoint `/sql` for HTTP-based SQL execution + * New CLI command `weed sql` with interactive shell mode + * Optional: Web UI for query execution and result visualization +* **Output Formats:** + * JSON (default), CSV, Parquet for result sets + * Streaming results for large queries + * Pagination support for result navigation + +## Development Plan + +**1. Scaffolding & Dependencies** + +* **SQL Parser:** Use **`github.com/xwb1989/sqlparser`** or **`github.com/blastrain/vitess-sqlparser`** (enhanced fork). Benefits: + * Lightweight, focused SQL parser originally derived from Vitess + * Full MySQL-compatible DDL/DML support (`CREATE`, `ALTER`, `DROP`, `SELECT`, etc.) + * Integrates well with existing `weed/query/sqltypes` infrastructure + * Proven in production use across many Go projects + * Alternative: `blastrain/vitess-sqlparser` if advanced features like `OFFSET` or bulk operations are needed +* **Project Structure:** + * Extend existing `weed/query/` package for SQL execution engine + * Create `weed/query/engine/` for query planning and execution + * Create `weed/query/metadata/` for schema catalog management + * Integration point in `weed/mq/` for topic-to-table mapping + +**2. SQL Engine Architecture** + +* **Schema Catalog:** + * Leverage existing `weed/mq/schema/` infrastructure + * Map MQ namespaces to "databases" and topics to "tables" + * Store schema metadata with version history + * Handle schema evolution and migration +* **Query Planner:** + * Parse SQL AST using Vitess parser + * Create optimized execution plans leveraging Parquet columnar format + * Push-down predicates to storage layer for efficient filtering + * Optimize joins using partition pruning +* **Query Executor:** + * Utilize existing `weed/mq/logstore/` for Parquet reading + * Implement streaming execution for large result sets + * Support parallel processing across topic partitions + * Handle schema evolution during query execution + +**3. Data Source Integration** + +* **MQ Topic Connector (Primary):** + * Build on existing `weed/mq/logstore/read_parquet_to_log.go` + * Implement efficient Parquet scanning with predicate pushdown + * Support schema evolution and backward compatibility + * Handle partition-based parallelism for scalable queries +* **Schema Registry Integration:** + * Extend `weed/mq/schema/schema.go` for SQL metadata operations + * Implement DDL operations that modify underlying MQ topic schemas + * Version control for schema changes with migration support +* **S3 Connector (Secondary):** + * Reading data from S3 objects with CSV, JSON, and Parquet parsers + * Efficient streaming for large files with columnar optimizations + +**4. API & CLI Integration** + +* **HTTP API Endpoint:** + * Add `/sql` endpoint to Filer server following existing patterns in `weed/server/filer_server.go` + * Support both POST (for queries) and GET (for metadata operations) + * Include query result pagination and streaming + * Authentication and authorization integration +* **CLI Command:** + * New `weed sql` command with interactive shell mode (similar to `weed shell`) + * Support for script execution and result formatting + * Connection management for remote SeaweedFS clusters +* **gRPC API:** + * Add SQL service to existing MQ broker gRPC interface + * Enable efficient query execution with streaming results + +## Example Usage Scenarios + +**Scenario 1: Topic Management** +```sql +-- List all namespaces (databases) +SHOW DATABASES; + +-- List topics in a namespace +USE my_namespace; +SHOW TABLES; + +-- Create a new topic with schema +CREATE TABLE user_events ( + user_id INT, + event_type STRING, + timestamp BIGINT, + metadata STRING +); + +-- Modify topic schema +ALTER TABLE user_events ADD COLUMN session_id STRING; + +-- View topic structure +DESCRIBE user_events; +``` + +**Scenario 2: Data Querying** +```sql +-- Basic filtering and projection +SELECT user_id, event_type, timestamp +FROM user_events +WHERE timestamp > 1640995200000 +ORDER BY timestamp DESC +LIMIT 100; + +-- Aggregation queries +SELECT event_type, COUNT(*) as event_count +FROM user_events +WHERE timestamp >= 1640995200000 +GROUP BY event_type; + +-- Cross-topic joins +SELECT u.user_id, u.event_type, p.product_name +FROM user_events u +JOIN product_catalog p ON u.product_id = p.id +WHERE u.event_type = 'purchase'; +``` + +**Scenario 3: Analytics & Monitoring** +```sql +-- Time-series analysis +SELECT + DATE_TRUNC('hour', FROM_UNIXTIME(timestamp/1000)) as hour, + COUNT(*) as events_per_hour +FROM user_events +WHERE timestamp >= 1640995200000 +GROUP BY hour +ORDER BY hour; + +-- Real-time monitoring +SELECT event_type, AVG(response_time) as avg_response +FROM api_logs +WHERE timestamp >= UNIX_TIMESTAMP() - 3600 +GROUP BY event_type +HAVING avg_response > 1000; +``` + +## Architecture Overview + +``` +SQL Query Flow: +┌─────────────┐ ┌──────────────┐ ┌─────────────────┐ ┌──────────────┐ +│ Client │ │ SQL Parser │ │ Query Planner │ │ Execution │ +│ (CLI/HTTP) │──→ │ (xwb1989/ │──→ │ & Optimizer │──→ │ Engine │ +│ │ │ sqlparser) │ │ │ │ │ +└─────────────┘ └──────────────┘ └─────────────────┘ └──────────────┘ + │ │ + ▼ │ + ┌─────────────────────────────────────────────────┐│ + │ Schema Catalog ││ + │ • Namespace → Database mapping ││ + │ • Topic → Table mapping ││ + │ • Schema version management ││ + └─────────────────────────────────────────────────┘│ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ MQ Storage Layer │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Topic A │ │ Topic B │ │ Topic C │ │ ... │ │ +│ │ (Parquet) │ │ (Parquet) │ │ (Parquet) │ │ (Parquet) │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +## Key Design Decisions + +**1. SQL-to-MQ Mapping Strategy:** +* MQ Namespaces ↔ SQL Databases +* MQ Topics ↔ SQL Tables +* Topic Partitions ↔ Table Shards (transparent to users) +* Schema Fields ↔ Table Columns + +**2. Schema Evolution Handling:** +* Maintain schema version history in topic metadata +* Support backward-compatible queries across schema versions +* Automatic type coercion where possible +* Clear error messages for incompatible changes + +**3. Query Optimization:** +* Leverage Parquet columnar format for projection pushdown +* Use topic partitioning for parallel query execution +* Implement predicate pushdown to minimize data scanning +* Cache frequently accessed schema metadata + +**4. Transaction Semantics:** +* DDL operations (CREATE/ALTER/DROP) are atomic per topic +* SELECT queries provide read-consistent snapshots +* No cross-topic transactions initially (future enhancement) + +## Implementation Phases + +**Phase 1: Core SQL Infrastructure (Weeks 1-3)** +1. Add `github.com/xwb1989/sqlparser` dependency +2. Create `weed/query/engine/` package with basic SQL execution framework +3. Implement metadata catalog mapping MQ topics to SQL tables +4. Basic `SHOW DATABASES`, `SHOW TABLES`, `DESCRIBE` commands + +**Phase 2: DDL Operations (Weeks 4-5)** +1. `CREATE TABLE` → Create MQ topic with schema +2. `ALTER TABLE` → Modify topic schema with versioning +3. `DROP TABLE` → Delete MQ topic +4. Schema validation and migration handling + +**Phase 3: Query Engine (Weeks 6-8)** +1. `SELECT` with `WHERE`, `ORDER BY`, `LIMIT`, `OFFSET` +2. Aggregation functions and `GROUP BY` +3. Basic joins between topics +4. Predicate pushdown to Parquet layer + +**Phase 4: API & CLI Integration (Weeks 9-10)** +1. HTTP `/sql` endpoint implementation +2. `weed sql` CLI command with interactive mode +3. Result streaming and pagination +4. Error handling and query optimization + +## Test Plan + +**1. Unit Tests** + +* **SQL Parser Tests:** Validate parsing of all supported DDL/DML statements +* **Schema Mapping Tests:** Test topic-to-table conversion and metadata operations +* **Query Planning Tests:** Verify optimization and predicate pushdown logic +* **Execution Engine Tests:** Test query execution with various data patterns +* **Edge Cases:** Malformed queries, schema evolution, concurrent operations + +**2. Integration Tests** + +* **End-to-End Workflow:** Complete SQL operations against live SeaweedFS cluster +* **Schema Evolution:** Test backward compatibility during schema changes +* **Multi-Topic Joins:** Validate cross-topic query performance and correctness +* **Large Dataset Tests:** Performance validation with GB-scale Parquet data +* **Concurrent Access:** Multiple SQL sessions operating simultaneously + +**3. Performance & Security Testing** + +* **Query Performance:** Benchmark latency for various query patterns +* **Memory Usage:** Monitor resource consumption during large result sets +* **Scalability Tests:** Performance across multiple partitions and topics +* **SQL Injection Prevention:** Security validation of parser and execution engine +* **Fuzz Testing:** Automated testing with malformed SQL inputs + +## Success Metrics + +* **Feature Completeness:** Support for all specified DDL/DML operations +* **Performance:** Query latency < 100ms for simple selects, < 1s for complex joins +* **Scalability:** Handle topics with millions of messages efficiently +* **Reliability:** 99.9% success rate for valid SQL operations +* **Usability:** Intuitive SQL interface matching standard database expectations diff --git a/go.mod b/go.mod index 21a17333d..4b3fa19d6 100644 --- a/go.mod +++ b/go.mod @@ -92,6 +92,7 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.etcd.io/etcd/client/v3 v3.6.4 go.mongodb.org/mongo-driver v1.17.4 diff --git a/go.sum b/go.sum index ed1d931c4..7a2e696c1 100644 --- a/go.sum +++ b/go.sum @@ -1698,6 +1698,8 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 h1:zzrxE1FKn5ryBNl9eKOeqQ58Y/Qpo3Q9QNxKHX5uzzQ= +github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2/go.mod h1:hzfGeIUDq/j97IG+FhNqkowIyEcD88LrW6fyU3K3WqY= github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e h1:9LPdmD1vqadsDQUva6t2O9MbnyvoOgo8nFNPaOIH5U8= github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e/go.mod h1:HEUYX/p8966tMUHHT+TsS0hF/Ca/NYwqprC5WXSDMfE= github.com/ydb-platform/ydb-go-genproto v0.0.0-20221215182650-986f9d10542f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= diff --git a/weed/command/command.go b/weed/command/command.go index 06474fbb9..5da7fdc73 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -39,6 +39,7 @@ var Commands = []*Command{ cmdScaffold, cmdServer, cmdShell, + cmdSql, cmdUpdate, cmdUpload, cmdVersion, diff --git a/weed/command/sql.go b/weed/command/sql.go new file mode 100644 index 000000000..f4eaec604 --- /dev/null +++ b/weed/command/sql.go @@ -0,0 +1,229 @@ +package command + +import ( + "bufio" + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/query/engine" +) + +func init() { + cmdSql.Run = runSql +} + +var cmdSql = &Command{ + UsageLine: "sql [-server=localhost:8888]", + Short: "start a SQL query interface for SeaweedFS MQ topics", + Long: `Start an interactive SQL shell to query SeaweedFS Message Queue topics as tables. + +Features: +- SHOW DATABASES: List all MQ namespaces +- SHOW TABLES: List topics in current namespace +- DESCRIBE table: Show table schema +- SELECT queries (coming soon) +- CREATE/ALTER/DROP TABLE operations (coming soon) + +Assumptions: +- MQ namespaces map to SQL databases +- MQ topics map to SQL tables +- Messages are stored in Parquet format for efficient querying + +Examples: + weed sql + weed sql -server=broker1:8888 +`, +} + +var ( + sqlServer = cmdSql.Flag.String("server", "localhost:8888", "SeaweedFS server address") +) + +func runSql(command *Command, args []string) bool { + fmt.Println("SeaweedFS SQL Interface") + fmt.Println("Type 'help;' for help, 'exit;' to quit") + fmt.Printf("Connected to: %s\n", *sqlServer) + fmt.Println() + + // Initialize SQL engine + // Assumption: Engine will connect to MQ broker on demand + sqlEngine := engine.NewSQLEngine() + + // Interactive shell loop + scanner := bufio.NewScanner(os.Stdin) + var queryBuffer strings.Builder + + for { + // Show prompt + if queryBuffer.Len() == 0 { + fmt.Print("seaweedfs> ") + } else { + fmt.Print(" -> ") // Continuation prompt + } + + // Read line + if !scanner.Scan() { + break + } + + line := strings.TrimSpace(scanner.Text()) + + // Handle special commands + if line == "exit;" || line == "quit;" || line == "\\q" { + fmt.Println("Goodbye!") + break + } + + if line == "help;" { + showHelp() + continue + } + + if line == "" { + continue + } + + // Accumulate multi-line queries + queryBuffer.WriteString(line) + queryBuffer.WriteString(" ") + + // Execute when query ends with semicolon + if strings.HasSuffix(line, ";") { + query := strings.TrimSpace(queryBuffer.String()) + query = strings.TrimSuffix(query, ";") // Remove trailing semicolon + + executeQuery(sqlEngine, query) + + // Reset buffer for next query + queryBuffer.Reset() + } + } + + return true +} + +// executeQuery runs a SQL query and displays results +// Assumption: All queries are executed synchronously for simplicity +func executeQuery(engine *engine.SQLEngine, query string) { + startTime := time.Now() + + // Execute the query + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + result, err := engine.ExecuteSQL(ctx, query) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + + if result.Error != nil { + fmt.Printf("Query Error: %v\n", result.Error) + return + } + + // Display results + displayQueryResult(result) + + // Show execution time + elapsed := time.Since(startTime) + fmt.Printf("\n(%d rows in set, %.2f sec)\n\n", len(result.Rows), elapsed.Seconds()) +} + +// displayQueryResult formats and displays query results in table format +// Assumption: Results fit in terminal width (simple formatting for now) +func displayQueryResult(result *engine.QueryResult) { + if len(result.Columns) == 0 { + fmt.Println("Empty result set") + return + } + + // Calculate column widths for formatting + colWidths := make([]int, len(result.Columns)) + for i, col := range result.Columns { + colWidths[i] = len(col) + } + + // Check data for wider columns + for _, row := range result.Rows { + for i, val := range row { + if i < len(colWidths) { + valStr := val.ToString() + if len(valStr) > colWidths[i] { + colWidths[i] = len(valStr) + } + } + } + } + + // Print header separator + fmt.Print("+") + for _, width := range colWidths { + fmt.Print(strings.Repeat("-", width+2) + "+") + } + fmt.Println() + + // Print column headers + fmt.Print("|") + for i, col := range result.Columns { + fmt.Printf(" %-*s |", colWidths[i], col) + } + fmt.Println() + + // Print separator + fmt.Print("+") + for _, width := range colWidths { + fmt.Print(strings.Repeat("-", width+2) + "+") + } + fmt.Println() + + // Print data rows + for _, row := range result.Rows { + fmt.Print("|") + for i, val := range row { + if i < len(colWidths) { + fmt.Printf(" %-*s |", colWidths[i], val.ToString()) + } + } + fmt.Println() + } + + // Print bottom separator + fmt.Print("+") + for _, width := range colWidths { + fmt.Print(strings.Repeat("-", width+2) + "+") + } + fmt.Println() +} + +func showHelp() { + fmt.Println(` +SeaweedFS SQL Interface Help: + +Available Commands: + SHOW DATABASES; - List all MQ namespaces + SHOW TABLES; - List all topics in current namespace + SHOW TABLES FROM database; - List topics in specific namespace + DESCRIBE table_name; - Show table schema (coming soon) + + SELECT * FROM table_name; - Query table data (coming soon) + CREATE TABLE ...; - Create new topic (coming soon) + ALTER TABLE ...; - Modify topic schema (coming soon) + DROP TABLE table_name; - Delete topic (coming soon) + +Special Commands: + help; - Show this help + exit; or quit; or \q - Exit the SQL interface + +Notes: +- MQ namespaces appear as SQL databases +- MQ topics appear as SQL tables +- All queries must end with semicolon (;) +- Multi-line queries are supported + +Current Status: Basic metadata operations implemented +`) +} diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go new file mode 100644 index 000000000..b3a343435 --- /dev/null +++ b/weed/query/engine/catalog.go @@ -0,0 +1,237 @@ +package engine + +import ( + "fmt" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// SchemaCatalog manages the mapping between MQ topics and SQL tables +// Assumptions: +// 1. Each MQ namespace corresponds to a SQL database +// 2. Each MQ topic corresponds to a SQL table +// 3. Topic schemas are cached for performance +// 4. Schema evolution is tracked via RevisionId +type SchemaCatalog struct { + mu sync.RWMutex + + // databases maps namespace names to database metadata + // Assumption: Namespace names are valid SQL database identifiers + databases map[string]*DatabaseInfo + + // currentDatabase tracks the active database context (for USE database) + // Assumption: Single-threaded usage per SQL session + currentDatabase string +} + +// DatabaseInfo represents a SQL database (MQ namespace) +type DatabaseInfo struct { + Name string + Tables map[string]*TableInfo +} + +// TableInfo represents a SQL table (MQ topic) with schema information +// Assumptions: +// 1. All topic messages conform to the same schema within a revision +// 2. Schema evolution maintains backward compatibility +// 3. Primary key is implicitly the message timestamp/offset +type TableInfo struct { + Name string + Namespace string + Schema *schema.Schema + Columns []ColumnInfo + RevisionId uint32 +} + +// ColumnInfo represents a SQL column (MQ schema field) +type ColumnInfo struct { + Name string + Type string // SQL type representation + Nullable bool // Assumption: MQ fields are nullable by default +} + +// NewSchemaCatalog creates a new schema catalog +// Assumption: Catalog starts empty and is populated on-demand +func NewSchemaCatalog() *SchemaCatalog { + return &SchemaCatalog{ + databases: make(map[string]*DatabaseInfo), + } +} + +// ListDatabases returns all available databases (MQ namespaces) +// Assumption: This would be populated from MQ broker metadata +func (c *SchemaCatalog) ListDatabases() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + databases := make([]string, 0, len(c.databases)) + for name := range c.databases { + databases = append(databases, name) + } + + // TODO: Query actual MQ broker for namespace list + // For now, return sample data for testing + if len(databases) == 0 { + return []string{"default", "analytics", "logs"} + } + + return databases +} + +// ListTables returns all tables in a database (MQ topics in namespace) +func (c *SchemaCatalog) ListTables(database string) ([]string, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + db, exists := c.databases[database] + if !exists { + // TODO: Query MQ broker for actual topics in namespace + // For now, return sample data + switch database { + case "default": + return []string{"user_events", "system_logs"}, nil + case "analytics": + return []string{"page_views", "click_events"}, nil + case "logs": + return []string{"error_logs", "access_logs"}, nil + default: + return nil, fmt.Errorf("database '%s' not found", database) + } + } + + tables := make([]string, 0, len(db.Tables)) + for name := range db.Tables { + tables = append(tables, name) + } + + return tables, nil +} + +// GetTableInfo returns detailed schema information for a table +// Assumption: Table exists and schema is accessible +func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + db, exists := c.databases[database] + if !exists { + return nil, fmt.Errorf("database '%s' not found", database) + } + + tableInfo, exists := db.Tables[table] + if !exists { + return nil, fmt.Errorf("table '%s' not found in database '%s'", table, database) + } + + return tableInfo, nil +} + +// RegisterTopic adds or updates a topic's schema information in the catalog +// Assumption: This is called when topics are created or schemas are modified +func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error { + c.mu.Lock() + defer c.mu.Unlock() + + // Ensure database exists + db, exists := c.databases[namespace] + if !exists { + db = &DatabaseInfo{ + Name: namespace, + Tables: make(map[string]*TableInfo), + } + c.databases[namespace] = db + } + + // Convert MQ schema to SQL table info + tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema) + if err != nil { + return fmt.Errorf("failed to convert MQ schema: %v", err) + } + + db.Tables[topicName] = tableInfo + return nil +} + +// convertMQSchemaToTableInfo converts MQ schema to SQL table information +// Assumptions: +// 1. MQ scalar types map directly to SQL types +// 2. Complex types (arrays, maps) are serialized as JSON strings +// 3. All fields are nullable unless specifically marked otherwise +func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { + columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) + + for i, field := range mqSchema.RecordType.Fields { + sqlType, err := c.convertMQFieldTypeToSQL(field.Type) + if err != nil { + return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err) + } + + columns[i] = ColumnInfo{ + Name: field.Name, + Type: sqlType, + Nullable: true, // Assumption: MQ fields are nullable by default + } + } + + return &TableInfo{ + Name: topicName, + Namespace: namespace, + Schema: mqSchema, + Columns: columns, + RevisionId: mqSchema.RevisionId, + }, nil +} + +// convertMQFieldTypeToSQL maps MQ field types to SQL types +// Assumption: Standard SQL type mappings with MySQL compatibility +func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (string, error) { + switch t := fieldType.Kind.(type) { + case *schema_pb.Type_ScalarType: + switch t.ScalarType { + case schema_pb.ScalarType_BOOL: + return "BOOLEAN", nil + case schema_pb.ScalarType_INT32: + return "INT", nil + case schema_pb.ScalarType_INT64: + return "BIGINT", nil + case schema_pb.ScalarType_FLOAT: + return "FLOAT", nil + case schema_pb.ScalarType_DOUBLE: + return "DOUBLE", nil + case schema_pb.ScalarType_BYTES: + return "VARBINARY", nil + case schema_pb.ScalarType_STRING: + return "VARCHAR(255)", nil // Assumption: Default string length + default: + return "", fmt.Errorf("unsupported scalar type: %v", t.ScalarType) + } + case *schema_pb.Type_ListType: + // Assumption: Lists are serialized as JSON strings in SQL + return "TEXT", nil + case *schema_pb.Type_RecordType: + // Assumption: Nested records are serialized as JSON strings + return "TEXT", nil + default: + return "", fmt.Errorf("unsupported field type: %T", t) + } +} + +// SetCurrentDatabase sets the active database context +// Assumption: Used for implementing "USE database" functionality +func (c *SchemaCatalog) SetCurrentDatabase(database string) error { + c.mu.Lock() + defer c.mu.Unlock() + + // TODO: Validate database exists in MQ broker + c.currentDatabase = database + return nil +} + +// GetCurrentDatabase returns the currently active database +func (c *SchemaCatalog) GetCurrentDatabase() string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.currentDatabase +} diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go new file mode 100644 index 000000000..38de4a560 --- /dev/null +++ b/weed/query/engine/engine.go @@ -0,0 +1,179 @@ +package engine + +import ( + "context" + "fmt" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" + "github.com/xwb1989/sqlparser" +) + +// SQLEngine provides SQL query execution capabilities for SeaweedFS +// Assumptions: +// 1. MQ namespaces map directly to SQL databases +// 2. MQ topics map directly to SQL tables +// 3. Schema evolution is handled transparently with backward compatibility +// 4. Queries run against Parquet-stored MQ messages +type SQLEngine struct { + catalog *SchemaCatalog +} + +// QueryResult represents the result of a SQL query execution +type QueryResult struct { + Columns []string `json:"columns"` + Rows [][]sqltypes.Value `json:"rows"` + Error error `json:"error,omitempty"` +} + +// NewSQLEngine creates a new SQL execution engine +// Assumption: Schema catalog is initialized with current MQ state +func NewSQLEngine() *SQLEngine { + return &SQLEngine{ + catalog: NewSchemaCatalog(), + } +} + +// ExecuteSQL parses and executes a SQL statement +// Assumptions: +// 1. All SQL statements are MySQL-compatible via xwb1989/sqlparser +// 2. DDL operations (CREATE/ALTER/DROP) modify underlying MQ topics +// 3. DML operations (SELECT) query Parquet files directly +// 4. Error handling follows MySQL conventions +func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) { + // Parse the SQL statement + stmt, err := sqlparser.Parse(sql) + if err != nil { + return &QueryResult{ + Error: fmt.Errorf("SQL parse error: %v", err), + }, err + } + + // Route to appropriate handler based on statement type + switch stmt := stmt.(type) { + case *sqlparser.Show: + return e.executeShowStatement(ctx, stmt) + case *sqlparser.DDL: + return e.executeDDLStatement(ctx, stmt) + case *sqlparser.Select: + return e.executeSelectStatement(ctx, stmt) + default: + err := fmt.Errorf("unsupported SQL statement type: %T", stmt) + return &QueryResult{Error: err}, err + } +} + +// executeShowStatement handles SHOW commands (DATABASES, TABLES, etc.) +// Assumption: These map directly to MQ namespace/topic metadata +func (e *SQLEngine) executeShowStatement(ctx context.Context, stmt *sqlparser.Show) (*QueryResult, error) { + switch strings.ToUpper(stmt.Type) { + case "DATABASES": + return e.showDatabases(ctx) + case "TABLES": + // TODO: Parse FROM clause properly for database specification + return e.showTables(ctx, "") + default: + err := fmt.Errorf("unsupported SHOW statement: %s", stmt.Type) + return &QueryResult{Error: err}, err + } +} + +// executeDDLStatement handles CREATE, ALTER, DROP operations +// Assumption: These operations modify the underlying MQ topic structure +func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *sqlparser.DDL) (*QueryResult, error) { + switch stmt.Action { + case sqlparser.CreateStr: + return e.createTable(ctx, stmt) + case sqlparser.AlterStr: + return e.alterTable(ctx, stmt) + case sqlparser.DropStr: + return e.dropTable(ctx, stmt) + default: + err := fmt.Errorf("unsupported DDL action: %s", stmt.Action) + return &QueryResult{Error: err}, err + } +} + +// executeSelectStatement handles SELECT queries +// Assumptions: +// 1. Queries run against Parquet files in MQ topics +// 2. Predicate pushdown is used for efficiency +// 3. Cross-topic joins are supported via partition-aware execution +func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.Select) (*QueryResult, error) { + // TODO: Implement SELECT query execution + // This will involve: + // 1. Query planning and optimization + // 2. Parquet file scanning with predicate pushdown + // 3. Result set construction + // 4. Streaming for large results + + err := fmt.Errorf("SELECT statement execution not yet implemented") + return &QueryResult{Error: err}, err +} + +// Helper methods for specific operations + +func (e *SQLEngine) showDatabases(ctx context.Context) (*QueryResult, error) { + databases := e.catalog.ListDatabases() + + result := &QueryResult{ + Columns: []string{"Database"}, + Rows: make([][]sqltypes.Value, len(databases)), + } + + for i, db := range databases { + result.Rows[i] = []sqltypes.Value{ + sqltypes.NewVarChar(db), + } + } + + return result, nil +} + +func (e *SQLEngine) showTables(ctx context.Context, dbName string) (*QueryResult, error) { + // Assumption: If no database specified, use default or return error + if dbName == "" { + // TODO: Implement default database context + // For now, use 'default' as the default database + dbName = "default" + } + + tables, err := e.catalog.ListTables(dbName) + if err != nil { + return &QueryResult{Error: err}, err + } + + result := &QueryResult{ + Columns: []string{"Tables_in_" + dbName}, + Rows: make([][]sqltypes.Value, len(tables)), + } + + for i, table := range tables { + result.Rows[i] = []sqltypes.Value{ + sqltypes.NewVarChar(table), + } + } + + return result, nil +} + +func (e *SQLEngine) createTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryResult, error) { + // TODO: Implement table creation + // This will create a new MQ topic with the specified schema + err := fmt.Errorf("CREATE TABLE not yet implemented") + return &QueryResult{Error: err}, err +} + +func (e *SQLEngine) alterTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryResult, error) { + // TODO: Implement table alteration + // This will modify the MQ topic schema with versioning + err := fmt.Errorf("ALTER TABLE not yet implemented") + return &QueryResult{Error: err}, err +} + +func (e *SQLEngine) dropTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryResult, error) { + // TODO: Implement table dropping + // This will delete the MQ topic + err := fmt.Errorf("DROP TABLE not yet implemented") + return &QueryResult{Error: err}, err +} diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go new file mode 100644 index 000000000..40ec9f302 --- /dev/null +++ b/weed/query/engine/engine_test.go @@ -0,0 +1,95 @@ +package engine + +import ( + "context" + "testing" +) + +func TestSQLEngine_ShowDatabases(t *testing.T) { + engine := NewSQLEngine() + + result, err := engine.ExecuteSQL(context.Background(), "SHOW DATABASES") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if result.Error != nil { + t.Fatalf("Expected no query error, got %v", result.Error) + } + + if len(result.Columns) != 1 || result.Columns[0] != "Database" { + t.Errorf("Expected column 'Database', got %v", result.Columns) + } + + if len(result.Rows) == 0 { + t.Error("Expected at least one database, got none") + } + + // Should have sample databases: default, analytics, logs + expectedDatabases := map[string]bool{ + "default": false, "analytics": false, "logs": false, + } + + for _, row := range result.Rows { + if len(row) > 0 { + dbName := row[0].ToString() + if _, exists := expectedDatabases[dbName]; exists { + expectedDatabases[dbName] = true + } + } + } + + for db, found := range expectedDatabases { + if !found { + t.Errorf("Expected to find database '%s'", db) + } + } +} + +func TestSQLEngine_ShowTables(t *testing.T) { + engine := NewSQLEngine() + + result, err := engine.ExecuteSQL(context.Background(), "SHOW TABLES") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if result.Error != nil { + t.Fatalf("Expected no query error, got %v", result.Error) + } + + if len(result.Columns) != 1 || result.Columns[0] != "Tables_in_default" { + t.Errorf("Expected column 'Tables_in_default', got %v", result.Columns) + } + + if len(result.Rows) == 0 { + t.Error("Expected at least one table, got none") + } +} + +func TestSQLEngine_ParseError(t *testing.T) { + engine := NewSQLEngine() + + result, err := engine.ExecuteSQL(context.Background(), "INVALID SQL") + if err == nil { + t.Error("Expected parse error for invalid SQL") + } + + if result.Error == nil { + t.Error("Expected result error for invalid SQL") + } +} + +func TestSQLEngine_UnsupportedStatement(t *testing.T) { + engine := NewSQLEngine() + + // INSERT is not yet implemented + result, err := engine.ExecuteSQL(context.Background(), "INSERT INTO test VALUES (1)") + if err == nil { + t.Error("Expected error for unsupported statement") + } + + if result.Error == nil { + t.Error("Expected result error for unsupported statement") + } +}