diff --git a/.gitignore b/.gitignore index c78184886..044120bcd 100644 --- a/.gitignore +++ b/.gitignore @@ -123,4 +123,3 @@ ADVANCED_IAM_DEVELOPMENT_PLAN.md /test/s3/iam/test-volume-data *.log weed-iam -claude.md diff --git a/test/postgres/producer.go b/test/postgres/producer.go index ec11d5a28..0e19ef258 100644 --- a/test/postgres/producer.go +++ b/test/postgres/producer.go @@ -1,17 +1,25 @@ package main import ( + "context" "encoding/json" "fmt" "log" "math/rand" "os" + "strconv" "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type UserEvent struct { @@ -232,17 +240,99 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { return &schema_pb.RecordValue{Fields: fields}, nil } +// convertHTTPToGRPC converts HTTP address to gRPC address +// Follows SeaweedFS convention: gRPC port = HTTP port + 10000 +func convertHTTPToGRPC(httpAddress string) string { + if strings.Contains(httpAddress, ":") { + parts := strings.Split(httpAddress, ":") + if len(parts) == 2 { + if port, err := strconv.Atoi(parts[1]); err == nil { + return fmt.Sprintf("%s:%d", parts[0], port+10000) + } + } + } + // Fallback: return original address if conversion fails + return httpAddress +} + +// discoverFiler finds a filer from the master server +func discoverFiler(masterHTTPAddress string) (string, error) { + masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress) + + conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return "", fmt.Errorf("failed to connect to master at %s: %v", masterGRPCAddress, err) + } + defer conn.Close() + + client := master_pb.NewSeaweedClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + if err != nil { + return "", fmt.Errorf("failed to list filers from master: %v", err) + } + + if len(resp.ClusterNodes) == 0 { + return "", fmt.Errorf("no filers found in cluster") + } + + // Use the first available filer and convert HTTP address to gRPC + filerHTTPAddress := resp.ClusterNodes[0].Address + return convertHTTPToGRPC(filerHTTPAddress), nil +} + +// discoverBroker finds the broker balancer using filer lock mechanism +func discoverBroker(masterHTTPAddress string) (string, error) { + // First discover filer from master + filerAddress, err := discoverFiler(masterHTTPAddress) + if err != nil { + return "", fmt.Errorf("failed to discover filer: %v", err) + } + + conn, err := grpc.Dial(filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return "", fmt.Errorf("failed to connect to filer at %s: %v", filerAddress, err) + } + defer conn.Close() + + client := filer_pb.NewSeaweedFilerClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + resp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{ + Name: pub_balancer.LockBrokerBalancer, + }) + if err != nil { + return "", fmt.Errorf("failed to find broker balancer: %v", err) + } + + return resp.Owner, nil +} + func createTopicData(masterAddr, filerAddr, namespace, topicName string, generator func() interface{}, count int) error { // Create schema based on topic type recordType := createSchemaForTopic(topicName) + // Dynamically discover broker address instead of hardcoded port replacement + brokerAddress, err := discoverBroker(masterAddr) + if err != nil { + // Fallback to hardcoded port replacement if discovery fails + log.Printf("Warning: Failed to discover broker dynamically (%v), using hardcoded port replacement", err) + brokerAddress = strings.Replace(masterAddr, ":9333", ":17777", 1) + } + // Create publisher configuration config := &pub_client.PublisherConfiguration{ Topic: topic.NewTopic(namespace, topicName), PartitionCount: 1, - Brokers: []string{strings.Replace(masterAddr, ":9333", ":17777", 1)}, // Use broker port + Brokers: []string{brokerAddress}, // Use dynamically discovered broker address PublisherName: fmt.Sprintf("test-producer-%s-%s", namespace, topicName), RecordType: recordType, // Use structured schema } diff --git a/weed/command/db.go b/weed/command/db.go index 71c46af4a..efa278c57 100644 --- a/weed/command/db.go +++ b/weed/command/db.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "os/signal" - "strconv" "strings" "syscall" "time" @@ -368,21 +367,3 @@ func validatePortNumber(port int) error { } return nil } - -// parseConnectionLimit parses and validates the connection limit -func parseConnectionLimit(limitStr string) (int, error) { - limit, err := strconv.Atoi(limitStr) - if err != nil { - return 0, fmt.Errorf("invalid connection limit '%s': %v", limitStr, err) - } - - if limit < 1 { - return 0, fmt.Errorf("connection limit must be at least 1, got %d", limit) - } - - if limit > 10000 { - return 0, fmt.Errorf("connection limit too high (%d), maximum is 10000", limit) - } - - return limit, nil -} diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index 71399de78..2711f056b 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -8,6 +8,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -71,8 +72,8 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data if bufferIndex != expectedIndex { // This shouldn't happen in normal operation // Log warning but continue (don't crash the system) - fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n", - expectedIndex, bufferIndex) + glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d", + fullpath, expectedIndex, bufferIndex) } // Note: We don't update the start index - it stays the same } diff --git a/weed/server/postgres/protocol.go b/weed/server/postgres/protocol.go index 1eb74fead..a110e03af 100644 --- a/weed/server/postgres/protocol.go +++ b/weed/server/postgres/protocol.go @@ -9,11 +9,74 @@ import ( "strings" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/query/engine" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" "github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/xwb1989/sqlparser" ) +// mapErrorToPostgreSQLCode maps SeaweedFS SQL engine errors to appropriate PostgreSQL error codes +func mapErrorToPostgreSQLCode(err error) string { + if err == nil { + return "00000" // Success + } + + errStr := err.Error() + + // Map specific engine error types + switch e := err.(type) { + case engine.AggregationError: + // Aggregation errors are usually function-related issues + if strings.Contains(e.Error(), "unsupported") { + return "0A000" // Feature not supported + } + return "42703" // Undefined column (column-related aggregation issues) + + case engine.DataSourceError: + // Data source errors could be table/topic not found + if strings.Contains(e.Error(), "not found") || strings.Contains(e.Error(), "topic") { + return "42P01" // Undefined table + } + return "08000" // Connection exception (data source access issues) + + case engine.OptimizationError: + // Optimization failures are usually feature limitations + return "0A000" // Feature not supported + } + + // Map based on error message patterns + errLower := strings.ToLower(errStr) + + // Parsing and syntax errors + if strings.Contains(errLower, "parse error") || strings.Contains(errLower, "syntax") { + return "42601" // Syntax error + } + + // Unsupported features + if strings.Contains(errLower, "unsupported") || strings.Contains(errLower, "not supported") { + return "0A000" // Feature not supported + } + + // Table/topic not found + if strings.Contains(errLower, "not found") || + strings.Contains(errLower, "topic") && strings.Contains(errLower, "available") { + return "42P01" // Undefined table + } + + // Column-related errors + if strings.Contains(errLower, "column") || strings.Contains(errLower, "field") { + return "42703" // Undefined column + } + + // Multi-table or complex query limitations + if strings.Contains(errLower, "single table") || strings.Contains(errLower, "join") { + return "0A000" // Feature not supported + } + + // Default to generic syntax/access error + return "42000" // Syntax error or access rule violation +} + // handleMessage processes a single PostgreSQL protocol message func (s *PostgreSQLServer) handleMessage(session *PostgreSQLSession) error { // Read message type @@ -119,7 +182,8 @@ func (s *PostgreSQLServer) handleSimpleQuery(session *PostgreSQLSession, query s result, err := s.sqlEngine.ExecuteSQL(ctx, cleanQuery) if err != nil { // Send error message but keep connection alive - sendErr := s.sendError(session, "42000", err.Error()) + errorCode := mapErrorToPostgreSQLCode(err) + sendErr := s.sendError(session, errorCode, err.Error()) if sendErr != nil { return sendErr } @@ -129,7 +193,8 @@ func (s *PostgreSQLServer) handleSimpleQuery(session *PostgreSQLSession, query s if result.Error != nil { // Send error message but keep connection alive - sendErr := s.sendError(session, "42000", result.Error.Error()) + errorCode := mapErrorToPostgreSQLCode(result.Error) + sendErr := s.sendError(session, errorCode, result.Error.Error()) if sendErr != nil { return sendErr }