mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 10:37:24 +08:00
replicate need to include new entry path
This commit is contained in:
@@ -2,6 +2,7 @@ package replication
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
@@ -51,12 +52,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
|
||||
return nil
|
||||
}
|
||||
|
||||
foundExisting, err := r.sink.UpdateEntry(ctx, key, message.OldEntry, message.NewEntry, message.DeleteChunks)
|
||||
foundExisting, err := r.sink.UpdateEntry(ctx, key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks)
|
||||
if foundExisting {
|
||||
glog.V(4).Infof("updated %v", key)
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.sink.DeleteEntry(ctx, key, message.OldEntry.IsDirectory, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete old entry %v: %v", key, err)
|
||||
}
|
||||
|
||||
glog.V(4).Infof("creating missing %v", key)
|
||||
return r.sink.CreateEntry(ctx, key, message.NewEntry)
|
||||
}
|
||||
|
@@ -132,7 +132,7 @@ func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb
|
||||
|
||||
}
|
||||
|
||||
func (g *AzureSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
func (g *AzureSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
key = cleanKey(key)
|
||||
// TODO improve efficiency
|
||||
return false, nil
|
||||
|
@@ -124,7 +124,7 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En
|
||||
|
||||
}
|
||||
|
||||
func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
|
||||
key = cleanKey(key)
|
||||
|
||||
|
@@ -133,7 +133,7 @@ func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_p
|
||||
})
|
||||
}
|
||||
|
||||
func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
|
||||
dir, name := filer2.FullPath(key).DirAndName()
|
||||
|
||||
@@ -194,7 +194,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry, newE
|
||||
return true, fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
request := &filer_pb.UpdateEntryRequest{
|
||||
Directory: dir,
|
||||
Directory: newParentPath,
|
||||
Entry: existingEntry,
|
||||
}
|
||||
|
||||
|
@@ -119,7 +119,7 @@ func (g *GcsSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.E
|
||||
|
||||
}
|
||||
|
||||
func (g *GcsSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
func (g *GcsSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
// TODO improve efficiency
|
||||
return false, nil
|
||||
}
|
||||
|
@@ -12,7 +12,7 @@ type ReplicationSink interface {
|
||||
Initialize(configuration util.Configuration) error
|
||||
DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error
|
||||
CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error
|
||||
UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
|
||||
UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
|
||||
GetSinkToDirectory() string
|
||||
SetSourceFiler(s *source.FilerSource)
|
||||
}
|
||||
|
@@ -130,7 +130,7 @@ func (s3sink *S3Sink) CreateEntry(ctx context.Context, key string, entry *filer_
|
||||
|
||||
}
|
||||
|
||||
func (s3sink *S3Sink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
func (s3sink *S3Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
|
||||
key = cleanKey(key)
|
||||
// TODO improve efficiency
|
||||
return false, nil
|
||||
|
Reference in New Issue
Block a user