1.增加CancellationToken用于任务取消

2.修复在多任务执行情况下因取消或覆盖任务导致的调用bug
pull/24/head
guochen2 4 years ago
parent 0f0eaafbb9
commit a57721cf90
  1. 6
      src/DotXxlJob.Core/Model/JobExecuteContext.cs
  2. 35
      src/DotXxlJob.Core/Queue/JobTaskQueue.cs
  3. 9
      src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs
  4. 3
      src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs

@ -1,13 +1,17 @@
using System.Threading;
namespace DotXxlJob.Core.Model
{
public class JobExecuteContext
{
public JobExecuteContext(IJobLogger jobLogger,string jobParameter)
public JobExecuteContext(IJobLogger jobLogger, string jobParameter, CancellationToken cancellationToken)
{
this.JobLogger = jobLogger;
this.JobParameter = jobParameter;
this.cancellationToken = cancellationToken;
}
public string JobParameter { get; }
public IJobLogger JobLogger { get; }
public CancellationToken cancellationToken { get; }
}
}

@ -8,7 +8,7 @@ using Microsoft.Extensions.Logging;
namespace DotXxlJob.Core
{
public class JobTaskQueue:IDisposable
public class JobTaskQueue : IDisposable
{
private readonly IJobLogger _jobLogger;
private readonly ILogger<JobTaskQueue> _logger;
@ -16,7 +16,7 @@ namespace DotXxlJob.Core
private readonly ConcurrentDictionary<long, byte> ID_IN_QUEUE = new ConcurrentDictionary<long, byte>();
private CancellationTokenSource _cancellationTokenSource;
private Task _runTask;
public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger<JobTaskQueue> logger)
public JobTaskQueue(ITaskExecutor executor, IJobLogger jobLogger, ILogger<JobTaskQueue> logger)
{
this.Executor = executor;
this._jobLogger = jobLogger;
@ -33,7 +33,7 @@ namespace DotXxlJob.Core
public bool IsRunning()
{
return _cancellationTokenSource !=null;
return _cancellationTokenSource != null;
}
@ -44,11 +44,11 @@ namespace DotXxlJob.Core
/// <returns></returns>
public ReturnT Replace(TriggerParam triggerParam)
{
Stop();
while (!TASK_QUEUE.IsEmpty)
{
TASK_QUEUE.TryDequeue(out _);
}
Stop();
ID_IN_QUEUE.Clear();
return Push(triggerParam);
@ -56,9 +56,9 @@ namespace DotXxlJob.Core
public ReturnT Push(TriggerParam triggerParam)
{
if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0))
if (!ID_IN_QUEUE.TryAdd(triggerParam.LogId, 0))
{
_logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
_logger.LogWarning("repeat job task,logId={logId},jobId={jobId}", triggerParam.LogId, triggerParam.JobId);
return ReturnT.Failed("repeat job task!");
}
@ -81,17 +81,17 @@ namespace DotXxlJob.Core
public void Dispose()
{
Stop();
while (!TASK_QUEUE.IsEmpty)
{
TASK_QUEUE.TryDequeue(out _);
}
ID_IN_QUEUE.Clear();
Stop();
}
private void StartTask()
{
if (_cancellationTokenSource != null )
if (_cancellationTokenSource != null)
{
return; //running
}
@ -118,17 +118,17 @@ namespace DotXxlJob.Core
if (TASK_QUEUE.TryDequeue(out triggerParam))
{
if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _))
if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId, out _))
{
_logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}"
,triggerParam.LogId,triggerParam.JobId,ID_IN_QUEUE.ContainsKey(triggerParam.LogId));
, triggerParam.LogId, triggerParam.JobId, ID_IN_QUEUE.ContainsKey(triggerParam.LogId));
}
//set log file;
_jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId);
_jobLogger.SetLogFile(triggerParam.LogDateTime, triggerParam.LogId);
_jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}" ,triggerParam.ExecutorParams);
_jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}", triggerParam.ExecutorParams);
result = await Executor.Execute(triggerParam);
result = await Executor.Execute(triggerParam, ct);
_jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
}
@ -139,21 +139,20 @@ namespace DotXxlJob.Core
}
catch (Exception ex)
{
result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message);
result = ReturnT.Failed("Dequeue Task Failed:" + ex.Message);
_jobLogger.Log("<br>----------- JobThread Exception:" + ex.Message + "<br>----------- xxl-job job execute end(error) -----------");
}
if(triggerParam !=null)
if (triggerParam != null)
{
CallBack?.Invoke(this,new HandleCallbackParam(triggerParam, result??ReturnT.FAIL));
CallBack?.Invoke(this, new HandleCallbackParam(triggerParam, result ?? ReturnT.FAIL));
}
}
_cancellationTokenSource.Dispose();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}, _cancellationTokenSource.Token);

@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
@ -6,12 +7,12 @@ namespace DotXxlJob.Core.TaskExecutors
/// <summary>
/// 实现 IJobHandler的执行器
/// </summary>
public class BeanTaskExecutor:ITaskExecutor
public class BeanTaskExecutor : ITaskExecutor
{
private readonly IJobHandlerFactory _handlerFactory;
private readonly IJobLogger _jobLogger;
public BeanTaskExecutor(IJobHandlerFactory handlerFactory,IJobLogger jobLogger)
public BeanTaskExecutor(IJobHandlerFactory handlerFactory, IJobLogger jobLogger)
{
this._handlerFactory = handlerFactory;
this._jobLogger = jobLogger;
@ -19,7 +20,7 @@ namespace DotXxlJob.Core.TaskExecutors
public string GlueType { get; } = Constants.GlueType.BEAN;
public Task<ReturnT> Execute(TriggerParam triggerParam)
public Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken)
{
var handler = _handlerFactory.GetJobHandler(triggerParam.ExecutorHandler);
@ -27,7 +28,7 @@ namespace DotXxlJob.Core.TaskExecutors
{
return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found."));
}
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams);
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams, cancellationToken);
return handler.Execute(context);
}
}

@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
@ -7,6 +8,6 @@ namespace DotXxlJob.Core
{
string GlueType { get; }
Task<ReturnT> Execute(TriggerParam triggerParam);
Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken);
}
}
Loading…
Cancel
Save