diff --git a/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs b/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs index d5be764..4d062be 100644 --- a/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs +++ b/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs @@ -52,5 +52,8 @@ namespace DotXxlJob.Core.Config /// public int LogRetentionDays { get; set; } = 30; + + public int CallBackInterval { get; set; } = 500; //回调时间间隔 500毫秒 + } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/Internal/Constants.cs b/src/DotXxlJob.Core/Internal/Constants.cs index 46ed32b..b4ef9c6 100644 --- a/src/DotXxlJob.Core/Internal/Constants.cs +++ b/src/DotXxlJob.Core/Internal/Constants.cs @@ -31,7 +31,7 @@ namespace DotXxlJob.Core public const int MaxCallbackRetryTimes = 10; //每次回调最多发送几条记录 - public const int MaxCallbackRecordsPerRequest = 20; + public const int MaxCallbackRecordsPerRequest =5; public static TimeSpan CallbackRetryInterval = TimeSpan.FromSeconds(600); //Admin集群机器请求默认超时时间 diff --git a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs index 16caa75..0900ddc 100644 --- a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs @@ -22,6 +22,8 @@ namespace DotXxlJob.Core.Queue private bool _isRunning; + private int _callbackInterval; + private Task _runTask; public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions optionsAccessor , ILoggerFactory loggerFactory) @@ -29,6 +31,8 @@ namespace DotXxlJob.Core.Queue _adminClient = adminClient; _jobLogger = jobLogger; + _callbackInterval = optionsAccessor.Value.CallBackInterval; + _retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath, Push, loggerFactory.CreateLogger()); @@ -65,7 +69,10 @@ namespace DotXxlJob.Core.Queue while (!_stop) { await DoCallBack(); - await Task.Delay(TimeSpan.FromSeconds(3)); + if (taskQueue.IsEmpty) + { + await Task.Delay(TimeSpan.FromMilliseconds(_callbackInterval)); + } } _logger.LogDebug("end to callback"); _isRunning = false; @@ -77,20 +84,17 @@ namespace DotXxlJob.Core.Queue { List list = new List(); - while (list.Count < Constants.MaxCallbackRecordsPerRequest && taskQueue.TryDequeue(out var item)) - { - list.Add(item); - } - - if (list.Count == 0) + if(!taskQueue.TryDequeue(out var item)) { return; } - ReturnT result; + list.Add(item); + + ReturnT result; try { - result = await _adminClient.Callback(list); + result = await _adminClient.Callback(list).ConfigureAwait(false); } catch (Exception ex){ _logger.LogError(ex,"trigger callback error:{error}",ex.Message); diff --git a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs index 5453918..ead1c67 100644 --- a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs @@ -10,7 +10,6 @@ namespace DotXxlJob.Core { public class JobTaskQueue:IDisposable { - private readonly ITaskExecutor _executor; private readonly IJobLogger _jobLogger; private readonly ILogger _logger; private readonly ConcurrentQueue TASK_QUEUE = new ConcurrentQueue(); @@ -19,12 +18,12 @@ namespace DotXxlJob.Core private Task _runTask; public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger logger) { - this._executor = executor; + this.Executor = executor; this._jobLogger = jobLogger; this._logger = logger; } - public ITaskExecutor Executor => this._executor; + public ITaskExecutor Executor { get; } public event EventHandler CallBack; @@ -122,7 +121,7 @@ namespace DotXxlJob.Core _jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams); - result = await _executor.Execute(triggerParam); + result = await Executor.Execute(triggerParam); _jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code); }