parquet file generation: also remember the source log files

chrislu
2025-09-01 15:11:06 -07:00
parent 2fa8991a52
commit 55dfb97fc8
2 changed files with 118 additions and 5 deletions

Changed file 1 of 2 (package logstore)

@@ -3,6 +3,7 @@ package logstore
 import (
 	"context"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
@@ -285,7 +286,14 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 	// write to parquet file to partitionDir
 	parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
-	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
+
+	// Collect source log file names for deduplication metadata
+	var sourceLogFiles []string
+	for _, logFile := range logFileGroups {
+		sourceLogFiles = append(sourceLogFiles, logFile.Name)
+	}
+
+	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles); err != nil {
 		return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
 	}
@@ -293,7 +301,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 }
 
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string) error {
 	uploader, err := operation.NewUploader()
 	if err != nil {
 		return fmt.Errorf("new uploader: %w", err)
@@ -326,6 +334,12 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
 	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
 	entry.Extended["max"] = maxTsBytes
+
+	// Store source log files for deduplication (JSON-encoded list)
+	if len(sourceLogFiles) > 0 {
+		sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
+		entry.Extended["sources"] = sourceLogFilesJson
+	}
 
 	for i := int64(0); i < chunkCount; i++ {
 		fileId, uploadResult, err, _ := uploader.UploadWithRetry(
 			filerClient,
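
The "sources" attribute written above sits alongside the existing "min"/"max" timestamp keys in the entry's Extended map and is nothing more than a JSON-encoded list of the converted log file names. A minimal standalone sketch of the round-trip, assuming only encoding/json and a plain map standing in for the real filer entry (file names are made up):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Writer side (saveParquetFileToPartitionDir): JSON-encode the
	// converted log file names under the "sources" key.
	extended := make(map[string][]byte) // stand-in for entry.Extended
	sourceLogFiles := []string{"2025-09-01-15-00-00", "2025-09-01-15-05-00"} // hypothetical names
	if data, err := json.Marshal(sourceLogFiles); err == nil {
		extended["sources"] = data
	}

	// Reader side (getParquetSourceFilesFromMetadata): decode the same
	// attribute back into a set used to skip already-converted files.
	sourceSet := make(map[string]bool)
	var sources []string
	if err := json.Unmarshal(extended["sources"], &sources); err == nil {
		for _, s := range sources {
			sourceSet[s] = true
		}
	}
	fmt.Println(len(sourceSet), sourceSet["2025-09-01-15-00-00"]) // 2 true
}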

Changed file 2 of 2 (package engine)

@@ -2,6 +2,7 @@ package engine
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"math"
 	"regexp"
@@ -1399,8 +1400,11 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 		}
 	}
 
-	// Check if there are live log files and count their rows
-	liveLogRowCount, err := e.countLiveLogRows(partitionPath)
+	// Get parquet source files for deduplication
+	parquetSourceFiles := e.extractParquetSourceFiles(fileStats)
+
+	// Check if there are live log files and count their rows (excluding parquet-converted files)
+	liveLogRowCount, err := e.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSourceFiles)
 	if err != nil {
 		// If we can't count live logs, fall back to full scan
 		return nil, false
@@ -1415,7 +1419,7 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 	// Debug: Show the hybrid optimization results
 	if totalParquetRowCount > 0 || totalLiveLogRowCount > 0 {
-		fmt.Printf("Hybrid fast aggregation: %d parquet rows + %d live log rows from %d partitions with live logs\n",
+		fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows from %d partitions\n",
 			totalParquetRowCount, totalLiveLogRowCount, partitionsWithLiveLogs)
 	}
@@ -1676,6 +1680,101 @@ func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
 	return totalRows, err
 }
+
+// extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication
+func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
+	sourceFiles := make(map[string]bool)
+
+	for _, fileStat := range fileStats {
+		// Each ParquetFileStats should have a reference to the original file entry,
+		// but we need to get it through the hybrid scanner to access Extended metadata.
+		// This is a simplified approach - in practice we'd need to access the filer entry.
+		// For now, we'll use filename-based deduplication as a fallback.
+
+		// Extract timestamp from parquet filename (YYYY-MM-DD-HH-MM-SS.parquet)
+		if strings.HasSuffix(fileStat.FileName, ".parquet") {
+			timeStr := strings.TrimSuffix(fileStat.FileName, ".parquet")
+			// Mark this timestamp range as covered by parquet
+			sourceFiles[timeStr] = true
+		}
+	}
+
+	return sourceFiles
+}
+
+// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet
+func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return 0, err
+	}
+
+	// First, get the actual source files from parquet metadata
+	actualSourceFiles, err := e.getParquetSourceFilesFromMetadata(partitionPath)
+	if err != nil {
+		// If we can't read parquet metadata, use filename-based fallback
+		fmt.Printf("Warning: failed to read parquet metadata, using filename-based deduplication: %v\n", err)
+		actualSourceFiles = parquetSourceFiles
+	}
+
+	// Debug: Show deduplication status
+	if len(actualSourceFiles) > 0 {
+		fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
+	}
+
+	totalRows := int64(0)
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
+			return nil // Skip directories and parquet files
+		}
+
+		// Skip files that have been converted to parquet
+		if actualSourceFiles[entry.Name] {
+			fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
+			return nil
+		}
+
+		// Count rows in live log file
+		rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
+		if err != nil {
+			fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
+			return nil // Continue with other files
+		}
+		totalRows += rowCount
+		return nil
+	})
+
+	return totalRows, err
+}
+
+// getParquetSourceFilesFromMetadata reads parquet file metadata to get the actual source log files
+func (e *SQLEngine) getParquetSourceFilesFromMetadata(partitionPath string) (map[string]bool, error) {
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return nil, err
+	}
+
+	sourceFiles := make(map[string]bool)
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
+			return nil
+		}
+
+		// Read source files from Extended metadata
+		if entry.Extended != nil && entry.Extended["sources"] != nil {
+			var sources []string
+			if err := json.Unmarshal(entry.Extended["sources"], &sources); err == nil {
+				for _, source := range sources {
+					sourceFiles[source] = true
+				}
+			}
+		}
+		return nil
+	})
+
+	return sourceFiles, err
+}
+
 // countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
 func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
 	lookupFileIdFn := filer.LookupFn(filerClient)
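
Taken together, the reader side reduces to a set-membership filter over the partition directory: skip every .parquet entry, and skip every live log whose name appears in some parquet file's "sources" list, so each row is counted exactly once. A self-contained sketch of that filter, with hypothetical file names and row counts standing in for the filer listing:

package main

import (
	"fmt"
	"strings"
)

// countDeduplicated mirrors the filter inside
// countLiveLogRowsExcludingParquetSources: ignore parquet files and any
// live log already represented by a parquet file's "sources" metadata.
func countDeduplicated(rowsByFile map[string]int64, converted map[string]bool) int64 {
	var total int64
	for name, rows := range rowsByFile {
		if strings.HasSuffix(name, ".parquet") || converted[name] {
			continue // counted via the parquet statistics instead
		}
		total += rows
	}
	return total
}

func main() {
	// Hypothetical partition: two live logs were converted to parquet,
	// one newer log was not.
	rowsByFile := map[string]int64{
		"2025-09-01-15-00-00":         1000,
		"2025-09-01-15-05-00":         800,
		"2025-09-01-15-10-00":         50,
		"2025-09-01-15-04-05.parquet": 1800, // holds the 1800 converted rows
	}
	converted := map[string]bool{ // decoded from Extended["sources"]
		"2025-09-01-15-00-00": true,
		"2025-09-01-15-05-00": true,
	}
	fmt.Println(countDeduplicated(rowsByFile, converted)) // 50
}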