From 5aaddac9b927da9b373018bdda5a84e362b65a64 Mon Sep 17 00:00:00 2001 From: wangshaoming Date: Wed, 10 Apr 2019 09:35:57 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=B8=BA=E6=AF=8F=E6=AC=A1?= =?UTF-8?q?=E5=8F=AA=E4=B8=8A=E6=8A=A5=E4=B8=80=E6=AC=A1=EF=BC=8C=E8=B2=8C?= =?UTF-8?q?=E4=BC=BC=E5=A4=9A=E6=9D=A1=E7=BC=96=E8=A7=A3=E7=A0=81=E6=9C=89?= =?UTF-8?q?=E7=82=B9=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Config/XxlJobExecutorOptions.cs | 3 +++ src/DotXxlJob.Core/Internal/Constants.cs | 2 +- src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs | 22 +++++++++++-------- src/DotXxlJob.Core/Queue/JobTaskQueue.cs | 7 +++--- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs b/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs index d5be764..4d062be 100644 --- a/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs +++ b/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs @@ -52,5 +52,8 @@ namespace DotXxlJob.Core.Config /// public int LogRetentionDays { get; set; } = 30; + + public int CallBackInterval { get; set; } = 500; //回调时间间隔 500毫秒 + } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/Internal/Constants.cs b/src/DotXxlJob.Core/Internal/Constants.cs index 46ed32b..b4ef9c6 100644 --- a/src/DotXxlJob.Core/Internal/Constants.cs +++ b/src/DotXxlJob.Core/Internal/Constants.cs @@ -31,7 +31,7 @@ namespace DotXxlJob.Core public const int MaxCallbackRetryTimes = 10; //每次回调最多发送几条记录 - public const int MaxCallbackRecordsPerRequest = 20; + public const int MaxCallbackRecordsPerRequest =5; public static TimeSpan CallbackRetryInterval = TimeSpan.FromSeconds(600); //Admin集群机器请求默认超时时间 diff --git a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs index 16caa75..0900ddc 100644 --- a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs @@ -22,6 +22,8 @@ namespace DotXxlJob.Core.Queue private bool _isRunning; + private int _callbackInterval; + private Task _runTask; public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions optionsAccessor , ILoggerFactory loggerFactory) @@ -29,6 +31,8 @@ namespace DotXxlJob.Core.Queue _adminClient = adminClient; _jobLogger = jobLogger; + _callbackInterval = optionsAccessor.Value.CallBackInterval; + _retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath, Push, loggerFactory.CreateLogger()); @@ -65,7 +69,10 @@ namespace DotXxlJob.Core.Queue while (!_stop) { await DoCallBack(); - await Task.Delay(TimeSpan.FromSeconds(3)); + if (taskQueue.IsEmpty) + { + await Task.Delay(TimeSpan.FromMilliseconds(_callbackInterval)); + } } _logger.LogDebug("end to callback"); _isRunning = false; @@ -77,20 +84,17 @@ namespace DotXxlJob.Core.Queue { List list = new List(); - while (list.Count < Constants.MaxCallbackRecordsPerRequest && taskQueue.TryDequeue(out var item)) - { - list.Add(item); - } - - if (list.Count == 0) + if(!taskQueue.TryDequeue(out var item)) { return; } - ReturnT result; + list.Add(item); + + ReturnT result; try { - result = await _adminClient.Callback(list); + result = await _adminClient.Callback(list).ConfigureAwait(false); } catch (Exception ex){ _logger.LogError(ex,"trigger callback error:{error}",ex.Message); diff --git a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs index 5453918..ead1c67 100644 --- a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs @@ -10,7 +10,6 @@ namespace DotXxlJob.Core { public class JobTaskQueue:IDisposable { - private readonly ITaskExecutor _executor; private readonly IJobLogger _jobLogger; private readonly ILogger _logger; private readonly ConcurrentQueue TASK_QUEUE = new ConcurrentQueue(); @@ -19,12 +18,12 @@ namespace DotXxlJob.Core private Task _runTask; public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger logger) { - this._executor = executor; + this.Executor = executor; this._jobLogger = jobLogger; this._logger = logger; } - public ITaskExecutor Executor => this._executor; + public ITaskExecutor Executor { get; } public event EventHandler CallBack; @@ -122,7 +121,7 @@ namespace DotXxlJob.Core _jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams); - result = await _executor.Execute(triggerParam); + result = await Executor.Execute(triggerParam); _jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code); }