package command import ( "context" "crypto/tls" "fmt" "os" "os/signal" "strings" "syscall" "time" "github.com/seaweedfs/seaweedfs/weed/server/postgres" "github.com/seaweedfs/seaweedfs/weed/util" ) var ( dbOptions DBOptions ) type DBOptions struct { host *string port *int masterAddr *string authMethod *string users *string database *string maxConns *int idleTimeout *string tlsCert *string tlsKey *string } func init() { cmdDB.Run = runDB // break init cycle dbOptions.host = cmdDB.Flag.String("host", "localhost", "Database server host") dbOptions.port = cmdDB.Flag.Int("port", 5432, "Database server port") dbOptions.masterAddr = cmdDB.Flag.String("master", "localhost:9333", "SeaweedFS master server address") dbOptions.authMethod = cmdDB.Flag.String("auth", "trust", "Authentication method: trust, password, md5") dbOptions.users = cmdDB.Flag.String("users", "", "User credentials for auth (format: user1:pass1,user2:pass2)") dbOptions.database = cmdDB.Flag.String("database", "default", "Default database name") dbOptions.maxConns = cmdDB.Flag.Int("max-connections", 100, "Maximum concurrent connections") dbOptions.idleTimeout = cmdDB.Flag.String("idle-timeout", "1h", "Connection idle timeout") dbOptions.tlsCert = cmdDB.Flag.String("tls-cert", "", "TLS certificate file path") dbOptions.tlsKey = cmdDB.Flag.String("tls-key", "", "TLS private key file path") } var cmdDB = &Command{ UsageLine: "db -port=5432 -master=", Short: "start a PostgreSQL-compatible database server for SQL queries", Long: `Start a PostgreSQL wire protocol compatible database server that provides SQL query access to SeaweedFS. This database server enables any PostgreSQL client, tool, or application to connect to SeaweedFS and execute SQL queries against MQ topics. It implements the PostgreSQL wire protocol for maximum compatibility with the existing PostgreSQL ecosystem. Examples: # Start database server on default port 5432 weed db # Start with password authentication weed db -auth=password -users="admin:secret,readonly:view123" # Start with MD5 authentication weed db -auth=md5 -users="user1:pass1,user2:pass2" # Start with custom port and master weed db -port=5433 -master=master1:9333 # Allow connections from any host weed db -host=0.0.0.0 -port=5432 # Start with TLS encryption weed db -tls-cert=server.crt -tls-key=server.key Client Connection Examples: # psql command line client psql "host=localhost port=5432 dbname=default user=seaweedfs" psql -h localhost -p 5432 -U seaweedfs -d default # With password PGPASSWORD=secret psql -h localhost -p 5432 -U admin -d default # Connection string psql "postgresql://admin:secret@localhost:5432/default" Programming Language Examples: # Python (psycopg2) import psycopg2 conn = psycopg2.connect( host="localhost", port=5432, user="seaweedfs", database="default" ) # Java JDBC String url = "jdbc:postgresql://localhost:5432/default"; Connection conn = DriverManager.getConnection(url, "seaweedfs", ""); # Go (lib/pq) db, err := sql.Open("postgres", "host=localhost port=5432 user=seaweedfs dbname=default sslmode=disable") # Node.js (pg) const client = new Client({ host: 'localhost', port: 5432, user: 'seaweedfs', database: 'default' }); Supported SQL Operations: - SELECT queries on MQ topics - DESCRIBE/DESC table_name commands - EXPLAIN query execution plans - SHOW DATABASES/TABLES commands - Aggregation functions (COUNT, SUM, AVG, MIN, MAX) - WHERE clauses with filtering - System columns (_timestamp_ns, _key, _source) - PostgreSQL system queries (version(), current_database(), etc.) - psql meta-commands (\d, \dt, \l, etc.) Authentication Methods: - trust: No authentication required (default) - password: Clear text password authentication - md5: MD5 password authentication (more secure) Compatible Tools: - psql (PostgreSQL command line client) - pgAdmin (PostgreSQL admin tool) - DBeaver (universal database tool) - DataGrip (JetBrains database IDE) - Grafana (PostgreSQL data source) - Superset (PostgreSQL connector) - Tableau (PostgreSQL native connector) - Any PostgreSQL JDBC/ODBC compatible tool Security Features: - Multiple authentication methods - TLS encryption support - User access control - Connection limits - Read-only access (no data modification) Performance Features: - Connection pooling - Configurable connection limits - Idle connection timeout - Efficient wire protocol - Query result streaming `, } func runDB(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) // Validate options if *dbOptions.masterAddr == "" { fmt.Fprintf(os.Stderr, "Error: master address is required\n") return false } // Parse authentication method authMethod, err := parseAuthMethod(*dbOptions.authMethod) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) return false } // Parse user credentials users, err := parseUsers(*dbOptions.users, authMethod) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) return false } // Parse idle timeout idleTimeout, err := time.ParseDuration(*dbOptions.idleTimeout) if err != nil { fmt.Fprintf(os.Stderr, "Error parsing idle timeout: %v\n", err) return false } // Validate port number if err := validatePortNumber(*dbOptions.port); err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) return false } // Setup TLS if requested var tlsConfig *tls.Config if *dbOptions.tlsCert != "" && *dbOptions.tlsKey != "" { cert, err := tls.LoadX509KeyPair(*dbOptions.tlsCert, *dbOptions.tlsKey) if err != nil { fmt.Fprintf(os.Stderr, "Error loading TLS certificates: %v\n", err) return false } tlsConfig = &tls.Config{ Certificates: []tls.Certificate{cert}, } } // Create server configuration config := &postgres.PostgreSQLServerConfig{ Host: *dbOptions.host, Port: *dbOptions.port, AuthMethod: authMethod, Users: users, Database: *dbOptions.database, MaxConns: *dbOptions.maxConns, IdleTimeout: idleTimeout, TLSConfig: tlsConfig, } // Create database server dbServer, err := postgres.NewPostgreSQLServer(config, *dbOptions.masterAddr) if err != nil { fmt.Fprintf(os.Stderr, "Error creating database server: %v\n", err) return false } // Print startup information fmt.Printf("Starting SeaweedFS Database Server...\n") fmt.Printf("Host: %s\n", *dbOptions.host) fmt.Printf("Port: %d\n", *dbOptions.port) fmt.Printf("Master: %s\n", *dbOptions.masterAddr) fmt.Printf("Database: %s\n", *dbOptions.database) fmt.Printf("Auth Method: %s\n", *dbOptions.authMethod) fmt.Printf("Max Connections: %d\n", *dbOptions.maxConns) fmt.Printf("Idle Timeout: %s\n", *dbOptions.idleTimeout) if tlsConfig != nil { fmt.Printf("TLS: Enabled\n") } else { fmt.Printf("TLS: Disabled\n") } if len(users) > 0 { fmt.Printf("Users: %d configured\n", len(users)) } fmt.Printf("\nDatabase Connection Examples:\n") fmt.Printf(" psql -h %s -p %d -U seaweedfs -d %s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database) if len(users) > 0 { // Show first user as example for username := range users { fmt.Printf(" psql -h %s -p %d -U %s -d %s\n", *dbOptions.host, *dbOptions.port, username, *dbOptions.database) break } } fmt.Printf(" postgresql://%s:%d/%s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database) fmt.Printf("\nSupported Operations:\n") fmt.Printf(" - SELECT queries on MQ topics\n") fmt.Printf(" - DESCRIBE/DESC table_name\n") fmt.Printf(" - EXPLAIN query execution plans\n") fmt.Printf(" - SHOW DATABASES/TABLES\n") fmt.Printf(" - Aggregations: COUNT, SUM, AVG, MIN, MAX\n") fmt.Printf(" - System columns: _timestamp_ns, _key, _source\n") fmt.Printf(" - psql commands: \\d, \\dt, \\l, \\q\n") fmt.Printf("\nReady for database connections!\n\n") // Start the server err = dbServer.Start() if err != nil { fmt.Fprintf(os.Stderr, "Error starting database server: %v\n", err) return false } // Set up signal handling for graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Wait for shutdown signal <-sigChan fmt.Printf("\nReceived shutdown signal, stopping database server...\n") // Create context with timeout for graceful shutdown ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Stop the server with timeout done := make(chan error, 1) go func() { done <- dbServer.Stop() }() select { case err := <-done: if err != nil { fmt.Fprintf(os.Stderr, "Error stopping database server: %v\n", err) return false } fmt.Printf("Database server stopped successfully\n") case <-ctx.Done(): fmt.Fprintf(os.Stderr, "Timeout waiting for database server to stop\n") return false } return true } // parseAuthMethod parses the authentication method string func parseAuthMethod(method string) (postgres.AuthMethod, error) { switch strings.ToLower(method) { case "trust": return postgres.AuthTrust, nil case "password": return postgres.AuthPassword, nil case "md5": return postgres.AuthMD5, nil default: return postgres.AuthTrust, fmt.Errorf("unsupported auth method '%s'. Supported: trust, password, md5", method) } } // parseUsers parses the user credentials string func parseUsers(usersStr string, authMethod postgres.AuthMethod) (map[string]string, error) { users := make(map[string]string) if usersStr == "" { // No users specified if authMethod != postgres.AuthTrust { return nil, fmt.Errorf("users must be specified when auth method is not 'trust'") } return users, nil } // Parse user:password pairs pairs := strings.Split(usersStr, ",") for _, pair := range pairs { pair = strings.TrimSpace(pair) if pair == "" { continue } parts := strings.SplitN(pair, ":", 2) if len(parts) != 2 { return nil, fmt.Errorf("invalid user format '%s'. Expected 'username:password'", pair) } username := strings.TrimSpace(parts[0]) password := strings.TrimSpace(parts[1]) if username == "" { return nil, fmt.Errorf("empty username in user specification") } if authMethod != postgres.AuthTrust && password == "" { return nil, fmt.Errorf("empty password for user '%s' with auth method", username) } users[username] = password } return users, nil } // validatePortNumber validates that the port number is reasonable func validatePortNumber(port int) error { if port < 1 || port > 65535 { return fmt.Errorf("port number must be between 1 and 65535, got %d", port) } if port < 1024 { fmt.Fprintf(os.Stderr, "Warning: port number %d may require root privileges\n", port) } return nil }