mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-20 02:19:23 +08:00
318 lines
7.9 KiB
Go
318 lines
7.9 KiB
Go
![]() |
package engine
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
)
|
||
|
|
||
|
func TestSQLEngine_HybridSelectBasic(t *testing.T) {
|
||
|
engine := NewSQLEngine("localhost:8888")
|
||
|
|
||
|
// Test SELECT * FROM table (should show both live and archived data)
|
||
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events")
|
||
|
if err != nil {
|
||
|
t.Fatalf("Expected no error, got %v", err)
|
||
|
}
|
||
|
|
||
|
if result.Error != nil {
|
||
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
||
|
}
|
||
|
|
||
|
if len(result.Columns) == 0 {
|
||
|
t.Error("Expected columns in result")
|
||
|
}
|
||
|
|
||
|
if len(result.Rows) == 0 {
|
||
|
t.Error("Expected rows in result")
|
||
|
}
|
||
|
|
||
|
// Should have both live and archived data (4 sample records)
|
||
|
if len(result.Rows) != 4 {
|
||
|
t.Errorf("Expected 4 rows (2 live + 2 archived), got %d", len(result.Rows))
|
||
|
}
|
||
|
|
||
|
// Check that we have the _source column showing data source
|
||
|
hasSourceColumn := false
|
||
|
sourceColumnIndex := -1
|
||
|
for i, column := range result.Columns {
|
||
|
if column == "_source" {
|
||
|
hasSourceColumn = true
|
||
|
sourceColumnIndex = i
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !hasSourceColumn {
|
||
|
t.Error("Expected _source column to show data source (live_log vs parquet_archive)")
|
||
|
}
|
||
|
|
||
|
// Verify we have both data sources
|
||
|
if hasSourceColumn && sourceColumnIndex >= 0 {
|
||
|
foundLiveLog := false
|
||
|
foundParquetArchive := false
|
||
|
|
||
|
for _, row := range result.Rows {
|
||
|
if sourceColumnIndex < len(row) {
|
||
|
source := row[sourceColumnIndex].ToString()
|
||
|
if source == "live_log" {
|
||
|
foundLiveLog = true
|
||
|
} else if source == "parquet_archive" {
|
||
|
foundParquetArchive = true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !foundLiveLog {
|
||
|
t.Error("Expected to find live_log data source in results")
|
||
|
}
|
||
|
|
||
|
if !foundParquetArchive {
|
||
|
t.Error("Expected to find parquet_archive data source in results")
|
||
|
}
|
||
|
|
||
|
t.Logf("✅ Found both live_log and parquet_archive data sources")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestSQLEngine_HybridSelectWithLimit(t *testing.T) {
|
||
|
engine := NewSQLEngine("localhost:8888")
|
||
|
|
||
|
// Test SELECT with LIMIT on hybrid data
|
||
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events LIMIT 2")
|
||
|
if err != nil {
|
||
|
t.Fatalf("Expected no error, got %v", err)
|
||
|
}
|
||
|
|
||
|
if result.Error != nil {
|
||
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
||
|
}
|
||
|
|
||
|
// Should have exactly 2 rows due to LIMIT
|
||
|
if len(result.Rows) != 2 {
|
||
|
t.Errorf("Expected 2 rows with LIMIT 2, got %d", len(result.Rows))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestSQLEngine_HybridSelectDifferentTables(t *testing.T) {
|
||
|
engine := NewSQLEngine("localhost:8888")
|
||
|
|
||
|
// Test both user_events and system_logs tables
|
||
|
tables := []string{"user_events", "system_logs"}
|
||
|
|
||
|
for _, tableName := range tables {
|
||
|
result, err := engine.ExecuteSQL(context.Background(), fmt.Sprintf("SELECT * FROM %s", tableName))
|
||
|
if err != nil {
|
||
|
t.Errorf("Error querying hybrid table %s: %v", tableName, err)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if result.Error != nil {
|
||
|
t.Errorf("Query error for hybrid table %s: %v", tableName, result.Error)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if len(result.Columns) == 0 {
|
||
|
t.Errorf("No columns returned for hybrid table %s", tableName)
|
||
|
}
|
||
|
|
||
|
if len(result.Rows) == 0 {
|
||
|
t.Errorf("No rows returned for hybrid table %s", tableName)
|
||
|
}
|
||
|
|
||
|
// Check for _source column
|
||
|
hasSourceColumn := false
|
||
|
for _, column := range result.Columns {
|
||
|
if column == "_source" {
|
||
|
hasSourceColumn = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !hasSourceColumn {
|
||
|
t.Errorf("Table %s missing _source column for hybrid data", tableName)
|
||
|
}
|
||
|
|
||
|
t.Logf("✅ Table %s: %d columns, %d rows with hybrid data sources", tableName, len(result.Columns), len(result.Rows))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestSQLEngine_HybridDataSource(t *testing.T) {
|
||
|
engine := NewSQLEngine("localhost:8888")
|
||
|
|
||
|
// Test that we can distinguish between live and archived data
|
||
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT user_id, event_type, _source FROM user_events")
|
||
|
if err != nil {
|
||
|
t.Fatalf("Expected no error, got %v", err)
|
||
|
}
|
||
|
|
||
|
if result.Error != nil {
|
||
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
||
|
}
|
||
|
|
||
|
// Find the _source column
|
||
|
sourceColumnIndex := -1
|
||
|
eventTypeColumnIndex := -1
|
||
|
|
||
|
for i, column := range result.Columns {
|
||
|
switch column {
|
||
|
case "_source":
|
||
|
sourceColumnIndex = i
|
||
|
case "event_type":
|
||
|
eventTypeColumnIndex = i
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if sourceColumnIndex == -1 {
|
||
|
t.Fatal("Could not find _source column")
|
||
|
}
|
||
|
|
||
|
if eventTypeColumnIndex == -1 {
|
||
|
t.Fatal("Could not find event_type column")
|
||
|
}
|
||
|
|
||
|
// Check the data characteristics
|
||
|
liveEventFound := false
|
||
|
archivedEventFound := false
|
||
|
|
||
|
for _, row := range result.Rows {
|
||
|
if sourceColumnIndex < len(row) && eventTypeColumnIndex < len(row) {
|
||
|
source := row[sourceColumnIndex].ToString()
|
||
|
eventType := row[eventTypeColumnIndex].ToString()
|
||
|
|
||
|
if source == "live_log" && strings.Contains(eventType, "live_") {
|
||
|
liveEventFound = true
|
||
|
t.Logf("Found live event: %s from %s", eventType, source)
|
||
|
}
|
||
|
|
||
|
if source == "parquet_archive" && strings.Contains(eventType, "archived_") {
|
||
|
archivedEventFound = true
|
||
|
t.Logf("Found archived event: %s from %s", eventType, source)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !liveEventFound {
|
||
|
t.Error("Expected to find live events with live_ prefix")
|
||
|
}
|
||
|
|
||
|
if !archivedEventFound {
|
||
|
t.Error("Expected to find archived events with archived_ prefix")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestSQLEngine_HybridSystemLogs(t *testing.T) {
|
||
|
engine := NewSQLEngine("localhost:8888")
|
||
|
|
||
|
// Test system_logs with hybrid data
|
||
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT level, message, service, _source FROM system_logs")
|
||
|
if err != nil {
|
||
|
t.Fatalf("Expected no error, got %v", err)
|
||
|
}
|
||
|
|
||
|
if result.Error != nil {
|
||
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
||
|
}
|
||
|
|
||
|
// Should have both live and archived system logs
|
||
|
if len(result.Rows) < 2 {
|
||
|
t.Errorf("Expected at least 2 system log entries, got %d", len(result.Rows))
|
||
|
}
|
||
|
|
||
|
// Find column indices
|
||
|
levelIndex := -1
|
||
|
sourceIndex := -1
|
||
|
|
||
|
for i, column := range result.Columns {
|
||
|
switch column {
|
||
|
case "level":
|
||
|
levelIndex = i
|
||
|
case "_source":
|
||
|
sourceIndex = i
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Verify we have both live and archived system logs
|
||
|
foundLive := false
|
||
|
foundArchived := false
|
||
|
|
||
|
for _, row := range result.Rows {
|
||
|
if sourceIndex >= 0 && sourceIndex < len(row) {
|
||
|
source := row[sourceIndex].ToString()
|
||
|
|
||
|
if source == "live_log" {
|
||
|
foundLive = true
|
||
|
if levelIndex >= 0 && levelIndex < len(row) {
|
||
|
level := row[levelIndex].ToString()
|
||
|
t.Logf("Live system log: level=%s", level)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if source == "parquet_archive" {
|
||
|
foundArchived = true
|
||
|
if levelIndex >= 0 && levelIndex < len(row) {
|
||
|
level := row[levelIndex].ToString()
|
||
|
t.Logf("Archived system log: level=%s", level)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !foundLive {
|
||
|
t.Error("Expected to find live system logs")
|
||
|
}
|
||
|
|
||
|
if !foundArchived {
|
||
|
t.Error("Expected to find archived system logs")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestSQLEngine_HybridSelectWithTimeImplications(t *testing.T) {
|
||
|
engine := NewSQLEngine("localhost:8888")
|
||
|
|
||
|
// Test that demonstrates the time-based nature of hybrid data
|
||
|
// Live data should be more recent than archived data
|
||
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT event_type, _source FROM user_events")
|
||
|
if err != nil {
|
||
|
t.Fatalf("Expected no error, got %v", err)
|
||
|
}
|
||
|
|
||
|
if result.Error != nil {
|
||
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
||
|
}
|
||
|
|
||
|
// This test documents that hybrid scanning provides a complete view
|
||
|
// of both recent (live) and historical (archived) data in a single query
|
||
|
liveCount := 0
|
||
|
archivedCount := 0
|
||
|
|
||
|
sourceIndex := -1
|
||
|
for i, column := range result.Columns {
|
||
|
if column == "_source" {
|
||
|
sourceIndex = i
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if sourceIndex >= 0 {
|
||
|
for _, row := range result.Rows {
|
||
|
if sourceIndex < len(row) {
|
||
|
source := row[sourceIndex].ToString()
|
||
|
switch source {
|
||
|
case "live_log":
|
||
|
liveCount++
|
||
|
case "parquet_archive":
|
||
|
archivedCount++
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
t.Logf("✅ Hybrid query results: %d live messages, %d archived messages", liveCount, archivedCount)
|
||
|
|
||
|
if liveCount == 0 && archivedCount == 0 {
|
||
|
t.Error("Expected to find both live and archived messages in hybrid scan")
|
||
|
}
|
||
|
}
|