调整为每次只上报一次,貌似多条编解码有点问题

pull/1/head
wangshaoming 7 years ago
parent 6ba9658aa3
commit 5aaddac9b9
No known key found for this signature in database
GPG Key ID: 29F5223B4DB362B5
  1. 3
      src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs
  2. 2
      src/DotXxlJob.Core/Internal/Constants.cs
  3. 22
      src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs
  4. 7
      src/DotXxlJob.Core/Queue/JobTaskQueue.cs

@ -52,5 +52,8 @@ namespace DotXxlJob.Core.Config
/// </summary>
public int LogRetentionDays { get; set; } = 30;
public int CallBackInterval { get; set; } = 500; //回调时间间隔 500毫秒
}
}

@ -31,7 +31,7 @@ namespace DotXxlJob.Core
public const int MaxCallbackRetryTimes = 10;
//每次回调最多发送几条记录
public const int MaxCallbackRecordsPerRequest = 20;
public const int MaxCallbackRecordsPerRequest =5;
public static TimeSpan CallbackRetryInterval = TimeSpan.FromSeconds(600);
//Admin集群机器请求默认超时时间

@ -22,6 +22,8 @@ namespace DotXxlJob.Core.Queue
private bool _isRunning;
private int _callbackInterval;
private Task _runTask;
public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions<XxlJobExecutorOptions> optionsAccessor
, ILoggerFactory loggerFactory)
@ -29,6 +31,8 @@ namespace DotXxlJob.Core.Queue
_adminClient = adminClient;
_jobLogger = jobLogger;
_callbackInterval = optionsAccessor.Value.CallBackInterval;
_retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath,
Push,
loggerFactory.CreateLogger<RetryCallbackTaskQueue>());
@ -65,7 +69,10 @@ namespace DotXxlJob.Core.Queue
while (!_stop)
{
await DoCallBack();
await Task.Delay(TimeSpan.FromSeconds(3));
if (taskQueue.IsEmpty)
{
await Task.Delay(TimeSpan.FromMilliseconds(_callbackInterval));
}
}
_logger.LogDebug("end to callback");
_isRunning = false;
@ -77,20 +84,17 @@ namespace DotXxlJob.Core.Queue
{
List<HandleCallbackParam> list = new List<HandleCallbackParam>();
while (list.Count < Constants.MaxCallbackRecordsPerRequest && taskQueue.TryDequeue(out var item))
{
list.Add(item);
}
if (list.Count == 0)
if(!taskQueue.TryDequeue(out var item))
{
return;
}
ReturnT result;
list.Add(item);
ReturnT result;
try
{
result = await _adminClient.Callback(list);
result = await _adminClient.Callback(list).ConfigureAwait(false);
}
catch (Exception ex){
_logger.LogError(ex,"trigger callback error:{error}",ex.Message);

@ -10,7 +10,6 @@ namespace DotXxlJob.Core
{
public class JobTaskQueue:IDisposable
{
private readonly ITaskExecutor _executor;
private readonly IJobLogger _jobLogger;
private readonly ILogger<JobTaskQueue> _logger;
private readonly ConcurrentQueue<TriggerParam> TASK_QUEUE = new ConcurrentQueue<TriggerParam>();
@ -19,12 +18,12 @@ namespace DotXxlJob.Core
private Task _runTask;
public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger<JobTaskQueue> logger)
{
this._executor = executor;
this.Executor = executor;
this._jobLogger = jobLogger;
this._logger = logger;
}
public ITaskExecutor Executor => this._executor;
public ITaskExecutor Executor { get; }
public event EventHandler<HandleCallbackParam> CallBack;
@ -122,7 +121,7 @@ namespace DotXxlJob.Core
_jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}" ,triggerParam.ExecutorParams);
result = await _executor.Execute(triggerParam);
result = await Executor.Execute(triggerParam);
_jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
}

Loading…
Cancel
Save