diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 96c51f156..f1937e93e 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -115,11 +115,10 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } - // Get consumer group - group := h.groupCoordinator.GetGroup(req.GroupID) - if group == nil { - return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil - } + // Get or create consumer group + // Some Kafka clients (like kafka-go Reader) commit offsets without formally joining + // the group via JoinGroup/SyncGroup. We need to support these "simple consumer" use cases. + group := h.groupCoordinator.GetOrCreateGroup(req.GroupID) group.Mu.Lock() defer group.Mu.Unlock() @@ -127,8 +126,14 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re // Update group's last activity group.LastActivity = time.Now() - // Require matching generation to store commits; return IllegalGeneration otherwise - generationMatches := (req.GenerationID == group.Generation) + // Check generation compatibility + // Allow commits for empty groups (no active members) to support simple consumers + // that commit offsets without formal group membership + groupIsEmpty := len(group.Members) == 0 + generationMatches := groupIsEmpty || (req.GenerationID == group.Generation) + + glog.V(1).Infof("[OFFSET_COMMIT] Group check: id=%s reqGen=%d groupGen=%d members=%d empty=%v matches=%v", + req.GroupID, req.GenerationID, group.Generation, len(group.Members), groupIsEmpty, generationMatches) // Process offset commits resp := OffsetCommitResponse{ @@ -166,13 +171,18 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re glog.V(2).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err) } - glog.V(0).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d", - req.GroupID, t.Name, p.Index, p.Offset) + if groupIsEmpty { + glog.V(0).Infof("[OFFSET_COMMIT] ✓ Committed (empty group): group=%s topic=%s partition=%d offset=%d", + req.GroupID, t.Name, p.Index, p.Offset) + } else { + glog.V(0).Infof("[OFFSET_COMMIT] ✓ Committed: group=%s topic=%s partition=%d offset=%d gen=%d", + req.GroupID, t.Name, p.Index, p.Offset, group.Generation) + } } else { // Do not store commit if generation mismatch errCode = 22 // IllegalGeneration - glog.V(0).Infof("[OFFSET_COMMIT] Generation mismatch: group=%s expected=%d got=%d", - req.GroupID, group.Generation, req.GenerationID) + glog.V(0).Infof("[OFFSET_COMMIT] ❌ Rejected - generation mismatch: group=%s expected=%d got=%d members=%d", + req.GroupID, group.Generation, req.GenerationID, len(group.Members)) } topicResp.Partitions = append(topicResp.Partitions, OffsetCommitPartitionResponse{