feat: add context timeout propagation to produce path

This commit adds proper context propagation throughout the produce path,
enabling client-side timeouts to be honored on the broker side. Previously,
only fetch operations respected client timeouts - produce operations continued
indefinitely even if the client gave up.

Changes:
- Add ctx parameter to ProduceRecord and ProduceRecordValue signatures
- Add ctx parameter to PublishRecord and PublishRecordValue in BrokerClient
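- Add ctx parameter to handleProduce and its internal helpers
  (handleProduceV0V1, handleProduceV2Plus, produceToSeaweedMQ,
  produceSchemaBasedRecord, processSchematizedMessage, storeDecodedMessage)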
- Update all callers (protocol handlers, mocks, tests) to pass context
- Add context cancellation checks in PublishRecord and PublishRecordValue,
  both before starting and after acquiring the publisher session lock

Benefits:
- Faster failure detection when client times out
- No orphaned publish operations consuming broker resources
- Resource efficiency improvements (no goroutine/stream/lock leaks)
- Consistent timeout behavior between produce and fetch paths
- Better error handling with proper cancellation signals

This fixes the root cause of CI test timeouts where produce operations
continued indefinitely after clients gave up, leading to cascading delays.
This commit is contained in:
chrislu
2025-10-15 20:31:44 -07:00
parent 66d87659e5
commit e1a4bff794
7 changed files with 76 additions and 46 deletions

View File

@@ -98,7 +98,7 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
return info, exists
}
func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -128,8 +128,8 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32
return offset, nil
}
func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return m.ProduceRecord(topicName, partitionID, key, recordValueBytes)
func (m *mockSeaweedMQHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return m.ProduceRecord(ctx, topicName, partitionID, key, recordValueBytes)
}
func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {

View File

@@ -1,6 +1,7 @@
package integration
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -10,7 +11,12 @@ import (
)
// PublishRecord publishes a single record to SeaweedMQ broker
func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
// Check context before starting
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled before publish: %w", err)
}
session, err := bc.getOrCreatePublisher(topic, partition)
if err != nil {
@@ -26,6 +32,11 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
session.mu.Lock()
defer session.mu.Unlock()
// Check context after acquiring lock
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled after lock: %w", err)
}
// Send data message using broker API format
dataMsg := &mq_pb.DataMessage{
Key: key,
@@ -68,7 +79,13 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
}
// PublishRecordValue publishes a RecordValue message to SeaweedMQ via broker
func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
func (bc *BrokerClient) PublishRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
// Check context before starting
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled before publish: %w", err)
}
session, err := bc.getOrCreatePublisher(topic, partition)
if err != nil {
return 0, err
@@ -82,6 +99,11 @@ func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []
session.mu.Lock()
defer session.mu.Unlock()
// Check context after acquiring lock
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled after lock: %w", err)
}
// Send data message with RecordValue in the Value field
dataMsg := &mq_pb.DataMessage{
Key: key,

View File

@@ -216,7 +216,8 @@ func (h *SeaweedMQHandler) GetFilerAddress() string {
}
// ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
func (h *SeaweedMQHandler) ProduceRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
if len(key) > 0 {
}
if len(value) > 0 {
@@ -237,7 +238,7 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
if h.brokerClient == nil {
publishErr = fmt.Errorf("no broker client available")
} else {
smqOffset, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
smqOffset, publishErr = h.brokerClient.PublishRecord(ctx, topic, partition, key, value, timestamp)
}
if publishErr != nil {
@@ -258,7 +259,8 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
// ProduceRecordValue produces a record using RecordValue format to SeaweedMQ
// ALWAYS uses broker's assigned offset - no ledger involved
func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
func (h *SeaweedMQHandler) ProduceRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
// Verify topic exists
if !h.TopicExists(topic) {
return 0, fmt.Errorf("topic %s does not exist", topic)
@@ -273,7 +275,7 @@ func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key
if h.brokerClient == nil {
publishErr = fmt.Errorf("no broker client available")
} else {
smqOffset, publishErr = h.brokerClient.PublishRecordValue(topic, partition, key, recordValueBytes, timestamp)
smqOffset, publishErr = h.brokerClient.PublishRecordValue(ctx, topic, partition, key, recordValueBytes, timestamp)
}
if publishErr != nil {

View File

@@ -1,6 +1,7 @@
package integration
import (
"context"
"testing"
"time"
)
@@ -269,7 +270,7 @@ func TestSeaweedMQHandler_ProduceRecord(t *testing.T) {
key := []byte("produce-key")
value := []byte("produce-value")
offset, err := handler.ProduceRecord(topicName, 0, key, value)
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, key, value)
if err != nil {
t.Fatalf("Failed to produce record: %v", err)
}
@@ -316,7 +317,7 @@ func TestSeaweedMQHandler_MultiplePartitions(t *testing.T) {
key := []byte("partition-key")
value := []byte("partition-value")
offset, err := handler.ProduceRecord(topicName, partitionID, key, value)
offset, err := handler.ProduceRecord(context.Background(), topicName, partitionID, key, value)
if err != nil {
t.Fatalf("Failed to produce to partition %d: %v", partitionID, err)
}
@@ -366,7 +367,7 @@ func TestSeaweedMQHandler_FetchRecords(t *testing.T) {
var producedOffsets []int64
for i, record := range testRecords {
offset, err := handler.ProduceRecord(topicName, 0, []byte(record.key), []byte(record.value))
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, []byte(record.key), []byte(record.value))
if err != nil {
t.Fatalf("Failed to produce record %d: %v", i, err)
}
@@ -463,7 +464,7 @@ func TestSeaweedMQHandler_FetchRecords_ErrorHandling(t *testing.T) {
}
// Test with very small maxBytes
_, err = handler.ProduceRecord(topicName, 0, []byte("key"), []byte("value"))
_, err = handler.ProduceRecord(context.Background(), topicName, 0, []byte("key"), []byte("value"))
if err != nil {
t.Fatalf("Failed to produce test record: %v", err)
}
@@ -490,7 +491,7 @@ func TestSeaweedMQHandler_ErrorHandling(t *testing.T) {
defer handler.Close()
// Try to produce to non-existent topic
_, err = handler.ProduceRecord("non-existent-topic", 0, []byte("key"), []byte("value"))
_, err = handler.ProduceRecord(context.Background(), "non-existent-topic", 0, []byte("key"), []byte("value"))
if err == nil {
t.Errorf("Producing to non-existent topic should fail")
}

View File

@@ -132,8 +132,8 @@ type SeaweedMQHandlerInterface interface {
DeleteTopic(topic string) error
GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
// Ledger methods REMOVED - SMQ handles Kafka offsets natively
ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
// ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime)
GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error)
@@ -1060,7 +1060,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
case APIKeyProduce:
response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody)
response, err = h.handleProduce(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
case APIKeyFetch:
response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)

View File

@@ -163,11 +163,11 @@ func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo
return nil, false
}
func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (h *FastMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
func (h *FastMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
@@ -234,11 +234,11 @@ func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopic
return nil, false
}
func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (h *BlockingMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
func (h *BlockingMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
@@ -320,11 +320,11 @@ func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaT
return nil, false
}
func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (h *TimeoutAwareMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
func (h *TimeoutAwareMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

View File

@@ -1,6 +1,7 @@
package protocol
import (
"context"
"encoding/binary"
"fmt"
"strings"
@@ -12,20 +13,20 @@ import (
"google.golang.org/protobuf/proto"
)
func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// Version-specific handling
switch apiVersion {
case 0, 1:
return h.handleProduceV0V1(correlationID, apiVersion, requestBody)
return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
case 2, 3, 4, 5, 6, 7:
return h.handleProduceV2Plus(correlationID, apiVersion, requestBody)
return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
default:
return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
}
}
func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// Parse Produce v0/v1 request
// Request format: client_id + acks(2) + timeout(4) + topics_array
@@ -147,11 +148,11 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
// Process the record set
recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
if parseErr != nil {
errorCode = 42 // INVALID_RECORD
} else if recordCount > 0 {
// Use SeaweedMQ integration
offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
if err != nil {
errorCode = 42 // INVALID_RECORD
} else if recordCount > 0 {
// Use SeaweedMQ integration
offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData)
if err != nil {
// Check if this is a schema validation error and add delay to prevent overloading
if h.isSchemaValidationError(err) {
time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
@@ -232,7 +233,8 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total
}
// produceToSeaweedMQ publishes every record in the record set to SeaweedMQ and returns the base offset (simplified for Phase 2)
func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
func (h *Handler) produceToSeaweedMQ(ctx context.Context, topic string, partition int32, recordSetData []byte) (int64, error) {
// Extract all records from the record set and publish each one
// extractAllRecords handles fallback internally for various cases
records := h.extractAllRecords(recordSetData)
@@ -244,7 +246,7 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
// Publish all records and return the offset of the first record (base offset)
var baseOffset int64
for idx, kv := range records {
offsetProduced, err := h.produceSchemaBasedRecord(topic, partition, kv.Key, kv.Value)
offsetProduced, err := h.produceSchemaBasedRecord(ctx, topic, partition, kv.Key, kv.Value)
if err != nil {
return 0, err
}
@@ -581,7 +583,7 @@ func decodeVarint(data []byte) (int64, int) {
}
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
startTime := time.Now()
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
@@ -725,7 +727,7 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
} else {
var firstOffsetSet bool
for idx, kv := range records {
offsetProduced, prodErr := h.produceSchemaBasedRecord(topicName, int32(partitionID), kv.Key, kv.Value)
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
if prodErr != nil {
// Check if this is a schema validation error and add delay to prevent overloading
if h.isSchemaValidationError(prodErr) {
@@ -795,7 +797,8 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
}
// processSchematizedMessage processes a message that may contain schema information
func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
// ctx controls the publish timeout - if client cancels, process operation is cancelled
func (h *Handler) processSchematizedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
// System topics should bypass schema processing entirely
if h.isSystemTopic(topicName) {
return nil // Skip schema processing for system topics
@@ -820,11 +823,12 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32,
}
// Store the decoded message using SeaweedMQ
return h.storeDecodedMessage(topicName, partitionID, originalKey, decodedMsg)
return h.storeDecodedMessage(ctx, topicName, partitionID, originalKey, decodedMsg)
}
// storeDecodedMessage stores a decoded message using mq.broker integration
func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
// ctx controls the publish timeout - if client cancels, store operation is cancelled
func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
// Use broker client if available
if h.IsBrokerIntegrationEnabled() {
// Use the original Kafka message key
@@ -853,7 +857,7 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, origi
// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
value := decodedMsg.Envelope.OriginalBytes
_, err := h.seaweedMQHandler.ProduceRecord(topicName, partitionID, key, value)
_, err := h.seaweedMQHandler.ProduceRecord(ctx, topicName, partitionID, key, value)
if err != nil {
return fmt.Errorf("failed to produce to SeaweedMQ: %w", err)
}
@@ -1141,18 +1145,19 @@ func (h *Handler) isSystemTopic(topicName string) bool {
}
// produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
// System topics should always bypass schema processing and be stored as-is
if h.isSystemTopic(topic) {
offset, err := h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
offset, err := h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
return offset, err
}
// If schema management is not enabled, fall back to raw message handling
isEnabled := h.IsSchemaEnabled()
if !isEnabled {
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
}
var keyDecodedMsg *schema.DecodedMessage
@@ -1190,7 +1195,7 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
// If neither key nor value is schematized, fall back to raw message handling
// This is OK for non-schematized messages (no magic byte 0x00)
if keyDecodedMsg == nil && valueDecodedMsg == nil {
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
}
// Process key schema if present
@@ -1261,10 +1266,10 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
// CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format)
// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
// which can be re-encoded to Confluent Wire Format during fetch if needed
return h.seaweedMQHandler.ProduceRecordValue(topic, partition, finalKey, recordValueBytes)
return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
} else {
// Send with raw format for non-schematized data
return h.seaweedMQHandler.ProduceRecord(topic, partition, finalKey, recordValueBytes)
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, finalKey, recordValueBytes)
}
}