mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-21 01:57:58 +08:00
setup integration test for postgres
This commit is contained in:
@@ -84,19 +84,24 @@ func (s *PostgreSQLServer) handleSimpleQuery(session *PostgreSQLSession, query s
|
||||
}
|
||||
}
|
||||
|
||||
// Clean query by removing trailing semicolons and whitespace early
|
||||
cleanQuery := strings.TrimSpace(query)
|
||||
cleanQuery = strings.TrimSuffix(cleanQuery, ";")
|
||||
cleanQuery = strings.TrimSpace(cleanQuery)
|
||||
|
||||
// Set database context in SQL engine if session database is different from current
|
||||
if session.database != "" && session.database != s.sqlEngine.GetCatalog().GetCurrentDatabase() {
|
||||
s.sqlEngine.GetCatalog().SetCurrentDatabase(session.database)
|
||||
}
|
||||
|
||||
// Handle PostgreSQL-specific system queries
|
||||
if postgresQuery := s.translatePostgreSQLSystemQuery(query); postgresQuery != "" {
|
||||
query = postgresQuery
|
||||
// Handle PostgreSQL-specific system queries directly
|
||||
if systemResult := s.handleSystemQuery(session, cleanQuery); systemResult != nil {
|
||||
return s.sendSystemQueryResult(session, systemResult, cleanQuery)
|
||||
}
|
||||
|
||||
// Execute using SQL engine directly
|
||||
ctx := context.Background()
|
||||
result, err := s.sqlEngine.ExecuteSQL(ctx, query)
|
||||
result, err := s.sqlEngine.ExecuteSQL(ctx, cleanQuery)
|
||||
if err != nil {
|
||||
return s.sendError(session, "42000", err.Error())
|
||||
}
|
||||
@@ -133,9 +138,14 @@ func (s *PostgreSQLServer) handleSimpleQuery(session *PostgreSQLSession, query s
|
||||
return s.sendReadyForQuery(session)
|
||||
}
|
||||
|
||||
// translatePostgreSQLSystemQuery translates essential PostgreSQL system queries
|
||||
// Only handles queries that PostgreSQL clients expect but SeaweedFS SQL engine doesn't natively support
|
||||
func (s *PostgreSQLServer) translatePostgreSQLSystemQuery(query string) string {
|
||||
// SystemQueryResult represents the result of a system query
|
||||
type SystemQueryResult struct {
|
||||
Columns []string
|
||||
Rows [][]string
|
||||
}
|
||||
|
||||
// handleSystemQuery handles PostgreSQL system queries directly
|
||||
func (s *PostgreSQLServer) handleSystemQuery(session *PostgreSQLSession, query string) *SystemQueryResult {
|
||||
// Trim and normalize query
|
||||
query = strings.TrimSpace(query)
|
||||
query = strings.TrimSuffix(query, ";")
|
||||
@@ -144,44 +154,109 @@ func (s *PostgreSQLServer) translatePostgreSQLSystemQuery(query string) string {
|
||||
// Handle essential PostgreSQL system queries
|
||||
switch queryLower {
|
||||
case "select version()":
|
||||
return "SELECT 'SeaweedFS 1.0 (PostgreSQL 14.0 compatible)' as version"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"version"},
|
||||
Rows: [][]string{{"SeaweedFS 1.0 (PostgreSQL 14.0 compatible)"}},
|
||||
}
|
||||
case "select current_database()":
|
||||
return "SELECT '" + s.config.Database + "' as current_database"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"current_database"},
|
||||
Rows: [][]string{{s.config.Database}},
|
||||
}
|
||||
case "select current_user":
|
||||
return "SELECT 'seaweedfs' as current_user"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"current_user"},
|
||||
Rows: [][]string{{"seaweedfs"}},
|
||||
}
|
||||
case "select current_setting('server_version')":
|
||||
return "SELECT '14.0' as server_version"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"server_version"},
|
||||
Rows: [][]string{{"14.0"}},
|
||||
}
|
||||
case "select current_setting('server_encoding')":
|
||||
return "SELECT 'UTF8' as server_encoding"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"server_encoding"},
|
||||
Rows: [][]string{{"UTF8"}},
|
||||
}
|
||||
case "select current_setting('client_encoding')":
|
||||
return "SELECT 'UTF8' as client_encoding"
|
||||
}
|
||||
|
||||
// Handle pg_* catalog queries by mapping to equivalent SHOW commands
|
||||
if strings.Contains(queryLower, "pg_tables") || strings.Contains(queryLower, "information_schema.tables") {
|
||||
return "SHOW TABLES"
|
||||
}
|
||||
if strings.Contains(queryLower, "pg_database") || strings.Contains(queryLower, "information_schema.schemata") {
|
||||
return "SHOW DATABASES"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"client_encoding"},
|
||||
Rows: [][]string{{"UTF8"}},
|
||||
}
|
||||
}
|
||||
|
||||
// Handle transaction commands (no-op for read-only)
|
||||
switch queryLower {
|
||||
case "begin", "start transaction":
|
||||
return "SELECT 'BEGIN' as status"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"status"},
|
||||
Rows: [][]string{{"BEGIN"}},
|
||||
}
|
||||
case "commit":
|
||||
return "SELECT 'COMMIT' as status"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"status"},
|
||||
Rows: [][]string{{"COMMIT"}},
|
||||
}
|
||||
case "rollback":
|
||||
return "SELECT 'ROLLBACK' as status"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"status"},
|
||||
Rows: [][]string{{"ROLLBACK"}},
|
||||
}
|
||||
}
|
||||
|
||||
// If starts with SET, return a no-op
|
||||
if strings.HasPrefix(queryLower, "set ") {
|
||||
return "SELECT 'SET' as status"
|
||||
return &SystemQueryResult{
|
||||
Columns: []string{"status"},
|
||||
Rows: [][]string{{"SET"}},
|
||||
}
|
||||
}
|
||||
|
||||
// Return empty string to use original query (let SQL engine handle it)
|
||||
return ""
|
||||
// Return nil to use SQL engine
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendSystemQueryResult sends the result of a system query
|
||||
func (s *PostgreSQLServer) sendSystemQueryResult(session *PostgreSQLSession, result *SystemQueryResult, query string) error {
|
||||
// Create column descriptions for system query results
|
||||
columns := make([]string, len(result.Columns))
|
||||
for i, col := range result.Columns {
|
||||
columns[i] = col
|
||||
}
|
||||
|
||||
// Convert to sqltypes.Value format
|
||||
var sqlRows [][]sqltypes.Value
|
||||
for _, row := range result.Rows {
|
||||
sqlRow := make([]sqltypes.Value, len(row))
|
||||
for i, cell := range row {
|
||||
sqlRow[i] = sqltypes.NewVarChar(cell)
|
||||
}
|
||||
sqlRows = append(sqlRows, sqlRow)
|
||||
}
|
||||
|
||||
// Send row description
|
||||
err := s.sendRowDescription(session, columns, sqlRows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send data rows
|
||||
for _, row := range sqlRows {
|
||||
err = s.sendDataRow(session, row)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Send command complete
|
||||
tag := s.getCommandTag(query, len(result.Rows))
|
||||
err = s.sendCommandComplete(session, tag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send ready for query
|
||||
return s.sendReadyForQuery(session)
|
||||
}
|
||||
|
||||
// handleParse processes a Parse message (prepared statement)
|
||||
@@ -316,7 +391,7 @@ func (s *PostgreSQLServer) sendParameterStatus(session *PostgreSQLSession, name,
|
||||
|
||||
// sendBackendKeyData sends backend key data
|
||||
func (s *PostgreSQLServer) sendBackendKeyData(session *PostgreSQLSession) error {
|
||||
msg := make([]byte, 12)
|
||||
msg := make([]byte, 13)
|
||||
msg[0] = PG_RESP_BACKEND_KEY
|
||||
binary.BigEndian.PutUint32(msg[1:5], 12)
|
||||
binary.BigEndian.PutUint32(msg[5:9], session.processID)
|
||||
@@ -331,7 +406,7 @@ func (s *PostgreSQLServer) sendBackendKeyData(session *PostgreSQLSession) error
|
||||
|
||||
// sendReadyForQuery sends ready for query message
|
||||
func (s *PostgreSQLServer) sendReadyForQuery(session *PostgreSQLSession) error {
|
||||
msg := make([]byte, 5)
|
||||
msg := make([]byte, 6)
|
||||
msg[0] = PG_RESP_READY
|
||||
binary.BigEndian.PutUint32(msg[1:5], 5)
|
||||
msg[5] = session.transactionState
|
||||
|
Reference in New Issue
Block a user