chrislu
2025-09-01 13:09:44 -07:00
parent 8645f3a264
commit 8498240460
2 changed files with 52 additions and 51 deletions

View File

@@ -4,6 +4,11 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"io"
+	"os"
+	"strings"
+	"time"
+
 	"github.com/parquet-go/parquet-go"
 	"github.com/parquet-go/parquet-go/compress/zstd"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -16,10 +21,6 @@ import (
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 	"google.golang.org/protobuf/proto"
-	"io"
-	"os"
-	"strings"
-	"time"
 )

 const (

View File

@@ -29,10 +29,10 @@ const (
// 3. System columns (_ts_ns, _key) are added to user schema
// 4. Predicate pushdown is used for efficient scanning
type ParquetScanner struct {
	filerClient   filer_pb.FilerClient
	chunkCache    chunk_cache.ChunkCache
	topic         topic.Topic
	recordSchema  *schema_pb.RecordType
	parquetLevels *schema.ParquetLevels
}
@@ -47,7 +47,7 @@ func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName st
	// Create topic reference
	t := topic.Topic{
		Namespace: namespace,
		Name:      topicName,
	}

	// Read topic configuration to get schema
@@ -92,13 +92,13 @@ type ScanOptions struct {
	// Time range filtering (Unix nanoseconds)
	StartTimeNs int64
	StopTimeNs  int64

	// Column projection - if empty, select all columns
	Columns []string

	// Row limit - 0 means no limit
	Limit int

	// Predicate for WHERE clause filtering
	Predicate func(*schema_pb.RecordValue) bool
}
@@ -117,59 +117,59 @@ type ScanResult struct {
// 3. Applies predicates and projections after reading
func (ps *ParquetScanner) Scan(ctx context.Context, options ScanOptions) ([]ScanResult, error) {
	var results []ScanResult

	// Get all partitions for this topic
	// TODO: Implement proper partition discovery
	// For now, assume partition 0 exists
	partitions := []topic.Partition{{RangeStart: 0, RangeStop: 1000}}

	for _, partition := range partitions {
		partitionResults, err := ps.scanPartition(ctx, partition, options)
		if err != nil {
			return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err)
		}

		results = append(results, partitionResults...)

		// Apply global limit across all partitions
		if options.Limit > 0 && len(results) >= options.Limit {
			results = results[:options.Limit]
			break
		}
	}

	return results, nil
}
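For orientation, a minimal caller sketch for the Scan API above (not part of this commit): the namespace, topic, and column names are made up, and NewParquetScanner is assumed to return (*ParquetScanner, error).

// Hypothetical caller, not part of this commit.
func scanLastHour(ctx context.Context, filerClient filer_pb.FilerClient) ([]ScanResult, error) {
	scanner, err := NewParquetScanner(filerClient, "analytics", "user_events") // example names only
	if err != nil {
		return nil, err
	}
	return scanner.Scan(ctx, ScanOptions{
		StartTimeNs: time.Now().Add(-time.Hour).UnixNano(), // only rows from the last hour
		Columns:     []string{"user_id", "event_type"},     // project two columns
		Limit:       100,                                   // cap the result set
	})
}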
// scanPartition scans a specific topic partition
func (ps *ParquetScanner) scanPartition(ctx context.Context, partition topic.Partition, options ScanOptions) ([]ScanResult, error) {
	// partitionDir := topic.PartitionDir(ps.topic, partition) // TODO: Use for actual file listing

	var results []ScanResult

	// List Parquet files in partition directory
	// TODO: Implement proper file listing with date range filtering
	// For now, this is a placeholder that would list actual Parquet files

	// Simulate file processing - in real implementation, this would:
	// 1. List files in partitionDir via filerClient
	// 2. Filter files by date range if time filtering is enabled
	// 3. Process each Parquet file in chronological order

	// Placeholder: Create sample data for testing
	if len(results) == 0 {
		// Generate sample data for demonstration
		sampleData := ps.generateSampleData(options)
		results = append(results, sampleData...)
	}

	return results, nil
}
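One possible shape for the file listing that the TODOs above describe, sketched here for reference and not part of this commit: the listParquetFiles helper is hypothetical, filer_pb.ReadDirAllEntries and util.FullPath are assumed to be available with roughly this signature, and the date-range filtering (step 2) is only hinted at in a comment.

// Hypothetical helper, not part of this commit.
func (ps *ParquetScanner) listParquetFiles(partition topic.Partition) ([]*filer_pb.Entry, error) {
	partitionDir := topic.PartitionDir(ps.topic, partition)
	var entries []*filer_pb.Entry
	// ReadDirAllEntries is assumed here; date-range filtering would inspect
	// entry names or attributes before appending.
	err := filer_pb.ReadDirAllEntries(ps.filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
		if strings.HasSuffix(entry.Name, ".parquet") {
			entries = append(entries, entry)
		}
		return nil
	})
	return entries, err
}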
// scanParquetFile scans a single Parquet file (real implementation)
func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.Entry, options ScanOptions) ([]ScanResult, error) {
	var results []ScanResult

	// Create reader for the Parquet file (same pattern as logstore)
	lookupFileIdFn := filer.LookupFn(ps.filerClient)
	fileSize := filer.FileSize(entry)
@@ -177,16 +177,16 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
	readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
	readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))

	// Create Parquet reader
	parquetReader := parquet.NewReader(readerAt)
	defer parquetReader.Close()

	rows := make([]parquet.Row, 128) // Read in batches like logstore

	for {
		rowCount, readErr := parquetReader.ReadRows(rows)

		// Process rows even if EOF
		for i := 0; i < rowCount; i++ {
			// Convert Parquet row to schema value
@@ -194,11 +194,11 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
			if err != nil {
				return nil, fmt.Errorf("failed to convert row: %v", err)
			}

			// Extract system columns
			timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

			// Apply time filtering
			if options.StartTimeNs > 0 && timestamp < options.StartTimeNs {
				continue
@@ -206,12 +206,12 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
			if options.StopTimeNs > 0 && timestamp >= options.StopTimeNs {
				break // Assume data is time-ordered
			}

			// Apply predicate filtering (WHERE clause)
			if options.Predicate != nil && !options.Predicate(recordValue) {
				continue
			}

			// Apply column projection
			values := make(map[string]*schema_pb.Value)
			if len(options.Columns) == 0 {
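As a concrete illustration of the Predicate hook applied above (not part of this commit), a WHERE-style filter over the sample schema's fields could look like the sketch below; the GetStringValue/GetInt32Value getters are assumed to be generated alongside the GetInt64Value/GetBytesValue getters already used in this file.

// Hypothetical predicate, roughly: WHERE event_type = 'login' AND user_id > 1000
predicate := func(record *schema_pb.RecordValue) bool {
	// Generated proto getters are nil-safe, so missing fields simply fail the filter.
	eventType := record.Fields["event_type"].GetStringValue()
	userID := record.Fields["user_id"].GetInt32Value()
	return eventType == "login" && userID > 1000
}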
@@ -229,61 +229,61 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
					}
				}
			}

			results = append(results, ScanResult{
				Values:    values,
				Timestamp: timestamp,
				Key:       key,
			})

			// Apply row limit
			if options.Limit > 0 && len(results) >= options.Limit {
				return results, nil
			}
		}

		if readErr != nil {
			break // EOF or error
		}
	}

	return results, nil
}
// generateSampleData creates sample data for testing when no real Parquet files exist
func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
	now := time.Now().UnixNano()

	sampleData := []ScanResult{
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "login"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1"}`}},
			},
			Timestamp: now - 3600000000000, // 1 hour ago
			Key:       []byte("user-1001"),
		},
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "page_view"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"page": "/dashboard"}`}},
			},
			Timestamp: now - 1800000000000, // 30 minutes ago
			Key:       []byte("user-1002"),
		},
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "logout"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"session_duration": 3600}`}},
			},
			Timestamp: now - 900000000000, // 15 minutes ago
			Key:       []byte("user-1001"),
		},
	}

	// Apply predicate filtering if specified
	if options.Predicate != nil {
		var filtered []ScanResult
@@ -295,19 +295,19 @@ func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
			}

			recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
			recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}

			if options.Predicate(recordValue) {
				filtered = append(filtered, result)
			}
		}
		sampleData = filtered
	}

	// Apply limit
	if options.Limit > 0 && len(sampleData) > options.Limit {
		sampleData = sampleData[:options.Limit]
	}

	return sampleData
}
@@ -319,7 +319,7 @@ func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []str
			Rows:    [][]sqltypes.Value{},
		}
	}

	// Determine columns if not specified
	if len(columns) == 0 {
		columnSet := make(map[string]bool)
@@ -328,13 +328,13 @@ func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []str
				columnSet[columnName] = true
			}
		}

		columns = make([]string, 0, len(columnSet))
		for columnName := range columnSet {
			columns = append(columns, columnName)
		}
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
@@ -348,7 +348,7 @@ func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []str
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns: columns,
		Rows:    rows,
@@ -360,7 +360,7 @@ func convertSchemaValueToSQL(value *schema_pb.Value) sqltypes.Value {
	if value == nil {
		return sqltypes.NULL
	}

	switch v := value.Kind.(type) {
	case *schema_pb.Value_BoolValue:
		if v.BoolValue {
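The hunk cuts off inside this switch. For orientation only, a plausible completion (not from this commit) could map the remaining value kinds to SQL values as sketched below, assuming vitess-style constructors (NewInt32, NewInt64, NewFloat64, NewVarChar, NewVarBinary) exist in the sqltypes package imported here and that schema_pb defines a DoubleValue kind.

// Sketch only; constructor names and the DoubleValue kind are assumptions.
switch v := value.Kind.(type) {
case *schema_pb.Value_BoolValue:
	if v.BoolValue {
		return sqltypes.NewInt32(1)
	}
	return sqltypes.NewInt32(0)
case *schema_pb.Value_Int32Value:
	return sqltypes.NewInt32(v.Int32Value)
case *schema_pb.Value_Int64Value:
	return sqltypes.NewInt64(v.Int64Value)
case *schema_pb.Value_DoubleValue:
	return sqltypes.NewFloat64(v.DoubleValue)
case *schema_pb.Value_StringValue:
	return sqltypes.NewVarChar(v.StringValue)
case *schema_pb.Value_BytesValue:
	return sqltypes.NewVarBinary(string(v.BytesValue))
default:
	return sqltypes.NULL
}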