// Source: seaweedfs/weed/command/sql.go

package command
import (
	"bufio"
	"context"
	"fmt"
	"os"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/seaweedfs/seaweedfs/weed/query/engine"
)
// init attaches runSql as the Run handler of the sql command.
// NOTE(review): presumably this is done here rather than inside the cmdSql
// literal because runSql reads sqlServer, which is itself initialized from
// cmdSql.Flag; referencing runSql directly from the literal would likely be
// rejected as a package initialization cycle — confirm before moving it.
func init() {
cmdSql.Run = runSql
}
// cmdSql describes the "sql" subcommand: its usage line, a one-line summary,
// and the long help text. Its Run handler is assigned separately in init().
// NOTE(review): the Long text marks SELECT and CREATE/ALTER/DROP TABLE as
// "coming soon"; verify against what the query engine currently supports
// before relying on this list.
var cmdSql = &Command{
UsageLine: "sql [-server=localhost:8888]",
Short: "start a SQL query interface for SeaweedFS MQ topics",
Long: `Start an interactive SQL shell to query SeaweedFS Message Queue topics as tables.
Features:
- SHOW DATABASES: List all MQ namespaces
- SHOW TABLES: List topics in current namespace
- DESCRIBE table: Show table schema
- SELECT queries (coming soon)
- CREATE/ALTER/DROP TABLE operations (coming soon)
Assumptions:
- MQ namespaces map to SQL databases
- MQ topics map to SQL tables
- Messages are stored in Parquet format for efficient querying
Examples:
weed sql
weed sql -server=broker1:8888
`,
}
var (
// sqlServer is the value of the -server flag (default "localhost:8888").
// runSql prints it in the banner and passes it to engine.NewSQLEngine.
sqlServer = cmdSql.Flag.String("server", "localhost:8888", "SeaweedFS server address")
)
func runSql(command *Command, args []string) bool {
fmt.Println("SeaweedFS SQL Interface")
fmt.Println("Type 'help;' for help, 'exit;' to quit")
fmt.Printf("Connected to: %s\n", *sqlServer)
fmt.Println()
// Initialize SQL engine
// Assumption: Engine will connect to MQ broker on demand
feat: Phase 2 - Add DDL operations and real MQ broker integration Implements comprehensive DDL support for MQ topic management: New Components: - Real MQ broker connectivity via BrokerClient - CREATE TABLE → ConfigureTopic gRPC calls - DROP TABLE → DeleteTopic operations - DESCRIBE table → Schema introspection - SQL type mapping (SQL ↔ MQ schema types) Enhanced Features: - Live topic discovery from MQ broker - Fallback to cached/sample data when broker unavailable - MySQL-compatible DESCRIBE output - Schema validation and error handling - CREATE TABLE with column definitions Key Infrastructure: - broker_client.go: gRPC communication with MQ broker - sql_types.go: Bidirectional SQL/MQ type conversion - describe.go: Table schema introspection - Enhanced engine.go: Full DDL routing and execution Supported SQL Operations: ✅ SHOW DATABASES, SHOW TABLES (live + fallback) ✅ CREATE TABLE table_name (col1 INT, col2 VARCHAR(50), ...) ✅ DROP TABLE table_name ✅ DESCRIBE table_name / SHOW COLUMNS FROM table_name Known Limitations: - SQL parser issues with reserved keywords (e.g., 'timestamp') - Requires running MQ broker for full functionality - ALTER TABLE not yet implemented - DeleteTopic method needs broker-side implementation Architecture Decisions: - Broker discovery via filer lock mechanism (same as shell commands) - Graceful fallback when broker unavailable - ConfigureTopic for CREATE TABLE with 6 default partitions - Schema versioning ready for ALTER TABLE support Testing: - Unit tests updated with filer address parameter - Integration tests for DDL operations - Error handling for connection failures Next Phase: SELECT query execution with Parquet scanning
2025-08-31 21:01:23 -07:00
sqlEngine := engine.NewSQLEngine(*sqlServer)
// Interactive shell loop
scanner := bufio.NewScanner(os.Stdin)
var queryBuffer strings.Builder
for {
// Show prompt
if queryBuffer.Len() == 0 {
fmt.Print("seaweedfs> ")
} else {
fmt.Print(" -> ") // Continuation prompt
}
// Read line
if !scanner.Scan() {
break
}
line := strings.TrimSpace(scanner.Text())
// Handle special commands
if line == "exit;" || line == "quit;" || line == "\\q" {
fmt.Println("Goodbye!")
break
}
if line == "help;" {
showHelp()
continue
}
if line == "" {
continue
}
// Accumulate multi-line queries
queryBuffer.WriteString(line)
queryBuffer.WriteString(" ")
// Execute when query ends with semicolon
if strings.HasSuffix(line, ";") {
query := strings.TrimSpace(queryBuffer.String())
query = strings.TrimSuffix(query, ";") // Remove trailing semicolon
executeQuery(sqlEngine, query)
// Reset buffer for next query
queryBuffer.Reset()
}
}
return true
}
// executeQuery runs a single SQL statement against sqlEngine and prints the
// outcome to standard output.
//
// The statement is executed synchronously with a 30 second timeout. If
// ExecuteSQL returns an error, or the returned result carries a non-nil Error,
// the error is printed and nothing else is shown. Otherwise the result is
// rendered as a table followed by a "(N rows in set, X sec)" summary line.
//
// The reported time covers only the ExecuteSQL call, not the time spent
// rendering the table to the terminal.
func executeQuery(sqlEngine *engine.SQLEngine, query string) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Time the engine call alone so the summary reflects query latency.
	startTime := time.Now()
	result, err := sqlEngine.ExecuteSQL(ctx, query)
	elapsed := time.Since(startTime)

	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	if result.Error != nil {
		fmt.Printf("Query Error: %v\n", result.Error)
		return
	}

	// Display results
	displayQueryResult(result)

	// Show execution time
	fmt.Printf("\n(%d rows in set, %.2f sec)\n\n", len(result.Rows), elapsed.Seconds())
}
// displayQueryResult prints result to standard output as an ASCII table with a
// header row, one line per data row, and "+---+" separators above the header,
// below the header, and below the last row.
//
// If the result has no columns, "Empty result set" is printed instead.
//
// Each column is as wide as its longest header or value, measured in runes to
// match how fmt interprets the width of %-*s; measuring in bytes would
// misalign any column containing multi-byte UTF-8 text. Values beyond the
// number of columns are ignored, and rows with fewer values than columns are
// padded with empty cells so the table border stays intact.
//
// Assumption: results fit in the terminal width, and every rune occupies one
// terminal cell (wide or combining characters may still appear misaligned).
func displayQueryResult(result *engine.QueryResult) {
	if len(result.Columns) == 0 {
		fmt.Println("Empty result set")
		return
	}

	// Start with the header widths.
	colWidths := make([]int, len(result.Columns))
	for i, col := range result.Columns {
		colWidths[i] = utf8.RuneCountInString(col)
	}

	// Render every cell exactly once, widening columns as needed. Each row
	// gets one slot per column; slots without a value stay empty.
	cells := make([][]string, len(result.Rows))
	for r, row := range result.Rows {
		rendered := make([]string, len(colWidths))
		for i, val := range row {
			if i >= len(colWidths) {
				break
			}
			text := val.ToString()
			rendered[i] = text
			if w := utf8.RuneCountInString(text); w > colWidths[i] {
				colWidths[i] = w
			}
		}
		cells[r] = rendered
	}

	// The separator is identical everywhere it appears, so build it once.
	var sep strings.Builder
	sep.WriteString("+")
	for _, width := range colWidths {
		sep.WriteString(strings.Repeat("-", width+2))
		sep.WriteString("+")
	}
	separator := sep.String()

	// Header
	fmt.Println(separator)
	fmt.Print("|")
	for i, col := range result.Columns {
		fmt.Printf(" %-*s |", colWidths[i], col)
	}
	fmt.Println()
	fmt.Println(separator)

	// Data rows
	for _, rendered := range cells {
		fmt.Print("|")
		for i, text := range rendered {
			fmt.Printf(" %-*s |", colWidths[i], text)
		}
		fmt.Println()
	}

	// Bottom border
	fmt.Println(separator)
}
// showHelp prints the interactive shell's help text to standard output: the
// SQL statements the shell advertises, the shell-level special commands, and
// usage notes. It is invoked when the user enters "help;".
// NOTE(review): this text marks DESCRIBE (and the DDL statements) as
// "coming soon", whereas the sql command's long usage text lists DESCRIBE as
// an available feature — the two should be reconciled with what the engine
// actually supports.
func showHelp() {
fmt.Println(`SeaweedFS SQL Interface Help:
Available Commands:
SHOW DATABASES; - List all MQ namespaces
SHOW TABLES; - List all topics in current namespace
SHOW TABLES FROM database; - List topics in specific namespace
DESCRIBE table_name; - Show table schema (coming soon)
SELECT * FROM table_name; - Query table data (coming soon)
CREATE TABLE ...; - Create new topic (coming soon)
ALTER TABLE ...; - Modify topic schema (coming soon)
DROP TABLE table_name; - Delete topic (coming soon)
Special Commands:
help; - Show this help
exit; or quit; or \q - Exit the SQL interface
Notes:
- MQ namespaces appear as SQL databases
- MQ topics appear as SQL tables
- All queries must end with semicolon (;)
- Multi-line queries are supported
Current Status: Basic metadata operations implemented`)
}