Parquet file queries can fall back to messages in broker memory if log files do not exist

This commit is contained in:
chrislu
2025-09-02 00:49:12 -07:00
parent 618cb89885
commit 61bacd23b0
2 changed files with 78 additions and 1 deletion

View File

@@ -518,7 +518,13 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
} }
// getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition // getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition
// This checks both live log files and Parquet files for the most precise deduplication //
// This method handles three scenarios for seamless broker querying:
// 1. Live log files exist: Uses their buffer_start metadata (most recent boundaries)
// 2. Only Parquet files exist: Uses Parquet buffer_start metadata (preserved from archived sources)
// 3. Mixed files: Uses earliest buffer_start from all sources for comprehensive coverage
//
// This ensures continuous real-time querying capability even after log file compaction/archival
func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) { func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) {
filerClient, err := c.GetFilerClient() filerClient, err := c.GetFilerClient()
if err != nil { if err != nil {
@@ -526,6 +532,8 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
} }
var earliestBufferIndex int64 = -1 // -1 means no buffer_start found var earliestBufferIndex int64 = -1 // -1 means no buffer_start found
var logFileCount, parquetFileCount int
var bufferStartSources []string // Track which files provide buffer_start
err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
// Skip directories // Skip directories
@@ -533,17 +541,38 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
return nil return nil
} }
// Count file types for scenario detection
if strings.HasSuffix(entry.Name, ".parquet") {
parquetFileCount++
} else {
logFileCount++
}
// Extract buffer_start from file extended attributes (both log files and parquet files) // Extract buffer_start from file extended attributes (both log files and parquet files)
bufferStart := c.getBufferStartFromEntry(entry) bufferStart := c.getBufferStartFromEntry(entry)
if bufferStart != nil && bufferStart.StartIndex > 0 { if bufferStart != nil && bufferStart.StartIndex > 0 {
if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex { if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex {
earliestBufferIndex = bufferStart.StartIndex earliestBufferIndex = bufferStart.StartIndex
} }
bufferStartSources = append(bufferStartSources, entry.Name)
} }
return nil return nil
}) })
// Debug: Show buffer_start determination logic in EXPLAIN mode
if isDebugMode(ctx) && len(bufferStartSources) > 0 {
if logFileCount == 0 && parquetFileCount > 0 {
fmt.Printf("Debug: Using Parquet buffer_start metadata (no log files) - sources: %v\n", bufferStartSources)
} else if logFileCount > 0 && parquetFileCount > 0 {
fmt.Printf("Debug: Using mixed sources for buffer_start - log files: %d, Parquet files: %d, sources: %v\n",
logFileCount, parquetFileCount, bufferStartSources)
} else {
fmt.Printf("Debug: Using log file buffer_start metadata - sources: %v\n", bufferStartSources)
}
fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex)
}
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to scan partition directory: %v", err) return 0, fmt.Errorf("failed to scan partition directory: %v", err)
} }

View File

@@ -2,6 +2,7 @@ package engine
import ( import (
"context" "context"
"encoding/binary"
"encoding/json" "encoding/json"
"errors" "errors"
"testing" "testing"
@@ -1254,3 +1255,50 @@ func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) {
// This demonstrates that buffer start indexes initialized with process start time // This demonstrates that buffer start indexes initialized with process start time
// prevent false positive duplicates across server restarts // prevent false positive duplicates across server restarts
} }
// TestBrokerClient_ParquetBufferStartForBrokerQuery verifies that
// getBufferStartFromEntry can decode the "buffer_start" extended attribute in
// both of the formats that appear on disk:
//   - binary: 8-byte BigEndian uint64 (written on Parquet files)
//   - JSON:   {"start_index": N}      (written on live log files)
// It also verifies the nil result for entries carrying no metadata at all.
func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) {
	// Test scenario: getBufferStartFromEntry should handle both JSON and binary formats
	// This tests the dual format support for buffer_start metadata
	// NOTE(review): a zero-value BrokerClient suffices here — presumably
	// getBufferStartFromEntry reads only the entry argument, not client state.
	realBrokerClient := &BrokerClient{}
	// Test binary format (Parquet files)
	parquetEntry := &filer_pb.Entry{
		Name:        "2025-01-07-14-30.parquet",
		IsDirectory: false,
		Extended: map[string][]byte{
			"buffer_start": func() []byte {
				// Binary format: 8-byte BigEndian
				buf := make([]byte, 8)
				binary.BigEndian.PutUint64(buf, uint64(2000001))
				return buf
			}(),
		},
	}
	bufferStart := realBrokerClient.getBufferStartFromEntry(parquetEntry)
	assert.NotNil(t, bufferStart)
	assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file")
	// Test JSON format (log files)
	logEntry := &filer_pb.Entry{
		Name:        "2025-01-07-14-30-45",
		IsDirectory: false,
		Extended: map[string][]byte{
			"buffer_start": []byte(`{"start_index": 1500001}`),
		},
	}
	bufferStart = realBrokerClient.getBufferStartFromEntry(logEntry)
	assert.NotNil(t, bufferStart)
	assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse JSON buffer_start from log file")
	// Test missing metadata: Extended is nil, so no "buffer_start" key exists
	// and the method is expected to report absence via a nil return.
	emptyEntry := &filer_pb.Entry{
		Name:        "no-metadata",
		IsDirectory: false,
		Extended:    nil,
	}
	bufferStart = realBrokerClient.getBufferStartFromEntry(emptyEntry)
	assert.Nil(t, bufferStart, "Should return nil for entry without buffer_start metadata")
}