mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-01-09 19:24:45 +08:00
broker: append message logs
This commit is contained in:
116
weed/messaging/broker_append.go
Normal file
116
weed/messaging/broker_append.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package messaging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
|
||||
|
||||
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
dir, name := util.FullPath(targetFile).DirAndName()
|
||||
|
||||
chunk := &filer_pb.FileChunk{
|
||||
FileId: assignResult.Fid,
|
||||
Offset: 0, // needs to be fixed during appending
|
||||
Size: uint64(uploadResult.Size),
|
||||
Mtime: time.Now().UnixNano(),
|
||||
ETag: uploadResult.ETag,
|
||||
IsGzipped: uploadResult.Gzip > 0,
|
||||
}
|
||||
|
||||
// append the chunk
|
||||
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
request := &filer_pb.AppendToEntryRequest{
|
||||
Directory: dir,
|
||||
EntryName: name,
|
||||
Chunks: []*filer_pb.FileChunk{chunk},
|
||||
}
|
||||
|
||||
_, err := client.AppendToEntry(context.Background(), request)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("append to file %v: %v", request, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("append to file %v: %v", targetFile, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
|
||||
|
||||
var assignResult = &operation.AssignResult{}
|
||||
var collection, replication string
|
||||
|
||||
// assign a volume location
|
||||
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
request := &filer_pb.AssignVolumeRequest{
|
||||
Count: 1,
|
||||
Replication: topicConfig.Replication,
|
||||
Collection: topicConfig.Collection,
|
||||
}
|
||||
|
||||
resp, err := client.AssignVolume(context.Background(), request)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("assign volume failure %v: %v", request, err)
|
||||
return err
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
|
||||
}
|
||||
|
||||
assignResult.Auth = security.EncodedJwt(resp.Auth)
|
||||
assignResult.Fid = resp.FileId
|
||||
assignResult.Url = resp.Url
|
||||
assignResult.PublicUrl = resp.PublicUrl
|
||||
assignResult.Count = uint64(resp.Count)
|
||||
|
||||
collection, replication = resp.Collection, resp.Replication
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// upload data
|
||||
targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
|
||||
uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
|
||||
}
|
||||
// println("uploaded to", targetUrl)
|
||||
return assignResult, uploadResult, nil
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
|
||||
|
||||
for _, filer := range broker.option.Filers {
|
||||
if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
|
||||
glog.V(0).Infof("fail to connect to %s: %v", filer, err)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
}
|
||||
@@ -32,6 +32,11 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
|
||||
|
||||
updatesChan := make(chan int32)
|
||||
|
||||
// TODO look it up
|
||||
topicConfig := &messaging_pb.TopicConfiguration{
|
||||
|
||||
}
|
||||
|
||||
go func() {
|
||||
for update := range updatesChan {
|
||||
if err := stream.Send(&messaging_pb.PublishResponse{
|
||||
@@ -47,18 +52,16 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
|
||||
|
||||
logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
|
||||
|
||||
//targetFile :=
|
||||
fmt.Sprintf("%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
|
||||
targetFile := fmt.Sprintf(
|
||||
"%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
|
||||
filer2.TopicsDir, namespace, topic,
|
||||
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
|
||||
partition,
|
||||
)
|
||||
|
||||
/*
|
||||
if err := f.appendToFile(targetFile, buf); err != nil {
|
||||
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
|
||||
}
|
||||
*/
|
||||
if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil {
|
||||
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
|
||||
}
|
||||
|
||||
}, func() {
|
||||
// notify subscribers
|
||||
|
||||
Reference in New Issue
Block a user