support env variables to overwrite toml file

This commit is contained in:
Chris Lu
2020-01-29 09:09:55 -08:00
parent 27b94cb65b
commit d335f04de6
60 changed files with 268 additions and 247 deletions

View File

@@ -18,10 +18,10 @@ type Replicator struct {
source *source.FilerSource
}
func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator {
func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
source := &source.FilerSource{}
source.Initialize(sourceConfig)
source.Initialize(sourceConfig, configPrefix)
dataSink.SetSourceFiler(source)

View File

@@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
func (g *AzureSink) Initialize(configuration util.Configuration) error {
func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
configuration.GetString("account_name"),
configuration.GetString("account_key"),
configuration.GetString("container"),
configuration.GetString("directory"),
configuration.GetString(prefix+"account_name"),
configuration.GetString(prefix+"account_key"),
configuration.GetString(prefix+"container"),
configuration.GetString(prefix+"directory"),
)
}

View File

@@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
func (g *B2Sink) Initialize(configuration util.Configuration) error {
func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
configuration.GetString("b2_account_id"),
configuration.GetString("b2_master_application_key"),
configuration.GetString("bucket"),
configuration.GetString("directory"),
configuration.GetString(prefix+"b2_account_id"),
configuration.GetString(prefix+"b2_master_application_key"),
configuration.GetString(prefix+"bucket"),
configuration.GetString(prefix+"directory"),
)
}

View File

@@ -3,10 +3,11 @@ package filersink
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -38,13 +39,13 @@ func (fs *FilerSink) GetSinkToDirectory() string {
return fs.dir
}
func (fs *FilerSink) Initialize(configuration util.Configuration) error {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
return fs.initialize(
configuration.GetString("grpcAddress"),
configuration.GetString("directory"),
configuration.GetString("replication"),
configuration.GetString("collection"),
configuration.GetInt("ttlSec"),
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
configuration.GetString(prefix+"replication"),
configuration.GetString(prefix+"collection"),
configuration.GetInt(prefix+"ttlSec"),
)
}
@@ -59,7 +60,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
return nil
}

View File

@@ -34,11 +34,11 @@ func (g *GcsSink) GetSinkToDirectory() string {
return g.dir
}
func (g *GcsSink) Initialize(configuration util.Configuration) error {
func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
configuration.GetString("google_application_credentials"),
configuration.GetString("bucket"),
configuration.GetString("directory"),
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"bucket"),
configuration.GetString(prefix+"directory"),
)
}

View File

@@ -9,7 +9,7 @@ import (
type ReplicationSink interface {
GetName() string
Initialize(configuration util.Configuration) error
Initialize(configuration util.Configuration, prefix string) error
DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error
CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error
UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)

View File

@@ -39,16 +39,16 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
return s3sink.dir
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration) error {
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region"))
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket"))
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory"))
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
return s3sink.initialize(
configuration.GetString("aws_access_key_id"),
configuration.GetString("aws_secret_access_key"),
configuration.GetString("region"),
configuration.GetString("bucket"),
configuration.GetString("directory"),
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
configuration.GetString(prefix+"region"),
configuration.GetString(prefix+"bucket"),
configuration.GetString(prefix+"directory"),
)
}

View File

@@ -3,13 +3,14 @@ package source
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"
"google.golang.org/grpc"
"io"
"net/http"
"strings"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -25,17 +26,17 @@ type FilerSource struct {
Dir string
}
func (fs *FilerSource) Initialize(configuration util.Configuration) error {
func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
return fs.initialize(
configuration.GetString("grpcAddress"),
configuration.GetString("directory"),
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
)
}
func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
fs.grpcAddress = grpcAddress
fs.Dir = dir
fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
return nil
}

View File

@@ -27,14 +27,14 @@ func (k *AwsSqsInput) GetName() string {
return "aws_sqs"
}
func (k *AwsSqsInput) Initialize(configuration util.Configuration) error {
glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region"))
glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
configuration.GetString("aws_access_key_id"),
configuration.GetString("aws_secret_access_key"),
configuration.GetString("region"),
configuration.GetString("sqs_queue_name"),
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
configuration.GetString(prefix+"region"),
configuration.GetString(prefix+"sqs_queue_name"),
)
}

View File

@@ -27,8 +27,8 @@ func (k *GoCDKPubSubInput) GetName() string {
return "gocdk_pub_sub"
}
func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error {
subURL := config.GetString("sub_url")
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
subURL := configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
if err != nil {

View File

@@ -27,13 +27,13 @@ func (k *GooglePubSubInput) GetName() string {
return "google_pub_sub"
}
func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error {
glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id"))
glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic"))
func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetString("google_application_credentials"),
configuration.GetString("project_id"),
configuration.GetString("topic"),
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"project_id"),
configuration.GetString(prefix+"topic"),
)
}

View File

@@ -28,14 +28,14 @@ func (k *KafkaInput) GetName() string {
return "kafka"
}
func (k *KafkaInput) Initialize(configuration util.Configuration) error {
glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic"))
func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetStringSlice("hosts"),
configuration.GetString("topic"),
configuration.GetString("offsetFile"),
configuration.GetInt("offsetSaveIntervalSeconds"),
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"topic"),
configuration.GetString(prefix+"offsetFile"),
configuration.GetInt(prefix+"offsetSaveIntervalSeconds"),
)
}

View File

@@ -9,7 +9,7 @@ type NotificationInput interface {
// GetName gets the name to locate the configuration in sync.toml file
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration) error
Initialize(configuration util.Configuration, prefix string) error
ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
}