log messages

This commit is contained in:
Chris Lu
2020-04-30 03:05:34 -07:00
parent 8c73410a51
commit f9b6178b8f
5 changed files with 20 additions and 7 deletions

View File

@@ -22,7 +22,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
IsTransient: true,
// IsTransient: true,
}
// send init response

View File

@@ -34,7 +34,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
IsTransient: true,
// IsTransient: true,
}
if err = stream.Send(&messaging_pb.BrokerMessage{
@@ -79,7 +79,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
return err
}
// fmt.Printf("sending : %d bytes\n", len(m.Value))
// fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
if err = eachMessageFn(m); err != nil {
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
@@ -115,6 +115,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
startTsNs := startTime.UnixNano()
topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic)
partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
@@ -124,7 +125,10 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
return nil
}
}
// println("processing", hourMinuteEntry.FullPath)
if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){
return nil
}
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
defer chunkedFileReader.Close()
if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {

View File

@@ -42,7 +42,7 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC
flushFn := func(startTime, stopTime time.Time, buf []byte) {
if topicConfig.IsTransient {
return
// return
}
// fmt.Printf("flushing with topic config %+v\n", topicConfig)