From f82dd57f9b5e0687d85bf7c9d93c8ee60d544c74 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 4 Oct 2025 08:28:25 -0700 Subject: [PATCH] Goroutine count stable --- weed/command/mq_broker.go | 15 ++ weed/glog/glog.go | 43 ++- weed/mq/broker/broker_grpc_sub.go | 12 +- weed/mq/broker/broker_grpc_sub_offset.go | 8 +- .../mq/kafka/integration/seaweedmq_handler.go | 251 +++++++++++++----- weed/mq/kafka/protocol/fetch.go | 16 ++ weed/mq/kafka/protocol/handler.go | 14 +- 7 files changed, 277 insertions(+), 82 deletions(-) diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go index 6cf440612..420d4db0c 100644 --- a/weed/command/mq_broker.go +++ b/weed/command/mq_broker.go @@ -1,6 +1,10 @@ package command import ( + "fmt" + "net/http" + _ "net/http/pprof" + "google.golang.org/grpc/reflection" "github.com/seaweedfs/seaweedfs/weed/util/grace" @@ -109,6 +113,17 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { }() } + // Start HTTP profiling server + pprofPort := *mqBrokerOpt.port + 1000 // e.g., 18777 for profiling if broker is on 17777 + go func() { + pprofAddr := fmt.Sprintf(":%d", pprofPort) + glog.V(0).Infof("MQ Broker pprof server listening on %s", pprofAddr) + glog.V(0).Infof("Access profiling at: http://localhost:%d/debug/pprof/", pprofPort) + if err := http.ListenAndServe(pprofAddr, nil); err != nil { + glog.Errorf("pprof server error: %v", err) + } + }() + glog.V(0).Infof("MQ Broker listening on %s:%d", *mqBrokerOpt.ip, *mqBrokerOpt.port) grpcS.Serve(grpcL) diff --git a/weed/glog/glog.go b/weed/glog/glog.go index 754c3ac36..e04df39e6 100644 --- a/weed/glog/glog.go +++ b/weed/glog/glog.go @@ -74,7 +74,6 @@ import ( "bytes" "errors" "fmt" - flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" "io" stdLog "log" "os" @@ -85,6 +84,8 @@ import ( "sync" "sync/atomic" "time" + + flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" ) // severity identifies the sort of log: info, warning etc. It also implements @@ -690,18 +691,29 @@ func (l *loggingT) output(s severity, buf *buffer, file string, line int, alsoTo l.exit(err) } } - switch s { - case fatalLog: - l.file[fatalLog].Write(data) - fallthrough - case errorLog: - l.file[errorLog].Write(data) - fallthrough - case warningLog: - l.file[warningLog].Write(data) - fallthrough - case infoLog: - l.file[infoLog].Write(data) + // After exit is called, don't try to write to files + if !l.exited { + switch s { + case fatalLog: + if l.file[fatalLog] != nil { + l.file[fatalLog].Write(data) + } + fallthrough + case errorLog: + if l.file[errorLog] != nil { + l.file[errorLog].Write(data) + } + fallthrough + case warningLog: + if l.file[warningLog] != nil { + l.file[warningLog].Write(data) + } + fallthrough + case infoLog: + if l.file[infoLog] != nil { + l.file[infoLog].Write(data) + } + } } } if s == fatalLog { @@ -814,9 +826,14 @@ func (sb *syncBuffer) Write(p []byte) (n int, err error) { if sb.logger.exited { return } + // Check if Writer is nil (can happen if rotateFile failed) + if sb.Writer == nil { + return 0, errors.New("log writer is nil") + } if sb.nbytes+uint64(len(p)) >= MaxSize { if err := sb.rotateFile(time.Now()); err != nil { sb.logger.exit(err) + return 0, err } } n, err = sb.Writer.Write(p) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 73d10a2c8..fbd3df669 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -170,6 +170,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs return localTopicPartition.Subscribe(clientName, startPosition, func() bool { if !isConnected { + glog.V(0).Infof("🔍 WAIT: %s - isConnected=false, returning false", clientName) return false } @@ -177,6 +178,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs cancelOnce.Do(func() { go func() { <-ctx.Done() + glog.V(0).Infof("🔍 CTX DONE: %s - context cancelled, broadcasting", clientName) localTopicPartition.ListenersLock.Lock() localTopicPartition.ListenersCond.Broadcast() localTopicPartition.ListenersLock.Unlock() @@ -190,7 +192,15 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs atomic.AddInt64(&localTopicPartition.ListenersWaits, -1) localTopicPartition.ListenersLock.Unlock() - if ctx.Err() != nil || !isConnected { + // Add a small sleep to avoid CPU busy-wait when checking for new data + time.Sleep(10 * time.Millisecond) + + if ctx.Err() != nil { + glog.V(0).Infof("🔍 WAIT: %s - ctx.Err()=%v, returning false", clientName, ctx.Err()) + return false + } + if !isConnected { + glog.V(0).Infof("🔍 WAIT: %s - isConnected=false after wait, returning false", clientName) return false } return true diff --git a/weed/mq/broker/broker_grpc_sub_offset.go b/weed/mq/broker/broker_grpc_sub_offset.go index c38e6bcd0..c1116cb32 100644 --- a/weed/mq/broker/broker_grpc_sub_offset.go +++ b/weed/mq/broker/broker_grpc_sub_offset.go @@ -104,7 +104,13 @@ func (b *MessageQueueBroker) subscribeWithOffsetSubscription( return false } - return !atEnd + if atEnd { + return false + } + + // Add a small sleep to avoid CPU busy-wait when checking for new data + time.Sleep(10 * time.Millisecond) + return true }, func(logEntry *filer_pb.LogEntry) (bool, error) { // Check if this message matches our offset requirements diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index 458ff2f40..636641b00 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -159,26 +159,46 @@ func (h *SeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromO } } - // CRITICAL FIX: Create a FRESH subscriber session for each fetch request - // Previously, GetOrCreateSubscriber would reuse a cached session if the startOffset matched, - // but that session may have already consumed past the requested offset, causing stale/empty reads. - // This was the root cause of Schema Registry seeing empty values for offsets 2-11. - glog.Infof("[FETCH] Creating fresh subscriber for topic=%s partition=%d fromOffset=%d", topic, partition, fromOffset) + // CRITICAL FIX: Reuse existing subscriber if offset matches to avoid concurrent subscriber storm + // Creating too many concurrent subscribers to the same offset causes the broker to return + // the same data repeatedly, creating an infinite loop. + glog.Infof("[FETCH] Getting or creating subscriber for topic=%s partition=%d fromOffset=%d", topic, partition, fromOffset) - brokerSubscriber, err := brokerClient.CreateFreshSubscriber(topic, partition, fromOffset, consumerGroup, consumerID) + brokerSubscriber, err := brokerClient.GetOrCreateSubscriber(topic, partition, fromOffset) if err != nil { - glog.Errorf("[FETCH] Failed to create fresh subscriber: %v", err) - return nil, fmt.Errorf("failed to create fresh subscriber: %v", err) + glog.Errorf("[FETCH] Failed to get/create subscriber: %v", err) + return nil, fmt.Errorf("failed to get/create subscriber: %v", err) } - glog.Infof("[FETCH] Fresh subscriber created successfully") + glog.Infof("[FETCH] Subscriber ready") - // Important: Close the subscriber after reading to avoid resource leaks - defer func() { - glog.Infof("[FETCH] Closing subscriber stream for topic=%s partition=%d", topic, partition) + // CRITICAL FIX: If the subscriber has already consumed past the requested offset, + // close it and create a fresh one to avoid broker tight loop + if brokerSubscriber.StartOffset > fromOffset { + glog.Infof("[FETCH] Subscriber already at offset %d (requested %d < current), closing and recreating", + brokerSubscriber.StartOffset, fromOffset) + + // Close the old subscriber if brokerSubscriber.Stream != nil { _ = brokerSubscriber.Stream.CloseSend() } - }() + + // Remove from cache + key := fmt.Sprintf("%s-%d", topic, partition) + brokerClient.subscribersLock.Lock() + delete(brokerClient.subscribers, key) + brokerClient.subscribersLock.Unlock() + + // Create a fresh subscriber at the requested offset + brokerSubscriber, err = brokerClient.CreateFreshSubscriber(topic, partition, fromOffset, consumerGroup, consumerID) + if err != nil { + glog.Errorf("[FETCH] Failed to create fresh subscriber: %v", err) + return nil, fmt.Errorf("failed to create fresh subscriber: %v", err) + } + glog.Infof("[FETCH] Created fresh subscriber at offset %d", fromOffset) + } + + // NOTE: We DON'T close the subscriber here because we're reusing it across Fetch requests + // The subscriber will be closed when the connection closes or when a different offset is requested // Read records using the subscriber glog.Infof("[FETCH] Calling ReadRecords for topic=%s partition=%d maxRecords=%d", topic, partition, maxRecords) @@ -398,11 +418,11 @@ func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error { // CreateTopicWithSchema creates a topic with optional value schema func (h *SeaweedMQHandler) CreateTopicWithSchema(name string, partitions int32, recordType *schema_pb.RecordType) error { - return h.CreateTopicWithSchemas(name, partitions, recordType, nil) + return h.CreateTopicWithSchemas(name, partitions, nil, recordType) } // CreateTopicWithSchemas creates a topic with optional key and value schemas -func (h *SeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, valueRecordType *schema_pb.RecordType, keyRecordType *schema_pb.RecordType) error { +func (h *SeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error { // Check if topic already exists in filer if h.checkTopicInFiler(name) { return fmt.Errorf("topic %s already exists", name) @@ -1182,6 +1202,11 @@ type BrokerSubscriberSession struct { // Context for canceling reads (used for timeout) Ctx context.Context Cancel context.CancelFunc + // Mutex to prevent concurrent reads from the same stream + mu sync.Mutex + // Cache of consumed records to avoid re-reading from broker + consumedRecords []*SeaweedRecord + nextOffsetToRead int64 } // NewBrokerClientWithFilerAccessor creates a client with a shared filer accessor @@ -1229,22 +1254,35 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor // Close shuts down the broker client and all streams func (bc *BrokerClient) Close() error { + glog.V(0).Infof("🔍 BrokerClient.Close() called - closing %d publishers and %d subscribers", len(bc.publishers), len(bc.subscribers)) bc.cancel() // Close all publisher streams bc.publishersLock.Lock() - for key := range bc.publishers { + for key, session := range bc.publishers { + if session.Stream != nil { + glog.V(0).Infof("🔍 Closing publisher stream for %s", key) + _ = session.Stream.CloseSend() + } delete(bc.publishers, key) } bc.publishersLock.Unlock() // Close all subscriber streams bc.subscribersLock.Lock() - for key := range bc.subscribers { + for key, session := range bc.subscribers { + glog.V(0).Infof("🔍 Closing subscriber stream for %s", key) + if session.Stream != nil { + _ = session.Stream.CloseSend() + } + if session.Cancel != nil { + session.Cancel() + } delete(bc.subscribers, key) } bc.subscribersLock.Unlock() + glog.V(0).Infof("🔍 BrokerClient.Close() complete, closing gRPC connection") return bc.conn.Close() } @@ -1830,7 +1868,11 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta if old.Stream != nil { _ = old.Stream.CloseSend() } + if old.Cancel != nil { + old.Cancel() + } delete(bc.subscribers, key) + glog.V(0).Infof("Closed old subscriber session for %s due to offset change", key) } bc.subscribersLock.Unlock() } else { @@ -1849,9 +1891,8 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta return session, nil } - // Create a dedicated context for this subscriber that won't be canceled with the main BrokerClient context - // This prevents subscriber streams from being canceled when BrokerClient.Close() is called during test cleanup - subscriberCtx := context.Background() // Use background context instead of bc.ctx + // Create a cancellable context for this subscriber so it can be cleaned up when the connection closes + subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx) stream, err := bc.client.SubscribeMessage(subscriberCtx) if err != nil { @@ -1916,9 +1957,12 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta Partition: partition, Stream: stream, StartOffset: startOffset, + Ctx: subscriberCtx, + Cancel: subscriberCancel, } bc.subscribers[key] = session + glog.V(0).Infof("Created subscriber session for %s with context cancellation support", key) return session, nil } @@ -1969,65 +2013,84 @@ func (bc *BrokerClient) ReadRecords(session *BrokerSubscriberSession, maxRecords return nil, fmt.Errorf("subscriber session cannot be nil") } + // CRITICAL: Lock to prevent concurrent reads from the same stream + // Multiple Fetch requests may try to read from the same subscriber concurrently, + // causing the broker to return the same offset repeatedly + session.mu.Lock() + defer session.mu.Unlock() + glog.Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d", session.Topic, session.Partition, session.StartOffset, maxRecords) var records []*SeaweedRecord currentOffset := session.StartOffset - // Use a channel to receive records from the stream + // CRITICAL FIX: Return immediately if maxRecords is 0 or negative + if maxRecords <= 0 { + return records, nil + } + + // CRITICAL FIX: Use cached records if available to avoid broker tight loop + // If we've already consumed these records, return them from cache + if len(session.consumedRecords) > 0 { + cacheStartOffset := session.consumedRecords[0].Offset + cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset + + if currentOffset >= cacheStartOffset && currentOffset <= cacheEndOffset { + // Records are in cache + glog.Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]", + currentOffset, cacheStartOffset, cacheEndOffset) + + // Find starting index in cache + startIdx := int(currentOffset - cacheStartOffset) + if startIdx < 0 || startIdx >= len(session.consumedRecords) { + glog.Errorf("[FETCH] Cache index out of bounds: startIdx=%d, cache size=%d", startIdx, len(session.consumedRecords)) + return records, nil + } + + // Return up to maxRecords from cache + endIdx := startIdx + maxRecords + if endIdx > len(session.consumedRecords) { + endIdx = len(session.consumedRecords) + } + + glog.Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1) + return session.consumedRecords[startIdx:endIdx], nil + } + } + + // Read first record with timeout (important for empty topics) + // Use longer timeout to avoid creating too many concurrent subscribers + // Wait up to 10 seconds for first record + // Broker now properly detects closed subscriptions so this is safe + firstRecordTimeout := 10 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), firstRecordTimeout) + defer cancel() + type recvResult struct { resp *mq_pb.SubscribeMessageResponse err error } recvChan := make(chan recvResult, 1) - // Read records with timeout after first record - readTimeout := 50 * time.Millisecond // Wait 50ms for additional records after first one - var timer *time.Timer - - for len(records) < maxRecords { - // Start async recv - go func() { - resp, err := session.Stream.Recv() - recvChan <- recvResult{resp: resp, err: err} - }() - - // Wait for response or timeout - var result recvResult - if len(records) == 0 { - // First record - wait indefinitely (no timeout) - result = <-recvChan - } else { - // Subsequent records - use timeout - if timer == nil { - timer = time.NewTimer(readTimeout) - } else { - timer.Reset(readTimeout) - } - - select { - case result = <-recvChan: - timer.Stop() - case <-timer.C: - // Timeout - return what we have - glog.V(4).Infof("[FETCH] Read timeout after %d records, returning batch", len(records)) - return records, nil - } + // Try to receive first record + go func() { + resp, err := session.Stream.Recv() + select { + case recvChan <- recvResult{resp: resp, err: err}: + case <-ctx.Done(): + // Context cancelled, don't send (avoid blocking) } + }() + select { + case result := <-recvChan: if result.err != nil { - glog.Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err) - if len(records) > 0 { - return records, nil - } - return nil, fmt.Errorf("failed to receive record: %v", result.err) + glog.Infof("[FETCH] Stream.Recv() error on first record: %v", result.err) + return records, nil // Return empty - no error for empty topic } if dataMsg := result.resp.GetData(); dataMsg != nil { - glog.V(4).Infof("[FETCH] DataMessage from broker: keyLen=%d, valueLen=%d", - len(dataMsg.Key), len(dataMsg.Value)) - record := &SeaweedRecord{ Key: dataMsg.Key, Value: dataMsg.Value, @@ -2036,13 +2099,77 @@ func (bc *BrokerClient) ReadRecords(session *BrokerSubscriberSession, maxRecords } records = append(records, record) currentOffset++ - glog.Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d", record.Offset, len(record.Key), len(record.Value)) } + + case <-ctx.Done(): + // Timeout on first record - topic is empty or no data available + glog.V(4).Infof("[FETCH] No data available (timeout on first record)") + return records, nil + } + + // If we got the first record, try to get more with shorter timeout + additionalTimeout := 50 * time.Millisecond + for len(records) < maxRecords { + ctx2, cancel2 := context.WithTimeout(context.Background(), additionalTimeout) + recvChan2 := make(chan recvResult, 1) + + go func() { + resp, err := session.Stream.Recv() + select { + case recvChan2 <- recvResult{resp: resp, err: err}: + case <-ctx2.Done(): + // Context cancelled + } + }() + + select { + case result := <-recvChan2: + cancel2() + if result.err != nil { + glog.Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err) + // Update session offset before returning + session.StartOffset = currentOffset + return records, nil + } + + if dataMsg := result.resp.GetData(); dataMsg != nil { + record := &SeaweedRecord{ + Key: dataMsg.Key, + Value: dataMsg.Value, + Timestamp: dataMsg.TsNs, + Offset: currentOffset, + } + records = append(records, record) + currentOffset++ + glog.Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d", + record.Offset, len(record.Key), len(record.Value)) + } + + case <-ctx2.Done(): + cancel2() + // Timeout - return what we have + glog.V(4).Infof("[FETCH] Read timeout after %d records, returning batch", len(records)) + // CRITICAL: Update session offset so next fetch knows where we left off + session.StartOffset = currentOffset + return records, nil + } } glog.Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records)) + // Update session offset after successful read + session.StartOffset = currentOffset + + // CRITICAL: Cache the consumed records to avoid broker tight loop + // Append new records to cache (keep last 100 records max) + session.consumedRecords = append(session.consumedRecords, records...) + if len(session.consumedRecords) > 100 { + // Keep only the most recent 100 records + session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-100:] + } + glog.Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords)) + return records, nil } diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 8030c3426..1f7a53ab8 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -121,13 +121,17 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers // Topics count - write the actual number of topics in the request // Kafka protocol: we MUST return all requested topics in the response (even with empty data) topicsCount := len(fetchRequest.Topics) + glog.Infof("🔍 FETCH CORR=%d: Writing topics count=%d at offset=%d, isFlexible=%v", correlationID, topicsCount, len(response), isFlexible) if isFlexible { // Flexible versions use compact array format (count + 1) response = append(response, EncodeUvarint(uint32(topicsCount+1))...) } else { topicsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsCountBytes, uint32(topicsCount)) + glog.Infof("🔍 FETCH CORR=%d: topicsCountBytes = %02x %02x %02x %02x", correlationID, topicsCountBytes[0], topicsCountBytes[1], topicsCountBytes[2], topicsCountBytes[3]) response = append(response, topicsCountBytes...) + glog.Infof("🔍 FETCH CORR=%d: After appending topics count, response length=%d, response[10-13]=%02x %02x %02x %02x", + correlationID, len(response), response[10], response[11], response[12], response[13]) } // Process each requested topic @@ -390,6 +394,18 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers response = append(response, 0) // Empty tagged fields } + // Verify topics count hasn't been corrupted + if !isFlexible && len(response) >= 14 { + actualTopicsCount := binary.BigEndian.Uint32(response[10:14]) + if actualTopicsCount != uint32(topicsCount) { + glog.Errorf("🚨 FETCH CORR=%d: Topics count CORRUPTED! Expected %d, found %d at response[10:14]=%02x %02x %02x %02x", + correlationID, topicsCount, actualTopicsCount, response[10], response[11], response[12], response[13]) + } else { + glog.Infof("✅ FETCH CORR=%d: Topics count verified OK: %d at response[10:14]=%02x %02x %02x %02x", + correlationID, topicsCount, response[10], response[11], response[12], response[13]) + } + } + Debug("Fetch v%d response constructed, size: %d bytes (flexible: %v)", apiVersion, len(response), isFlexible) glog.Infof("FETCH RESPONSE SUMMARY: correlationID=%d topics=%d totalRecordBytes=%d totalResponseBytes=%d", correlationID, topicsCount, totalAppendedRecordBytes, len(response)) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 8a2d9d371..40a8cb015 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -424,16 +424,20 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { Debug("[%s] NEW CONNECTION ESTABLISHED", connectionID) defer func() { + glog.V(0).Infof("🔍 [%s] Connection closing, cleaning up BrokerClient", connectionID) Debug("[%s] Connection closing, cleaning up BrokerClient", connectionID) // Close the per-connection broker client if connBrokerClient != nil { + glog.V(0).Infof("🔍 [%s] Calling BrokerClient.Close()", connectionID) if closeErr := connBrokerClient.Close(); closeErr != nil { + glog.Errorf("🔍 [%s] Error closing BrokerClient: %v", connectionID, closeErr) Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr) } } RecordDisconnectionMetrics() h.connContext = nil // Clear connection context conn.Close() + glog.V(0).Infof("🔍 [%s] Connection cleanup complete", connectionID) }() r := bufio.NewReader(conn) @@ -2077,10 +2081,8 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by // Build response response := make([]byte, 0, 128) - // Correlation ID - cid := make([]byte, 4) - binary.BigEndian.PutUint32(cid, correlationID) - response = append(response, cid...) + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body // throttle_time_ms (4 bytes) response = append(response, 0, 0, 0, 0) // topics array count (int32) @@ -3060,7 +3062,9 @@ func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32, if dumpLen > 64 { dumpLen = 64 } - Debug("🔍 API %d v%d response wire format (first %d bytes):\n%s", apiKey, apiVersion, dumpLen, hexDump(fullResponse[:dumpLen])) + if apiKey == 1 || apiKey == 19 { // Fetch or CreateTopics + glog.Infof("🔍 API %d v%d response wire format (first %d bytes):\n%s", apiKey, apiVersion, dumpLen, hexDump(fullResponse[:dumpLen])) + } Debug("Wrote API %d response v%d: size=%d, flexible=%t, correlationID=%d, totalBytes=%d", apiKey, apiVersion, totalSize, isFlexible, correlationID, len(fullResponse)) // Write to connection