diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index c358eccf6..290c84e34 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -70,12 +70,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } - messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { - lock.Mutex.Lock() - lock.cond.Wait() - lock.Mutex.Unlock() - return true - }, func(logEntry *filer_pb.LogEntry) error { + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { m := &messaging_pb.Message{} if err = proto.Unmarshal(logEntry.Data, m); err != nil { glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) @@ -87,7 +82,14 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } return nil - }) + } + + messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { + lock.Mutex.Lock() + lock.cond.Wait() + lock.Mutex.Unlock() + return true + }, eachLogEntryFn) return err