diff --git a/src/DotXxlJob.Core/Model/JobExecuteContext.cs b/src/DotXxlJob.Core/Model/JobExecuteContext.cs index 253ec81..ac7a1bc 100644 --- a/src/DotXxlJob.Core/Model/JobExecuteContext.cs +++ b/src/DotXxlJob.Core/Model/JobExecuteContext.cs @@ -1,13 +1,17 @@ +using System.Threading; + namespace DotXxlJob.Core.Model { public class JobExecuteContext { - public JobExecuteContext(IJobLogger jobLogger,string jobParameter) + public JobExecuteContext(IJobLogger jobLogger, string jobParameter, CancellationToken cancellationToken) { this.JobLogger = jobLogger; this.JobParameter = jobParameter; + this.cancellationToken = cancellationToken; } public string JobParameter { get; } - public IJobLogger JobLogger { get; } + public IJobLogger JobLogger { get; } + public CancellationToken cancellationToken { get; } } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs index dfa3d22..adf62a3 100644 --- a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs @@ -8,7 +8,7 @@ using Microsoft.Extensions.Logging; namespace DotXxlJob.Core { - public class JobTaskQueue:IDisposable + public class JobTaskQueue : IDisposable { private readonly IJobLogger _jobLogger; private readonly ILogger _logger; @@ -16,7 +16,7 @@ namespace DotXxlJob.Core private readonly ConcurrentDictionary ID_IN_QUEUE = new ConcurrentDictionary(); private CancellationTokenSource _cancellationTokenSource; private Task _runTask; - public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger logger) + public JobTaskQueue(ITaskExecutor executor, IJobLogger jobLogger, ILogger logger) { this.Executor = executor; this._jobLogger = jobLogger; @@ -28,135 +28,134 @@ namespace DotXxlJob.Core public event EventHandler CallBack; - - - + + + public bool IsRunning() { - return _cancellationTokenSource !=null; + return _cancellationTokenSource != null; } - - - /// - /// 覆盖之前的队列 - /// - /// - /// - public ReturnT Replace(TriggerParam triggerParam) - { - Stop(); + + + /// + /// 覆盖之前的队列 + /// + /// + /// + public ReturnT Replace(TriggerParam triggerParam) + { while (!TASK_QUEUE.IsEmpty) { TASK_QUEUE.TryDequeue(out _); } + Stop(); ID_IN_QUEUE.Clear(); return Push(triggerParam); - } - - public ReturnT Push(TriggerParam triggerParam) - { - if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0)) - { - _logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); - return ReturnT.Failed("repeat job task!"); - } - - //this._logger.LogWarning("add job with logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); - - this.TASK_QUEUE.Enqueue(triggerParam); - StartTask(); - return ReturnT.SUCCESS; - } - - public void Stop() - { - _cancellationTokenSource?.Cancel(); - _cancellationTokenSource?.Dispose(); - _cancellationTokenSource = null; - - //wait for task completed - _runTask?.GetAwaiter().GetResult(); - } - - public void Dispose() - { - Stop(); - while (!TASK_QUEUE.IsEmpty) - { - TASK_QUEUE.TryDequeue(out _); - } - ID_IN_QUEUE.Clear(); - } - - private void StartTask() - { - if (_cancellationTokenSource != null ) - { - return; //running - } - _cancellationTokenSource = new CancellationTokenSource(); - var ct = _cancellationTokenSource.Token; - - _runTask = Task.Factory.StartNew(async () => - { - + } + + public ReturnT Push(TriggerParam triggerParam) + { + if (!ID_IN_QUEUE.TryAdd(triggerParam.LogId, 0)) + { + _logger.LogWarning("repeat job task,logId={logId},jobId={jobId}", triggerParam.LogId, triggerParam.JobId); + return ReturnT.Failed("repeat job task!"); + } + + //this._logger.LogWarning("add job with logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); + + this.TASK_QUEUE.Enqueue(triggerParam); + StartTask(); + return ReturnT.SUCCESS; + } + + public void Stop() + { + _cancellationTokenSource?.Cancel(); + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + + //wait for task completed + _runTask?.GetAwaiter().GetResult(); + } + + public void Dispose() + { + while (!TASK_QUEUE.IsEmpty) + { + TASK_QUEUE.TryDequeue(out _); + } + ID_IN_QUEUE.Clear(); + Stop(); + } + + private void StartTask() + { + if (_cancellationTokenSource != null) + { + return; //running + } + _cancellationTokenSource = new CancellationTokenSource(); + var ct = _cancellationTokenSource.Token; + + _runTask = Task.Factory.StartNew(async () => + { + //ct.ThrowIfCancellationRequested(); - + while (!ct.IsCancellationRequested) - { - if (TASK_QUEUE.IsEmpty) - { + { + if (TASK_QUEUE.IsEmpty) + { //_logger.LogInformation("task queue is empty!"); break; - } - - ReturnT result = null; - TriggerParam triggerParam = null; - try - { - - if (TASK_QUEUE.TryDequeue(out triggerParam)) - { - if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _)) - { - _logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}" - ,triggerParam.LogId,triggerParam.JobId,ID_IN_QUEUE.ContainsKey(triggerParam.LogId)); - } + } + + ReturnT result = null; + TriggerParam triggerParam = null; + try + { + + if (TASK_QUEUE.TryDequeue(out triggerParam)) + { + if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId, out _)) + { + _logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}" + , triggerParam.LogId, triggerParam.JobId, ID_IN_QUEUE.ContainsKey(triggerParam.LogId)); + } //set log file; - _jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId); - - _jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams); - - result = await Executor.Execute(triggerParam); - - _jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code); - } - else - { - _logger.LogWarning("Dequeue Task Failed"); - } - } - catch (Exception ex) - { - result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message); - _jobLogger.Log("
----------- JobThread Exception:" + ex.Message + "
----------- xxl-job job execute end(error) -----------"); - } - - if(triggerParam !=null) - { - CallBack?.Invoke(this,new HandleCallbackParam(triggerParam, result??ReturnT.FAIL)); - } - - } - - - _cancellationTokenSource.Dispose(); - _cancellationTokenSource = null; - - }, _cancellationTokenSource.Token); - - - } + _jobLogger.SetLogFile(triggerParam.LogDateTime, triggerParam.LogId); + + _jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}", triggerParam.ExecutorParams); + + result = await Executor.Execute(triggerParam, ct); + + _jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code); + } + else + { + _logger.LogWarning("Dequeue Task Failed"); + } + } + catch (Exception ex) + { + result = ReturnT.Failed("Dequeue Task Failed:" + ex.Message); + _jobLogger.Log("
----------- JobThread Exception:" + ex.Message + "
----------- xxl-job job execute end(error) -----------"); + } + + if (triggerParam != null) + { + CallBack?.Invoke(this, new HandleCallbackParam(triggerParam, result ?? ReturnT.FAIL)); + } + + } + + + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + }, _cancellationTokenSource.Token); + + + } } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs b/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs index 3c85c06..2b4a47c 100644 --- a/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs +++ b/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs @@ -1,3 +1,4 @@ +using System.Threading; using System.Threading.Tasks; using DotXxlJob.Core.Model; @@ -6,28 +7,28 @@ namespace DotXxlJob.Core.TaskExecutors /// /// 实现 IJobHandler的执行器 /// - public class BeanTaskExecutor:ITaskExecutor + public class BeanTaskExecutor : ITaskExecutor { private readonly IJobHandlerFactory _handlerFactory; private readonly IJobLogger _jobLogger; - public BeanTaskExecutor(IJobHandlerFactory handlerFactory,IJobLogger jobLogger) + public BeanTaskExecutor(IJobHandlerFactory handlerFactory, IJobLogger jobLogger) { this._handlerFactory = handlerFactory; this._jobLogger = jobLogger; } - + public string GlueType { get; } = Constants.GlueType.BEAN; - - public Task Execute(TriggerParam triggerParam) + + public Task Execute(TriggerParam triggerParam, CancellationToken cancellationToken) { var handler = _handlerFactory.GetJobHandler(triggerParam.ExecutorHandler); if (handler == null) { - return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found.")); + return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found.")); } - var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams); + var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams, cancellationToken); return handler.Execute(context); } } diff --git a/src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs b/src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs index 679f513..c9946f7 100644 --- a/src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs +++ b/src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs @@ -1,3 +1,4 @@ +using System.Threading; using System.Threading.Tasks; using DotXxlJob.Core.Model; @@ -7,6 +8,6 @@ namespace DotXxlJob.Core { string GlueType { get; } - Task Execute(TriggerParam triggerParam); + Task Execute(TriggerParam triggerParam, CancellationToken cancellationToken); } } \ No newline at end of file