Admin UI: Add policies (#6968)

* add policies to UI, accessing filer directly

* view, edit policies

* add back buttons for "users" page

* remove unused

* fix ui dark mode when modal is closed

* bucket view details button

* fix browser buttons

* filer action button works

* clean up masters page

* fix volume servers action buttons

* fix collections page action button

* fix properties page

* more obvious

* fix directory creation file mode

* Update file_browser_handlers.go

* directory permission
This commit is contained in:
Chris Lu
2025-07-12 01:13:11 -07:00
committed by GitHub
parent 49d43003e1
commit 687a6a6c1d
41 changed files with 4941 additions and 2383 deletions

View File

@@ -86,5 +86,27 @@ type UserCredentials struct {
UpdatedAt time.Time `json:"updatedAt"`
}
// PolicyStatement represents a single policy statement in an IAM policy
type PolicyStatement struct {
Effect string `json:"Effect"`
Action []string `json:"Action"`
Resource []string `json:"Resource"`
}
// PolicyDocument represents an IAM policy document
type PolicyDocument struct {
Version string `json:"Version"`
Statement []*PolicyStatement `json:"Statement"`
}
// PolicyManager interface for managing IAM policies
type PolicyManager interface {
GetPolicies(ctx context.Context) (map[string]PolicyDocument, error)
CreatePolicy(ctx context.Context, name string, document PolicyDocument) error
UpdatePolicy(ctx context.Context, name string, document PolicyDocument) error
DeletePolicy(ctx context.Context, name string) error
GetPolicy(ctx context.Context, name string) (*PolicyDocument, error)
}
// Stores holds all available credential store implementations
var Stores []CredentialStore

View File

@@ -0,0 +1,188 @@
package filer_etc
import (
"bytes"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
if err != filer_pb.ErrNotFound {
return err
}
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
return s3cfg, err
}
func (store *FilerEtcStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ProtoToText(&buf, config); err != nil {
return fmt.Errorf("failed to marshal configuration: %v", err)
}
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
})
}
func (store *FilerEtcStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
// Load existing configuration
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Check if user already exists
for _, existingIdentity := range config.Identities {
if existingIdentity.Name == identity.Name {
return credential.ErrUserAlreadyExists
}
}
// Add new identity
config.Identities = append(config.Identities, identity)
// Save configuration
return store.SaveConfiguration(ctx, config)
}
func (store *FilerEtcStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
for _, identity := range config.Identities {
if identity.Name == username {
return identity, nil
}
}
return nil, credential.ErrUserNotFound
}
func (store *FilerEtcStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find and update the user
for i, existingIdentity := range config.Identities {
if existingIdentity.Name == username {
config.Identities[i] = identity
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) DeleteUser(ctx context.Context, username string) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find and remove the user
for i, identity := range config.Identities {
if identity.Name == username {
config.Identities = append(config.Identities[:i], config.Identities[i+1:]...)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) ListUsers(ctx context.Context) ([]string, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
var usernames []string
for _, identity := range config.Identities {
usernames = append(usernames, identity.Name)
}
return usernames, nil
}
func (store *FilerEtcStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
for _, identity := range config.Identities {
for _, credential := range identity.Credentials {
if credential.AccessKey == accessKey {
return identity, nil
}
}
}
return nil, credential.ErrAccessKeyNotFound
}
func (store *FilerEtcStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find the user and add the credential
for _, identity := range config.Identities {
if identity.Name == username {
// Check if access key already exists
for _, existingCred := range identity.Credentials {
if existingCred.AccessKey == cred.AccessKey {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
identity.Credentials = append(identity.Credentials, cred)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find the user and remove the credential
for _, identity := range config.Identities {
if identity.Name == username {
for i, cred := range identity.Credentials {
if cred.AccessKey == accessKey {
identity.Credentials = append(identity.Credentials[:i], identity.Credentials[i+1:]...)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrAccessKeyNotFound
}
}
return credential.ErrUserNotFound
}

View File

@@ -0,0 +1,114 @@
package filer_etc
import (
"bytes"
"context"
"encoding/json"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
type PoliciesCollection struct {
Policies map[string]credential.PolicyDocument `json:"policies"`
}
// GetPolicies retrieves all IAM policies from the filer
func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]credential.PolicyDocument, error) {
policiesCollection := &PoliciesCollection{
Policies: make(map[string]credential.PolicyDocument),
}
// Check if filer client is configured
if store.filerGrpcAddress == "" {
glog.V(1).Infof("Filer client not configured for policy retrieval, returning empty policies")
// Return empty policies if filer client is not configured
return policiesCollection.Policies, nil
}
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil {
if err == filer_pb.ErrNotFound {
glog.V(1).Infof("Policies file not found at %s/%s, returning empty policies", filer.IamConfigDirectory, filer.IamPoliciesFile)
// If file doesn't exist, return empty collection
return nil
}
return err
}
if buf.Len() > 0 {
return json.Unmarshal(buf.Bytes(), policiesCollection)
}
return nil
})
if err != nil {
return nil, err
}
return policiesCollection.Policies, nil
}
// CreatePolicy creates a new IAM policy in the filer
func (store *FilerEtcStore) CreatePolicy(ctx context.Context, name string, document credential.PolicyDocument) error {
return store.updatePolicies(ctx, func(policies map[string]credential.PolicyDocument) {
policies[name] = document
})
}
// UpdatePolicy updates an existing IAM policy in the filer
func (store *FilerEtcStore) UpdatePolicy(ctx context.Context, name string, document credential.PolicyDocument) error {
return store.updatePolicies(ctx, func(policies map[string]credential.PolicyDocument) {
policies[name] = document
})
}
// DeletePolicy deletes an IAM policy from the filer
func (store *FilerEtcStore) DeletePolicy(ctx context.Context, name string) error {
return store.updatePolicies(ctx, func(policies map[string]credential.PolicyDocument) {
delete(policies, name)
})
}
// updatePolicies is a helper method to update policies atomically
func (store *FilerEtcStore) updatePolicies(ctx context.Context, updateFunc func(map[string]credential.PolicyDocument)) error {
// Load existing policies
policies, err := store.GetPolicies(ctx)
if err != nil {
return err
}
// Apply update
updateFunc(policies)
// Save back to filer
policiesCollection := &PoliciesCollection{
Policies: policies,
}
data, err := json.Marshal(policiesCollection)
if err != nil {
return err
}
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, data)
})
}
// GetPolicy retrieves a specific IAM policy by name from the filer
func (store *FilerEtcStore) GetPolicy(ctx context.Context, name string) (*credential.PolicyDocument, error) {
policies, err := store.GetPolicies(ctx)
if err != nil {
return nil, err
}
if policy, exists := policies[name]; exists {
return &policy, nil
}
return nil, nil // Policy not found
}

View File

@@ -1,15 +1,11 @@
package filer_etc
import (
"bytes"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
)
@@ -54,182 +50,6 @@ func (store *FilerEtcStore) withFilerClient(fn func(client filer_pb.SeaweedFiler
return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(store.filerGrpcAddress), store.grpcDialOption, fn)
}
func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
if err != filer_pb.ErrNotFound {
return err
}
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
return s3cfg, err
}
func (store *FilerEtcStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ProtoToText(&buf, config); err != nil {
return fmt.Errorf("failed to marshal configuration: %v", err)
}
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
})
}
func (store *FilerEtcStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
// Load existing configuration
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Check if user already exists
for _, existingIdentity := range config.Identities {
if existingIdentity.Name == identity.Name {
return credential.ErrUserAlreadyExists
}
}
// Add new identity
config.Identities = append(config.Identities, identity)
// Save configuration
return store.SaveConfiguration(ctx, config)
}
func (store *FilerEtcStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
for _, identity := range config.Identities {
if identity.Name == username {
return identity, nil
}
}
return nil, credential.ErrUserNotFound
}
func (store *FilerEtcStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find and update the user
for i, existingIdentity := range config.Identities {
if existingIdentity.Name == username {
config.Identities[i] = identity
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) DeleteUser(ctx context.Context, username string) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find and remove the user
for i, identity := range config.Identities {
if identity.Name == username {
config.Identities = append(config.Identities[:i], config.Identities[i+1:]...)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) ListUsers(ctx context.Context) ([]string, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
var usernames []string
for _, identity := range config.Identities {
usernames = append(usernames, identity.Name)
}
return usernames, nil
}
func (store *FilerEtcStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
for _, identity := range config.Identities {
for _, credential := range identity.Credentials {
if credential.AccessKey == accessKey {
return identity, nil
}
}
}
return nil, credential.ErrAccessKeyNotFound
}
func (store *FilerEtcStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find the user and add the credential
for _, identity := range config.Identities {
if identity.Name == username {
// Check if access key already exists
for _, existingCred := range identity.Credentials {
if existingCred.AccessKey == cred.AccessKey {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
identity.Credentials = append(identity.Credentials, cred)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find the user and remove the credential
for _, identity := range config.Identities {
if identity.Name == username {
for i, cred := range identity.Credentials {
if cred.AccessKey == accessKey {
identity.Credentials = append(identity.Credentials[:i], identity.Credentials[i+1:]...)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrAccessKeyNotFound
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) Shutdown() {
// No cleanup needed for file store
}

View File

@@ -0,0 +1,302 @@
package memory
import (
"context"
"encoding/json"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
func (store *MemoryStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
config := &iam_pb.S3ApiConfiguration{}
// Convert all users to identities
for _, user := range store.users {
// Deep copy the identity to avoid mutation issues
identityCopy := store.deepCopyIdentity(user)
config.Identities = append(config.Identities, identityCopy)
}
return config, nil
}
func (store *MemoryStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
// Clear existing data
store.users = make(map[string]*iam_pb.Identity)
store.accessKeys = make(map[string]string)
// Add all identities
for _, identity := range config.Identities {
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[identity.Name] = identityCopy
// Index access keys
for _, credential := range identity.Credentials {
store.accessKeys[credential.AccessKey] = identity.Name
}
}
return nil
}
func (store *MemoryStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
if _, exists := store.users[identity.Name]; exists {
return credential.ErrUserAlreadyExists
}
// Check for duplicate access keys
for _, cred := range identity.Credentials {
if _, exists := store.accessKeys[cred.AccessKey]; exists {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[identity.Name] = identityCopy
// Index access keys
for _, cred := range identity.Credentials {
store.accessKeys[cred.AccessKey] = identity.Name
}
return nil
}
func (store *MemoryStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return nil, credential.ErrUserNotFound
}
// Return a deep copy to avoid mutation issues
return store.deepCopyIdentity(user), nil
}
func (store *MemoryStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
existingUser, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Remove old access keys from index
for _, cred := range existingUser.Credentials {
delete(store.accessKeys, cred.AccessKey)
}
// Check for duplicate access keys (excluding current user)
for _, cred := range identity.Credentials {
if existingUsername, exists := store.accessKeys[cred.AccessKey]; exists && existingUsername != username {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[username] = identityCopy
// Re-index access keys
for _, cred := range identity.Credentials {
store.accessKeys[cred.AccessKey] = username
}
return nil
}
func (store *MemoryStore) DeleteUser(ctx context.Context, username string) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Remove access keys from index
for _, cred := range user.Credentials {
delete(store.accessKeys, cred.AccessKey)
}
// Remove user
delete(store.users, username)
return nil
}
func (store *MemoryStore) ListUsers(ctx context.Context) ([]string, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
var usernames []string
for username := range store.users {
usernames = append(usernames, username)
}
return usernames, nil
}
func (store *MemoryStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
username, exists := store.accessKeys[accessKey]
if !exists {
return nil, credential.ErrAccessKeyNotFound
}
user, exists := store.users[username]
if !exists {
// This should not happen, but handle it gracefully
return nil, credential.ErrUserNotFound
}
// Return a deep copy to avoid mutation issues
return store.deepCopyIdentity(user), nil
}
func (store *MemoryStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Check if access key already exists
if _, exists := store.accessKeys[cred.AccessKey]; exists {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
// Add credential to user
user.Credentials = append(user.Credentials, &iam_pb.Credential{
AccessKey: cred.AccessKey,
SecretKey: cred.SecretKey,
})
// Index the access key
store.accessKeys[cred.AccessKey] = username
return nil
}
func (store *MemoryStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Find and remove the credential
var newCredentials []*iam_pb.Credential
found := false
for _, cred := range user.Credentials {
if cred.AccessKey == accessKey {
found = true
// Remove from access key index
delete(store.accessKeys, accessKey)
} else {
newCredentials = append(newCredentials, cred)
}
}
if !found {
return credential.ErrAccessKeyNotFound
}
user.Credentials = newCredentials
return nil
}
// deepCopyIdentity creates a deep copy of an identity to avoid mutation issues
func (store *MemoryStore) deepCopyIdentity(identity *iam_pb.Identity) *iam_pb.Identity {
if identity == nil {
return nil
}
// Use JSON marshaling/unmarshaling for deep copy
// This is simple and safe for protobuf messages
data, err := json.Marshal(identity)
if err != nil {
// Fallback to shallow copy if JSON fails
return &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: identity.Actions,
}
}
var copy iam_pb.Identity
if err := json.Unmarshal(data, &copy); err != nil {
// Fallback to shallow copy if JSON fails
return &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: identity.Actions,
}
}
return &copy
}

View File

@@ -0,0 +1,77 @@
package memory
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
)
// GetPolicies retrieves all IAM policies from memory
func (store *MemoryStore) GetPolicies(ctx context.Context) (map[string]credential.PolicyDocument, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
// Create a copy of the policies map to avoid mutation issues
policies := make(map[string]credential.PolicyDocument)
for name, doc := range store.policies {
policies[name] = doc
}
return policies, nil
}
// GetPolicy retrieves a specific IAM policy by name from memory
func (store *MemoryStore) GetPolicy(ctx context.Context, name string) (*credential.PolicyDocument, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if policy, exists := store.policies[name]; exists {
return &policy, nil
}
return nil, nil // Policy not found
}
// CreatePolicy creates a new IAM policy in memory
func (store *MemoryStore) CreatePolicy(ctx context.Context, name string, document credential.PolicyDocument) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
store.policies[name] = document
return nil
}
// UpdatePolicy updates an existing IAM policy in memory
func (store *MemoryStore) UpdatePolicy(ctx context.Context, name string, document credential.PolicyDocument) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
store.policies[name] = document
return nil
}
// DeletePolicy deletes an IAM policy from memory
func (store *MemoryStore) DeletePolicy(ctx context.Context, name string) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
delete(store.policies, name)
return nil
}

View File

@@ -1,9 +1,6 @@
package memory
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/seaweedfs/seaweedfs/weed/credential"
@@ -19,8 +16,9 @@ func init() {
// This is primarily intended for testing purposes
type MemoryStore struct {
mu sync.RWMutex
users map[string]*iam_pb.Identity // username -> identity
accessKeys map[string]string // access_key -> username
users map[string]*iam_pb.Identity // username -> identity
accessKeys map[string]string // access_key -> username
policies map[string]credential.PolicyDocument // policy_name -> policy_document
initialized bool
}
@@ -38,313 +36,22 @@ func (store *MemoryStore) Initialize(configuration util.Configuration, prefix st
store.users = make(map[string]*iam_pb.Identity)
store.accessKeys = make(map[string]string)
store.policies = make(map[string]credential.PolicyDocument)
store.initialized = true
return nil
}
func (store *MemoryStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
config := &iam_pb.S3ApiConfiguration{}
// Convert all users to identities
for _, user := range store.users {
// Deep copy the identity to avoid mutation issues
identityCopy := store.deepCopyIdentity(user)
config.Identities = append(config.Identities, identityCopy)
}
return config, nil
}
func (store *MemoryStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
// Clear existing data
store.users = make(map[string]*iam_pb.Identity)
store.accessKeys = make(map[string]string)
// Add all identities
for _, identity := range config.Identities {
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[identity.Name] = identityCopy
// Index access keys
for _, credential := range identity.Credentials {
store.accessKeys[credential.AccessKey] = identity.Name
}
}
return nil
}
func (store *MemoryStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
if _, exists := store.users[identity.Name]; exists {
return credential.ErrUserAlreadyExists
}
// Check for duplicate access keys
for _, cred := range identity.Credentials {
if _, exists := store.accessKeys[cred.AccessKey]; exists {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[identity.Name] = identityCopy
// Index access keys
for _, cred := range identity.Credentials {
store.accessKeys[cred.AccessKey] = identity.Name
}
return nil
}
func (store *MemoryStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return nil, credential.ErrUserNotFound
}
// Return a deep copy to avoid mutation issues
return store.deepCopyIdentity(user), nil
}
func (store *MemoryStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
existingUser, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Remove old access keys from index
for _, cred := range existingUser.Credentials {
delete(store.accessKeys, cred.AccessKey)
}
// Check for duplicate access keys (excluding current user)
for _, cred := range identity.Credentials {
if existingUsername, exists := store.accessKeys[cred.AccessKey]; exists && existingUsername != username {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[username] = identityCopy
// Re-index access keys
for _, cred := range identity.Credentials {
store.accessKeys[cred.AccessKey] = username
}
return nil
}
func (store *MemoryStore) DeleteUser(ctx context.Context, username string) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Remove access keys from index
for _, cred := range user.Credentials {
delete(store.accessKeys, cred.AccessKey)
}
// Remove user
delete(store.users, username)
return nil
}
func (store *MemoryStore) ListUsers(ctx context.Context) ([]string, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
var usernames []string
for username := range store.users {
usernames = append(usernames, username)
}
return usernames, nil
}
func (store *MemoryStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
username, exists := store.accessKeys[accessKey]
if !exists {
return nil, credential.ErrAccessKeyNotFound
}
user, exists := store.users[username]
if !exists {
// This should not happen, but handle it gracefully
return nil, credential.ErrUserNotFound
}
// Return a deep copy to avoid mutation issues
return store.deepCopyIdentity(user), nil
}
func (store *MemoryStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Check if access key already exists
if _, exists := store.accessKeys[cred.AccessKey]; exists {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
// Add credential to user
user.Credentials = append(user.Credentials, &iam_pb.Credential{
AccessKey: cred.AccessKey,
SecretKey: cred.SecretKey,
})
// Index the access key
store.accessKeys[cred.AccessKey] = username
return nil
}
func (store *MemoryStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Find and remove the credential
var newCredentials []*iam_pb.Credential
found := false
for _, cred := range user.Credentials {
if cred.AccessKey == accessKey {
found = true
// Remove from access key index
delete(store.accessKeys, accessKey)
} else {
newCredentials = append(newCredentials, cred)
}
}
if !found {
return credential.ErrAccessKeyNotFound
}
user.Credentials = newCredentials
return nil
}
func (store *MemoryStore) Shutdown() {
store.mu.Lock()
defer store.mu.Unlock()
// Clear all data
store.users = nil
store.accessKeys = nil
store.policies = nil
store.initialized = false
}
// deepCopyIdentity creates a deep copy of an identity to avoid mutation issues
func (store *MemoryStore) deepCopyIdentity(identity *iam_pb.Identity) *iam_pb.Identity {
if identity == nil {
return nil
}
// Use JSON marshaling/unmarshaling for deep copy
// This is simple and safe for protobuf messages
data, err := json.Marshal(identity)
if err != nil {
// Fallback to shallow copy if JSON fails
return &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: identity.Actions,
}
}
var copy iam_pb.Identity
if err := json.Unmarshal(data, &copy); err != nil {
// Fallback to shallow copy if JSON fails
return &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: identity.Actions,
}
}
return &copy
}
// Reset clears all data in the store (useful for testing)
func (store *MemoryStore) Reset() {
store.mu.Lock()

View File

@@ -0,0 +1,446 @@
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
config := &iam_pb.S3ApiConfiguration{}
// Query all users
rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
for rows.Next() {
var username, email string
var accountDataJSON, actionsJSON []byte
if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil {
return nil, fmt.Errorf("failed to scan user row: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if len(accountDataJSON) > 0 {
if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err)
}
}
// Parse actions
if len(actionsJSON) > 0 {
if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err)
}
}
// Query credentials for this user
credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err)
}
for credRows.Next() {
var accessKey, secretKey string
if err := credRows.Scan(&accessKey, &secretKey); err != nil {
credRows.Close()
return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
credRows.Close()
config.Identities = append(config.Identities, identity)
}
return config, nil
}
func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Clear existing data
if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil {
return fmt.Errorf("failed to clear credentials: %v", err)
}
if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil {
return fmt.Errorf("failed to clear users: %v", err)
}
// Insert all identities
for _, identity := range config.Identities {
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err)
}
}
// Insert user
_, err := tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user %s: %v", identity.Name, err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err := tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err)
}
}
}
return tx.Commit()
}
func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user already exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count > 0 {
return credential.ErrUserAlreadyExists
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
}
// Insert user
_, err = tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user: %v", err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var email string
var accountDataJSON, actionsJSON []byte
err := store.db.QueryRowContext(ctx,
"SELECT email, account_data, actions FROM users WHERE username = $1",
username).Scan(&email, &accountDataJSON, &actionsJSON)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrUserNotFound
}
return nil, fmt.Errorf("failed to query user: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if len(accountDataJSON) > 0 {
if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data: %v", err)
}
}
// Parse actions
if len(actionsJSON) > 0 {
if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions: %v", err)
}
}
// Query credentials
rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials: %v", err)
}
defer rows.Close()
for rows.Next() {
var accessKey, secretKey string
if err := rows.Scan(&accessKey, &secretKey); err != nil {
return nil, fmt.Errorf("failed to scan credential: %v", err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
return identity, nil
}
func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Check if user exists
var count int
err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
}
// Update user
_, err = tx.ExecContext(ctx,
"UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1",
username, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to update user: %v", err)
}
// Delete existing credentials
_, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username)
if err != nil {
return fmt.Errorf("failed to delete existing credentials: %v", err)
}
// Insert new credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username)
if err != nil {
return fmt.Errorf("failed to delete user: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
return credential.ErrUserNotFound
}
return nil
}
func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
var usernames []string
for rows.Next() {
var username string
if err := rows.Scan(&username); err != nil {
return nil, fmt.Errorf("failed to scan username: %v", err)
}
usernames = append(usernames, username)
}
return usernames, nil
}
func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var username string
err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrAccessKeyNotFound
}
return nil, fmt.Errorf("failed to query access key: %v", err)
}
return store.GetUser(ctx, username)
}
func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Insert credential
_, err = store.db.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
return nil
}
func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx,
"DELETE FROM credentials WHERE username = $1 AND access_key = $2",
username, accessKey)
if err != nil {
return fmt.Errorf("failed to delete access key: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
// Check if user exists
var count int
err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
return credential.ErrAccessKeyNotFound
}
return nil
}

View File

@@ -0,0 +1,130 @@
package postgres
import (
"context"
"encoding/json"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
)
// GetPolicies retrieves all IAM policies from PostgreSQL
func (store *PostgresStore) GetPolicies(ctx context.Context) (map[string]credential.PolicyDocument, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
policies := make(map[string]credential.PolicyDocument)
rows, err := store.db.QueryContext(ctx, "SELECT name, document FROM policies")
if err != nil {
return nil, fmt.Errorf("failed to query policies: %v", err)
}
defer rows.Close()
for rows.Next() {
var name string
var documentJSON []byte
if err := rows.Scan(&name, &documentJSON); err != nil {
return nil, fmt.Errorf("failed to scan policy row: %v", err)
}
var document credential.PolicyDocument
if err := json.Unmarshal(documentJSON, &document); err != nil {
return nil, fmt.Errorf("failed to unmarshal policy document for %s: %v", name, err)
}
policies[name] = document
}
return policies, nil
}
// CreatePolicy creates a new IAM policy in PostgreSQL
func (store *PostgresStore) CreatePolicy(ctx context.Context, name string, document credential.PolicyDocument) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
documentJSON, err := json.Marshal(document)
if err != nil {
return fmt.Errorf("failed to marshal policy document: %v", err)
}
_, err = store.db.ExecContext(ctx,
"INSERT INTO policies (name, document) VALUES ($1, $2) ON CONFLICT (name) DO UPDATE SET document = $2, updated_at = CURRENT_TIMESTAMP",
name, documentJSON)
if err != nil {
return fmt.Errorf("failed to insert policy: %v", err)
}
return nil
}
// UpdatePolicy updates an existing IAM policy in PostgreSQL
func (store *PostgresStore) UpdatePolicy(ctx context.Context, name string, document credential.PolicyDocument) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
documentJSON, err := json.Marshal(document)
if err != nil {
return fmt.Errorf("failed to marshal policy document: %v", err)
}
result, err := store.db.ExecContext(ctx,
"UPDATE policies SET document = $2, updated_at = CURRENT_TIMESTAMP WHERE name = $1",
name, documentJSON)
if err != nil {
return fmt.Errorf("failed to update policy: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
return fmt.Errorf("policy %s not found", name)
}
return nil
}
// DeletePolicy deletes an IAM policy from PostgreSQL
func (store *PostgresStore) DeletePolicy(ctx context.Context, name string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx, "DELETE FROM policies WHERE name = $1", name)
if err != nil {
return fmt.Errorf("failed to delete policy: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
return fmt.Errorf("policy %s not found", name)
}
return nil
}
// GetPolicy retrieves a specific IAM policy by name from PostgreSQL
func (store *PostgresStore) GetPolicy(ctx context.Context, name string) (*credential.PolicyDocument, error) {
policies, err := store.GetPolicies(ctx)
if err != nil {
return nil, err
}
if policy, exists := policies[name]; exists {
return &policy, nil
}
return nil, nil // Policy not found
}

View File

@@ -1,14 +1,11 @@
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
_ "github.com/lib/pq"
@@ -114,6 +111,17 @@ func (store *PostgresStore) createTables() error {
CREATE INDEX IF NOT EXISTS idx_credentials_access_key ON credentials(access_key);
`
// Create policies table
policiesTable := `
CREATE TABLE IF NOT EXISTS policies (
name VARCHAR(255) PRIMARY KEY,
document JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_policies_name ON policies(name);
`
// Execute table creation
if _, err := store.db.Exec(usersTable); err != nil {
return fmt.Errorf("failed to create users table: %v", err)
@@ -123,439 +131,8 @@ func (store *PostgresStore) createTables() error {
return fmt.Errorf("failed to create credentials table: %v", err)
}
return nil
}
func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
config := &iam_pb.S3ApiConfiguration{}
// Query all users
rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
for rows.Next() {
var username, email string
var accountDataJSON, actionsJSON []byte
if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil {
return nil, fmt.Errorf("failed to scan user row: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if len(accountDataJSON) > 0 {
if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err)
}
}
// Parse actions
if len(actionsJSON) > 0 {
if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err)
}
}
// Query credentials for this user
credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err)
}
for credRows.Next() {
var accessKey, secretKey string
if err := credRows.Scan(&accessKey, &secretKey); err != nil {
credRows.Close()
return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
credRows.Close()
config.Identities = append(config.Identities, identity)
}
return config, nil
}
func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Clear existing data
if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil {
return fmt.Errorf("failed to clear credentials: %v", err)
}
if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil {
return fmt.Errorf("failed to clear users: %v", err)
}
// Insert all identities
for _, identity := range config.Identities {
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err)
}
}
// Insert user
_, err := tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user %s: %v", identity.Name, err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err := tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err)
}
}
}
return tx.Commit()
}
func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user already exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count > 0 {
return credential.ErrUserAlreadyExists
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
}
// Insert user
_, err = tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user: %v", err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var email string
var accountDataJSON, actionsJSON []byte
err := store.db.QueryRowContext(ctx,
"SELECT email, account_data, actions FROM users WHERE username = $1",
username).Scan(&email, &accountDataJSON, &actionsJSON)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrUserNotFound
}
return nil, fmt.Errorf("failed to query user: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if len(accountDataJSON) > 0 {
if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data: %v", err)
}
}
// Parse actions
if len(actionsJSON) > 0 {
if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions: %v", err)
}
}
// Query credentials
rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials: %v", err)
}
defer rows.Close()
for rows.Next() {
var accessKey, secretKey string
if err := rows.Scan(&accessKey, &secretKey); err != nil {
return nil, fmt.Errorf("failed to scan credential: %v", err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
return identity, nil
}
func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Check if user exists
var count int
err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
}
// Update user
_, err = tx.ExecContext(ctx,
"UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1",
username, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to update user: %v", err)
}
// Delete existing credentials
_, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username)
if err != nil {
return fmt.Errorf("failed to delete existing credentials: %v", err)
}
// Insert new credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username)
if err != nil {
return fmt.Errorf("failed to delete user: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
return credential.ErrUserNotFound
}
return nil
}
func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
var usernames []string
for rows.Next() {
var username string
if err := rows.Scan(&username); err != nil {
return nil, fmt.Errorf("failed to scan username: %v", err)
}
usernames = append(usernames, username)
}
return usernames, nil
}
func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var username string
err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrAccessKeyNotFound
}
return nil, fmt.Errorf("failed to query access key: %v", err)
}
return store.GetUser(ctx, username)
}
func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Insert credential
_, err = store.db.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
return nil
}
func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx,
"DELETE FROM credentials WHERE username = $1 AND access_key = $2",
username, accessKey)
if err != nil {
return fmt.Errorf("failed to delete access key: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
// Check if user exists
var count int
err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
return credential.ErrAccessKeyNotFound
if _, err := store.db.Exec(policiesTable); err != nil {
return fmt.Errorf("failed to create policies table: %v", err)
}
return nil

View File

@@ -0,0 +1,146 @@
package test
import (
"context"
"testing"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/credential/memory"
// Import all store implementations to register them
_ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc"
_ "github.com/seaweedfs/seaweedfs/weed/credential/memory"
_ "github.com/seaweedfs/seaweedfs/weed/credential/postgres"
)
// TestPolicyManagement tests policy management across all credential stores
func TestPolicyManagement(t *testing.T) {
ctx := context.Background()
// Test with memory store (easiest to test)
credentialManager, err := credential.NewCredentialManager(credential.StoreTypeMemory, nil, "")
if err != nil {
t.Fatalf("Failed to create credential manager: %v", err)
}
// Test policy operations
testPolicyOperations(t, ctx, credentialManager)
}
func testPolicyOperations(t *testing.T, ctx context.Context, credentialManager *credential.CredentialManager) {
store := credentialManager.GetStore()
// Cast to memory store to access policy methods
memoryStore, ok := store.(*memory.MemoryStore)
if !ok {
t.Skip("Store is not a memory store")
}
// Test GetPolicies (should be empty initially)
policies, err := memoryStore.GetPolicies(ctx)
if err != nil {
t.Fatalf("Failed to get policies: %v", err)
}
if len(policies) != 0 {
t.Errorf("Expected 0 policies, got %d", len(policies))
}
// Test CreatePolicy
testPolicy := credential.PolicyDocument{
Version: "2012-10-17",
Statement: []*credential.PolicyStatement{
{
Effect: "Allow",
Action: []string{"s3:GetObject"},
Resource: []string{"arn:aws:s3:::test-bucket/*"},
},
},
}
err = memoryStore.CreatePolicy(ctx, "test-policy", testPolicy)
if err != nil {
t.Fatalf("Failed to create policy: %v", err)
}
// Test GetPolicies (should have 1 policy now)
policies, err = memoryStore.GetPolicies(ctx)
if err != nil {
t.Fatalf("Failed to get policies: %v", err)
}
if len(policies) != 1 {
t.Errorf("Expected 1 policy, got %d", len(policies))
}
// Verify policy content
policy, exists := policies["test-policy"]
if !exists {
t.Error("test-policy not found")
}
if policy.Version != "2012-10-17" {
t.Errorf("Expected policy version '2012-10-17', got '%s'", policy.Version)
}
if len(policy.Statement) != 1 {
t.Errorf("Expected 1 statement, got %d", len(policy.Statement))
}
// Test UpdatePolicy
updatedPolicy := credential.PolicyDocument{
Version: "2012-10-17",
Statement: []*credential.PolicyStatement{
{
Effect: "Allow",
Action: []string{"s3:GetObject", "s3:PutObject"},
Resource: []string{"arn:aws:s3:::test-bucket/*"},
},
},
}
err = memoryStore.UpdatePolicy(ctx, "test-policy", updatedPolicy)
if err != nil {
t.Fatalf("Failed to update policy: %v", err)
}
// Verify the update
policies, err = memoryStore.GetPolicies(ctx)
if err != nil {
t.Fatalf("Failed to get policies after update: %v", err)
}
updatedPolicyResult, exists := policies["test-policy"]
if !exists {
t.Error("test-policy not found after update")
}
if len(updatedPolicyResult.Statement) != 1 {
t.Errorf("Expected 1 statement after update, got %d", len(updatedPolicyResult.Statement))
}
if len(updatedPolicyResult.Statement[0].Action) != 2 {
t.Errorf("Expected 2 actions after update, got %d", len(updatedPolicyResult.Statement[0].Action))
}
// Test DeletePolicy
err = memoryStore.DeletePolicy(ctx, "test-policy")
if err != nil {
t.Fatalf("Failed to delete policy: %v", err)
}
// Verify deletion
policies, err = memoryStore.GetPolicies(ctx)
if err != nil {
t.Fatalf("Failed to get policies after deletion: %v", err)
}
if len(policies) != 0 {
t.Errorf("Expected 0 policies after deletion, got %d", len(policies))
}
}
// TestPolicyManagementWithFilerEtc tests policy management with filer_etc store
func TestPolicyManagementWithFilerEtc(t *testing.T) {
// Skip this test if we can't connect to a filer
t.Skip("Filer connection required for filer_etc store testing")
}
// TestPolicyManagementWithPostgres tests policy management with postgres store
func TestPolicyManagementWithPostgres(t *testing.T) {
// Skip this test if we can't connect to PostgreSQL
t.Skip("PostgreSQL connection required for postgres store testing")
}