add s3 replication sink

This commit is contained in:
Chris Lu
2018-10-03 23:36:52 -07:00
parent 56a5d5af8d
commit e8ef501f02
8 changed files with 369 additions and 46 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
)
type FilerSink struct {
@@ -21,6 +22,14 @@ type FilerSink struct {
dataCenter string
}
// init registers the FilerSink implementation in the global sink registry
// (sink.Sinks) so the replication framework can look it up by its GetName()
// value ("filer") at startup.
//
// gofmt fix: the original `func init(){` was missing the space before the
// opening brace.
func init() {
	sink.Sinks = append(sink.Sinks, &FilerSink{})
}
// GetName reports the registry identifier of this sink type; the replication
// framework uses it to select the matching sink from sink.Sinks.
func (fs *FilerSink) GetName() string {
	const sinkName = "filer"
	return sinkName
}
// GetSinkToDirectory returns the destination directory on the target filer
// under which replicated entries are written (set during initialize).
func (fs *FilerSink) GetSinkToDirectory() string {
	targetDir := fs.dir
	return targetDir
}
@@ -49,7 +58,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
return nil
}
func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error {
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir, name := filer2.FullPath(key).DirAndName()
@@ -57,7 +66,7 @@ func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteInclud
request := &filer_pb.DeleteEntryRequest{
Directory: dir,
Name: name,
IsDirectory: entry.IsDirectory,
IsDirectory: isDirectory,
IsDeleteData: deleteIncludeChunks,
}
@@ -121,13 +130,14 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
})
}
func (fs *FilerSink) LookupEntry(key string) (entry *filer_pb.Entry, err error) {
func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
ctx := context.Background()
dir, name := filer2.FullPath(key).DirAndName()
// read existing entry
var existingEntry *filer_pb.Entry
err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
@@ -142,24 +152,15 @@ func (fs *FilerSink) LookupEntry(key string) (entry *filer_pb.Entry, err error)
return err
}
entry = resp.Entry
existingEntry = resp.Entry
return nil
})
if err != nil {
return nil, fmt.Errorf("lookup %s: %v", key, err)
return false, fmt.Errorf("lookup %s: %v", key, err)
}
return entry, nil
}
func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry, existingEntry *filer_pb.Entry, deleteIncludeChunks bool) (err error) {
ctx := context.Background()
dir, _ := filer2.FullPath(key).DirAndName()
glog.V(0).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
if filer2.ETag(newEntry.Chunks) == filer2.ETag(existingEntry.Chunks) {
@@ -179,13 +180,13 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry, existingEntry *
// replicate the chunks that are new in the source
replicatedChunks, err := fs.replicateChunks(newChunks)
if err != nil {
return fmt.Errorf("replicte %s chunks error: %v", key, err)
return true, fmt.Errorf("replicte %s chunks error: %v", key, err)
}
existingEntry.Chunks = append(existingEntry.Chunks, replicatedChunks...)
}
// save updated meta data
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: dir,