Updated usages of ITaskLeaseService with IDistributedLockService.

This commit is contained in:
Sipke Schoorstra
2015-08-21 19:35:42 +01:00
parent 6c712a9840
commit 81bf5d94fd
6 changed files with 128 additions and 127 deletions

View File

@@ -3,19 +3,17 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Microsoft.WindowsAzure.MediaServices.Client;
using Orchard.Azure.MediaServices.Helpers;
using Orchard.Azure.MediaServices.Models;
using Orchard.Azure.MediaServices.Models.Assets;
using Orchard.Azure.MediaServices.Models.Jobs;
using Orchard.Azure.MediaServices.Services.Assets;
using Orchard.Azure.MediaServices.Services.Wams;
using Microsoft.WindowsAzure.MediaServices.Client;
using Orchard;
using Orchard.ContentManagement;
using Orchard.Logging;
using Orchard.Services;
using Orchard.TaskLease.Services;
using Orchard.Tasks;
using Orchard.Tasks.Locking.Services;
namespace Orchard.Azure.MediaServices.Services.Jobs {
public class JobProcessor : Component, IBackgroundTask {
@@ -23,26 +21,23 @@ namespace Orchard.Azure.MediaServices.Services.Jobs {
private static readonly object _sweepLock = new object();
private readonly IWamsClient _wamsClient;
private readonly IClock _clock;
private readonly ITaskLeaseService _taskLeaseService;
private readonly IAssetManager _assetManager;
private readonly IJobManager _jobManager;
private readonly IOrchardServices _orchardServices;
private readonly IDistributedLockService _distributedLockService;
public JobProcessor(
IWamsClient wamsClient,
IClock clock,
ITaskLeaseService taskLeaseService,
IAssetManager assetManager,
IJobManager jobManager,
IOrchardServices orchardServices) {
IOrchardServices orchardServices,
IDistributedLockService distributedLockService) {
_wamsClient = wamsClient;
_clock = clock;
_taskLeaseService = taskLeaseService;
_assetManager = assetManager;
_jobManager = jobManager;
_orchardServices = orchardServices;
_distributedLockService = distributedLockService;
}
public void Sweep() {
@@ -56,103 +51,106 @@ namespace Orchard.Azure.MediaServices.Services.Jobs {
}
// Only allow this task to run on one farm node at a time.
if (_taskLeaseService.Acquire(GetType().FullName, _clock.UtcNow.AddHours(1)) != null) {
var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId);
DistributedLock @lock;
if (_distributedLockService.TryAcquireLockForMachine(GetType().FullName, TimeSpan.FromHours(1), out @lock)) {
using (@lock) {
var jobs = _jobManager.GetActiveJobs().ToDictionary(job => job.WamsJobId);
if (!jobs.Any()) {
Logger.Debug("No open jobs were found; going back to sleep.");
return;
}
Logger.Information("Beginning processing of {0} open jobs.", jobs.Count());
var wamsJobs = _wamsClient.GetJobsById(jobs.Keys);
foreach (var wamsJob in wamsJobs) {
Logger.Information("Processing job '{0}'...", wamsJob.Name);
var job = jobs[wamsJob.Id];
var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId);
var wamsTasks = wamsJob.Tasks.ToArray();
foreach (var wamsTask in wamsTasks) {
var task = tasks[wamsTask.Id];
task.Status = MapWamsJobState(wamsTask.State);
task.PercentComplete = (int)wamsTask.Progress;
if (!jobs.Any()) {
Logger.Debug("No open jobs were found; going back to sleep.");
return;
}
var previousStatus = job.Status;
var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray();
Logger.Information("Beginning processing of {0} open jobs.", jobs.Count());
job.CreatedUtc = wamsJob.Created;
job.StartedUtc = wamsJob.StartTime;
job.FinishedUtc = wamsJob.EndTime;
job.Status = MapWamsJobState(wamsJob.State);
job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors);
var wamsJobs = _wamsClient.GetJobsById(jobs.Keys);
LogWamsJobErrors(wamsJobErrors);
foreach (var wamsJob in wamsJobs) {
Logger.Information("Processing job '{0}'...", wamsJob.Name);
if (job.Status != previousStatus) {
if (job.Status == JobStatus.Finished) {
Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name);
var job = jobs[wamsJob.Id];
var tasks = job.Tasks.ToDictionary(task => task.WamsTaskId);
var wamsTasks = wamsJob.Tasks.ToArray();
var lastTask = job.Tasks.Last();
var lastWamsTask = wamsTasks.Where(task => task.Id == lastTask.WamsTaskId).Single();
var outputAsset = lastWamsTask.OutputAssets.First();
var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name;
var outputAssetDescription = job.OutputAssetDescription.TrimSafe();
var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result;
var cloudVideoPart = job.CloudVideoPart;
var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result;
foreach (var wamsTask in wamsTasks) {
var task = tasks[wamsTask.Id];
task.Status = MapWamsJobState(wamsTask.State);
task.PercentComplete = (int)wamsTask.Progress;
}
// HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions
// this will be implemented more robustly by testing all the dynamic URLs to see which ones work
// and only store and use the working ones.
var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1");
var previousStatus = job.Status;
var wamsJobErrors = HarvestWamsJobErrors(wamsJob).ToArray();
if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) {
_assetManager.CreateAssetFor<DynamicVideoAsset>(cloudVideoPart, asset => {
asset.IncludeInPlayer = true;
asset.Name = outputAssetName;
asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id;
asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url;
asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
else {
_assetManager.CreateAssetFor<VideoAsset>(cloudVideoPart, asset => {
asset.IncludeInPlayer = true;
asset.Name = outputAssetName;
asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
job.CreatedUtc = wamsJob.Created;
job.StartedUtc = wamsJob.StartTime;
job.FinishedUtc = wamsJob.EndTime;
job.Status = MapWamsJobState(wamsJob.State);
job.ErrorMessage = GetAggregateErrorMessage(wamsJobErrors);
try {
if (cloudVideoPart.IsPublished())
_assetManager.PublishAssetsFor(cloudVideoPart);
}
catch (Exception ex) {
Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id);
LogWamsJobErrors(wamsJobErrors);
if (job.Status != previousStatus) {
if (job.Status == JobStatus.Finished) {
Logger.Information("Job '{0}' was finished in WAMS; creating locators.", wamsJob.Name);
var lastTask = job.Tasks.Last();
var lastWamsTask = wamsTasks.Single(task => task.Id == lastTask.WamsTaskId);
var outputAsset = lastWamsTask.OutputAssets.First();
var outputAssetName = !String.IsNullOrWhiteSpace(job.OutputAssetName) ? job.OutputAssetName : lastWamsTask.Name;
var outputAssetDescription = job.OutputAssetDescription.TrimSafe();
var encoderMetadataXml = _wamsClient.GetEncoderMetadataXml(outputAsset).Result;
var cloudVideoPart = job.CloudVideoPart;
var wamsLocators = _wamsClient.CreateLocatorsAsync(outputAsset, WamsLocatorCategory.Private).Result;
// HACK: Temporary workaround to disable dynamic packaging for VC1-based assets. In future versions
// this will be implemented more robustly by testing all the dynamic URLs to see which ones work
// and only store and use the working ones.
var forceNonDynamicAsset = lastWamsTask.Configuration.StartsWith("VC1");
if (wamsLocators.OnDemandLocator != null && !forceNonDynamicAsset) {
_assetManager.CreateAssetFor<DynamicVideoAsset>(cloudVideoPart, asset => {
asset.IncludeInPlayer = true;
asset.Name = outputAssetName;
asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsPrivateOnDemandLocatorId = wamsLocators.OnDemandLocator.Id;
asset.WamsPrivateOnDemandLocatorUrl = wamsLocators.OnDemandLocator.Url;
asset.WamsManifestFilename = wamsLocators.OnDemandManifestFilename;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
else {
_assetManager.CreateAssetFor<VideoAsset>(cloudVideoPart, asset => {
asset.IncludeInPlayer = true;
asset.Name = outputAssetName;
asset.Description = outputAssetDescription;
asset.EncodingPreset = lastTask.HarvestAssetName;
asset.WamsPrivateLocatorId = wamsLocators.SasLocator.Id;
asset.WamsPrivateLocatorUrl = wamsLocators.SasLocator.Url;
asset.WamsAssetId = outputAsset.Id;
asset.WamsEncoderMetadataXml = encoderMetadataXml;
asset.UploadState.Status = AssetUploadStatus.Uploaded;
asset.PublishState.Status = AssetPublishStatus.None;
});
}
try {
if (cloudVideoPart.IsPublished())
_assetManager.PublishAssetsFor(cloudVideoPart);
}
catch (Exception ex) {
Logger.Warning(ex, "Processing of job '{0}' was completed but an error occurred while publishing the cloud video item with ID {1} after processing.", wamsJob.Name, cloudVideoPart.Id);
}
}
}
}
Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name);
Logger.Information("Processing of job '{0}' was successfully completed.", wamsJob.Name);
}
}
}
}