diff --git a/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs b/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs index 83da1ae69..1a648ed78 100644 --- a/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs +++ b/src/Orchard.Web/Modules/Orchard.AuditTrail/Services/AuditTrailTrimmingBackgroundTask.cs @@ -7,8 +7,8 @@ using Orchard.Environment.Extensions; using Orchard.Logging; using Orchard.Services; using Orchard.Settings; -using Orchard.TaskLease.Services; using Orchard.Tasks; +using Orchard.Tasks.Locking.Services; namespace Orchard.AuditTrail.Services { [OrchardFeature("Orchard.AuditTrail.Trimming")] @@ -16,19 +16,19 @@ namespace Orchard.AuditTrail.Services { private static readonly object _sweepLock = new object(); private readonly ISiteService _siteService; private readonly IClock _clock; - private readonly ITaskLeaseService _taskLeaseService; private readonly IAuditTrailManager _auditTrailManager; + private readonly IDistributedLockService _distributedLockService; public AuditTrailTrimmingBackgroundTask( ISiteService siteService, IClock clock, - ITaskLeaseService taskLeaseService, - IAuditTrailManager auditTrailManager) { + IAuditTrailManager auditTrailManager, + IDistributedLockService distributedLockService) { _siteService = siteService; _clock = clock; - _taskLeaseService = taskLeaseService; _auditTrailManager = auditTrailManager; + _distributedLockService = distributedLockService; } public AuditTrailTrimmingSettingsPart Settings { @@ -41,17 +41,20 @@ namespace Orchard.AuditTrail.Services { Logger.Debug("Beginning sweep."); // Only allow this task to run on one farm node at a time. - if (_taskLeaseService.Acquire(GetType().FullName, _clock.UtcNow.AddHours(1)) != null) { + DistributedLock @lock; + if (_distributedLockService.TryAcquireLockForMachine(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { + using (@lock) { - // We don't need to check the audit trail for events to remove every minute. Let's stick with twice a day. - if (!GetIsTimeToTrim()) - return; + // We don't need to check the audit trail for events to remove every minute. Let's stick with twice a day. + if (!GetIsTimeToTrim()) + return; - Logger.Debug("Starting audit trail trimming."); - var deletedRecords = _auditTrailManager.Trim(TimeSpan.FromDays(Settings.RetentionPeriod)); - Logger.Debug("Audit trail trimming completed. {0} records were deleted.", deletedRecords.Count()); - Settings.LastRunUtc = _clock.UtcNow; - } + Logger.Debug("Starting audit trail trimming."); + var deletedRecords = _auditTrailManager.Trim(TimeSpan.FromDays(Settings.RetentionPeriod)); + Logger.Debug("Audit trail trimming completed. {0} records were deleted.", deletedRecords.Count()); + Settings.LastRunUtc = _clock.UtcNow; + } + } } catch (Exception ex) { Logger.Error(ex, "Error during sweep."); diff --git a/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs b/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs index 4a73fd266..4a066e68a 100644 --- a/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs +++ b/src/Orchard.Web/Modules/Orchard.Azure.MediaServices/Services/Jobs/JobProcessor.cs @@ -3,19 +3,17 @@ using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; +using Microsoft.WindowsAzure.MediaServices.Client; using Orchard.Azure.MediaServices.Helpers; using Orchard.Azure.MediaServices.Models; using Orchard.Azure.MediaServices.Models.Assets; using Orchard.Azure.MediaServices.Models.Jobs; using Orchard.Azure.MediaServices.Services.Assets; using Orchard.Azure.MediaServices.Services.Wams; -using Microsoft.WindowsAzure.MediaServices.Client; -using Orchard; using Orchard.ContentManagement; using Orchard.Logging; -using Orchard.Services; -using Orchard.TaskLease.Services; using Orchard.Tasks; +using Orchard.Tasks.Locking.Services; namespace Orchard.Azure.MediaServices.Services.Jobs { public class JobProcessor : Component, IBackgroundTask { @@ -23,26 +21,23 @@ namespace Orchard.Azure.MediaServices.Services.Jobs { private static readonly object _sweepLock = new object(); private readonly IWamsClient _wamsClient; - private readonly IClock _clock; - private readonly ITaskLeaseService _taskLeaseService; private readonly IAssetManager _assetManager; private readonly IJobManager _jobManager; private readonly IOrchardServices _orchardServices; + private readonly IDistributedLockService _distributedLockService; public JobProcessor( IWamsClient wamsClient, - IClock clock, - ITaskLeaseService taskLeaseService, IAssetManager assetManager, IJobManager jobManager, - IOrchardServices orchardServices) { + IOrchardServices orchardServices, + IDistributedLockService distributedLockService) { _wamsClient = wamsClient; - _clock = clock; - _taskLeaseService = taskLeaseService; _assetManager = assetManager; _jobManager = jobManager; _orchardServices = orchardServices; + _distributedLockService = distributedLockService; } public void Sweep() { @@ -56,103 +51,106 @@ namespace Orchard.Azure.MediaServices.Services.Jobs { } // Only allow this task to run on one farm node at a time. - if (_taskLeaseService.Acquire(GetType().FullName, _clock.UtcNow.AddHours(1)) != null) { - var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId); + DistributedLock @lock; + if (_distributedLockService.TryAcquireLockForMachine(GetType().FullName, TimeSpan.FromHours(1), out @lock)) { + using (@lock) { + var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId); - if (!jobs.Any()) { - Logger.Debug("No open jobs were found; going back to sleep."); - return; - } - - Logger.Information("Beginning processing of {0} open jobs.", jobs.Count()); - - var wamsJobs = _wamsClient.GetJobsById(jobs.Keys); - - foreach (var wamsJob in wamsJobs) { - Logger.Information("Processing job '{0}'...", wamsJob.Name); - - var job = jobs[wamsJob.Id]; - var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId); - var wamsTasks = wamsJob.Tasks.ToArray(); - - foreach (var wamsTask in wamsTasks) { - var task = tasks[wamsTask.Id]; - task.Status = MapWamsJobState(wamsTask.State); - task.PercentComplete = (int)wamsTask.Progress; + if (!jobs.Any()) { + Logger.Debug("No open jobs were found; going back to sleep."); + return; } - var previousStatus = job.Status; - var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray(); + Logger.Information("Beginning processing of {0} open jobs.", jobs.Count()); - job.CreatedUtc = wamsJob.Created; - job.StartedUtc = wamsJob.StartTime; - job.FinishedUtc = wamsJob.EndTime; - job.Status = MapWamsJobState(wamsJob.State); - job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors); + var wamsJobs = _wamsClient.GetJobsById(jobs.Keys); - LogWamsJobErrors(wamsJobErrors); + foreach (var wamsJob in wamsJobs) { + Logger.Information("Processing job '{0}'...", wamsJob.Name); - if (job.Status != previousStatus) { - if (job.Status == JobStatus.Finished) { - Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name); + var job = jobs[wamsJob.Id]; + var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId); + var wamsTasks = wamsJob.Tasks.ToArray(); - var lastTask = job.Tasks.Last(); - var lastWamsTask = wamsTasks.Where(task => task.Id == lastTask.WamsTaskId).Single(); - var outputAsset = lastWamsTask.OutputAssets.First(); - var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name; - var outputAssetDescription = job.OutputAssetDescription.TrimSafe(); - var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result; - var cloudVideoPart = job.CloudVideoPart; - var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result; + foreach (var wamsTask in wamsTasks) { + var task = tasks[wamsTask.Id]; + task.Status = MapWamsJobState(wamsTask.State); + task.PercentComplete = (int)wamsTask.Progress; + } - // HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions - // this will be implemented more robustly by testing all the dynamic URLs to see which ones work - // and only store and use the working ones. - var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1"); + var previousStatus = job.Status; + var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray(); - if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) { - _assetManager.CreateAssetFor(cloudVideoPart, asset => { - asset.IncludeInPlayer = true; - asset.Name = outputAssetName; - asset.Description = outputAssetDescription; - asset.EncodingPreset = lastTask.HarvestAssetName; - asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; - asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; - asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id; - asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url; - asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename; - asset.WamsAssetId = outputAsset.Id; - asset.WamsEncoderMetadataXml = encoderMetadataXml; - asset.UploadState.Status = AssetUploadStatus.Uploaded; - asset.PublishState.Status = AssetPublishStatus.None; - }); - } - else { - _assetManager.CreateAssetFor(cloudVideoPart, asset => { - asset.IncludeInPlayer = true; - asset.Name = outputAssetName; - asset.Description = outputAssetDescription; - asset.EncodingPreset = lastTask.HarvestAssetName; - asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; - asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; - asset.WamsAssetId = outputAsset.Id; - asset.WamsEncoderMetadataXml = encoderMetadataXml; - asset.UploadState.Status = AssetUploadStatus.Uploaded; - asset.PublishState.Status = AssetPublishStatus.None; - }); - } + job.CreatedUtc = wamsJob.Created; + job.StartedUtc = wamsJob.StartTime; + job.FinishedUtc = wamsJob.EndTime; + job.Status = MapWamsJobState(wamsJob.State); + job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors); - try { - if (cloudVideoPart.IsPublished()) - _assetManager.PublishAssetsFor(cloudVideoPart); - } - catch (Exception ex) { - Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id); + LogWamsJobErrors(wamsJobErrors); + + if (job.Status != previousStatus) { + if (job.Status == JobStatus.Finished) { + Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name); + + var lastTask = job.Tasks.Last(); + var lastWamsTask = wamsTasks.Single(task => task.Id == lastTask.WamsTaskId); + var outputAsset = lastWamsTask.OutputAssets.First(); + var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name; + var outputAssetDescription = job.OutputAssetDescription.TrimSafe(); + var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result; + var cloudVideoPart = job.CloudVideoPart; + var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result; + + // HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions + // this will be implemented more robustly by testing all the dynamic URLs to see which ones work + // and only store and use the working ones. + var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1"); + + if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) { + _assetManager.CreateAssetFor(cloudVideoPart, asset => { + asset.IncludeInPlayer = true; + asset.Name = outputAssetName; + asset.Description = outputAssetDescription; + asset.EncodingPreset = lastTask.HarvestAssetName; + asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; + asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; + asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id; + asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url; + asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename; + asset.WamsAssetId = outputAsset.Id; + asset.WamsEncoderMetadataXml = encoderMetadataXml; + asset.UploadState.Status = AssetUploadStatus.Uploaded; + asset.PublishState.Status = AssetPublishStatus.None; + }); + } + else { + _assetManager.CreateAssetFor(cloudVideoPart, asset => { + asset.IncludeInPlayer = true; + asset.Name = outputAssetName; + asset.Description = outputAssetDescription; + asset.EncodingPreset = lastTask.HarvestAssetName; + asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id; + asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url; + asset.WamsAssetId = outputAsset.Id; + asset.WamsEncoderMetadataXml = encoderMetadataXml; + asset.UploadState.Status = AssetUploadStatus.Uploaded; + asset.PublishState.Status = AssetPublishStatus.None; + }); + } + + try { + if (cloudVideoPart.IsPublished()) + _assetManager.PublishAssetsFor(cloudVideoPart); + } + catch (Exception ex) { + Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id); + } } } - } - Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name); + Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name); + } } } } diff --git a/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs b/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs index dd5ddc633..039bfbc75 100644 --- a/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs +++ b/src/Orchard.Web/Modules/Orchard.JobsQueue/Services/JobsQueueProcessor.cs @@ -5,28 +5,25 @@ using System.Threading; using Newtonsoft.Json.Linq; using Orchard.Environment; using Orchard.Events; -using Orchard.Logging; using Orchard.JobsQueue.Models; -using Orchard.Services; -using Orchard.TaskLease.Services; +using Orchard.Logging; +using Orchard.Tasks.Locking.Services; namespace Orchard.JobsQueue.Services { public class JobsQueueProcessor : IJobsQueueProcessor { private readonly Work _jobsQueueManager; - private readonly Work _clock; - private readonly Work _taskLeaseService; private readonly Work _eventBus; private readonly ReaderWriterLockSlim _rwl = new ReaderWriterLockSlim(); + private readonly IDistributedLockService _distributedLockService; public JobsQueueProcessor( - Work clock, Work jobsQueueManager, - Work taskLeaseService, - Work eventBus) { - _clock = clock; + Work eventBus, + IDistributedLockService distributedLockService) { + _jobsQueueManager = jobsQueueManager; - _taskLeaseService = taskLeaseService; _eventBus = eventBus; + _distributedLockService = distributedLockService; Logger = NullLogger.Instance; } @@ -35,12 +32,15 @@ namespace Orchard.JobsQueue.Services { // prevent two threads on the same machine to process the message queue if (_rwl.TryEnterWriteLock(0)) { try { - if (_taskLeaseService.Value.Acquire("JobsQueueProcessor", _clock.Value.UtcNow.AddMinutes(5)) != null) { - IEnumerable messages; + DistributedLock @lock; + if(_distributedLockService.TryAcquireLockForMachine(GetType().FullName, TimeSpan.FromMinutes(5), out @lock)){ + using (@lock) { + IEnumerable messages; - while ((messages = _jobsQueueManager.Value.GetJobs(0, 10).ToArray()).Any()) { - foreach (var message in messages) { - ProcessMessage(message); + while ((messages = _jobsQueueManager.Value.GetJobs(0, 10).ToArray()).Any()) { + foreach (var message in messages) { + ProcessMessage(message); + } } } } diff --git a/src/Orchard.Web/Modules/Orchard.TaskLease/Services/ITaskLeaseService.cs b/src/Orchard.Web/Modules/Orchard.TaskLease/Services/ITaskLeaseService.cs index c32e3d37e..f9eaa7e35 100644 --- a/src/Orchard.Web/Modules/Orchard.TaskLease/Services/ITaskLeaseService.cs +++ b/src/Orchard.Web/Modules/Orchard.TaskLease/Services/ITaskLeaseService.cs @@ -6,7 +6,7 @@ namespace Orchard.TaskLease.Services { /// Describes a service to save and acquire task leases. A task lease can't be acquired by two different machines, /// for a specific amount of time. Optionnally a State can be saved along with the lease. /// - [Obsolete("Use Orchard.Tasks.Locking.IDistributedLockService instead.")] + [Obsolete("Use Orchard.Tasks.Locking.IDistributedLockService.AcquireForMachine instead.")] public interface ITaskLeaseService : IDependency { /// diff --git a/src/Orchard.Web/Modules/Orchard.TaskLease/Services/TaskLeaseService.cs b/src/Orchard.Web/Modules/Orchard.TaskLease/Services/TaskLeaseService.cs index 398c1092f..5d5fd6179 100644 --- a/src/Orchard.Web/Modules/Orchard.TaskLease/Services/TaskLeaseService.cs +++ b/src/Orchard.Web/Modules/Orchard.TaskLease/Services/TaskLeaseService.cs @@ -9,7 +9,7 @@ namespace Orchard.TaskLease.Services { /// /// Provides a database driven implementation of /// - [Obsolete("Use Orchard.Tasks.Locking.DistributedLockService instead.")] + [Obsolete("Use Orchard.Tasks.Locking.DistributedLockService.AcquireForMachine instead.")] public class TaskLeaseService : ITaskLeaseService { private readonly IRepository _repository; diff --git a/src/Orchard/Environment/OrchardStarter.cs b/src/Orchard/Environment/OrchardStarter.cs index 3d3dd6958..1e0db5736 100644 --- a/src/Orchard/Environment/OrchardStarter.cs +++ b/src/Orchard/Environment/OrchardStarter.cs @@ -67,7 +67,7 @@ namespace Orchard.Environment { builder.RegisterType().As().SingleInstance(); builder.RegisterType().As().SingleInstance(); builder.RegisterType().As().SingleInstance(); - builder.RegisterType().As().SingleInstance(); + builder.RegisterType().As().SingleInstance(); //builder.RegisterType().As().SingleInstance(); RegisterVolatileProvider(builder);