mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-08 22:44:45 +08:00
feat: add context timeout propagation to produce path
This commit adds proper context propagation throughout the produce path, enabling client-side timeouts to be honored on the broker side. Previously, only fetch operations respected client timeouts - produce operations continued indefinitely even if the client gave up. Changes: - Add ctx parameter to ProduceRecord and ProduceRecordValue signatures - Add ctx parameter to PublishRecord and PublishRecordValue in BrokerClient - Add ctx parameter to handleProduce and related internal functions - Update all callers (protocol handlers, mocks, tests) to pass context - Add context cancellation checks in PublishRecord before operations Benefits: - Faster failure detection when client times out - No orphaned publish operations consuming broker resources - Resource efficiency improvements (no goroutine/stream/lock leaks) - Consistent timeout behavior between produce and fetch paths - Better error handling with proper cancellation signals This fixes the root cause of CI test timeouts where produce operations continued indefinitely after clients gave up, leading to cascading delays.
This commit is contained in:
@@ -98,7 +98,7 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
|
|||||||
return info, exists
|
return info, exists
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
@@ -128,8 +128,8 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32
|
|||||||
return offset, nil
|
return offset, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
func (m *mockSeaweedMQHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||||
return m.ProduceRecord(topicName, partitionID, key, recordValueBytes)
|
return m.ProduceRecord(ctx, topicName, partitionID, key, recordValueBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
|
func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package integration
|
package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
@@ -10,7 +11,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// PublishRecord publishes a single record to SeaweedMQ broker
|
// PublishRecord publishes a single record to SeaweedMQ broker
|
||||||
func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
|
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
|
||||||
|
func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
|
||||||
|
// Check context before starting
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return 0, fmt.Errorf("context cancelled before publish: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
session, err := bc.getOrCreatePublisher(topic, partition)
|
session, err := bc.getOrCreatePublisher(topic, partition)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -26,6 +32,11 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
|
|||||||
session.mu.Lock()
|
session.mu.Lock()
|
||||||
defer session.mu.Unlock()
|
defer session.mu.Unlock()
|
||||||
|
|
||||||
|
// Check context after acquiring lock
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return 0, fmt.Errorf("context cancelled after lock: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Send data message using broker API format
|
// Send data message using broker API format
|
||||||
dataMsg := &mq_pb.DataMessage{
|
dataMsg := &mq_pb.DataMessage{
|
||||||
Key: key,
|
Key: key,
|
||||||
@@ -68,7 +79,13 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PublishRecordValue publishes a RecordValue message to SeaweedMQ via broker
|
// PublishRecordValue publishes a RecordValue message to SeaweedMQ via broker
|
||||||
func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
|
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
|
||||||
|
func (bc *BrokerClient) PublishRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
|
||||||
|
// Check context before starting
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return 0, fmt.Errorf("context cancelled before publish: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
session, err := bc.getOrCreatePublisher(topic, partition)
|
session, err := bc.getOrCreatePublisher(topic, partition)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@@ -82,6 +99,11 @@ func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []
|
|||||||
session.mu.Lock()
|
session.mu.Lock()
|
||||||
defer session.mu.Unlock()
|
defer session.mu.Unlock()
|
||||||
|
|
||||||
|
// Check context after acquiring lock
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return 0, fmt.Errorf("context cancelled after lock: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Send data message with RecordValue in the Value field
|
// Send data message with RecordValue in the Value field
|
||||||
dataMsg := &mq_pb.DataMessage{
|
dataMsg := &mq_pb.DataMessage{
|
||||||
Key: key,
|
Key: key,
|
||||||
|
|||||||
@@ -216,7 +216,8 @@ func (h *SeaweedMQHandler) GetFilerAddress() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
|
// ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
|
||||||
func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
|
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
|
||||||
|
func (h *SeaweedMQHandler) ProduceRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
|
||||||
if len(key) > 0 {
|
if len(key) > 0 {
|
||||||
}
|
}
|
||||||
if len(value) > 0 {
|
if len(value) > 0 {
|
||||||
@@ -237,7 +238,7 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
|
|||||||
if h.brokerClient == nil {
|
if h.brokerClient == nil {
|
||||||
publishErr = fmt.Errorf("no broker client available")
|
publishErr = fmt.Errorf("no broker client available")
|
||||||
} else {
|
} else {
|
||||||
smqOffset, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
|
smqOffset, publishErr = h.brokerClient.PublishRecord(ctx, topic, partition, key, value, timestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if publishErr != nil {
|
if publishErr != nil {
|
||||||
@@ -258,7 +259,8 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
|
|||||||
|
|
||||||
// ProduceRecordValue produces a record using RecordValue format to SeaweedMQ
|
// ProduceRecordValue produces a record using RecordValue format to SeaweedMQ
|
||||||
// ALWAYS uses broker's assigned offset - no ledger involved
|
// ALWAYS uses broker's assigned offset - no ledger involved
|
||||||
func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
|
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
|
||||||
|
func (h *SeaweedMQHandler) ProduceRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||||
// Verify topic exists
|
// Verify topic exists
|
||||||
if !h.TopicExists(topic) {
|
if !h.TopicExists(topic) {
|
||||||
return 0, fmt.Errorf("topic %s does not exist", topic)
|
return 0, fmt.Errorf("topic %s does not exist", topic)
|
||||||
@@ -273,7 +275,7 @@ func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key
|
|||||||
if h.brokerClient == nil {
|
if h.brokerClient == nil {
|
||||||
publishErr = fmt.Errorf("no broker client available")
|
publishErr = fmt.Errorf("no broker client available")
|
||||||
} else {
|
} else {
|
||||||
smqOffset, publishErr = h.brokerClient.PublishRecordValue(topic, partition, key, recordValueBytes, timestamp)
|
smqOffset, publishErr = h.brokerClient.PublishRecordValue(ctx, topic, partition, key, recordValueBytes, timestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if publishErr != nil {
|
if publishErr != nil {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package integration
|
package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -269,7 +270,7 @@ func TestSeaweedMQHandler_ProduceRecord(t *testing.T) {
|
|||||||
key := []byte("produce-key")
|
key := []byte("produce-key")
|
||||||
value := []byte("produce-value")
|
value := []byte("produce-value")
|
||||||
|
|
||||||
offset, err := handler.ProduceRecord(topicName, 0, key, value)
|
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to produce record: %v", err)
|
t.Fatalf("Failed to produce record: %v", err)
|
||||||
}
|
}
|
||||||
@@ -316,7 +317,7 @@ func TestSeaweedMQHandler_MultiplePartitions(t *testing.T) {
|
|||||||
key := []byte("partition-key")
|
key := []byte("partition-key")
|
||||||
value := []byte("partition-value")
|
value := []byte("partition-value")
|
||||||
|
|
||||||
offset, err := handler.ProduceRecord(topicName, partitionID, key, value)
|
offset, err := handler.ProduceRecord(context.Background(), topicName, partitionID, key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to produce to partition %d: %v", partitionID, err)
|
t.Fatalf("Failed to produce to partition %d: %v", partitionID, err)
|
||||||
}
|
}
|
||||||
@@ -366,7 +367,7 @@ func TestSeaweedMQHandler_FetchRecords(t *testing.T) {
|
|||||||
|
|
||||||
var producedOffsets []int64
|
var producedOffsets []int64
|
||||||
for i, record := range testRecords {
|
for i, record := range testRecords {
|
||||||
offset, err := handler.ProduceRecord(topicName, 0, []byte(record.key), []byte(record.value))
|
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, []byte(record.key), []byte(record.value))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to produce record %d: %v", i, err)
|
t.Fatalf("Failed to produce record %d: %v", i, err)
|
||||||
}
|
}
|
||||||
@@ -463,7 +464,7 @@ func TestSeaweedMQHandler_FetchRecords_ErrorHandling(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test with very small maxBytes
|
// Test with very small maxBytes
|
||||||
_, err = handler.ProduceRecord(topicName, 0, []byte("key"), []byte("value"))
|
_, err = handler.ProduceRecord(context.Background(), topicName, 0, []byte("key"), []byte("value"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to produce test record: %v", err)
|
t.Fatalf("Failed to produce test record: %v", err)
|
||||||
}
|
}
|
||||||
@@ -490,7 +491,7 @@ func TestSeaweedMQHandler_ErrorHandling(t *testing.T) {
|
|||||||
defer handler.Close()
|
defer handler.Close()
|
||||||
|
|
||||||
// Try to produce to non-existent topic
|
// Try to produce to non-existent topic
|
||||||
_, err = handler.ProduceRecord("non-existent-topic", 0, []byte("key"), []byte("value"))
|
_, err = handler.ProduceRecord(context.Background(), "non-existent-topic", 0, []byte("key"), []byte("value"))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Producing to non-existent topic should fail")
|
t.Errorf("Producing to non-existent topic should fail")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -132,8 +132,8 @@ type SeaweedMQHandlerInterface interface {
|
|||||||
DeleteTopic(topic string) error
|
DeleteTopic(topic string) error
|
||||||
GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
|
GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
|
||||||
// Ledger methods REMOVED - SMQ handles Kafka offsets natively
|
// Ledger methods REMOVED - SMQ handles Kafka offsets natively
|
||||||
ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
|
ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
|
||||||
ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
|
ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
|
||||||
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
|
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
|
||||||
// ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime)
|
// ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime)
|
||||||
GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error)
|
GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error)
|
||||||
@@ -1060,7 +1060,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
|
|||||||
response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
|
response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
|
||||||
|
|
||||||
case APIKeyProduce:
|
case APIKeyProduce:
|
||||||
response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody)
|
response, err = h.handleProduce(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
|
||||||
|
|
||||||
case APIKeyFetch:
|
case APIKeyFetch:
|
||||||
response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
|
response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
|
||||||
|
|||||||
@@ -163,11 +163,11 @@ func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo
|
|||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
func (h *FastMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||||
return 0, fmt.Errorf("not implemented")
|
return 0, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
func (h *FastMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||||
return 0, fmt.Errorf("not implemented")
|
return 0, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,11 +234,11 @@ func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopic
|
|||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
func (h *BlockingMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||||
return 0, fmt.Errorf("not implemented")
|
return 0, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
func (h *BlockingMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||||
return 0, fmt.Errorf("not implemented")
|
return 0, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -320,11 +320,11 @@ func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaT
|
|||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
func (h *TimeoutAwareMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||||
return 0, fmt.Errorf("not implemented")
|
return 0, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
func (h *TimeoutAwareMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||||
return 0, fmt.Errorf("not implemented")
|
return 0, fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package protocol
|
package protocol
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -12,20 +13,20 @@ import (
|
|||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||||
|
|
||||||
// Version-specific handling
|
// Version-specific handling
|
||||||
switch apiVersion {
|
switch apiVersion {
|
||||||
case 0, 1:
|
case 0, 1:
|
||||||
return h.handleProduceV0V1(correlationID, apiVersion, requestBody)
|
return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
|
||||||
case 2, 3, 4, 5, 6, 7:
|
case 2, 3, 4, 5, 6, 7:
|
||||||
return h.handleProduceV2Plus(correlationID, apiVersion, requestBody)
|
return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
|
return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||||
// Parse Produce v0/v1 request
|
// Parse Produce v0/v1 request
|
||||||
// Request format: client_id + acks(2) + timeout(4) + topics_array
|
// Request format: client_id + acks(2) + timeout(4) + topics_array
|
||||||
|
|
||||||
@@ -147,11 +148,11 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
|
|||||||
// Process the record set
|
// Process the record set
|
||||||
recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
|
recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
|
||||||
if parseErr != nil {
|
if parseErr != nil {
|
||||||
errorCode = 42 // INVALID_RECORD
|
errorCode = 42 // INVALID_RECORD
|
||||||
} else if recordCount > 0 {
|
} else if recordCount > 0 {
|
||||||
// Use SeaweedMQ integration
|
// Use SeaweedMQ integration
|
||||||
offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
|
offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Check if this is a schema validation error and add delay to prevent overloading
|
// Check if this is a schema validation error and add delay to prevent overloading
|
||||||
if h.isSchemaValidationError(err) {
|
if h.isSchemaValidationError(err) {
|
||||||
time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
|
time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
|
||||||
@@ -232,7 +233,8 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total
|
|||||||
}
|
}
|
||||||
|
|
||||||
// produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
|
// produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
|
||||||
func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) {
|
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
|
||||||
|
func (h *Handler) produceToSeaweedMQ(ctx context.Context, topic string, partition int32, recordSetData []byte) (int64, error) {
|
||||||
// Extract all records from the record set and publish each one
|
// Extract all records from the record set and publish each one
|
||||||
// extractAllRecords handles fallback internally for various cases
|
// extractAllRecords handles fallback internally for various cases
|
||||||
records := h.extractAllRecords(recordSetData)
|
records := h.extractAllRecords(recordSetData)
|
||||||
@@ -244,7 +246,7 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
|
|||||||
// Publish all records and return the offset of the first record (base offset)
|
// Publish all records and return the offset of the first record (base offset)
|
||||||
var baseOffset int64
|
var baseOffset int64
|
||||||
for idx, kv := range records {
|
for idx, kv := range records {
|
||||||
offsetProduced, err := h.produceSchemaBasedRecord(topic, partition, kv.Key, kv.Value)
|
offsetProduced, err := h.produceSchemaBasedRecord(ctx, topic, partition, kv.Key, kv.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@@ -581,7 +583,7 @@ func decodeVarint(data []byte) (int64, int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
|
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
|
||||||
func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
|
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
|
||||||
@@ -725,7 +727,7 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
|
|||||||
} else {
|
} else {
|
||||||
var firstOffsetSet bool
|
var firstOffsetSet bool
|
||||||
for idx, kv := range records {
|
for idx, kv := range records {
|
||||||
offsetProduced, prodErr := h.produceSchemaBasedRecord(topicName, int32(partitionID), kv.Key, kv.Value)
|
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
|
||||||
if prodErr != nil {
|
if prodErr != nil {
|
||||||
// Check if this is a schema validation error and add delay to prevent overloading
|
// Check if this is a schema validation error and add delay to prevent overloading
|
||||||
if h.isSchemaValidationError(prodErr) {
|
if h.isSchemaValidationError(prodErr) {
|
||||||
@@ -795,7 +797,8 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
// processSchematizedMessage processes a message that may contain schema information
|
// processSchematizedMessage processes a message that may contain schema information
|
||||||
func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
|
// ctx controls the publish timeout - if client cancels, process operation is cancelled
|
||||||
|
func (h *Handler) processSchematizedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
|
||||||
// System topics should bypass schema processing entirely
|
// System topics should bypass schema processing entirely
|
||||||
if h.isSystemTopic(topicName) {
|
if h.isSystemTopic(topicName) {
|
||||||
return nil // Skip schema processing for system topics
|
return nil // Skip schema processing for system topics
|
||||||
@@ -820,11 +823,12 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Store the decoded message using SeaweedMQ
|
// Store the decoded message using SeaweedMQ
|
||||||
return h.storeDecodedMessage(topicName, partitionID, originalKey, decodedMsg)
|
return h.storeDecodedMessage(ctx, topicName, partitionID, originalKey, decodedMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// storeDecodedMessage stores a decoded message using mq.broker integration
|
// storeDecodedMessage stores a decoded message using mq.broker integration
|
||||||
func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
|
// ctx controls the publish timeout - if client cancels, store operation is cancelled
|
||||||
|
func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
|
||||||
// Use broker client if available
|
// Use broker client if available
|
||||||
if h.IsBrokerIntegrationEnabled() {
|
if h.IsBrokerIntegrationEnabled() {
|
||||||
// Use the original Kafka message key
|
// Use the original Kafka message key
|
||||||
@@ -853,7 +857,7 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, origi
|
|||||||
// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
|
// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
|
||||||
value := decodedMsg.Envelope.OriginalBytes
|
value := decodedMsg.Envelope.OriginalBytes
|
||||||
|
|
||||||
_, err := h.seaweedMQHandler.ProduceRecord(topicName, partitionID, key, value)
|
_, err := h.seaweedMQHandler.ProduceRecord(ctx, topicName, partitionID, key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to produce to SeaweedMQ: %w", err)
|
return fmt.Errorf("failed to produce to SeaweedMQ: %w", err)
|
||||||
}
|
}
|
||||||
@@ -1141,18 +1145,19 @@ func (h *Handler) isSystemTopic(topicName string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
|
// produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
|
||||||
func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
|
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
|
||||||
|
func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
|
||||||
|
|
||||||
// System topics should always bypass schema processing and be stored as-is
|
// System topics should always bypass schema processing and be stored as-is
|
||||||
if h.isSystemTopic(topic) {
|
if h.isSystemTopic(topic) {
|
||||||
offset, err := h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
|
offset, err := h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
|
||||||
return offset, err
|
return offset, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// If schema management is not enabled, fall back to raw message handling
|
// If schema management is not enabled, fall back to raw message handling
|
||||||
isEnabled := h.IsSchemaEnabled()
|
isEnabled := h.IsSchemaEnabled()
|
||||||
if !isEnabled {
|
if !isEnabled {
|
||||||
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
|
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
var keyDecodedMsg *schema.DecodedMessage
|
var keyDecodedMsg *schema.DecodedMessage
|
||||||
@@ -1190,7 +1195,7 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
|
|||||||
// If neither key nor value is schematized, fall back to raw message handling
|
// If neither key nor value is schematized, fall back to raw message handling
|
||||||
// This is OK for non-schematized messages (no magic byte 0x00)
|
// This is OK for non-schematized messages (no magic byte 0x00)
|
||||||
if keyDecodedMsg == nil && valueDecodedMsg == nil {
|
if keyDecodedMsg == nil && valueDecodedMsg == nil {
|
||||||
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
|
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process key schema if present
|
// Process key schema if present
|
||||||
@@ -1261,10 +1266,10 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
|
|||||||
// CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format)
|
// CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format)
|
||||||
// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
|
// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
|
||||||
// which can be re-encoded to Confluent Wire Format during fetch if needed
|
// which can be re-encoded to Confluent Wire Format during fetch if needed
|
||||||
return h.seaweedMQHandler.ProduceRecordValue(topic, partition, finalKey, recordValueBytes)
|
return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
|
||||||
} else {
|
} else {
|
||||||
// Send with raw format for non-schematized data
|
// Send with raw format for non-schematized data
|
||||||
return h.seaweedMQHandler.ProduceRecord(topic, partition, finalKey, recordValueBytes)
|
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, finalKey, recordValueBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user