async meta caching: can stream updates now

This commit is contained in:
Chris Lu
2020-04-21 21:16:13 -07:00
parent 4f02f7121d
commit e24b25de78
10 changed files with 236 additions and 127 deletions

View File

@@ -14,11 +14,11 @@ type MetaCache struct {
func NewMetaCache(dbFolder string) *MetaCache {
return &MetaCache{
FilerStore: OpenMetaStore(dbFolder),
FilerStore: openMetaStore(dbFolder),
}
}
func OpenMetaStore(dbFolder string) filer2.FilerStore {
func openMetaStore(dbFolder string) filer2.FilerStore {
os.MkdirAll(dbFolder, 0755)
@@ -31,4 +31,4 @@ func OpenMetaStore(dbFolder string) filer2.FilerStore {
return store
}
}

View File

@@ -0,0 +1,10 @@
package meta_cache
import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient) error {
return nil
}

View File

@@ -0,0 +1,71 @@
package meta_cache
import (
"context"
"fmt"
"io"
"time"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string) error {
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
ctx := context.Background()
var err error
if message.OldEntry != nil {
key := util.NewFullPath(resp.Directory, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", key)
err = mc.DeleteEntry(ctx, key)
}
if message.NewEntry != nil {
dir := resp.Directory
if message.NewParentPath != "" {
dir = message.NewParentPath
}
key := util.NewFullPath(dir, message.NewEntry.Name)
glog.V(4).Infof("creating %v", key)
err = mc.InsertEntry(ctx, filer2.FromPbEntry(dir, message.NewEntry))
}
return err
}
var lastTsNs int64
for {
err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
ClientName: "mount",
PathPrefix: dir,
SinceNs: lastTsNs,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
return fmt.Errorf("process %v: %v", resp, err)
}
lastTsNs = resp.TsNs
}
})
if err != nil {
glog.V(0).Infof("subscribing filer meta change: %v", err)
time.Sleep(time.Second)
}
}
}