mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-20 06:59:23 +08:00
filer: support cross filer meta data sync if sharing the same store
This commit is contained in:
@@ -35,6 +35,7 @@ type FilerOptions struct {
|
|||||||
enableNotification *bool
|
enableNotification *bool
|
||||||
disableHttp *bool
|
disableHttp *bool
|
||||||
cipher *bool
|
cipher *bool
|
||||||
|
peers *string
|
||||||
|
|
||||||
// default leveldb directory, used in "weed server" mode
|
// default leveldb directory, used in "weed server" mode
|
||||||
defaultLevelDbDirectory *string
|
defaultLevelDbDirectory *string
|
||||||
@@ -55,6 +56,7 @@ func init() {
|
|||||||
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
|
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
|
||||||
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
|
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
|
||||||
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
|
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
|
||||||
|
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdFiler = &Command{
|
var cmdFiler = &Command{
|
||||||
@@ -114,6 +116,7 @@ func (fo *FilerOptions) startFiler() {
|
|||||||
Host: *fo.ip,
|
Host: *fo.ip,
|
||||||
Port: uint32(*fo.port),
|
Port: uint32(*fo.port),
|
||||||
Cipher: *fo.cipher,
|
Cipher: *fo.cipher,
|
||||||
|
Filers: strings.Split(*fo.peers, ","),
|
||||||
})
|
})
|
||||||
if nfs_err != nil {
|
if nfs_err != nil {
|
||||||
glog.Fatalf("Filer startup error: %v", nfs_err)
|
glog.Fatalf("Filer startup error: %v", nfs_err)
|
||||||
|
@@ -88,6 +88,7 @@ func init() {
|
|||||||
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
|
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
|
||||||
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
|
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
|
||||||
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
|
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
|
||||||
|
filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
|
||||||
|
|
||||||
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
|
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
|
||||||
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
|
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
|
||||||
|
@@ -60,6 +60,10 @@ func (f *Filer) SetStore(store FilerStore) {
|
|||||||
f.store = NewFilerStoreWrapper(store)
|
f.store = NewFilerStoreWrapper(store)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Filer) GetStore() (store FilerStore) {
|
||||||
|
return f.store
|
||||||
|
}
|
||||||
|
|
||||||
func (f *Filer) DisableDirectoryCache() {
|
func (f *Filer) DisableDirectoryCache() {
|
||||||
f.directoryCache = nil
|
f.directoryCache = nil
|
||||||
}
|
}
|
||||||
|
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -86,9 +87,17 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
|||||||
fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
|
fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
|
||||||
fs.listenersCond.Broadcast()
|
fs.listenersCond.Broadcast()
|
||||||
})
|
})
|
||||||
fs.metaAggregator = filer2.NewMetaAggregator(append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port)), fs.grpcDialOption)
|
|
||||||
fs.filer.Cipher = option.Cipher
|
fs.filer.Cipher = option.Cipher
|
||||||
|
|
||||||
|
// set peers
|
||||||
|
if strings.HasPrefix(fs.filer.GetStore().GetName(), "leveldb") && len(option.Filers) > 0 {
|
||||||
|
glog.Fatal("filers using separate leveldb stores should not configure peers!")
|
||||||
|
}
|
||||||
|
if len(option.Filers) == 0 {
|
||||||
|
option.Filers = append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port)
|
||||||
|
}
|
||||||
|
fs.metaAggregator = filer2.NewMetaAggregator(option.Filers, fs.grpcDialOption)
|
||||||
|
|
||||||
maybeStartMetrics(fs, option)
|
maybeStartMetrics(fs, option)
|
||||||
|
|
||||||
go fs.filer.KeepConnectedToMaster()
|
go fs.filer.KeepConnectedToMaster()
|
||||||
|
Reference in New Issue
Block a user