mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-20 04:17:56 +08:00
support multiple locks
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
@@ -56,13 +57,66 @@ const (
|
||||
LockDuration = 10 * time.Second
|
||||
)
|
||||
|
||||
type AdminLock struct {
|
||||
accessSecret int64
|
||||
accessLockTime time.Time
|
||||
}
|
||||
|
||||
type AdminLocks struct {
|
||||
locks map[string]*AdminLock
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func NewAdminLocks() *AdminLocks {
|
||||
return &AdminLocks{
|
||||
locks: make(map[string]*AdminLock),
|
||||
}
|
||||
}
|
||||
|
||||
func (locks *AdminLocks) isLocked(lockName string) bool {
|
||||
locks.RLock()
|
||||
defer locks.RUnlock()
|
||||
adminLock, found := locks.locks[lockName]
|
||||
if !found {
|
||||
return false
|
||||
}
|
||||
return adminLock.accessLockTime.Add(LockDuration).After(time.Now())
|
||||
}
|
||||
|
||||
func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
|
||||
locks.RLock()
|
||||
defer locks.RUnlock()
|
||||
adminLock, found := locks.locks[lockName]
|
||||
if !found {
|
||||
return false
|
||||
}
|
||||
return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
|
||||
}
|
||||
|
||||
func (locks *AdminLocks) generateToken(lockName string) (ts time.Time, token int64) {
|
||||
locks.Lock()
|
||||
defer locks.Unlock()
|
||||
lock := &AdminLock{
|
||||
accessSecret: rand.Int63(),
|
||||
accessLockTime: time.Now(),
|
||||
}
|
||||
locks.locks[lockName] = lock
|
||||
return lock.accessLockTime, lock.accessSecret
|
||||
}
|
||||
|
||||
func (locks *AdminLocks) deleteLock(lockName string) {
|
||||
locks.Lock()
|
||||
defer locks.Unlock()
|
||||
delete(locks.locks, lockName)
|
||||
}
|
||||
|
||||
func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
|
||||
resp := &master_pb.LeaseAdminTokenResponse{}
|
||||
|
||||
if ms.adminAccessSecret != 0 && ms.adminAccessLockTime.Add(LockDuration).After(time.Now()) {
|
||||
if req.PreviousToken != 0 && ms.isValidToken(time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
|
||||
if ms.adminLocks.isLocked(req.LockName) {
|
||||
if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
|
||||
// for renew
|
||||
ts, token := ms.generateToken()
|
||||
ts, token := ms.adminLocks.generateToken(req.LockName)
|
||||
resp.Token, resp.LockTsNs = token, ts.UnixNano()
|
||||
return resp, nil
|
||||
}
|
||||
@@ -70,24 +124,15 @@ func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.Leas
|
||||
return resp, fmt.Errorf("already locked")
|
||||
}
|
||||
// for fresh lease request
|
||||
ts, token := ms.generateToken()
|
||||
ts, token := ms.adminLocks.generateToken(req.LockName)
|
||||
resp.Token, resp.LockTsNs = token, ts.UnixNano()
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ms *MasterServer) isValidToken(ts time.Time, token int64) bool {
|
||||
return ms.adminAccessLockTime.Equal(ts) && ms.adminAccessSecret == token
|
||||
}
|
||||
func (ms *MasterServer) generateToken() (ts time.Time, token int64) {
|
||||
ms.adminAccessLockTime = time.Now()
|
||||
ms.adminAccessSecret = rand.Int63()
|
||||
return ms.adminAccessLockTime, ms.adminAccessSecret
|
||||
}
|
||||
|
||||
func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) {
|
||||
resp := &master_pb.ReleaseAdminTokenResponse{}
|
||||
if ms.isValidToken(time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
|
||||
ms.adminAccessSecret = 0
|
||||
if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
|
||||
ms.adminLocks.deleteLock(req.LockName)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user