mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-19 22:57:56 +08:00
master load backend storage config from master.toml
This commit is contained in:
@@ -10,36 +10,77 @@ import (
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/backend"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
var (
|
||||
_ backend.DataStorageBackend = &S3Backend{}
|
||||
)
|
||||
|
||||
func init() {
|
||||
backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
|
||||
backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
|
||||
}
|
||||
|
||||
type S3Backend struct {
|
||||
type S3BackendFactory struct {
|
||||
}
|
||||
|
||||
func (factory *S3BackendFactory) StorageType() backend.StorageType {
|
||||
return backend.StorageType("s3")
|
||||
}
|
||||
func (factory *S3BackendFactory) BuildStorage(configuration util.Configuration, id string) (backend.BackendStorage, error) {
|
||||
return newS3BackendStorage(configuration, id)
|
||||
}
|
||||
|
||||
type S3BackendStorage struct {
|
||||
id string
|
||||
conn s3iface.S3API
|
||||
region string
|
||||
bucket string
|
||||
vid needle.VolumeId
|
||||
key string
|
||||
}
|
||||
|
||||
func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
func newS3BackendStorage(configuration util.Configuration, id string) (s *S3BackendStorage, err error) {
|
||||
s = &S3BackendStorage{}
|
||||
s.id = id
|
||||
s.conn, err = createSession(
|
||||
configuration.GetString("aws_access_key_id"),
|
||||
configuration.GetString("aws_secret_access_key"),
|
||||
configuration.GetString("region"))
|
||||
s.region = configuration.GetString("region")
|
||||
s.bucket = configuration.GetString("bucket")
|
||||
|
||||
glog.V(0).Infof("created s3 backend storage %s for region %s bucket %s", s.Name(), s.region, s.bucket)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *S3BackendStorage) Name() string {
|
||||
return "s3." + s.id
|
||||
}
|
||||
|
||||
func (s *S3BackendStorage) NewStorageFile(key string) backend.BackendStorageFile {
|
||||
if strings.HasPrefix(key, "/") {
|
||||
key = key[1:]
|
||||
}
|
||||
|
||||
f := &S3BackendStorageFile{
|
||||
backendStorage: s,
|
||||
key: key,
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
type S3BackendStorageFile struct {
|
||||
backendStorage *S3BackendStorage
|
||||
key string
|
||||
}
|
||||
|
||||
func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
|
||||
getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
|
||||
Bucket: &s3backend.bucket,
|
||||
Key: &s3backend.key,
|
||||
getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
|
||||
Bucket: &s3backendStorageFile.backendStorage.bucket,
|
||||
Key: &s3backendStorageFile.key,
|
||||
Range: &bytesRange,
|
||||
})
|
||||
|
||||
if getObjectErr != nil {
|
||||
return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
|
||||
return 0, fmt.Errorf("bucket %s GetObject %s: %v",
|
||||
s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
|
||||
}
|
||||
defer getObjectOutput.Body.Close()
|
||||
|
||||
@@ -47,27 +88,28 @@ func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
|
||||
}
|
||||
|
||||
func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
|
||||
func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s3backend S3Backend) Truncate(off int64) error {
|
||||
func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s3backend S3Backend) Close() error {
|
||||
func (s3backendStorageFile S3BackendStorageFile) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
|
||||
func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
|
||||
|
||||
headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
|
||||
Bucket: &s3backend.bucket,
|
||||
Key: &s3backend.key,
|
||||
headObjectOutput, headObjectErr := s3backendStorageFile.backendStorage.conn.HeadObject(&s3.HeadObjectInput{
|
||||
Bucket: &s3backendStorageFile.backendStorage.bucket,
|
||||
Key: &s3backendStorageFile.key,
|
||||
})
|
||||
|
||||
if headObjectErr != nil {
|
||||
return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
|
||||
return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v",
|
||||
s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, headObjectErr)
|
||||
}
|
||||
|
||||
datSize = int64(*headObjectOutput.ContentLength)
|
||||
@@ -76,44 +118,14 @@ func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err erro
|
||||
return
|
||||
}
|
||||
|
||||
func (s3backend S3Backend) String() string {
|
||||
return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
|
||||
func (s3backendStorageFile S3BackendStorageFile) String() string {
|
||||
return s3backendStorageFile.key
|
||||
}
|
||||
|
||||
func (s3backend *S3Backend) GetName() string {
|
||||
func (s3backendStorageFile *S3BackendStorageFile) GetName() string {
|
||||
return "s3"
|
||||
}
|
||||
|
||||
func (s3backend S3Backend) Instantiate(src *os.File) error {
|
||||
func (s3backendStorageFile S3BackendStorageFile) Instantiate(src *os.File) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error {
|
||||
glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
|
||||
glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
|
||||
glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
|
||||
|
||||
return s3backend.initialize(
|
||||
configuration.GetString("aws_access_key_id"),
|
||||
configuration.GetString("aws_secret_access_key"),
|
||||
configuration.GetString("region"),
|
||||
configuration.GetString("bucket"),
|
||||
prefix,
|
||||
vid,
|
||||
)
|
||||
}
|
||||
|
||||
func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string,
|
||||
prefix string, vid needle.VolumeId) (err error) {
|
||||
s3backend.region = region
|
||||
s3backend.bucket = bucket
|
||||
s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
|
||||
|
||||
s3backend.vid = vid
|
||||
s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid)
|
||||
if strings.HasPrefix(s3backend.key, "/") {
|
||||
s3backend.key = s3backend.key[1:]
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user