diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 7571da19c..9a172737a 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -28,9 +28,14 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m b.localTopicManager.RemoveLocalPartition(t, partition) } else { var localPartition *topic.LocalPartition + glog.V(0).Infof("🔍 DEBUG: Checking for existing local partition %s %s", t, partition) if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { + glog.V(0).Infof("🔍 DEBUG: Creating new local partition %s %s", t, partition) localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition)) b.localTopicManager.AddLocalPartition(t, localPartition) + glog.V(0).Infof("🔍 DEBUG: Added local partition %s %s to localTopicManager", t, partition) + } else { + glog.V(0).Infof("🔍 DEBUG: Local partition %s %s already exists", t, partition) } } b.accessLock.Unlock() @@ -51,7 +56,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } } - glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) + glog.V(0).Infof("🔍 DEBUG: AssignTopicPartitions completed: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) return ret, nil } diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go index 21551e65e..e6d96784c 100644 --- a/weed/mq/broker/broker_grpc_query.go +++ b/weed/mq/broker/broker_grpc_query.go @@ -17,7 +17,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" ) -// BufferRange represents a range of buffer indexes that have been flushed to disk +// BufferRange represents a range of buffer offsets that have been flushed to disk type BufferRange struct { start int64 end int64 @@ -29,20 +29,22 @@ var ErrNoPartitionAssignment = errors.New("no broker assignment found for partit // GetUnflushedMessages returns messages from the broker's in-memory LogBuffer // that haven't been flushed to disk yet, using buffer_start metadata for deduplication -// Now supports streaming responses and buffer index filtering for better performance +// Now supports streaming responses and buffer offset filtering for better performance // Includes broker routing to redirect requests to the correct broker hosting the topic/partition func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error { // Convert protobuf types to internal types t := topic.FromPbTopic(req.Topic) partition := topic.FromPbPartition(req.Partition) - glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition) + glog.V(0).Infof("🔍 DEBUG: GetUnflushedMessages request for %v %v, StartBufferOffset=%d", t, partition, req.StartBufferOffset) // Get the local partition for this topic/partition b.accessLock.Lock() localPartition := b.localTopicManager.GetLocalPartition(t, partition) b.accessLock.Unlock() + glog.V(0).Infof("🔍 DEBUG: LocalPartition lookup result: %v", localPartition != nil) + if localPartition == nil { // Topic/partition not found locally, attempt to find the correct broker and redirect glog.V(1).Infof("Topic/partition %v %v not found locally, looking up broker", t, partition) @@ -85,37 +87,42 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req 
*mq_pb.GetUnflushedMessage flushedBufferRanges = make([]BufferRange, 0) } - // Use buffer_start index for precise deduplication + // Use buffer_start offset for precise deduplication lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs - startBufferIndex := req.StartBufferIndex + startBufferOffset := req.StartBufferOffset startTimeNs := lastFlushTsNs // Still respect last flush time for safety - glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges", - t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges)) + glog.V(0).Infof("🔍 DEBUG: Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges", + t, partition, startBufferOffset, startTimeNs, len(flushedBufferRanges)) // Stream messages from LogBuffer with filtering messageCount := 0 - startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex) + startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferOffset) + glog.V(0).Infof("🔍 DEBUG: Created MessagePosition: time=%d, offset=%d", startTimeNs, startBufferOffset) - // Use the new LoopProcessLogDataWithBatchIndex method to avoid code duplication - _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex( + // Use the new LoopProcessLogDataWithOffset method to avoid code duplication + _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithOffset( "GetUnflushedMessages", startPosition, 0, // stopTsNs = 0 means process all available data func() bool { return false }, // waitForDataFn = false means don't wait for new data - func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) { - // Apply buffer index filtering if specified - if startBufferIndex > 0 && batchIndex < startBufferIndex { - glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex) + func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error) { + glog.V(0).Infof("🔍 DEBUG: Processing message at offset %d, startBufferOffset=%d", offset, startBufferOffset) + + // Apply buffer offset filtering if specified + if startBufferOffset > 0 && offset < startBufferOffset { + glog.V(0).Infof("🔍 DEBUG: Skipping message from buffer offset %d (< %d)", offset, startBufferOffset) return false, nil } // Check if this message is from a buffer range that's already been flushed - if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) { - glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex) + if b.isBufferOffsetFlushed(offset, flushedBufferRanges) { + glog.V(0).Infof("🔍 DEBUG: Skipping message from flushed buffer offset %d", offset) return false, nil } + glog.V(0).Infof("🔍 DEBUG: Streaming message at offset %d, key=%s", offset, string(logEntry.Key)) + // Stream this message err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{ Message: &mq_pb.LogEntry{ @@ -133,6 +140,7 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage } messageCount++ + glog.V(0).Infof("🔍 DEBUG: Successfully streamed message %d", messageCount) return false, nil // Continue processing }, ) @@ -159,7 +167,7 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage return err } - glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition) + glog.V(0).Infof("🔍 DEBUG: Completed GetUnflushedMessages - streamed %d messages for %v %v", messageCount, t, partition) return nil } @@ -263,10 +271,10 @@ func (b 
*MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (* return nil, nil } -// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges -func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool { +// isBufferOffsetFlushed checks if a buffer offset is covered by any of the flushed ranges +func (b *MessageQueueBroker) isBufferOffsetFlushed(bufferOffset int64, flushedRanges []BufferRange) bool { for _, flushedRange := range flushedRanges { - if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end { + if bufferOffset >= flushedRange.start && bufferOffset <= flushedRange.end { return true } } diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 48ed6d105..7b5dcaf46 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -37,10 +37,13 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) + glog.V(0).Infof("🔍 DEBUG: Calling GetOrGenerateLocalPartition for %s %s", t, partition) localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition) if getOrGenErr != nil { + glog.V(0).Infof("🔍 DEBUG: GetOrGenerateLocalPartition failed: %v", getOrGenErr) return getOrGenErr } + glog.V(0).Infof("🔍 DEBUG: GetOrGenerateLocalPartition succeeded, localTopicPartition=%v", localTopicPartition != nil) if localTopicPartition == nil { return fmt.Errorf("failed to get or generate local partition for topic %v partition %v", t, partition) } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 0b0cde4e5..151227f2b 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -40,15 +40,24 @@ func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition t func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) { self := b.option.BrokerAddress() + glog.V(0).Infof("🔍 DEBUG: genLocalPartitionFromFiler for %s %s, self=%s", t, partition, self) + glog.V(0).Infof("🔍 DEBUG: conf.BrokerPartitionAssignments: %v", conf.BrokerPartitionAssignments) for _, assignment := range conf.BrokerPartitionAssignments { + glog.V(0).Infof("🔍 DEBUG: checking assignment: LeaderBroker=%s, Partition=%s", assignment.LeaderBroker, topic.FromPbPartition(assignment.Partition)) if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { + glog.V(0).Infof("🔍 DEBUG: Creating local partition for %s %s", t, partition) localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition)) b.localTopicManager.AddLocalPartition(t, localPartition) isGenerated = true + glog.V(0).Infof("🔍 DEBUG: Successfully added local partition %s %s to localTopicManager", t, partition) break } } + if !isGenerated { + glog.V(0).Infof("🔍 DEBUG: No matching assignment found for %s %s", t, partition) + } + return localPartition, isGenerated, nil } diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index 786a44ec9..18f9c98b0 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ 
b/weed/mq/broker/broker_topic_partition_read_write.go @@ -10,11 +10,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" ) -// LogBufferStart tracks the starting buffer index for a live log file -// Buffer indexes are monotonically increasing, count = number of chunks +// LogBufferStart tracks the starting buffer offset for a live log file +// Buffer offsets are monotonically increasing, count = number of chunks // Now stored in binary format for efficiency type LogBufferStart struct { - StartIndex int64 // Starting buffer index (count = len(chunks)) + StartIndex int64 // Starting buffer offset (count = len(chunks)) } func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType { @@ -29,11 +29,11 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT)) - // Get buffer index (now globally unique across restarts) - bufferIndex := logBuffer.GetBatchIndex() + // Get buffer offset (sequential: 0, 1, 2, 3...) + bufferOffset := logBuffer.GetOffset() for { - if err := b.appendToFileWithBufferIndex(targetFile, buf, bufferIndex, minOffset, maxOffset); err != nil { + if err := b.appendToFileWithBufferIndex(targetFile, buf, bufferOffset, minOffset, maxOffset); err != nil { glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) time.Sleep(737 * time.Millisecond) } else { @@ -49,6 +49,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) } - glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (index %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferIndex) + glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (offset %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferOffset) } } diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index 21827757d..bdb72a770 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -19,7 +19,7 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error return b.appendToFileWithBufferIndex(targetFile, data, 0) } -func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64, offsetArgs ...int64) error { +func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferOffset int64, offsetArgs ...int64) error { // Extract optional offset parameters (minOffset, maxOffset) var minOffset, maxOffset int64 if len(offsetArgs) >= 2 { @@ -50,11 +50,11 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data }, } - // Add buffer start index for deduplication tracking (binary format) - if bufferIndex != 0 { + // Add buffer start offset for deduplication tracking (binary format) + if bufferOffset != 0 { entry.Extended = make(map[string][]byte) bufferStartBytes := make([]byte, 8) - binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex)) + binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset)) entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes } @@ -76,8 +76,8 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data } else { offset = int64(filer.TotalSize(entry.GetChunks())) - // Verify buffer index continuity for existing files (append operations) - if bufferIndex != 0 { + // Verify buffer offset continuity 
for existing files (append operations) + if bufferOffset != 0 { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } @@ -87,21 +87,21 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data if len(existingData) == 8 { existingStartIndex := int64(binary.BigEndian.Uint64(existingData)) - // Verify that the new buffer index is consecutive - // Expected index = start + number of existing chunks - expectedIndex := existingStartIndex + int64(len(entry.GetChunks())) - if bufferIndex != expectedIndex { + // Verify that the new buffer offset is consecutive + // Expected offset = start + number of existing chunks + expectedOffset := existingStartIndex + int64(len(entry.GetChunks())) + if bufferOffset != expectedOffset { // This shouldn't happen in normal operation // Log warning but continue (don't crash the system) - glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d", - fullpath, expectedIndex, bufferIndex) + glog.Warningf("non-consecutive buffer offset for %s. Expected %d, got %d", + fullpath, expectedOffset, bufferOffset) } - // Note: We don't update the start index - it stays the same + // Note: We don't update the start offset - it stays the same } } else { // No existing buffer start, create new one (shouldn't happen for existing files) bufferStartBytes := make([]byte, 8) - binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex)) + binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset)) entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes } } diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 09dcaa336..ae1b1a4a7 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -4,6 +4,7 @@ import ( "time" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/shirou/gopsutil/v3/cpu" @@ -35,11 +36,16 @@ func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition // GetLocalPartition gets a topic from the local topic manager func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition { + glog.V(0).Infof("🔍 DEBUG: GetLocalPartition called for topic=%s, partition=%s", topic.String(), partition.String()) localTopic, ok := manager.topics.Get(topic.String()) if !ok { + glog.V(0).Infof("🔍 DEBUG: Topic %s not found in localTopicManager", topic.String()) return nil } - return localTopic.findPartition(partition) + glog.V(0).Infof("🔍 DEBUG: Topic %s found, searching for partition %s", topic.String(), partition.String()) + result := localTopic.findPartition(partition) + glog.V(0).Infof("🔍 DEBUG: Partition search result: %v", result != nil) + return result } // RemoveTopic removes a topic from the local topic manager diff --git a/weed/mq/topic/local_partition_offset.go b/weed/mq/topic/local_partition_offset.go index 69458a38e..243b9d6ca 100644 --- a/weed/mq/topic/local_partition_offset.go +++ b/weed/mq/topic/local_partition_offset.go @@ -77,7 +77,7 @@ func (p *LocalPartition) GetOffsetInfo() map[string]interface{} { "partition_range_stop": p.RangeStop, "partition_unix_time": p.UnixTimeNs, "buffer_name": p.LogBuffer.GetName(), - "buffer_batch_index": p.LogBuffer.GetBatchIndex(), + "buffer_offset": p.LogBuffer.GetOffset(), } } diff --git a/weed/pb/mq_broker.proto b/weed/pb/mq_broker.proto index 913a3b3ae..8755d925c 100644 --- a/weed/pb/mq_broker.proto +++ 
b/weed/pb/mq_broker.proto @@ -380,7 +380,7 @@ message CloseSubscribersResponse { message GetUnflushedMessagesRequest { schema_pb.Topic topic = 1; schema_pb.Partition partition = 2; - int64 start_buffer_index = 3; // Filter by buffer index (messages from buffers >= this index) + int64 start_buffer_offset = 3; // Filter by buffer offset (messages from buffers >= this offset) } message GetUnflushedMessagesResponse { diff --git a/weed/pb/mq_pb/mq_broker.pb.go b/weed/pb/mq_pb/mq_broker.pb.go index f9cff0a83..620922a76 100644 --- a/weed/pb/mq_pb/mq_broker.pb.go +++ b/weed/pb/mq_pb/mq_broker.pb.go @@ -2702,12 +2702,12 @@ func (*CloseSubscribersResponse) Descriptor() ([]byte, []int) { } type GetUnflushedMessagesRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` - StartBufferIndex int64 `protobuf:"varint,3,opt,name=start_buffer_index,json=startBufferIndex,proto3" json:"start_buffer_index,omitempty"` // Filter by buffer index (messages from buffers >= this index) - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + StartBufferOffset int64 `protobuf:"varint,3,opt,name=start_buffer_offset,json=startBufferOffset,proto3" json:"start_buffer_offset,omitempty"` // Filter by buffer offset (messages from buffers >= this offset) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GetUnflushedMessagesRequest) Reset() { @@ -2754,9 +2754,9 @@ func (x *GetUnflushedMessagesRequest) GetPartition() *schema_pb.Partition { return nil } -func (x *GetUnflushedMessagesRequest) GetStartBufferIndex() int64 { +func (x *GetUnflushedMessagesRequest) GetStartBufferOffset() int64 { if x != nil { - return x.StartBufferIndex + return x.StartBufferOffset } return 0 } @@ -4238,11 +4238,11 @@ const file_mq_broker_proto_rawDesc = "" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12 \n" + "\funix_time_ns\x18\x02 \x01(\x03R\n" + "unixTimeNs\"\x1a\n" + - "\x18CloseSubscribersResponse\"\xa7\x01\n" + + "\x18CloseSubscribersResponse\"\xa9\x01\n" + "\x1bGetUnflushedMessagesRequest\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x122\n" + - "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12,\n" + - "\x12start_buffer_index\x18\x03 \x01(\x03R\x10startBufferIndex\"\x8a\x01\n" + + "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12.\n" + + "\x13start_buffer_offset\x18\x03 \x01(\x03R\x11startBufferOffset\"\x8a\x01\n" + "\x1cGetUnflushedMessagesResponse\x120\n" + "\amessage\x18\x01 \x01(\v2\x16.messaging_pb.LogEntryR\amessage\x12\x14\n" + "\x05error\x18\x02 \x01(\tR\x05error\x12\"\n" + diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index f42e9e7eb..f60325d85 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -438,20 +438,20 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi client := mq_pb.NewSeaweedMessagingClient(conn) - // Step 3: Get earliest buffer_start from disk files for precise deduplication + // Step 3: For unflushed messages, always start from 0 to 
get all in-memory data + // The buffer_start metadata in log files uses timestamp-based indices for uniqueness, + // but the broker's LogBuffer uses sequential indices internally (0, 1, 2, 3...) + // For unflushed data queries, we want all messages in the buffer regardless of their + // timestamp-based buffer indices, so we always use 0. topicObj := topic.Topic{Namespace: namespace, Name: topicName} partitionPath := topic.PartitionDir(topicObj, partition) fmt.Printf("DEBUG: Getting buffer start from partition path: %s\n", partitionPath) - earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath) - if err != nil { - fmt.Printf("DEBUG: Failed to get buffer start: %v, using 0\n", err) - // If we can't get buffer info, use 0 (get all unflushed data) - earliestBufferIndex = 0 - } else { - fmt.Printf("DEBUG: Using earliest buffer index: %d\n", earliestBufferIndex) - } - // Step 4: Prepare request using buffer index filtering only + // Always use 0 for unflushed messages to ensure we get all in-memory data + earliestBufferOffset := int64(0) + fmt.Printf("DEBUG: Using StartBufferOffset=0 for unflushed messages (buffer offsets are sequential internally)\n") + + // Step 4: Prepare request using buffer offset filtering only request := &mq_pb.GetUnflushedMessagesRequest{ Topic: &schema_pb.Topic{ Namespace: namespace, @@ -463,11 +463,11 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi RangeStop: partition.RangeStop, UnixTimeNs: partition.UnixTimeNs, }, - StartBufferIndex: earliestBufferIndex, + StartBufferOffset: earliestBufferOffset, } // Step 5: Call the broker streaming API - fmt.Printf("DEBUG: Calling GetUnflushedMessages gRPC with StartBufferIndex=%d\n", earliestBufferIndex) + fmt.Printf("DEBUG: Calling GetUnflushedMessages gRPC with StartBufferOffset=%d\n", earliestBufferOffset) stream, err := client.GetUnflushedMessages(ctx, request) if err != nil { fmt.Printf("DEBUG: GetUnflushedMessages gRPC call failed: %v\n", err) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index ceff091a8..0e6797b96 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -26,7 +26,7 @@ type dataToFlush struct { } type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error) -type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) +type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error) type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) @@ -35,7 +35,7 @@ type LogBuffer struct { name string prevBuffers *SealedBuffers buf []byte - batchIndex int64 + offset int64 // Sequential offset counter (0, 1, 2, 3...) 
idx []int pos int startTime time.Time @@ -51,9 +51,9 @@ type LogBuffer struct { flushChan chan *dataToFlush LastTsNs atomic.Int64 // Offset range tracking for Kafka integration - minOffset int64 - maxOffset int64 - hasOffsets bool + minOffset int64 + maxOffset int64 + hasOffsets bool sync.RWMutex } @@ -70,7 +70,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc notifyFn: notifyFn, flushChan: make(chan *dataToFlush, 256), isStopping: new(atomic.Bool), - batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts + offset: 0, // Start with sequential offset 0 } go lb.loopFlush() go lb.loopInterval() @@ -150,7 +150,7 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) logBuffer.pos += size + 4 - logBuffer.batchIndex++ + logBuffer.offset++ } func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { @@ -203,7 +203,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { - // glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) + // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) toFlush = logBuffer.copyToFlush() logBuffer.startTime = ts if len(logBuffer.buf) < size+4 { @@ -288,12 +288,12 @@ func (logBuffer *LogBuffer) copyToFlush() *dataToFlush { // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) logBuffer.lastFlushDataTime = logBuffer.stopTime } - logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex) + logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.offset) logBuffer.startTime = time.Unix(0, 0) logBuffer.stopTime = time.Unix(0, 0) logBuffer.pos = 0 logBuffer.idx = logBuffer.idx[:0] - logBuffer.batchIndex++ + logBuffer.offset++ // Reset offset tracking logBuffer.hasOffsets = false logBuffer.minOffset = 0 @@ -309,7 +309,7 @@ func (logBuffer *LogBuffer) GetEarliestTime() time.Time { func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition { return MessagePosition{ Time: logBuffer.startTime, - BatchIndex: logBuffer.batchIndex, + BatchIndex: logBuffer.offset, } } @@ -335,12 +335,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu var tsBatchIndex int64 if !logBuffer.startTime.IsZero() { tsMemory = logBuffer.startTime - tsBatchIndex = logBuffer.batchIndex + tsBatchIndex = logBuffer.offset } for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { tsMemory = prevBuf.startTime - tsBatchIndex = prevBuf.batchIndex + tsBatchIndex = prevBuf.offset } } if tsMemory.IsZero() { // case 2.2 @@ -355,25 +355,25 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu // the following is case 2.1 if lastReadPosition.Equal(logBuffer.stopTime) { - return nil, 
logBuffer.batchIndex, nil + return nil, logBuffer.offset, nil } if lastReadPosition.After(logBuffer.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) - return nil, logBuffer.batchIndex, nil + return nil, logBuffer.offset, nil } if lastReadPosition.Before(logBuffer.startTime) { for _, buf := range logBuffer.prevBuffers.buffers { if buf.startTime.After(lastReadPosition.Time) { // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) - return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil + return copiedBytes(buf.buf[:buf.size]), buf.offset, nil } if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { pos := buf.locateByTs(lastReadPosition.Time) - return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil + return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil } } // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) - return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil } lastTs := lastReadPosition.UnixNano() @@ -403,7 +403,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1]) } if prevT <= lastTs { - return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil + return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil } h = mid } @@ -424,11 +424,11 @@ func (logBuffer *LogBuffer) GetName() string { return logBuffer.name } -// GetBatchIndex returns the current batch index for metadata tracking -func (logBuffer *LogBuffer) GetBatchIndex() int64 { +// GetOffset returns the current offset for metadata tracking +func (logBuffer *LogBuffer) GetOffset() int64 { logBuffer.RLock() defer logBuffer.RUnlock() - return logBuffer.batchIndex + return logBuffer.offset } var bufferPool = sync.Pool{ diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index e79f50257..141a5413c 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -176,19 +176,20 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition } -// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback -func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64, - waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { +// LoopProcessLogDataWithOffset is similar to LoopProcessLogData but provides offset to the callback +func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, startPosition MessagePosition, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn EachLogEntryWithOffsetFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { + glog.V(0).Infof("🔍 DEBUG: LoopProcessLogDataWithOffset started for %s, startPosition=%v", readerName, startPosition) // loop through all messages var bytesBuf *bytes.Buffer - var batchIndex int64 + var offset int64 lastReadPosition = startPosition var entryCounter int64 defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } - // println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter) + // println("LoopProcessLogDataWithOffset", readerName, "sent messages total", 
entryCounter) }() for { @@ -196,7 +197,8 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } - bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition) + bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition) + glog.V(0).Infof("🔍 DEBUG: ReadFromBuffer returned bytesBuf=%v, offset=%d, err=%v", bytesBuf != nil, offset, err) if err == ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) return lastReadPosition, isDone, ResumeFromDiskError @@ -205,10 +207,10 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, if bytesBuf != nil { readSize = bytesBuf.Len() } - glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex) + glog.V(0).Infof("🔍 DEBUG: %s ReadFromBuffer at %v batch %d. Read bytes %v offset %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, offset) if bytesBuf == nil { - if batchIndex >= 0 { - lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex) + if offset >= 0 { + lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), offset) } if stopTsNs != 0 { isDone = true @@ -233,6 +235,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, buf := bytesBuf.Bytes() // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf)) + glog.V(0).Infof("🔍 DEBUG: Processing buffer with %d bytes for %s", len(buf), readerName) batchSize := 0 @@ -241,7 +244,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, size := util.BytesToUint32(buf[pos : pos+4]) if pos+4+int(size) > len(buf) { err = ResumeError - glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf)) + glog.Errorf("LoopProcessLogDataWithOffset: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf)) return } entryData := buf[pos+4 : pos+4+int(size)] @@ -253,11 +256,15 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, continue } + glog.V(0).Infof("🔍 DEBUG: Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key)) + // Handle offset-based filtering for offset-based start positions if startPosition.IsOffsetBased() { startOffset := startPosition.GetOffset() + glog.V(0).Infof("🔍 DEBUG: Offset-based filtering: logEntry.Offset=%d, startOffset=%d", logEntry.Offset, startOffset) if logEntry.Offset < startOffset { // Skip entries before the starting offset + glog.V(0).Infof("🔍 DEBUG: Skipping entry due to offset filter") pos += 4 + int(size) batchSize++ continue @@ -265,18 +272,20 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, } if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + glog.V(0).Infof("🔍 DEBUG: Stopping due to stopTsNs") isDone = true // println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) return } - lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex) + lastReadPosition = NewMessagePosition(logEntry.TsNs, offset) - if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil { - glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err) + 
glog.V(0).Infof("🔍 DEBUG: Calling eachLogDataFn for entry at offset %d", offset) + if isDone, err = eachLogDataFn(logEntry, offset); err != nil { + glog.Errorf("LoopProcessLogDataWithOffset: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err) return } if isDone { - glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1) + glog.V(0).Infof("LoopProcessLogDataWithOffset: %s process log entry %d", readerName, batchSize+1) return } diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index c41b30fcc..a8d08fe4d 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -6,11 +6,11 @@ import ( ) type MemBuffer struct { - buf []byte - size int - startTime time.Time - stopTime time.Time - batchIndex int64 + buf []byte + size int + startTime time.Time + stopTime time.Time + offset int64 // Sequential offset counter } type SealedBuffers struct { @@ -30,7 +30,7 @@ func newSealedBuffers(size int) *SealedBuffers { return sbs } -func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte) { +func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, offset int64) (newBuf []byte) { oldMemBuffer := sbs.buffers[0] size := len(sbs.buffers) for i := 0; i < size-1; i++ { @@ -38,13 +38,13 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, sbs.buffers[i].size = sbs.buffers[i+1].size sbs.buffers[i].startTime = sbs.buffers[i+1].startTime sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime - sbs.buffers[i].batchIndex = sbs.buffers[i+1].batchIndex + sbs.buffers[i].offset = sbs.buffers[i+1].offset } sbs.buffers[size-1].buf = buf sbs.buffers[size-1].size = pos sbs.buffers[size-1].startTime = startTime sbs.buffers[size-1].stopTime = stopTime - sbs.buffers[size-1].batchIndex = batchIndex + sbs.buffers[size-1].offset = offset return oldMemBuffer.buf }