The parquet file should also remember the earliest buffer_start attribute from its source log files

This commit is contained in:
chrislu
2025-09-02 00:42:37 -07:00
parent db75742e37
commit 618cb89885
2 changed files with 56 additions and 10 deletions

View File

@@ -287,13 +287,21 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
// write to parquet file to partitionDir
parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
// Collect source log file names for deduplication metadata
// Collect source log file names and buffer_start metadata for deduplication
var sourceLogFiles []string
var earliestBufferStart int64
for _, logFile := range logFileGroups {
sourceLogFiles = append(sourceLogFiles, logFile.Name)
// Extract buffer_start from log file metadata
if bufferStart := getBufferStartFromLogFile(logFile); bufferStart > 0 {
if earliestBufferStart == 0 || bufferStart < earliestBufferStart {
earliestBufferStart = bufferStart
}
}
}
if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles); err != nil {
if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
}
@@ -301,7 +309,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string) error {
func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
uploader, err := operation.NewUploader()
if err != nil {
return fmt.Errorf("new uploader: %w", err)
@@ -340,6 +348,13 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
entry.Extended["sources"] = sourceLogFilesJson
}
// Store earliest buffer_start for precise broker deduplication
if earliestBufferStart > 0 {
bufferStartBytes := make([]byte, 8)
binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
entry.Extended["buffer_start"] = bufferStartBytes
}
for i := int64(0); i < chunkCount; i++ {
fileId, uploadResult, err, _ := uploader.UploadWithRetry(
filerClient,
@@ -472,3 +487,24 @@ func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (proc
return
}
// getBufferStartFromLogFile extracts the buffer_start index from log file extended metadata.
// It returns 0 when the entry has no Extended attributes, no "buffer_start" key,
// or the stored value cannot be parsed — callers treat 0 as "no buffer_start known".
func getBufferStartFromLogFile(logFile *filer_pb.Entry) int64 {
	if logFile.Extended == nil {
		return 0
	}
	// Parse buffer_start format (same as used in query engine): live log files
	// store the value as JSON, e.g. {"start_index": 123}.
	if startJson, exists := logFile.Extended["buffer_start"]; exists {
		// Local mirror of the LogBufferStart struct (JSON format).
		type LogBufferStart struct {
			StartIndex int64 `json:"start_index"`
		}
		var bufferStart LogBufferStart
		if err := json.Unmarshal(startJson, &bufferStart); err == nil {
			return bufferStart.StartIndex
		}
		// Malformed JSON falls through and is reported as "not found" (0),
		// not as an error — deduplication is best-effort.
	}
	return 0
}

View File

@@ -2,6 +2,7 @@ package engine
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
@@ -517,7 +518,7 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
}
// getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition
// This is used for precise deduplication - any buffer index >= this value may still be in memory
// This checks both live log files and Parquet files for the most precise deduplication
func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) {
filerClient, err := c.GetFilerClient()
if err != nil {
@@ -527,12 +528,12 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
var earliestBufferIndex int64 = -1 // -1 means no buffer_start found
err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
// Skip directories and parquet files
if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
// Skip directories
if entry.IsDirectory {
return nil
}
// Extract buffer_start from file extended attributes
// Extract buffer_start from file extended attributes (both log files and parquet files)
bufferStart := c.getBufferStartFromEntry(entry)
if bufferStart != nil && bufferStart.StartIndex > 0 {
if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex {
@@ -555,15 +556,24 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
}
// getBufferStartFromEntry extracts LogBufferStart from file entry metadata
// Handles both JSON format (log files) and binary format (Parquet files)
func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart {
if entry.Extended == nil {
return nil
}
// Parse buffer_start format
if startJson, exists := entry.Extended["buffer_start"]; exists {
if startData, exists := entry.Extended["buffer_start"]; exists {
// Try binary format first (Parquet files)
if len(startData) == 8 {
startIndex := int64(binary.BigEndian.Uint64(startData))
if startIndex > 0 {
return &LogBufferStart{StartIndex: startIndex}
}
}
// Try JSON format (log files)
var bufferStart LogBufferStart
if err := json.Unmarshal(startJson, &bufferStart); err == nil {
if err := json.Unmarshal(startData, &bufferStart); err == nil {
return &bufferStart
}
}