filter out already flushed messages

This commit is contained in:
chrislu
2025-09-02 00:10:07 -07:00
parent 467034c8c7
commit e3a56d7c30
10 changed files with 864 additions and 177 deletions

View File

@@ -427,3 +427,86 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic
return partitions, nil
}
// GetUnflushedMessages returns only messages that haven't been flushed to disk yet
// Uses buffer_start metadata to determine what data has been persisted vs still in-memory
// This prevents double-counting when combining with disk-based data
func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
// Step 1: Find the broker that hosts this partition
if err := c.findBrokerBalancer(); err != nil {
// Return empty slice if we can't find broker - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
// Step 2: Connect to broker and call the GetUnflushedMessages gRPC method
conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption)
if err != nil {
// Return empty slice if connection fails - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
defer conn.Close()
client := mq_pb.NewSeaweedMessagingClient(conn)
// Step 3: Prepare the request using oneof start_filter (timestamp-based)
request := &mq_pb.GetUnflushedMessagesRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
Partition: &schema_pb.Partition{
RingSize: partition.RingSize,
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs,
},
StartFilter: &mq_pb.GetUnflushedMessagesRequest_StartTimeNs{
StartTimeNs: startTimeNs,
},
// TODO: Could use buffer index filtering for more precision:
// StartFilter: &mq_pb.GetUnflushedMessagesRequest_StartBufferIndex{
// StartBufferIndex: latestBufferIndex,
// },
}
// Step 4: Call the broker streaming API
stream, err := client.GetUnflushedMessages(ctx, request)
if err != nil {
// Return empty slice if gRPC call fails - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
// Step 5: Receive streaming responses
var logEntries []*filer_pb.LogEntry
for {
response, err := stream.Recv()
if err != nil {
// End of stream or error - return what we have to prevent double-counting
break
}
// Handle error messages
if response.Error != "" {
// Log the error but return empty slice - prevents double-counting
// (In debug mode, this would be visible)
return []*filer_pb.LogEntry{}, nil
}
// Check for end of stream
if response.EndOfStream {
break
}
// Convert and collect the message
if response.Message != nil {
logEntries = append(logEntries, &filer_pb.LogEntry{
TsNs: response.Message.TsNs,
Key: response.Message.Key,
Data: response.Message.Data,
PartitionKeyHash: int32(response.Message.PartitionKeyHash), // Convert uint32 to int32
})
}
}
return logEntries, nil
}

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
@@ -20,6 +21,9 @@ type BrokerClientInterface interface {
GetFilerClient() (filer_pb.FilerClient, error)
ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error
DeleteTopic(ctx context.Context, namespace, topicName string) error
// GetUnflushedMessages returns only messages that haven't been flushed to disk yet
// This prevents double-counting when combining with disk-based data
GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error)
}
// SchemaCatalog manages the mapping between MQ topics and SQL tables

View File

@@ -165,7 +165,7 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt
return results, nil
}
// scanUnflushedData queries brokers for unflushed in-memory data
// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
var results []HybridScanResult
@@ -174,31 +174,33 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio
return results, nil
}
// Get broker address for this partition
// TODO: Implement proper broker discovery for partition
// For now, assume broker client knows how to reach the right broker
// Step 1: Get unflushed data from broker using our new interface method
// This method uses buffer_start metadata to avoid double-counting
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
if err != nil {
// Log error but don't fail the query - continue with disk data only
if isDebugMode(ctx) {
fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
}
return results, nil
}
// Create a temporary slice to collect unflushed messages
unflushedMessages := make([]*mq_pb.DataMessage, 0)
// We need to call the broker to get unflushed data
// For now, we'll implement this as a best-effort approach
// In a full implementation, this would require a new gRPC method on the broker
// TODO: Implement actual broker gRPC call to get unflushed data
// Convert unflushed messages to HybridScanResult format
for _, msg := range unflushedMessages {
// Step 2: Process unflushed entries (already deduplicated by broker)
for _, logEntry := range unflushedEntries {
// Skip messages outside time range
if options.StartTimeNs > 0 && msg.TsNs < options.StartTimeNs {
if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
continue
}
if options.StopTimeNs > 0 && msg.TsNs > options.StopTimeNs {
if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
continue
}
// Convert DataMessage to RecordValue format
recordValue, _, err := hms.convertDataMessageToRecord(msg)
// Convert LogEntry to RecordValue format (same as disk data)
recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
if err != nil {
if isDebugMode(ctx) {
fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
}
continue // Skip malformed messages
}
@@ -207,12 +209,34 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio
continue
}
// Convert to HybridScanResult
// Extract system columns for result
timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
// Apply column projection
values := make(map[string]*schema_pb.Value)
if len(options.Columns) == 0 {
// Select all columns (excluding system columns from user view)
for name, value := range recordValue.Fields {
if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY {
values[name] = value
}
}
} else {
// Select specified columns only
for _, columnName := range options.Columns {
if value, exists := recordValue.Fields[columnName]; exists {
values[columnName] = value
}
}
}
// Create result with proper source tagging
result := HybridScanResult{
Values: recordValue.Fields,
Timestamp: msg.TsNs,
Key: msg.Key,
Source: "in_memory_broker",
Values: values,
Timestamp: timestamp,
Key: key,
Source: "in_memory_broker", // Tag for debugging/analysis
}
results = append(results, result)
@@ -223,6 +247,10 @@ func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partitio
}
}
if isDebugMode(ctx) {
fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
}
return results, nil
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
@@ -221,3 +222,24 @@ func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName
return nil
}
// GetUnflushedMessages returns mock unflushed data for testing
// Always returns empty slice to simulate safe deduplication behavior
func (m *MockBrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
if m.shouldFail {
return nil, fmt.Errorf("mock broker failed to get unflushed messages: %s", m.failMessage)
}
// For testing, return empty slice to simulate:
// 1. No unflushed data available
// 2. Safe deduplication behavior (prevents double-counting)
// 3. Successful broker communication
//
// In a real implementation, this would:
// - Connect to actual broker
// - Access LocalPartition's LogBuffer
// - Use buffer_start metadata for deduplication
// - Return only truly unflushed messages
return []*filer_pb.LogEntry{}, nil
}