Merge pull request #24 from guochen2/cg/fix/task_cancel

任务取消支持
pull/37/head
xuanye wong 4 years ago committed by GitHub
commit c9d382722a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      src/DotXxlJob.Core/Model/JobExecuteContext.cs
  2. 201
      src/DotXxlJob.Core/Queue/JobTaskQueue.cs
  3. 11
      src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs
  4. 3
      src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs

@ -1,13 +1,17 @@
using System.Threading;
namespace DotXxlJob.Core.Model namespace DotXxlJob.Core.Model
{ {
public class JobExecuteContext public class JobExecuteContext
{ {
public JobExecuteContext(IJobLogger jobLogger,string jobParameter) public JobExecuteContext(IJobLogger jobLogger, string jobParameter, CancellationToken cancellationToken)
{ {
this.JobLogger = jobLogger; this.JobLogger = jobLogger;
this.JobParameter = jobParameter; this.JobParameter = jobParameter;
this.cancellationToken = cancellationToken;
} }
public string JobParameter { get; } public string JobParameter { get; }
public IJobLogger JobLogger { get; } public IJobLogger JobLogger { get; }
public CancellationToken cancellationToken { get; }
} }
} }

@ -8,7 +8,7 @@ using Microsoft.Extensions.Logging;
namespace DotXxlJob.Core namespace DotXxlJob.Core
{ {
public class JobTaskQueue:IDisposable public class JobTaskQueue : IDisposable
{ {
private readonly IJobLogger _jobLogger; private readonly IJobLogger _jobLogger;
private readonly ILogger<JobTaskQueue> _logger; private readonly ILogger<JobTaskQueue> _logger;
@ -16,7 +16,7 @@ namespace DotXxlJob.Core
private readonly ConcurrentDictionary<long, byte> ID_IN_QUEUE = new ConcurrentDictionary<long, byte>(); private readonly ConcurrentDictionary<long, byte> ID_IN_QUEUE = new ConcurrentDictionary<long, byte>();
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private Task _runTask; private Task _runTask;
public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger<JobTaskQueue> logger) public JobTaskQueue(ITaskExecutor executor, IJobLogger jobLogger, ILogger<JobTaskQueue> logger)
{ {
this.Executor = executor; this.Executor = executor;
this._jobLogger = jobLogger; this._jobLogger = jobLogger;
@ -33,130 +33,129 @@ namespace DotXxlJob.Core
public bool IsRunning() public bool IsRunning()
{ {
return _cancellationTokenSource !=null; return _cancellationTokenSource != null;
} }
/// <summary> /// <summary>
/// 覆盖之前的队列 /// 覆盖之前的队列
/// </summary> /// </summary>
/// <param name="triggerParam"></param> /// <param name="triggerParam"></param>
/// <returns></returns> /// <returns></returns>
public ReturnT Replace(TriggerParam triggerParam) public ReturnT Replace(TriggerParam triggerParam)
{ {
Stop();
while (!TASK_QUEUE.IsEmpty) while (!TASK_QUEUE.IsEmpty)
{ {
TASK_QUEUE.TryDequeue(out _); TASK_QUEUE.TryDequeue(out _);
} }
Stop();
ID_IN_QUEUE.Clear(); ID_IN_QUEUE.Clear();
return Push(triggerParam); return Push(triggerParam);
} }
public ReturnT Push(TriggerParam triggerParam) public ReturnT Push(TriggerParam triggerParam)
{ {
if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0)) if (!ID_IN_QUEUE.TryAdd(triggerParam.LogId, 0))
{ {
_logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); _logger.LogWarning("repeat job task,logId={logId},jobId={jobId}", triggerParam.LogId, triggerParam.JobId);
return ReturnT.Failed("repeat job task!"); return ReturnT.Failed("repeat job task!");
} }
//this._logger.LogWarning("add job with logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); //this._logger.LogWarning("add job with logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
this.TASK_QUEUE.Enqueue(triggerParam); this.TASK_QUEUE.Enqueue(triggerParam);
StartTask(); StartTask();
return ReturnT.SUCCESS; return ReturnT.SUCCESS;
} }
public void Stop() public void Stop()
{ {
_cancellationTokenSource?.Cancel(); _cancellationTokenSource?.Cancel();
_cancellationTokenSource?.Dispose(); _cancellationTokenSource?.Dispose();
_cancellationTokenSource = null; _cancellationTokenSource = null;
//wait for task completed //wait for task completed
_runTask?.GetAwaiter().GetResult(); _runTask?.GetAwaiter().GetResult();
} }
public void Dispose() public void Dispose()
{ {
Stop(); while (!TASK_QUEUE.IsEmpty)
while (!TASK_QUEUE.IsEmpty) {
{ TASK_QUEUE.TryDequeue(out _);
TASK_QUEUE.TryDequeue(out _); }
} ID_IN_QUEUE.Clear();
ID_IN_QUEUE.Clear(); Stop();
} }
private void StartTask() private void StartTask()
{ {
if (_cancellationTokenSource != null ) if (_cancellationTokenSource != null)
{ {
return; //running return; //running
} }
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();
var ct = _cancellationTokenSource.Token; var ct = _cancellationTokenSource.Token;
_runTask = Task.Factory.StartNew(async () => _runTask = Task.Factory.StartNew(async () =>
{ {
//ct.ThrowIfCancellationRequested(); //ct.ThrowIfCancellationRequested();
while (!ct.IsCancellationRequested) while (!ct.IsCancellationRequested)
{ {
if (TASK_QUEUE.IsEmpty) if (TASK_QUEUE.IsEmpty)
{ {
//_logger.LogInformation("task queue is empty!"); //_logger.LogInformation("task queue is empty!");
break; break;
} }
ReturnT result = null; ReturnT result = null;
TriggerParam triggerParam = null; TriggerParam triggerParam = null;
try try
{ {
if (TASK_QUEUE.TryDequeue(out triggerParam)) if (TASK_QUEUE.TryDequeue(out triggerParam))
{ {
if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _)) if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId, out _))
{ {
_logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}" _logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}"
,triggerParam.LogId,triggerParam.JobId,ID_IN_QUEUE.ContainsKey(triggerParam.LogId)); , triggerParam.LogId, triggerParam.JobId, ID_IN_QUEUE.ContainsKey(triggerParam.LogId));
} }
//set log file; //set log file;
_jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId); _jobLogger.SetLogFile(triggerParam.LogDateTime, triggerParam.LogId);
_jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}" ,triggerParam.ExecutorParams); _jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}", triggerParam.ExecutorParams);
result = await Executor.Execute(triggerParam); result = await Executor.Execute(triggerParam, ct);
_jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code); _jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
} }
else else
{ {
_logger.LogWarning("Dequeue Task Failed"); _logger.LogWarning("Dequeue Task Failed");
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message); result = ReturnT.Failed("Dequeue Task Failed:" + ex.Message);
_jobLogger.Log("<br>----------- JobThread Exception:" + ex.Message + "<br>----------- xxl-job job execute end(error) -----------"); _jobLogger.Log("<br>----------- JobThread Exception:" + ex.Message + "<br>----------- xxl-job job execute end(error) -----------");
} }
if(triggerParam !=null) if (triggerParam != null)
{ {
CallBack?.Invoke(this,new HandleCallbackParam(triggerParam, result??ReturnT.FAIL)); CallBack?.Invoke(this, new HandleCallbackParam(triggerParam, result ?? ReturnT.FAIL));
} }
} }
_cancellationTokenSource.Dispose(); _cancellationTokenSource?.Dispose();
_cancellationTokenSource = null; _cancellationTokenSource = null;
}, _cancellationTokenSource.Token);
}, _cancellationTokenSource.Token);
}
}
} }
} }

@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotXxlJob.Core.Model; using DotXxlJob.Core.Model;
@ -6,12 +7,12 @@ namespace DotXxlJob.Core.TaskExecutors
/// <summary> /// <summary>
/// 实现 IJobHandler的执行器 /// 实现 IJobHandler的执行器
/// </summary> /// </summary>
public class BeanTaskExecutor:ITaskExecutor public class BeanTaskExecutor : ITaskExecutor
{ {
private readonly IJobHandlerFactory _handlerFactory; private readonly IJobHandlerFactory _handlerFactory;
private readonly IJobLogger _jobLogger; private readonly IJobLogger _jobLogger;
public BeanTaskExecutor(IJobHandlerFactory handlerFactory,IJobLogger jobLogger) public BeanTaskExecutor(IJobHandlerFactory handlerFactory, IJobLogger jobLogger)
{ {
this._handlerFactory = handlerFactory; this._handlerFactory = handlerFactory;
this._jobLogger = jobLogger; this._jobLogger = jobLogger;
@ -19,15 +20,15 @@ namespace DotXxlJob.Core.TaskExecutors
public string GlueType { get; } = Constants.GlueType.BEAN; public string GlueType { get; } = Constants.GlueType.BEAN;
public Task<ReturnT> Execute(TriggerParam triggerParam) public Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken)
{ {
var handler = _handlerFactory.GetJobHandler(triggerParam.ExecutorHandler); var handler = _handlerFactory.GetJobHandler(triggerParam.ExecutorHandler);
if (handler == null) if (handler == null)
{ {
return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found.")); return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found."));
} }
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams); var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams, cancellationToken);
return handler.Execute(context); return handler.Execute(context);
} }
} }

@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotXxlJob.Core.Model; using DotXxlJob.Core.Model;
@ -7,6 +8,6 @@ namespace DotXxlJob.Core
{ {
string GlueType { get; } string GlueType { get; }
Task<ReturnT> Execute(TriggerParam triggerParam); Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken);
} }
} }
Loading…
Cancel
Save