buffer index => buffer offset

chrislu
2025-09-29 12:13:07 -07:00
parent 82b7fa4203
commit 47ba698d01
14 changed files with 152 additions and 112 deletions

View File

@@ -28,9 +28,14 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
b.localTopicManager.RemoveLocalPartition(t, partition)
} else {
var localPartition *topic.LocalPartition
glog.V(0).Infof("🔍 DEBUG: Checking for existing local partition %s %s", t, partition)
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
glog.V(0).Infof("🔍 DEBUG: Creating new local partition %s %s", t, partition)
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))
b.localTopicManager.AddLocalPartition(t, localPartition)
glog.V(0).Infof("🔍 DEBUG: Added local partition %s %s to localTopicManager", t, partition)
} else {
glog.V(0).Infof("🔍 DEBUG: Local partition %s %s already exists", t, partition)
}
}
b.accessLock.Unlock()
@@ -51,7 +56,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
}
}
glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
glog.V(0).Infof("🔍 DEBUG: AssignTopicPartitions completed: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil
}

View File

@@ -17,7 +17,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
// BufferRange represents a range of buffer indexes that have been flushed to disk
// BufferRange represents a range of buffer offsets that have been flushed to disk
type BufferRange struct {
start int64
end int64
@@ -29,20 +29,22 @@ var ErrNoPartitionAssignment = errors.New("no broker assignment found for partit
// GetUnflushedMessages returns messages from the broker's in-memory LogBuffer
// that haven't been flushed to disk yet, using buffer_start metadata for deduplication
// Now supports streaming responses and buffer index filtering for better performance
// Now supports streaming responses and buffer offset filtering for better performance
// Includes broker routing to redirect requests to the correct broker hosting the topic/partition
func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
// Convert protobuf types to internal types
t := topic.FromPbTopic(req.Topic)
partition := topic.FromPbPartition(req.Partition)
glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition)
glog.V(0).Infof("🔍 DEBUG: GetUnflushedMessages request for %v %v, StartBufferOffset=%d", t, partition, req.StartBufferOffset)
// Get the local partition for this topic/partition
b.accessLock.Lock()
localPartition := b.localTopicManager.GetLocalPartition(t, partition)
b.accessLock.Unlock()
glog.V(0).Infof("🔍 DEBUG: LocalPartition lookup result: %v", localPartition != nil)
if localPartition == nil {
// Topic/partition not found locally, attempt to find the correct broker and redirect
glog.V(1).Infof("Topic/partition %v %v not found locally, looking up broker", t, partition)
@@ -85,37 +87,42 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
flushedBufferRanges = make([]BufferRange, 0)
}
// Use buffer_start index for precise deduplication
// Use buffer_start offset for precise deduplication
lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs
startBufferIndex := req.StartBufferIndex
startBufferOffset := req.StartBufferOffset
startTimeNs := lastFlushTsNs // Still respect last flush time for safety
glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges",
t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges))
glog.V(0).Infof("🔍 DEBUG: Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges",
t, partition, startBufferOffset, startTimeNs, len(flushedBufferRanges))
// Stream messages from LogBuffer with filtering
messageCount := 0
startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferOffset)
glog.V(0).Infof("🔍 DEBUG: Created MessagePosition: time=%d, offset=%d", startTimeNs, startBufferOffset)
// Use the new LoopProcessLogDataWithBatchIndex method to avoid code duplication
_, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex(
// Use the new LoopProcessLogDataWithOffset method to avoid code duplication
_, _, err = localPartition.LogBuffer.LoopProcessLogDataWithOffset(
"GetUnflushedMessages",
startPosition,
0, // stopTsNs = 0 means process all available data
func() bool { return false }, // waitForDataFn = false means don't wait for new data
func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) {
// Apply buffer index filtering if specified
if startBufferIndex > 0 && batchIndex < startBufferIndex {
glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error) {
glog.V(0).Infof("🔍 DEBUG: Processing message at offset %d, startBufferOffset=%d", offset, startBufferOffset)
// Apply buffer offset filtering if specified
if startBufferOffset > 0 && offset < startBufferOffset {
glog.V(0).Infof("🔍 DEBUG: Skipping message from buffer offset %d (< %d)", offset, startBufferOffset)
return false, nil
}
// Check if this message is from a buffer range that's already been flushed
if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
if b.isBufferOffsetFlushed(offset, flushedBufferRanges) {
glog.V(0).Infof("🔍 DEBUG: Skipping message from flushed buffer offset %d", offset)
return false, nil
}
glog.V(0).Infof("🔍 DEBUG: Streaming message at offset %d, key=%s", offset, string(logEntry.Key))
// Stream this message
err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
Message: &mq_pb.LogEntry{
@@ -133,6 +140,7 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
}
messageCount++
glog.V(0).Infof("🔍 DEBUG: Successfully streamed message %d", messageCount)
return false, nil // Continue processing
},
)
@@ -159,7 +167,7 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
return err
}
glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition)
glog.V(0).Infof("🔍 DEBUG: Completed GetUnflushedMessages - streamed %d messages for %v %v", messageCount, t, partition)
return nil
}
@@ -263,10 +271,10 @@ func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*
return nil, nil
}
// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges
func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool {
// isBufferOffsetFlushed checks if a buffer offset is covered by any of the flushed ranges
func (b *MessageQueueBroker) isBufferOffsetFlushed(bufferOffset int64, flushedRanges []BufferRange) bool {
for _, flushedRange := range flushedRanges {
if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end {
if bufferOffset >= flushedRange.start && bufferOffset <= flushedRange.end {
return true
}
}
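
To make the range check above concrete, here is a small, self-contained sketch of the same dedup logic over illustrative flushed ranges (the BufferRange values and offsets below are made up for the example, not taken from the commit):

package main

import "fmt"

// bufferRange mirrors the BufferRange struct in the diff: an inclusive range of
// buffer offsets that have already been flushed to disk.
type bufferRange struct {
	start int64
	end   int64
}

// isFlushed reports whether offset falls inside any flushed range, matching the
// logic of isBufferOffsetFlushed above.
func isFlushed(offset int64, flushed []bufferRange) bool {
	for _, r := range flushed {
		if offset >= r.start && offset <= r.end {
			return true
		}
	}
	return false
}

func main() {
	flushed := []bufferRange{{start: 0, end: 4}, {start: 7, end: 9}}
	for _, off := range []int64{3, 5, 8, 12} {
		fmt.Printf("offset %d flushed=%v\n", off, isFlushed(off, flushed))
	}
	// offsets 3 and 8 would be skipped as already flushed; 5 and 12 would still be streamed
}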

View File

@@ -37,10 +37,13 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
glog.V(0).Infof("🔍 DEBUG: Calling GetOrGenerateLocalPartition for %s %s", t, partition)
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
if getOrGenErr != nil {
glog.V(0).Infof("🔍 DEBUG: GetOrGenerateLocalPartition failed: %v", getOrGenErr)
return getOrGenErr
}
glog.V(0).Infof("🔍 DEBUG: GetOrGenerateLocalPartition succeeded, localTopicPartition=%v", localTopicPartition != nil)
if localTopicPartition == nil {
return fmt.Errorf("failed to get or generate local partition for topic %v partition %v", t, partition)
}

View File

@@ -40,15 +40,24 @@ func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition t
func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
self := b.option.BrokerAddress()
glog.V(0).Infof("🔍 DEBUG: genLocalPartitionFromFiler for %s %s, self=%s", t, partition, self)
glog.V(0).Infof("🔍 DEBUG: conf.BrokerPartitionAssignments: %v", conf.BrokerPartitionAssignments)
for _, assignment := range conf.BrokerPartitionAssignments {
glog.V(0).Infof("🔍 DEBUG: checking assignment: LeaderBroker=%s, Partition=%s", assignment.LeaderBroker, topic.FromPbPartition(assignment.Partition))
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
glog.V(0).Infof("🔍 DEBUG: Creating local partition for %s %s", t, partition)
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))
b.localTopicManager.AddLocalPartition(t, localPartition)
isGenerated = true
glog.V(0).Infof("🔍 DEBUG: Successfully added local partition %s %s to localTopicManager", t, partition)
break
}
}
if !isGenerated {
glog.V(0).Infof("🔍 DEBUG: No matching assignment found for %s %s", t, partition)
}
return localPartition, isGenerated, nil
}

View File

@@ -10,11 +10,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
// LogBufferStart tracks the starting buffer index for a live log file
// Buffer indexes are monotonically increasing, count = number of chunks
// LogBufferStart tracks the starting buffer offset for a live log file
// Buffer offsets are monotonically increasing, count = number of chunks
// Now stored in binary format for efficiency
type LogBufferStart struct {
StartIndex int64 // Starting buffer index (count = len(chunks))
StartIndex int64 // Starting buffer offset (count = len(chunks))
}
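
As a rough illustration of the comment above — offsets are sequential and the chunk count gives the extent — the offset range covered by a live log file can be derived from its recorded start. The helper below is hypothetical, not part of the commit:

package main

import "fmt"

// coveredRange is an illustrative helper: given a file's recorded start offset and
// its chunk count, it returns the inclusive range of buffer offsets the file covers,
// assuming one sequential offset per flushed chunk.
func coveredRange(start int64, chunks int) (lo, hi int64) {
	return start, start + int64(chunks) - 1
}

func main() {
	lo, hi := coveredRange(5, 3) // StartIndex 5 with 3 chunks
	fmt.Printf("file covers buffer offsets [%d, %d]\n", lo, hi) // [5, 7]
}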
func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType {
@@ -29,11 +29,11 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
// Get buffer index (now globally unique across restarts)
bufferIndex := logBuffer.GetBatchIndex()
// Get buffer offset (sequential: 0, 1, 2, 3...)
bufferOffset := logBuffer.GetOffset()
for {
if err := b.appendToFileWithBufferIndex(targetFile, buf, bufferIndex, minOffset, maxOffset); err != nil {
if err := b.appendToFileWithBufferIndex(targetFile, buf, bufferOffset, minOffset, maxOffset); err != nil {
glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
@@ -49,6 +49,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}
glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (index %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferIndex)
glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (offset %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferOffset)
}
}

View File

@@ -19,7 +19,7 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
return b.appendToFileWithBufferIndex(targetFile, data, 0)
}
func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64, offsetArgs ...int64) error {
func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferOffset int64, offsetArgs ...int64) error {
// Extract optional offset parameters (minOffset, maxOffset)
var minOffset, maxOffset int64
if len(offsetArgs) >= 2 {
@@ -50,11 +50,11 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data
},
}
// Add buffer start index for deduplication tracking (binary format)
if bufferIndex != 0 {
// Add buffer start offset for deduplication tracking (binary format)
if bufferOffset != 0 {
entry.Extended = make(map[string][]byte)
bufferStartBytes := make([]byte, 8)
binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset))
entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
}
@@ -76,8 +76,8 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data
} else {
offset = int64(filer.TotalSize(entry.GetChunks()))
// Verify buffer index continuity for existing files (append operations)
if bufferIndex != 0 {
// Verify buffer offset continuity for existing files (append operations)
if bufferOffset != 0 {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
@@ -87,21 +87,21 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data
if len(existingData) == 8 {
existingStartIndex := int64(binary.BigEndian.Uint64(existingData))
// Verify that the new buffer index is consecutive
// Expected index = start + number of existing chunks
expectedIndex := existingStartIndex + int64(len(entry.GetChunks()))
if bufferIndex != expectedIndex {
// Verify that the new buffer offset is consecutive
// Expected offset = start + number of existing chunks
expectedOffset := existingStartIndex + int64(len(entry.GetChunks()))
if bufferOffset != expectedOffset {
// This shouldn't happen in normal operation
// Log warning but continue (don't crash the system)
glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d",
fullpath, expectedIndex, bufferIndex)
glog.Warningf("non-consecutive buffer offset for %s. Expected %d, got %d",
fullpath, expectedOffset, bufferOffset)
}
// Note: We don't update the start index - it stays the same
// Note: We don't update the start offset - it stays the same
}
} else {
// No existing buffer start, create new one (shouldn't happen for existing files)
bufferStartBytes := make([]byte, 8)
binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset))
entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
}
}
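
A worked example of the continuity check above, with illustrative numbers: a file whose buffer_start attribute decodes to 5 and which already holds 3 chunks should next be appended with buffer offset 8; any other value only logs the non-consecutive warning and continues.

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// The file's extended attribute stores the start offset as 8 big-endian bytes,
	// exactly as written by binary.BigEndian.PutUint64 in the diff.
	attr := make([]byte, 8)
	binary.BigEndian.PutUint64(attr, 5)

	existingStart := int64(binary.BigEndian.Uint64(attr)) // 5
	existingChunks := int64(3)                            // chunks already in the file
	expectedOffset := existingStart + existingChunks      // 8

	for _, incoming := range []int64{8, 11} {
		if incoming != expectedOffset {
			fmt.Printf("non-consecutive buffer offset: expected %d, got %d\n", expectedOffset, incoming)
			continue
		}
		fmt.Printf("offset %d is consecutive, append proceeds\n", incoming)
	}
}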

View File

@@ -4,6 +4,7 @@ import (
"time"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/shirou/gopsutil/v3/cpu"
@@ -35,11 +36,16 @@ func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition
// GetLocalPartition gets a topic from the local topic manager
func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition {
glog.V(0).Infof("🔍 DEBUG: GetLocalPartition called for topic=%s, partition=%s", topic.String(), partition.String())
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
glog.V(0).Infof("🔍 DEBUG: Topic %s not found in localTopicManager", topic.String())
return nil
}
return localTopic.findPartition(partition)
glog.V(0).Infof("🔍 DEBUG: Topic %s found, searching for partition %s", topic.String(), partition.String())
result := localTopic.findPartition(partition)
glog.V(0).Infof("🔍 DEBUG: Partition search result: %v", result != nil)
return result
}
// RemoveTopic removes a topic from the local topic manager

View File

@@ -77,7 +77,7 @@ func (p *LocalPartition) GetOffsetInfo() map[string]interface{} {
"partition_range_stop": p.RangeStop,
"partition_unix_time": p.UnixTimeNs,
"buffer_name": p.LogBuffer.GetName(),
"buffer_batch_index": p.LogBuffer.GetBatchIndex(),
"buffer_offset": p.LogBuffer.GetOffset(),
}
}

View File

@@ -380,7 +380,7 @@ message CloseSubscribersResponse {
message GetUnflushedMessagesRequest {
schema_pb.Topic topic = 1;
schema_pb.Partition partition = 2;
int64 start_buffer_index = 3; // Filter by buffer index (messages from buffers >= this index)
int64 start_buffer_offset = 3; // Filter by buffer offset (messages from buffers >= this offset)
}
message GetUnflushedMessagesResponse {

View File

@@ -2702,12 +2702,12 @@ func (*CloseSubscribersResponse) Descriptor() ([]byte, []int) {
}
type GetUnflushedMessagesRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"`
StartBufferIndex int64 `protobuf:"varint,3,opt,name=start_buffer_index,json=startBufferIndex,proto3" json:"start_buffer_index,omitempty"` // Filter by buffer index (messages from buffers >= this index)
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"`
StartBufferOffset int64 `protobuf:"varint,3,opt,name=start_buffer_offset,json=startBufferOffset,proto3" json:"start_buffer_offset,omitempty"` // Filter by buffer offset (messages from buffers >= this offset)
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetUnflushedMessagesRequest) Reset() {
@@ -2754,9 +2754,9 @@ func (x *GetUnflushedMessagesRequest) GetPartition() *schema_pb.Partition {
return nil
}
func (x *GetUnflushedMessagesRequest) GetStartBufferIndex() int64 {
func (x *GetUnflushedMessagesRequest) GetStartBufferOffset() int64 {
if x != nil {
return x.StartBufferIndex
return x.StartBufferOffset
}
return 0
}
@@ -4238,11 +4238,11 @@ const file_mq_broker_proto_rawDesc = "" +
"\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12 \n" +
"\funix_time_ns\x18\x02 \x01(\x03R\n" +
"unixTimeNs\"\x1a\n" +
"\x18CloseSubscribersResponse\"\xa7\x01\n" +
"\x18CloseSubscribersResponse\"\xa9\x01\n" +
"\x1bGetUnflushedMessagesRequest\x12&\n" +
"\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x122\n" +
"\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12,\n" +
"\x12start_buffer_index\x18\x03 \x01(\x03R\x10startBufferIndex\"\x8a\x01\n" +
"\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12.\n" +
"\x13start_buffer_offset\x18\x03 \x01(\x03R\x11startBufferOffset\"\x8a\x01\n" +
"\x1cGetUnflushedMessagesResponse\x120\n" +
"\amessage\x18\x01 \x01(\v2\x16.messaging_pb.LogEntryR\amessage\x12\x14\n" +
"\x05error\x18\x02 \x01(\tR\x05error\x12\"\n" +

View File

@@ -438,20 +438,20 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
client := mq_pb.NewSeaweedMessagingClient(conn)
// Step 3: Get earliest buffer_start from disk files for precise deduplication
// Step 3: For unflushed messages, always start from 0 to get all in-memory data
// The buffer_start metadata in log files uses timestamp-based indices for uniqueness,
// but the broker's LogBuffer uses sequential indices internally (0, 1, 2, 3...)
// For unflushed data queries, we want all messages in the buffer regardless of their
// timestamp-based buffer indices, so we always use 0.
topicObj := topic.Topic{Namespace: namespace, Name: topicName}
partitionPath := topic.PartitionDir(topicObj, partition)
fmt.Printf("DEBUG: Getting buffer start from partition path: %s\n", partitionPath)
earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath)
if err != nil {
fmt.Printf("DEBUG: Failed to get buffer start: %v, using 0\n", err)
// If we can't get buffer info, use 0 (get all unflushed data)
earliestBufferIndex = 0
} else {
fmt.Printf("DEBUG: Using earliest buffer index: %d\n", earliestBufferIndex)
}
// Step 4: Prepare request using buffer index filtering only
// Always use 0 for unflushed messages to ensure we get all in-memory data
earliestBufferOffset := int64(0)
fmt.Printf("DEBUG: Using StartBufferOffset=0 for unflushed messages (buffer offsets are sequential internally)\n")
// Step 4: Prepare request using buffer offset filtering only
request := &mq_pb.GetUnflushedMessagesRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
@@ -463,11 +463,11 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs,
},
StartBufferIndex: earliestBufferIndex,
StartBufferOffset: earliestBufferOffset,
}
// Step 5: Call the broker streaming API
fmt.Printf("DEBUG: Calling GetUnflushedMessages gRPC with StartBufferIndex=%d\n", earliestBufferIndex)
fmt.Printf("DEBUG: Calling GetUnflushedMessages gRPC with StartBufferOffset=%d\n", earliestBufferOffset)
stream, err := client.GetUnflushedMessages(ctx, request)
if err != nil {
fmt.Printf("DEBUG: GetUnflushedMessages gRPC call failed: %v\n", err)

View File

@@ -26,7 +26,7 @@ type dataToFlush struct {
}
type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error)
type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error)
type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
@@ -35,7 +35,7 @@ type LogBuffer struct {
name string
prevBuffers *SealedBuffers
buf []byte
batchIndex int64
offset int64 // Sequential offset counter (0, 1, 2, 3...)
idx []int
pos int
startTime time.Time
@@ -51,9 +51,9 @@ type LogBuffer struct {
flushChan chan *dataToFlush
LastTsNs atomic.Int64
// Offset range tracking for Kafka integration
minOffset int64
maxOffset int64
hasOffsets bool
minOffset int64
maxOffset int64
hasOffsets bool
sync.RWMutex
}
@@ -70,7 +70,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
notifyFn: notifyFn,
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts
offset: 0, // Start with sequential offset 0
}
go lb.loopFlush()
go lb.loopInterval()
@@ -150,7 +150,7 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
logBuffer.batchIndex++
logBuffer.offset++
}
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
@@ -203,7 +203,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
// glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
// glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
@@ -288,12 +288,12 @@ func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
// glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
logBuffer.lastFlushDataTime = logBuffer.stopTime
}
logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex)
logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.offset)
logBuffer.startTime = time.Unix(0, 0)
logBuffer.stopTime = time.Unix(0, 0)
logBuffer.pos = 0
logBuffer.idx = logBuffer.idx[:0]
logBuffer.batchIndex++
logBuffer.offset++
// Reset offset tracking
logBuffer.hasOffsets = false
logBuffer.minOffset = 0
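Schematically, each seal records the current counter value on the MemBuffer being retired and then advances it. The sketch below uses stand-in types and ignores the per-entry increments in AddLogEntryToBuffer; it is illustrative, not the real sealing code:

package main

import "fmt"

// memBuffer is a stand-in for the sealed buffer type in the diff; the real one
// also carries the buffer bytes, size, and start/stop timestamps.
type memBuffer struct{ offset int64 }

func main() {
	var sealed []memBuffer
	offset := int64(0) // LogBuffer.offset now starts at 0 instead of a timestamp seed

	for i := 0; i < 3; i++ {
		// copyToFlush: the retiring buffer keeps the offset it was sealed at...
		sealed = append(sealed, memBuffer{offset: offset})
		// ...and the counter advances for the next buffer.
		offset++
	}
	fmt.Println(sealed, "next offset:", offset) // [{0} {1} {2}] next offset: 3
}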
@@ -309,7 +309,7 @@ func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
return MessagePosition{
Time: logBuffer.startTime,
BatchIndex: logBuffer.batchIndex,
BatchIndex: logBuffer.offset,
}
}
@@ -335,12 +335,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
var tsBatchIndex int64
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
tsBatchIndex = logBuffer.batchIndex
tsBatchIndex = logBuffer.offset
}
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
tsMemory = prevBuf.startTime
tsBatchIndex = prevBuf.batchIndex
tsBatchIndex = prevBuf.offset
}
}
if tsMemory.IsZero() { // case 2.2
@@ -355,25 +355,25 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// the following is case 2.1
if lastReadPosition.Equal(logBuffer.stopTime) {
return nil, logBuffer.batchIndex, nil
return nil, logBuffer.offset, nil
}
if lastReadPosition.After(logBuffer.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime)
return nil, logBuffer.batchIndex, nil
return nil, logBuffer.offset, nil
}
if lastReadPosition.Before(logBuffer.startTime) {
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.startTime.After(lastReadPosition.Time) {
// glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime)
return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil
return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
pos := buf.locateByTs(lastReadPosition.Time)
return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil
return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
}
}
// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
lastTs := lastReadPosition.UnixNano()
@@ -403,7 +403,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
_, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1])
}
if prevT <= lastTs {
return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil
return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil
}
h = mid
}
@@ -424,11 +424,11 @@ func (logBuffer *LogBuffer) GetName() string {
return logBuffer.name
}
// GetBatchIndex returns the current batch index for metadata tracking
func (logBuffer *LogBuffer) GetBatchIndex() int64 {
// GetOffset returns the current offset for metadata tracking
func (logBuffer *LogBuffer) GetOffset() int64 {
logBuffer.RLock()
defer logBuffer.RUnlock()
return logBuffer.batchIndex
return logBuffer.offset
}
var bufferPool = sync.Pool{

View File

@@ -176,19 +176,20 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
}
// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback
func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64,
waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
// LoopProcessLogDataWithOffset is similar to LoopProcessLogData but provides offset to the callback
func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, startPosition MessagePosition, stopTsNs int64,
waitForDataFn func() bool, eachLogDataFn EachLogEntryWithOffsetFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
glog.V(0).Infof("🔍 DEBUG: LoopProcessLogDataWithOffset started for %s, startPosition=%v", readerName, startPosition)
// loop through all messages
var bytesBuf *bytes.Buffer
var batchIndex int64
var offset int64
lastReadPosition = startPosition
var entryCounter int64
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
// println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter)
// println("LoopProcessLogDataWithOffset", readerName, "sent messages total", entryCounter)
}()
for {
@@ -196,7 +197,8 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition)
bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition)
glog.V(0).Infof("🔍 DEBUG: ReadFromBuffer returned bytesBuf=%v, offset=%d, err=%v", bytesBuf != nil, offset, err)
if err == ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond)
return lastReadPosition, isDone, ResumeFromDiskError
@@ -205,10 +207,10 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
if bytesBuf != nil {
readSize = bytesBuf.Len()
}
glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
glog.V(0).Infof("🔍 DEBUG: %s ReadFromBuffer at %v batch %d. Read bytes %v offset %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, offset)
if bytesBuf == nil {
if batchIndex >= 0 {
lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
if offset >= 0 {
lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), offset)
}
if stopTsNs != 0 {
isDone = true
@@ -233,6 +235,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
buf := bytesBuf.Bytes()
// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf))
glog.V(0).Infof("🔍 DEBUG: Processing buffer with %d bytes for %s", len(buf), readerName)
batchSize := 0
@@ -241,7 +244,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
size := util.BytesToUint32(buf[pos : pos+4])
if pos+4+int(size) > len(buf) {
err = ResumeError
glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
glog.Errorf("LoopProcessLogDataWithOffset: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
return
}
entryData := buf[pos+4 : pos+4+int(size)]
@@ -253,11 +256,15 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
continue
}
glog.V(0).Infof("🔍 DEBUG: Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
// Handle offset-based filtering for offset-based start positions
if startPosition.IsOffsetBased() {
startOffset := startPosition.GetOffset()
glog.V(0).Infof("🔍 DEBUG: Offset-based filtering: logEntry.Offset=%d, startOffset=%d", logEntry.Offset, startOffset)
if logEntry.Offset < startOffset {
// Skip entries before the starting offset
glog.V(0).Infof("🔍 DEBUG: Skipping entry due to offset filter")
pos += 4 + int(size)
batchSize++
continue
@@ -265,18 +272,20 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
}
if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
glog.V(0).Infof("🔍 DEBUG: Stopping due to stopTsNs")
isDone = true
// println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
return
}
lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
lastReadPosition = NewMessagePosition(logEntry.TsNs, offset)
if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil {
glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
glog.V(0).Infof("🔍 DEBUG: Calling eachLogDataFn for entry at offset %d", offset)
if isDone, err = eachLogDataFn(logEntry, offset); err != nil {
glog.Errorf("LoopProcessLogDataWithOffset: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
return
}
if isDone {
glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1)
glog.V(0).Infof("LoopProcessLogDataWithOffset: %s process log entry %d", readerName, batchSize+1)
return
}

View File

@@ -6,11 +6,11 @@ import (
)
type MemBuffer struct {
buf []byte
size int
startTime time.Time
stopTime time.Time
batchIndex int64
buf []byte
size int
startTime time.Time
stopTime time.Time
offset int64 // Sequential offset counter
}
type SealedBuffers struct {
@@ -30,7 +30,7 @@ func newSealedBuffers(size int) *SealedBuffers {
return sbs
}
func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte) {
func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, offset int64) (newBuf []byte) {
oldMemBuffer := sbs.buffers[0]
size := len(sbs.buffers)
for i := 0; i < size-1; i++ {
@@ -38,13 +38,13 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte,
sbs.buffers[i].size = sbs.buffers[i+1].size
sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
sbs.buffers[i].batchIndex = sbs.buffers[i+1].batchIndex
sbs.buffers[i].offset = sbs.buffers[i+1].offset
}
sbs.buffers[size-1].buf = buf
sbs.buffers[size-1].size = pos
sbs.buffers[size-1].startTime = startTime
sbs.buffers[size-1].stopTime = stopTime
sbs.buffers[size-1].batchIndex = batchIndex
sbs.buffers[size-1].offset = offset
return oldMemBuffer.buf
}