From 5d5f9acfff9d0849edfd961ab027b643d07078a0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 6 Oct 2025 23:41:31 -0700 Subject: [PATCH] For offset-based reads, ignore startFileName --- test/kafka/kafka-client-loadtest/Makefile | 1 + .../kafka-client-loadtest/docker-compose.yml | 22 +++++++- weed/mq/kafka/protocol/handler.go | 55 ++----------------- weed/mq/logstore/read_log_from_disk.go | 17 ++++++ 4 files changed, 44 insertions(+), 51 deletions(-) diff --git a/test/kafka/kafka-client-loadtest/Makefile b/test/kafka/kafka-client-loadtest/Makefile index 411e80341..7495a05f2 100644 --- a/test/kafka/kafka-client-loadtest/Makefile +++ b/test/kafka/kafka-client-loadtest/Makefile @@ -91,6 +91,7 @@ start: build-gateway ## Start the infrastructure services (without load test) seaweedfs-filer \ seaweedfs-mq-broker \ kafka-gateway \ + schema-registry-init \ schema-registry @echo "$(GREEN)Infrastructure started$(NC)" @echo "Waiting for services to be ready..." diff --git a/test/kafka/kafka-client-loadtest/docker-compose.yml b/test/kafka/kafka-client-loadtest/docker-compose.yml index 46f151b04..7514fc843 100644 --- a/test/kafka/kafka-client-loadtest/docker-compose.yml +++ b/test/kafka/kafka-client-loadtest/docker-compose.yml @@ -12,10 +12,28 @@ x-seaweedfs-build: &seaweedfs-build services: # Schema Registry (for Avro/Protobuf support) # Using host networking to connect to localhost:9093 (where our gateway advertises) + # WORKAROUND: Schema Registry hangs on empty _schemas topic during bootstrap + # Pre-create the topic first to avoid "wait to catch up" hang + schema-registry-init: + image: confluentinc/cp-kafka:8.0.0 + container_name: loadtest-schema-registry-init + networks: + - kafka-loadtest-net + depends_on: + kafka-gateway: + condition: service_healthy + command: > + bash -c " + echo 'Creating _schemas topic...'; + kafka-topics --create --topic _schemas --partitions 1 --replication-factor 1 \ + --bootstrap-server kafka-gateway:9093 --if-not-exists || exit 0; + echo '_schemas topic created successfully'; + " + schema-registry: image: confluentinc/cp-schema-registry:8.0.0 container_name: loadtest-schema-registry - restart: on-failure:5 + restart: on-failure:3 ports: - "8081:8081" environment: @@ -50,6 +68,8 @@ services: retries: 10 start_period: 30s depends_on: + schema-registry-init: + condition: service_completed_successfully kafka-gateway: condition: service_healthy networks: diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index a69e5fd1f..3b4f9cc19 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -3920,57 +3920,12 @@ func (h *Handler) createTopicWithSchemaSupport(topicName string, partitions int3 // createTopicWithDefaultFlexibleSchema creates a topic with a flexible default schema // that can handle both Avro and JSON messages when schema management is enabled func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partitions int32) error { - // For system topics like _schemas, create both key and value fields - // Schema Registry messages have structured keys and values - var keySchema, valueSchema *schema_pb.RecordType + // CRITICAL FIX: System topics like _schemas should be PLAIN Kafka topics without schema management + // Schema Registry uses _schemas to STORE schemas, so it can't have schema management itself + // This was causing issues with Schema Registry bootstrap - if topicName == "_schemas" { - // _schemas topic needs both key and value fields - // Key contains metadata (magicByte, keytype, subject, version) - keySchema = &schema_pb.RecordType{ - Fields: []*schema_pb.Field{ - { - Name: "key", - Type: &schema_pb.Type{ - Kind: &schema_pb.Type_ScalarType{ - ScalarType: schema_pb.ScalarType_BYTES, - }, - }, - }, - }, - } - - // Value contains schema data - valueSchema = &schema_pb.RecordType{ - Fields: []*schema_pb.Field{ - { - Name: "value", - Type: &schema_pb.Type{ - Kind: &schema_pb.Type_ScalarType{ - ScalarType: schema_pb.ScalarType_BYTES, - }, - }, - }, - }, - } - } else { - // For other system topics, use flexible schema with just value - valueSchema = &schema_pb.RecordType{ - Fields: []*schema_pb.Field{ - { - Name: "value", - Type: &schema_pb.Type{ - Kind: &schema_pb.Type_ScalarType{ - ScalarType: schema_pb.ScalarType_BYTES, - }, - }, - }, - }, - } - } - - // Create topic with the schemas - return h.seaweedMQHandler.CreateTopicWithSchemas(topicName, partitions, keySchema, valueSchema) + glog.V(0).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName) + return h.seaweedMQHandler.CreateTopic(topicName, partitions) } // fetchSchemaForTopic attempts to fetch schema information for a topic from Schema Registry diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go index 0f0acb868..d149bc645 100644 --- a/weed/mq/logstore/read_log_from_disk.go +++ b/weed/mq/logstore/read_log_from_disk.go @@ -164,6 +164,11 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top var startOffset int64 if isOffsetBased { startOffset = startPosition.Offset + // CRITICAL FIX: For offset-based reads, ignore startFileName (which is based on Time) + // and list all files from the beginning to find the right offset + startFileName = "" + glog.V(0).Infof("📖 DISK READ START: topic=%s partition=%s startOffset=%d (using empty startFileName for offset-based read)", + t.Name, p, startOffset) } // OPTIMIZATION: For offset-based reads, collect all files with their offset ranges first @@ -173,6 +178,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // First pass: collect all relevant files with their metadata + glog.V(0).Infof("📁 DISK READ: Listing directory %s for offset %d startFileName=%q", partitionDir, startOffset, startFileName) return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { if entry.IsDirectory { @@ -190,6 +196,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top // OPTIMIZATION: For offset-based reads, check if this file contains the requested offset if isOffsetBased { + glog.V(0).Infof("📄 DISK READ: Found file %s", entry.Name) // Check if file has offset range metadata if minOffsetBytes, hasMin := entry.Extended["offset_min"]; hasMin && len(minOffsetBytes) == 8 { if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 { @@ -222,15 +229,25 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top // Add file to candidates for processing candidateFiles = append(candidateFiles, entry) + glog.V(0).Infof("✅ DISK READ: Added candidate file %s (total=%d)", entry.Name, len(candidateFiles)) return nil }, startFileName, true, math.MaxInt32) }) if err != nil { + glog.Errorf("❌ DISK READ: Failed to list directory %s: %v", partitionDir, err) return } + glog.V(0).Infof("📊 DISK READ: Found %d candidate files for topic=%s partition=%s offset=%d", + len(candidateFiles), t.Name, p, startOffset) + + if len(candidateFiles) == 0 { + glog.V(0).Infof("⚠️ DISK READ: No files found in %s", partitionDir) + return startPosition, isDone, nil + } + // OPTIMIZATION: For offset-based reads with many files, use binary search to find start file if isOffsetBased && len(candidateFiles) > 10 { // Binary search to find the first file that might contain our offset