save source buffer index start for log files

commit 7ca3b59c44 (parent f5ed25f755)
Author: chrislu
Date: 2025-09-01 23:33:35 -07:00

6 changed files with 325 additions and 13 deletions

View File

@@ -2,13 +2,20 @@ package broker
import (
	"fmt"
+	"sync/atomic"
+	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
-	"sync/atomic"
-	"time"
)

+// LogBufferStart tracks the starting buffer index for a live log file
+// Buffer indexes are monotonically increasing, count = number of chunks
+type LogBufferStart struct {
+	StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
+}
+
func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType {
	partitionDir := topic.PartitionDir(t, p)
@@ -21,10 +28,11 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
	targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))

-	// TODO append block with more metadata
+	// Get buffer index (now globally unique across restarts)
+	bufferIndex := logBuffer.GetBatchIndex()

	for {
-		if err := b.appendToFile(targetFile, buf); err != nil {
+		if err := b.appendToFileWithBufferIndex(targetFile, buf, bufferIndex); err != nil {
			glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
			time.Sleep(737 * time.Millisecond)
		} else {
@@ -40,6 +48,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
		localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
	}

-	glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
+	glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (index %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferIndex)
	}
}
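
The flush path above writes one chunk per flushed buffer and tags the file with the buffer index of the first flush, so a single stored start index plus the chunk count describes every buffer index the file covers. A minimal standalone sketch of that range math (the bufferRange helper and the sample numbers are illustrative, not taken from the commit):

package main

import "fmt"

type LogBufferStart struct {
	StartIndex int64 `json:"start_index"`
}

// bufferRange is a hypothetical helper mirroring the math used in this commit:
// one chunk is appended per flushed buffer, so the chunk count equals the
// number of buffer indexes the file covers.
func bufferRange(start LogBufferStart, chunkCount int64) (lo, hi int64) {
	return start.StartIndex, start.StartIndex + chunkCount - 1
}

func main() {
	start := LogBufferStart{StartIndex: 1609459100000000001}
	lo, hi := bufferRange(start, 3)
	fmt.Printf("file covers buffer indexes [%d, %d]\n", lo, hi)
}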

View File

@@ -2,16 +2,27 @@ package broker
import (
	"context"
+	"encoding/json"
	"fmt"
+	"os"
+	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
-	"os"
-	"time"
)

+// LogBufferStart tracks the starting buffer index for a file
+type LogBufferStart struct {
+	StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
+}
+
func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
+	return b.appendToFileWithBufferIndex(targetFile, data, 0)
+}
+
+func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64) error {
	fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
	if err2 != nil {
@@ -35,10 +46,51 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
			Gid: uint32(os.Getgid()),
		},
	}

+	// Add buffer start index for deduplication tracking
+	if bufferIndex != 0 {
+		entry.Extended = make(map[string][]byte)
+		bufferStart := LogBufferStart{
+			StartIndex: bufferIndex,
+		}
+		startJson, _ := json.Marshal(bufferStart)
+		entry.Extended["buffer_start"] = startJson
+	}
} else if err != nil {
	return fmt.Errorf("find %s: %v", fullpath, err)
} else {
	offset = int64(filer.TotalSize(entry.GetChunks()))

+	// Verify buffer index continuity for existing files (append operations)
+	if bufferIndex != 0 {
+		if entry.Extended == nil {
+			entry.Extended = make(map[string][]byte)
+		}
+
+		// Check for existing buffer start
+		if existingData, exists := entry.Extended["buffer_start"]; exists {
+			var bufferStart LogBufferStart
+			json.Unmarshal(existingData, &bufferStart)
+
+			// Verify that the new buffer index is consecutive
+			// Expected index = start + number of existing chunks
+			expectedIndex := bufferStart.StartIndex + int64(len(entry.GetChunks()))
+			if bufferIndex != expectedIndex {
+				// This shouldn't happen in normal operation
+				// Log warning but continue (don't crash the system)
+				fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n",
+					expectedIndex, bufferIndex)
+			}
+			// Note: We don't update the start index - it stays the same
+		} else {
+			// No existing buffer start, create new one (shouldn't happen for existing files)
+			bufferStart := LogBufferStart{
+				StartIndex: bufferIndex,
+			}
+			startJson, _ := json.Marshal(bufferStart)
+			entry.Extended["buffer_start"] = startJson
+		}
+	}
}

// append to existing chunks
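
When appending to an existing file, the broker expects the incoming buffer index to equal the stored start index plus the number of chunks already present. A small self-contained sketch of that continuity check, assuming a plain chunk count in place of a filer entry (the checkContinuity helper is illustrative only):

package main

import (
	"encoding/json"
	"fmt"
)

type LogBufferStart struct {
	StartIndex int64 `json:"start_index"`
}

// checkContinuity mirrors the verification above: the next index should be
// the stored start plus the number of chunks already appended.
func checkContinuity(extended map[string][]byte, existingChunks int, incoming int64) (expected int64, ok bool) {
	raw, found := extended["buffer_start"]
	if !found {
		return 0, false
	}
	var start LogBufferStart
	if err := json.Unmarshal(raw, &start); err != nil {
		return 0, false
	}
	expected = start.StartIndex + int64(existingChunks)
	return expected, incoming == expected
}

func main() {
	startJson, _ := json.Marshal(LogBufferStart{StartIndex: 100})
	extended := map[string][]byte{"buffer_start": startJson}

	expected, ok := checkContinuity(extended, 2, 102) // two chunks already flushed
	fmt.Println(expected, ok)                         // 102 true
}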

View File

@@ -3,6 +3,10 @@ package topic
import (
	"context"
	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -10,9 +14,6 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"sync"
"sync/atomic"
"time"
) )
type LocalPartition struct { type LocalPartition struct {

View File

@@ -35,6 +35,12 @@ func withDebugMode(ctx context.Context) context.Context {
	return context.WithValue(ctx, debugModeKey{}, true)
}

+// LogBufferStart tracks the starting buffer index for a file
+// Buffer indexes are monotonically increasing, count = len(chunks)
+type LogBufferStart struct {
+	StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
+}
+
// SQLEngine provides SQL query execution capabilities for SeaweedFS
// Assumptions:
// 1. MQ namespaces map directly to SQL databases
@@ -1994,7 +2000,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map
	return sourceFiles
}

-// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet
+// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data
func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
	filerClient, err := e.catalog.brokerClient.GetFilerClient()
	if err != nil {
@@ -2009,9 +2015,23 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
		actualSourceFiles = parquetSourceFiles
	}

+	// Second, get duplicate files from log buffer metadata
+	logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath)
+	if err != nil {
+		if isDebugMode(ctx) {
+			fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err)
+		}
+		logBufferDuplicates = make(map[string]bool)
+	}
+
	// Debug: Show deduplication status (only in explain mode)
-	if isDebugMode(ctx) && len(actualSourceFiles) > 0 {
-		fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
+	if isDebugMode(ctx) {
+		if len(actualSourceFiles) > 0 {
+			fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
+		}
+		if len(logBufferDuplicates) > 0 {
+			fmt.Printf("Excluding %d duplicate log buffer files from %s\n", len(logBufferDuplicates), partitionPath)
+		}
	}

	totalRows := int64(0)
@@ -2028,6 +2048,14 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
			return nil
		}

+		// Skip files that are duplicated due to log buffer metadata
+		if logBufferDuplicates[entry.Name] {
+			if isDebugMode(ctx) {
+				fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name)
+			}
+			return nil
+		}
+
		// Count rows in live log file
		rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
		if err != nil {
@@ -2070,6 +2098,96 @@ func (e *SQLEngine) getParquetSourceFilesFromMetadata(partitionPath string) (map
	return sourceFiles, err
}

+// getLogBufferStartFromFile reads buffer start from file extended attributes
+func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
+	if entry.Extended == nil {
+		return nil, nil
+	}
+
+	// Only support buffer_start format
+	if startJson, exists := entry.Extended["buffer_start"]; exists {
+		var bufferStart LogBufferStart
+		if err := json.Unmarshal(startJson, &bufferStart); err != nil {
+			return nil, fmt.Errorf("failed to parse buffer start: %v", err)
+		}
+		return &bufferStart, nil
+	}
+
+	return nil, nil
+}
+
+// buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient)
+func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) {
+	if e.catalog.brokerClient == nil {
+		return make(map[string]bool), nil
+	}
+
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return make(map[string]bool), nil // Don't fail the query, just skip deduplication
+	}
+
+	// Track buffer ranges instead of individual indexes (much more efficient)
+	type BufferRange struct {
+		start, end int64
+	}
+
+	processedRanges := make([]BufferRange, 0)
+	duplicateFiles := make(map[string]bool)
+
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
+			return nil // Skip directories and parquet files
+		}
+
+		// Get buffer start for this file (most efficient)
+		bufferStart, err := e.getLogBufferStartFromFile(entry)
+		if err != nil || bufferStart == nil {
+			return nil // No buffer info, can't deduplicate
+		}
+
+		// Calculate range for this file: [start, start + chunkCount - 1]
+		chunkCount := int64(len(entry.GetChunks()))
+		if chunkCount == 0 {
+			return nil // Empty file, skip
+		}
+
+		fileRange := BufferRange{
+			start: bufferStart.StartIndex,
+			end:   bufferStart.StartIndex + chunkCount - 1,
+		}
+
+		// Check if this range overlaps with any processed range
+		isDuplicate := false
+		for _, processedRange := range processedRanges {
+			if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start {
+				// Ranges overlap - this file contains duplicate buffer indexes
+				isDuplicate = true
+				if isDebugMode(ctx) {
+					fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n",
+						entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end)
+				}
+				break
+			}
+		}
+
+		if isDuplicate {
+			duplicateFiles[entry.Name] = true
+		} else {
+			// Add this range to processed ranges
+			processedRanges = append(processedRanges, fileRange)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return make(map[string]bool), nil // Don't fail the query
+	}
+
+	return duplicateFiles, nil
+}
+
// countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
	lookupFileIdFn := filer.LookupFn(filerClient)
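
The deduplication map treats each live log file as a closed range of buffer indexes and marks a file as duplicate when its range intersects one already seen. A compact sketch of just that overlap test, with made-up range values for illustration:

package main

import "fmt"

// bufferRange is the closed range of buffer indexes a live log file covers.
type bufferRange struct {
	start, end int64
}

// overlaps reports whether two closed ranges intersect; this is the same
// comparison buildLogBufferDeduplicationMap uses to flag duplicate files.
func overlaps(a, b bufferRange) bool {
	return a.start <= b.end && a.end >= b.start
}

func main() {
	original := bufferRange{start: 100, end: 102}     // file flushed from buffers 100..102
	rewritten := bufferRange{start: 101, end: 103}    // hypothetical file covering some of the same buffers
	afterRestart := bufferRange{start: 200, end: 202} // file from a later process start

	fmt.Println(overlaps(original, rewritten))    // true  -> would be skipped when counting rows
	fmt.Println(overlaps(original, afterRestart)) // false -> counted normally
}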

View File

@@ -2,6 +2,7 @@ package engine
import (
	"context"
+	"encoding/json"
	"errors"
	"testing"
@@ -1136,3 +1137,120 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
	assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS)
	assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
}

+// Tests for log buffer deduplication functionality
+
+func TestSQLEngine_GetLogBufferStartFromFile_NewFormat(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create sample buffer start (new ultra-efficient format)
+	bufferStart := LogBufferStart{StartIndex: 1609459100000000001}
+	startJson, _ := json.Marshal(bufferStart)
+
+	// Create file entry with buffer start + some chunks
+	entry := &filer_pb.Entry{
+		Name: "test-log-file",
+		Extended: map[string][]byte{
+			"buffer_start": startJson,
+		},
+		Chunks: []*filer_pb.FileChunk{
+			{FileId: "chunk1", Offset: 0, Size: 1000},
+			{FileId: "chunk2", Offset: 1000, Size: 1000},
+			{FileId: "chunk3", Offset: 2000, Size: 1000},
+		},
+	}
+
+	// Test extraction
+	result, err := engine.getLogBufferStartFromFile(entry)
+	assert.NoError(t, err)
+	assert.NotNil(t, result)
+	assert.Equal(t, int64(1609459100000000001), result.StartIndex)
+	// Test extraction works correctly with the new format
+}
+
+func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create file entry without buffer start
+	entry := &filer_pb.Entry{
+		Name:     "test-log-file",
+		Extended: nil,
+	}
+
+	// Test extraction
+	result, err := engine.getLogBufferStartFromFile(entry)
+	assert.NoError(t, err)
+	assert.Nil(t, result)
+}
+
+func TestSQLEngine_GetLogBufferStartFromFile_InvalidData(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create file entry with invalid buffer start
+	entry := &filer_pb.Entry{
+		Name: "test-log-file",
+		Extended: map[string][]byte{
+			"buffer_start": []byte("invalid-json"),
+		},
+	}
+
+	// Test extraction
+	result, err := engine.getLogBufferStartFromFile(entry)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "failed to parse buffer start")
+	assert.Nil(t, result)
+}
+
+func TestSQLEngine_BuildLogBufferDeduplicationMap_NoBrokerClient(t *testing.T) {
+	engine := NewTestSQLEngine()
+	engine.catalog.brokerClient = nil // Simulate no broker client
+
+	ctx := context.Background()
+	result, err := engine.buildLogBufferDeduplicationMap(ctx, "/topics/test/test-topic")
+
+	assert.NoError(t, err)
+	assert.NotNil(t, result)
+	assert.Empty(t, result)
+}
+
+func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) {
+	// Simulate scenario: Buffer indexes are now initialized with process start time
+	// This tests that buffer start indexes are globally unique across server restarts
+
+	// Before server restart: Process 1 buffer start (3 chunks)
+	beforeRestartStart := LogBufferStart{
+		StartIndex: 1609459100000000000, // Process 1 start time
+	}
+
+	// After server restart: Process 2 buffer start (3 chunks)
+	afterRestartStart := LogBufferStart{
+		StartIndex: 1609459300000000000, // Process 2 start time (DIFFERENT)
+	}
+
+	// Simulate 3 chunks for each file
+	chunkCount := int64(3)
+
+	// Calculate end indexes for range comparison
+	beforeEnd := beforeRestartStart.StartIndex + chunkCount - 1 // [start, start+2]
+	afterStart := afterRestartStart.StartIndex                  // [start, start+2]
+
+	// Test range overlap detection (should NOT overlap)
+	overlaps := beforeRestartStart.StartIndex <= (afterStart+chunkCount-1) && beforeEnd >= afterStart
+	assert.False(t, overlaps, "Buffer ranges after restart should not overlap")
+
+	// Verify the start indexes are globally unique
+	assert.NotEqual(t, beforeRestartStart.StartIndex, afterRestartStart.StartIndex, "Start indexes should be different")
+	assert.Less(t, beforeEnd, afterStart, "Ranges should be completely separate")
+
+	// Expected values:
+	// Before restart: [1609459100000000000, 1609459100000000002]
+	// After restart:  [1609459300000000000, 1609459300000000002]
+	expectedBeforeEnd := int64(1609459100000000002)
+	expectedAfterStart := int64(1609459300000000000)
+	assert.Equal(t, expectedBeforeEnd, beforeEnd)
+	assert.Equal(t, expectedAfterStart, afterStart)
+
+	// This demonstrates that buffer start indexes initialized with process start time
+	// prevent false positive duplicates across server restarts
+}

View File

@@ -63,6 +63,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
		notifyFn:   notifyFn,
		flushChan:  make(chan *dataToFlush, 256),
		isStopping: new(atomic.Bool),
+		batchIndex: time.Now().UnixNano(), // Initialize with process start time for uniqueness
	}
	go lb.loopFlush()
	go lb.loopInterval()
@@ -343,6 +344,20 @@ func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
	bufferPool.Put(b)
}

+// GetName returns the log buffer name for metadata tracking
+func (logBuffer *LogBuffer) GetName() string {
+	logBuffer.RLock()
+	defer logBuffer.RUnlock()
+	return logBuffer.name
+}
+
+// GetBatchIndex returns the current batch index for metadata tracking
+func (logBuffer *LogBuffer) GetBatchIndex() int64 {
+	logBuffer.RLock()
+	defer logBuffer.RUnlock()
+	return logBuffer.batchIndex
+}
+
var bufferPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
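
Seeding batchIndex with time.Now().UnixNano() is what makes buffer indexes from different broker processes disjoint, which the deduplication logic relies on across restarts. The toy sketch below only assumes that the index moves forward after that seed; the actual increment policy lives in the real LogBuffer and is not shown here:

package main

import (
	"fmt"
	"time"
)

// fakeLogBuffer is an illustration only; the real LogBuffer in
// weed/util/log_buffer manages far more state.
type fakeLogBuffer struct {
	batchIndex int64
}

func newFakeLogBuffer() *fakeLogBuffer {
	// Same seed as the real constructor in this commit: process start time.
	return &fakeLogBuffer{batchIndex: time.Now().UnixNano()}
}

// nextIndex stands in for however the real buffer advances batchIndex per
// flush (an assumption here); the only property used is monotonic growth.
func (b *fakeLogBuffer) nextIndex() int64 {
	b.batchIndex++
	return b.batchIndex
}

func main() {
	before := newFakeLogBuffer()
	time.Sleep(time.Millisecond) // pretend the broker restarted slightly later
	after := newFakeLogBuffer()

	fmt.Println(before.nextIndex() < after.nextIndex()) // true: index ranges cannot collide
}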