working for in memory single log buffer

This commit is contained in:
Chris Lu
2020-04-19 23:37:04 -07:00
parent f373232227
commit ce3cb25cfb
11 changed files with 270 additions and 189 deletions

View File

@@ -16,10 +16,6 @@ import (
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
if topicConfig.IsTransient {
return nil
}
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
if err2 != nil {
return err2

View File

@@ -2,7 +2,6 @@ package broker
import (
"io"
"time"
"github.com/golang/protobuf/proto"
@@ -77,16 +76,9 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
continue
}
m := &messaging_pb.Message{
Timestamp: time.Now().UnixNano(),
Key: in.Data.Key,
Value: in.Data.Value,
Headers: in.Data.Headers,
}
// fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
// fmt.Printf("received: %d : %s\n", len(m.Value), string(m.Value))
data, err := proto.Marshal(m)
data, err := proto.Marshal(in.Data)
if err != nil {
glog.Errorf("marshall error: %v\n", err)
continue

View File

@@ -1,6 +1,7 @@
package broker
import (
"fmt"
"io"
"time"
@@ -9,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
@@ -23,12 +23,22 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
var messageCount int64
subscriberId := in.Init.SubscriberId
println("+ subscriber:", subscriberId)
defer println("- subscriber:", subscriberId)
fmt.Printf("+ subscriber %s\n", subscriberId)
defer func() {
fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount)
}()
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
IsTransient: true,
}
if err = stream.Send(&messaging_pb.BrokerMessage{
Redirect: nil,
}); err != nil {
return err
}
// get lock
@@ -52,7 +62,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// an error returned will end the subscription
eachMessageFn := func(m *messaging_pb.Message) error {
err := stream.Send(&messaging_pb.BrokerMessage{
Data: m,
Data: m,
})
if err != nil {
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
@@ -60,42 +70,25 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
// loop through all messages
for {
_, buf := lock.logBuffer.ReadFromBuffer(lastReadTime)
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
entryData := buf[pos+4 : pos+4+int(size)]
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
pos += 4 + int(size)
continue
}
m := &messaging_pb.Message{}
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
pos += 4 + int(size)
continue
}
// fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value))
if err = eachMessageFn(m); err != nil {
return err
}
lastReadTime = time.Unix(0, m.Timestamp)
pos += 4 + int(size)
}
messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
lock.Mutex.Unlock()
}
return true
}, func(logEntry *filer_pb.LogEntry) error {
m := &messaging_pb.Message{}
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
return err
}
// fmt.Printf("sending : %d bytes\n", len(m.Value))
if err = eachMessageFn(m); err != nil {
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
}
return nil
})
return err
}

View File

@@ -41,6 +41,10 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC
flushFn := func(startTime, stopTime time.Time, buf []byte) {
if topicConfig.IsTransient {
return
}
targetFile := fmt.Sprintf(
"%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
filer2.TopicsDir, tp.Namespace, tp.Topic,