mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-07 23:46:13 +08:00
feat: add context timeout propagation to produce path
This commit adds proper context propagation throughout the produce path, enabling client-side timeouts to be honored on the broker side. Previously, only fetch operations respected client timeouts - produce operations continued indefinitely even if the client gave up. Changes: - Add ctx parameter to ProduceRecord and ProduceRecordValue signatures - Add ctx parameter to PublishRecord and PublishRecordValue in BrokerClient - Add ctx parameter to handleProduce and related internal functions - Update all callers (protocol handlers, mocks, tests) to pass context - Add context cancellation checks in PublishRecord before operations Benefits: - Faster failure detection when client times out - No orphaned publish operations consuming broker resources - Resource efficiency improvements (no goroutine/stream/lock leaks) - Consistent timeout behavior between produce and fetch paths - Better error handling with proper cancellation signals This fixes the root cause of CI test timeouts where produce operations continued indefinitely after clients gave up, leading to cascading delays.
This commit is contained in:
@@ -98,7 +98,7 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
|
||||
return info, exists
|
||||
}
|
||||
|
||||
func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
@@ -128,8 +128,8 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32
|
||||
return offset, nil
|
||||
}
|
||||
|
||||
func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
return m.ProduceRecord(topicName, partitionID, key, recordValueBytes)
|
||||
func (m *mockSeaweedMQHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
return m.ProduceRecord(ctx, topicName, partitionID, key, recordValueBytes)
|
||||
}
|
||||
|
||||
func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -10,7 +11,12 @@ import (
|
||||
)
|
||||
|
||||
// PublishRecord publishes a single record to SeaweedMQ broker
|
||||
func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
|
||||
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
|
||||
func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
|
||||
// Check context before starting
|
||||
if err := ctx.Err(); err != nil {
|
||||
return 0, fmt.Errorf("context cancelled before publish: %w", err)
|
||||
}
|
||||
|
||||
session, err := bc.getOrCreatePublisher(topic, partition)
|
||||
if err != nil {
|
||||
@@ -26,6 +32,11 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
|
||||
session.mu.Lock()
|
||||
defer session.mu.Unlock()
|
||||
|
||||
// Check context after acquiring lock
|
||||
if err := ctx.Err(); err != nil {
|
||||
return 0, fmt.Errorf("context cancelled after lock: %w", err)
|
||||
}
|
||||
|
||||
// Send data message using broker API format
|
||||
dataMsg := &mq_pb.DataMessage{
|
||||
Key: key,
|
||||
@@ -68,7 +79,13 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
|
||||
}
|
||||
|
||||
// PublishRecordValue publishes a RecordValue message to SeaweedMQ via broker
|
||||
func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
|
||||
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
|
||||
func (bc *BrokerClient) PublishRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
|
||||
// Check context before starting
|
||||
if err := ctx.Err(); err != nil {
|
||||
return 0, fmt.Errorf("context cancelled before publish: %w", err)
|
||||
}
|
||||
|
||||
session, err := bc.getOrCreatePublisher(topic, partition)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -82,6 +99,11 @@ func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []
|
||||
session.mu.Lock()
|
||||
defer session.mu.Unlock()
|
||||
|
||||
// Check context after acquiring lock
|
||||
if err := ctx.Err(); err != nil {
|
||||
return 0, fmt.Errorf("context cancelled after lock: %w", err)
|
||||
}
|
||||
|
||||
// Send data message with RecordValue in the Value field
|
||||
dataMsg := &mq_pb.DataMessage{
|
||||
Key: key,
|
||||
|
||||
@@ -216,7 +216,8 @@ func (h *SeaweedMQHandler) GetFilerAddress() string {
|
||||
}
|
||||
|
||||
// ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
|
||||
func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
|
||||
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
|
||||
func (h *SeaweedMQHandler) ProduceRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
|
||||
if len(key) > 0 {
|
||||
}
|
||||
if len(value) > 0 {
|
||||
@@ -237,7 +238,7 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
|
||||
if h.brokerClient == nil {
|
||||
publishErr = fmt.Errorf("no broker client available")
|
||||
} else {
|
||||
smqOffset, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
|
||||
smqOffset, publishErr = h.brokerClient.PublishRecord(ctx, topic, partition, key, value, timestamp)
|
||||
}
|
||||
|
||||
if publishErr != nil {
|
||||
@@ -258,7 +259,8 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
|
||||
|
||||
// ProduceRecordValue produces a record using RecordValue format to SeaweedMQ
|
||||
// ALWAYS uses broker's assigned offset - no ledger involved
|
||||
func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
|
||||
func (h *SeaweedMQHandler) ProduceRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
// Verify topic exists
|
||||
if !h.TopicExists(topic) {
|
||||
return 0, fmt.Errorf("topic %s does not exist", topic)
|
||||
@@ -273,7 +275,7 @@ func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key
|
||||
if h.brokerClient == nil {
|
||||
publishErr = fmt.Errorf("no broker client available")
|
||||
} else {
|
||||
smqOffset, publishErr = h.brokerClient.PublishRecordValue(topic, partition, key, recordValueBytes, timestamp)
|
||||
smqOffset, publishErr = h.brokerClient.PublishRecordValue(ctx, topic, partition, key, recordValueBytes, timestamp)
|
||||
}
|
||||
|
||||
if publishErr != nil {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -269,7 +270,7 @@ func TestSeaweedMQHandler_ProduceRecord(t *testing.T) {
|
||||
key := []byte("produce-key")
|
||||
value := []byte("produce-value")
|
||||
|
||||
offset, err := handler.ProduceRecord(topicName, 0, key, value)
|
||||
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, key, value)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to produce record: %v", err)
|
||||
}
|
||||
@@ -316,7 +317,7 @@ func TestSeaweedMQHandler_MultiplePartitions(t *testing.T) {
|
||||
key := []byte("partition-key")
|
||||
value := []byte("partition-value")
|
||||
|
||||
offset, err := handler.ProduceRecord(topicName, partitionID, key, value)
|
||||
offset, err := handler.ProduceRecord(context.Background(), topicName, partitionID, key, value)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to produce to partition %d: %v", partitionID, err)
|
||||
}
|
||||
@@ -366,7 +367,7 @@ func TestSeaweedMQHandler_FetchRecords(t *testing.T) {
|
||||
|
||||
var producedOffsets []int64
|
||||
for i, record := range testRecords {
|
||||
offset, err := handler.ProduceRecord(topicName, 0, []byte(record.key), []byte(record.value))
|
||||
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, []byte(record.key), []byte(record.value))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to produce record %d: %v", i, err)
|
||||
}
|
||||
@@ -463,7 +464,7 @@ func TestSeaweedMQHandler_FetchRecords_ErrorHandling(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test with very small maxBytes
|
||||
_, err = handler.ProduceRecord(topicName, 0, []byte("key"), []byte("value"))
|
||||
_, err = handler.ProduceRecord(context.Background(), topicName, 0, []byte("key"), []byte("value"))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to produce test record: %v", err)
|
||||
}
|
||||
@@ -490,7 +491,7 @@ func TestSeaweedMQHandler_ErrorHandling(t *testing.T) {
|
||||
defer handler.Close()
|
||||
|
||||
// Try to produce to non-existent topic
|
||||
_, err = handler.ProduceRecord("non-existent-topic", 0, []byte("key"), []byte("value"))
|
||||
_, err = handler.ProduceRecord(context.Background(), "non-existent-topic", 0, []byte("key"), []byte("value"))
|
||||
if err == nil {
|
||||
t.Errorf("Producing to non-existent topic should fail")
|
||||
}
|
||||
|
||||
@@ -132,8 +132,8 @@ type SeaweedMQHandlerInterface interface {
|
||||
DeleteTopic(topic string) error
|
||||
GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
|
||||
// Ledger methods REMOVED - SMQ handles Kafka offsets natively
|
||||
ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
|
||||
ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
|
||||
ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
|
||||
ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
|
||||
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
|
||||
// ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime)
|
||||
GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error)
|
||||
@@ -1060,7 +1060,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
|
||||
response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
|
||||
|
||||
case APIKeyProduce:
|
||||
response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody)
|
||||
response, err = h.handleProduce(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
|
||||
|
||||
case APIKeyFetch:
|
||||
response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
|
||||
|
||||
@@ -163,11 +163,11 @@ func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
func (h *FastMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
return 0, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
func (h *FastMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
return 0, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
@@ -234,11 +234,11 @@ func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopic
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
func (h *BlockingMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
return 0, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
func (h *BlockingMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
return 0, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
@@ -320,11 +320,11 @@ func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaT
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
func (h *TimeoutAwareMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
|
||||
return 0, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
func (h *TimeoutAwareMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
|
||||
return 0, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -12,20 +13,20 @@ import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||
func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||
|
||||
// Version-specific handling
|
||||
switch apiVersion {
|
||||
case 0, 1:
|
||||
return h.handleProduceV0V1(correlationID, apiVersion, requestBody)
|
||||
return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
|
||||
case 2, 3, 4, 5, 6, 7:
|
||||
return h.handleProduceV2Plus(correlationID, apiVersion, requestBody)
|
||||
return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
|
||||
default:
|
||||
return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||
func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||
// Parse Produce v0/v1 request
|
||||
// Request format: client_id + acks(2) + timeout(4) + topics_array
|
||||
|
||||
@@ -147,11 +148,11 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
|
||||
// Process the record set
|
||||
recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
|
||||
if parseErr != nil {
|
||||
errorCode = 42 // INVALID_RECORD
|
||||
} else if recordCount > 0 {
|
||||
// Use SeaweedMQ integration
|
||||
offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
|
||||
if err != nil {
|
||||
errorCode = 42 // INVALID_RECORD
|
||||
} else if recordCount > 0 {
|
||||
// Use SeaweedMQ integration
|
||||
offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData)
|
||||
if err != nil {
|
||||
// Check if this is a schema validation error and add delay to prevent overloading
|
||||
if h.isSchemaValidationError(err) {
|
||||
time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
|
||||
@@ -232,7 +233,8 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total
|
||||
}
|
||||
|
||||
// produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
|
||||
func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) {
|
||||
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
|
||||
func (h *Handler) produceToSeaweedMQ(ctx context.Context, topic string, partition int32, recordSetData []byte) (int64, error) {
|
||||
// Extract all records from the record set and publish each one
|
||||
// extractAllRecords handles fallback internally for various cases
|
||||
records := h.extractAllRecords(recordSetData)
|
||||
@@ -244,7 +246,7 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
|
||||
// Publish all records and return the offset of the first record (base offset)
|
||||
var baseOffset int64
|
||||
for idx, kv := range records {
|
||||
offsetProduced, err := h.produceSchemaBasedRecord(topic, partition, kv.Key, kv.Value)
|
||||
offsetProduced, err := h.produceSchemaBasedRecord(ctx, topic, partition, kv.Key, kv.Value)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -581,7 +583,7 @@ func decodeVarint(data []byte) (int64, int) {
|
||||
}
|
||||
|
||||
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
|
||||
func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
|
||||
@@ -725,7 +727,7 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
|
||||
} else {
|
||||
var firstOffsetSet bool
|
||||
for idx, kv := range records {
|
||||
offsetProduced, prodErr := h.produceSchemaBasedRecord(topicName, int32(partitionID), kv.Key, kv.Value)
|
||||
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
|
||||
if prodErr != nil {
|
||||
// Check if this is a schema validation error and add delay to prevent overloading
|
||||
if h.isSchemaValidationError(prodErr) {
|
||||
@@ -795,7 +797,8 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
|
||||
}
|
||||
|
||||
// processSchematizedMessage processes a message that may contain schema information
|
||||
func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
|
||||
// ctx controls the publish timeout - if client cancels, process operation is cancelled
|
||||
func (h *Handler) processSchematizedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
|
||||
// System topics should bypass schema processing entirely
|
||||
if h.isSystemTopic(topicName) {
|
||||
return nil // Skip schema processing for system topics
|
||||
@@ -820,11 +823,12 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32,
|
||||
}
|
||||
|
||||
// Store the decoded message using SeaweedMQ
|
||||
return h.storeDecodedMessage(topicName, partitionID, originalKey, decodedMsg)
|
||||
return h.storeDecodedMessage(ctx, topicName, partitionID, originalKey, decodedMsg)
|
||||
}
|
||||
|
||||
// storeDecodedMessage stores a decoded message using mq.broker integration
|
||||
func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
|
||||
// ctx controls the publish timeout - if client cancels, store operation is cancelled
|
||||
func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
|
||||
// Use broker client if available
|
||||
if h.IsBrokerIntegrationEnabled() {
|
||||
// Use the original Kafka message key
|
||||
@@ -853,7 +857,7 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, origi
|
||||
// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
|
||||
value := decodedMsg.Envelope.OriginalBytes
|
||||
|
||||
_, err := h.seaweedMQHandler.ProduceRecord(topicName, partitionID, key, value)
|
||||
_, err := h.seaweedMQHandler.ProduceRecord(ctx, topicName, partitionID, key, value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to produce to SeaweedMQ: %w", err)
|
||||
}
|
||||
@@ -1141,18 +1145,19 @@ func (h *Handler) isSystemTopic(topicName string) bool {
|
||||
}
|
||||
|
||||
// produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
|
||||
func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
|
||||
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
|
||||
func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
|
||||
|
||||
// System topics should always bypass schema processing and be stored as-is
|
||||
if h.isSystemTopic(topic) {
|
||||
offset, err := h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
|
||||
offset, err := h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
|
||||
return offset, err
|
||||
}
|
||||
|
||||
// If schema management is not enabled, fall back to raw message handling
|
||||
isEnabled := h.IsSchemaEnabled()
|
||||
if !isEnabled {
|
||||
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
|
||||
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
|
||||
}
|
||||
|
||||
var keyDecodedMsg *schema.DecodedMessage
|
||||
@@ -1190,7 +1195,7 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
|
||||
// If neither key nor value is schematized, fall back to raw message handling
|
||||
// This is OK for non-schematized messages (no magic byte 0x00)
|
||||
if keyDecodedMsg == nil && valueDecodedMsg == nil {
|
||||
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
|
||||
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
|
||||
}
|
||||
|
||||
// Process key schema if present
|
||||
@@ -1261,10 +1266,10 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
|
||||
// CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format)
|
||||
// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
|
||||
// which can be re-encoded to Confluent Wire Format during fetch if needed
|
||||
return h.seaweedMQHandler.ProduceRecordValue(topic, partition, finalKey, recordValueBytes)
|
||||
return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
|
||||
} else {
|
||||
// Send with raw format for non-schematized data
|
||||
return h.seaweedMQHandler.ProduceRecord(topic, partition, finalKey, recordValueBytes)
|
||||
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, finalKey, recordValueBytes)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user