Mirror of https://github.com/seaweedfs/seaweedfs.git (synced 2025-10-07 21:45:50 +08:00)

Commit: Goroutine count stable
@@ -1,6 +1,10 @@
 package command

 import (
+	"fmt"
+	"net/http"
+	_ "net/http/pprof"
+
 	"google.golang.org/grpc/reflection"

 	"github.com/seaweedfs/seaweedfs/weed/util/grace"
@@ -109,6 +113,17 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
 		}()
 	}

+	// Start HTTP profiling server
+	pprofPort := *mqBrokerOpt.port + 1000 // e.g., 18777 for profiling if broker is on 17777
+	go func() {
+		pprofAddr := fmt.Sprintf(":%d", pprofPort)
+		glog.V(0).Infof("MQ Broker pprof server listening on %s", pprofAddr)
+		glog.V(0).Infof("Access profiling at: http://localhost:%d/debug/pprof/", pprofPort)
+		if err := http.ListenAndServe(pprofAddr, nil); err != nil {
+			glog.Errorf("pprof server error: %v", err)
+		}
+	}()
+
 	glog.V(0).Infof("MQ Broker listening on %s:%d", *mqBrokerOpt.ip, *mqBrokerOpt.port)
 	grpcS.Serve(grpcL)
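
The commit title ("Goroutine count stable") points at why this server was added: watching the goroutine count between runs. A minimal checker, outside the diff, assuming only the standard net/http/pprof text format and the patch's port+1000 convention (broker on 17777, pprof on 18777):

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Broker on 17777 serves pprof on 18777 per the patch's port+1000 rule.
	line, err := fetchGoroutineCount(18777)
	if err != nil {
		fmt.Println("pprof not reachable:", err)
		return
	}
	fmt.Println(line) // e.g. "goroutine profile: total 42"
}

// fetchGoroutineCount reads the first line of the standard pprof goroutine
// profile ("goroutine profile: total N"); diffing N between runs exposes leaks.
func fetchGoroutineCount(port int) (string, error) {
	resp, err := http.Get(fmt.Sprintf("http://localhost:%d/debug/pprof/goroutine?debug=1", port))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if i := bytes.IndexByte(body, '\n'); i >= 0 {
		body = body[:i]
	}
	return string(body), nil
}
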
@@ -74,7 +74,6 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	flag "github.com/seaweedfs/seaweedfs/weed/util/fla9"
 	"io"
 	stdLog "log"
 	"os"
@@ -85,6 +84,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	flag "github.com/seaweedfs/seaweedfs/weed/util/fla9"
 )

 // severity identifies the sort of log: info, warning etc. It also implements
@@ -690,18 +691,29 @@ func (l *loggingT) output(s severity, buf *buffer, file string, line int, alsoTo
 				l.exit(err)
 			}
 		}
-		switch s {
-		case fatalLog:
-			l.file[fatalLog].Write(data)
-			fallthrough
-		case errorLog:
-			l.file[errorLog].Write(data)
-			fallthrough
-		case warningLog:
-			l.file[warningLog].Write(data)
-			fallthrough
-		case infoLog:
-			l.file[infoLog].Write(data)
-		}
+		// After exit is called, don't try to write to files
+		if !l.exited {
+			switch s {
+			case fatalLog:
+				if l.file[fatalLog] != nil {
+					l.file[fatalLog].Write(data)
+				}
+				fallthrough
+			case errorLog:
+				if l.file[errorLog] != nil {
+					l.file[errorLog].Write(data)
+				}
+				fallthrough
+			case warningLog:
+				if l.file[warningLog] != nil {
+					l.file[warningLog].Write(data)
+				}
+				fallthrough
+			case infoLog:
+				if l.file[infoLog] != nil {
+					l.file[infoLog].Write(data)
+				}
+			}
+		}
 	}
 	if s == fatalLog {
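
For readers skimming the hunk above: Go's fallthrough is what makes one fatal message land in every lower-severity log file, and the new nil guards keep that cascade from panicking when a sink was never opened (e.g. after a failed rotation). A standalone sketch of the same pattern, with plain io.Writer sinks in place of glog's internals:

package main

import (
	"fmt"
	"io"
	"os"
)

const (
	infoLog = iota
	warningLog
	errorLog
	fatalLog
)

// writeCascade mirrors the glog pattern: each severity falls through to all
// lower severities, and nil sinks are skipped instead of dereferenced.
func writeCascade(files []io.Writer, s int, data []byte) {
	switch s {
	case fatalLog:
		if files[fatalLog] != nil {
			files[fatalLog].Write(data)
		}
		fallthrough
	case errorLog:
		if files[errorLog] != nil {
			files[errorLog].Write(data)
		}
		fallthrough
	case warningLog:
		if files[warningLog] != nil {
			files[warningLog].Write(data)
		}
		fallthrough
	case infoLog:
		if files[infoLog] != nil {
			files[infoLog].Write(data)
		}
	}
}

func main() {
	files := []io.Writer{os.Stdout, os.Stdout, nil, os.Stdout} // errorLog sink is nil
	writeCascade(files, fatalLog, []byte("boom\n"))            // writes 3 times, skips the nil sink
	fmt.Println("done")
}
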
@@ -814,9 +826,14 @@ func (sb *syncBuffer) Write(p []byte) (n int, err error) {
 	if sb.logger.exited {
 		return
 	}
+	// Check if Writer is nil (can happen if rotateFile failed)
+	if sb.Writer == nil {
+		return 0, errors.New("log writer is nil")
+	}
 	if sb.nbytes+uint64(len(p)) >= MaxSize {
 		if err := sb.rotateFile(time.Now()); err != nil {
 			sb.logger.exit(err)
+			return 0, err
 		}
 	}
 	n, err = sb.Writer.Write(p)
@@ -170,6 +170,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs

 	return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
 		if !isConnected {
+			glog.V(0).Infof("🔍 WAIT: %s - isConnected=false, returning false", clientName)
 			return false
 		}
@@ -177,6 +178,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 		cancelOnce.Do(func() {
 			go func() {
 				<-ctx.Done()
+				glog.V(0).Infof("🔍 CTX DONE: %s - context cancelled, broadcasting", clientName)
 				localTopicPartition.ListenersLock.Lock()
 				localTopicPartition.ListenersCond.Broadcast()
 				localTopicPartition.ListenersLock.Unlock()
@@ -190,7 +192,15 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 		atomic.AddInt64(&localTopicPartition.ListenersWaits, -1)
 		localTopicPartition.ListenersLock.Unlock()

-		if ctx.Err() != nil || !isConnected {
-			return false
-		}
+		// Add a small sleep to avoid CPU busy-wait when checking for new data
+		time.Sleep(10 * time.Millisecond)
+
+		if ctx.Err() != nil {
+			glog.V(0).Infof("🔍 WAIT: %s - ctx.Err()=%v, returning false", clientName, ctx.Err())
+			return false
+		}
+		if !isConnected {
+			glog.V(0).Infof("🔍 WAIT: %s - isConnected=false after wait, returning false", clientName)
+			return false
+		}
 		return true
@@ -104,7 +104,13 @@ func (b *MessageQueueBroker) subscribeWithOffsetSubscription(
 			return false
 		}

-		return !atEnd
+		if atEnd {
+			return false
+		}
+
+		// Add a small sleep to avoid CPU busy-wait when checking for new data
+		time.Sleep(10 * time.Millisecond)
+		return true
 	},
 	func(logEntry *filer_pb.LogEntry) (bool, error) {
 		// Check if this message matches our offset requirements
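
The wait callbacks in this hunk and the previous one return true to mean "keep waiting", so a hot caller can spin on them; the inserted 10 ms sleep turns that spin into a cheap poll. A self-contained sketch of the contract, with pollUntilDone standing in for the broker's subscribe loop (a hypothetical name, not the SeaweedFS API):

package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntilDone drives a wait callback the way the subscribe loop does:
// the callback returns true to keep waiting, false to stop.
func pollUntilDone(shouldWait func() bool) int {
	n := 0
	for shouldWait() {
		n++
	}
	return n
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	shouldWait := func() bool {
		if ctx.Err() != nil {
			return false // cancelled or timed out: stop waiting
		}
		// Without this sleep the loop would spin at full speed on one core;
		// with it, the poll runs at roughly 100 iterations per second.
		time.Sleep(10 * time.Millisecond)
		return true
	}
	fmt.Printf("polled %d times before stopping\n", pollUntilDone(shouldWait))
}
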
@@ -159,26 +159,46 @@ func (h *SeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromO
 		}
 	}

-	// CRITICAL FIX: Create a FRESH subscriber session for each fetch request
-	// Previously, GetOrCreateSubscriber would reuse a cached session if the startOffset matched,
-	// but that session may have already consumed past the requested offset, causing stale/empty reads.
-	// This was the root cause of Schema Registry seeing empty values for offsets 2-11.
-	glog.Infof("[FETCH] Creating fresh subscriber for topic=%s partition=%d fromOffset=%d", topic, partition, fromOffset)
+	// CRITICAL FIX: Reuse existing subscriber if offset matches to avoid concurrent subscriber storm
+	// Creating too many concurrent subscribers to the same offset causes the broker to return
+	// the same data repeatedly, creating an infinite loop.
+	glog.Infof("[FETCH] Getting or creating subscriber for topic=%s partition=%d fromOffset=%d", topic, partition, fromOffset)

-	brokerSubscriber, err := brokerClient.CreateFreshSubscriber(topic, partition, fromOffset, consumerGroup, consumerID)
+	brokerSubscriber, err := brokerClient.GetOrCreateSubscriber(topic, partition, fromOffset)
 	if err != nil {
-		glog.Errorf("[FETCH] Failed to create fresh subscriber: %v", err)
-		return nil, fmt.Errorf("failed to create fresh subscriber: %v", err)
+		glog.Errorf("[FETCH] Failed to get/create subscriber: %v", err)
+		return nil, fmt.Errorf("failed to get/create subscriber: %v", err)
 	}
-	glog.Infof("[FETCH] Fresh subscriber created successfully")
+	glog.Infof("[FETCH] Subscriber ready")

-	// Important: Close the subscriber after reading to avoid resource leaks
-	defer func() {
-		glog.Infof("[FETCH] Closing subscriber stream for topic=%s partition=%d", topic, partition)
+	// CRITICAL FIX: If the subscriber has already consumed past the requested offset,
+	// close it and create a fresh one to avoid broker tight loop
+	if brokerSubscriber.StartOffset > fromOffset {
+		glog.Infof("[FETCH] Subscriber already at offset %d (requested %d < current), closing and recreating",
+			brokerSubscriber.StartOffset, fromOffset)
+
+		// Close the old subscriber
 		if brokerSubscriber.Stream != nil {
 			_ = brokerSubscriber.Stream.CloseSend()
 		}
-	}()
+
+		// Remove from cache
+		key := fmt.Sprintf("%s-%d", topic, partition)
+		brokerClient.subscribersLock.Lock()
+		delete(brokerClient.subscribers, key)
+		brokerClient.subscribersLock.Unlock()
+
+		// Create a fresh subscriber at the requested offset
+		brokerSubscriber, err = brokerClient.CreateFreshSubscriber(topic, partition, fromOffset, consumerGroup, consumerID)
+		if err != nil {
+			glog.Errorf("[FETCH] Failed to create fresh subscriber: %v", err)
+			return nil, fmt.Errorf("failed to create fresh subscriber: %v", err)
+		}
+		glog.Infof("[FETCH] Created fresh subscriber at offset %d", fromOffset)
+	}
+
+	// NOTE: We DON'T close the subscriber here because we're reusing it across Fetch requests
+	// The subscriber will be closed when the connection closes or when a different offset is requested

 	// Read records using the subscriber
 	glog.Infof("[FETCH] Calling ReadRecords for topic=%s partition=%d maxRecords=%d", topic, partition, maxRecords)
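
The hunk above replaces "always create a fresh subscriber" with a cache whose entries are valid only while their cursor has not moved past the requested offset. A minimal sketch of that invariant, with simplified names (Session, Cache) that are not the patch's types:

package main

import (
	"fmt"
	"sync"
)

type Session struct{ StartOffset int64 }

type Cache struct {
	mu       sync.Mutex
	sessions map[string]*Session
}

// Get reuses a cached session only while its cursor is at or before the
// requested offset; once it has read past it, the session is replaced.
func (c *Cache) Get(key string, fromOffset int64) *Session {
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.sessions[key]; ok && s.StartOffset <= fromOffset {
		return s // safe to reuse: it can still serve fromOffset
	}
	s := &Session{StartOffset: fromOffset} // stale or missing: recreate
	c.sessions[key] = s
	return s
}

func main() {
	c := &Cache{sessions: map[string]*Session{}}
	s := c.Get("topic-0", 5)
	s.StartOffset = 12                            // pretend we consumed up to offset 12
	fmt.Println(c.Get("topic-0", 8).StartOffset)  // 8: old cursor (12) was past 8, so recreated
	fmt.Println(c.Get("topic-0", 20).StartOffset) // 8: cursor at 8 can still reach 20, so reused
}
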
@@ -398,11 +418,11 @@ func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error {

 // CreateTopicWithSchema creates a topic with optional value schema
 func (h *SeaweedMQHandler) CreateTopicWithSchema(name string, partitions int32, recordType *schema_pb.RecordType) error {
-	return h.CreateTopicWithSchemas(name, partitions, recordType, nil)
+	return h.CreateTopicWithSchemas(name, partitions, nil, recordType)
 }

 // CreateTopicWithSchemas creates a topic with optional key and value schemas
-func (h *SeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, valueRecordType *schema_pb.RecordType, keyRecordType *schema_pb.RecordType) error {
+func (h *SeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
 	// Check if topic already exists in filer
 	if h.checkTopicInFiler(name) {
 		return fmt.Errorf("topic %s already exists", name)
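
The two swaps above are the classic hazard of adjacent same-typed positional parameters. A defensive alternative, sketched here and not what the patch does, is a small options struct so call sites must name each schema slot:

package main

import "fmt"

type RecordType struct{ Name string }

// TopicSchemas names both schema slots, so a call site cannot silently
// transpose key and value the way positional arguments can.
type TopicSchemas struct {
	Key   *RecordType
	Value *RecordType
}

func CreateTopic(name string, partitions int32, s TopicSchemas) {
	fmt.Printf("topic=%s key=%v value=%v\n", name, s.Key, s.Value)
}

func main() {
	CreateTopic("events", 4, TopicSchemas{Value: &RecordType{Name: "payload"}})
}
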
@@ -1182,6 +1202,11 @@ type BrokerSubscriberSession struct {
 	// Context for canceling reads (used for timeout)
 	Ctx    context.Context
 	Cancel context.CancelFunc
+	// Mutex to prevent concurrent reads from the same stream
+	mu sync.Mutex
+	// Cache of consumed records to avoid re-reading from broker
+	consumedRecords  []*SeaweedRecord
+	nextOffsetToRead int64
 }

 // NewBrokerClientWithFilerAccessor creates a client with a shared filer accessor
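
consumedRecords holds a contiguous window of recently read records, so an offset maps to a slice index by plain subtraction (the same arithmetic ReadRecords uses later in this diff). A standalone sketch of the lookup:

package main

import "fmt"

type SeaweedRecord struct{ Offset int64 }

// lookup returns up to max records starting at offset, or nil when the
// offset falls outside the cached window. Offsets in the window are
// contiguous, so the starting index is a simple subtraction.
func lookup(cache []*SeaweedRecord, offset int64, max int) []*SeaweedRecord {
	if len(cache) == 0 {
		return nil
	}
	first, last := cache[0].Offset, cache[len(cache)-1].Offset
	if offset < first || offset > last {
		return nil
	}
	start := int(offset - first)
	end := start + max
	if end > len(cache) {
		end = len(cache)
	}
	return cache[start:end]
}

func main() {
	cache := []*SeaweedRecord{{Offset: 10}, {Offset: 11}, {Offset: 12}, {Offset: 13}}
	for _, r := range lookup(cache, 12, 5) {
		fmt.Println(r.Offset) // 12, then 13
	}
}
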
@@ -1229,22 +1254,35 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor

 // Close shuts down the broker client and all streams
 func (bc *BrokerClient) Close() error {
+	glog.V(0).Infof("🔍 BrokerClient.Close() called - closing %d publishers and %d subscribers", len(bc.publishers), len(bc.subscribers))
 	bc.cancel()

 	// Close all publisher streams
 	bc.publishersLock.Lock()
-	for key := range bc.publishers {
+	for key, session := range bc.publishers {
+		if session.Stream != nil {
+			glog.V(0).Infof("🔍 Closing publisher stream for %s", key)
+			_ = session.Stream.CloseSend()
+		}
 		delete(bc.publishers, key)
 	}
 	bc.publishersLock.Unlock()

 	// Close all subscriber streams
 	bc.subscribersLock.Lock()
-	for key := range bc.subscribers {
+	for key, session := range bc.subscribers {
+		glog.V(0).Infof("🔍 Closing subscriber stream for %s", key)
+		if session.Stream != nil {
+			_ = session.Stream.CloseSend()
+		}
+		if session.Cancel != nil {
+			session.Cancel()
+		}
 		delete(bc.subscribers, key)
 	}
 	bc.subscribersLock.Unlock()

+	glog.V(0).Infof("🔍 BrokerClient.Close() complete, closing gRPC connection")
 	return bc.conn.Close()
 }
@@ -1830,7 +1868,11 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		if old.Stream != nil {
 			_ = old.Stream.CloseSend()
 		}
+		if old.Cancel != nil {
+			old.Cancel()
+		}
 		delete(bc.subscribers, key)
+		glog.V(0).Infof("Closed old subscriber session for %s due to offset change", key)
 	}
 	bc.subscribersLock.Unlock()
 } else {
@@ -1849,9 +1891,8 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		return session, nil
 	}

-	// Create a dedicated context for this subscriber that won't be canceled with the main BrokerClient context
-	// This prevents subscriber streams from being canceled when BrokerClient.Close() is called during test cleanup
-	subscriberCtx := context.Background() // Use background context instead of bc.ctx
+	// Create a cancellable context for this subscriber so it can be cleaned up when the connection closes
+	subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)

 	stream, err := bc.client.SubscribeMessage(subscriberCtx)
 	if err != nil {
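
Switching from context.Background() to context.WithCancel(bc.ctx) is what lets Close() actually tear a stream down: cancelling a gRPC stream's context aborts a blocked Recv. A generic sketch of that lifecycle, independent of the SeaweedFS types (the channel stands in for a real stream):

package main

import (
	"context"
	"fmt"
	"time"
)

type session struct {
	cancel context.CancelFunc
}

func main() {
	parent, parentCancel := context.WithCancel(context.Background())
	defer parentCancel()

	// Each stream gets a child context; cancelling it (or the parent)
	// unblocks a Recv that is waiting on the wire.
	ctx, cancel := context.WithCancel(parent)
	s := &session{cancel: cancel}

	done := make(chan struct{})
	go func() {
		<-ctx.Done() // stands in for stream.Recv() returning ctx.Err()
		close(done)
	}()

	time.Sleep(10 * time.Millisecond)
	s.cancel() // what BrokerClient.Close() now does per subscriber
	<-done
	fmt.Println("stream unblocked:", ctx.Err())
}
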
@@ -1916,9 +1957,12 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		Partition:   partition,
 		Stream:      stream,
 		StartOffset: startOffset,
+		Ctx:         subscriberCtx,
+		Cancel:      subscriberCancel,
 	}

 	bc.subscribers[key] = session
+	glog.V(0).Infof("Created subscriber session for %s with context cancellation support", key)
 	return session, nil
 }
@@ -1969,65 +2013,84 @@ func (bc *BrokerClient) ReadRecords(session *BrokerSubscriberSession, maxRecords
 		return nil, fmt.Errorf("subscriber session cannot be nil")
 	}

+	// CRITICAL: Lock to prevent concurrent reads from the same stream
+	// Multiple Fetch requests may try to read from the same subscriber concurrently,
+	// causing the broker to return the same offset repeatedly
+	session.mu.Lock()
+	defer session.mu.Unlock()
+
 	glog.Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d",
 		session.Topic, session.Partition, session.StartOffset, maxRecords)

 	var records []*SeaweedRecord
 	currentOffset := session.StartOffset

-	// Use a channel to receive records from the stream
+	// CRITICAL FIX: Return immediately if maxRecords is 0 or negative
+	if maxRecords <= 0 {
+		return records, nil
+	}
+
+	// CRITICAL FIX: Use cached records if available to avoid broker tight loop
+	// If we've already consumed these records, return them from cache
+	if len(session.consumedRecords) > 0 {
+		cacheStartOffset := session.consumedRecords[0].Offset
+		cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
+
+		if currentOffset >= cacheStartOffset && currentOffset <= cacheEndOffset {
+			// Records are in cache
+			glog.Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]",
+				currentOffset, cacheStartOffset, cacheEndOffset)
+
+			// Find starting index in cache
+			startIdx := int(currentOffset - cacheStartOffset)
+			if startIdx < 0 || startIdx >= len(session.consumedRecords) {
+				glog.Errorf("[FETCH] Cache index out of bounds: startIdx=%d, cache size=%d", startIdx, len(session.consumedRecords))
+				return records, nil
+			}
+
+			// Return up to maxRecords from cache
+			endIdx := startIdx + maxRecords
+			if endIdx > len(session.consumedRecords) {
+				endIdx = len(session.consumedRecords)
+			}
+
+			glog.Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
+			return session.consumedRecords[startIdx:endIdx], nil
+		}
+	}
+
+	// Read first record with timeout (important for empty topics)
+	// Use longer timeout to avoid creating too many concurrent subscribers
+	// Wait up to 10 seconds for first record
+	// Broker now properly detects closed subscriptions so this is safe
+	firstRecordTimeout := 10 * time.Second
+	ctx, cancel := context.WithTimeout(context.Background(), firstRecordTimeout)
+	defer cancel()

 	type recvResult struct {
 		resp *mq_pb.SubscribeMessageResponse
 		err  error
 	}
 	recvChan := make(chan recvResult, 1)

-	// Read records with timeout after first record
-	readTimeout := 50 * time.Millisecond // Wait 50ms for additional records after first one
-	var timer *time.Timer
-
-	for len(records) < maxRecords {
-		// Start async recv
-		go func() {
-			resp, err := session.Stream.Recv()
-			recvChan <- recvResult{resp: resp, err: err}
-		}()
-
-		// Wait for response or timeout
-		var result recvResult
-		if len(records) == 0 {
-			// First record - wait indefinitely (no timeout)
-			result = <-recvChan
-		} else {
-			// Subsequent records - use timeout
-			if timer == nil {
-				timer = time.NewTimer(readTimeout)
-			} else {
-				timer.Reset(readTimeout)
-			}
-
-			select {
-			case result = <-recvChan:
-				timer.Stop()
-			case <-timer.C:
-				// Timeout - return what we have
-				glog.V(4).Infof("[FETCH] Read timeout after %d records, returning batch", len(records))
-				return records, nil
-			}
-		}
-
-		if result.err != nil {
-			glog.Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
-			if len(records) > 0 {
-				return records, nil
-			}
-			return nil, fmt.Errorf("failed to receive record: %v", result.err)
-		}
-
-		if dataMsg := result.resp.GetData(); dataMsg != nil {
-			glog.V(4).Infof("[FETCH] DataMessage from broker: keyLen=%d, valueLen=%d",
-				len(dataMsg.Key), len(dataMsg.Value))
-
-			record := &SeaweedRecord{
-				Key:   dataMsg.Key,
-				Value: dataMsg.Value,
+	// Try to receive first record
+	go func() {
+		resp, err := session.Stream.Recv()
+		select {
+		case recvChan <- recvResult{resp: resp, err: err}:
+		case <-ctx.Done():
+			// Context cancelled, don't send (avoid blocking)
+		}
+	}()
+
+	select {
+	case result := <-recvChan:
+		if result.err != nil {
+			glog.Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
+			return records, nil // Return empty - no error for empty topic
+		}
+
+		if dataMsg := result.resp.GetData(); dataMsg != nil {
+			glog.V(4).Infof("[FETCH] DataMessage from broker: keyLen=%d, valueLen=%d",
+				len(dataMsg.Key), len(dataMsg.Value))
+
+			record := &SeaweedRecord{
+				Key:   dataMsg.Key,
+				Value: dataMsg.Value,
@@ -2036,13 +2099,77 @@ func (bc *BrokerClient) ReadRecords(session *BrokerSubscriberSession, maxRecords
 			}
 			records = append(records, record)
 			currentOffset++

 			glog.Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
 				record.Offset, len(record.Key), len(record.Value))
 		}
-	}
+
+	case <-ctx.Done():
+		// Timeout on first record - topic is empty or no data available
+		glog.V(4).Infof("[FETCH] No data available (timeout on first record)")
+		return records, nil
+	}
+
+	// If we got the first record, try to get more with shorter timeout
+	additionalTimeout := 50 * time.Millisecond
+	for len(records) < maxRecords {
+		ctx2, cancel2 := context.WithTimeout(context.Background(), additionalTimeout)
+		recvChan2 := make(chan recvResult, 1)
+
+		go func() {
+			resp, err := session.Stream.Recv()
+			select {
+			case recvChan2 <- recvResult{resp: resp, err: err}:
+			case <-ctx2.Done():
+				// Context cancelled
+			}
+		}()
+
+		select {
+		case result := <-recvChan2:
+			cancel2()
+			if result.err != nil {
+				glog.Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
+				// Update session offset before returning
+				session.StartOffset = currentOffset
+				return records, nil
+			}
+
+			if dataMsg := result.resp.GetData(); dataMsg != nil {
+				record := &SeaweedRecord{
+					Key:       dataMsg.Key,
+					Value:     dataMsg.Value,
+					Timestamp: dataMsg.TsNs,
+					Offset:    currentOffset,
+				}
+				records = append(records, record)
+				currentOffset++
+				glog.Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
+					record.Offset, len(record.Key), len(record.Value))
+			}

+		case <-ctx2.Done():
+			cancel2()
+			// Timeout - return what we have
+			glog.V(4).Infof("[FETCH] Read timeout after %d records, returning batch", len(records))
+			// CRITICAL: Update session offset so next fetch knows where we left off
+			session.StartOffset = currentOffset
+			return records, nil
+		}
+	}

 	glog.Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
+	// Update session offset after successful read
+	session.StartOffset = currentOffset
+
+	// CRITICAL: Cache the consumed records to avoid broker tight loop
+	// Append new records to cache (keep last 100 records max)
+	session.consumedRecords = append(session.consumedRecords, records...)
+	if len(session.consumedRecords) > 100 {
+		// Keep only the most recent 100 records
+		session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-100:]
+	}
+	glog.Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords))
+
 	return records, nil
 }
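
The heart of the rewritten ReadRecords is "blocking Recv made interruptible": run Recv in a goroutine, deliver the result over a buffered channel, and race it against a deadline. A self-contained sketch with a stubbed recv function; the 10 s / 50 ms split mirrors the patch (patient for the first record, impatient for the rest):

package main

import (
	"context"
	"fmt"
	"time"
)

type result struct {
	msg string
	err error
}

// recvWithTimeout races one blocking recv() call against a deadline.
// The buffered channel lets the goroutine finish even after we stop waiting.
func recvWithTimeout(recv func() (string, error), d time.Duration) (string, bool) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	ch := make(chan result, 1)
	go func() {
		msg, err := recv()
		ch <- result{msg, err}
	}()
	select {
	case r := <-ch:
		return r.msg, r.err == nil
	case <-ctx.Done():
		return "", false // timeout: caller returns the batch it has so far
	}
}

func main() {
	slow := func() (string, error) { time.Sleep(time.Second); return "late", nil }
	fast := func() (string, error) { return "record@0", nil }

	if m, ok := recvWithTimeout(fast, 10*time.Second); ok {
		fmt.Println("first record:", m) // generous first-record timeout
	}
	if _, ok := recvWithTimeout(slow, 50*time.Millisecond); !ok {
		fmt.Println("no more records within 50ms, returning batch")
	}
}
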
@@ -121,13 +121,17 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
 	// Topics count - write the actual number of topics in the request
 	// Kafka protocol: we MUST return all requested topics in the response (even with empty data)
 	topicsCount := len(fetchRequest.Topics)
+	glog.Infof("🔍 FETCH CORR=%d: Writing topics count=%d at offset=%d, isFlexible=%v", correlationID, topicsCount, len(response), isFlexible)
 	if isFlexible {
 		// Flexible versions use compact array format (count + 1)
 		response = append(response, EncodeUvarint(uint32(topicsCount+1))...)
 	} else {
 		topicsCountBytes := make([]byte, 4)
 		binary.BigEndian.PutUint32(topicsCountBytes, uint32(topicsCount))
+		glog.Infof("🔍 FETCH CORR=%d: topicsCountBytes = %02x %02x %02x %02x", correlationID, topicsCountBytes[0], topicsCountBytes[1], topicsCountBytes[2], topicsCountBytes[3])
 		response = append(response, topicsCountBytes...)
+		glog.Infof("🔍 FETCH CORR=%d: After appending topics count, response length=%d, response[10-13]=%02x %02x %02x %02x",
+			correlationID, len(response), response[10], response[11], response[12], response[13])
 	}

 	// Process each requested topic
@@ -390,6 +394,18 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
 		response = append(response, 0) // Empty tagged fields
 	}

+	// Verify topics count hasn't been corrupted
+	if !isFlexible && len(response) >= 14 {
+		actualTopicsCount := binary.BigEndian.Uint32(response[10:14])
+		if actualTopicsCount != uint32(topicsCount) {
+			glog.Errorf("🚨 FETCH CORR=%d: Topics count CORRUPTED! Expected %d, found %d at response[10:14]=%02x %02x %02x %02x",
+				correlationID, topicsCount, actualTopicsCount, response[10], response[11], response[12], response[13])
+		} else {
+			glog.Infof("✅ FETCH CORR=%d: Topics count verified OK: %d at response[10:14]=%02x %02x %02x %02x",
+				correlationID, topicsCount, response[10], response[11], response[12], response[13])
+		}
+	}
+
 	Debug("Fetch v%d response constructed, size: %d bytes (flexible: %v)", apiVersion, len(response), isFlexible)
 	glog.Infof("FETCH RESPONSE SUMMARY: correlationID=%d topics=%d totalRecordBytes=%d totalResponseBytes=%d", correlationID, topicsCount, totalAppendedRecordBytes, len(response))
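
Flexible Kafka protocol versions (KIP-482) encode array lengths as an unsigned varint of count+1, while classic versions use a big-endian int32; confusing the two shifts every later byte, which is what the corruption check above hunts for. A sketch of both encodings with encoding/binary, assuming the patch's EncodeUvarint behaves like binary.PutUvarint:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeTopicsCount shows both wire formats for an array header.
func encodeTopicsCount(count int, flexible bool) []byte {
	if flexible {
		// Compact array: uvarint of count+1 (0 encodes a null array).
		buf := make([]byte, binary.MaxVarintLen32)
		n := binary.PutUvarint(buf, uint64(count+1))
		return buf[:n]
	}
	// Classic array: big-endian int32 of the raw count.
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(count))
	return buf
}

func main() {
	fmt.Printf("classic: % x\n", encodeTopicsCount(1, false)) // 00 00 00 01
	fmt.Printf("compact: % x\n", encodeTopicsCount(1, true))  // 02
}
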
@@ -424,16 +424,20 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 	Debug("[%s] NEW CONNECTION ESTABLISHED", connectionID)

 	defer func() {
+		glog.V(0).Infof("🔍 [%s] Connection closing, cleaning up BrokerClient", connectionID)
 		Debug("[%s] Connection closing, cleaning up BrokerClient", connectionID)
 		// Close the per-connection broker client
 		if connBrokerClient != nil {
+			glog.V(0).Infof("🔍 [%s] Calling BrokerClient.Close()", connectionID)
 			if closeErr := connBrokerClient.Close(); closeErr != nil {
+				glog.Errorf("🔍 [%s] Error closing BrokerClient: %v", connectionID, closeErr)
 				Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr)
 			}
 		}
 		RecordDisconnectionMetrics()
 		h.connContext = nil // Clear connection context
 		conn.Close()
+		glog.V(0).Infof("🔍 [%s] Connection cleanup complete", connectionID)
 	}()

 	r := bufio.NewReader(conn)
@@ -2077,10 +2081,8 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by

 	// Build response
 	response := make([]byte, 0, 128)
-	// Correlation ID
-	cid := make([]byte, 4)
-	binary.BigEndian.PutUint32(cid, correlationID)
-	response = append(response, cid...)
+	// NOTE: Correlation ID is handled by writeResponseWithHeader
+	// Do NOT include it in the response body
 	// throttle_time_ms (4 bytes)
 	response = append(response, 0, 0, 0, 0)
 	// topics array count (int32)
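
The deleted lines above fixed a doubled header: the correlation ID was written both by the handler and by writeResponseWithHeader. A sketch of the framing rule this implies, assuming the helper emits "size, then correlation ID, then body" as its name and the NOTE suggest:

package main

import (
	"encoding/binary"
	"fmt"
)

// frame builds a classic Kafka response: 4-byte size, then the header
// (correlation ID), then the body. Handlers must therefore emit ONLY
// the body; prepending the correlation ID themselves duplicates it.
func frame(correlationID uint32, body []byte) []byte {
	out := make([]byte, 8, 8+len(body))
	binary.BigEndian.PutUint32(out[0:4], uint32(4+len(body)))
	binary.BigEndian.PutUint32(out[4:8], correlationID)
	return append(out, body...)
}

func main() {
	body := []byte{0, 0, 0, 0} // throttle_time_ms only
	fmt.Printf("% x\n", frame(7, body))
}
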
@@ -3060,7 +3062,9 @@ func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32,
 	if dumpLen > 64 {
 		dumpLen = 64
 	}
-	Debug("🔍 API %d v%d response wire format (first %d bytes):\n%s", apiKey, apiVersion, dumpLen, hexDump(fullResponse[:dumpLen]))
+	if apiKey == 1 || apiKey == 19 { // Fetch or CreateTopics
+		glog.Infof("🔍 API %d v%d response wire format (first %d bytes):\n%s", apiKey, apiVersion, dumpLen, hexDump(fullResponse[:dumpLen]))
+	}
 	Debug("Wrote API %d response v%d: size=%d, flexible=%t, correlationID=%d, totalBytes=%d", apiKey, apiVersion, totalSize, isFlexible, correlationID, len(fullResponse))

 	// Write to connection