toRow conversion with levels info

This commit is contained in:
chrislu 2024-04-25 09:14:37 -07:00
parent 9cb9d27b5b
commit 977e7988e6

View File

@@ -6,24 +6,23 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
) )
func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int) (endIndex int, err error) { func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
switch fieldType.Kind.(type) { switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType: case *schema_pb.Type_ScalarType:
endIndex = columnIndex+1
var parquetValue parquet.Value var parquetValue parquet.Value
parquetValue, err = toParquetValue(fieldValue) parquetValue, err = toParquetValue(fieldValue)
if err != nil { if err != nil {
return return
} }
rowBuilder.Add(columnIndex, parquetValue) rowBuilder.Add(levels.startColumnIndex, parquetValue)
// fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue) // fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
case *schema_pb.Type_ListType: case *schema_pb.Type_ListType:
rowBuilder.Next(columnIndex) rowBuilder.Next(levels.startColumnIndex)
// fmt.Printf("rowBuilder.Next %d\n", columnIndex) // fmt.Printf("rowBuilder.Next %d\n", columnIndex)
elementType := fieldType.GetListType().ElementType elementType := fieldType.GetListType().ElementType
for _, value := range fieldValue.GetListValue().Values { for _, value := range fieldValue.GetListValue().Values {
if endIndex, err = rowBuilderVisit(rowBuilder, elementType, value, columnIndex); err != nil { if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil {
return return
} }
} }
@@ -32,44 +31,43 @@ func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type,
} }
func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error { func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error {
visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) { parquetLevels, err := ToParquetLevels(recordType)
return rowBuilderVisit(rowBuilder, fieldType, fieldValue, index) if err != nil {
return err
}
visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
} }
fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}} fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}} fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
return visitValue(fieldType, fieldValue, visitor) return doVisitValue(fieldType, parquetLevels, fieldValue, visitor)
} }
// typeValueVisitor is a function that is called for each value in a schema_pb.Value // typeValueVisitor is a function that is called for each value in a schema_pb.Value
// Find the column index. // Find the column index.
// intended to be used in RowBuilder.Add(columnIndex, value) // intended to be used in RowBuilder.Add(columnIndex, value)
type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error)
func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
_, err = doVisitValue(fieldType, fieldValue, 0, visitor)
return
}
// endIndex is exclusive // endIndex is exclusive
// same logic as RowBuilder.configure in row_builder.go // same logic as RowBuilder.configure in row_builder.go
func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) { func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
switch fieldType.Kind.(type) { switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType: case *schema_pb.Type_ScalarType:
return visitor(fieldType, fieldValue, columnIndex) return visitor(fieldType, levels, fieldValue)
case *schema_pb.Type_ListType: case *schema_pb.Type_ListType:
return visitor(fieldType, fieldValue, columnIndex) return visitor(fieldType, levels, fieldValue)
case *schema_pb.Type_RecordType: case *schema_pb.Type_RecordType:
for _, field := range fieldType.GetRecordType().Fields { for _, field := range fieldType.GetRecordType().Fields {
fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name] fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
if !found { if !found {
// TODO check this if no such field found // TODO check this if no such field found
return columnIndex, nil continue
} }
endIndex, err = doVisitValue(field.Type, fieldValue, columnIndex, visitor) fieldLevels := levels.levels[field.Name]
err = doVisitValue(field.Type, fieldLevels, fieldValue, visitor)
if err != nil { if err != nil {
return return
} }
columnIndex = endIndex
} }
return return
} }