Fixing that JobsQueue will fail if there are a large number of tasks, fixes #5365

JobsQueue now works in batches to prevent transaction timeouts and the manual job execution feature uses batches as well. This is a fix for the immediate issue, further fixes are needed around the background task infrastructure, see the linked issue.
This commit is contained in:
Lombiq
2016-12-28 20:16:47 +01:00
committed by Zoltán Lehóczky
parent 8425fb4829
commit 48e2a6a1b4
5 changed files with 47 additions and 21 deletions

View File

@@ -1,11 +1,11 @@
using System.Linq;
using System.Web.Mvc;
using Orchard.ContentManagement;
using Orchard.ContentManagement;
using Orchard.DisplayManagement;
using Orchard.Environment.Extensions;
using Orchard.Localization;
using Orchard.JobsQueue.Models;
using System.Linq;
using System.Web.Mvc;
using Orchard.JobsQueue.Services;
using Orchard.Localization;
using Orchard.Mvc;
using Orchard.UI.Admin;
using Orchard.UI.Navigation;
@@ -20,9 +20,9 @@ namespace Orchard.JobsQueue.Controllers {
private readonly IJobsQueueProcessor _jobsQueueProcessor;
public AdminController(
IJobsQueueManager jobsQueueManager,
IJobsQueueManager jobsQueueManager,
IShapeFactory shapeFactory,
IOrchardServices services,
IOrchardServices services,
IJobsQueueProcessor jobsQueueProcessor) {
_jobsQueueManager = jobsQueueManager;
_services = services;
@@ -43,7 +43,7 @@ namespace Orchard.JobsQueue.Controllers {
return View(model);
}
public ActionResult List(PagerParameters pagerParameters) {
public ActionResult List(PagerParameters pagerParameters, bool processQueue = false) {
var pager = new Pager(_services.WorkContext.CurrentSite, pagerParameters);
var jobsCount = _jobsQueueManager.GetJobsCount();
@@ -52,6 +52,7 @@ namespace Orchard.JobsQueue.Controllers {
.Pager(_services.New.Pager(pager).TotalItemCount(jobsCount))
.JobsQueueStatus(_services.WorkContext.CurrentSite.As<JobsQueueSettingsPart>().Status)
.Jobs(jobs)
.ProcessQueue(processQueue)
;
return View(model);
@@ -82,10 +83,17 @@ namespace Orchard.JobsQueue.Controllers {
[HttpPost, ActionName("List")]
[FormValueRequired("submit.Process")]
public ActionResult Process() {
_jobsQueueProcessor.ProcessQueue();
_services.Notifier.Information(T("Processing has started."));
return RedirectToAction("List");
}
var processQueue = false;
if (_jobsQueueManager.GetJobsCount() > 0) {
_services.Notifier.Information(T("Processing is in progress."));
processQueue = true;
_jobsQueueProcessor.ProcessQueue(10, 1);
}
else {
_services.Notifier.Information(T("Processing has been completed."));
}
return RedirectToAction("List", new { processQueue });
}
}
}

View File

@@ -1,5 +1,5 @@
namespace Orchard.JobsQueue.Services {
public interface IJobsQueueProcessor : ISingletonDependency {
void ProcessQueue();
void ProcessQueue(int batchSize, uint batchCount);
}
}

View File

@@ -8,7 +8,7 @@ namespace Orchard.JobsQueue.Services {
}
public void Sweep() {
_jobsQueueProcessor.ProcessQueue();
_jobsQueueProcessor.ProcessQueue(10, uint.MaxValue);
}
}
}

View File

@@ -7,59 +7,66 @@ using Orchard.Events;
using Orchard.JobsQueue.Models;
using Orchard.Logging;
using Orchard.Tasks.Locking.Services;
using Orchard.Data;
namespace Orchard.JobsQueue.Services {
public class JobsQueueProcessor : IJobsQueueProcessor {
private readonly Work<IJobsQueueManager> _jobsQueueManager;
private readonly Work<IEventBus> _eventBus;
private readonly Work<IDistributedLockService> _distributedLockService;
private readonly Work<ITransactionManager> _transactionManager;
public JobsQueueProcessor(
Work<IJobsQueueManager> jobsQueueManager,
Work<IEventBus> eventBus,
Work<IDistributedLockService> distributedLockService) {
Work<IDistributedLockService> distributedLockService,
Work<ITransactionManager> transactionManager) {
_jobsQueueManager = jobsQueueManager;
_eventBus = eventBus;
_distributedLockService = distributedLockService;
_transactionManager = transactionManager;
Logger = NullLogger.Instance;
}
public ILogger Logger { get; set; }
public void ProcessQueue() {
public void ProcessQueue(int batchSize, uint batchCount) {
IDistributedLock @lock;
if (_distributedLockService.Value.TryAcquireLock(GetType().FullName, TimeSpan.FromMinutes(5), out @lock)) {
using (@lock) {
IEnumerable<QueuedJobRecord> messages;
while ((messages = _jobsQueueManager.Value.GetJobs(0, 10).ToArray()).Any()) {
var currentBatch = 0;
while (batchCount > currentBatch && (messages = _jobsQueueManager.Value.GetJobs(0, batchSize).ToArray()).Any()) {
foreach (var message in messages) {
ProcessMessage(message);
}
currentBatch++;
}
}
}
}
private void ProcessMessage(QueuedJobRecord job) {
Logger.Debug("Processing job {0}.", job.Id);
_transactionManager.Value.RequireNew();
try {
var payload = JObject.Parse(job.Parameters);
var parameters = payload.ToDictionary();
_eventBus.Value.Notify(job.Message, parameters);
_jobsQueueManager.Value.Delete(job);
Logger.Debug("Processed job Id {0}.", job.Id);
}
catch (Exception e) {
_transactionManager.Value.Cancel();
Logger.Error(e, "An unexpected error while processing job {0}. Error message: {1}.", job.Id, e);
}
finally {
_jobsQueueManager.Value.Delete(job);
}
}
}

View File

@@ -7,6 +7,7 @@
Layout.Title = T("Jobs Queue");
Style.Include("admin-jobsqueue.css");
Script.Require("jQuery");
}
@using (Html.BeginFormAntiForgeryPost()) {
<div class="manage">
@@ -62,4 +63,14 @@ else {
</table>
}
@Display(Model.Pager)
}
@if (Model.ProcessQueue) {
using (Script.Foot()) {
<script type="text/javascript">
$(function () {
$("[name='submit.Process']").trigger("click");
});
</script>
}
}