add client id for all metadata listening clients

This commit is contained in:
chrislu
2021-12-30 00:23:57 -08:00
parent fb434318e3
commit 5c87fcc6d2
20 changed files with 293 additions and 244 deletions

View File

@@ -54,8 +54,10 @@ func runFilerBackup(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
clientId := util.RandomInt32()
for {
err := doFilerBackup(grpcDialOption, &filerBackupOptions)
err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId)
if err != nil {
glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
time.Sleep(1747 * time.Millisecond)
@@ -69,7 +71,7 @@ const (
BackupKeyPrefix = "backup."
)
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error {
// find data sink
config := util.GetViper()
@@ -112,7 +114,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId,
sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
}

View File

@@ -27,7 +27,8 @@ type FilerMetaBackupOptions struct {
restart *bool
backupFilerConfig *string
store filer.FilerStore
store filer.FilerStore
clientId int32
}
func init() {
@@ -36,6 +37,7 @@ func init() {
metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
metaBackup.clientId = util.RandomInt32()
}
var cmdFilerMetaBackup = &Command{
@@ -195,7 +197,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return metaBackup.setOffset(lastTime)
})
return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup",
return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId,
*metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false)
}

View File

@@ -6,7 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
elastic "github.com/olivere/elastic/v7"
"os"
"path/filepath"
"strings"
@@ -48,6 +48,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
clientId := util.RandomInt32()
var filterFunc func(dir, fname string) bool
if *tailPattern != "" {
@@ -105,7 +106,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail",
tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId,
*tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0,
func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {

View File

@@ -28,6 +28,7 @@ type RemoteGatewayOptions struct {
mappings *remote_pb.RemoteStorageMapping
remoteConfs map[string]*remote_pb.RemoteConf
bucketsDir string
clientId int32
}
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
@@ -54,6 +55,7 @@ func init() {
remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "pattens of new bucket names, e.g., s3*")
remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "pattens of new bucket names, e.g., local*")
remoteGatewayOptions.clientId = util.RandomInt32()
}
var cmdFilerRemoteGateway = &Command{

View File

@@ -38,7 +38,7 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}

View File

@@ -18,6 +18,7 @@ type RemoteSyncOptions struct {
readChunkFromFiler *bool
timeAgo *time.Duration
dir *string
clientId int32
}
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
@@ -41,6 +42,7 @@ func init() {
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
remoteSyncOptions.clientId = util.RandomInt32()
}
var cmdFilerRemoteSynchronize = &Command{

View File

@@ -40,7 +40,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}

View File

@@ -37,6 +37,7 @@ type SyncOptions struct {
bDebug *bool
aProxyByFiler *bool
bProxyByFiler *bool
clientId int32
}
var (
@@ -66,6 +67,7 @@ func init() {
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.clientId = util.RandomInt32()
}
var cmdFilerSynchronize = &Command{
@@ -97,7 +99,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
filerB := pb.ServerAddress(*syncOptions.filerB)
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
@@ -109,7 +111,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
@@ -124,7 +126,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
// read source filer signature
@@ -172,7 +174,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler),
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}