From 49c772bf257954026d192bf125073f69a64bd888 Mon Sep 17 00:00:00 2001 From: Daniel Stolt Date: Sun, 6 Sep 2015 21:41:30 +0200 Subject: [PATCH] Removed redundant thread synchronization code from modules which now use distributed locks. --- .../AuditTrailTrimmingBackgroundTask.cs | 45 ++-- .../Services/Jobs/JobProcessor.cs | 206 +++++++++--------- .../Services/JobsQueueProcessor.cs | 29 +-- 3 files changed, 132 insertions(+), 148 deletions(-) diff --git a/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs b/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs index 846c35827..8c142ec6a 100644 --- a/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs +++ b/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs @@ -1,6 +1,5 @@ using System; using System.Linq; -using System.Threading; using Orchard.AuditTrail.Models; using Orchard.ContentManagement; using Orchard.Environment.Extensions; @@ -13,7 +12,6 @@ using Orchard.Tasks.Locking.Services; namespace Orchard.AuditTrail.Services { [OrchardFeature("Orchard.AuditTrail.Trimming")] public class AuditTrailTrimmingBackgroundTask : Component, IBackgroundTask { - private static readonly object _sweepLock = new object(); private readonly ISiteService _siteService; private readonly IClock _clock; private readonly IAuditTrailManager _auditTrailManager; @@ -36,33 +34,32 @@ namespace Orchard.AuditTrail.Services { } public void Sweep() { - if (Monitor.TryEnter(_sweepLock)) { - try { - Logger.Debug("Beginning sweep."); + Logger.Debug("Beginning sweep."); - // Only allow this task to run on one farm node at a time. - IDistributedLock @lock; - if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { - using (@lock) { + try { + // Only allow this task to run on one farm node at a time. + IDistributedLock @lock; + if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { + using (@lock) { - // We don't need to check the audit trail for events to remove every minute. Let's stick with twice a day. - if (!GetIsTimeToTrim()) - return; + // We don't need to check the audit trail for events to remove every minute. Let's stick with twice a day. + if (!GetIsTimeToTrim()) + return; - Logger.Debug("Starting audit trail trimming."); - var deletedRecords = _auditTrailManager.Trim(TimeSpan.FromDays(Settings.RetentionPeriod)); - Logger.Debug("Audit trail trimming completed. {0} records were deleted.", deletedRecords.Count()); - Settings.LastRunUtc = _clock.UtcNow; - } + Logger.Debug("Starting audit trail trimming."); + var deletedRecords = _auditTrailManager.Trim(TimeSpan.FromDays(Settings.RetentionPeriod)); + Logger.Debug("Audit trail trimming completed. {0} records were deleted.", deletedRecords.Count()); + Settings.LastRunUtc = _clock.UtcNow; } } - catch (Exception ex) { - Logger.Error(ex, "Error during sweep."); - } - finally { - Monitor.Exit(_sweepLock); - Logger.Debug("Ending sweep."); - } + else + Logger.Debug("Distributed lock could not be acquired; going back to sleep."); + } + catch (Exception ex) { + Logger.Error(ex, "Error during sweep."); + } + finally { + Logger.Debug("Ending sweep."); } } diff --git a/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs b/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs index 4912cdde9..4fd8487fe 100644 --- a/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs +++ b/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Linq; using System.Text; -using System.Threading; using Microsoft.WindowsAzure.MediaServices.Client; using Orchard.Azure.MediaServices.Helpers; using Orchard.Azure.MediaServices.Models; @@ -18,8 +17,6 @@ using Orchard.Tasks.Locking.Services; namespace Orchard.Azure.MediaServices.Services.Jobs { public class JobProcessor : Component, IBackgroundTask { - private static readonly object _sweepLock = new object(); - private readonly IWamsClient _wamsClient; private readonly IAssetManager _assetManager; private readonly IJobManager _jobManager; @@ -41,126 +38,125 @@ namespace Orchard.Azure.MediaServices.Services.Jobs { } public void Sweep() { - if (Monitor.TryEnter(_sweepLock)) { - try { - Logger.Debug("Beginning sweep."); + Logger.Debug("Beginning sweep."); - if (!_orchardServices.WorkContext.CurrentSite.As().IsValid()) { - Logger.Debug("Settings are invalid; going back to sleep."); - return; - } + try { + if (!_orchardServices.WorkContext.CurrentSite.As().IsValid()) { + Logger.Debug("Settings are invalid; going back to sleep."); + return; + } - // Only allow this task to run on one farm node at a time. - IDistributedLock @lock; - if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { - using (@lock) { - var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId); + // Only allow this task to run on one farm node at a time. + IDistributedLock @lock; + if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { + using (@lock) { + var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId); - if (!jobs.Any()) { - Logger.Debug("No open jobs were found; going back to sleep."); - return; + if (!jobs.Any()) { + Logger.Debug("No open jobs were found; going back to sleep."); + return; + } + + Logger.Information("Beginning processing of {0} open jobs.", jobs.Count()); + + var wamsJobs = _wamsClient.GetJobsById(jobs.Keys); + + foreach (var wamsJob in wamsJobs) { + Logger.Information("Processing job '{0}'...", wamsJob.Name); + + var job = jobs[wamsJob.Id]; + var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId); + var wamsTasks = wamsJob.Tasks.ToArray(); + + foreach (var wamsTask in wamsTasks) { + var task = tasks[wamsTask.Id]; + task.Status = MapWamsJobState(wamsTask.State); + task.PercentComplete = (int)wamsTask.Progress; } - Logger.Information("Beginning processing of {0} open jobs.", jobs.Count()); + var previousStatus = job.Status; + var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray(); - var wamsJobs = _wamsClient.GetJobsById(jobs.Keys); + job.CreatedUtc = wamsJob.Created; + job.StartedUtc = wamsJob.StartTime; + job.FinishedUtc = wamsJob.EndTime; + job.Status = MapWamsJobState(wamsJob.State); + job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors); - foreach (var wamsJob in wamsJobs) { - Logger.Information("Processing job '{0}'...", wamsJob.Name); + LogWamsJobErrors(wamsJobErrors); - var job = jobs[wamsJob.Id]; - var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId); - var wamsTasks = wamsJob.Tasks.ToArray(); + if (job.Status != previousStatus) { + if (job.Status == JobStatus.Finished) { + Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name); - foreach (var wamsTask in wamsTasks) { - var task = tasks[wamsTask.Id]; - task.Status = MapWamsJobState(wamsTask.State); - task.PercentComplete = (int)wamsTask.Progress; - } + var lastTask = job.Tasks.Last(); + var lastWamsTask = wamsTasks.Single(task => task.Id == lastTask.WamsTaskId); + var outputAsset = lastWamsTask.OutputAssets.First(); + var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name; + var outputAssetDescription = job.OutputAssetDescription.TrimSafe(); + var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result; + var cloudVideoPart = job.CloudVideoPart; + var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result; - var previousStatus = job.Status; - var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray(); + // HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions + // this will be implemented more robustly by testing all the dynamic URLs to see which ones work + // and only store and use the working ones. + var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1"); - job.CreatedUtc = wamsJob.Created; - job.StartedUtc = wamsJob.StartTime; - job.FinishedUtc = wamsJob.EndTime; - job.Status = MapWamsJobState(wamsJob.State); - job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors); + if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) { + _assetManager.CreateAssetFor(cloudVideoPart, asset => { + asset.IncludeInPlayer = true; + asset.Name = outputAssetName; + asset.Description = outputAssetDescription; + asset.EncodingPreset = lastTask.HarvestAssetName; + asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; + asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; + asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id; + asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url; + asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename; + asset.WamsAssetId = outputAsset.Id; + asset.WamsEncoderMetadataXml = encoderMetadataXml; + asset.UploadState.Status = AssetUploadStatus.Uploaded; + asset.PublishState.Status = AssetPublishStatus.None; + }); + } + else { + _assetManager.CreateAssetFor(cloudVideoPart, asset => { + asset.IncludeInPlayer = true; + asset.Name = outputAssetName; + asset.Description = outputAssetDescription; + asset.EncodingPreset = lastTask.HarvestAssetName; + asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; + asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; + asset.WamsAssetId = outputAsset.Id; + asset.WamsEncoderMetadataXml = encoderMetadataXml; + asset.UploadState.Status = AssetUploadStatus.Uploaded; + asset.PublishState.Status = AssetPublishStatus.None; + }); + } - LogWamsJobErrors(wamsJobErrors); - - if (job.Status != previousStatus) { - if (job.Status == JobStatus.Finished) { - Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name); - - var lastTask = job.Tasks.Last(); - var lastWamsTask = wamsTasks.Single(task => task.Id == lastTask.WamsTaskId); - var outputAsset = lastWamsTask.OutputAssets.First(); - var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name; - var outputAssetDescription = job.OutputAssetDescription.TrimSafe(); - var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result; - var cloudVideoPart = job.CloudVideoPart; - var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result; - - // HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions - // this will be implemented more robustly by testing all the dynamic URLs to see which ones work - // and only store and use the working ones. - var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1"); - - if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) { - _assetManager.CreateAssetFor(cloudVideoPart, asset => { - asset.IncludeInPlayer = true; - asset.Name = outputAssetName; - asset.Description = outputAssetDescription; - asset.EncodingPreset = lastTask.HarvestAssetName; - asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; - asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; - asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id; - asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url; - asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename; - asset.WamsAssetId = outputAsset.Id; - asset.WamsEncoderMetadataXml = encoderMetadataXml; - asset.UploadState.Status = AssetUploadStatus.Uploaded; - asset.PublishState.Status = AssetPublishStatus.None; - }); - } - else { - _assetManager.CreateAssetFor(cloudVideoPart, asset => { - asset.IncludeInPlayer = true; - asset.Name = outputAssetName; - asset.Description = outputAssetDescription; - asset.EncodingPreset = lastTask.HarvestAssetName; - asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; - asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; - asset.WamsAssetId = outputAsset.Id; - asset.WamsEncoderMetadataXml = encoderMetadataXml; - asset.UploadState.Status = AssetUploadStatus.Uploaded; - asset.PublishState.Status = AssetPublishStatus.None; - }); - } - - try { - if (cloudVideoPart.IsPublished()) - _assetManager.PublishAssetsFor(cloudVideoPart); - } - catch (Exception ex) { - Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id); - } + try { + if (cloudVideoPart.IsPublished()) + _assetManager.PublishAssetsFor(cloudVideoPart); + } + catch (Exception ex) { + Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id); } } - - Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name); } + + Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name); } } } - catch (Exception ex) { - Logger.Error(ex, "Error during sweep."); - } - finally { - Monitor.Exit(_sweepLock); - Logger.Debug("Ending sweep."); - } + else + Logger.Debug("Distributed lock could not be acquired; going back to sleep."); + } + catch (Exception ex) { + Logger.Error(ex, "Error during sweep."); + } + finally { + Logger.Debug("Ending sweep."); } } diff --git a/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs b/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs index dc9cb6438..74bec9bb6 100644 --- a/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs +++ b/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading; using Newtonsoft.Json.Linq; using Orchard.Environment; using Orchard.Events; @@ -13,12 +12,11 @@ namespace Orchard.JobsQueue.Services { public class JobsQueueProcessor : IJobsQueueProcessor { private readonly Work _jobsQueueManager; private readonly Work _eventBus; - private readonly ReaderWriterLockSlim _rwl = new ReaderWriterLockSlim(); private readonly IDistributedLockService _distributedLockService; public JobsQueueProcessor( Work jobsQueueManager, - Work eventBus, + Work eventBus, IDistributedLockService distributedLockService) { _jobsQueueManager = jobsQueueManager; @@ -28,26 +26,19 @@ namespace Orchard.JobsQueue.Services { } public ILogger Logger { get; set; } - public void ProcessQueue() { - // prevent two threads on the same machine to process the message queue - if (_rwl.TryEnterWriteLock(0)) { - try { - IDistributedLock @lock; - if(_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromMinutes(5), out @lock)){ - using (@lock) { - IEnumerable messages; - while ((messages = _jobsQueueManager.Value.GetJobs(0, 10).ToArray()).Any()) { - foreach (var message in messages) { - ProcessMessage(message); - } - } + public void ProcessQueue() { + IDistributedLock @lock; + if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromMinutes(5), out @lock)) { + using (@lock) { + IEnumerable messages; + + while ((messages = _jobsQueueManager.Value.GetJobs(0, 10).ToArray()).Any()) { + foreach (var message in messages) { + ProcessMessage(message); } } } - finally { - _rwl.ExitWriteLock(); - } } }