mirror of
				https://github.com/seaweedfs/seaweedfs.git
				synced 2025-10-21 03:58:51 +08:00 
			
		
		
		
	Merge branch 'master' into head_check_all_chunks
This commit is contained in:
		| @@ -37,7 +37,7 @@ var ( | ||||
|  | ||||
| func init() { | ||||
| 	cmdMount.Run = runMount // break init cycle | ||||
| 	mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location") | ||||
| 	mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "comma-separated weed filer location") | ||||
| 	mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server") | ||||
| 	mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") | ||||
| 	mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to") | ||||
|   | ||||
| @@ -51,9 +51,9 @@ func runMount(cmd *Command, args []string) bool { | ||||
|  | ||||
| func RunMount(option *MountOptions, umask os.FileMode) bool { | ||||
|  | ||||
| 	filer := *option.filer | ||||
| 	filers := strings.Split(*option.filer, ",") | ||||
| 	// parse filer grpc address | ||||
| 	filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer) | ||||
| 	filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers) | ||||
| 	if err != nil { | ||||
| 		glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) | ||||
| 		return true | ||||
| @@ -64,22 +64,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { | ||||
| 	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") | ||||
| 	var cipher bool | ||||
| 	for i := 0; i < 10; i++ { | ||||
| 		err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { | ||||
| 		err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { | ||||
| 			resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) | ||||
| 			if err != nil { | ||||
| 				return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) | ||||
| 				return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err) | ||||
| 			} | ||||
| 			cipher = resp.Cipher | ||||
| 			return nil | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) | ||||
| 			glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err) | ||||
| 			glog.V(0).Infof("wait for %d seconds ...", i+1) | ||||
| 			time.Sleep(time.Duration(i+1) * time.Second) | ||||
| 		} | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err) | ||||
| 		glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err) | ||||
| 		return true | ||||
| 	} | ||||
|  | ||||
| @@ -145,7 +145,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { | ||||
|  | ||||
| 	options := []fuse.MountOption{ | ||||
| 		fuse.VolumeName(mountName), | ||||
| 		fuse.FSName(filer + ":" + filerMountRootPath), | ||||
| 		fuse.FSName(*option.filer + ":" + filerMountRootPath), | ||||
| 		fuse.Subtype("seaweedfs"), | ||||
| 		// fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders | ||||
| 		fuse.NoAppleXattr(), | ||||
| @@ -181,8 +181,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { | ||||
|  | ||||
| 	seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ | ||||
| 		MountDirectory:     dir, | ||||
| 		FilerAddress:       filer, | ||||
| 		FilerGrpcAddress:   filerGrpcAddress, | ||||
| 		FilerAddresses:     filers, | ||||
| 		FilerGrpcAddresses: filerGrpcAddresses, | ||||
| 		GrpcDialOption:     grpcDialOption, | ||||
| 		FilerMountRootPath: mountRoot, | ||||
| 		Collection:         *option.collection, | ||||
| @@ -218,7 +218,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { | ||||
| 		c.Close() | ||||
| 	}) | ||||
|  | ||||
| 	glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir) | ||||
| 	glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir) | ||||
| 	server := fs.New(c, nil) | ||||
| 	seaweedFileSystem.Server = server | ||||
| 	err = server.Serve(seaweedFileSystem) | ||||
|   | ||||
| @@ -101,6 +101,11 @@ dir = "./filerldb3"					# directory to store level db files | ||||
| enabled = false | ||||
| dir = "./filerrdb"					# directory to store rocksdb files | ||||
|  | ||||
| [sqlite] | ||||
| # local on disk, similar to leveldb | ||||
| enabled = false | ||||
| dbFile = "./filer.db"				# sqlite db file | ||||
|  | ||||
| [mysql]  # or memsql, tidb | ||||
| # CREATE TABLE IF NOT EXISTS filemeta ( | ||||
| #   dirhash     BIGINT               COMMENT 'first 64 bits of MD5 hash value of directory field', | ||||
|   | ||||
| @@ -71,6 +71,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string | ||||
|  | ||||
| 	// when filer store is not shared by multiple filers | ||||
| 	if peerSignature != f.Signature { | ||||
| 		lastTsNs = 0 | ||||
| 		if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil { | ||||
| 			lastTsNs = prevTsNs | ||||
| 		} | ||||
|   | ||||
							
								
								
									
										74
									
								
								weed/filer/sqlite/sqlite_store.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										74
									
								
								weed/filer/sqlite/sqlite_store.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,74 @@ | ||||
| package sqlite | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"database/sql" | ||||
| 	"fmt" | ||||
|  | ||||
| 	"github.com/chrislusf/seaweedfs/weed/filer" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/filer/mysql" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/util" | ||||
| 	_ "modernc.org/sqlite" | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	filer.Stores = append(filer.Stores, &SqliteStore{}) | ||||
| } | ||||
|  | ||||
| type SqliteStore struct { | ||||
| 	abstract_sql.AbstractSqlStore | ||||
| } | ||||
|  | ||||
| func (store *SqliteStore) GetName() string { | ||||
| 	return "sqlite" | ||||
| } | ||||
|  | ||||
| func (store *SqliteStore) Initialize(configuration util.Configuration, prefix string) (err error) { | ||||
| 	dbFile := configuration.GetString(prefix + "dbFile") | ||||
| 	createTable := `CREATE TABLE IF NOT EXISTS "%s" ( | ||||
| 		dirhash BIGINT, | ||||
| 		name VARCHAR(1000), | ||||
| 		directory TEXT, | ||||
| 		meta BLOB, | ||||
| 		PRIMARY KEY (dirhash, name) | ||||
| 	) WITHOUT ROWID;` | ||||
| 	upsertQuery := `INSERT INTO "%s"(dirhash,name,directory,meta)VALUES(?,?,?,?) | ||||
| 	ON CONFLICT(dirhash,name) DO UPDATE SET | ||||
| 		directory=excluded.directory, | ||||
| 		meta=excluded.meta; | ||||
| 	` | ||||
| 	return store.initialize( | ||||
| 		dbFile, | ||||
| 		createTable, | ||||
| 		upsertQuery, | ||||
| 	) | ||||
| } | ||||
|  | ||||
| func (store *SqliteStore) initialize(dbFile, createTable, upsertQuery string) (err error) { | ||||
|  | ||||
| 	store.SupportBucketTable = true | ||||
| 	store.SqlGenerator = &mysql.SqlGenMysql{ | ||||
| 		CreateTableSqlTemplate: createTable, | ||||
| 		DropTableSqlTemplate:   "drop table `%s`", | ||||
| 		UpsertQueryTemplate:    upsertQuery, | ||||
| 	} | ||||
|  | ||||
| 	var dbErr error | ||||
| 	store.DB, dbErr = sql.Open("sqlite", dbFile) | ||||
| 	if dbErr != nil { | ||||
| 		store.DB.Close() | ||||
| 		store.DB = nil | ||||
| 		return fmt.Errorf("can not connect to %s error:%v", dbFile, err) | ||||
| 	} | ||||
|  | ||||
| 	if err = store.DB.Ping(); err != nil { | ||||
| 		return fmt.Errorf("connect to %s error:%v", dbFile, err) | ||||
| 	} | ||||
|  | ||||
| 	if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil { | ||||
| 		return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -3,13 +3,16 @@ package filer | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/glog" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/util" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/wdclient" | ||||
| 	"io" | ||||
| 	"math" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/chrislusf/seaweedfs/weed/glog" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/stats" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/util" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/wdclient" | ||||
| ) | ||||
|  | ||||
| func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { | ||||
| @@ -35,15 +38,20 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c | ||||
| 	for _, chunkView := range chunkViews { | ||||
|  | ||||
| 		urlStrings := fileId2Url[chunkView.FileId] | ||||
| 		start := time.Now() | ||||
| 		data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) | ||||
| 		stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) | ||||
| 		if err != nil { | ||||
| 			stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() | ||||
| 			return fmt.Errorf("read chunk: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		_, err = w.Write(data) | ||||
| 		if err != nil { | ||||
| 			stats.FilerRequestCounter.WithLabelValues("chunkDownloadedError").Inc() | ||||
| 			return fmt.Errorf("write chunk: %v", err) | ||||
| 		} | ||||
| 		stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
|   | ||||
| @@ -6,7 +6,6 @@ import ( | ||||
| 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
| @@ -24,14 +23,6 @@ type TempFileDirtyPages struct { | ||||
| 	replication      string | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	tmpDir = filepath.Join(os.TempDir(), "sw") | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	os.Mkdir(tmpDir, 0755) | ||||
| } | ||||
|  | ||||
| func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages { | ||||
|  | ||||
| 	tempFile := &TempFileDirtyPages{ | ||||
| @@ -49,7 +40,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { | ||||
| 	defer pages.pageAddLock.Unlock() | ||||
|  | ||||
| 	if pages.tf == nil { | ||||
| 		tf, err := os.CreateTemp(tmpDir, "") | ||||
| 		tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "") | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("create temp file: %v", err) | ||||
| 			pages.lastErr = err | ||||
|   | ||||
| @@ -7,8 +7,10 @@ import ( | ||||
| 	"github.com/chrislusf/seaweedfs/weed/storage/types" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/wdclient" | ||||
| 	"math" | ||||
| 	"math/rand" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 	"path/filepath" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| @@ -28,8 +30,9 @@ import ( | ||||
|  | ||||
| type Option struct { | ||||
| 	MountDirectory     string | ||||
| 	FilerAddress       string | ||||
| 	FilerGrpcAddress   string | ||||
| 	FilerAddresses     []string | ||||
| 	filerIndex         int | ||||
| 	FilerGrpcAddresses []string | ||||
| 	GrpcDialOption     grpc.DialOption | ||||
| 	FilerMountRootPath string | ||||
| 	Collection         string | ||||
| @@ -52,6 +55,9 @@ type Option struct { | ||||
| 	VolumeServerAccess string // how to access volume servers | ||||
| 	Cipher             bool   // whether encrypt data on volume server | ||||
| 	UidGidMapper       *meta_cache.UidGidMapper | ||||
|  | ||||
| 	uniqueCacheDir         string | ||||
| 	uniqueCacheTempPageDir string | ||||
| } | ||||
|  | ||||
| var _ = fs.FS(&WFS{}) | ||||
| @@ -95,14 +101,13 @@ func NewSeaweedFileSystem(option *Option) *WFS { | ||||
| 		}, | ||||
| 		signature: util.RandomInt32(), | ||||
| 	} | ||||
| 	cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8] | ||||
| 	cacheDir := path.Join(option.CacheDir, cacheUniqueId) | ||||
| 	wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses)) | ||||
| 	wfs.option.setupUniqueCacheDirectory() | ||||
| 	if option.CacheSizeMB > 0 { | ||||
| 		os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask) | ||||
| 		wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) | ||||
| 		wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024) | ||||
| 	} | ||||
|  | ||||
| 	wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) { | ||||
| 	wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) { | ||||
|  | ||||
| 		fsNode := NodeWithId(filePath.AsInode()) | ||||
| 		if err := wfs.Server.InvalidateNodeData(fsNode); err != nil { | ||||
| @@ -259,11 +264,27 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { | ||||
| func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { | ||||
| 	if wfs.option.VolumeServerAccess == "filerProxy" { | ||||
| 		return func(fileId string) (targetUrls []string, err error) { | ||||
| 			return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil | ||||
| 			return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil | ||||
| 		} | ||||
| 	} | ||||
| 	return filer.LookupFn(wfs) | ||||
| } | ||||
| func (wfs *WFS) getCurrentFiler() string { | ||||
| 	return wfs.option.FilerAddresses[wfs.option.filerIndex] | ||||
| } | ||||
|  | ||||
| func (option *Option) setupUniqueCacheDirectory() { | ||||
| 	cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8] | ||||
| 	option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId) | ||||
| 	option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw") | ||||
| 	os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask) | ||||
| } | ||||
|  | ||||
| func (option *Option) getTempFilePageDir() string { | ||||
| 	return option.uniqueCacheTempPageDir | ||||
| } | ||||
| func (option *Option) getUniqueCacheDir() string { | ||||
| 	return option.uniqueCacheDir | ||||
| } | ||||
|  | ||||
| type NodeWithId uint64 | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| package filesys | ||||
|  | ||||
| import ( | ||||
| 	"github.com/chrislusf/seaweedfs/weed/glog" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/util" | ||||
| 	"google.golang.org/grpc" | ||||
|  | ||||
| @@ -10,20 +11,36 @@ import ( | ||||
|  | ||||
| var _ = filer_pb.FilerClient(&WFS{}) | ||||
|  | ||||
| func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { | ||||
| func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { | ||||
|  | ||||
| 	err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error { | ||||
| 		return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { | ||||
| 			client := filer_pb.NewSeaweedFilerClient(grpcConnection) | ||||
| 			return fn(client) | ||||
| 		}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) | ||||
| 	return util.Retry("filer grpc", func() error { | ||||
|  | ||||
| 		i := wfs.option.filerIndex | ||||
| 		n := len(wfs.option.FilerGrpcAddresses) | ||||
| 		for x := 0; x < n; x++ { | ||||
|  | ||||
| 			filerGrpcAddress := wfs.option.FilerGrpcAddresses[i] | ||||
| 			err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { | ||||
| 				client := filer_pb.NewSeaweedFilerClient(grpcConnection) | ||||
| 				return fn(client) | ||||
| 			}, filerGrpcAddress, wfs.option.GrpcDialOption) | ||||
|  | ||||
| 			if err != nil { | ||||
| 				glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) | ||||
| 			} else { | ||||
| 				wfs.option.filerIndex = i | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			i++ | ||||
| 			if i >= n { | ||||
| 				i = 0 | ||||
| 			} | ||||
|  | ||||
| 		} | ||||
| 		return err | ||||
| 	}) | ||||
|  | ||||
| 	if err == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return err | ||||
|  | ||||
| } | ||||
|  | ||||
| func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { | ||||
|   | ||||
| @@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa | ||||
|  | ||||
| 		fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) | ||||
| 		if wfs.option.VolumeServerAccess == "filerProxy" { | ||||
| 			fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId) | ||||
| 			fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) | ||||
| 		} | ||||
| 		uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) | ||||
| 		if err != nil { | ||||
|   | ||||
| @@ -4,6 +4,7 @@ import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/chrislusf/seaweedfs/weed/glog" | ||||
| 	"math/rand" | ||||
| 	"net/http" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| @@ -24,10 +25,15 @@ const ( | ||||
|  | ||||
| var ( | ||||
| 	// cache grpc connections | ||||
| 	grpcClients     = make(map[string]*grpc.ClientConn) | ||||
| 	grpcClients     = make(map[string]*versionedGrpcClient) | ||||
| 	grpcClientsLock sync.Mutex | ||||
| ) | ||||
|  | ||||
| type versionedGrpcClient struct { | ||||
| 	*grpc.ClientConn | ||||
| 	version int | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024 | ||||
| 	http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024 | ||||
| @@ -79,7 +85,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr | ||||
| 	return grpc.DialContext(ctx, address, options...) | ||||
| } | ||||
|  | ||||
| func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { | ||||
| func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) { | ||||
|  | ||||
| 	grpcClientsLock.Lock() | ||||
| 	defer grpcClientsLock.Unlock() | ||||
| @@ -94,23 +100,49 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.Clien | ||||
| 		return nil, fmt.Errorf("fail to dial %s: %v", address, err) | ||||
| 	} | ||||
|  | ||||
| 	grpcClients[address] = grpcConnection | ||||
| 	vgc := &versionedGrpcClient{ | ||||
| 		grpcConnection, | ||||
| 		rand.Int(), | ||||
| 	} | ||||
| 	grpcClients[address] = vgc | ||||
|  | ||||
| 	return grpcConnection, nil | ||||
| 	return vgc, nil | ||||
| } | ||||
|  | ||||
| func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { | ||||
|  | ||||
| 	grpcConnection, err := getOrCreateConnection(address, opts...) | ||||
| 	vgc, err := getOrCreateConnection(address, opts...) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("getOrCreateConnection %s: %v", address, err) | ||||
| 	} | ||||
| 	return fn(grpcConnection) | ||||
| 	executionErr := fn(vgc.ClientConn) | ||||
| 	if executionErr != nil && strings.Contains(executionErr.Error(), "transport") { | ||||
| 		grpcClientsLock.Lock() | ||||
| 		if t, ok := grpcClients[address]; ok { | ||||
| 			if t.version == vgc.version { | ||||
| 				vgc.Close() | ||||
| 				delete(grpcClients, address) | ||||
| 			} | ||||
| 		} | ||||
| 		grpcClientsLock.Unlock() | ||||
| 	} | ||||
|  | ||||
| 	return executionErr | ||||
| } | ||||
|  | ||||
| func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) { | ||||
| 	return ParseServerAddress(server, 10000) | ||||
| } | ||||
| func ParseServersToGrpcAddresses(servers []string) (serverGrpcAddresses []string, err error) { | ||||
| 	for _, server := range servers { | ||||
| 		if serverGrpcAddress, parseErr := ParseServerToGrpcAddress(server); parseErr == nil { | ||||
| 			serverGrpcAddresses = append(serverGrpcAddresses, serverGrpcAddress) | ||||
| 		} else { | ||||
| 			return nil, parseErr | ||||
| 		} | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { | ||||
|  | ||||
| @@ -202,3 +234,18 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption | ||||
| 	}, filerGrpcAddress, grpcDialOption) | ||||
|  | ||||
| } | ||||
|  | ||||
| func WithOneOfGrpcFilerClients(filerGrpcAddresses []string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { | ||||
|  | ||||
| 	for _, filerGrpcAddress := range filerGrpcAddresses { | ||||
| 		err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { | ||||
| 			client := filer_pb.NewSeaweedFilerClient(grpcConnection) | ||||
| 			return fn(client) | ||||
| 		}, filerGrpcAddress, grpcDialOption) | ||||
| 		if err == nil { | ||||
| 			return nil | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return err | ||||
| } | ||||
|   | ||||
| @@ -62,6 +62,12 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) | ||||
| 			writeErrorResponse(w, s3ErrCode, r.URL) | ||||
| 			return | ||||
| 		} | ||||
| 	} else { | ||||
| 		rAuthType := getRequestAuthType(r) | ||||
| 		if authTypeAnonymous != rAuthType { | ||||
| 			writeErrorResponse(w, s3err.ErrAuthNotSetup, r.URL) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	defer dataReader.Close() | ||||
|  | ||||
|   | ||||
| @@ -91,6 +91,7 @@ const ( | ||||
| 	ErrRequestNotReadyYet | ||||
| 	ErrMissingDateHeader | ||||
| 	ErrInvalidRequest | ||||
| 	ErrAuthNotSetup | ||||
| 	ErrNotImplemented | ||||
|  | ||||
| 	ErrExistingObjectIsDirectory | ||||
| @@ -341,6 +342,11 @@ var errorCodeResponse = map[ErrorCode]APIError{ | ||||
| 		Description:    "Invalid Request", | ||||
| 		HTTPStatusCode: http.StatusBadRequest, | ||||
| 	}, | ||||
| 	ErrAuthNotSetup : { | ||||
| 		Code:           "InvalidRequest", | ||||
| 		Description:    "Signed request requires setting up SeaweedFS S3 authentication", | ||||
| 		HTTPStatusCode: http.StatusBadRequest, | ||||
| 	}, | ||||
| 	ErrNotImplemented: { | ||||
| 		Code:           "NotImplemented", | ||||
| 		Description:    "A header you provided implies functionality that is not implemented", | ||||
|   | ||||
| @@ -30,6 +30,7 @@ import ( | ||||
| 	_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" | ||||
| 	_ "github.com/chrislusf/seaweedfs/weed/filer/mysql" | ||||
| 	_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" | ||||
| 	_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" | ||||
| 	_ "github.com/chrislusf/seaweedfs/weed/filer/postgres" | ||||
| 	_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" | ||||
| 	_ "github.com/chrislusf/seaweedfs/weed/filer/redis" | ||||
| @@ -128,7 +129,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) | ||||
| 	fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder") | ||||
| 	// TODO deprecated, will be be removed after 2020-12-31 | ||||
| 	// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration | ||||
| 	fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") | ||||
| 	// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") | ||||
| 	fs.filer.LoadConfiguration(v) | ||||
|  | ||||
| 	notification.LoadConfiguration(v, "notification.") | ||||
|   | ||||
| @@ -38,7 +38,7 @@ func (c *commandFsMetaSave) Help() string { | ||||
| 	fs.meta.save                 # save from current directory | ||||
|  | ||||
| 	The meta data will be saved into a local <filer_host>-<port>-<time>.meta file. | ||||
| 	These meta data can be later loaded by fs.meta.load command,  | ||||
| 	These meta data can be later loaded by fs.meta.load command | ||||
|  | ||||
| ` | ||||
| } | ||||
|   | ||||
| @@ -46,6 +46,9 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i | ||||
| 	} | ||||
|  | ||||
| 	err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error { | ||||
| 		if !entry.IsDirectory { | ||||
| 			return nil | ||||
| 		} | ||||
| 		if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" { | ||||
| 			fmt.Fprintf(writer, "  %s\n", entry.Name) | ||||
| 		} else { | ||||
|   | ||||
| @@ -5,7 +5,7 @@ import ( | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 48) | ||||
| 	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 49) | ||||
| 	COMMIT  = "" | ||||
| ) | ||||
|  | ||||
|   | ||||
| @@ -36,6 +36,7 @@ func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHo | ||||
| } | ||||
|  | ||||
| func (mc *MasterClient) GetMaster() string { | ||||
| 	mc.WaitUntilConnected() | ||||
| 	return mc.currentMaster | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Konstantin Lebedev
					Konstantin Lebedev