From 55dfb97fc80e8c19689ac4b9495fb88294f9854e Mon Sep 17 00:00:00 2001
From: chrislu
Date: Mon, 1 Sep 2025 15:11:06 -0700
Subject: [PATCH] parquet file generation remember the sources also

---
 weed/mq/logstore/log_to_parquet.go |  18 ++++-
 weed/query/engine/engine.go        | 105 ++++++++++++++++++++++++++++-
 2 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index 9c9e776e6..3e04f64ae 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -3,6 +3,7 @@ package logstore
 import (
 	"context"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
@@ -285,7 +286,14 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 
 	// write to parquet file to partitionDir
 	parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
-	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
+
+	// Collect source log file names for deduplication metadata
+	var sourceLogFiles []string
+	for _, logFile := range logFileGroups {
+		sourceLogFiles = append(sourceLogFiles, logFile.Name)
+	}
+
+	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles); err != nil {
 		return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
 	}
 
@@ -293,7 +301,7 @@
 
 }
 
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string) error {
 	uploader, err := operation.NewUploader()
 	if err != nil {
 		return fmt.Errorf("new uploader: %w", err)
@@ -326,6 +334,12 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
 	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
 	entry.Extended["max"] = maxTsBytes
 
+	// Store source log files for deduplication (JSON-encoded list)
+	if len(sourceLogFiles) > 0 {
+		sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
+		entry.Extended["sources"] = sourceLogFilesJson
+	}
+
 	for i := int64(0); i < chunkCount; i++ {
 		fileId, uploadResult, err, _ := uploader.UploadWithRetry(
 			filerClient,
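The write path above records, in the parquet file's extended attributes, the names of the log files it consumed: a JSON-encoded list stored under the "sources" key. Below is a minimal, self-contained sketch of that round-trip; the plain map stands in for filer_pb.Entry.Extended (a map[string][]byte), and the file names are illustrative, not taken from a real partition.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Stand-in for filer_pb.Entry.Extended, which is a map[string][]byte.
	extended := make(map[string][]byte)

	// Write side (log_to_parquet.go): store the consumed log file names
	// as a JSON-encoded list under the "sources" key.
	sources := []string{"2025-09-01-15-00-00", "2025-09-01-15-05-00"} // illustrative names
	encoded, err := json.Marshal(sources)
	if err != nil {
		panic(err) // marshaling a []string cannot fail
	}
	extended["sources"] = encoded

	// Read side (engine.go, below): decode the same key back into a set
	// so the converted log files can be skipped when counting live rows.
	sourceSet := make(map[string]bool)
	if raw, ok := extended["sources"]; ok {
		var decoded []string
		if err := json.Unmarshal(raw, &decoded); err == nil {
			for _, name := range decoded {
				sourceSet[name] = true
			}
		}
	}
	fmt.Println(sourceSet) // map[2025-09-01-15-00-00:true 2025-09-01-15-05-00:true]
}

Storing explicit file names keeps the read side to a simple set-membership check rather than an inference over timestamp ranges.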
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 3e78e2208..ce96e9dfb 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -2,6 +2,7 @@ package engine
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"math"
 	"regexp"
@@ -1399,8 +1400,11 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 		}
 	}
 
-	// Check if there are live log files and count their rows
-	liveLogRowCount, err := e.countLiveLogRows(partitionPath)
+	// Get parquet source files for deduplication
+	parquetSourceFiles := e.extractParquetSourceFiles(fileStats)
+
+	// Check if there are live log files and count their rows (excluding parquet-converted files)
+	liveLogRowCount, err := e.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSourceFiles)
 	if err != nil {
 		// If we can't count live logs, fall back to full scan
 		return nil, false
@@ -1415,7 +1419,7 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 
 	// Debug: Show the hybrid optimization results
 	if totalParquetRowCount > 0 || totalLiveLogRowCount > 0 {
-		fmt.Printf("Hybrid fast aggregation: %d parquet rows + %d live log rows from %d partitions with live logs\n",
+		fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows from %d partitions\n",
 			totalParquetRowCount, totalLiveLogRowCount, partitionsWithLiveLogs)
 	}
 
@@ -1676,6 +1680,101 @@ func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
 	return totalRows, err
 }
 
+// extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication
+func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
+	sourceFiles := make(map[string]bool)
+
+	for _, fileStat := range fileStats {
+		// Each ParquetFileStats should have a reference to the original file entry
+		// but we need to get it through the hybrid scanner to access Extended metadata
+		// This is a simplified approach - in practice we'd need to access the filer entry
+
+		// For now, we'll use filename-based deduplication as a fallback
+		// Extract timestamp from parquet filename (YYYY-MM-DD-HH-MM-SS.parquet)
+		if strings.HasSuffix(fileStat.FileName, ".parquet") {
+			timeStr := strings.TrimSuffix(fileStat.FileName, ".parquet")
+			// Mark this timestamp range as covered by parquet
+			sourceFiles[timeStr] = true
+		}
+	}
+
+	return sourceFiles
+}
+
+// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet
+func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return 0, err
+	}
+
+	// First, get the actual source files from parquet metadata
+	actualSourceFiles, err := e.getParquetSourceFilesFromMetadata(partitionPath)
+	if err != nil {
+		// If we can't read parquet metadata, use filename-based fallback
+		fmt.Printf("Warning: failed to read parquet metadata, using filename-based deduplication: %v\n", err)
+		actualSourceFiles = parquetSourceFiles
+	}
+
+	// Debug: Show deduplication status
+	if len(actualSourceFiles) > 0 {
+		fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
+	}
+
+	totalRows := int64(0)
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
+			return nil // Skip directories and parquet files
+		}
+
+		// Skip files that have been converted to parquet
+		if actualSourceFiles[entry.Name] {
+			fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
+			return nil
+		}
+
+		// Count rows in live log file
+		rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
+		if err != nil {
+			fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
+			return nil // Continue with other files
+		}
+		totalRows += rowCount
+		return nil
+	})
+	return totalRows, err
+}
+
+// getParquetSourceFilesFromMetadata reads parquet file metadata to get actual source log files
+func (e *SQLEngine) getParquetSourceFilesFromMetadata(partitionPath string) (map[string]bool, error) {
+	filerClient, err := e.catalog.brokerClient.GetFilerClient()
+	if err != nil {
+		return nil, err
+	}
+
+	sourceFiles := make(map[string]bool)
+
+	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
+			return nil
+		}
+
+		// Read source files from Extended metadata
+		if entry.Extended != nil && entry.Extended["sources"] != nil {
+			var sources []string
+			if err := json.Unmarshal(entry.Extended["sources"], &sources); err == nil {
+				for _, source := range sources {
+					sourceFiles[source] = true
+				}
+			}
+		}
+
+		return nil
+	})
+
+	return sourceFiles, err
+}
+
 // countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
 func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
 	lookupFileIdFn := filer.LookupFn(filerClient)
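Parquet files written before this change carry no "sources" attribute, so extractParquetSourceFiles falls back to the file naming convention: each parquet file is named after the start timestamp of the data it covers, formatted with Go's 2006-01-02-15-04-05 layout. A small sketch of that convention follows; timestampKey is a hypothetical helper for illustration, not part of the patch.

package main

import (
	"fmt"
	"strings"
	"time"
)

// timestampKey mirrors the filename-based fallback in extractParquetSourceFiles:
// strip the ".parquet" suffix and treat the remainder as the key for that file.
func timestampKey(fileName string) (string, bool) {
	if !strings.HasSuffix(fileName, ".parquet") {
		return "", false
	}
	return strings.TrimSuffix(fileName, ".parquet"), true
}

func main() {
	key, ok := timestampKey("2025-09-01-15-04-05.parquet")
	fmt.Println(key, ok) // 2025-09-01-15-04-05 true

	// The key parses with the same layout the writer used to name the file.
	ts, err := time.Parse("2006-01-02-15-04-05", key)
	fmt.Println(ts, err) // 2025-09-01 15:04:05 +0000 UTC <nil>
}

Note that the fallback is coarser than the metadata path: it recovers only timestamp keys, not the exact list of converted log files. That is why countLiveLogRowsExcludingParquetSources prefers the "sources" metadata and only warns and falls back when it cannot be read.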