mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 10:07:24 +08:00
filer: support path-specific filer store
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
package filer
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/spf13/viper"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -15,28 +16,64 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
|
||||
|
||||
validateOneEnabledStore(config)
|
||||
|
||||
// load configuration for default filer store
|
||||
hasDefaultStoreConfigured := false
|
||||
for _, store := range Stores {
|
||||
if config.GetBool(store.GetName() + ".enabled") {
|
||||
store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore)
|
||||
if err := store.Initialize(config, store.GetName()+"."); err != nil {
|
||||
glog.Fatalf("Failed to initialize store for %s: %+v",
|
||||
store.GetName(), err)
|
||||
glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
|
||||
}
|
||||
f.SetStore(store)
|
||||
glog.V(0).Infof("Configure filer for %s", store.GetName())
|
||||
return
|
||||
glog.V(0).Infof("configured filer for %s", store.GetName())
|
||||
hasDefaultStoreConfigured = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// TODO load path-specific filer store here
|
||||
// f.Store.AddPathSpecificStore(path, store)
|
||||
|
||||
println()
|
||||
println("Supported filer stores are:")
|
||||
for _, store := range Stores {
|
||||
println(" " + store.GetName())
|
||||
if !hasDefaultStoreConfigured {
|
||||
println()
|
||||
println("Supported filer stores are:")
|
||||
for _, store := range Stores {
|
||||
println(" " + store.GetName())
|
||||
}
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
// load path-specific filer store here
|
||||
// f.Store.AddPathSpecificStore(path, store)
|
||||
storeNames := make(map[string]FilerStore)
|
||||
for _, store := range Stores {
|
||||
storeNames[store.GetName()] = store
|
||||
}
|
||||
allKeys := config.AllKeys()
|
||||
for _, key := range allKeys {
|
||||
if !strings.HasSuffix(key, ".enabled") {
|
||||
continue
|
||||
}
|
||||
key = key[:len(key)-len(".enabled")]
|
||||
if !strings.Contains(key, ".") {
|
||||
continue
|
||||
}
|
||||
|
||||
parts := strings.Split(key, ".")
|
||||
storeName, storeId := parts[0], parts[1]
|
||||
|
||||
store := storeNames[storeName]
|
||||
store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore)
|
||||
if err := store.Initialize(config, key+"."); err != nil {
|
||||
glog.Fatalf("Failed to initialize store for %s: %+v", key, err)
|
||||
}
|
||||
location := config.GetString(key+".location")
|
||||
if location == "" {
|
||||
glog.Errorf("path-specific filer store needs %s", key+".location")
|
||||
os.Exit(-1)
|
||||
}
|
||||
f.Store.AddPathSpecificStore(location, storeId, store)
|
||||
|
||||
glog.V(0).Infof("configure filer %s for %s", store.GetName(), location)
|
||||
}
|
||||
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
func validateOneEnabledStore(config *viper.Viper) {
|
||||
|
@@ -50,12 +50,13 @@ type VirtualFilerStore interface {
|
||||
FilerStore
|
||||
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
|
||||
DeleteOneEntry(ctx context.Context, entry *Entry) error
|
||||
AddPathSpecificStore(path string, store FilerStore)
|
||||
AddPathSpecificStore(path string, storeId string, store FilerStore)
|
||||
}
|
||||
|
||||
type FilerStoreWrapper struct {
|
||||
defaultStore FilerStore
|
||||
pathToStore ptrie.Trie
|
||||
defaultStore FilerStore
|
||||
pathToStore ptrie.Trie
|
||||
storeIdToStore map[string]FilerStore
|
||||
}
|
||||
|
||||
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
|
||||
@@ -63,23 +64,34 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
|
||||
return innerStore
|
||||
}
|
||||
return &FilerStoreWrapper{
|
||||
defaultStore: store,
|
||||
pathToStore: ptrie.New(),
|
||||
defaultStore: store,
|
||||
pathToStore: ptrie.New(),
|
||||
storeIdToStore: make(map[string]FilerStore),
|
||||
}
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, store FilerStore) {
|
||||
fsw.pathToStore.Put([]byte(path), store)
|
||||
func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
|
||||
fsw.storeIdToStore[storeId] = store
|
||||
err := fsw.pathToStore.Put([]byte(path), storeId)
|
||||
if err != nil {
|
||||
glog.Fatalf("put path specific store: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) FilerStore {
|
||||
func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
|
||||
store = fsw.defaultStore
|
||||
if path == "" {
|
||||
return fsw.defaultStore
|
||||
return
|
||||
}
|
||||
if store, found := fsw.pathToStore.Get([]byte(path)); found {
|
||||
return store.(FilerStore)
|
||||
var storeId string
|
||||
fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
|
||||
storeId = value.(string)
|
||||
return false
|
||||
})
|
||||
if storeId != "" {
|
||||
store = fsw.storeIdToStore[storeId]
|
||||
}
|
||||
return fsw.defaultStore
|
||||
return
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) GetName() string {
|
||||
|
Reference in New Issue
Block a user