Refactored distributed locks.

This commit is contained in:
Daniel Stolt
2015-09-06 21:40:45 +02:00
parent 1da2d7e3e9
commit 86b9f35454
3 changed files with 153 additions and 155 deletions

View File

@@ -87,7 +87,6 @@ namespace Orchard.Tests.Tasks {
[Test]
public void TryAcquiringLockTwiceFails() {
_distributedLockService.DisableMonitorLock = true;
IDistributedLock @lock;
_machineNameProvider.MachineName = "Orchard Test Machine 1";
var attempt1 = _distributedLockService.TryAcquireLock(LockName, TimeSpan.FromSeconds(60), out @lock);
@@ -152,11 +151,10 @@ namespace Orchard.Tests.Tasks {
[Test]
public void MultipleAcquisitionsFromDifferentMachinesShouldFail() {
IDistributedLock @lock;
_distributedLockService.DisableMonitorLock = true;
_machineNameProvider.MachineName = "Orchard Test Machine 1";
var attempt1 = _distributedLockService.TryAcquireLock(LockName, TimeSpan.FromSeconds(60), out @lock);
var attempt1 = _distributedLockService.TryAcquireLock(LockName, TimeSpan.FromMinutes(60), out @lock);
_machineNameProvider.MachineName = "Orchard Test Machine 2";
var attempt2 = _distributedLockService.TryAcquireLock(LockName, TimeSpan.FromSeconds(60), out @lock);
var attempt2 = _distributedLockService.TryAcquireLock(LockName, TimeSpan.FromMinutes(60), out @lock);
Assert.That(attempt1, Is.True);
Assert.That(attempt2, Is.False);
@@ -165,7 +163,6 @@ namespace Orchard.Tests.Tasks {
[Test]
public void MultipleAcquisitionsFromDifferentMachinesOnDifferentTenantShouldSucceed() {
IDistributedLock @lock;
_distributedLockService.DisableMonitorLock = true;
_machineNameProvider.MachineName = "Orchard Test Machine 1";
var attempt1 = _distributedLockService.TryAcquireLock(LockName, TimeSpan.FromSeconds(60), out @lock);
_machineNameProvider.MachineName = "Orchard Test Machine 2";
@@ -194,8 +191,6 @@ namespace Orchard.Tests.Tasks {
[Test]
public void TryAcquireActiveLockWithNullTimeoutReturnsFalseImmediately() {
// Disable monitor locking to simulate concurrent requests
_distributedLockService.DisableMonitorLock = true;
CreateNonExpiredActiveLock("Other Machine");
IDistributedLock @lock;
@@ -206,8 +201,6 @@ namespace Orchard.Tests.Tasks {
[Test]
public void ActiveLockWithUndefinedValidUntilNeverExpires() {
// Disable monitor locking to simulate concurrent requests
_distributedLockService.DisableMonitorLock = true;
CreateNonExpiredActiveLockThatNeverExpires("Other Machine");
_clock.Advance(DateTime.MaxValue - _clock.UtcNow); // Fast forward to the End of Time.
@@ -224,9 +217,9 @@ namespace Orchard.Tests.Tasks {
// Create a never expiring lock.
_machineNameProvider.MachineName = "Orchard Test Machine 1";
var attempt1 = _distributedLockService.TryAcquireLock(LockName, maxValidFor: null, timeout: null, dLock: out @lock);
// Release the lock.
_distributedLockService.ReleaseDistributedLock((DistributedLock)@lock);
@lock.Dispose();
// Acquire the lock from another machine.
_machineNameProvider.MachineName = "Orchard Test Machine 2";
@@ -239,7 +232,7 @@ namespace Orchard.Tests.Tasks {
private DistributedLockRecord CreateLockRecord(DateTime createdUtc, DateTime? validUntilUtc, string machineName) {
var record = new DistributedLockRecord {
Name = ShellSettings.DefaultName + ":" + LockName,
Name = String.Format("DistributedLock:{0}:{1}", ShellSettings.DefaultName, LockName),
CreatedUtc = createdUtc,
ValidUntilUtc = validUntilUtc,
MachineName = machineName,

View File

@@ -1,36 +1,41 @@
using System;
using System.Threading;
namespace Orchard.Tasks.Locking.Services {
public class DistributedLock : IDistributedLock {
private DistributedLockService _service;
private string _name;
private readonly string _name;
private readonly string _internalName;
private readonly Action _releaseLockAction;
private int _count;
public DistributedLock(DistributedLockService service, string name) {
_service = service;
internal DistributedLock(string name, string internalName, Action releaseLockAction) {
_name = name;
_internalName = internalName;
_releaseLockAction = releaseLockAction;
_count = 1;
}
public string Name {
string IDistributedLock.Name {
get {
return _name;
}
}
public void Increment() {
internal string InternalName {
get {
return _internalName;
}
}
internal void Increment() {
_count++;
}
public void Dispose() {
_count--;
if (_count == 0) {
Monitor.Exit(String.Intern(_name));
_service.ReleaseDistributedLock(this);
}
if (_count == 0)
_releaseLockAction();
}
}
}

View File

@@ -1,5 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
@@ -19,9 +19,10 @@ namespace Orchard.Tasks.Locking.Services {
private readonly IMachineNameProvider _machineNameProvider;
private readonly ILifetimeScope _lifetimeScope;
private readonly IClock _clock;
private readonly IClock _clock;
private readonly ShellSettings _shellSettings;
private readonly ConcurrentDictionary<string, DistributedLock> _locks;
private readonly Dictionary<string, DistributedLock> _locks;
private readonly TimeSpan _defaultRepeatInterval;
public DistributedLockService(
IMachineNameProvider machineNameProvider,
@@ -32,183 +33,182 @@ namespace Orchard.Tasks.Locking.Services {
_lifetimeScope = lifetimeScope;
_shellSettings = shellSettings;
_machineNameProvider = machineNameProvider;
_locks = new ConcurrentDictionary<string, DistributedLock>();
}
_locks = new Dictionary<string, DistributedLock>();
_defaultRepeatInterval = TimeSpan.FromMilliseconds(500);
}
public bool DisableMonitorLock { get; set; }
public bool TryAcquireLock(string name, TimeSpan? maxValidFor, TimeSpan? timeout, out IDistributedLock dLock) {
public bool TryAcquireLock(string name, TimeSpan? maxValidFor, TimeSpan? timeout, out IDistributedLock dLock) {
try {
dLock = AcquireLock(name, maxValidFor, timeout);
return dLock != null;
dLock = AcquireLockInternal(name, maxValidFor, timeout, throwOnTimeout: false);
if (dLock != null) {
Logger.Debug("Successfully acquired lock '{0}'.", name);
return true;
}
Logger.Warning("Failed to acquire lock '{0}' within the specified timeout ({1}).", name, timeout);
}
catch {
dLock = null;
return false;
catch (Exception ex) {
Logger.Error(ex, "Error while trying to acquire lock '{0}'.", name);
// TODO: Is it correct to not throw here? Should we instead ONLY swallow TimeoutException?
}
dLock = null;
return false;
}
public IDistributedLock AcquireLock(string name, TimeSpan? maxValidFor, TimeSpan? timeout) {
DistributedLock dLock = null;
try {
var acquired = Poll(() => (dLock = AcquireLockInternal(name, maxValidFor)) != null, timeout);
if (acquired)
Logger.Debug("Successfully acquired lock '{0}'.", name);
else
Logger.Debug("Failed to acquire lock '{0}' within the specified timeout.", name);
DistributedLock result = AcquireLockInternal(name, maxValidFor, timeout, throwOnTimeout: true);
Logger.Debug("Successfully acquired lock '{0}'.", name);
return result;
}
catch (Exception ex) {
Logger.Error(ex, "Error while trying to acquire lock '{0}'.", name);
throw;
}
if (dLock == null && timeout != null)
throw new TimeoutException(String.Format("Failed to acquire lock '{0}' within the specified timeout ({1}).", name, timeout));
return dLock;
}
public void ReleaseDistributedLock(DistributedLock dLock) {
try {
var record = GetDistributedLockRecordByName(dLock.Name);
if (record == null)
throw new OrchardException(T("No lock record could be found in the database for lock '{0}'.", dLock.Name));
private DistributedLock AcquireLockInternal(string name, TimeSpan? maxValidFor, TimeSpan? timeout, bool throwOnTimeout) {
var internalName = GetInternalLockName(name);
var monitorTimeout = timeout.HasValue ? timeout.Value : TimeSpan.FromMilliseconds(-1); // -1 ms is .NET magic number for "infinite".
var monitorObj = String.Intern(String.Format("{0}:{1}", _machineNameProvider.GetMachineName(), internalName));
TryCommitNewTransaction(repository => repository.Delete(record));
if (!Monitor.TryEnter(monitorObj, monitorTimeout)) {
Logger.Debug("Could not enter local monitor for lock '{0}' within the specified timeout ({1}).", internalName, timeout);
if (throwOnTimeout)
throw new TimeoutException(String.Format("Failed to acquire lock '{0}' within the specified timeout ({1}).", internalName, timeout));
return null;
}
catch (Exception ex) {
if (ex.IsFatal())
throw;
Logger.Error(ex, "An non-fatal error occurred while releasing lock '{0}'.", dLock.Name);
}
}
private DistributedLockRecord GetDistributedLockRecordByName(string name) {
DistributedLockRecord result = null;
TryCommitNewTransaction(repository => {
result = repository.Table.FirstOrDefault(x =>
x.Name == name
);
});
Logger.Debug("Successfully entered local monitor for lock '{0}'.", internalName);
return result;
}
private DistributedLockRecord GetValidDistributedLockRecordByName(string name) {
DistributedLockRecord result = null;
TryCommitNewTransaction(repository => {
result = repository.Table.FirstOrDefault(x =>
x.Name == name && (x.ValidUntilUtc == null || x.ValidUntilUtc >= _clock.UtcNow)
);
});
return result;
}
private DistributedLock AcquireLockInternal(string name, TimeSpan? maxValidFor) {
try {
name = GetTenantLockName(name);
if (!DisableMonitorLock && !Monitor.TryEnter(String.Intern(name)))
return null;
DistributedLock dLock = null;
// Return the existing lock in case of reentrancy.
if(!DisableMonitorLock && _locks.TryGetValue(name, out dLock)) {
// If there's already a distributed lock object in our dictionary, that means
// this acquisition is a reentrance. Use the existing lock object from the
// dictionary but increment its count.
if (_locks.TryGetValue(monitorObj, out dLock)) {
Logger.Debug("Current thread is re-entering lock '{0}'; incrementing count.", internalName);
dLock.Increment();
return dLock;
}
// Find an existing active lock, if any.
var record = GetValidDistributedLockRecordByName(name);
// The current owner name (based on machine name).
var canAcquireLock = false;
// Check if there's already an active lock.
if (record != null) {
// Check if the machine name assigned to the lock is the one trying to acquire it.
if (record.MachineName == _machineNameProvider.GetMachineName()) {
canAcquireLock = true;
}
}
else {
// No one has an active lock yet, so good to go.
record = new DistributedLockRecord {
Name = name,
MachineName = _machineNameProvider.GetMachineName(),
CreatedUtc = _clock.UtcNow,
ValidUntilUtc = maxValidFor != null ? _clock.UtcNow + maxValidFor : null
};
// No distributed lock object existed in our dictionary. Try to take ownership
// of database record until timeout expires, and if successful create a distributed
// lock object and add it to our dictionary.
var success = RepeatUntilTimeout(timeout, _defaultRepeatInterval, () => {
if (EnsureDistributedLockRecord(internalName, maxValidFor)) {
Logger.Debug("Record for lock '{0}' already owned by current machine or was successfully created; creating lock object.", internalName);
canAcquireLock = TryCommitNewTransaction(repository => {
repository.Create(record);
dLock = new DistributedLock(name, internalName, releaseLockAction: () => {
Monitor.Exit(monitorObj);
DeleteDistributedLockRecord(internalName);
});
_locks.Add(monitorObj, dLock);
return true;
}
return false;
});
}
if (!canAcquireLock) {
return null;
}
if (!success) {
Logger.Debug("Record for lock '{0}' could not be created for current machine within the specified timeout ({1}).", internalName, timeout);
dLock = new DistributedLock(this, name);
if (!DisableMonitorLock) {
_locks.TryAdd(name, dLock);
if (throwOnTimeout)
throw new TimeoutException(String.Format("Failed to acquire lock '{0}' within the specified timeout ({1}).", internalName, timeout));
return null;
}
}
return dLock;
}
catch (Exception ex) {
Monitor.Exit(String.Intern(name));
Monitor.Exit(monitorObj);
Logger.Error(ex, "An error occurred while trying to acquire a lock.");
Logger.Error(ex, "An error occurred while trying to acquire lock '{0}'.", internalName);
throw;
}
}
}
private string GetTenantLockName(string name) {
return _shellSettings.Name + ":" + name;
private bool EnsureDistributedLockRecord(string internalName, TimeSpan? maxValidFor) {
var localMachineName = _machineNameProvider.GetMachineName();
var hasLockRecord = false;
ExecuteOnSeparateTransaction(repository => {
// Try to find a valid lock record in the database.
var record = repository.Table.FirstOrDefault(x => x.Name == internalName && (x.ValidUntilUtc == null || x.ValidUntilUtc >= _clock.UtcNow));
if (record == null) {
// No record existed, so we're good to create a new one.
Logger.Debug("No valid record was found for lock '{0}'; creating a new record.", internalName);
repository.Create(new DistributedLockRecord {
Name = internalName,
MachineName = localMachineName,
CreatedUtc = _clock.UtcNow,
ValidUntilUtc = maxValidFor.HasValue ? _clock.UtcNow + maxValidFor.Value : default(DateTime?)
});
hasLockRecord = true;
}
else if (record.MachineName == localMachineName) {
// Existing lock was for correct machine name => lock record exists.
Logger.Debug("Found a valid record for lock '{0}' and current local machine name '{1}'.", internalName, localMachineName);
hasLockRecord = true;
}
});
return hasLockRecord;
}
/// <summary>
/// Executes the specified function until it returns true, for the specified amount of time, or indefinitely if no timeout was given.
/// </summary>
/// <param name="operation">The operation to repeatedly execute until it returns true.</param>
/// <param name="timeout">The amount of time to retry executing the function. If null is specified, the specified function is executed indefinitely until it returns true.</param>
/// <returns>Returns true if the specified function returned true within the specified timeout, false otherwise.</returns>
private bool Poll(Func<bool> operation, TimeSpan? timeout) {
private void DeleteDistributedLockRecord(string internalName) {
try {
ExecuteOnSeparateTransaction(repository => {
var record = repository.Table.FirstOrDefault(x => x.Name == internalName);
if (record == null)
throw new Exception(String.Format("No record could be found in the database for lock '{0}'.", internalName));
repository.Delete(record);
Logger.Debug("Successfully deleted record for lock '{0}'.", internalName);
});
}
catch (Exception ex) {
if (ex.IsFatal())
throw;
Logger.Warning(ex, "An error occurred while deleting record for lock '{0}'.", internalName);
}
}
private bool RepeatUntilTimeout(TimeSpan? timeout, TimeSpan repeatInterval, Func<bool> action) {
bool success;
var waitedTime = TimeSpan.Zero;
var waitTime = TimeSpan.FromMilliseconds(timeout.GetValueOrDefault().TotalMilliseconds / 10);
bool acquired;
while (!(acquired = operation()) && (timeout == null || waitedTime < timeout.Value)) {
Task.Delay(waitTime).ContinueWith(t => {
waitedTime += waitTime;
}).Wait();
while (!(success = action()) && (!timeout.HasValue || waitedTime < timeout.Value)) {
Task.Delay(repeatInterval).Wait();
waitedTime += repeatInterval;
}
return acquired;
return success;
}
private bool TryCommitNewTransaction(Action<IRepository<DistributedLockRecord>> action) {
private void ExecuteOnSeparateTransaction(Action<IRepository<DistributedLockRecord>> action) {
if (action == null)
throw new ArgumentNullException();
try {
using (var childLifetimeScope = _lifetimeScope.BeginLifetimeScope()) {
var repository = childLifetimeScope.Resolve<IRepository<DistributedLockRecord>>();
var transactionManager = childLifetimeScope.Resolve<ITransactionManager>();
transactionManager.RequireNew(IsolationLevel.ReadCommitted);
action(repository);
}
using (var childLifetimeScope = _lifetimeScope.BeginLifetimeScope()) {
var repository = childLifetimeScope.Resolve<IRepository<DistributedLockRecord>>();
var transactionManager = childLifetimeScope.Resolve<ITransactionManager>();
transactionManager.RequireNew(IsolationLevel.ReadCommitted);
action(repository);
}
}
return true;
}
catch {
return false;
}
private string GetInternalLockName(string name) {
// Prefix the requested lock name by a constant and the tenant name.
return String.Format("DistributedLock:{0}:{1}", _shellSettings.Name, name);
}
}
}