Removed redundant thread synchronization code from modules which now use distributed locks.

This commit is contained in:
Daniel Stolt
2015-09-06 21:41:30 +02:00
parent 86b9f35454
commit 49c772bf25
3 changed files with 132 additions and 148 deletions

View File

@@ -1,6 +1,5 @@
using System; using System;
using System.Linq; using System.Linq;
using System.Threading;
using Orchard.AuditTrail.Models; using Orchard.AuditTrail.Models;
using Orchard.ContentManagement; using Orchard.ContentManagement;
using Orchard.Environment.Extensions; using Orchard.Environment.Extensions;
@@ -13,7 +12,6 @@ using Orchard.Tasks.Locking.Services;
namespace Orchard.AuditTrail.Services { namespace Orchard.AuditTrail.Services {
[OrchardFeature("Orchard.AuditTrail.Trimming")] [OrchardFeature("Orchard.AuditTrail.Trimming")]
public class AuditTrailTrimmingBackgroundTask : Component, IBackgroundTask { public class AuditTrailTrimmingBackgroundTask : Component, IBackgroundTask {
private static readonly object _sweepLock = new object();
private readonly ISiteService _siteService; private readonly ISiteService _siteService;
private readonly IClock _clock; private readonly IClock _clock;
private readonly IAuditTrailManager _auditTrailManager; private readonly IAuditTrailManager _auditTrailManager;
@@ -36,33 +34,32 @@ namespace Orchard.AuditTrail.Services {
} }
public void Sweep() { public void Sweep() {
if (Monitor.TryEnter(_sweepLock)) { Logger.Debug("Beginning sweep.");
try {
Logger.Debug("Beginning sweep.");
// Only allow this task to run on one farm node at a time. try {
IDistributedLock @lock; // Only allow this task to run on one farm node at a time.
if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { IDistributedLock @lock;
using (@lock) { if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) {
using (@lock) {
// We don't need to check the audit trail for events to remove every minute. Let's stick with twice a day. // We don't need to check the audit trail for events to remove every minute. Let's stick with twice a day.
if (!GetIsTimeToTrim()) if (!GetIsTimeToTrim())
return; return;
Logger.Debug("Starting audit trail trimming."); Logger.Debug("Starting audit trail trimming.");
var deletedRecords = _auditTrailManager.Trim(TimeSpan.FromDays(Settings.RetentionPeriod)); var deletedRecords = _auditTrailManager.Trim(TimeSpan.FromDays(Settings.RetentionPeriod));
Logger.Debug("Audit trail trimming completed. {0} records were deleted.", deletedRecords.Count()); Logger.Debug("Audit trail trimming completed. {0} records were deleted.", deletedRecords.Count());
Settings.LastRunUtc = _clock.UtcNow; Settings.LastRunUtc = _clock.UtcNow;
}
} }
} }
catch (Exception ex) { else
Logger.Error(ex, "Error during sweep."); Logger.Debug("Distributed lock could not be acquired; going back to sleep.");
} }
finally { catch (Exception ex) {
Monitor.Exit(_sweepLock); Logger.Error(ex, "Error during sweep.");
Logger.Debug("Ending sweep."); }
} finally {
Logger.Debug("Ending sweep.");
} }
} }

View File

@@ -2,7 +2,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading;
using Microsoft.WindowsAzure.MediaServices.Client; using Microsoft.WindowsAzure.MediaServices.Client;
using Orchard.Azure.MediaServices.Helpers; using Orchard.Azure.MediaServices.Helpers;
using Orchard.Azure.MediaServices.Models; using Orchard.Azure.MediaServices.Models;
@@ -18,8 +17,6 @@ using Orchard.Tasks.Locking.Services;
namespace Orchard.Azure.MediaServices.Services.Jobs { namespace Orchard.Azure.MediaServices.Services.Jobs {
public class JobProcessor : Component, IBackgroundTask { public class JobProcessor : Component, IBackgroundTask {
private static readonly object _sweepLock = new object();
private readonly IWamsClient _wamsClient; private readonly IWamsClient _wamsClient;
private readonly IAssetManager _assetManager; private readonly IAssetManager _assetManager;
private readonly IJobManager _jobManager; private readonly IJobManager _jobManager;
@@ -41,126 +38,125 @@ namespace Orchard.Azure.MediaServices.Services.Jobs {
} }
public void Sweep() { public void Sweep() {
if (Monitor.TryEnter(_sweepLock)) { Logger.Debug("Beginning sweep.");
try {
Logger.Debug("Beginning sweep.");
if (!_orchardServices.WorkContext.CurrentSite.As<CloudMediaSettingsPart>().IsValid()) { try {
Logger.Debug("Settings are invalid; going back to sleep."); if (!_orchardServices.WorkContext.CurrentSite.As<CloudMediaSettingsPart>().IsValid()) {
return; Logger.Debug("Settings are invalid; going back to sleep.");
} return;
}
// Only allow this task to run on one farm node at a time. // Only allow this task to run on one farm node at a time.
IDistributedLock @lock; IDistributedLock @lock;
if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromHours(1), out @lock)) {
using (@lock) { using (@lock) {
var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId); var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId);
if (!jobs.Any()) { if (!jobs.Any()) {
Logger.Debug("No open jobs were found; going back to sleep."); Logger.Debug("No open jobs were found; going back to sleep.");
return; return;
}
Logger.Information("Beginning processing of {0} open jobs.", jobs.Count());
var wamsJobs = _wamsClient.GetJobsById(jobs.Keys);
foreach (var wamsJob in wamsJobs) {
Logger.Information("Processing job '{0}'...", wamsJob.Name);
var job = jobs[wamsJob.Id];
var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId);
var wamsTasks = wamsJob.Tasks.ToArray();
foreach (var wamsTask in wamsTasks) {
var task = tasks[wamsTask.Id];
task.Status = MapWamsJobState(wamsTask.State);
task.PercentComplete = (int)wamsTask.Progress;
} }
Logger.Information("Beginning processing of {0} open jobs.", jobs.Count()); var previousStatus = job.Status;
var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray();
var wamsJobs = _wamsClient.GetJobsById(jobs.Keys); job.CreatedUtc = wamsJob.Created;
job.StartedUtc = wamsJob.StartTime;
job.FinishedUtc = wamsJob.EndTime;
job.Status = MapWamsJobState(wamsJob.State);
job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors);
foreach (var wamsJob in wamsJobs) { LogWamsJobErrors(wamsJobErrors);
Logger.Information("Processing job '{0}'...", wamsJob.Name);
var job = jobs[wamsJob.Id]; if (job.Status != previousStatus) {
var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId); if (job.Status == JobStatus.Finished) {
var wamsTasks = wamsJob.Tasks.ToArray(); Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name);
foreach (var wamsTask in wamsTasks) { var lastTask = job.Tasks.Last();
var task = tasks[wamsTask.Id]; var lastWamsTask = wamsTasks.Single(task => task.Id == lastTask.WamsTaskId);
task.Status = MapWamsJobState(wamsTask.State); var outputAsset = lastWamsTask.OutputAssets.First();
task.PercentComplete = (int)wamsTask.Progress; var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name;
} var outputAssetDescription = job.OutputAssetDescription.TrimSafe();
var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result;
var cloudVideoPart = job.CloudVideoPart;
var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result;
var previousStatus = job.Status; // HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions
var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray(); // this will be implemented more robustly by testing all the dynamic URLs to see which ones work
// and only store and use the working ones.
var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1");
job.CreatedUtc = wamsJob.Created; if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) {
job.StartedUtc = wamsJob.StartTime; _assetManager.CreateAssetFor<DynamicVideoAsset>(cloudVideoPart, asset => {
job.FinishedUtc = wamsJob.EndTime; asset.IncludeInPlayer = true;
job.Status = MapWamsJobState(wamsJob.State); asset.Name = outputAssetName;
job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors); asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id;
asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url;
asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
else {
_assetManager.CreateAssetFor<VideoAsset>(cloudVideoPart, asset => {
asset.IncludeInPlayer = true;
asset.Name = outputAssetName;
asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
LogWamsJobErrors(wamsJobErrors); try {
if (cloudVideoPart.IsPublished())
if (job.Status != previousStatus) { _assetManager.PublishAssetsFor(cloudVideoPart);
if (job.Status == JobStatus.Finished) { }
Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name); catch (Exception ex) {
Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id);
var lastTask = job.Tasks.Last();
var lastWamsTask = wamsTasks.Single(task => task.Id == lastTask.WamsTaskId);
var outputAsset = lastWamsTask.OutputAssets.First();
var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name;
var outputAssetDescription = job.OutputAssetDescription.TrimSafe();
var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result;
var cloudVideoPart = job.CloudVideoPart;
var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result;
// HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions
// this will be implemented more robustly by testing all the dynamic URLs to see which ones work
// and only store and use the working ones.
var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1");
if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) {
_assetManager.CreateAssetFor<DynamicVideoAsset>(cloudVideoPart, asset => {
asset.IncludeInPlayer = true;
asset.Name = outputAssetName;
asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id;
asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url;
asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
else {
_assetManager.CreateAssetFor<VideoAsset>(cloudVideoPart, asset => {
asset.IncludeInPlayer = true;
asset.Name = outputAssetName;
asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
try {
if (cloudVideoPart.IsPublished())
_assetManager.PublishAssetsFor(cloudVideoPart);
}
catch (Exception ex) {
Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id);
}
} }
} }
Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name);
} }
Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name);
} }
} }
} }
catch (Exception ex) { else
Logger.Error(ex, "Error during sweep."); Logger.Debug("Distributed lock could not be acquired; going back to sleep.");
} }
finally { catch (Exception ex) {
Monitor.Exit(_sweepLock); Logger.Error(ex, "Error during sweep.");
Logger.Debug("Ending sweep."); }
} finally {
Logger.Debug("Ending sweep.");
} }
} }

View File

@@ -1,7 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using Orchard.Environment; using Orchard.Environment;
using Orchard.Events; using Orchard.Events;
@@ -13,12 +12,11 @@ namespace Orchard.JobsQueue.Services {
public class JobsQueueProcessor : IJobsQueueProcessor { public class JobsQueueProcessor : IJobsQueueProcessor {
private readonly Work<IJobsQueueManager> _jobsQueueManager; private readonly Work<IJobsQueueManager> _jobsQueueManager;
private readonly Work<IEventBus> _eventBus; private readonly Work<IEventBus> _eventBus;
private readonly ReaderWriterLockSlim _rwl = new ReaderWriterLockSlim();
private readonly IDistributedLockService _distributedLockService; private readonly IDistributedLockService _distributedLockService;
public JobsQueueProcessor( public JobsQueueProcessor(
Work<IJobsQueueManager> jobsQueueManager, Work<IJobsQueueManager> jobsQueueManager,
Work<IEventBus> eventBus, Work<IEventBus> eventBus,
IDistributedLockService distributedLockService) { IDistributedLockService distributedLockService) {
_jobsQueueManager = jobsQueueManager; _jobsQueueManager = jobsQueueManager;
@@ -28,26 +26,19 @@ namespace Orchard.JobsQueue.Services {
} }
public ILogger Logger { get; set; } public ILogger Logger { get; set; }
public void ProcessQueue() {
// prevent two threads on the same machine to process the message queue
if (_rwl.TryEnterWriteLock(0)) {
try {
IDistributedLock @lock;
if(_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromMinutes(5), out @lock)){
using (@lock) {
IEnumerable<QueuedJobRecord> messages;
while ((messages = _jobsQueueManager.Value.GetJobs(0, 10).ToArray()).Any()) { public void ProcessQueue() {
foreach (var message in messages) { IDistributedLock @lock;
ProcessMessage(message); if (_distributedLockService.TryAcquireLock(GetType().FullName, TimeSpan.FromMinutes(5), out @lock)) {
} using (@lock) {
} IEnumerable<QueuedJobRecord> messages;
while ((messages = _jobsQueueManager.Value.GetJobs(0, 10).ToArray()).Any()) {
foreach (var message in messages) {
ProcessMessage(message);
} }
} }
} }
finally {
_rwl.ExitWriteLock();
}
} }
} }