Files
seaweedfs/test/postgres/producer.go

393 lines
17 KiB
Go
Raw Normal View History

2025-09-02 10:51:37 -07:00
package main
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"os"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
2025-09-02 14:09:38 -07:00
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
2025-09-02 10:51:37 -07:00
)
// UserEvent is a synthetic user-activity record. It is the payload produced
// by generateUserEvent for the "user_events" topics.
type UserEvent struct {
	// ID identifies the event.
	ID int64 `json:"id"`
	// UserID identifies the user the event belongs to.
	UserID int64 `json:"user_id"`
	// UserType is the account tier label (for example "premium" or "trial").
	UserType string `json:"user_type"`
	// Action names what the user did (for example "login" or "purchase").
	Action string `json:"action"`
	// Status is the state label attached to the event.
	Status string `json:"status"`
	// Amount is a numeric value for the event; omitted from JSON when zero.
	Amount float64 `json:"amount,omitempty"`
	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"timestamp"`
	// Metadata carries free-form extra data; omitted from JSON when empty.
	Metadata string `json:"metadata,omitempty"`
}
// SystemLog is a synthetic log line. It is the payload produced by both
// generateSystemLog and generateErrorLog.
type SystemLog struct {
	// ID identifies the log entry.
	ID int64 `json:"id"`
	// Level is the severity label (for example "info" or "error").
	Level string `json:"level"`
	// Service names the component that emitted the entry.
	Service string `json:"service"`
	// Message is the human-readable log text.
	Message string `json:"message"`
	// ErrorCode is a numeric code; omitted from JSON when zero.
	ErrorCode int `json:"error_code,omitempty"`
	// Timestamp is when the entry was logged.
	Timestamp time.Time `json:"timestamp"`
}
// MetricEntry is a synthetic metric sample. It is the payload produced by
// generateMetric.
type MetricEntry struct {
	// ID identifies the sample.
	ID int64 `json:"id"`
	// Name is the metric name (for example "cpu_usage").
	Name string `json:"name"`
	// Value is the sampled measurement.
	Value float64 `json:"value"`
	// Tags holds the sample's labels as a single string.
	Tags string `json:"tags"`
	// Timestamp is when the sample was taken.
	Timestamp time.Time `json:"timestamp"`
}
// ProductView is a synthetic product-page view record. It is the payload
// produced by generateProductView.
type ProductView struct {
	// ID identifies the view record.
	ID int64 `json:"id"`
	// ProductID identifies the product that was viewed.
	ProductID int64 `json:"product_id"`
	// UserID identifies the viewing user.
	UserID int64 `json:"user_id"`
	// Category is the product category label.
	Category string `json:"category"`
	// Price is the product price attached to the record.
	Price float64 `json:"price"`
	// ViewCount is the number of views attached to the record.
	ViewCount int `json:"view_count"`
	// Timestamp is when the view occurred.
	Timestamp time.Time `json:"timestamp"`
}
// main populates a fixed set of SeaweedFS MQ topics with randomized test
// records and then prints hints for querying them through PostgreSQL clients.
//
// The master and filer addresses come from the SEAWEEDFS_MASTER and
// SEAWEEDFS_FILER environment variables, with localhost defaults. A failure
// on one topic is logged and the remaining topics are still attempted; the
// final summary lists every topic that failed instead of unconditionally
// reporting success. The process exit status is not affected by failures.
func main() {
	// Get SeaweedFS configuration from environment
	masterAddr := getEnv("SEAWEEDFS_MASTER", "localhost:9333")
	filerAddr := getEnv("SEAWEEDFS_FILER", "localhost:8888")

	log.Printf("Creating MQ test data...")
	log.Printf("Master: %s", masterAddr)
	log.Printf("Filer: %s", filerAddr)

	// There is no readiness probe here: a fixed pause is all that separates
	// start-up from the first publish attempt.
	log.Println("Waiting for SeaweedFS to be ready...")
	time.Sleep(10 * time.Second)

	// Create topics and populate with data
	topics := []struct {
		namespace string
		topic     string
		generator func() interface{}
		count     int
	}{
		{"analytics", "user_events", generateUserEvent, 1000},
		{"analytics", "system_logs", generateSystemLog, 500},
		{"analytics", "metrics", generateMetric, 800},
		{"ecommerce", "product_views", generateProductView, 1200},
		{"ecommerce", "user_events", generateUserEvent, 600},
		{"logs", "application_logs", generateSystemLog, 2000},
		{"logs", "error_logs", generateErrorLog, 300},
	}

	// failed collects "namespace.topic" for every topic whose creation
	// returned an error, so the closing summary can be accurate.
	var failed []string

	for _, topicConfig := range topics {
		log.Printf("Creating topic %s.%s with %d records...",
			topicConfig.namespace, topicConfig.topic, topicConfig.count)

		err := createTopicData(masterAddr, filerAddr,
			topicConfig.namespace, topicConfig.topic,
			topicConfig.generator, topicConfig.count)
		if err != nil {
			log.Printf("Error creating topic %s.%s: %v",
				topicConfig.namespace, topicConfig.topic, err)
			failed = append(failed, topicConfig.namespace+"."+topicConfig.topic)
		} else {
			log.Printf("✓ Successfully created %s.%s",
				topicConfig.namespace, topicConfig.topic)
		}

		// Small delay between topics
		time.Sleep(2 * time.Second)
	}

	if len(failed) > 0 {
		// Do not print the success banner or the namespace list: at least
		// one of the listed topics was not created cleanly.
		log.Printf("✗ MQ test data creation finished with errors: %d of %d topics failed (%s)",
			len(failed), len(topics), strings.Join(failed, ", "))
	} else {
		log.Println("✓ MQ test data creation completed!")
		log.Println("\nCreated namespaces:")
		log.Println(" - analytics (user_events, system_logs, metrics)")
		log.Println(" - ecommerce (product_views, user_events)")
		log.Println(" - logs (application_logs, error_logs)")
	}

	log.Println("\nYou can now test with PostgreSQL clients:")
	log.Println(" psql -h localhost -p 5432 -U seaweedfs -d analytics")
	log.Println(" postgres=> SHOW TABLES;")
	log.Println(" postgres=> SELECT COUNT(*) FROM user_events;")
}
2025-09-02 14:09:38 -07:00
// createSchemaForTopic returns the RecordType schema for the given topic name.
//
// Known names are "user_events", "system_logs", "metrics", "product_views",
// "application_logs" and "error_logs"; the three log topics share one layout.
// Any other name gets a generic schema with a single required BYTES field
// called "data". Timestamp columns are declared as STRING.
func createSchemaForTopic(topicName string) *schema_pb.RecordType {
	// column describes one scalar field. Its position in the argument list
	// passed to build becomes the field's FieldIndex.
	type column struct {
		name     string
		kind     schema_pb.ScalarType
		required bool
	}

	const (
		i32   = schema_pb.ScalarType_INT32
		i64   = schema_pb.ScalarType_INT64
		f64   = schema_pb.ScalarType_DOUBLE
		str   = schema_pb.ScalarType_STRING
		bytes = schema_pb.ScalarType_BYTES
	)

	// build turns an ordered column list into a RecordType.
	build := func(columns ...column) *schema_pb.RecordType {
		fields := make([]*schema_pb.Field, 0, len(columns))
		for idx, c := range columns {
			fields = append(fields, &schema_pb.Field{
				Name:       c.name,
				FieldIndex: int32(idx),
				Type:       &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: c.kind}},
				IsRequired: c.required,
			})
		}
		return &schema_pb.RecordType{Fields: fields}
	}

	switch topicName {
	case "user_events":
		return build(
			column{"id", i64, true},
			column{"user_id", i64, true},
			column{"user_type", str, true},
			column{"action", str, true},
			column{"status", str, true},
			column{"amount", f64, false},
			column{"timestamp", str, true},
			column{"metadata", str, false},
		)
	case "system_logs", "application_logs", "error_logs":
		return build(
			column{"id", i64, true},
			column{"level", str, true},
			column{"service", str, true},
			column{"message", str, true},
			column{"error_code", i32, false},
			column{"timestamp", str, true},
		)
	case "metrics":
		return build(
			column{"id", i64, true},
			column{"name", str, true},
			column{"value", f64, true},
			column{"tags", str, true},
			column{"timestamp", str, true},
		)
	case "product_views":
		return build(
			column{"id", i64, true},
			column{"product_id", i64, true},
			column{"user_id", i64, true},
			column{"category", str, true},
			column{"price", f64, true},
			column{"view_count", i32, true},
			column{"timestamp", str, true},
		)
	default:
		// Default generic schema
		return build(column{"data", bytes, true})
	}
}
// convertToRecordValue maps one generated record onto a schema_pb.RecordValue.
//
// UserEvent, SystemLog, MetricEntry and ProductView values (passed by value,
// not by pointer) are translated field by field, with timestamps rendered as
// RFC 3339 strings. Any other input is JSON-encoded and stored as bytes under
// the single key "data"; an error is returned only if that encoding fails.
func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
	// Small constructors for the scalar value kinds used below.
	i64 := func(n int64) *schema_pb.Value {
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: n}}
	}
	i32 := func(n int) *schema_pb.Value {
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(n)}}
	}
	f64 := func(x float64) *schema_pb.Value {
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: x}}
	}
	str := func(s string) *schema_pb.Value {
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: s}}
	}
	ts := func(t time.Time) *schema_pb.Value {
		return str(t.Format(time.RFC3339))
	}

	var fields map[string]*schema_pb.Value

	switch rec := data.(type) {
	case UserEvent:
		fields = map[string]*schema_pb.Value{
			"id":        i64(rec.ID),
			"user_id":   i64(rec.UserID),
			"user_type": str(rec.UserType),
			"action":    str(rec.Action),
			"status":    str(rec.Status),
			"amount":    f64(rec.Amount),
			"timestamp": ts(rec.Timestamp),
			"metadata":  str(rec.Metadata),
		}
	case SystemLog:
		fields = map[string]*schema_pb.Value{
			"id":         i64(rec.ID),
			"level":      str(rec.Level),
			"service":    str(rec.Service),
			"message":    str(rec.Message),
			"error_code": i32(rec.ErrorCode),
			"timestamp":  ts(rec.Timestamp),
		}
	case MetricEntry:
		fields = map[string]*schema_pb.Value{
			"id":        i64(rec.ID),
			"name":      str(rec.Name),
			"value":     f64(rec.Value),
			"tags":      str(rec.Tags),
			"timestamp": ts(rec.Timestamp),
		}
	case ProductView:
		fields = map[string]*schema_pb.Value{
			"id":         i64(rec.ID),
			"product_id": i64(rec.ProductID),
			"user_id":    i64(rec.UserID),
			"category":   str(rec.Category),
			"price":      f64(rec.Price),
			"view_count": i32(rec.ViewCount),
			"timestamp":  ts(rec.Timestamp),
		}
	default:
		// Fallback to JSON for unknown types
		raw, err := json.Marshal(data)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal unknown type: %v", err)
		}
		fields = map[string]*schema_pb.Value{
			"data": {Kind: &schema_pb.Value_BytesValue{BytesValue: raw}},
		}
	}

	return &schema_pb.RecordValue{Fields: fields}, nil
}
2025-09-02 10:51:37 -07:00
func createTopicData(masterAddr, filerAddr, namespace, topicName string,
generator func() interface{}, count int) error {
2025-09-02 14:09:38 -07:00
// Create schema based on topic type
recordType := createSchemaForTopic(topicName)
2025-09-02 10:51:37 -07:00
// Create publisher configuration
config := &pub_client.PublisherConfiguration{
Topic: topic.NewTopic(namespace, topicName),
PartitionCount: 1,
Brokers: []string{strings.Replace(masterAddr, ":9333", ":17777", 1)}, // Use broker port
PublisherName: fmt.Sprintf("test-producer-%s-%s", namespace, topicName),
2025-09-02 14:09:38 -07:00
RecordType: recordType, // Use structured schema
2025-09-02 10:51:37 -07:00
}
// Create publisher
publisher, err := pub_client.NewTopicPublisher(config)
if err != nil {
return fmt.Errorf("failed to create publisher: %v", err)
}
defer publisher.Shutdown()
// Generate and publish data
for i := 0; i < count; i++ {
data := generator()
2025-09-02 14:09:38 -07:00
// Convert struct to RecordValue
recordValue, err := convertToRecordValue(data)
2025-09-02 10:51:37 -07:00
if err != nil {
2025-09-02 14:09:38 -07:00
log.Printf("Error converting data to RecordValue: %v", err)
2025-09-02 10:51:37 -07:00
continue
}
2025-09-02 14:09:38 -07:00
// Publish structured record
err = publisher.PublishRecord([]byte(fmt.Sprintf("key-%d", i)), recordValue)
2025-09-02 10:51:37 -07:00
if err != nil {
log.Printf("Error publishing message %d: %v", i+1, err)
continue
}
// Small delay every 100 messages
if (i+1)%100 == 0 {
log.Printf(" Published %d/%d messages to %s.%s",
i+1, count, namespace, topicName)
time.Sleep(100 * time.Millisecond)
}
}
// Finish publishing
err = publisher.FinishPublish()
if err != nil {
return fmt.Errorf("failed to finish publishing: %v", err)
}
return nil
}
// generateUserEvent returns one randomized UserEvent.
//
// ID is in [1, 1000000], UserID in [1, 10000], Amount in [0, 1000), and the
// timestamp lies within the 30 days before the call. Metadata is a small
// JSON object holding a random session_id.
func generateUserEvent() interface{} {
	// pick returns a uniformly chosen element of options.
	pick := func(options ...string) string {
		return options[rand.Intn(len(options))]
	}

	var evt UserEvent
	evt.ID = rand.Int63n(1000000) + 1
	evt.UserID = rand.Int63n(10000) + 1
	evt.UserType = pick("premium", "standard", "trial", "enterprise")
	evt.Action = pick("login", "logout", "purchase", "view", "search", "click", "download")
	evt.Status = pick("active", "inactive", "pending", "completed", "failed")
	evt.Amount = rand.Float64() * 1000

	// Back-date the event by a random number of seconds, under 30 days.
	ageSeconds := rand.Intn(86400 * 30)
	evt.Timestamp = time.Now().Add(-time.Duration(ageSeconds) * time.Second)

	evt.Metadata = fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000))
	return evt
}
// generateSystemLog returns one randomized SystemLog covering every severity
// from "debug" to "critical".
//
// ID is in [1, 1000000], ErrorCode in [0, 999], and the timestamp lies
// within the 7 days before the call.
func generateSystemLog() interface{} {
	var (
		levels   = []string{"debug", "info", "warning", "error", "critical"}
		services = []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"}
		messages = []string{
			"Request processed successfully",
			"User authentication completed",
			"Payment transaction initiated",
			"Database connection established",
			"Cache miss for key",
			"API rate limit exceeded",
			"Service health check passed",
		}
	)

	entry := SystemLog{}
	entry.ID = rand.Int63n(1000000) + 1
	entry.Level = levels[rand.Intn(len(levels))]
	entry.Service = services[rand.Intn(len(services))]
	entry.Message = messages[rand.Intn(len(messages))]
	entry.ErrorCode = rand.Intn(1000)

	// Back-date the entry by a random number of seconds, under 7 days.
	age := time.Duration(rand.Intn(86400*7)) * time.Second
	entry.Timestamp = time.Now().Add(-age)
	return entry
}
// generateErrorLog returns one randomized SystemLog restricted to the
// failure severities "error", "critical" and "fatal".
//
// ID is in [1, 1000000], ErrorCode in [400, 499], and the timestamp lies
// within the 7 days before the call.
func generateErrorLog() interface{} {
	// choose returns a uniformly chosen element of options.
	choose := func(options []string) string {
		return options[rand.Intn(len(options))]
	}

	severities := []string{"error", "critical", "fatal"}
	origins := []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"}
	texts := []string{
		"Database connection failed",
		"Authentication token expired",
		"Payment processing error",
		"Service unavailable",
		"Memory limit exceeded",
		"Timeout waiting for response",
		"Invalid request parameters",
	}

	id := rand.Int63n(1000000) + 1
	level := choose(severities)
	service := choose(origins)
	message := choose(texts)
	code := rand.Intn(100) + 400 // 400-499 error codes
	// Back-date the entry by a random number of seconds, under 7 days.
	when := time.Now().Add(-time.Duration(rand.Intn(86400*7)) * time.Second)

	return SystemLog{
		ID:        id,
		Level:     level,
		Service:   service,
		Message:   message,
		ErrorCode: code,
		Timestamp: when,
	}
}
// generateMetric returns one randomized MetricEntry.
//
// ID is in [1, 1000000], Value in [0, 100), and the timestamp lies within
// the 3 days before the call. Name and Tags are drawn from fixed lists.
func generateMetric() interface{} {
	metricNames := []string{"cpu_usage", "memory_usage", "disk_usage", "request_latency", "error_rate", "throughput"}
	tagSets := []string{
		"service=web,region=us-east",
		"service=api,region=us-west",
		"service=db,region=eu-central",
		"service=cache,region=asia-pacific",
	}

	var sample MetricEntry
	sample.ID = rand.Int63n(1000000) + 1
	sample.Name = metricNames[rand.Intn(len(metricNames))]
	sample.Value = rand.Float64() * 100
	sample.Tags = tagSets[rand.Intn(len(tagSets))]

	// Back-date the sample by a random number of seconds, under 3 days.
	ageSeconds := rand.Intn(86400 * 3)
	sample.Timestamp = time.Now().Add(-time.Duration(ageSeconds) * time.Second)
	return sample
}
// generateProductView returns one randomized ProductView.
//
// ID is in [1, 1000000], ProductID in [1, 10000], UserID in [1, 5000],
// Price in [0, 500), ViewCount in [1, 100], and the timestamp lies within
// the 14 days before the call.
func generateProductView() interface{} {
	categories := [...]string{"electronics", "books", "clothing", "home", "sports", "automotive"}

	view := ProductView{}
	view.ID = rand.Int63n(1000000) + 1
	view.ProductID = rand.Int63n(10000) + 1
	view.UserID = rand.Int63n(5000) + 1
	view.Category = categories[rand.Intn(len(categories))]
	view.Price = rand.Float64() * 500
	view.ViewCount = rand.Intn(100) + 1

	// Back-date the view by a random number of seconds, under 14 days.
	age := time.Duration(rand.Intn(86400*14)) * time.Second
	view.Timestamp = time.Now().Add(-age)
	return view
}
// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is not set. A variable that is set to the empty string
// counts as set, so "" is returned for it rather than defaultValue.
func getEnv(key, defaultValue string) string {
	value, ok := os.LookupEnv(key)
	if !ok {
		return defaultValue
	}
	return value
}