Files
seaweedfs/weed/query/engine/hybrid_message_scanner.go
Chris Lu 58e0c1b330 Fix sql bugs (#7219)
* fix nil when explaining

* add plain details when running full scan

* skip files by timestamp

* skip file by timestamp

* refactor

* handle filter by time

* skip broker memory only if it has unflushed messages

* refactoring

* refactor

* address comments

* address comments

* filter by parquet stats

* simplify

* refactor

* prune old code

* optimize

* Update aggregations.go

* ensure non-time predicates are properly detected

* add stmt to populatePlanFileDetails

This helper function is a great way to centralize logic for populating file details. However, it's missing an optimization that is present in executeSelectStatementWithBrokerStats: pruning Parquet files based on column statistics from the WHERE clause.

Aggregation queries that fall back to the slow path could benefit from this optimization. Consider modifying the function signature to accept the *SelectStatement and adding the column statistics pruning logic here, similar to how it's done in executeSelectStatementWithBrokerStats.

* refactoring to work with *schema_pb.Value directly after the initial conversion
2025-09-10 11:04:42 -07:00

1719 lines
54 KiB
Go

package engine
import (
"container/heap"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/protobuf/proto"
)
// HybridMessageScanner scans from ALL data sources:
// Architecture:
// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
// 4. Seamlessly merges data from all sources chronologically
// 5. Provides complete real-time view of all messages in a topic
type HybridMessageScanner struct {
filerClient filer_pb.FilerClient
brokerClient BrokerClientInterface // For querying unflushed data
topic topic.Topic
recordSchema *schema_pb.RecordType
parquetLevels *schema.ParquetLevels
engine *SQLEngine // Reference for system column formatting
}
// NewHybridMessageScanner creates a scanner that reads from all data sources
// This provides complete real-time message coverage including unflushed data
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
// Check if filerClient is available
if filerClient == nil {
return nil, fmt.Errorf("filerClient is required but not available")
}
// Create topic reference
t := topic.Topic{
Namespace: namespace,
Name: topicName,
}
// Get topic schema from broker client (works with both real and mock clients)
recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
if err != nil {
return nil, fmt.Errorf("failed to get topic schema: %v", err)
}
if recordType == nil {
return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
}
// Create a copy of the recordType to avoid modifying the original
recordTypeCopy := &schema_pb.RecordType{
Fields: make([]*schema_pb.Field, len(recordType.Fields)),
}
copy(recordTypeCopy.Fields, recordType.Fields)
// Add system columns that MQ adds to all records
recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
RecordTypeEnd()
// Convert to Parquet levels for efficient reading
parquetLevels, err := schema.ToParquetLevels(recordType)
if err != nil {
return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
}
return &HybridMessageScanner{
filerClient: filerClient,
brokerClient: brokerClient,
topic: t,
recordSchema: recordType,
parquetLevels: parquetLevels,
engine: engine,
}, nil
}
// HybridScanOptions configure how the scanner reads from both live and archived data
type HybridScanOptions struct {
// Time range filtering (Unix nanoseconds)
StartTimeNs int64
StopTimeNs int64
// Column projection - if empty, select all columns
Columns []string
// Row limit - 0 means no limit
Limit int
// Row offset - 0 means no offset
Offset int
// Predicate for WHERE clause filtering
Predicate func(*schema_pb.RecordValue) bool
}
// HybridScanResult represents a message from either live logs or Parquet files
type HybridScanResult struct {
Values map[string]*schema_pb.Value // Column name -> value
Timestamp int64 // Message timestamp (_ts_ns)
Key []byte // Message key (_key)
Source string // "live_log" or "parquet_archive" or "in_memory_broker"
}
// HybridScanStats contains statistics about data sources scanned
type HybridScanStats struct {
BrokerBufferQueried bool
BrokerBufferMessages int
BufferStartIndex int64
PartitionsScanned int
LiveLogFilesScanned int // Number of live log files processed
}
// ParquetColumnStats holds statistics for a single column from parquet metadata
type ParquetColumnStats struct {
ColumnName string
MinValue *schema_pb.Value
MaxValue *schema_pb.Value
NullCount int64
RowCount int64
}
// ParquetFileStats holds aggregated statistics for a parquet file
type ParquetFileStats struct {
FileName string
RowCount int64
ColumnStats map[string]*ParquetColumnStats
// Optional file-level timestamp range from filer extended attributes
MinTimestampNs int64
MaxTimestampNs int64
}
// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns
func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
if fileStats == nil {
return 0, 0, false
}
// Prefer column stats for _ts_ns if present
if len(fileStats.ColumnStats) > 0 {
if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
return minNs, maxNs, true
}
}
}
}
// Fallback to file-level range if present in filer extended metadata
if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
}
return 0, 0, false
}
// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns
func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
if v == nil {
return 0, false
}
switch k := v.Kind.(type) {
case *schema_pb.Value_Int64Value:
return k.Int64Value, true
case *schema_pb.Value_Int32Value:
return int64(k.Int32Value), true
default:
return 0, false
}
}
// StreamingDataSource provides a streaming interface for reading scan results
type StreamingDataSource interface {
Next() (*HybridScanResult, error) // Returns next result or nil when done
HasMore() bool // Returns true if more data available
Close() error // Clean up resources
}
// StreamingMergeItem represents an item in the priority queue for streaming merge
type StreamingMergeItem struct {
Result *HybridScanResult
SourceID int
DataSource StreamingDataSource
}
// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
type StreamingMergeHeap []*StreamingMergeItem
func (h StreamingMergeHeap) Len() int { return len(h) }
func (h StreamingMergeHeap) Less(i, j int) bool {
// Sort by timestamp (ascending order)
return h[i].Result.Timestamp < h[j].Result.Timestamp
}
func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *StreamingMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*StreamingMergeItem))
}
func (h *StreamingMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
// Scan reads messages from both live logs and archived Parquet files
// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration
// Assumptions:
// 1. Chronologically merges live and archived data
// 2. Applies filtering at the lowest level for efficiency
// 3. Handles schema evolution transparently
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.ScanWithStats(ctx, options)
return results, err
}
// ScanWithStats reads messages and returns scan statistics for execution plans
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
var results []HybridScanResult
stats := &HybridScanStats{}
// Get all partitions for this topic via MQ broker discovery
partitions, err := hms.discoverTopicPartitions(ctx)
if err != nil {
return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
}
stats.PartitionsScanned = len(partitions)
for _, partition := range partitions {
partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
if err != nil {
return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
}
results = append(results, partitionResults...)
// Aggregate broker buffer stats
if partitionStats != nil {
if partitionStats.BrokerBufferQueried {
stats.BrokerBufferQueried = true
}
stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
stats.BufferStartIndex = partitionStats.BufferStartIndex
}
}
// Apply global limit (without offset) across all partitions
// When OFFSET is used, collect more data to ensure we have enough after skipping
// Note: OFFSET will be applied at the end to avoid double-application
if options.Limit > 0 {
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
minRequired := options.Limit + options.Offset
// Small buffer only when needed to handle edge cases in distributed scanning
if options.Offset > 0 && minRequired < 10 {
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
}
if len(results) >= minRequired {
break
}
}
}
// Apply final OFFSET and LIMIT processing (done once at the end)
// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
if options.Offset > 0 || options.Limit >= 0 {
// Handle LIMIT 0 special case first
if options.Limit == 0 {
return []HybridScanResult{}, stats, nil
}
// Apply OFFSET first
if options.Offset > 0 {
if options.Offset >= len(results) {
results = []HybridScanResult{}
} else {
results = results[options.Offset:]
}
}
// Apply LIMIT after OFFSET (only if limit > 0)
if options.Limit > 0 && len(results) > options.Limit {
results = results[:options.Limit]
}
}
return results, stats, nil
}
// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
return results, err
}
// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
var results []HybridScanResult
stats := &HybridScanStats{}
// Skip if no broker client available
if hms.brokerClient == nil {
return results, stats, nil
}
// Mark that we attempted to query broker buffer
stats.BrokerBufferQueried = true
// Step 1: Get unflushed data from broker using buffer_start-based method
// This method uses buffer_start metadata to avoid double-counting with exact precision
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
if err != nil {
// Log error but don't fail the query - continue with disk data only
if isDebugMode(ctx) {
fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
}
// Reset queried flag on error
stats.BrokerBufferQueried = false
return results, stats, nil
}
// Capture stats for EXPLAIN
stats.BrokerBufferMessages = len(unflushedEntries)
// Debug logging for EXPLAIN mode
if isDebugMode(ctx) {
fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
if len(unflushedEntries) > 0 {
fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
}
}
// Step 2: Process unflushed entries (already deduplicated by broker)
for _, logEntry := range unflushedEntries {
// Skip control entries without actual data
if hms.isControlEntry(logEntry) {
continue // Skip this entry
}
// Skip messages outside time range
if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
continue
}
if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
continue
}
// Convert LogEntry to RecordValue format (same as disk data)
recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
if err != nil {
if isDebugMode(ctx) {
fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
}
continue // Skip malformed messages
}
// Apply predicate filter if provided
if options.Predicate != nil && !options.Predicate(recordValue) {
continue
}
// Extract system columns for result
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
// Apply column projection
values := make(map[string]*schema_pb.Value)
if len(options.Columns) == 0 {
// Select all columns (excluding system columns from user view)
for name, value := range recordValue.Fields {
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
values[name] = value
}
}
} else {
// Select specified columns only
for _, columnName := range options.Columns {
if value, exists := recordValue.Fields[columnName]; exists {
values[columnName] = value
}
}
}
// Create result with proper source tagging
result := HybridScanResult{
Values: values,
Timestamp: timestamp,
Key: key,
Source: "live_log", // Data from broker's unflushed messages
}
results = append(results, result)
// Apply limit (accounting for offset) - collect exact amount needed
if options.Limit > 0 {
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
minRequired := options.Limit + options.Offset
// Small buffer only when needed to handle edge cases in message streaming
if options.Offset > 0 && minRequired < 10 {
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
}
if len(results) >= minRequired {
break
}
}
}
if isDebugMode(ctx) {
fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
}
return results, stats, nil
}
// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
// Parse the message data as RecordValue
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
}
// Add system columns
if recordValue.Fields == nil {
recordValue.Fields = make(map[string]*schema_pb.Value)
}
// Add timestamp
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
}
return recordValue, string(msg.Key), nil
}
// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
if hms.filerClient == nil {
return nil, fmt.Errorf("filerClient not available for partition discovery")
}
var allPartitions []topic.Partition
var err error
// Scan the topic directory for actual partition versions (timestamped directories)
// List all version directories in the topic directory
err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
if !versionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
if parseErr != nil {
// Skip directories that don't match the version format
return nil
}
// Scan partition directories within this version
versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
if !partitionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse partition boundary from directory name (e.g., "0000-0630")
rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
if rangeStart == rangeStop {
return nil // Skip invalid partition names
}
// Create partition object
partition := topic.Partition{
RangeStart: rangeStart,
RangeStop: rangeStop,
RingSize: topic.PartitionCount,
UnixTimeNs: versionTime.UnixNano(),
}
allPartitions = append(allPartitions, partition)
return nil
})
})
if err != nil {
return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
}
// If no partitions found, return empty slice (valid for newly created or empty topics)
if len(allPartitions) == 0 {
fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
return []topic.Partition{}, nil
}
fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
return allPartitions, nil
}
// scanPartitionHybrid scans a specific partition using the hybrid approach
// This is where the magic happens - seamlessly reading ALL data sources:
// 1. Unflushed in-memory data from brokers (REAL-TIME)
// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
return results, err
}
// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency
// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting
// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
// - Scalable: Can handle large topics without LIMIT clauses efficiently
// - Streaming: Processes data as it arrives rather than buffering everything
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
stats := &HybridScanStats{}
// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
if err != nil {
// Don't fail the query if broker scanning fails, but provide clear warning to user
// This ensures users are aware that results may not include the most recent data
if isDebugMode(ctx) {
fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
} else {
fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
}
} else if unflushedStats != nil {
stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
stats.BufferStartIndex = unflushedStats.BufferStartIndex
}
// Count live log files for statistics
liveLogCount, err := hms.countLiveLogFiles(partition)
if err != nil {
// Don't fail the query, just log warning
fmt.Printf("Warning: Failed to count live log files: %v\n", err)
liveLogCount = 0
}
stats.LiveLogFilesScanned = liveLogCount
// STEP 2: Create streaming data sources for memory-efficient merge
var dataSources []StreamingDataSource
// Add unflushed data source (if we have unflushed results)
if len(unflushedResults) > 0 {
// Sort unflushed results by timestamp before creating stream
if len(unflushedResults) > 1 {
hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
}
dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
}
// Add streaming flushed data source (live logs + Parquet files)
flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
dataSources = append(dataSources, flushedDataSource)
// STEP 3: Use streaming merge for memory-efficient chronological ordering
var results []HybridScanResult
if len(dataSources) > 0 {
// Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT)
// For LIMIT N OFFSET M, we need to collect at least N+M rows
scanLimit := options.Limit
if options.Limit > 0 && options.Offset > 0 {
scanLimit = options.Limit + options.Offset
}
mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
if err != nil {
return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
}
results = mergedResults
}
return results, stats, nil
}
// countLiveLogFiles counts the number of live log files in a partition for statistics
func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
partitionDir := topic.PartitionDir(hms.topic, partition)
var fileCount int
err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// List all files in partition directory
request := &filer_pb.ListEntriesRequest{
Directory: partitionDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: true,
Limit: 10000, // reasonable limit for counting
}
stream, err := client.ListEntries(context.Background(), request)
if err != nil {
return err
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
// Count files that are not .parquet files (live log files)
// Live log files typically have timestamps or are named like log files
fileName := resp.Entry.Name
if !strings.HasSuffix(fileName, ".parquet") &&
!strings.HasSuffix(fileName, ".offset") &&
len(resp.Entry.Chunks) > 0 { // Has actual content
fileCount++
}
}
return nil
})
if err != nil {
return 0, err
}
return fileCount, nil
}
// isControlEntry checks if a log entry is a control entry without actual data
// Based on MQ system analysis, control entries are:
// 1. DataMessages with populated Ctrl field (publisher close signals)
// 2. Entries with empty keys (as filtered by subscriber)
// 3. Entries with no data
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
// Skip entries with no data
if len(logEntry.Data) == 0 {
return true
}
// Skip entries with empty keys (same logic as subscriber)
if len(logEntry.Key) == 0 {
return true
}
// Check if this is a DataMessage with control field populated
dataMessage := &mq_pb.DataMessage{}
if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
// If it has a control field, it's a control message
if dataMessage.Ctrl != nil {
return true
}
}
return false
}
// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
// This handles both:
// 1. Live log entries (raw message format)
// 2. Parquet entries (already in schema_pb.RecordValue format)
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
// Try to unmarshal as RecordValue first (Parquet format)
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
// This is an archived message from Parquet files
// FIX: Add system columns from LogEntry to RecordValue
if recordValue.Fields == nil {
recordValue.Fields = make(map[string]*schema_pb.Value)
}
// Add system columns from LogEntry
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
return recordValue, "parquet_archive", nil
}
// If not a RecordValue, this is raw live message data - parse with schema
return hms.parseRawMessageWithSchema(logEntry)
}
// parseRawMessageWithSchema parses raw live message data using the topic's schema
// This provides proper type conversion and field mapping instead of treating everything as strings
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
recordValue := &schema_pb.RecordValue{
Fields: make(map[string]*schema_pb.Value),
}
// Add system columns (always present)
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
// Parse message data based on schema
if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
// Fallback: No schema available, treat as single "data" field
recordValue.Fields["data"] = &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
}
return recordValue, "live_log", nil
}
// Attempt schema-aware parsing
// Strategy 1: Try JSON parsing first (most common for live messages)
if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
// Successfully parsed as JSON, merge with system columns
for fieldName, fieldValue := range parsedRecord.Fields {
recordValue.Fields[fieldName] = fieldValue
}
return recordValue, "live_log", nil
}
// Strategy 2: Try protobuf parsing (binary messages)
if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
// Successfully parsed as protobuf, merge with system columns
for fieldName, fieldValue := range parsedRecord.Fields {
recordValue.Fields[fieldName] = fieldValue
}
return recordValue, "live_log", nil
}
// Strategy 3: Fallback to single field with raw data
// If schema has a single field, map the raw data to it with type conversion
if len(hms.recordSchema.Fields) == 1 {
field := hms.recordSchema.Fields[0]
convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
if err == nil {
recordValue.Fields[field.Name] = convertedValue
return recordValue, "live_log", nil
}
}
// Final fallback: treat as string data field
recordValue.Fields["data"] = &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
}
return recordValue, "live_log", nil
}
// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
// Try to parse as JSON
var jsonData map[string]interface{}
if err := json.Unmarshal(data, &jsonData); err != nil {
return nil, fmt.Errorf("not valid JSON: %v", err)
}
recordValue := &schema_pb.RecordValue{
Fields: make(map[string]*schema_pb.Value),
}
// Map JSON fields to schema fields
for _, schemaField := range hms.recordSchema.Fields {
fieldName := schemaField.Name
if jsonValue, exists := jsonData[fieldName]; exists {
schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
if err != nil {
// Log conversion error but continue with other fields
continue
}
recordValue.Fields[fieldName] = schemaValue
}
}
return recordValue, nil
}
// parseProtobufMessage attempts to parse raw data as protobuf RecordValue
func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
// This might be a raw protobuf message that didn't parse correctly the first time
// Try alternative protobuf unmarshaling approaches
recordValue := &schema_pb.RecordValue{}
// Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
if err := proto.Unmarshal(data, recordValue); err == nil {
return recordValue, nil
}
// Strategy 2: Check if it's a different protobuf message type
// For now, return error as we need more specific knowledge of MQ message formats
return nil, fmt.Errorf("could not parse as protobuf RecordValue")
}
// convertRawDataToSchemaValue converts raw bytes to a specific schema type
func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
dataStr := string(data)
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
scalarType := fieldType.GetScalarType()
switch scalarType {
case schema_pb.ScalarType_STRING:
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
}, nil
case schema_pb.ScalarType_INT32:
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
}, nil
}
case schema_pb.ScalarType_INT64:
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: val},
}, nil
}
case schema_pb.ScalarType_FLOAT:
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
}, nil
}
case schema_pb.ScalarType_DOUBLE:
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
}, nil
}
case schema_pb.ScalarType_BOOL:
lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
return &schema_pb.Value{
Kind: &schema_pb.Value_BoolValue{BoolValue: true},
}, nil
} else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
return &schema_pb.Value{
Kind: &schema_pb.Value_BoolValue{BoolValue: false},
}, nil
}
case schema_pb.ScalarType_BYTES:
return &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: data},
}, nil
}
}
return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
}
// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type
func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
scalarType := fieldType.GetScalarType()
switch scalarType {
case schema_pb.ScalarType_STRING:
if str, ok := jsonValue.(string); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: str},
}, nil
}
// Convert other types to string
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
}, nil
case schema_pb.ScalarType_INT32:
if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
return &schema_pb.Value{
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
}, nil
}
case schema_pb.ScalarType_INT64:
if num, ok := jsonValue.(float64); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
}, nil
}
case schema_pb.ScalarType_FLOAT:
if num, ok := jsonValue.(float64); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
}, nil
}
case schema_pb.ScalarType_DOUBLE:
if num, ok := jsonValue.(float64); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
}, nil
}
case schema_pb.ScalarType_BOOL:
if boolVal, ok := jsonValue.(bool); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
}, nil
}
case schema_pb.ScalarType_BYTES:
if str, ok := jsonValue.(string); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
}, nil
}
}
}
return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
}
// ConvertToSQLResult converts HybridScanResults to SQL query results
func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
if len(results) == 0 {
return &QueryResult{
Columns: columns,
Rows: [][]sqltypes.Value{},
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// Determine columns if not specified
if len(columns) == 0 {
columnSet := make(map[string]bool)
for _, result := range results {
for columnName := range result.Values {
columnSet[columnName] = true
}
}
columns = make([]string, 0, len(columnSet))
for columnName := range columnSet {
columns = append(columns, columnName)
}
}
// Convert to SQL rows
rows := make([][]sqltypes.Value, len(results))
for i, result := range results {
row := make([]sqltypes.Value, len(columns))
for j, columnName := range columns {
switch columnName {
case SW_COLUMN_NAME_SOURCE:
row[j] = sqltypes.NewVarChar(result.Source)
case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
// Format timestamp as proper timestamp type instead of raw nanoseconds
row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
case SW_COLUMN_NAME_KEY:
row[j] = sqltypes.NewVarBinary(string(result.Key))
default:
if value, exists := result.Values[columnName]; exists {
row[j] = convertSchemaValueToSQL(value)
} else {
row[j] = sqltypes.NULL
}
}
}
rows[i] = row
}
return &QueryResult{
Columns: columns,
Rows: rows,
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
// Combines auto-discovered columns (from *) with explicitly requested columns
func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
if len(results) == 0 {
// For empty results, combine auto-discovered columns with explicit ones
columnSet := make(map[string]bool)
// Add explicit columns first
for _, col := range explicitColumns {
columnSet[col] = true
}
// Build final column list
columns := make([]string, 0, len(columnSet))
for col := range columnSet {
columns = append(columns, col)
}
return &QueryResult{
Columns: columns,
Rows: [][]sqltypes.Value{},
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// Auto-discover columns from data (like SELECT *)
autoColumns := make(map[string]bool)
for _, result := range results {
for columnName := range result.Values {
autoColumns[columnName] = true
}
}
// Combine auto-discovered and explicit columns
columnSet := make(map[string]bool)
// Add auto-discovered columns first (regular data columns)
for col := range autoColumns {
columnSet[col] = true
}
// Add explicit columns (may include system columns like _source)
for _, col := range explicitColumns {
columnSet[col] = true
}
// Build final column list
columns := make([]string, 0, len(columnSet))
for col := range columnSet {
columns = append(columns, col)
}
// Convert to SQL rows
rows := make([][]sqltypes.Value, len(results))
for i, result := range results {
row := make([]sqltypes.Value, len(columns))
for j, columnName := range columns {
switch columnName {
case SW_COLUMN_NAME_TIMESTAMP:
row[j] = sqltypes.NewInt64(result.Timestamp)
case SW_COLUMN_NAME_KEY:
row[j] = sqltypes.NewVarBinary(string(result.Key))
case SW_COLUMN_NAME_SOURCE:
row[j] = sqltypes.NewVarChar(result.Source)
default:
// Regular data column
if value, exists := result.Values[columnName]; exists {
row[j] = convertSchemaValueToSQL(value)
} else {
row[j] = sqltypes.NULL
}
}
}
rows[i] = row
}
return &QueryResult{
Columns: columns,
Rows: rows,
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// ReadParquetStatistics efficiently reads column statistics from parquet files
// without scanning the full file content - uses parquet's built-in metadata
func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
var fileStats []*ParquetFileStats
// Use the same chunk cache as the logstore package
chunkCache := chunk_cache.NewChunkCacheInMemory(256)
lookupFileIdFn := filer.LookupFn(h.filerClient)
err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
// Only process parquet files
if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
return nil
}
// Extract statistics from this parquet file
stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
if err != nil {
// Log error but continue processing other files
fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
return nil
}
if stats != nil {
fileStats = append(fileStats, stats)
}
return nil
})
return fileStats, err
}
// extractParquetFileStats extracts column statistics from a single parquet file
func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
// Create reader for the parquet file
fileSize := filer.FileSize(entry)
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
// Create parquet reader - this only reads metadata, not data
parquetReader := parquet.NewReader(readerAt)
defer parquetReader.Close()
fileView := parquetReader.File()
fileStats := &ParquetFileStats{
FileName: entry.Name,
RowCount: fileView.NumRows(),
ColumnStats: make(map[string]*ParquetColumnStats),
}
// Populate optional min/max from filer extended attributes (writer stores ns timestamps)
if entry != nil && entry.Extended != nil {
if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
}
if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
}
}
// Get schema information
schema := fileView.Schema()
// Process each row group
rowGroups := fileView.RowGroups()
for _, rowGroup := range rowGroups {
columnChunks := rowGroup.ColumnChunks()
// Process each column chunk
for i, chunk := range columnChunks {
// Get column name from schema
columnName := h.getColumnNameFromSchema(schema, i)
if columnName == "" {
continue
}
// Try to get column statistics
columnIndex, err := chunk.ColumnIndex()
if err != nil {
// No column index available - skip this column
continue
}
// Extract min/max values from the first page (for simplicity)
// In a more sophisticated implementation, we could aggregate across all pages
numPages := columnIndex.NumPages()
if numPages == 0 {
continue
}
minParquetValue := columnIndex.MinValue(0)
maxParquetValue := columnIndex.MaxValue(numPages - 1)
nullCount := int64(0)
// Aggregate null counts across all pages
for pageIdx := 0; pageIdx < numPages; pageIdx++ {
nullCount += columnIndex.NullCount(pageIdx)
}
// Convert parquet values to schema_pb.Value
minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
if err != nil {
continue
}
maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
if err != nil {
continue
}
// Store column statistics (aggregate across row groups if column already exists)
if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
// Update existing statistics
if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
existingStats.MinValue = minValue
}
if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
existingStats.MaxValue = maxValue
}
existingStats.NullCount += nullCount
} else {
// Create new column statistics
fileStats.ColumnStats[columnName] = &ParquetColumnStats{
ColumnName: columnName,
MinValue: minValue,
MaxValue: maxValue,
NullCount: nullCount,
RowCount: rowGroup.NumRows(),
}
}
}
}
return fileStats, nil
}
// getColumnNameFromSchema extracts column name from parquet schema by index
func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
// Get the leaf columns in order
var columnNames []string
h.collectColumnNames(schema.Fields(), &columnNames)
if columnIndex >= 0 && columnIndex < len(columnNames) {
return columnNames[columnIndex]
}
return ""
}
// collectColumnNames recursively collects leaf column names from schema
func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
for _, field := range fields {
if len(field.Fields()) == 0 {
// This is a leaf field (no sub-fields)
*names = append(*names, field.Name())
} else {
// This is a group - recurse
h.collectColumnNames(field.Fields(), names)
}
}
}
// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
switch pv.Kind() {
case parquet.Boolean:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
case parquet.Int32:
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
case parquet.Int64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
case parquet.Float:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
case parquet.Double:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
case parquet.ByteArray:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
default:
return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
}
}
// compareSchemaValues compares two schema_pb.Value objects
func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
if v1 == nil && v2 == nil {
return 0
}
if v1 == nil {
return -1
}
if v2 == nil {
return 1
}
// Extract raw values and compare
raw1 := h.extractRawValueFromSchema(v1)
raw2 := h.extractRawValueFromSchema(v2)
return h.compareRawValues(raw1, raw2)
}
// extractRawValueFromSchema extracts the raw value from schema_pb.Value
func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
switch v := value.Kind.(type) {
case *schema_pb.Value_BoolValue:
return v.BoolValue
case *schema_pb.Value_Int32Value:
return v.Int32Value
case *schema_pb.Value_Int64Value:
return v.Int64Value
case *schema_pb.Value_FloatValue:
return v.FloatValue
case *schema_pb.Value_DoubleValue:
return v.DoubleValue
case *schema_pb.Value_BytesValue:
return string(v.BytesValue) // Convert to string for comparison
case *schema_pb.Value_StringValue:
return v.StringValue
}
return nil
}
// compareRawValues compares two raw values
func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
// Handle nil cases
if v1 == nil && v2 == nil {
return 0
}
if v1 == nil {
return -1
}
if v2 == nil {
return 1
}
// Compare based on type
switch val1 := v1.(type) {
case bool:
if val2, ok := v2.(bool); ok {
if val1 == val2 {
return 0
}
if val1 {
return 1
}
return -1
}
case int32:
if val2, ok := v2.(int32); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case int64:
if val2, ok := v2.(int64); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case float32:
if val2, ok := v2.(float32); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case float64:
if val2, ok := v2.(float64); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case string:
if val2, ok := v2.(string); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
}
// Default: try string comparison
str1 := fmt.Sprintf("%v", v1)
str2 := fmt.Sprintf("%v", v2)
if str1 < str2 {
return -1
} else if str1 > str2 {
return 1
}
return 0
}
// streamingMerge merges multiple sorted data sources using a heap-based approach
// This provides memory-efficient merging without loading all data into memory
func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
if len(dataSources) == 0 {
return nil, nil
}
var results []HybridScanResult
mergeHeap := &StreamingMergeHeap{}
heap.Init(mergeHeap)
// Initialize heap with first item from each data source
for i, source := range dataSources {
if source.HasMore() {
result, err := source.Next()
if err != nil {
// Close all sources and return error
for _, s := range dataSources {
s.Close()
}
return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
}
if result != nil {
heap.Push(mergeHeap, &StreamingMergeItem{
Result: result,
SourceID: i,
DataSource: source,
})
}
}
}
// Process results in chronological order
for mergeHeap.Len() > 0 {
// Get next chronologically ordered result
item := heap.Pop(mergeHeap).(*StreamingMergeItem)
results = append(results, *item.Result)
// Check limit
if limit > 0 && len(results) >= limit {
break
}
// Try to get next item from the same data source
if item.DataSource.HasMore() {
nextResult, err := item.DataSource.Next()
if err != nil {
// Log error but continue with other sources
fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
} else if nextResult != nil {
heap.Push(mergeHeap, &StreamingMergeItem{
Result: nextResult,
SourceID: item.SourceID,
DataSource: item.DataSource,
})
}
}
}
// Close all data sources
for _, source := range dataSources {
source.Close()
}
return results, nil
}
// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource
// This is used for unflushed data that is already loaded into memory
type SliceDataSource struct {
results []HybridScanResult
index int
}
func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
return &SliceDataSource{
results: results,
index: 0,
}
}
func (s *SliceDataSource) Next() (*HybridScanResult, error) {
if s.index >= len(s.results) {
return nil, nil
}
result := &s.results[s.index]
s.index++
return result, nil
}
func (s *SliceDataSource) HasMore() bool {
return s.index < len(s.results)
}
func (s *SliceDataSource) Close() error {
return nil // Nothing to clean up for slice-based source
}
// StreamingFlushedDataSource provides streaming access to flushed data
type StreamingFlushedDataSource struct {
hms *HybridMessageScanner
partition topic.Partition
options HybridScanOptions
mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
resultChan chan *HybridScanResult
errorChan chan error
doneChan chan struct{}
started bool
finished bool
closed int32 // atomic flag to prevent double close
mu sync.RWMutex
}
func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)
return &StreamingFlushedDataSource{
hms: hms,
partition: partition,
options: options,
mergedReadFn: mergedReadFn,
resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance
errorChan: make(chan error, 1),
doneChan: make(chan struct{}),
started: false,
finished: false,
}
}
func (s *StreamingFlushedDataSource) startStreaming() {
if s.started {
return
}
s.started = true
go func() {
defer func() {
// Use atomic flag to ensure channels are only closed once
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
close(s.resultChan)
close(s.errorChan)
close(s.doneChan)
}
}()
// Set up time range for scanning
startTime := time.Unix(0, s.options.StartTimeNs)
if s.options.StartTimeNs == 0 {
startTime = time.Unix(0, 0)
}
stopTsNs := s.options.StopTimeNs
// For SQL queries, stopTsNs = 0 means "no stop time restriction"
// This is different from message queue consumers which want to stop at "now"
// We detect SQL context by checking if we have a predicate function
if stopTsNs == 0 && s.options.Predicate == nil {
// Only set to current time for non-SQL queries (message queue consumers)
stopTsNs = time.Now().UnixNano()
}
// If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning
// Message processing function
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
// Skip control entries without actual data
if s.hms.isControlEntry(logEntry) {
return false, nil // Skip this entry
}
// Convert log entry to schema_pb.RecordValue for consistent processing
recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
if convertErr != nil {
return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
}
// Apply predicate filtering (WHERE clause)
if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
return false, nil // Skip this message
}
// Extract system columns
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
// Apply column projection
values := make(map[string]*schema_pb.Value)
if len(s.options.Columns) == 0 {
// Select all columns (excluding system columns from user view)
for name, value := range recordValue.Fields {
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
values[name] = value
}
}
} else {
// Select specified columns only
for _, columnName := range s.options.Columns {
if value, exists := recordValue.Fields[columnName]; exists {
values[columnName] = value
}
}
}
result := &HybridScanResult{
Values: values,
Timestamp: timestamp,
Key: key,
Source: source,
}
// Check if already closed before trying to send
if atomic.LoadInt32(&s.closed) != 0 {
return true, nil // Stop processing if closed
}
// Send result to channel with proper handling of closed channels
select {
case s.resultChan <- result:
return false, nil
case <-s.doneChan:
return true, nil // Stop processing if closed
default:
// Check again if closed (in case it was closed between the atomic check and select)
if atomic.LoadInt32(&s.closed) != 0 {
return true, nil
}
// If not closed, try sending again with blocking select
select {
case s.resultChan <- result:
return false, nil
case <-s.doneChan:
return true, nil
}
}
}
// Start scanning from the specified position
startPosition := log_buffer.MessagePosition{Time: startTime}
_, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
if err != nil {
// Only try to send error if not already closed
if atomic.LoadInt32(&s.closed) == 0 {
select {
case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
case <-s.doneChan:
default:
// Channel might be full or closed, ignore
}
}
}
s.finished = true
}()
}
func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
if !s.started {
s.startStreaming()
}
select {
case result, ok := <-s.resultChan:
if !ok {
return nil, nil // No more results
}
return result, nil
case err := <-s.errorChan:
return nil, err
case <-s.doneChan:
return nil, nil
}
}
func (s *StreamingFlushedDataSource) HasMore() bool {
if !s.started {
return true // Haven't started yet, so potentially has data
}
return !s.finished || len(s.resultChan) > 0
}
func (s *StreamingFlushedDataSource) Close() error {
// Use atomic flag to ensure channels are only closed once
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
close(s.doneChan)
close(s.resultChan)
close(s.errorChan)
}
return nil
}
// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm
func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
if left < right {
mid := left + (right-left)/2
// Recursively sort both halves
hms.mergeSort(results, left, mid)
hms.mergeSort(results, mid+1, right)
// Merge the sorted halves
hms.merge(results, left, mid, right)
}
}
// merge combines two sorted subarrays into a single sorted array
func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
// Create temporary arrays for the two subarrays
leftArray := make([]HybridScanResult, mid-left+1)
rightArray := make([]HybridScanResult, right-mid)
// Copy data to temporary arrays
copy(leftArray, results[left:mid+1])
copy(rightArray, results[mid+1:right+1])
// Merge the temporary arrays back into results[left..right]
i, j, k := 0, 0, left
for i < len(leftArray) && j < len(rightArray) {
if leftArray[i].Timestamp <= rightArray[j].Timestamp {
results[k] = leftArray[i]
i++
} else {
results[k] = rightArray[j]
j++
}
k++
}
// Copy remaining elements of leftArray, if any
for i < len(leftArray) {
results[k] = leftArray[i]
i++
k++
}
// Copy remaining elements of rightArray, if any
for j < len(rightArray) {
results[k] = rightArray[j]
j++
k++
}
}