stop when in memory log is done

This commit is contained in:
chrislu
2022-05-30 15:25:21 -07:00
parent aece35a64f
commit f214dfb1f5
4 changed files with 18 additions and 11 deletions

View File

@@ -117,7 +117,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
lastReadTime = time.Unix(0, processedTsNs) lastReadTime = time.Unix(0, processedTsNs)
} }
lastReadTime, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, func() bool { lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool {
lock.Mutex.Lock() lock.Mutex.Lock()
lock.cond.Wait() lock.cond.Wait()
lock.Mutex.Unlock() lock.Mutex.Unlock()

View File

@@ -59,7 +59,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock() fs.filer.MetaAggregator.ListenersLock.Unlock()
@@ -74,6 +74,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
break break
} }
} }
if isDone {
return nil
}
time.Sleep(1127 * time.Millisecond) time.Sleep(1127 * time.Millisecond)
} }
@@ -127,7 +130,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.listenersLock.Lock() fs.listenersLock.Lock()
fs.listenersCond.Wait() fs.listenersCond.Wait()
fs.listenersLock.Unlock() fs.listenersLock.Unlock()
@@ -142,6 +145,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
break break
} }
} }
if isDone {
return nil
}
} }
return readInMemoryLogErr return readInMemoryLogErr

View File

@@ -27,7 +27,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
} }
receivedmessageCount := 0 receivedmessageCount := 0
lb.LoopProcessLogData("test", startTime, func() bool { lb.LoopProcessLogData("test", startTime, 0, func() bool {
// stop if no more messages // stop if no more messages
return false return false
}, func(logEntry *filer_pb.LogEntry) error { }, func(logEntry *filer_pb.LogEntry) error {

View File

@@ -17,10 +17,11 @@ var (
ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
) )
func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64,
waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) {
// loop through all messages // loop through all messages
var bytesBuf *bytes.Buffer var bytesBuf *bytes.Buffer
lastReadTime = startTreadTime lastReadTime = startReadTime
defer func() { defer func() {
if bytesBuf != nil { if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf) logBuffer.ReleaseMemory(bytesBuf)
@@ -35,7 +36,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
if err == ResumeFromDiskError { if err == ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond) time.Sleep(1127 * time.Millisecond)
return lastReadTime, ResumeFromDiskError return lastReadTime, isDone, ResumeFromDiskError
} }
// glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime)
if bytesBuf == nil { if bytesBuf == nil {
@@ -50,7 +51,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf))
batchSize := 0 batchSize := 0
var startReadTime time.Time
for pos := 0; pos+4 < len(buf); { for pos := 0; pos+4 < len(buf); {
@@ -68,10 +68,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
pos += 4 + int(size) pos += 4 + int(size)
continue continue
} }
lastReadTime = time.Unix(0, logEntry.TsNs) if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
if startReadTime.IsZero() { isDone = true
startReadTime = lastReadTime return
} }
lastReadTime = time.Unix(0, logEntry.TsNs)
if err = eachLogDataFn(logEntry); err != nil { if err = eachLogDataFn(logEntry); err != nil {
return return