From 1db7c2b8aad59177f9ccb32f156908faf0c13eca Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 2 Jul 2025 18:03:17 -0700 Subject: [PATCH] Add credential storage (#6938) * add credential store interface * load credential.toml * lint * create credentialManager with explicit store type * add type name * InitializeCredentialManager * remove unused functions * fix missing import * fix import * fix nil configuration --- weed/admin/dash/admin_server.go | 48 +- weed/admin/dash/user_management.go | 365 +++++------- weed/command/iam.go | 9 +- weed/command/s3.go | 3 +- weed/command/scaffold.go | 11 +- weed/command/scaffold/credential.toml | 55 ++ weed/command/scaffold/example.go | 3 + weed/credential/README.md | 182 ++++++ weed/credential/config_loader.go | 133 +++++ weed/credential/credential_manager.go | 125 ++++ weed/credential/credential_store.go | 91 +++ weed/credential/credential_test.go | 353 ++++++++++++ weed/credential/filer_etc/filer_etc_store.go | 235 ++++++++ weed/credential/memory/memory_store.go | 373 ++++++++++++ weed/credential/memory/memory_store_test.go | 315 ++++++++++ weed/credential/migration.go | 221 +++++++ weed/credential/postgres/postgres_store.go | 570 +++++++++++++++++++ weed/credential/sqlite/sqlite_store.go | 557 ++++++++++++++++++ weed/credential/test/integration_test.go | 122 ++++ weed/iamapi/iamapi_server.go | 63 +- weed/s3api/auth_credentials.go | 62 +- weed/s3api/s3api_put_object_helper_test.go | 8 +- weed/s3api/s3api_server.go | 40 +- 23 files changed, 3656 insertions(+), 288 deletions(-) create mode 100644 weed/command/scaffold/credential.toml create mode 100644 weed/credential/README.md create mode 100644 weed/credential/config_loader.go create mode 100644 weed/credential/credential_manager.go create mode 100644 weed/credential/credential_store.go create mode 100644 weed/credential/credential_test.go create mode 100644 weed/credential/filer_etc/filer_etc_store.go create mode 100644 weed/credential/memory/memory_store.go create mode 100644 weed/credential/memory/memory_store_test.go create mode 100644 weed/credential/migration.go create mode 100644 weed/credential/postgres/postgres_store.go create mode 100644 weed/credential/sqlite/sqlite_store.go create mode 100644 weed/credential/test/integration_test.go diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index cff6f3300..c98026ed1 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -9,6 +9,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -34,6 +35,9 @@ type AdminServer struct { cachedFilers []string lastFilerUpdate time.Time filerCacheExpiration time.Duration + + // Credential management + credentialManager *credential.CredentialManager } type ClusterTopology struct { @@ -195,13 +199,55 @@ type ClusterFilersData struct { } func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServer { - return &AdminServer{ + server := &AdminServer{ masterAddress: masterAddress, templateFS: templateFS, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), cacheExpiration: 10 * time.Second, filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds } + + // Initialize credential manager with defaults + credentialManager, err := credential.NewCredentialManagerWithDefaults("") + if err != nil { + glog.Warningf("Failed to initialize credential manager: %v", err) + // Continue without credential manager - will fall back to legacy approach + } else { + // For stores that need filer client details, set them + if store := credentialManager.GetStore(); store != nil { + if filerClientSetter, ok := store.(interface { + SetFilerClient(string, grpc.DialOption) + }); ok { + // We'll set the filer client later when we discover filers + // For now, just store the credential manager + server.credentialManager = credentialManager + + // Set up a goroutine to set filer client once we discover filers + go func() { + for { + filerAddr := server.GetFilerAddress() + if filerAddr != "" { + filerClientSetter.SetFilerClient(filerAddr, server.grpcDialOption) + glog.V(1).Infof("Set filer client for credential manager: %s", filerAddr) + break + } + time.Sleep(5 * time.Second) // Retry every 5 seconds + } + }() + } else { + server.credentialManager = credentialManager + } + } else { + server.credentialManager = credentialManager + } + } + + return server +} + +// GetCredentialManager returns the credential manager +func (s *AdminServer) GetCredentialManager() *credential.CredentialManager { + return s.credentialManager } // GetFilerAddress returns a filer address, discovering from masters if needed diff --git a/weed/admin/dash/user_management.go b/weed/admin/dash/user_management.go index 007faeed8..1ed0b071e 100644 --- a/weed/admin/dash/user_management.go +++ b/weed/admin/dash/user_management.go @@ -1,45 +1,23 @@ package dash import ( - "bytes" + "context" "crypto/rand" "encoding/base64" "fmt" "time" - "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" ) -// CreateObjectStoreUser creates a new user in identity.json +// CreateObjectStoreUser creates a new user using the credential manager func (s *AdminServer) CreateObjectStoreUser(req CreateUserRequest) (*ObjectStoreUser, error) { - s3cfg := &iam_pb.S3ApiConfiguration{} - - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - if err != filer_pb.ErrNotFound { - return err - } - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) - - if err != nil { - return nil, fmt.Errorf("failed to load IAM configuration: %v", err) + if s.credentialManager == nil { + return nil, fmt.Errorf("credential manager not available") } - // Check if user already exists - for _, identity := range s3cfg.Identities { - if identity.Name == req.Username { - return nil, fmt.Errorf("user %s already exists", req.Username) - } - } + ctx := context.Background() // Create new identity newIdentity := &iam_pb.Identity{ @@ -69,13 +47,13 @@ func (s *AdminServer) CreateObjectStoreUser(req CreateUserRequest) (*ObjectStore } } - // Add to configuration - s3cfg.Identities = append(s3cfg.Identities, newIdentity) - - // Save configuration - err = s.saveS3Configuration(s3cfg) + // Create user using credential manager + err := s.credentialManager.CreateUser(ctx, newIdentity) if err != nil { - return nil, fmt.Errorf("failed to save IAM configuration: %v", err) + if err == credential.ErrUserAlreadyExists { + return nil, fmt.Errorf("user %s already exists", req.Username) + } + return nil, fmt.Errorf("failed to create user: %v", err) } // Return created user @@ -92,35 +70,27 @@ func (s *AdminServer) CreateObjectStoreUser(req CreateUserRequest) (*ObjectStore // UpdateObjectStoreUser updates an existing user func (s *AdminServer) UpdateObjectStoreUser(username string, req UpdateUserRequest) (*ObjectStoreUser, error) { - s3cfg := &iam_pb.S3ApiConfiguration{} + if s.credentialManager == nil { + return nil, fmt.Errorf("credential manager not available") + } - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - return err - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) + ctx := context.Background() + // Get existing user + identity, err := s.credentialManager.GetUser(ctx, username) if err != nil { - return nil, fmt.Errorf("failed to load IAM configuration: %v", err) - } - - // Find and update user - var updatedIdentity *iam_pb.Identity - for _, identity := range s3cfg.Identities { - if identity.Name == username { - updatedIdentity = identity - break + if err == credential.ErrUserNotFound { + return nil, fmt.Errorf("user %s not found", username) } + return nil, fmt.Errorf("failed to get user: %v", err) } - if updatedIdentity == nil { - return nil, fmt.Errorf("user %s not found", username) + // Create updated identity + updatedIdentity := &iam_pb.Identity{ + Name: identity.Name, + Account: identity.Account, + Credentials: identity.Credentials, + Actions: identity.Actions, } // Update actions if provided @@ -139,10 +109,10 @@ func (s *AdminServer) UpdateObjectStoreUser(username string, req UpdateUserReque updatedIdentity.Account.EmailAddress = req.Email } - // Save configuration - err = s.saveS3Configuration(s3cfg) + // Update user using credential manager + err = s.credentialManager.UpdateUser(ctx, username, updatedIdentity) if err != nil { - return nil, fmt.Errorf("failed to save IAM configuration: %v", err) + return nil, fmt.Errorf("failed to update user: %v", err) } // Return updated user @@ -161,142 +131,95 @@ func (s *AdminServer) UpdateObjectStoreUser(username string, req UpdateUserReque return user, nil } -// DeleteObjectStoreUser deletes a user from identity.json +// DeleteObjectStoreUser deletes a user using the credential manager func (s *AdminServer) DeleteObjectStoreUser(username string) error { - s3cfg := &iam_pb.S3ApiConfiguration{} + if s.credentialManager == nil { + return fmt.Errorf("credential manager not available") + } - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - return err - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) + ctx := context.Background() + // Delete user using credential manager + err := s.credentialManager.DeleteUser(ctx, username) if err != nil { - return fmt.Errorf("failed to load IAM configuration: %v", err) - } - - // Find and remove user - found := false - for i, identity := range s3cfg.Identities { - if identity.Name == username { - s3cfg.Identities = append(s3cfg.Identities[:i], s3cfg.Identities[i+1:]...) - found = true - break + if err == credential.ErrUserNotFound { + return fmt.Errorf("user %s not found", username) } + return fmt.Errorf("failed to delete user: %v", err) } - if !found { - return fmt.Errorf("user %s not found", username) - } - - // Save configuration - return s.saveS3Configuration(s3cfg) + return nil } // GetObjectStoreUserDetails returns detailed information about a user func (s *AdminServer) GetObjectStoreUserDetails(username string) (*UserDetails, error) { - s3cfg := &iam_pb.S3ApiConfiguration{} + if s.credentialManager == nil { + return nil, fmt.Errorf("credential manager not available") + } - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - return err - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) + ctx := context.Background() + // Get user using credential manager + identity, err := s.credentialManager.GetUser(ctx, username) if err != nil { - return nil, fmt.Errorf("failed to load IAM configuration: %v", err) - } - - // Find user - for _, identity := range s3cfg.Identities { - if identity.Name == username { - details := &UserDetails{ - Username: username, - Actions: identity.Actions, - } - - // Set email from account if available - if identity.Account != nil { - details.Email = identity.Account.EmailAddress - } - - // Convert credentials to access key info - for _, cred := range identity.Credentials { - details.AccessKeys = append(details.AccessKeys, AccessKeyInfo{ - AccessKey: cred.AccessKey, - SecretKey: cred.SecretKey, - CreatedAt: time.Now().AddDate(0, -1, 0), // Mock creation date - }) - } - - return details, nil + if err == credential.ErrUserNotFound { + return nil, fmt.Errorf("user %s not found", username) } + return nil, fmt.Errorf("failed to get user: %v", err) } - return nil, fmt.Errorf("user %s not found", username) + details := &UserDetails{ + Username: username, + Actions: identity.Actions, + } + + // Set email from account if available + if identity.Account != nil { + details.Email = identity.Account.EmailAddress + } + + // Convert credentials to access key info + for _, cred := range identity.Credentials { + details.AccessKeys = append(details.AccessKeys, AccessKeyInfo{ + AccessKey: cred.AccessKey, + SecretKey: cred.SecretKey, + CreatedAt: time.Now().AddDate(0, -1, 0), // Mock creation date + }) + } + + return details, nil } // CreateAccessKey creates a new access key for a user func (s *AdminServer) CreateAccessKey(username string) (*AccessKeyInfo, error) { - s3cfg := &iam_pb.S3ApiConfiguration{} + if s.credentialManager == nil { + return nil, fmt.Errorf("credential manager not available") + } - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - return err - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) + ctx := context.Background() + // Check if user exists + _, err := s.credentialManager.GetUser(ctx, username) if err != nil { - return nil, fmt.Errorf("failed to load IAM configuration: %v", err) - } - - // Find user - var targetIdentity *iam_pb.Identity - for _, identity := range s3cfg.Identities { - if identity.Name == username { - targetIdentity = identity - break + if err == credential.ErrUserNotFound { + return nil, fmt.Errorf("user %s not found", username) } - } - - if targetIdentity == nil { - return nil, fmt.Errorf("user %s not found", username) + return nil, fmt.Errorf("failed to get user: %v", err) } // Generate new access key accessKey := generateAccessKey() secretKey := generateSecretKey() - newCredential := &iam_pb.Credential{ + credential := &iam_pb.Credential{ AccessKey: accessKey, SecretKey: secretKey, } - // Add to user's credentials - targetIdentity.Credentials = append(targetIdentity.Credentials, newCredential) - - // Save configuration - err = s.saveS3Configuration(s3cfg) + // Create access key using credential manager + err = s.credentialManager.CreateAccessKey(ctx, username, credential) if err != nil { - return nil, fmt.Errorf("failed to save IAM configuration: %v", err) + return nil, fmt.Errorf("failed to create access key: %v", err) } return &AccessKeyInfo{ @@ -308,111 +231,79 @@ func (s *AdminServer) CreateAccessKey(username string) (*AccessKeyInfo, error) { // DeleteAccessKey deletes an access key for a user func (s *AdminServer) DeleteAccessKey(username, accessKeyId string) error { - s3cfg := &iam_pb.S3ApiConfiguration{} - - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - return err - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) - - if err != nil { - return fmt.Errorf("failed to load IAM configuration: %v", err) + if s.credentialManager == nil { + return fmt.Errorf("credential manager not available") } - // Find user and remove access key - for _, identity := range s3cfg.Identities { - if identity.Name == username { - for i, cred := range identity.Credentials { - if cred.AccessKey == accessKeyId { - identity.Credentials = append(identity.Credentials[:i], identity.Credentials[i+1:]...) - return s.saveS3Configuration(s3cfg) - } - } + ctx := context.Background() + + // Delete access key using credential manager + err := s.credentialManager.DeleteAccessKey(ctx, username, accessKeyId) + if err != nil { + if err == credential.ErrUserNotFound { + return fmt.Errorf("user %s not found", username) + } + if err == credential.ErrAccessKeyNotFound { return fmt.Errorf("access key %s not found for user %s", accessKeyId, username) } + return fmt.Errorf("failed to delete access key: %v", err) } - return fmt.Errorf("user %s not found", username) + return nil } // GetUserPolicies returns the policies for a user (actions) func (s *AdminServer) GetUserPolicies(username string) ([]string, error) { - s3cfg := &iam_pb.S3ApiConfiguration{} + if s.credentialManager == nil { + return nil, fmt.Errorf("credential manager not available") + } - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - return err - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) + ctx := context.Background() + // Get user using credential manager + identity, err := s.credentialManager.GetUser(ctx, username) if err != nil { - return nil, fmt.Errorf("failed to load IAM configuration: %v", err) - } - - // Find user and return policies - for _, identity := range s3cfg.Identities { - if identity.Name == username { - return identity.Actions, nil + if err == credential.ErrUserNotFound { + return nil, fmt.Errorf("user %s not found", username) } + return nil, fmt.Errorf("failed to get user: %v", err) } - return nil, fmt.Errorf("user %s not found", username) + return identity.Actions, nil } // UpdateUserPolicies updates the policies (actions) for a user func (s *AdminServer) UpdateUserPolicies(username string, actions []string) error { - s3cfg := &iam_pb.S3ApiConfiguration{} + if s.credentialManager == nil { + return fmt.Errorf("credential manager not available") + } - // Load existing configuration - err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { - return err - } - if buf.Len() > 0 { - return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) - } - return nil - }) + ctx := context.Background() + // Get existing user + identity, err := s.credentialManager.GetUser(ctx, username) if err != nil { - return fmt.Errorf("failed to load IAM configuration: %v", err) + if err == credential.ErrUserNotFound { + return fmt.Errorf("user %s not found", username) + } + return fmt.Errorf("failed to get user: %v", err) } - // Find user and update policies - for _, identity := range s3cfg.Identities { - if identity.Name == username { - identity.Actions = actions - return s.saveS3Configuration(s3cfg) - } + // Create updated identity with new actions + updatedIdentity := &iam_pb.Identity{ + Name: identity.Name, + Account: identity.Account, + Credentials: identity.Credentials, + Actions: actions, } - return fmt.Errorf("user %s not found", username) -} + // Update user using credential manager + err = s.credentialManager.UpdateUser(ctx, username, updatedIdentity) + if err != nil { + return fmt.Errorf("failed to update user policies: %v", err) + } -// saveS3Configuration saves the S3 configuration to identity.json -func (s *AdminServer) saveS3Configuration(s3cfg *iam_pb.S3ApiConfiguration) error { - return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - if err := filer.ProtoToText(&buf, s3cfg); err != nil { - return fmt.Errorf("failed to marshal configuration: %v", err) - } - - return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes()) - }) + return nil } // Helper functions for generating keys and IDs diff --git a/weed/command/iam.go b/weed/command/iam.go index b0b0fc52a..f67173389 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/version" "net/http" + "github.com/seaweedfs/seaweedfs/weed/util/version" + "time" "github.com/gorilla/mux" @@ -15,6 +16,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + + // Import credential stores to register them + _ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc" + _ "github.com/seaweedfs/seaweedfs/weed/credential/memory" + _ "github.com/seaweedfs/seaweedfs/weed/credential/postgres" + _ "github.com/seaweedfs/seaweedfs/weed/credential/sqlite" ) var ( diff --git a/weed/command/s3.go b/weed/command/s3.go index aa8798eb1..f955c4222 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/version" "io/ioutil" "net" "net/http" @@ -14,6 +13,8 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/util/version" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/credentials/tls/certprovider/pemfile" diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 5d1ccb13f..26de2e1fd 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -2,9 +2,10 @@ package command import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/util" "path/filepath" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/command/scaffold" ) @@ -13,9 +14,9 @@ func init() { } var cmdScaffold = &Command{ - UsageLine: "scaffold -config=[filer|notification|replication|security|master]", + UsageLine: "scaffold -config=[filer|notification|replication|security|master|shell|credential]", Short: "generate basic configuration files", - Long: `Generate filer.toml with all possible configurations for you to customize. + Long: `Generate configuration files with all possible configurations for you to customize. The options can also be overwritten by environment variables. For example, the filer.toml mysql password can be overwritten by environment variable @@ -30,7 +31,7 @@ var cmdScaffold = &Command{ var ( outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory") - config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master] the configuration file to generate") + config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master|shell|credential] the configuration file to generate") ) func runScaffold(cmd *Command, args []string) bool { @@ -49,6 +50,8 @@ func runScaffold(cmd *Command, args []string) bool { content = scaffold.Master case "shell": content = scaffold.Shell + case "credential": + content = scaffold.Credential } if content == "" { println("need a valid -config option") diff --git a/weed/command/scaffold/credential.toml b/weed/command/scaffold/credential.toml new file mode 100644 index 000000000..380867800 --- /dev/null +++ b/weed/command/scaffold/credential.toml @@ -0,0 +1,55 @@ +# Put this file to one of the location, with descending priority +# ./credential.toml +# $HOME/.seaweedfs/credential.toml +# /etc/seaweedfs/credential.toml +# this file is read by S3 API and IAM API servers + +# Choose one of the credential stores below +# Only one store can be enabled at a time + +# Filer-based credential store (default, uses existing filer storage) +[credential.filer_etc] +enabled = true +# filer address and grpc_dial_option will be automatically configured by the server + +# SQLite credential store (recommended for single-node deployments) +[credential.sqlite] +enabled = false +file = "/var/lib/seaweedfs/credentials.db" +# Optional: table name prefix (default: "sw_") +table_prefix = "sw_" + +# PostgreSQL credential store (recommended for multi-node deployments) +[credential.postgres] +enabled = false +hostname = "localhost" +port = 5432 +username = "seaweedfs" +password = "your_password" +database = "seaweedfs" +schema = "public" +sslmode = "disable" +# Optional: table name prefix (default: "sw_") +table_prefix = "sw_" +# Connection pool settings +connection_max_idle = 10 +connection_max_open = 100 +connection_max_lifetime_seconds = 3600 + +# Memory credential store (for testing only, data is lost on restart) +[credential.memory] +enabled = false + +# Environment variable overrides: +# Any configuration value can be overridden by environment variables +# Rules: +# * Prefix with "WEED_CREDENTIAL_" +# * Convert to uppercase +# * Replace '.' with '_' +# +# Examples: +# export WEED_CREDENTIAL_POSTGRES_PASSWORD=secret +# export WEED_CREDENTIAL_SQLITE_FILE=/custom/path/credentials.db +# export WEED_CREDENTIAL_POSTGRES_HOSTNAME=db.example.com +# export WEED_CREDENTIAL_FILER_ETC_ENABLED=true +# export WEED_CREDENTIAL_SQLITE_ENABLED=false \ No newline at end of file diff --git a/weed/command/scaffold/example.go b/weed/command/scaffold/example.go index 6be6804e5..26d0a306c 100644 --- a/weed/command/scaffold/example.go +++ b/weed/command/scaffold/example.go @@ -19,3 +19,6 @@ var Master string //go:embed shell.toml var Shell string + +//go:embed credential.toml +var Credential string diff --git a/weed/credential/README.md b/weed/credential/README.md new file mode 100644 index 000000000..a08bc914e --- /dev/null +++ b/weed/credential/README.md @@ -0,0 +1,182 @@ +# Credential Store Integration + +This document shows how the credential store has been integrated into SeaweedFS's S3 API and IAM API components. + +## Quick Start + +1. **Generate credential configuration:** + ```bash + weed scaffold -config=credential -output=. + ``` + +2. **Edit credential.toml** to enable your preferred store (filer_etc is enabled by default) + +3. **Start S3 API server** - it will automatically load credential.toml: + ```bash + weed s3 -filer=localhost:8888 + ``` + +## Integration Overview + +The credential store provides a pluggable backend for storing S3 identities and credentials, supporting: +- **Filer-based storage** (filer_etc) - Uses existing filer storage (default) +- **SQLite** - Local database storage +- **PostgreSQL** - Shared database for multiple servers +- **Memory** - In-memory storage for testing + +## Configuration + +### Using credential.toml + +Generate the configuration template: +```bash +weed scaffold -config=credential +``` + +This creates a `credential.toml` file with all available options. The filer_etc store is enabled by default: + +```toml +# Filer-based credential store (default, uses existing filer storage) +[credential.filer_etc] +enabled = true + +# SQLite credential store (recommended for single-node deployments) +[credential.sqlite] +enabled = false +file = "/var/lib/seaweedfs/credentials.db" + +# PostgreSQL credential store (recommended for multi-node deployments) +[credential.postgres] +enabled = false +hostname = "localhost" +port = 5432 +username = "seaweedfs" +password = "your_password" +database = "seaweedfs" + +# Memory credential store (for testing only, data is lost on restart) +[credential.memory] +enabled = false +``` + +The credential.toml file is automatically loaded from these locations (in priority order): +- `./credential.toml` +- `$HOME/.seaweedfs/credential.toml` +- `/etc/seaweedfs/credential.toml` + +### Server Configuration + +Both S3 API and IAM API servers automatically load credential.toml during startup. No additional configuration is required. + +## Usage Examples + +### Filer-based Store (Default) + +```toml +[credential.filer_etc] +enabled = true +``` + +This uses the existing filer storage and is compatible with current deployments. + +### SQLite Store + +```toml +[credential.sqlite] +enabled = true +file = "/var/lib/seaweedfs/credentials.db" +table_prefix = "sw_" +``` + +### PostgreSQL Store + +```toml +[credential.postgres] +enabled = true +hostname = "localhost" +port = 5432 +username = "seaweedfs" +password = "your_password" +database = "seaweedfs" +schema = "public" +sslmode = "disable" +table_prefix = "sw_" +connection_max_idle = 10 +connection_max_open = 100 +connection_max_lifetime_seconds = 3600 +``` + +### Memory Store (Testing) + +```toml +[credential.memory] +enabled = true +``` + +## Environment Variables + +All credential configuration can be overridden with environment variables: + +```bash +# Override PostgreSQL password +export WEED_CREDENTIAL_POSTGRES_PASSWORD=secret + +# Override SQLite file path +export WEED_CREDENTIAL_SQLITE_FILE=/custom/path/credentials.db + +# Override PostgreSQL hostname +export WEED_CREDENTIAL_POSTGRES_HOSTNAME=db.example.com + +# Enable/disable stores +export WEED_CREDENTIAL_FILER_ETC_ENABLED=true +export WEED_CREDENTIAL_SQLITE_ENABLED=false +``` + +Rules: +- Prefix with `WEED_CREDENTIAL_` +- Convert to uppercase +- Replace `.` with `_` + +## Implementation Details + +Components automatically load credential configuration during startup: + +```go +// Server initialization +if credConfig, err := credential.LoadCredentialConfiguration(); err == nil && credConfig != nil { + credentialManager, err := credential.NewCredentialManager( + credConfig.Store, + credConfig.Config, + credConfig.Prefix, + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize credential manager: %v", err) + } + // Use credential manager for operations +} +``` + +## Benefits + +1. **Easy Configuration** - Generate template with `weed scaffold -config=credential` +2. **Pluggable Storage** - Switch between filer_etc, SQLite, PostgreSQL without code changes +3. **Backward Compatibility** - Filer-based storage works with existing deployments +4. **Scalability** - Database stores support multiple concurrent servers +5. **Performance** - Database access can be faster than file-based storage +6. **Testing** - Memory store simplifies unit testing +7. **Environment Override** - All settings can be overridden with environment variables + +## Error Handling + +When a credential store is configured, it must initialize successfully or the server will fail to start: + +```go +if credConfig != nil { + credentialManager, err = credential.NewCredentialManager(...) + if err != nil { + return nil, fmt.Errorf("failed to initialize credential manager: %v", err) + } +} +``` + +This ensures explicit configuration - if you configure a credential store, it must work properly. \ No newline at end of file diff --git a/weed/credential/config_loader.go b/weed/credential/config_loader.go new file mode 100644 index 000000000..959f1cfb4 --- /dev/null +++ b/weed/credential/config_loader.go @@ -0,0 +1,133 @@ +package credential + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// CredentialConfig represents the credential configuration from credential.toml +type CredentialConfig struct { + Store string + Config util.Configuration + Prefix string +} + +// LoadCredentialConfiguration loads credential configuration from credential.toml +// Returns the store type, configuration, and prefix for credential management +func LoadCredentialConfiguration() (*CredentialConfig, error) { + // Try to load credential.toml configuration + loaded := util.LoadConfiguration("credential", false) + if !loaded { + glog.V(1).Info("No credential.toml found, credential store disabled") + return nil, nil + } + + viper := util.GetViper() + + // Find which credential store is enabled + var enabledStore string + var storePrefix string + + // Get available store types from registered stores + storeTypes := GetAvailableStores() + for _, storeType := range storeTypes { + key := fmt.Sprintf("credential.%s.enabled", string(storeType)) + if viper.GetBool(key) { + if enabledStore != "" { + return nil, fmt.Errorf("multiple credential stores enabled: %s and %s. Only one store can be enabled", enabledStore, string(storeType)) + } + enabledStore = string(storeType) + storePrefix = fmt.Sprintf("credential.%s.", string(storeType)) + } + } + + if enabledStore == "" { + glog.V(1).Info("No credential store enabled in credential.toml") + return nil, nil + } + + glog.V(0).Infof("Loaded credential configuration: store=%s", enabledStore) + + return &CredentialConfig{ + Store: enabledStore, + Config: viper, + Prefix: storePrefix, + }, nil +} + +// GetCredentialStoreConfig extracts credential store configuration from command line flags +// This is used when credential store is configured via command line instead of credential.toml +func GetCredentialStoreConfig(store string, config util.Configuration, prefix string) *CredentialConfig { + if store == "" { + return nil + } + + return &CredentialConfig{ + Store: store, + Config: config, + Prefix: prefix, + } +} + +// MergeCredentialConfig merges command line credential config with credential.toml config +// Command line flags take priority over credential.toml +func MergeCredentialConfig(cmdLineStore string, cmdLineConfig util.Configuration, cmdLinePrefix string) (*CredentialConfig, error) { + // If command line credential store is specified, use it + if cmdLineStore != "" { + glog.V(0).Infof("Using command line credential configuration: store=%s", cmdLineStore) + return GetCredentialStoreConfig(cmdLineStore, cmdLineConfig, cmdLinePrefix), nil + } + + // Otherwise, try to load from credential.toml + config, err := LoadCredentialConfiguration() + if err != nil { + return nil, err + } + + if config == nil { + glog.V(1).Info("No credential store configured") + } + + return config, nil +} + +// NewCredentialManagerWithDefaults creates a credential manager with fallback to defaults +// If explicitStore is provided, it will be used regardless of credential.toml +// If explicitStore is empty, it tries credential.toml first, then defaults to "filer_etc" +func NewCredentialManagerWithDefaults(explicitStore CredentialStoreTypeName) (*CredentialManager, error) { + var storeName CredentialStoreTypeName + var config util.Configuration + var prefix string + + // If explicit store is provided, use it + if explicitStore != "" { + storeName = explicitStore + config = nil + prefix = "" + glog.V(0).Infof("Using explicit credential store: %s", storeName) + } else { + // Try to load from credential.toml first + if credConfig, err := LoadCredentialConfiguration(); err == nil && credConfig != nil { + storeName = CredentialStoreTypeName(credConfig.Store) + config = credConfig.Config + prefix = credConfig.Prefix + glog.V(0).Infof("Loaded credential configuration from credential.toml: store=%s", storeName) + } else { + // Default to filer_etc store + storeName = StoreTypeFilerEtc + config = nil + prefix = "" + glog.V(1).Info("No credential.toml found, defaulting to filer_etc store") + } + } + + // Create the credential manager + credentialManager, err := NewCredentialManager(storeName, config, prefix) + if err != nil { + return nil, fmt.Errorf("failed to initialize credential manager with store '%s': %v", storeName, err) + } + + return credentialManager, nil +} diff --git a/weed/credential/credential_manager.go b/weed/credential/credential_manager.go new file mode 100644 index 000000000..d4323e920 --- /dev/null +++ b/weed/credential/credential_manager.go @@ -0,0 +1,125 @@ +package credential + +import ( + "context" + "fmt" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// CredentialManager manages user credentials using a configurable store +type CredentialManager struct { + store CredentialStore +} + +// NewCredentialManager creates a new credential manager with the specified store +func NewCredentialManager(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string) (*CredentialManager, error) { + var store CredentialStore + + // Find the requested store implementation + for _, s := range Stores { + if s.GetName() == storeName { + store = s + break + } + } + + if store == nil { + return nil, fmt.Errorf("credential store '%s' not found. Available stores: %s", + storeName, getAvailableStores()) + } + + // Initialize the store + if err := store.Initialize(configuration, prefix); err != nil { + return nil, fmt.Errorf("failed to initialize credential store '%s': %v", storeName, err) + } + + return &CredentialManager{ + store: store, + }, nil +} + +// GetStore returns the underlying credential store +func (cm *CredentialManager) GetStore() CredentialStore { + return cm.store +} + +// LoadConfiguration loads the S3 API configuration +func (cm *CredentialManager) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + return cm.store.LoadConfiguration(ctx) +} + +// SaveConfiguration saves the S3 API configuration +func (cm *CredentialManager) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + return cm.store.SaveConfiguration(ctx, config) +} + +// CreateUser creates a new user +func (cm *CredentialManager) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { + return cm.store.CreateUser(ctx, identity) +} + +// GetUser retrieves a user by username +func (cm *CredentialManager) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { + return cm.store.GetUser(ctx, username) +} + +// UpdateUser updates an existing user +func (cm *CredentialManager) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { + return cm.store.UpdateUser(ctx, username, identity) +} + +// DeleteUser removes a user +func (cm *CredentialManager) DeleteUser(ctx context.Context, username string) error { + return cm.store.DeleteUser(ctx, username) +} + +// ListUsers returns all usernames +func (cm *CredentialManager) ListUsers(ctx context.Context) ([]string, error) { + return cm.store.ListUsers(ctx) +} + +// GetUserByAccessKey retrieves a user by access key +func (cm *CredentialManager) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { + return cm.store.GetUserByAccessKey(ctx, accessKey) +} + +// CreateAccessKey creates a new access key for a user +func (cm *CredentialManager) CreateAccessKey(ctx context.Context, username string, credential *iam_pb.Credential) error { + return cm.store.CreateAccessKey(ctx, username, credential) +} + +// DeleteAccessKey removes an access key for a user +func (cm *CredentialManager) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { + return cm.store.DeleteAccessKey(ctx, username, accessKey) +} + +// Shutdown performs cleanup +func (cm *CredentialManager) Shutdown() { + if cm.store != nil { + cm.store.Shutdown() + } +} + +// getAvailableStores returns a comma-separated list of available store names +func getAvailableStores() string { + var storeNames []string + for _, store := range Stores { + storeNames = append(storeNames, string(store.GetName())) + } + return strings.Join(storeNames, ", ") +} + +// GetAvailableStores returns a list of available credential store names +func GetAvailableStores() []CredentialStoreTypeName { + var storeNames []CredentialStoreTypeName + for _, store := range Stores { + storeNames = append(storeNames, store.GetName()) + } + if storeNames == nil { + return []CredentialStoreTypeName{} + } + return storeNames +} diff --git a/weed/credential/credential_store.go b/weed/credential/credential_store.go new file mode 100644 index 000000000..60a86cfda --- /dev/null +++ b/weed/credential/credential_store.go @@ -0,0 +1,91 @@ +package credential + +import ( + "context" + "errors" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +var ( + ErrUserNotFound = errors.New("user not found") + ErrUserAlreadyExists = errors.New("user already exists") + ErrAccessKeyNotFound = errors.New("access key not found") +) + +// CredentialStoreTypeName represents the type name of a credential store +type CredentialStoreTypeName string + +// Credential store name constants +const ( + StoreTypeMemory CredentialStoreTypeName = "memory" + StoreTypeFilerEtc CredentialStoreTypeName = "filer_etc" + StoreTypePostgres CredentialStoreTypeName = "postgres" + StoreTypeSQLite CredentialStoreTypeName = "sqlite" +) + +// CredentialStore defines the interface for user credential storage and retrieval +type CredentialStore interface { + // GetName returns the name of the credential store implementation + GetName() CredentialStoreTypeName + + // Initialize initializes the credential store with configuration + Initialize(configuration util.Configuration, prefix string) error + + // LoadConfiguration loads the entire S3 API configuration + LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) + + // SaveConfiguration saves the entire S3 API configuration + SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error + + // CreateUser creates a new user with the given identity + CreateUser(ctx context.Context, identity *iam_pb.Identity) error + + // GetUser retrieves a user by username + GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) + + // UpdateUser updates an existing user + UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error + + // DeleteUser removes a user by username + DeleteUser(ctx context.Context, username string) error + + // ListUsers returns all usernames + ListUsers(ctx context.Context) ([]string, error) + + // GetUserByAccessKey retrieves a user by access key + GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) + + // CreateAccessKey creates a new access key for a user + CreateAccessKey(ctx context.Context, username string, credential *iam_pb.Credential) error + + // DeleteAccessKey removes an access key for a user + DeleteAccessKey(ctx context.Context, username string, accessKey string) error + + // Shutdown performs cleanup when the store is being shut down + Shutdown() +} + +// AccessKeyInfo represents access key information with metadata +type AccessKeyInfo struct { + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` + Username string `json:"username"` + CreatedAt time.Time `json:"createdAt"` +} + +// UserCredentials represents a user's credentials and metadata +type UserCredentials struct { + Username string `json:"username"` + Email string `json:"email"` + Account *iam_pb.Account `json:"account,omitempty"` + Credentials []*iam_pb.Credential `json:"credentials"` + Actions []string `json:"actions"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +// Stores holds all available credential store implementations +var Stores []CredentialStore diff --git a/weed/credential/credential_test.go b/weed/credential/credential_test.go new file mode 100644 index 000000000..70eeb7b0c --- /dev/null +++ b/weed/credential/credential_test.go @@ -0,0 +1,353 @@ +package credential + +import ( + "context" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func TestCredentialStoreInterface(t *testing.T) { + // Note: This test may fail if run without importing store packages + // For full integration testing, see the test/ package + if len(Stores) == 0 { + t.Skip("No credential stores registered - this is expected when testing the base package without store imports") + } + + // Check that expected stores are available + storeNames := GetAvailableStores() + expectedStores := []string{string(StoreTypeFilerEtc), string(StoreTypeMemory)} + + // Add SQLite and PostgreSQL if they're available (build tags dependent) + for _, storeName := range storeNames { + found := false + for _, expected := range append(expectedStores, string(StoreTypeSQLite), string(StoreTypePostgres)) { + if string(storeName) == expected { + found = true + break + } + } + if !found { + t.Errorf("Unexpected store found: %s", storeName) + } + } + + // Test that filer_etc store is always available + filerEtcStoreFound := false + memoryStoreFound := false + for _, storeName := range storeNames { + if string(storeName) == string(StoreTypeFilerEtc) { + filerEtcStoreFound = true + } + if string(storeName) == string(StoreTypeMemory) { + memoryStoreFound = true + } + } + if !filerEtcStoreFound { + t.Error("FilerEtc store should always be available") + } + if !memoryStoreFound { + t.Error("Memory store should always be available") + } +} + +func TestCredentialManagerCreation(t *testing.T) { + config := util.GetViper() + + // Test creating credential manager with invalid store + _, err := NewCredentialManager(CredentialStoreTypeName("nonexistent"), config, "test.") + if err == nil { + t.Error("Expected error for nonexistent store") + } + + // Skip store-specific tests if no stores are registered + if len(Stores) == 0 { + t.Skip("No credential stores registered - skipping store-specific tests") + } + + // Test creating credential manager with available stores + availableStores := GetAvailableStores() + if len(availableStores) == 0 { + t.Skip("No stores available for testing") + } + + // Test with the first available store + storeName := availableStores[0] + cm, err := NewCredentialManager(storeName, config, "test.") + if err != nil { + t.Fatalf("Failed to create credential manager with store %s: %v", storeName, err) + } + if cm == nil { + t.Error("Credential manager should not be nil") + } + defer cm.Shutdown() + + // Test that the store is of the correct type + if cm.GetStore().GetName() != storeName { + t.Errorf("Expected %s store, got %s", storeName, cm.GetStore().GetName()) + } +} + +func TestCredentialInterface(t *testing.T) { + // Skip if no stores are registered + if len(Stores) == 0 { + t.Skip("No credential stores registered - for full testing see test/ package") + } + + // Test the interface with the first available store + availableStores := GetAvailableStores() + if len(availableStores) == 0 { + t.Skip("No stores available for testing") + } + + testCredentialInterfaceWithStore(t, availableStores[0]) +} + +func testCredentialInterfaceWithStore(t *testing.T, storeName CredentialStoreTypeName) { + // Create a test identity + testIdentity := &iam_pb.Identity{ + Name: "testuser", + Actions: []string{"Read", "Write"}, + Account: &iam_pb.Account{ + Id: "123456789012", + DisplayName: "Test User", + EmailAddress: "test@example.com", + }, + Credentials: []*iam_pb.Credential{ + { + AccessKey: "AKIAIOSFODNN7EXAMPLE", + SecretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + }, + } + + // Test the interface methods exist (compile-time check) + config := util.GetViper() + cm, err := NewCredentialManager(storeName, config, "test.") + if err != nil { + t.Fatalf("Failed to create credential manager: %v", err) + } + defer cm.Shutdown() + + ctx := context.Background() + + // Test LoadConfiguration + _, err = cm.LoadConfiguration(ctx) + if err != nil { + t.Fatalf("LoadConfiguration failed: %v", err) + } + + // Test CreateUser + err = cm.CreateUser(ctx, testIdentity) + if err != nil { + t.Fatalf("CreateUser failed: %v", err) + } + + // Test GetUser + user, err := cm.GetUser(ctx, "testuser") + if err != nil { + t.Fatalf("GetUser failed: %v", err) + } + if user.Name != "testuser" { + t.Errorf("Expected user name 'testuser', got %s", user.Name) + } + + // Test ListUsers + users, err := cm.ListUsers(ctx) + if err != nil { + t.Fatalf("ListUsers failed: %v", err) + } + if len(users) != 1 || users[0] != "testuser" { + t.Errorf("Expected ['testuser'], got %v", users) + } + + // Test GetUserByAccessKey + userByKey, err := cm.GetUserByAccessKey(ctx, "AKIAIOSFODNN7EXAMPLE") + if err != nil { + t.Fatalf("GetUserByAccessKey failed: %v", err) + } + if userByKey.Name != "testuser" { + t.Errorf("Expected user name 'testuser', got %s", userByKey.Name) + } +} + +func TestCredentialManagerIntegration(t *testing.T) { + // Skip if no stores are registered + if len(Stores) == 0 { + t.Skip("No credential stores registered - for full testing see test/ package") + } + + // Test with the first available store + availableStores := GetAvailableStores() + if len(availableStores) == 0 { + t.Skip("No stores available for testing") + } + + storeName := availableStores[0] + config := util.GetViper() + cm, err := NewCredentialManager(storeName, config, "test.") + if err != nil { + t.Fatalf("Failed to create credential manager: %v", err) + } + defer cm.Shutdown() + + ctx := context.Background() + + // Test complete workflow + user1 := &iam_pb.Identity{ + Name: "user1", + Actions: []string{"Read"}, + Account: &iam_pb.Account{ + Id: "111111111111", + DisplayName: "User One", + EmailAddress: "user1@example.com", + }, + Credentials: []*iam_pb.Credential{ + { + AccessKey: "AKIAUSER1", + SecretKey: "secret1", + }, + }, + } + + user2 := &iam_pb.Identity{ + Name: "user2", + Actions: []string{"Write"}, + Account: &iam_pb.Account{ + Id: "222222222222", + DisplayName: "User Two", + EmailAddress: "user2@example.com", + }, + Credentials: []*iam_pb.Credential{ + { + AccessKey: "AKIAUSER2", + SecretKey: "secret2", + }, + }, + } + + // Create users + err = cm.CreateUser(ctx, user1) + if err != nil { + t.Fatalf("Failed to create user1: %v", err) + } + + err = cm.CreateUser(ctx, user2) + if err != nil { + t.Fatalf("Failed to create user2: %v", err) + } + + // List users + users, err := cm.ListUsers(ctx) + if err != nil { + t.Fatalf("Failed to list users: %v", err) + } + + if len(users) != 2 { + t.Errorf("Expected 2 users, got %d", len(users)) + } + + // Test access key lookup + foundUser, err := cm.GetUserByAccessKey(ctx, "AKIAUSER1") + if err != nil { + t.Fatalf("Failed to get user by access key: %v", err) + } + if foundUser.Name != "user1" { + t.Errorf("Expected user1, got %s", foundUser.Name) + } + + // Delete user + err = cm.DeleteUser(ctx, "user1") + if err != nil { + t.Fatalf("Failed to delete user: %v", err) + } + + // Verify user is deleted + _, err = cm.GetUser(ctx, "user1") + if err != ErrUserNotFound { + t.Errorf("Expected ErrUserNotFound, got %v", err) + } + + // Clean up + err = cm.DeleteUser(ctx, "user2") + if err != nil { + t.Fatalf("Failed to delete user2: %v", err) + } +} + +// TestErrorTypes tests that the custom error types are defined correctly +func TestErrorTypes(t *testing.T) { + // Test that error types are defined + if ErrUserNotFound == nil { + t.Error("ErrUserNotFound should be defined") + } + if ErrUserAlreadyExists == nil { + t.Error("ErrUserAlreadyExists should be defined") + } + if ErrAccessKeyNotFound == nil { + t.Error("ErrAccessKeyNotFound should be defined") + } + + // Test error messages + if ErrUserNotFound.Error() != "user not found" { + t.Errorf("Expected 'user not found', got '%s'", ErrUserNotFound.Error()) + } + if ErrUserAlreadyExists.Error() != "user already exists" { + t.Errorf("Expected 'user already exists', got '%s'", ErrUserAlreadyExists.Error()) + } + if ErrAccessKeyNotFound.Error() != "access key not found" { + t.Errorf("Expected 'access key not found', got '%s'", ErrAccessKeyNotFound.Error()) + } +} + +// TestGetAvailableStores tests the store discovery function +func TestGetAvailableStores(t *testing.T) { + stores := GetAvailableStores() + if len(stores) == 0 { + t.Skip("No stores available for testing") + } + + // Convert to strings for comparison + storeNames := make([]string, len(stores)) + for i, store := range stores { + storeNames[i] = string(store) + } + + t.Logf("Available stores: %v (count: %d)", storeNames, len(storeNames)) + + // We expect at least memory and filer_etc stores to be available + expectedStores := []string{string(StoreTypeFilerEtc), string(StoreTypeMemory)} + + // Add SQLite and PostgreSQL if they're available (build tags dependent) + for _, storeName := range storeNames { + found := false + for _, expected := range append(expectedStores, string(StoreTypeSQLite), string(StoreTypePostgres)) { + if storeName == expected { + found = true + break + } + } + if !found { + t.Errorf("Unexpected store found: %s", storeName) + } + } + + // Test that filer_etc store is always available + filerEtcStoreFound := false + memoryStoreFound := false + for _, storeName := range storeNames { + if storeName == string(StoreTypeFilerEtc) { + filerEtcStoreFound = true + } + if storeName == string(StoreTypeMemory) { + memoryStoreFound = true + } + } + if !filerEtcStoreFound { + t.Error("FilerEtc store should always be available") + } + if !memoryStoreFound { + t.Error("Memory store should always be available") + } +} diff --git a/weed/credential/filer_etc/filer_etc_store.go b/weed/credential/filer_etc/filer_etc_store.go new file mode 100644 index 000000000..6951cc103 --- /dev/null +++ b/weed/credential/filer_etc/filer_etc_store.go @@ -0,0 +1,235 @@ +package filer_etc + +import ( + "bytes" + "context" + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +func init() { + credential.Stores = append(credential.Stores, &FilerEtcStore{}) +} + +// FilerEtcStore implements CredentialStore using SeaweedFS filer for storage +type FilerEtcStore struct { + filerGrpcAddress string + grpcDialOption grpc.DialOption +} + +func (store *FilerEtcStore) GetName() credential.CredentialStoreTypeName { + return credential.StoreTypeFilerEtc +} + +func (store *FilerEtcStore) Initialize(configuration util.Configuration, prefix string) error { + // Handle nil configuration gracefully + if configuration != nil { + store.filerGrpcAddress = configuration.GetString(prefix + "filer") + // TODO: Initialize grpcDialOption based on configuration + } + // Note: filerGrpcAddress can be set later via SetFilerClient method + return nil +} + +// SetFilerClient sets the filer client details for the file store +func (store *FilerEtcStore) SetFilerClient(filerAddress string, grpcDialOption grpc.DialOption) { + store.filerGrpcAddress = filerAddress + store.grpcDialOption = grpcDialOption +} + +// withFilerClient executes a function with a filer client +func (store *FilerEtcStore) withFilerClient(fn func(client filer_pb.SeaweedFilerClient) error) error { + if store.filerGrpcAddress == "" { + return fmt.Errorf("filer address not configured") + } + + // Use the pb.WithGrpcFilerClient helper similar to existing code + return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(store.filerGrpcAddress), store.grpcDialOption, fn) +} + +func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + s3cfg := &iam_pb.S3ApiConfiguration{} + + err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + var buf bytes.Buffer + if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { + if err != filer_pb.ErrNotFound { + return err + } + } + if buf.Len() > 0 { + return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg) + } + return nil + }) + + return s3cfg, err +} + +func (store *FilerEtcStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + var buf bytes.Buffer + if err := filer.ProtoToText(&buf, config); err != nil { + return fmt.Errorf("failed to marshal configuration: %v", err) + } + return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes()) + }) +} + +func (store *FilerEtcStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { + // Load existing configuration + config, err := store.LoadConfiguration(ctx) + if err != nil { + return fmt.Errorf("failed to load configuration: %v", err) + } + + // Check if user already exists + for _, existingIdentity := range config.Identities { + if existingIdentity.Name == identity.Name { + return credential.ErrUserAlreadyExists + } + } + + // Add new identity + config.Identities = append(config.Identities, identity) + + // Save configuration + return store.SaveConfiguration(ctx, config) +} + +func (store *FilerEtcStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { + config, err := store.LoadConfiguration(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load configuration: %v", err) + } + + for _, identity := range config.Identities { + if identity.Name == username { + return identity, nil + } + } + + return nil, credential.ErrUserNotFound +} + +func (store *FilerEtcStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { + config, err := store.LoadConfiguration(ctx) + if err != nil { + return fmt.Errorf("failed to load configuration: %v", err) + } + + // Find and update the user + for i, existingIdentity := range config.Identities { + if existingIdentity.Name == username { + config.Identities[i] = identity + return store.SaveConfiguration(ctx, config) + } + } + + return credential.ErrUserNotFound +} + +func (store *FilerEtcStore) DeleteUser(ctx context.Context, username string) error { + config, err := store.LoadConfiguration(ctx) + if err != nil { + return fmt.Errorf("failed to load configuration: %v", err) + } + + // Find and remove the user + for i, identity := range config.Identities { + if identity.Name == username { + config.Identities = append(config.Identities[:i], config.Identities[i+1:]...) + return store.SaveConfiguration(ctx, config) + } + } + + return credential.ErrUserNotFound +} + +func (store *FilerEtcStore) ListUsers(ctx context.Context) ([]string, error) { + config, err := store.LoadConfiguration(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load configuration: %v", err) + } + + var usernames []string + for _, identity := range config.Identities { + usernames = append(usernames, identity.Name) + } + + return usernames, nil +} + +func (store *FilerEtcStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { + config, err := store.LoadConfiguration(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load configuration: %v", err) + } + + for _, identity := range config.Identities { + for _, credential := range identity.Credentials { + if credential.AccessKey == accessKey { + return identity, nil + } + } + } + + return nil, credential.ErrAccessKeyNotFound +} + +func (store *FilerEtcStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error { + config, err := store.LoadConfiguration(ctx) + if err != nil { + return fmt.Errorf("failed to load configuration: %v", err) + } + + // Find the user and add the credential + for _, identity := range config.Identities { + if identity.Name == username { + // Check if access key already exists + for _, existingCred := range identity.Credentials { + if existingCred.AccessKey == cred.AccessKey { + return fmt.Errorf("access key %s already exists", cred.AccessKey) + } + } + + identity.Credentials = append(identity.Credentials, cred) + return store.SaveConfiguration(ctx, config) + } + } + + return credential.ErrUserNotFound +} + +func (store *FilerEtcStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { + config, err := store.LoadConfiguration(ctx) + if err != nil { + return fmt.Errorf("failed to load configuration: %v", err) + } + + // Find the user and remove the credential + for _, identity := range config.Identities { + if identity.Name == username { + for i, cred := range identity.Credentials { + if cred.AccessKey == accessKey { + identity.Credentials = append(identity.Credentials[:i], identity.Credentials[i+1:]...) + return store.SaveConfiguration(ctx, config) + } + } + return credential.ErrAccessKeyNotFound + } + } + + return credential.ErrUserNotFound +} + +func (store *FilerEtcStore) Shutdown() { + // No cleanup needed for file store +} diff --git a/weed/credential/memory/memory_store.go b/weed/credential/memory/memory_store.go new file mode 100644 index 000000000..e6117bf48 --- /dev/null +++ b/weed/credential/memory/memory_store.go @@ -0,0 +1,373 @@ +package memory + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func init() { + credential.Stores = append(credential.Stores, &MemoryStore{}) +} + +// MemoryStore implements CredentialStore using in-memory storage +// This is primarily intended for testing purposes +type MemoryStore struct { + mu sync.RWMutex + users map[string]*iam_pb.Identity // username -> identity + accessKeys map[string]string // access_key -> username + initialized bool +} + +func (store *MemoryStore) GetName() credential.CredentialStoreTypeName { + return credential.StoreTypeMemory +} + +func (store *MemoryStore) Initialize(configuration util.Configuration, prefix string) error { + store.mu.Lock() + defer store.mu.Unlock() + + if store.initialized { + return nil + } + + store.users = make(map[string]*iam_pb.Identity) + store.accessKeys = make(map[string]string) + store.initialized = true + + return nil +} + +func (store *MemoryStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + store.mu.RLock() + defer store.mu.RUnlock() + + if !store.initialized { + return nil, fmt.Errorf("store not initialized") + } + + config := &iam_pb.S3ApiConfiguration{} + + // Convert all users to identities + for _, user := range store.users { + // Deep copy the identity to avoid mutation issues + identityCopy := store.deepCopyIdentity(user) + config.Identities = append(config.Identities, identityCopy) + } + + return config, nil +} + +func (store *MemoryStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + // Clear existing data + store.users = make(map[string]*iam_pb.Identity) + store.accessKeys = make(map[string]string) + + // Add all identities + for _, identity := range config.Identities { + // Deep copy to avoid mutation issues + identityCopy := store.deepCopyIdentity(identity) + store.users[identity.Name] = identityCopy + + // Index access keys + for _, credential := range identity.Credentials { + store.accessKeys[credential.AccessKey] = identity.Name + } + } + + return nil +} + +func (store *MemoryStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + if _, exists := store.users[identity.Name]; exists { + return credential.ErrUserAlreadyExists + } + + // Check for duplicate access keys + for _, cred := range identity.Credentials { + if _, exists := store.accessKeys[cred.AccessKey]; exists { + return fmt.Errorf("access key %s already exists", cred.AccessKey) + } + } + + // Deep copy to avoid mutation issues + identityCopy := store.deepCopyIdentity(identity) + store.users[identity.Name] = identityCopy + + // Index access keys + for _, cred := range identity.Credentials { + store.accessKeys[cred.AccessKey] = identity.Name + } + + return nil +} + +func (store *MemoryStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { + store.mu.RLock() + defer store.mu.RUnlock() + + if !store.initialized { + return nil, fmt.Errorf("store not initialized") + } + + user, exists := store.users[username] + if !exists { + return nil, credential.ErrUserNotFound + } + + // Return a deep copy to avoid mutation issues + return store.deepCopyIdentity(user), nil +} + +func (store *MemoryStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + existingUser, exists := store.users[username] + if !exists { + return credential.ErrUserNotFound + } + + // Remove old access keys from index + for _, cred := range existingUser.Credentials { + delete(store.accessKeys, cred.AccessKey) + } + + // Check for duplicate access keys (excluding current user) + for _, cred := range identity.Credentials { + if existingUsername, exists := store.accessKeys[cred.AccessKey]; exists && existingUsername != username { + return fmt.Errorf("access key %s already exists", cred.AccessKey) + } + } + + // Deep copy to avoid mutation issues + identityCopy := store.deepCopyIdentity(identity) + store.users[username] = identityCopy + + // Re-index access keys + for _, cred := range identity.Credentials { + store.accessKeys[cred.AccessKey] = username + } + + return nil +} + +func (store *MemoryStore) DeleteUser(ctx context.Context, username string) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + user, exists := store.users[username] + if !exists { + return credential.ErrUserNotFound + } + + // Remove access keys from index + for _, cred := range user.Credentials { + delete(store.accessKeys, cred.AccessKey) + } + + // Remove user + delete(store.users, username) + + return nil +} + +func (store *MemoryStore) ListUsers(ctx context.Context) ([]string, error) { + store.mu.RLock() + defer store.mu.RUnlock() + + if !store.initialized { + return nil, fmt.Errorf("store not initialized") + } + + var usernames []string + for username := range store.users { + usernames = append(usernames, username) + } + + return usernames, nil +} + +func (store *MemoryStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { + store.mu.RLock() + defer store.mu.RUnlock() + + if !store.initialized { + return nil, fmt.Errorf("store not initialized") + } + + username, exists := store.accessKeys[accessKey] + if !exists { + return nil, credential.ErrAccessKeyNotFound + } + + user, exists := store.users[username] + if !exists { + // This should not happen, but handle it gracefully + return nil, credential.ErrUserNotFound + } + + // Return a deep copy to avoid mutation issues + return store.deepCopyIdentity(user), nil +} + +func (store *MemoryStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + user, exists := store.users[username] + if !exists { + return credential.ErrUserNotFound + } + + // Check if access key already exists + if _, exists := store.accessKeys[cred.AccessKey]; exists { + return fmt.Errorf("access key %s already exists", cred.AccessKey) + } + + // Add credential to user + user.Credentials = append(user.Credentials, &iam_pb.Credential{ + AccessKey: cred.AccessKey, + SecretKey: cred.SecretKey, + }) + + // Index the access key + store.accessKeys[cred.AccessKey] = username + + return nil +} + +func (store *MemoryStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + user, exists := store.users[username] + if !exists { + return credential.ErrUserNotFound + } + + // Find and remove the credential + var newCredentials []*iam_pb.Credential + found := false + for _, cred := range user.Credentials { + if cred.AccessKey == accessKey { + found = true + // Remove from access key index + delete(store.accessKeys, accessKey) + } else { + newCredentials = append(newCredentials, cred) + } + } + + if !found { + return credential.ErrAccessKeyNotFound + } + + user.Credentials = newCredentials + return nil +} + +func (store *MemoryStore) Shutdown() { + store.mu.Lock() + defer store.mu.Unlock() + + // Clear all data + store.users = nil + store.accessKeys = nil + store.initialized = false +} + +// deepCopyIdentity creates a deep copy of an identity to avoid mutation issues +func (store *MemoryStore) deepCopyIdentity(identity *iam_pb.Identity) *iam_pb.Identity { + if identity == nil { + return nil + } + + // Use JSON marshaling/unmarshaling for deep copy + // This is simple and safe for protobuf messages + data, err := json.Marshal(identity) + if err != nil { + // Fallback to shallow copy if JSON fails + return &iam_pb.Identity{ + Name: identity.Name, + Account: identity.Account, + Credentials: identity.Credentials, + Actions: identity.Actions, + } + } + + var copy iam_pb.Identity + if err := json.Unmarshal(data, ©); err != nil { + // Fallback to shallow copy if JSON fails + return &iam_pb.Identity{ + Name: identity.Name, + Account: identity.Account, + Credentials: identity.Credentials, + Actions: identity.Actions, + } + } + + return © +} + +// Reset clears all data in the store (useful for testing) +func (store *MemoryStore) Reset() { + store.mu.Lock() + defer store.mu.Unlock() + + if store.initialized { + store.users = make(map[string]*iam_pb.Identity) + store.accessKeys = make(map[string]string) + } +} + +// GetUserCount returns the number of users in the store (useful for testing) +func (store *MemoryStore) GetUserCount() int { + store.mu.RLock() + defer store.mu.RUnlock() + + return len(store.users) +} + +// GetAccessKeyCount returns the number of access keys in the store (useful for testing) +func (store *MemoryStore) GetAccessKeyCount() int { + store.mu.RLock() + defer store.mu.RUnlock() + + return len(store.accessKeys) +} diff --git a/weed/credential/memory/memory_store_test.go b/weed/credential/memory/memory_store_test.go new file mode 100644 index 000000000..567b5bb3e --- /dev/null +++ b/weed/credential/memory/memory_store_test.go @@ -0,0 +1,315 @@ +package memory + +import ( + "context" + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func TestMemoryStore(t *testing.T) { + store := &MemoryStore{} + + // Test initialization + config := util.GetViper() + if err := store.Initialize(config, "credential."); err != nil { + t.Fatalf("Failed to initialize store: %v", err) + } + + ctx := context.Background() + + // Test creating a user + identity := &iam_pb.Identity{ + Name: "testuser", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "access123", + SecretKey: "secret123", + }, + }, + } + + if err := store.CreateUser(ctx, identity); err != nil { + t.Fatalf("Failed to create user: %v", err) + } + + // Test getting user + retrievedUser, err := store.GetUser(ctx, "testuser") + if err != nil { + t.Fatalf("Failed to get user: %v", err) + } + + if retrievedUser.Name != "testuser" { + t.Errorf("Expected username 'testuser', got '%s'", retrievedUser.Name) + } + + if len(retrievedUser.Credentials) != 1 { + t.Errorf("Expected 1 credential, got %d", len(retrievedUser.Credentials)) + } + + // Test getting user by access key + userByAccessKey, err := store.GetUserByAccessKey(ctx, "access123") + if err != nil { + t.Fatalf("Failed to get user by access key: %v", err) + } + + if userByAccessKey.Name != "testuser" { + t.Errorf("Expected username 'testuser', got '%s'", userByAccessKey.Name) + } + + // Test listing users + users, err := store.ListUsers(ctx) + if err != nil { + t.Fatalf("Failed to list users: %v", err) + } + + if len(users) != 1 || users[0] != "testuser" { + t.Errorf("Expected ['testuser'], got %v", users) + } + + // Test creating access key + newCred := &iam_pb.Credential{ + AccessKey: "access456", + SecretKey: "secret456", + } + + if err := store.CreateAccessKey(ctx, "testuser", newCred); err != nil { + t.Fatalf("Failed to create access key: %v", err) + } + + // Verify user now has 2 credentials + updatedUser, err := store.GetUser(ctx, "testuser") + if err != nil { + t.Fatalf("Failed to get updated user: %v", err) + } + + if len(updatedUser.Credentials) != 2 { + t.Errorf("Expected 2 credentials, got %d", len(updatedUser.Credentials)) + } + + // Test deleting access key + if err := store.DeleteAccessKey(ctx, "testuser", "access456"); err != nil { + t.Fatalf("Failed to delete access key: %v", err) + } + + // Verify user now has 1 credential again + finalUser, err := store.GetUser(ctx, "testuser") + if err != nil { + t.Fatalf("Failed to get final user: %v", err) + } + + if len(finalUser.Credentials) != 1 { + t.Errorf("Expected 1 credential, got %d", len(finalUser.Credentials)) + } + + // Test deleting user + if err := store.DeleteUser(ctx, "testuser"); err != nil { + t.Fatalf("Failed to delete user: %v", err) + } + + // Verify user is gone + _, err = store.GetUser(ctx, "testuser") + if err != credential.ErrUserNotFound { + t.Errorf("Expected ErrUserNotFound, got %v", err) + } + + // Test error cases + if err := store.CreateUser(ctx, identity); err != nil { + t.Fatalf("Failed to create user for error tests: %v", err) + } + + // Try to create duplicate user + if err := store.CreateUser(ctx, identity); err != credential.ErrUserAlreadyExists { + t.Errorf("Expected ErrUserAlreadyExists, got %v", err) + } + + // Try to get non-existent user + _, err = store.GetUser(ctx, "nonexistent") + if err != credential.ErrUserNotFound { + t.Errorf("Expected ErrUserNotFound, got %v", err) + } + + // Try to get user by non-existent access key + _, err = store.GetUserByAccessKey(ctx, "nonexistent") + if err != credential.ErrAccessKeyNotFound { + t.Errorf("Expected ErrAccessKeyNotFound, got %v", err) + } +} + +func TestMemoryStoreConcurrency(t *testing.T) { + store := &MemoryStore{} + config := util.GetViper() + if err := store.Initialize(config, "credential."); err != nil { + t.Fatalf("Failed to initialize store: %v", err) + } + + ctx := context.Background() + + // Test concurrent access + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func(i int) { + defer func() { done <- true }() + + username := fmt.Sprintf("user%d", i) + identity := &iam_pb.Identity{ + Name: username, + Credentials: []*iam_pb.Credential{ + { + AccessKey: fmt.Sprintf("access%d", i), + SecretKey: fmt.Sprintf("secret%d", i), + }, + }, + } + + if err := store.CreateUser(ctx, identity); err != nil { + t.Errorf("Failed to create user %s: %v", username, err) + return + } + + if _, err := store.GetUser(ctx, username); err != nil { + t.Errorf("Failed to get user %s: %v", username, err) + return + } + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < 10; i++ { + <-done + } + + // Verify all users were created + users, err := store.ListUsers(ctx) + if err != nil { + t.Fatalf("Failed to list users: %v", err) + } + + if len(users) != 10 { + t.Errorf("Expected 10 users, got %d", len(users)) + } +} + +func TestMemoryStoreReset(t *testing.T) { + store := &MemoryStore{} + config := util.GetViper() + if err := store.Initialize(config, "credential."); err != nil { + t.Fatalf("Failed to initialize store: %v", err) + } + + ctx := context.Background() + + // Create a user + identity := &iam_pb.Identity{ + Name: "testuser", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "access123", + SecretKey: "secret123", + }, + }, + } + + if err := store.CreateUser(ctx, identity); err != nil { + t.Fatalf("Failed to create user: %v", err) + } + + // Verify user exists + if store.GetUserCount() != 1 { + t.Errorf("Expected 1 user, got %d", store.GetUserCount()) + } + + if store.GetAccessKeyCount() != 1 { + t.Errorf("Expected 1 access key, got %d", store.GetAccessKeyCount()) + } + + // Reset the store + store.Reset() + + // Verify store is empty + if store.GetUserCount() != 0 { + t.Errorf("Expected 0 users after reset, got %d", store.GetUserCount()) + } + + if store.GetAccessKeyCount() != 0 { + t.Errorf("Expected 0 access keys after reset, got %d", store.GetAccessKeyCount()) + } + + // Verify user is gone + _, err := store.GetUser(ctx, "testuser") + if err != credential.ErrUserNotFound { + t.Errorf("Expected ErrUserNotFound after reset, got %v", err) + } +} + +func TestMemoryStoreConfigurationSaveLoad(t *testing.T) { + store := &MemoryStore{} + config := util.GetViper() + if err := store.Initialize(config, "credential."); err != nil { + t.Fatalf("Failed to initialize store: %v", err) + } + + ctx := context.Background() + + // Create initial configuration + originalConfig := &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "user1", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "access1", + SecretKey: "secret1", + }, + }, + }, + { + Name: "user2", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "access2", + SecretKey: "secret2", + }, + }, + }, + }, + } + + // Save configuration + if err := store.SaveConfiguration(ctx, originalConfig); err != nil { + t.Fatalf("Failed to save configuration: %v", err) + } + + // Load configuration + loadedConfig, err := store.LoadConfiguration(ctx) + if err != nil { + t.Fatalf("Failed to load configuration: %v", err) + } + + // Verify configuration matches + if len(loadedConfig.Identities) != 2 { + t.Errorf("Expected 2 identities, got %d", len(loadedConfig.Identities)) + } + + // Check users exist + user1, err := store.GetUser(ctx, "user1") + if err != nil { + t.Fatalf("Failed to get user1: %v", err) + } + + if len(user1.Credentials) != 1 || user1.Credentials[0].AccessKey != "access1" { + t.Errorf("User1 credentials not correct: %+v", user1.Credentials) + } + + user2, err := store.GetUser(ctx, "user2") + if err != nil { + t.Fatalf("Failed to get user2: %v", err) + } + + if len(user2.Credentials) != 1 || user2.Credentials[0].AccessKey != "access2" { + t.Errorf("User2 credentials not correct: %+v", user2.Credentials) + } +} diff --git a/weed/credential/migration.go b/weed/credential/migration.go new file mode 100644 index 000000000..b286bce62 --- /dev/null +++ b/weed/credential/migration.go @@ -0,0 +1,221 @@ +package credential + +import ( + "context" + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// MigrateCredentials migrates credentials from one store to another +func MigrateCredentials(fromStoreName, toStoreName CredentialStoreTypeName, configuration util.Configuration, fromPrefix, toPrefix string) error { + ctx := context.Background() + + // Create source credential manager + fromCM, err := NewCredentialManager(fromStoreName, configuration, fromPrefix) + if err != nil { + return fmt.Errorf("failed to create source credential manager (%s): %v", fromStoreName, err) + } + defer fromCM.Shutdown() + + // Create destination credential manager + toCM, err := NewCredentialManager(toStoreName, configuration, toPrefix) + if err != nil { + return fmt.Errorf("failed to create destination credential manager (%s): %v", toStoreName, err) + } + defer toCM.Shutdown() + + // Load configuration from source + glog.Infof("Loading configuration from %s store...", fromStoreName) + config, err := fromCM.LoadConfiguration(ctx) + if err != nil { + return fmt.Errorf("failed to load configuration from source store: %v", err) + } + + if config == nil || len(config.Identities) == 0 { + glog.Info("No identities found in source store") + return nil + } + + glog.Infof("Found %d identities in source store", len(config.Identities)) + + // Migrate each identity + var migrated, failed int + for _, identity := range config.Identities { + glog.V(1).Infof("Migrating user: %s", identity.Name) + + // Check if user already exists in destination + existingUser, err := toCM.GetUser(ctx, identity.Name) + if err != nil && err != ErrUserNotFound { + glog.Errorf("Failed to check if user %s exists in destination: %v", identity.Name, err) + failed++ + continue + } + + if existingUser != nil { + glog.Warningf("User %s already exists in destination store, skipping", identity.Name) + continue + } + + // Create user in destination + err = toCM.CreateUser(ctx, identity) + if err != nil { + glog.Errorf("Failed to create user %s in destination store: %v", identity.Name, err) + failed++ + continue + } + + migrated++ + glog.V(1).Infof("Successfully migrated user: %s", identity.Name) + } + + glog.Infof("Migration completed: %d migrated, %d failed", migrated, failed) + + if failed > 0 { + return fmt.Errorf("migration completed with %d failures", failed) + } + + return nil +} + +// ExportCredentials exports credentials from a store to a configuration +func ExportCredentials(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string) (*iam_pb.S3ApiConfiguration, error) { + ctx := context.Background() + + // Create credential manager + cm, err := NewCredentialManager(storeName, configuration, prefix) + if err != nil { + return nil, fmt.Errorf("failed to create credential manager (%s): %v", storeName, err) + } + defer cm.Shutdown() + + // Load configuration + config, err := cm.LoadConfiguration(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load configuration: %v", err) + } + + return config, nil +} + +// ImportCredentials imports credentials from a configuration to a store +func ImportCredentials(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string, config *iam_pb.S3ApiConfiguration) error { + ctx := context.Background() + + // Create credential manager + cm, err := NewCredentialManager(storeName, configuration, prefix) + if err != nil { + return fmt.Errorf("failed to create credential manager (%s): %v", storeName, err) + } + defer cm.Shutdown() + + // Import each identity + var imported, failed int + for _, identity := range config.Identities { + glog.V(1).Infof("Importing user: %s", identity.Name) + + // Check if user already exists + existingUser, err := cm.GetUser(ctx, identity.Name) + if err != nil && err != ErrUserNotFound { + glog.Errorf("Failed to check if user %s exists: %v", identity.Name, err) + failed++ + continue + } + + if existingUser != nil { + glog.Warningf("User %s already exists, skipping", identity.Name) + continue + } + + // Create user + err = cm.CreateUser(ctx, identity) + if err != nil { + glog.Errorf("Failed to create user %s: %v", identity.Name, err) + failed++ + continue + } + + imported++ + glog.V(1).Infof("Successfully imported user: %s", identity.Name) + } + + glog.Infof("Import completed: %d imported, %d failed", imported, failed) + + if failed > 0 { + return fmt.Errorf("import completed with %d failures", failed) + } + + return nil +} + +// ValidateCredentials validates that all credentials in a store are accessible +func ValidateCredentials(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string) error { + ctx := context.Background() + + // Create credential manager + cm, err := NewCredentialManager(storeName, configuration, prefix) + if err != nil { + return fmt.Errorf("failed to create credential manager (%s): %v", storeName, err) + } + defer cm.Shutdown() + + // Load configuration + config, err := cm.LoadConfiguration(ctx) + if err != nil { + return fmt.Errorf("failed to load configuration: %v", err) + } + + if config == nil || len(config.Identities) == 0 { + glog.Info("No identities found in store") + return nil + } + + glog.Infof("Validating %d identities...", len(config.Identities)) + + // Validate each identity + var validated, failed int + for _, identity := range config.Identities { + // Check if user can be retrieved + user, err := cm.GetUser(ctx, identity.Name) + if err != nil { + glog.Errorf("Failed to retrieve user %s: %v", identity.Name, err) + failed++ + continue + } + + if user == nil { + glog.Errorf("User %s not found", identity.Name) + failed++ + continue + } + + // Validate access keys + for _, credential := range identity.Credentials { + accessKeyUser, err := cm.GetUserByAccessKey(ctx, credential.AccessKey) + if err != nil { + glog.Errorf("Failed to retrieve user by access key %s: %v", credential.AccessKey, err) + failed++ + continue + } + + if accessKeyUser == nil || accessKeyUser.Name != identity.Name { + glog.Errorf("Access key %s does not map to correct user %s", credential.AccessKey, identity.Name) + failed++ + continue + } + } + + validated++ + glog.V(1).Infof("Successfully validated user: %s", identity.Name) + } + + glog.Infof("Validation completed: %d validated, %d failed", validated, failed) + + if failed > 0 { + return fmt.Errorf("validation completed with %d failures", failed) + } + + return nil +} diff --git a/weed/credential/postgres/postgres_store.go b/weed/credential/postgres/postgres_store.go new file mode 100644 index 000000000..0d75ad8c0 --- /dev/null +++ b/weed/credential/postgres/postgres_store.go @@ -0,0 +1,570 @@ +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + + _ "github.com/lib/pq" +) + +func init() { + credential.Stores = append(credential.Stores, &PostgresStore{}) +} + +// PostgresStore implements CredentialStore using PostgreSQL +type PostgresStore struct { + db *sql.DB + configured bool +} + +func (store *PostgresStore) GetName() credential.CredentialStoreTypeName { + return credential.StoreTypePostgres +} + +func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) error { + if store.configured { + return nil + } + + hostname := configuration.GetString(prefix + "hostname") + port := configuration.GetInt(prefix + "port") + username := configuration.GetString(prefix + "username") + password := configuration.GetString(prefix + "password") + database := configuration.GetString(prefix + "database") + schema := configuration.GetString(prefix + "schema") + sslmode := configuration.GetString(prefix + "sslmode") + + // Set defaults + if hostname == "" { + hostname = "localhost" + } + if port == 0 { + port = 5432 + } + if schema == "" { + schema = "public" + } + if sslmode == "" { + sslmode = "disable" + } + + // Build connection string + connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s search_path=%s", + hostname, port, username, password, database, sslmode, schema) + + db, err := sql.Open("postgres", connStr) + if err != nil { + return fmt.Errorf("failed to open database: %v", err) + } + + // Test connection + if err := db.Ping(); err != nil { + db.Close() + return fmt.Errorf("failed to ping database: %v", err) + } + + // Set connection pool settings + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(5 * time.Minute) + + store.db = db + + // Create tables if they don't exist + if err := store.createTables(); err != nil { + db.Close() + return fmt.Errorf("failed to create tables: %v", err) + } + + store.configured = true + return nil +} + +func (store *PostgresStore) createTables() error { + // Create users table + usersTable := ` + CREATE TABLE IF NOT EXISTS users ( + username VARCHAR(255) PRIMARY KEY, + email VARCHAR(255), + account_data JSONB, + actions JSONB, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_users_email ON users(email); + ` + + // Create credentials table + credentialsTable := ` + CREATE TABLE IF NOT EXISTS credentials ( + id SERIAL PRIMARY KEY, + username VARCHAR(255) REFERENCES users(username) ON DELETE CASCADE, + access_key VARCHAR(255) UNIQUE NOT NULL, + secret_key VARCHAR(255) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_credentials_username ON credentials(username); + CREATE INDEX IF NOT EXISTS idx_credentials_access_key ON credentials(access_key); + ` + + // Execute table creation + if _, err := store.db.Exec(usersTable); err != nil { + return fmt.Errorf("failed to create users table: %v", err) + } + + if _, err := store.db.Exec(credentialsTable); err != nil { + return fmt.Errorf("failed to create credentials table: %v", err) + } + + return nil +} + +func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + config := &iam_pb.S3ApiConfiguration{} + + // Query all users + rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users") + if err != nil { + return nil, fmt.Errorf("failed to query users: %v", err) + } + defer rows.Close() + + for rows.Next() { + var username, email string + var accountDataJSON, actionsJSON []byte + + if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil { + return nil, fmt.Errorf("failed to scan user row: %v", err) + } + + identity := &iam_pb.Identity{ + Name: username, + } + + // Parse account data + if len(accountDataJSON) > 0 { + if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil { + return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err) + } + } + + // Parse actions + if len(actionsJSON) > 0 { + if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil { + return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err) + } + } + + // Query credentials for this user + credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username) + if err != nil { + return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err) + } + + for credRows.Next() { + var accessKey, secretKey string + if err := credRows.Scan(&accessKey, &secretKey); err != nil { + credRows.Close() + return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err) + } + + identity.Credentials = append(identity.Credentials, &iam_pb.Credential{ + AccessKey: accessKey, + SecretKey: secretKey, + }) + } + credRows.Close() + + config.Identities = append(config.Identities, identity) + } + + return config, nil +} + +func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Clear existing data + if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil { + return fmt.Errorf("failed to clear credentials: %v", err) + } + if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil { + return fmt.Errorf("failed to clear users: %v", err) + } + + // Insert all identities + for _, identity := range config.Identities { + // Marshal account data + var accountDataJSON []byte + if identity.Account != nil { + accountDataJSON, err = json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err) + } + } + + // Marshal actions + var actionsJSON []byte + if identity.Actions != nil { + actionsJSON, err = json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err) + } + } + + // Insert user + _, err := tx.ExecContext(ctx, + "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)", + identity.Name, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to insert user %s: %v", identity.Name, err) + } + + // Insert credentials + for _, cred := range identity.Credentials { + _, err := tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + identity.Name, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err) + } + } + } + + return tx.Commit() +} + +func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Check if user already exists + var count int + err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count > 0 { + return credential.ErrUserAlreadyExists + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Marshal account data + var accountDataJSON []byte + if identity.Account != nil { + accountDataJSON, err = json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data: %v", err) + } + } + + // Marshal actions + var actionsJSON []byte + if identity.Actions != nil { + actionsJSON, err = json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions: %v", err) + } + } + + // Insert user + _, err = tx.ExecContext(ctx, + "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)", + identity.Name, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to insert user: %v", err) + } + + // Insert credentials + for _, cred := range identity.Credentials { + _, err = tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + identity.Name, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + } + + return tx.Commit() +} + +func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + var email string + var accountDataJSON, actionsJSON []byte + + err := store.db.QueryRowContext(ctx, + "SELECT email, account_data, actions FROM users WHERE username = $1", + username).Scan(&email, &accountDataJSON, &actionsJSON) + if err != nil { + if err == sql.ErrNoRows { + return nil, credential.ErrUserNotFound + } + return nil, fmt.Errorf("failed to query user: %v", err) + } + + identity := &iam_pb.Identity{ + Name: username, + } + + // Parse account data + if len(accountDataJSON) > 0 { + if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil { + return nil, fmt.Errorf("failed to unmarshal account data: %v", err) + } + } + + // Parse actions + if len(actionsJSON) > 0 { + if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil { + return nil, fmt.Errorf("failed to unmarshal actions: %v", err) + } + } + + // Query credentials + rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username) + if err != nil { + return nil, fmt.Errorf("failed to query credentials: %v", err) + } + defer rows.Close() + + for rows.Next() { + var accessKey, secretKey string + if err := rows.Scan(&accessKey, &secretKey); err != nil { + return nil, fmt.Errorf("failed to scan credential: %v", err) + } + + identity.Credentials = append(identity.Credentials, &iam_pb.Credential{ + AccessKey: accessKey, + SecretKey: secretKey, + }) + } + + return identity, nil +} + +func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Check if user exists + var count int + err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + + // Marshal account data + var accountDataJSON []byte + if identity.Account != nil { + accountDataJSON, err = json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data: %v", err) + } + } + + // Marshal actions + var actionsJSON []byte + if identity.Actions != nil { + actionsJSON, err = json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions: %v", err) + } + } + + // Update user + _, err = tx.ExecContext(ctx, + "UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1", + username, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to update user: %v", err) + } + + // Delete existing credentials + _, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username) + if err != nil { + return fmt.Errorf("failed to delete existing credentials: %v", err) + } + + // Insert new credentials + for _, cred := range identity.Credentials { + _, err = tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + username, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + } + + return tx.Commit() +} + +func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username) + if err != nil { + return fmt.Errorf("failed to delete user: %v", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %v", err) + } + + if rowsAffected == 0 { + return credential.ErrUserNotFound + } + + return nil +} + +func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username") + if err != nil { + return nil, fmt.Errorf("failed to query users: %v", err) + } + defer rows.Close() + + var usernames []string + for rows.Next() { + var username string + if err := rows.Scan(&username); err != nil { + return nil, fmt.Errorf("failed to scan username: %v", err) + } + usernames = append(usernames, username) + } + + return usernames, nil +} + +func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + var username string + err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username) + if err != nil { + if err == sql.ErrNoRows { + return nil, credential.ErrAccessKeyNotFound + } + return nil, fmt.Errorf("failed to query access key: %v", err) + } + + return store.GetUser(ctx, username) +} + +func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Check if user exists + var count int + err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + + // Insert credential + _, err = store.db.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + username, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + + return nil +} + +func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + result, err := store.db.ExecContext(ctx, + "DELETE FROM credentials WHERE username = $1 AND access_key = $2", + username, accessKey) + if err != nil { + return fmt.Errorf("failed to delete access key: %v", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %v", err) + } + + if rowsAffected == 0 { + // Check if user exists + var count int + err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + return credential.ErrAccessKeyNotFound + } + + return nil +} + +func (store *PostgresStore) Shutdown() { + if store.db != nil { + store.db.Close() + store.db = nil + } + store.configured = false +} diff --git a/weed/credential/sqlite/sqlite_store.go b/weed/credential/sqlite/sqlite_store.go new file mode 100644 index 000000000..70d015fc9 --- /dev/null +++ b/weed/credential/sqlite/sqlite_store.go @@ -0,0 +1,557 @@ +package sqlite + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + + _ "modernc.org/sqlite" +) + +func init() { + credential.Stores = append(credential.Stores, &SqliteStore{}) +} + +// SqliteStore implements CredentialStore using SQLite +type SqliteStore struct { + db *sql.DB + configured bool +} + +func (store *SqliteStore) GetName() credential.CredentialStoreTypeName { + return credential.StoreTypeSQLite +} + +func (store *SqliteStore) Initialize(configuration util.Configuration, prefix string) error { + if store.configured { + return nil + } + + dbFile := configuration.GetString(prefix + "dbFile") + if dbFile == "" { + dbFile = "seaweedfs_credentials.db" + } + + // Create directory if it doesn't exist + dir := filepath.Dir(dbFile) + if dir != "." { + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create directory %s: %v", dir, err) + } + } + + db, err := sql.Open("sqlite", dbFile) + if err != nil { + return fmt.Errorf("failed to open database: %v", err) + } + + // Test connection + if err := db.Ping(); err != nil { + db.Close() + return fmt.Errorf("failed to ping database: %v", err) + } + + store.db = db + + // Create tables if they don't exist + if err := store.createTables(); err != nil { + db.Close() + return fmt.Errorf("failed to create tables: %v", err) + } + + store.configured = true + return nil +} + +func (store *SqliteStore) createTables() error { + // Create users table + usersTable := ` + CREATE TABLE IF NOT EXISTS users ( + username TEXT PRIMARY KEY, + email TEXT, + account_data TEXT, + actions TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_users_email ON users(email); + ` + + // Create credentials table + credentialsTable := ` + CREATE TABLE IF NOT EXISTS credentials ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT REFERENCES users(username) ON DELETE CASCADE, + access_key TEXT UNIQUE NOT NULL, + secret_key TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_credentials_username ON credentials(username); + CREATE INDEX IF NOT EXISTS idx_credentials_access_key ON credentials(access_key); + ` + + // Execute table creation + if _, err := store.db.Exec(usersTable); err != nil { + return fmt.Errorf("failed to create users table: %v", err) + } + + if _, err := store.db.Exec(credentialsTable); err != nil { + return fmt.Errorf("failed to create credentials table: %v", err) + } + + return nil +} + +func (store *SqliteStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + config := &iam_pb.S3ApiConfiguration{} + + // Query all users + rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users") + if err != nil { + return nil, fmt.Errorf("failed to query users: %v", err) + } + defer rows.Close() + + for rows.Next() { + var username, email, accountDataJSON, actionsJSON string + + if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil { + return nil, fmt.Errorf("failed to scan user row: %v", err) + } + + identity := &iam_pb.Identity{ + Name: username, + } + + // Parse account data + if accountDataJSON != "" { + if err := json.Unmarshal([]byte(accountDataJSON), &identity.Account); err != nil { + return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err) + } + } + + // Parse actions + if actionsJSON != "" { + if err := json.Unmarshal([]byte(actionsJSON), &identity.Actions); err != nil { + return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err) + } + } + + // Query credentials for this user + credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = ?", username) + if err != nil { + return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err) + } + + for credRows.Next() { + var accessKey, secretKey string + if err := credRows.Scan(&accessKey, &secretKey); err != nil { + credRows.Close() + return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err) + } + + identity.Credentials = append(identity.Credentials, &iam_pb.Credential{ + AccessKey: accessKey, + SecretKey: secretKey, + }) + } + credRows.Close() + + config.Identities = append(config.Identities, identity) + } + + return config, nil +} + +func (store *SqliteStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Clear existing data + if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil { + return fmt.Errorf("failed to clear credentials: %v", err) + } + if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil { + return fmt.Errorf("failed to clear users: %v", err) + } + + // Insert all identities + for _, identity := range config.Identities { + // Marshal account data + var accountDataJSON string + if identity.Account != nil { + data, err := json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err) + } + accountDataJSON = string(data) + } + + // Marshal actions + var actionsJSON string + if identity.Actions != nil { + data, err := json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err) + } + actionsJSON = string(data) + } + + // Insert user + _, err := tx.ExecContext(ctx, + "INSERT INTO users (username, email, account_data, actions) VALUES (?, ?, ?, ?)", + identity.Name, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to insert user %s: %v", identity.Name, err) + } + + // Insert credentials + for _, cred := range identity.Credentials { + _, err := tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)", + identity.Name, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err) + } + } + } + + return tx.Commit() +} + +func (store *SqliteStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Check if user already exists + var count int + err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", identity.Name).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count > 0 { + return credential.ErrUserAlreadyExists + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Marshal account data + var accountDataJSON string + if identity.Account != nil { + data, err := json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data: %v", err) + } + accountDataJSON = string(data) + } + + // Marshal actions + var actionsJSON string + if identity.Actions != nil { + data, err := json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions: %v", err) + } + actionsJSON = string(data) + } + + // Insert user + _, err = tx.ExecContext(ctx, + "INSERT INTO users (username, email, account_data, actions) VALUES (?, ?, ?, ?)", + identity.Name, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to insert user: %v", err) + } + + // Insert credentials + for _, cred := range identity.Credentials { + _, err = tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)", + identity.Name, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + } + + return tx.Commit() +} + +func (store *SqliteStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + var email, accountDataJSON, actionsJSON string + + err := store.db.QueryRowContext(ctx, + "SELECT email, account_data, actions FROM users WHERE username = ?", + username).Scan(&email, &accountDataJSON, &actionsJSON) + if err != nil { + if err == sql.ErrNoRows { + return nil, credential.ErrUserNotFound + } + return nil, fmt.Errorf("failed to query user: %v", err) + } + + identity := &iam_pb.Identity{ + Name: username, + } + + // Parse account data + if accountDataJSON != "" { + if err := json.Unmarshal([]byte(accountDataJSON), &identity.Account); err != nil { + return nil, fmt.Errorf("failed to unmarshal account data: %v", err) + } + } + + // Parse actions + if actionsJSON != "" { + if err := json.Unmarshal([]byte(actionsJSON), &identity.Actions); err != nil { + return nil, fmt.Errorf("failed to unmarshal actions: %v", err) + } + } + + // Query credentials + rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = ?", username) + if err != nil { + return nil, fmt.Errorf("failed to query credentials: %v", err) + } + defer rows.Close() + + for rows.Next() { + var accessKey, secretKey string + if err := rows.Scan(&accessKey, &secretKey); err != nil { + return nil, fmt.Errorf("failed to scan credential: %v", err) + } + + identity.Credentials = append(identity.Credentials, &iam_pb.Credential{ + AccessKey: accessKey, + SecretKey: secretKey, + }) + } + + return identity, nil +} + +func (store *SqliteStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Check if user exists + var count int + err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + + // Marshal account data + var accountDataJSON string + if identity.Account != nil { + data, err := json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data: %v", err) + } + accountDataJSON = string(data) + } + + // Marshal actions + var actionsJSON string + if identity.Actions != nil { + data, err := json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions: %v", err) + } + actionsJSON = string(data) + } + + // Update user + _, err = tx.ExecContext(ctx, + "UPDATE users SET email = ?, account_data = ?, actions = ?, updated_at = CURRENT_TIMESTAMP WHERE username = ?", + "", accountDataJSON, actionsJSON, username) + if err != nil { + return fmt.Errorf("failed to update user: %v", err) + } + + // Delete existing credentials + _, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = ?", username) + if err != nil { + return fmt.Errorf("failed to delete existing credentials: %v", err) + } + + // Insert new credentials + for _, cred := range identity.Credentials { + _, err = tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)", + username, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + } + + return tx.Commit() +} + +func (store *SqliteStore) DeleteUser(ctx context.Context, username string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = ?", username) + if err != nil { + return fmt.Errorf("failed to delete user: %v", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %v", err) + } + + if rowsAffected == 0 { + return credential.ErrUserNotFound + } + + return nil +} + +func (store *SqliteStore) ListUsers(ctx context.Context) ([]string, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username") + if err != nil { + return nil, fmt.Errorf("failed to query users: %v", err) + } + defer rows.Close() + + var usernames []string + for rows.Next() { + var username string + if err := rows.Scan(&username); err != nil { + return nil, fmt.Errorf("failed to scan username: %v", err) + } + usernames = append(usernames, username) + } + + return usernames, nil +} + +func (store *SqliteStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + var username string + err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = ?", accessKey).Scan(&username) + if err != nil { + if err == sql.ErrNoRows { + return nil, credential.ErrAccessKeyNotFound + } + return nil, fmt.Errorf("failed to query access key: %v", err) + } + + return store.GetUser(ctx, username) +} + +func (store *SqliteStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Check if user exists + var count int + err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + + // Insert credential + _, err = store.db.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)", + username, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + + return nil +} + +func (store *SqliteStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + result, err := store.db.ExecContext(ctx, + "DELETE FROM credentials WHERE username = ? AND access_key = ?", + username, accessKey) + if err != nil { + return fmt.Errorf("failed to delete access key: %v", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %v", err) + } + + if rowsAffected == 0 { + // Check if user exists + var count int + err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + return credential.ErrAccessKeyNotFound + } + + return nil +} + +func (store *SqliteStore) Shutdown() { + if store.db != nil { + store.db.Close() + store.db = nil + } + store.configured = false +} diff --git a/weed/credential/test/integration_test.go b/weed/credential/test/integration_test.go new file mode 100644 index 000000000..53cd80bc0 --- /dev/null +++ b/weed/credential/test/integration_test.go @@ -0,0 +1,122 @@ +package test + +import ( + "context" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + + // Import all store implementations to register them + _ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc" + _ "github.com/seaweedfs/seaweedfs/weed/credential/memory" + _ "github.com/seaweedfs/seaweedfs/weed/credential/postgres" + _ "github.com/seaweedfs/seaweedfs/weed/credential/sqlite" +) + +func TestStoreRegistration(t *testing.T) { + // Test that stores are registered + storeNames := credential.GetAvailableStores() + if len(storeNames) == 0 { + t.Fatal("No credential stores registered") + } + + expectedStores := []string{string(credential.StoreTypeFilerEtc), string(credential.StoreTypeMemory), string(credential.StoreTypeSQLite), string(credential.StoreTypePostgres)} + + // Verify all expected stores are present + for _, expected := range expectedStores { + found := false + for _, storeName := range storeNames { + if string(storeName) == expected { + found = true + break + } + } + if !found { + t.Errorf("Expected store not found: %s", expected) + } + } + + t.Logf("Available stores: %v", storeNames) +} + +func TestMemoryStoreIntegration(t *testing.T) { + // Test creating credential manager with memory store + config := util.GetViper() + cm, err := credential.NewCredentialManager(credential.StoreTypeMemory, config, "test.") + if err != nil { + t.Fatalf("Failed to create memory credential manager: %v", err) + } + defer cm.Shutdown() + + // Test that the store is of the correct type + if cm.GetStore().GetName() != credential.StoreTypeMemory { + t.Errorf("Expected memory store, got %s", cm.GetStore().GetName()) + } + + // Test basic operations + ctx := context.Background() + + // Create test user + testUser := &iam_pb.Identity{ + Name: "testuser", + Actions: []string{"Read", "Write"}, + Account: &iam_pb.Account{ + Id: "123456789012", + DisplayName: "Test User", + EmailAddress: "test@example.com", + }, + Credentials: []*iam_pb.Credential{ + { + AccessKey: "AKIAIOSFODNN7EXAMPLE", + SecretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + }, + } + + // Test CreateUser + err = cm.CreateUser(ctx, testUser) + if err != nil { + t.Fatalf("CreateUser failed: %v", err) + } + + // Test GetUser + user, err := cm.GetUser(ctx, "testuser") + if err != nil { + t.Fatalf("GetUser failed: %v", err) + } + if user.Name != "testuser" { + t.Errorf("Expected user name 'testuser', got %s", user.Name) + } + + // Test ListUsers + users, err := cm.ListUsers(ctx) + if err != nil { + t.Fatalf("ListUsers failed: %v", err) + } + if len(users) != 1 || users[0] != "testuser" { + t.Errorf("Expected ['testuser'], got %v", users) + } + + // Test GetUserByAccessKey + userByKey, err := cm.GetUserByAccessKey(ctx, "AKIAIOSFODNN7EXAMPLE") + if err != nil { + t.Fatalf("GetUserByAccessKey failed: %v", err) + } + if userByKey.Name != "testuser" { + t.Errorf("Expected user name 'testuser', got %s", userByKey.Name) + } + + // Test DeleteUser + err = cm.DeleteUser(ctx, "testuser") + if err != nil { + t.Fatalf("DeleteUser failed: %v", err) + } + + // Verify user was deleted + _, err = cm.GetUser(ctx, "testuser") + if err != credential.ErrUserNotFound { + t.Errorf("Expected ErrUserNotFound, got %v", err) + } +} diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index d1575a14e..763761b94 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -4,11 +4,13 @@ package iamapi import ( "bytes" + "context" "encoding/json" "fmt" "net/http" "github.com/gorilla/mux" + "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -29,8 +31,9 @@ type IamS3ApiConfig interface { } type IamS3ApiConfigure struct { - option *IamServerOption - masterClient *wdclient.MasterClient + option *IamServerOption + masterClient *wdclient.MasterClient + credentialManager *credential.CredentialManager } type IamServerOption struct { @@ -48,17 +51,28 @@ type IamApiServer struct { var s3ApiConfigure IamS3ApiConfig func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) { - s3ApiConfigure = IamS3ApiConfigure{ + return NewIamApiServerWithStore(router, option, "") +} + +func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, explicitStore string) (iamApiServer *IamApiServer, err error) { + configure := &IamS3ApiConfigure{ option: option, masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)), } + + s3ApiConfigure = configure + s3Option := s3api.S3ApiServerOption{ Filer: option.Filer, GrpcDialOption: option.GrpcDialOption, } + + iam := s3api.NewIdentityAccessManagementWithStore(&s3Option, explicitStore) + configure.credentialManager = iam.GetCredentialManager() + iamApiServer = &IamApiServer{ s3ApiConfig: s3ApiConfigure, - iam: s3api.NewIdentityAccessManagement(&s3Option), + iam: iam, } iamApiServer.registerRouter(router) @@ -78,10 +92,31 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) { apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler) } -func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { +func (iama *IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { + return iama.GetS3ApiConfigurationFromCredentialManager(s3cfg) +} + +func (iama *IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { + return iama.PutS3ApiConfigurationToCredentialManager(s3cfg) +} + +func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromCredentialManager(s3cfg *iam_pb.S3ApiConfiguration) (err error) { + config, err := iama.credentialManager.LoadConfiguration(context.Background()) + if err != nil { + return fmt.Errorf("failed to load configuration from credential manager: %v", err) + } + *s3cfg = *config + return nil +} + +func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToCredentialManager(s3cfg *iam_pb.S3ApiConfiguration) (err error) { + return iama.credentialManager.SaveConfiguration(context.Background(), s3cfg) +} + +func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { + err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { return err } return nil @@ -97,12 +132,12 @@ func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat return nil } -func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { +func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) { buf := bytes.Buffer{} if err := filer.ProtoToText(&buf, s3cfg); err != nil { return fmt.Errorf("ProtoToText: %s", err) } - return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { err = util.Retry("saveIamIdentity", func() error { return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes()) }) @@ -113,10 +148,10 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat }) } -func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { +func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil { + err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil { return err } return nil @@ -134,12 +169,12 @@ func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { return nil } -func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) { +func (iama *IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) { var b []byte if b, err = json.Marshal(policies); err != nil { return err } - return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil { return err } diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 1fb118d6f..e4e5fda83 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -1,19 +1,21 @@ package s3api import ( + "context" "fmt" "net/http" "os" "strings" "sync" + "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "google.golang.org/grpc" ) type Action string @@ -35,6 +37,9 @@ type IdentityAccessManagement struct { hashMu sync.RWMutex domain string isAuthEnabled bool + credentialManager *credential.CredentialManager + filerClient filer_pb.SeaweedFilerClient + grpcDialOption grpc.DialOption } type Identity struct { @@ -114,19 +119,40 @@ func (action Action) getPermission() Permission { } func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManagement { + return NewIdentityAccessManagementWithStore(option, "") +} + +func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitStore string) *IdentityAccessManagement { iam := &IdentityAccessManagement{ domain: option.DomainName, hashes: make(map[string]*sync.Pool), hashCounters: make(map[string]*int32), } + // Always initialize credential manager with fallback to defaults + credentialManager, err := credential.NewCredentialManagerWithDefaults(credential.CredentialStoreTypeName(explicitStore)) + if err != nil { + glog.Fatalf("failed to initialize credential manager: %v", err) + } + + // For stores that need filer client details, set them + if store := credentialManager.GetStore(); store != nil { + if filerClientSetter, ok := store.(interface { + SetFilerClient(string, grpc.DialOption) + }); ok { + filerClientSetter.SetFilerClient(string(option.Filer), option.GrpcDialOption) + } + } + + iam.credentialManager = credentialManager + if option.Config != "" { glog.V(3).Infof("loading static config file %s", option.Config) if err := iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { glog.Fatalf("fail to load config file %s: %v", option.Config, err) } } else { - glog.V(3).Infof("no static config file specified... loading config from filer %s", option.Filer) + glog.V(3).Infof("no static config file specified... loading config from credential manager") if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil { glog.Warningf("fail to load config: %v", err) } @@ -134,17 +160,8 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag return iam } -func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) { - var content []byte - err = pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - glog.V(3).Infof("loading config %s from filer %s", filer.IamConfigDirectory+"/"+filer.IamIdentityFile, option.Filer) - content, err = filer.ReadInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile) - return err - }) - if err != nil { - return fmt.Errorf("read S3 config: %v", err) - } - return iam.LoadS3ApiConfigurationFromBytes(content) +func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error { + return iam.LoadS3ApiConfigurationFromCredentialManager() } func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error { @@ -516,3 +533,22 @@ func (identity *Identity) isAdmin() bool { } return false } + +// GetCredentialManager returns the credential manager instance +func (iam *IdentityAccessManagement) GetCredentialManager() *credential.CredentialManager { + return iam.credentialManager +} + +// LoadS3ApiConfigurationFromCredentialManager loads configuration using the credential manager +func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromCredentialManager() error { + s3ApiConfiguration, err := iam.credentialManager.LoadConfiguration(context.Background()) + if err != nil { + return fmt.Errorf("failed to load configuration from credential manager: %v", err) + } + + if len(s3ApiConfiguration.Identities) == 0 { + return fmt.Errorf("no identities found") + } + + return iam.loadS3ApiConfiguration(s3ApiConfiguration) +} diff --git a/weed/s3api/s3api_put_object_helper_test.go b/weed/s3api/s3api_put_object_helper_test.go index 774741a0d..455701772 100644 --- a/weed/s3api/s3api_put_object_helper_test.go +++ b/weed/s3api/s3api_put_object_helper_test.go @@ -5,13 +5,15 @@ import ( "strings" "testing" + "github.com/seaweedfs/seaweedfs/weed/credential" + _ "github.com/seaweedfs/seaweedfs/weed/credential/memory" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" ) func TestGetRequestDataReader_ChunkedEncodingWithoutIAM(t *testing.T) { // Create an S3ApiServer with IAM disabled s3a := &S3ApiServer{ - iam: NewIdentityAccessManagement(&S3ApiServerOption{}), + iam: NewIdentityAccessManagementWithStore(&S3ApiServerOption{}, string(credential.StoreTypeMemory)), } // Ensure IAM is disabled for this test s3a.iam.isAuthEnabled = false @@ -85,7 +87,7 @@ func TestGetRequestDataReader_ChunkedEncodingWithoutIAM(t *testing.T) { func TestGetRequestDataReader_AuthTypeDetection(t *testing.T) { // Create an S3ApiServer with IAM disabled s3a := &S3ApiServer{ - iam: NewIdentityAccessManagement(&S3ApiServerOption{}), + iam: NewIdentityAccessManagementWithStore(&S3ApiServerOption{}, string(credential.StoreTypeMemory)), } s3a.iam.isAuthEnabled = false @@ -120,7 +122,7 @@ func TestGetRequestDataReader_AuthTypeDetection(t *testing.T) { func TestGetRequestDataReader_IAMEnabled(t *testing.T) { // Create an S3ApiServer with IAM enabled s3a := &S3ApiServer{ - iam: NewIdentityAccessManagement(&S3ApiServerOption{}), + iam: NewIdentityAccessManagementWithStore(&S3ApiServerOption{}, string(credential.StoreTypeMemory)), } s3a.iam.isAuthEnabled = true diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 2f9e9e3fb..f0aaa3985 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" @@ -41,16 +42,21 @@ type S3ApiServerOption struct { type S3ApiServer struct { s3_pb.UnimplementedSeaweedS3Server - option *S3ApiServerOption - iam *IdentityAccessManagement - cb *CircuitBreaker - randomClientId int32 - filerGuard *security.Guard - client util_http_client.HTTPClientInterface - bucketRegistry *BucketRegistry + option *S3ApiServerOption + iam *IdentityAccessManagement + cb *CircuitBreaker + randomClientId int32 + filerGuard *security.Guard + client util_http_client.HTTPClientInterface + bucketRegistry *BucketRegistry + credentialManager *credential.CredentialManager } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { + return NewS3ApiServerWithStore(router, option, "") +} + +func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) { startTsNs := time.Now().UnixNano() v := util.GetViper() @@ -64,19 +70,25 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer v.SetDefault("cors.allowed_origins.values", "*") - if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) { + if len(option.AllowedOrigins) == 0 { allowedOrigins := v.GetString("cors.allowed_origins.values") domains := strings.Split(allowedOrigins, ",") option.AllowedOrigins = domains } + var iam *IdentityAccessManagement + + iam = NewIdentityAccessManagementWithStore(option, explicitStore) + s3ApiServer = &S3ApiServer{ - option: option, - iam: NewIdentityAccessManagement(option), - randomClientId: util.RandomInt32(), - filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), - cb: NewCircuitBreaker(option), + option: option, + iam: iam, + randomClientId: util.RandomInt32(), + filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), + cb: NewCircuitBreaker(option), + credentialManager: iam.credentialManager, } + if option.Config != "" { grace.OnReload(func() { if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { @@ -119,7 +131,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { func(w http.ResponseWriter, r *http.Request) { origin := r.Header.Get("Origin") if origin != "" { - if s3a.option.AllowedOrigins == nil || len(s3a.option.AllowedOrigins) == 0 || s3a.option.AllowedOrigins[0] == "*" { + if len(s3a.option.AllowedOrigins) == 0 || s3a.option.AllowedOrigins[0] == "*" { origin = "*" } else { originFound := false