mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 16:53:14 +08:00
add distributed lock manager
This commit is contained in:
50
weed/cluster/lock_manager/distributed_lock_manager.go
Normal file
50
weed/cluster/lock_manager/distributed_lock_manager.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package lock_manager
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
)
|
||||
|
||||
type DistributedLockManager struct {
|
||||
lockManager *LockManager
|
||||
}
|
||||
|
||||
func NewDistributedLockManager() *DistributedLockManager {
|
||||
return &DistributedLockManager{
|
||||
lockManager: NewLockManager(),
|
||||
}
|
||||
}
|
||||
|
||||
func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expiredAtNs int64, token string, servers []pb.ServerAddress) (renewToken string, movedTo pb.ServerAddress, err error) {
|
||||
server := HashKeyToServer(key, servers)
|
||||
if server != host {
|
||||
movedTo = server
|
||||
return
|
||||
}
|
||||
renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token)
|
||||
return
|
||||
}
|
||||
|
||||
func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, token string, servers []pb.ServerAddress) (movedTo pb.ServerAddress, err error) {
|
||||
server := HashKeyToServer(key, servers)
|
||||
if server != host {
|
||||
movedTo = server
|
||||
return
|
||||
}
|
||||
_, err = dlm.lockManager.Unlock(key, token)
|
||||
return
|
||||
}
|
||||
|
||||
// InsertLock is used to insert a lock to a server unconditionally
|
||||
// It is used when a server is down and the lock is moved to another server
|
||||
func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string) {
|
||||
dlm.lockManager.InsertLock(key, expiredAtNs, token)
|
||||
}
|
||||
func (dlm *DistributedLockManager) SelectNotOwnedLocks(host pb.ServerAddress, servers []pb.ServerAddress) (locks []*Lock) {
|
||||
return dlm.lockManager.SelectLocks(func(key string) bool {
|
||||
server := HashKeyToServer(key, servers)
|
||||
return server != host
|
||||
})
|
||||
}
|
||||
func (dlm *DistributedLockManager) CalculateTargetServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
|
||||
return HashKeyToServer(key, servers)
|
||||
}
|
||||
@@ -12,9 +12,9 @@ type LockManager struct {
|
||||
locks *xsync.MapOf[string, *Lock]
|
||||
}
|
||||
type Lock struct {
|
||||
Token string
|
||||
ExpirationNs int64
|
||||
Key string // only used for moving locks
|
||||
Token string
|
||||
ExpiredAtNs int64
|
||||
Key string // only used for moving locks
|
||||
}
|
||||
|
||||
func NewLockManager() *LockManager {
|
||||
@@ -25,32 +25,37 @@ func NewLockManager() *LockManager {
|
||||
return t
|
||||
}
|
||||
|
||||
func (lm *LockManager) Lock(path string, ttlDuration time.Duration, token string) (renewToken string, err error) {
|
||||
func (lm *LockManager) Lock(path string, expiredAtNs int64, token string) (renewToken string, err error) {
|
||||
lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
|
||||
if oldValue != nil {
|
||||
now := time.Now()
|
||||
if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() {
|
||||
if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() {
|
||||
// lock is expired, set to a new lock
|
||||
expirationNs := time.Now().Add(ttlDuration).UnixNano()
|
||||
return &Lock{Token: token, ExpirationNs: expirationNs}, false
|
||||
if token != "" {
|
||||
err = fmt.Errorf("lock: non-empty token on an expired lock")
|
||||
return nil, false
|
||||
} else {
|
||||
// new lock
|
||||
renewToken = uuid.New().String()
|
||||
return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false
|
||||
}
|
||||
}
|
||||
// not expired
|
||||
if oldValue.Token == token {
|
||||
expirationNs := time.Now().Add(ttlDuration).UnixNano()
|
||||
return &Lock{Token: token, ExpirationNs: expirationNs}, false
|
||||
// token matches, renew the lock
|
||||
return &Lock{Token: token, ExpiredAtNs: expiredAtNs}, false
|
||||
} else {
|
||||
err = fmt.Errorf("lock: token mismatch")
|
||||
return oldValue, false
|
||||
}
|
||||
} else {
|
||||
expirationNs := time.Now().Add(ttlDuration).UnixNano()
|
||||
if token == "" {
|
||||
// new lock
|
||||
renewToken = uuid.New().String()
|
||||
return &Lock{Token: renewToken, ExpirationNs: expirationNs}, false
|
||||
return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false
|
||||
} else {
|
||||
err = fmt.Errorf("lock: non-empty token on a new lock")
|
||||
return nil, false
|
||||
}
|
||||
return &Lock{Token: token, ExpirationNs: expirationNs}, false
|
||||
}
|
||||
})
|
||||
return
|
||||
@@ -60,13 +65,13 @@ func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err e
|
||||
lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
|
||||
if oldValue != nil {
|
||||
now := time.Now()
|
||||
if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() {
|
||||
if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() {
|
||||
// lock is expired, delete it
|
||||
isUnlocked = true
|
||||
return nil, true
|
||||
}
|
||||
if oldValue.Token == token {
|
||||
if oldValue.ExpirationNs <= now.UnixNano() {
|
||||
if oldValue.ExpiredAtNs <= now.UnixNano() {
|
||||
isUnlocked = true
|
||||
return nil, true
|
||||
}
|
||||
@@ -89,7 +94,7 @@ func (lm *LockManager) CleanUp() {
|
||||
time.Sleep(1 * time.Minute)
|
||||
now := time.Now().UnixNano()
|
||||
lm.locks.Range(func(key string, value *Lock) bool {
|
||||
if now > value.ExpirationNs {
|
||||
if now > value.ExpiredAtNs {
|
||||
lm.locks.Delete(key)
|
||||
return true
|
||||
}
|
||||
@@ -98,16 +103,16 @@ func (lm *LockManager) CleanUp() {
|
||||
}
|
||||
}
|
||||
|
||||
// TakeOutLocksByKey takes out locks by key
|
||||
// SelectLocks takes out locks by key
|
||||
// if keyFn return true, the lock will be taken out
|
||||
func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []*Lock) {
|
||||
func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) {
|
||||
now := time.Now().UnixNano()
|
||||
lm.locks.Range(func(key string, lock *Lock) bool {
|
||||
if now > lock.ExpirationNs {
|
||||
if now > lock.ExpiredAtNs {
|
||||
lm.locks.Delete(key)
|
||||
return true
|
||||
}
|
||||
if keyFn(key) {
|
||||
if selectFn(key) {
|
||||
lm.locks.Delete(key)
|
||||
lock.Key = key
|
||||
locks = append(locks, lock)
|
||||
@@ -116,3 +121,8 @@ func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []*
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// InsertLock inserts a lock unconditionally
|
||||
func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string) {
|
||||
lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user