flush faster for _schemas topic

This commit is contained in:
chrislu
2025-10-06 17:27:45 -07:00
parent 2d957d2d68
commit c7e216c8ce
3 changed files with 37 additions and 4 deletions

View File

@@ -190,6 +190,15 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return fmt.Errorf("topic %v partition %v publish error: %w", initMessage.Topic, initMessage.Partition, err)
}
// CRITICAL: Force immediate flush for _schemas topic to prevent Schema Registry deadlock
// Schema Registry needs immediate visibility of registered schemas
if initMessage.Topic != nil && initMessage.Topic.Name == "_schemas" {
if localTopicPartition.LogBuffer != nil {
localTopicPartition.LogBuffer.ForceFlush()
glog.V(2).Infof("Force flushed _schemas topic after offset %d", assignedOffset)
}
}
// CRITICAL FIX: Send immediate acknowledgment with the assigned offset
// This ensures read-after-write consistency for Kafka Gateway
response := &mq_pb.PublishMessageResponse{

View File

@@ -64,7 +64,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
// Long-poll when client requests it via MaxWaitTime and there's no data
// Even if MinBytes=0, we should honor MaxWaitTime to reduce polling overhead
maxWaitMs := fetchRequest.MaxWaitTime
// CRITICAL: Disable long-polling for _schemas topic to prevent Schema Registry deadlock
// Schema Registry internally polls _schemas with high MaxWaitTime (60s), which can cause
// timeouts when it's waiting for its own produce to become visible.
@@ -75,19 +75,19 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
maxWaitMs = 0
glog.V(2).Infof("Schema Registry fetch detected, disabling long-poll (original maxWaitMs=%d)", fetchRequest.MaxWaitTime)
}
// TEMPORARY: Disable long-polling for all other topics to eliminate 500ms delays
// The HWM cache can be stale, causing unnecessary waits
if !isSchemasTopic {
maxWaitMs = 0
}
// Long-poll if: (1) client wants to wait (maxWaitMs > 0), (2) no data available, (3) topics exist
// NOTE: We long-poll even if MinBytes=0, since the client specified a wait time
hasData := hasDataAvailable()
topicsExist := allTopicsExist()
shouldLongPoll := maxWaitMs > 0 && !hasData && topicsExist
// Debug Schema Registry polling
if isSchemasTopic && len(fetchRequest.Topics) > 0 {
glog.V(2).Infof("SR FETCH REQUEST: topic=%s maxWaitMs(original)=%d maxWaitMs(effective)=%d minBytes=%d hasData=%v topicsExist=%v shouldLongPoll=%v",

View File

@@ -274,6 +274,30 @@ func (logBuffer *LogBuffer) IsStopping() bool {
return logBuffer.isStopping.Load()
}
// ForceFlush immediately flushes the current buffer content without waiting for the flush interval.
// This is useful for critical topics that need immediate persistence (e.g., _schemas for Schema Registry).
//
// The flush is best-effort: if the buffer is shutting down, or the flush channel cannot accept
// the copied buffer within 100ms, the call returns without flushing. In the latter case the data
// is not lost — the regular interval-based flush will pick it up.
func (logBuffer *LogBuffer) ForceFlush() {
	if logBuffer.isStopping.Load() {
		return // Don't flush if we're shutting down
	}
	// Copy the pending buffer content under the lock, then release the lock BEFORE
	// sending to the flush channel. NOTE(review): the early unlock appears deliberate —
	// presumably the flush goroutine draining flushChan may itself need this lock, so
	// holding it across the send could deadlock; confirm against the flush worker.
	logBuffer.Lock()
	toFlush := logBuffer.copyToFlush()
	logBuffer.Unlock()
	// copyToFlush returns nil when there is nothing buffered; only queue real work.
	if toFlush != nil {
		// Send to flush channel (non-blocking with timeout to avoid deadlock)
		select {
		case logBuffer.flushChan <- toFlush:
			// Successfully queued for flush
		case <-time.After(100 * time.Millisecond):
			// Flush channel is full or blocked - skip this flush
			// Data will be flushed on next interval anyway
			glog.V(1).Infof("ForceFlush skipped for %s - flush channel busy", logBuffer.name)
		}
	}
}
// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)