mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-08-23 22:09:16 +08:00
refactoring
This commit is contained in:
parent
91d6785cf3
commit
202a29d014
@ -162,24 +162,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
message := resp.EventNotification
|
message := resp.EventNotification
|
||||||
|
|
||||||
if message.OldEntry == nil && message.NewEntry == nil {
|
if filer_pb.IsEmpty(resp) {
|
||||||
return nil
|
return nil
|
||||||
}
|
} else if filer_pb.IsCreate(resp) {
|
||||||
if message.OldEntry == nil && message.NewEntry != nil {
|
|
||||||
println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
|
println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
|
||||||
entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
|
entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
|
||||||
return store.InsertEntry(ctx, entry)
|
return store.InsertEntry(ctx, entry)
|
||||||
}
|
} else if filer_pb.IsDelete(resp) {
|
||||||
if message.OldEntry != nil && message.NewEntry == nil {
|
|
||||||
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
|
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
|
||||||
return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
|
return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
|
||||||
}
|
} else if filer_pb.IsUpdate(resp) {
|
||||||
if message.OldEntry != nil && message.NewEntry != nil {
|
println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
|
||||||
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
|
entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
|
||||||
println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
|
return store.UpdateEntry(ctx, entry)
|
||||||
entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
|
} else {
|
||||||
return store.UpdateEntry(ctx, entry)
|
// renaming
|
||||||
}
|
|
||||||
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
|
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
|
||||||
if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
|
if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -74,7 +74,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
|
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
|
||||||
if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
|
if filer_pb.IsEmpty(resp) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if filterFunc == nil {
|
if filterFunc == nil {
|
||||||
|
@ -174,10 +174,10 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
|||||||
return handleEtcRemoteChanges(resp)
|
return handleEtcRemoteChanges(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if message.OldEntry == nil && message.NewEntry == nil {
|
if filer_pb.IsEmpty(resp) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if message.OldEntry == nil && message.NewEntry != nil {
|
if filer_pb.IsCreate(resp) {
|
||||||
if message.NewParentPath == option.bucketsDir {
|
if message.NewParentPath == option.bucketsDir {
|
||||||
return handleCreateBucket(message.NewEntry)
|
return handleCreateBucket(message.NewEntry)
|
||||||
}
|
}
|
||||||
@ -212,7 +212,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
|||||||
}
|
}
|
||||||
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
|
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
|
||||||
}
|
}
|
||||||
if message.OldEntry != nil && message.NewEntry == nil {
|
if filer_pb.IsDelete(resp) {
|
||||||
if resp.Directory == option.bucketsDir {
|
if resp.Directory == option.bucketsDir {
|
||||||
return handleDeleteBucket(message.OldEntry)
|
return handleDeleteBucket(message.OldEntry)
|
||||||
}
|
}
|
||||||
|
@ -91,10 +91,10 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
|||||||
return handleEtcRemoteChanges(resp)
|
return handleEtcRemoteChanges(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if message.OldEntry == nil && message.NewEntry == nil {
|
if filer_pb.IsEmpty(resp) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if message.OldEntry == nil && message.NewEntry != nil {
|
if filer_pb.IsCreate(resp) {
|
||||||
if !filer.HasData(message.NewEntry) {
|
if !filer.HasData(message.NewEntry) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -115,7 +115,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
|||||||
}
|
}
|
||||||
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
|
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
|
||||||
}
|
}
|
||||||
if message.OldEntry != nil && message.NewEntry == nil {
|
if filer_pb.IsDelete(resp) {
|
||||||
glog.V(2).Infof("delete: %+v", resp)
|
glog.V(2).Infof("delete: %+v", resp)
|
||||||
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
|
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
|
||||||
if message.OldEntry.IsDirectory {
|
if message.OldEntry.IsDirectory {
|
||||||
|
@ -262,7 +262,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handle deletions
|
// handle deletions
|
||||||
if message.OldEntry != nil && message.NewEntry == nil {
|
if filer_pb.IsDelete(resp) {
|
||||||
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -271,7 +271,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handle new entries
|
// handle new entries
|
||||||
if message.OldEntry == nil && message.NewEntry != nil {
|
if filer_pb.IsCreate(resp) {
|
||||||
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -280,7 +280,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
|
|||||||
}
|
}
|
||||||
|
|
||||||
// this is something special?
|
// this is something special?
|
||||||
if message.OldEntry == nil && message.NewEntry == nil {
|
if filer_pb.IsEmpty(resp) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,12 +22,12 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if f.DirBucketsPath == event.Directory {
|
if f.DirBucketsPath == event.Directory {
|
||||||
if message.OldEntry == nil && message.NewEntry != nil {
|
if filer_pb.IsCreate(event) {
|
||||||
if message.NewEntry.IsDirectory {
|
if message.NewEntry.IsDirectory {
|
||||||
f.Store.OnBucketCreation(message.NewEntry.Name)
|
f.Store.OnBucketCreation(message.NewEntry.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if message.OldEntry != nil && message.NewEntry == nil {
|
if filer_pb.IsDelete(event) {
|
||||||
if message.OldEntry.IsDirectory {
|
if message.OldEntry.IsDirectory {
|
||||||
f.Store.OnBucketDeletion(message.OldEntry.Name)
|
f.Store.OnBucketDeletion(message.OldEntry.Name)
|
||||||
}
|
}
|
||||||
|
@ -45,9 +45,9 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
|||||||
newKey := util.NewFullPath(dir, message.NewEntry.Name)
|
newKey := util.NewFullPath(dir, message.NewEntry.Name)
|
||||||
mc.invalidateFunc(newKey, message.NewEntry)
|
mc.invalidateFunc(newKey, message.NewEntry)
|
||||||
}
|
}
|
||||||
} else if message.OldEntry == nil && message.NewEntry != nil {
|
} else if filer_pb.IsCreate(resp) {
|
||||||
// no need to invaalidate
|
// no need to invaalidate
|
||||||
} else if message.OldEntry != nil && message.NewEntry == nil {
|
} else if filer_pb.IsDelete(resp) {
|
||||||
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
||||||
mc.invalidateFunc(oldKey, message.OldEntry)
|
mc.invalidateFunc(oldKey, message.OldEntry)
|
||||||
}
|
}
|
||||||
|
@ -136,6 +136,9 @@ func LookupEntry(client SeaweedFilerClient, request *LookupDirectoryEntryRequest
|
|||||||
|
|
||||||
var ErrNotFound = errors.New("filer: no entry is found in filer store")
|
var ErrNotFound = errors.New("filer: no entry is found in filer store")
|
||||||
|
|
||||||
|
func IsEmpty(event *SubscribeMetadataResponse) bool {
|
||||||
|
return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry == nil
|
||||||
|
}
|
||||||
func IsCreate(event *SubscribeMetadataResponse) bool {
|
func IsCreate(event *SubscribeMetadataResponse) bool {
|
||||||
return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil
|
return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user