feat: Enable publishers to use Parquet logical types

Enhanced MQ publishers to utilize the new logical types:
- Updated convertToRecordValue() to use TimestampValue instead of string RFC3339
- Added DateValue support for birth_date field (days since epoch)
- Added DecimalValue support for precise_amount field with configurable precision/scale
- Enhanced UserEvent struct with PreciseAmount and BirthDate fields
- Added convertToDecimal() helper using big.Rat for precise decimal conversion
- Updated test data generator to produce varied birth dates (1970-2004) and precise amounts

Publishers now generate structured data with proper logical types:
-  TIMESTAMP: Microsecond precision UTC timestamps
-  DATE: Birth dates as days since Unix epoch
-  DECIMAL: Precise amounts with 18-digit precision, 4-decimal scale

Successfully tested with PostgreSQL integration - all topics created with logical type data.
This commit is contained in:
chrislu
2025-09-03 07:26:36 -07:00
parent ec1e74a6e8
commit 3570027656

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"math/big"
"math/rand" "math/rand"
"os" "os"
"strconv" "strconv"
@@ -29,6 +30,8 @@ type UserEvent struct {
Action string `json:"action"` Action string `json:"action"`
Status string `json:"status"` Status string `json:"status"`
Amount float64 `json:"amount,omitempty"` Amount float64 `json:"amount,omitempty"`
PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL
BirthDate time.Time `json:"birth_date"` // Will be converted to DATE
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
Metadata string `json:"metadata,omitempty"` Metadata string `json:"metadata,omitempty"`
} }
@@ -189,6 +192,26 @@ func createSchemaForTopic(topicName string) *schema_pb.RecordType {
} }
} }
// convertToDecimal parses a decimal string and returns its unscaled integer
// value encoded for the Parquet DECIMAL logical type, together with the
// precision and scale that describe it.
//
// The input is parsed with big.Rat so no binary floating-point rounding is
// introduced. The value is multiplied by 10^scale and truncated toward zero,
// so digits beyond the fourth decimal place are dropped for both positive and
// negative inputs.
//
// The returned bytes are the minimal big-endian two's-complement
// representation of the unscaled integer, which preserves the sign of
// negative amounts and keeps positive amounts whose top bit is set from being
// read back as negative.
//
// Returns (nil, 0, 0) when value cannot be parsed as a number.
//
// NOTE(review): the result is not checked against the declared precision; an
// input with more than 14 integer digits would exceed 18 total digits.
func convertToDecimal(value string) ([]byte, int32, int32) {
	const (
		scale     int32 = 4  // digits after the decimal point
		precision int32 = 18 // total significant digits
	)

	// Parse the decimal string using big.Rat for exactness.
	rat := new(big.Rat)
	if _, ok := rat.SetString(value); !ok {
		return nil, 0, 0
	}

	// unscaled = trunc(value * 10^scale). big.Rat keeps its denominator
	// positive, so Quo truncates toward zero regardless of the sign.
	multiplier := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
	unscaled := new(big.Int).Mul(rat.Num(), multiplier)
	unscaled.Quo(unscaled, rat.Denom())

	return encodeTwosComplement(unscaled), precision, scale
}

// encodeTwosComplement returns the shortest big-endian two's-complement
// encoding of v. Zero is encoded as a single 0x00 byte.
func encodeTwosComplement(v *big.Int) []byte {
	switch v.Sign() {
	case 0:
		return []byte{0x00}
	case 1:
		magnitude := v.Bytes()
		if magnitude[0]&0x80 != 0 {
			// Prepend a zero byte so the sign bit stays clear.
			magnitude = append([]byte{0x00}, magnitude...)
		}
		return magnitude
	default:
		// n bytes cover [-2^(8n-1), 2^(8n-1)-1]; the bit length of |v|-1
		// gives the smallest n whose range contains v.
		absMinusOne := new(big.Int).Neg(v)
		absMinusOne.Sub(absMinusOne, big.NewInt(1))
		width := absMinusOne.BitLen()/8 + 1

		// 2^(8*width) + v is the two's-complement bit pattern; its top bit is
		// always set, so Bytes() yields exactly width bytes.
		pattern := new(big.Int).Lsh(big.NewInt(1), uint(8*width))
		pattern.Add(pattern, v)
		return pattern.Bytes()
	}
}
// convertToRecordValue converts Go structs to RecordValue format // convertToRecordValue converts Go structs to RecordValue format
func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields := make(map[string]*schema_pb.Value) fields := make(map[string]*schema_pb.Value)
@@ -201,7 +224,27 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}} fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}}
fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}} fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}}
fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}} fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}}
// Convert precise amount to DECIMAL logical type
if v.PreciseAmount != "" {
if decimal, precision, scale := convertToDecimal(v.PreciseAmount); decimal != nil {
fields["precise_amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DecimalValue{DecimalValue: &schema_pb.DecimalValue{
Value: decimal,
Precision: precision,
Scale: scale,
}}}
}
}
// Convert birth date to DATE logical type
fields["birth_date"] = &schema_pb.Value{Kind: &schema_pb.Value_DateValue{DateValue: &schema_pb.DateValue{
DaysSinceEpoch: int32(v.BirthDate.Unix() / 86400), // Convert to days since epoch
}}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true,
}}}
fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}} fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}}
case SystemLog: case SystemLog:
@@ -210,14 +253,20 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields["service"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Service}} fields["service"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Service}}
fields["message"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Message}} fields["message"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Message}}
fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}} fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true,
}}}
case MetricEntry: case MetricEntry:
fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
fields["name"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Name}} fields["name"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Name}}
fields["value"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Value}} fields["value"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Value}}
fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}} fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true,
}}}
case ProductView: case ProductView:
fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
@@ -226,7 +275,10 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
fields["category"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Category}} fields["category"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Category}}
fields["price"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Price}} fields["price"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Price}}
fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}} fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}}
fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
TimestampMicros: v.Timestamp.UnixMicro(),
IsUtc: true,
}}}
default: default:
// Fallback to JSON for unknown types // Fallback to JSON for unknown types
@@ -384,6 +436,15 @@ func generateUserEvent() interface{} {
actions := []string{"login", "logout", "purchase", "view", "search", "click", "download"} actions := []string{"login", "logout", "purchase", "view", "search", "click", "download"}
statuses := []string{"active", "inactive", "pending", "completed", "failed"} statuses := []string{"active", "inactive", "pending", "completed", "failed"}
// Generate a birth date between 1970 and 2004 (18+ years old)
birthYear := 1970 + rand.Intn(35)
birthMonth := 1 + rand.Intn(12)
birthDay := 1 + rand.Intn(28) // Keep it simple, avoid month-specific day issues
birthDate := time.Date(birthYear, time.Month(birthMonth), birthDay, 0, 0, 0, 0, time.UTC)
// Generate a precise amount as a string with 4 decimal places
preciseAmount := fmt.Sprintf("%.4f", rand.Float64()*10000)
return UserEvent{ return UserEvent{
ID: rand.Int63n(1000000) + 1, ID: rand.Int63n(1000000) + 1,
UserID: rand.Int63n(10000) + 1, UserID: rand.Int63n(10000) + 1,
@@ -391,6 +452,8 @@ func generateUserEvent() interface{} {
Action: actions[rand.Intn(len(actions))], Action: actions[rand.Intn(len(actions))],
Status: statuses[rand.Intn(len(statuses))], Status: statuses[rand.Intn(len(statuses))],
Amount: rand.Float64() * 1000, Amount: rand.Float64() * 1000,
PreciseAmount: preciseAmount,
BirthDate: birthDate,
Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*30)) * time.Second), Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*30)) * time.Second),
Metadata: fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000)), Metadata: fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000)),
} }