mirror of https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-22 00:07:24 +08:00

Merge branch 'master' of https://github.com/seaweedfs/seaweedfs
@@ -32,12 +32,15 @@ var cmdFix = &Command{
 var (
 	fixVolumeCollection = cmdFix.Flag.String("collection", "", "an optional volume collection name, if specified only it will be processed")
 	fixVolumeId         = cmdFix.Flag.Int64("volumeId", 0, "an optional volume id, if not 0 (default) only it will be processed")
+	fixIncludeDeleted   = cmdFix.Flag.Bool("includeDeleted", true, "include deleted entries in the index file")
 	fixIgnoreError      = cmdFix.Flag.Bool("ignoreError", false, "an optional, if true will be processed despite errors")
 )
 
 type VolumeFileScanner4Fix struct {
-	version needle.Version
-	nm      *needle_map.MemDb
+	version        needle.Version
+	nm             *needle_map.MemDb
+	nmDeleted      *needle_map.MemDb
+	includeDeleted bool
 }
 
 func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
@@ -50,13 +53,20 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
 }
 
 func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
-	glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
+	glog.V(2).Infof("key %v offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
 	if n.Size.IsValid() {
-		pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
-		glog.V(2).Infof("saved %d with error %v", n.Size, pe)
+		if pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size); pe != nil {
+			return fmt.Errorf("saved %d with error %v", n.Size, pe)
+		}
 	} else {
-		glog.V(2).Infof("skipping deleted file ...")
-		return scanner.nm.Delete(n.Id)
+		if scanner.includeDeleted {
+			if pe := scanner.nmDeleted.Set(n.Id, types.ToOffset(offset), types.TombstoneFileSize); pe != nil {
+				return fmt.Errorf("saved deleted %d with error %v", n.Size, pe)
+			}
+		} else {
+			glog.V(2).Infof("skipping deleted file ...")
+			return scanner.nm.Delete(n.Id)
+		}
 	}
 	return nil
 }
@@ -109,21 +119,45 @@ func runFix(cmd *Command, args []string) bool {
 			if *fixVolumeId != 0 && *fixVolumeId != volumeId {
 				continue
 			}
-			doFixOneVolume(basePath, baseFileName, collection, volumeId)
+			doFixOneVolume(basePath, baseFileName, collection, volumeId, *fixIncludeDeleted)
 		}
 	}
 	return true
 }
 
-func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64) {
+func SaveToIdx(scaner *VolumeFileScanner4Fix, idxName string) (ret error) {
+	idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		return
+	}
+	defer func() {
+		idxFile.Close()
+	}()
+
+	return scaner.nm.AscendingVisit(func(value needle_map.NeedleValue) error {
+		_, err := idxFile.Write(value.ToBytes())
+		if scaner.includeDeleted && err == nil {
+			if deleted, ok := scaner.nmDeleted.Get(value.Key); ok {
+				_, err = idxFile.Write(deleted.ToBytes())
+			}
+		}
+		return err
+	})
+}
+
+func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64, fixIncludeDeleted bool) {
 	indexFileName := path.Join(basepath, baseFileName+".idx")
 
 	nm := needle_map.NewMemDb()
+	nmDeleted := needle_map.NewMemDb()
 	defer nm.Close()
+	defer nmDeleted.Close()
 
 	vid := needle.VolumeId(volumeId)
 	scanner := &VolumeFileScanner4Fix{
-		nm: nm,
+		nm:             nm,
+		nmDeleted:      nmDeleted,
+		includeDeleted: fixIncludeDeleted,
 	}
 
 	if err := storage.ScanVolumeFile(basepath, collection, vid, storage.NeedleMapInMemory, scanner); err != nil {
@@ -135,12 +169,12 @@ func doFixOneVolume(basepath string, baseFileName string, collection string, vol
 		}
 	}
 
-	if err := nm.SaveToIdx(indexFileName); err != nil {
-		os.Remove(indexFileName)
+	if err := SaveToIdx(scanner, indexFileName); err != nil {
 		err := fmt.Errorf("save to .idx File: %v", err)
 		if *fixIgnoreError {
			glog.Error(err)
 		} else {
+			os.Remove(indexFileName)
 			glog.Fatal(err)
 		}
 	}

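Note on the change above: with -includeDeleted (the default, true), the rebuilt .idx keeps a tombstone record (types.TombstoneFileSize) right after each entry whose needle was later deleted, instead of dropping deleted keys entirely. A minimal sketch of how such a rebuilt index could be sanity-checked, reusing the WalkIndexFile helper visible in the needle_map hunk further down; the import paths and the example .idx path are assumptions for illustration, not part of this commit:

```go
package main

import (
	"fmt"
	"os"

	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

func main() {
	// hypothetical path to a volume index rebuilt by `weed fix`
	f, err := os.Open("/data/1.idx")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()

	var live, tombstones int
	walkErr := idx.WalkIndexFile(f, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
		if size.IsValid() {
			live++ // regular entry pointing at needle data
		} else {
			tombstones++ // deletion marker preserved by -includeDeleted
		}
		return nil
	})
	if walkErr != nil {
		fmt.Fprintln(os.Stderr, walkErr)
		os.Exit(1)
	}
	fmt.Printf("live entries: %d, tombstones: %d\n", live, tombstones)
}
```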
@@ -280,6 +280,13 @@ tls_client_key_file=""
 [mongodb]
 enabled = false
 uri = "mongodb://localhost:27017"
+username = ""
+password = ""
+ssl = false
+ssl_ca_file = ""
+ssl_cert_file = ""
+ssl_key_file = ""
+insecure_skip_verify = false
 option_pool_size = 0
 database = "seaweedfs"

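For reference, a filer.toml fragment with the new options switched on might look like the following; all paths and credentials below are placeholders, and ssl must be true before the ssl_* files are consulted, per the connection() change that follows:

```toml
[mongodb]
enabled = true
uri = "mongodb://mongo.example.internal:27017"
username = "seaweed"      # auth is applied only when both username and password are set
password = "placeholder"
ssl = true
ssl_ca_file = "/etc/ssl/mongo/ca.pem"        # CA that signed the server certificate
ssl_cert_file = "/etc/ssl/mongo/client.pem"  # client certificate for mutual TLS
ssl_key_file = "/etc/ssl/mongo/client.key"   # client private key
insecure_skip_verify = false                 # leave false to keep server cert verification
option_pool_size = 0
database = "seaweedfs"
```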
@@ -2,7 +2,12 @@ package mongodb
 
 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"os"
+	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -10,7 +15,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
-	"time"
 )
 
 func init() {
@@ -37,17 +41,44 @@ func (store *MongodbStore) Initialize(configuration util.Configuration, prefix s
 	store.database = configuration.GetString(prefix + "database")
 	store.collectionName = "filemeta"
 	poolSize := configuration.GetInt(prefix + "option_pool_size")
-	return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize))
+	uri := configuration.GetString(prefix + "uri")
+	ssl := configuration.GetBool(prefix + "ssl")
+	sslCAFile := configuration.GetString(prefix + "ssl_ca_file")
+	sslCertFile := configuration.GetString(prefix + "ssl_cert_file")
+	sslKeyFile := configuration.GetString(prefix + "ssl_key_file")
+	username := configuration.GetString(prefix + "username")
+	password := configuration.GetString(prefix + "password")
+	insecure_skip_verify := configuration.GetBool(prefix + "insecure_skip_verify")
+
+	return store.connection(uri, uint64(poolSize), ssl, sslCAFile, sslCertFile, sslKeyFile, username, password, insecure_skip_verify)
 }
 
-func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) {
-	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+func (store *MongodbStore) connection(uri string, poolSize uint64, ssl bool, sslCAFile, sslCertFile, sslKeyFile string, username, password string, insecure bool) (err error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
 	opts := options.Client().ApplyURI(uri)
 
 	if poolSize > 0 {
 		opts.SetMaxPoolSize(poolSize)
 	}
 
+	if ssl {
+		tlsConfig, err := configureTLS(sslCAFile, sslCertFile, sslKeyFile, insecure)
+		if err != nil {
+			return err
+		}
+		opts.SetTLSConfig(tlsConfig)
+	}
+
+	if username != "" && password != "" {
+		creds := options.Credential{
+			Username: username,
+			Password: password,
+		}
+		opts.SetAuth(creds)
+	}
+
 	client, err := mongo.Connect(ctx, opts)
 	if err != nil {
 		return err
@@ -55,10 +86,36 @@ func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) {
 
 	c := client.Database(store.database).Collection(store.collectionName)
 	err = store.indexUnique(c)
 
 	store.connect = client
 	return err
 }
 
+func configureTLS(caFile, certFile, keyFile string, insecure bool) (*tls.Config, error) {
+	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
+	if err != nil {
+		return nil, fmt.Errorf("could not load client key pair: %s", err)
+	}
+
+	caCert, err := os.ReadFile(caFile)
+	if err != nil {
+		return nil, fmt.Errorf("could not read CA certificate: %s", err)
+	}
+
+	caCertPool := x509.NewCertPool()
+	if !caCertPool.AppendCertsFromPEM(caCert) {
+		return nil, fmt.Errorf("failed to append CA certificate")
+	}
+
+	tlsConfig := &tls.Config{
+		Certificates:       []tls.Certificate{cert},
+		RootCAs:            caCertPool,
+		InsecureSkipVerify: insecure,
+	}
+
+	return tlsConfig, nil
+}
+
 func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error {
 	_, err := c.Indexes().CreateOne(context.Background(), index, opts)
 	return err
@@ -93,13 +150,10 @@ func (store *MongodbStore) RollbackTransaction(ctx context.Context) error {
 }
 
 func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
-
 	return store.UpdateEntry(ctx, entry)
-
 }
 
 func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
-
 	dir, name := entry.FullPath.DirAndName()
 	meta, err := entry.EncodeAttributesAndChunks()
 	if err != nil {
@@ -126,7 +180,6 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
 }
 
 func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
-
 	dir, name := fullpath.DirAndName()
 	var data Model
 
@@ -154,7 +207,6 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath
 }
 
 func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
-
 	dir, name := fullpath.DirAndName()
 
 	where := bson.M{"directory": dir, "name": name}
@@ -167,7 +219,6 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa
 }
 
 func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
-
 	where := bson.M{"directory": fullpath}
 	_, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
 	if err != nil {
@@ -182,7 +233,6 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
 }
 
 func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
-
 	var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}}
 	if includeStartFile {
 		where["name"] = bson.M{

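One detail of connection() worth calling out: the old code discarded the CancelFunc returned by context.WithTimeout, which keeps the timeout's resources alive until the full deadline expires even after Connect returns (go vet flags this as a lost cancel). A standalone sketch of the corrected pattern, with a simulated connect standing in for mongo.Connect:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// keep the CancelFunc and defer it, as the updated connection() does;
	// the context is then released as soon as this function returns,
	// not after the full 10 seconds elapse
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	select {
	case <-time.After(50 * time.Millisecond): // stand-in for a fast mongo.Connect
		fmt.Println("connected well before the deadline")
	case <-ctx.Done():
		fmt.Println("gave up:", ctx.Err())
	}
}
```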
@@ -33,6 +33,7 @@ const (
 	StatementActionReadAcp = "GetBucketAcl"
 	StatementActionList    = "List*"
 	StatementActionTagging = "Tagging*"
+	StatementActionDelete  = "DeleteBucket*"
 )
 
 var (
@@ -58,6 +59,8 @@ func MapToStatementAction(action string) string {
 		return s3_constants.ACTION_LIST
 	case StatementActionTagging:
 		return s3_constants.ACTION_TAGGING
+	case StatementActionDelete:
+		return s3_constants.ACTION_DELETE_BUCKET
 	default:
 		return ""
 	}
@@ -79,6 +82,8 @@ func MapToIdentitiesAction(action string) string {
 		return StatementActionList
 	case s3_constants.ACTION_TAGGING:
 		return StatementActionTagging
+	case s3_constants.ACTION_DELETE_BUCKET:
+		return StatementActionDelete
 	default:
 		return ""
 	}

@@ -11,6 +11,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"strconv"
 	"strings"
+	"encoding/base64"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -202,7 +203,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
 		Tagging: aws.String(tags),
 	}
 	if len(entry.Attributes.Md5) > 0 {
-		uploadInput.ContentMD5 = aws.String(fmt.Sprintf("%x", entry.Attributes.Md5))
+		uploadInput.ContentMD5 = aws.String(base64.StdEncoding.EncodeToString([]byte(entry.Attributes.Md5)))
 	}
 	_, err = uploader.Upload(&uploadInput)

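The ContentMD5 fix above matters because S3's Content-MD5 header is defined (via RFC 1864) as the base64 encoding of the raw 16-byte MD5 digest; the previous hex encoding produced a header S3 does not accept, so replicated uploads carrying a stored MD5 could be rejected. A small illustration of the difference:

```go
package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

func main() {
	sum := md5.Sum([]byte("hello")) // raw 16-byte digest

	// what the old code sent: 32 hex characters, not a valid Content-MD5 value
	fmt.Printf("hex:    %x\n", sum)

	// what S3 expects: base64 of the raw digest, here "XUFAKrxLKna5cZ2REBfFkg=="
	fmt.Println("base64:", base64.StdEncoding.EncodeToString(sum[:]))
}
```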
@@ -317,6 +317,7 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
 		}
 
 		identity, errCode := iam.authRequest(r, action)
+		glog.V(3).Infof("auth error: %v", errCode)
 		if errCode == s3err.ErrNone {
 			if identity != nil && identity.Name != "" {
 				r.Header.Set(s3_constants.AmzIdentityId, identity.Name)
@@ -453,6 +454,7 @@ func (identity *Identity) canDo(action Action, bucket string, objectKey string)
 		}
 	}
 	if bucket == "" {
+		glog.V(3).Infof("identity %s is not allowed to perform action %s on %s -- bucket is empty", identity.Name, action, bucket+objectKey)
 		return false
 	}
 	target := string(action) + ":" + bucket + objectKey
@@ -477,6 +479,8 @@ func (identity *Identity) canDo(action Action, bucket string, objectKey string)
 			}
 		}
 	}
+	//log error
+	glog.V(3).Infof("identity %s is not allowed to perform action %s on %s", identity.Name, action, bucket+objectKey)
 	return false
 }

@@ -1,11 +1,12 @@
 package s3api
 
 import (
-	. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
-	"github.com/stretchr/testify/assert"
 	"reflect"
 	"testing"
 
+	. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/stretchr/testify/assert"
+
 	"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
 	jsonpb "google.golang.org/protobuf/encoding/protojson"
 )
@@ -79,6 +80,7 @@ func TestCanDo(t *testing.T) {
 	}
 	// object specific
 	assert.Equal(t, true, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt"))
+	assert.Equal(t, false, ident1.canDo(ACTION_DELETE_BUCKET, "bucket1", ""))
 	assert.Equal(t, false, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/other/some"), "action without *")
 
 	// bucket specific
@@ -141,6 +143,15 @@ func TestCanDo(t *testing.T) {
 		},
 	}
 	assert.Equal(t, true, ident6.canDo(ACTION_READ, "anything_bucket", "/a/b/c/d.txt"))
+
+	//test deleteBucket operation
+	ident7 := &Identity{
+		Name: "anything",
+		Actions: []Action{
+			"DeleteBucket:bucket1",
+		},
+	}
+	assert.Equal(t, true, ident7.canDo(ACTION_DELETE_BUCKET, "bucket1", ""))
 }
 
 type LoadS3ApiConfigurationTestCase struct {

@@ -1,13 +1,14 @@
 package s3_constants
 
 const (
-	ACTION_READ      = "Read"
-	ACTION_READ_ACP  = "ReadAcp"
-	ACTION_WRITE     = "Write"
-	ACTION_WRITE_ACP = "WriteAcp"
-	ACTION_ADMIN     = "Admin"
-	ACTION_TAGGING   = "Tagging"
-	ACTION_LIST      = "List"
+	ACTION_READ          = "Read"
+	ACTION_READ_ACP      = "ReadAcp"
+	ACTION_WRITE         = "Write"
+	ACTION_WRITE_ACP     = "WriteAcp"
+	ACTION_ADMIN         = "Admin"
+	ACTION_TAGGING       = "Tagging"
+	ACTION_LIST          = "List"
+	ACTION_DELETE_BUCKET = "DeleteBucket"
 
 	SeaweedStorageDestinationHeader = "x-seaweedfs-destination"
 	MultipartUploadsFolder          = ".uploads"

@@ -7,7 +7,7 @@ import (
 var (
 	CircuitBreakerConfigDir  = "/etc/s3"
 	CircuitBreakerConfigFile = "circuit_breaker.json"
-	AllowedActions           = []string{ACTION_READ, ACTION_READ_ACP, ACTION_WRITE, ACTION_WRITE_ACP, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN}
+	AllowedActions           = []string{ACTION_READ, ACTION_READ_ACP, ACTION_WRITE, ACTION_WRITE_ACP, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN, ACTION_DELETE_BUCKET}
 	LimitTypeCount           = "Count"
 	LimitTypeBytes           = "MB"
 	Separator                = ":"

@@ -6,14 +6,15 @@ import (
 	"encoding/xml"
 	"errors"
 	"fmt"
-	"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
-	"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"math"
 	"net/http"
 	"strings"
 	"time"
 
+	"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -218,6 +219,10 @@ func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorC
 		return s3err.ErrNoSuchBucket
 	}
 
+	//if iam is enabled, the access was already checked before
+	if s3a.iam.isEnabled() {
+		return s3err.ErrNone
+	}
 	if !s3a.hasAccess(r, entry) {
 		return s3err.ErrAccessDenied
 	}
@@ -236,6 +241,7 @@ func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
 	identityId := r.Header.Get(s3_constants.AmzIdentityId)
 	if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
 		if identityId != string(id) {
+			glog.V(3).Infof("hasAccess: %s != %s (entry.Extended = %v)", identityId, id, entry.Extended)
 			return false
 		}
 	}

@@ -279,7 +279,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
 		bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketHandler, ACTION_ADMIN)), "PUT"))
 
 		// DeleteBucket
-		bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_ADMIN)), "DELETE"))
+		bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_DELETE_BUCKET)), "DELETE"))
 
 		// ListObjectsV1 (Legacy)
 		bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST"))

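With DeleteBucket split out of Admin by the router change above, an identity can now be granted bucket deletion without full admin rights. A hedged example of what that could look like in the S3 identities configuration (the shape follows the iam_pb configuration exercised by the test above; all names and keys below are placeholders):

```json
{
  "identities": [
    {
      "name": "bucket_owner",
      "credentials": [
        { "accessKey": "placeholder_access_key", "secretKey": "placeholder_secret_key" }
      ],
      "actions": [
        "Read:bucket1",
        "Write:bucket1",
        "List:bucket1",
        "DeleteBucket:bucket1"
      ]
    }
  ]
}
```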
@@ -1,5 +1,5 @@
-//go:build openbsd || netbsd || plan9 || solaris
-// +build openbsd netbsd plan9 solaris
+//go:build netbsd || plan9 || solaris
+// +build netbsd plan9 solaris
 
 package stats
 

weed/stats/disk_openbsd.go (new file, 25 lines)
@@ -0,0 +1,25 @@
+//go:build openbsd
+// +build openbsd
+
+package stats
+
+import (
+	"syscall"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+)
+
+func fillInDiskStatus(disk *volume_server_pb.DiskStatus) {
+	fs := syscall.Statfs_t{}
+	err := syscall.Statfs(disk.Dir, &fs)
+	if err != nil {
+		return
+	}
+	disk.All = fs.F_blocks * uint64(fs.F_bsize)
+	disk.Free = fs.F_bfree * uint64(fs.F_bsize)
+	disk.Used = disk.All - disk.Free
+	disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100)
+	disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100)
+	return
+}

@@ -36,8 +36,8 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
 func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
 	e := idx.WalkIndexFile(file, 0, func(key NeedleId, offset Offset, size Size) error {
 		nm.MaybeSetMaxFileKey(key)
-		nm.FileCounter++
 		if !offset.IsZero() && size.IsValid() {
+			nm.FileCounter++
 			nm.FileByteCounter = nm.FileByteCounter + uint64(size)
 			oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
 			if !oldOffset.IsZero() && oldSize.IsValid() {
@@ -51,7 +51,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
 		}
 		return nil
 	})
-	glog.V(1).Infof("max file key: %d for file: %s", nm.MaxFileKey(), file.Name())
+	glog.V(1).Infof("max file key: %v count: %d deleted: %d for file: %s", nm.MaxFileKey(), nm.FileCount(), nm.DeletedCount(), file.Name())
 	return nm, e
 }

@@ -109,9 +109,6 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
 		return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
 	}
 	n.AppendAtNs = util.BytesToUint64(bytes)
-	if n.HasTtl() {
-		return n.AppendAtNs, nil
-	}
 	fileTailOffset := offset + needle.GetActualSize(size, v)
 	fileSize, _, err := datFile.GetStat()
 	if err != nil {
@@ -130,7 +127,7 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
 		return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
 	}
 	if n.Id != key {
-		return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
+		return n.AppendAtNs, fmt.Errorf("index key %v does not match needle's Id %v", key, n.Id)
 	}
 	return n.AppendAtNs, err
 }
@@ -147,7 +144,7 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V
 		return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err)
 	}
 	if n.Id != key {
-		return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
+		return n.AppendAtNs, fmt.Errorf("index key %v does not match needle's Id %v", key, n.Id)
 	}
 	return n.AppendAtNs, err
 }

@@ -487,19 +487,21 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
 	if err != nil {
 		return err
 	}
-	dstDatSize, _, err := dstDatBackend.GetStat()
-	if err != nil {
-		return err
-	}
-	if v.nm.ContentSize() > v.nm.DeletedSize() {
-		expectedContentSize := v.nm.ContentSize() - v.nm.DeletedSize()
-		if expectedContentSize > uint64(dstDatSize) {
-			return fmt.Errorf("volume %s unexpected new data size: %d does not match size of content minus deleted: %d",
-				v.Id.String(), dstDatSize, expectedContentSize)
-		}
-	} else {
-		glog.Warningf("volume %s content size: %d less deleted size: %d, new size: %d",
-			v.Id.String(), v.nm.ContentSize(), v.nm.DeletedSize(), dstDatSize)
+	if v.Ttl.String() == "" {
+		dstDatSize, _, err := dstDatBackend.GetStat()
+		if err != nil {
+			return err
+		}
+		if v.nm.ContentSize() > v.nm.DeletedSize() {
+			expectedContentSize := v.nm.ContentSize() - v.nm.DeletedSize()
+			if expectedContentSize > uint64(dstDatSize) {
+				return fmt.Errorf("volume %s unexpected new data size: %d does not match size of content minus deleted: %d",
+					v.Id.String(), dstDatSize, expectedContentSize)
+			}
+		} else {
+			glog.Warningf("volume %s content size: %d less deleted size: %d, new size: %d",
+				v.Id.String(), v.nm.ContentSize(), v.nm.DeletedSize(), dstDatSize)
+		}
 	}
 	err = newNm.SaveToIdx(datIdxName)
 	if err != nil {