增加回调队列的延迟,避免长期占用CPU

pull/1/head
wangshaoming 7 years ago
parent 77e894d6eb
commit 6ba9658aa3
No known key found for this signature in database
GPG Key ID: 29F5223B4DB362B5
  1. 50
      src/DotXxlJob.Core/ExecutorRegistry.cs
  2. 43
      src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs
  3. 39
      src/DotXxlJob.Core/Queue/JobTaskQueue.cs
  4. 21
      src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs

@ -11,67 +11,65 @@ namespace DotXxlJob.Core
/// <summary>
/// 执行器注册注册
/// </summary>
public class ExecutorRegistry:IExecutorRegistry
public class ExecutorRegistry : IExecutorRegistry
{
private readonly AdminClient _adminClient;
private readonly XxlJobExecutorOptions _options;
private readonly ILogger<ExecutorRegistry> _logger;
public ExecutorRegistry(AdminClient adminClient,IOptions<XxlJobExecutorOptions> optionsAccessor,ILogger<ExecutorRegistry> logger)
public ExecutorRegistry(AdminClient adminClient, IOptions<XxlJobExecutorOptions> optionsAccessor, ILogger<ExecutorRegistry> logger)
{
Preconditions.CheckNotNull(optionsAccessor, "XxlJobExecutorOptions");
Preconditions.CheckNotNull(optionsAccessor.Value, "XxlJobExecutorOptions");
this._adminClient = adminClient;
this._options = optionsAccessor.Value;
if (string.IsNullOrEmpty(this._options.SpecialBindAddress))
_adminClient = adminClient;
_options = optionsAccessor.Value;
if (string.IsNullOrEmpty(_options.SpecialBindAddress))
{
this._options.SpecialBindAddress = IPUtility.GetLocalIntranetIP().MapToIPv4().ToString();
_options.SpecialBindAddress = IPUtility.GetLocalIntranetIP().MapToIPv4().ToString();
}
this._logger = logger;
_logger = logger;
}
public async Task RegistryAsync(CancellationToken cancellationToken)
{
var registryParam = new RegistryParam {
RegistryGroup = "EXECUTOR",
RegistryKey = this._options.AppName,
RegistryValue = $"{this._options.SpecialBindAddress}:{this._options.Port}"
RegistryKey = _options.AppName,
RegistryValue = $"{_options.SpecialBindAddress}:{_options.Port}"
};
this._logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<");
_logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<");
var errorTimes = 0;
while (!cancellationToken.IsCancellationRequested)
{
try
{
var ret = await this._adminClient.Registry(registryParam);
this._logger.LogDebug("registry last result:{0}", ret?.Code);
var ret = await _adminClient.Registry(registryParam);
_logger.LogDebug("registry last result:{0}", ret?.Code);
errorTimes = 0;
await Task.Delay(Constants.RegistryInterval, cancellationToken);
}
catch (TaskCanceledException)
{
this._logger.LogInformation(">>>>> Application Stopping....<<<<<");
_logger.LogInformation(">>>>> Application Stopping....<<<<<");
}
catch (Exception ex)
{
errorTimes++;
await Task.Delay(Constants.RegistryInterval, cancellationToken);
this._logger.LogError(ex,"registry error:{0},{1} Times",ex.Message,errorTimes);
_logger.LogError(ex, "registry error:{0},{1} Times", ex.Message, errorTimes);
}
}
this._logger.LogInformation(">>>>>>>> end registry to admin <<<<<<<<");
this._logger.LogInformation(">>>>>>>> start remove registry to admin <<<<<<<<");
}
_logger.LogInformation(">>>>>>>> end registry to admin <<<<<<<<");
_logger.LogInformation(">>>>>>>> start remove registry to admin <<<<<<<<");
var removeRet = await this._adminClient.RegistryRemove(registryParam);
this._logger.LogInformation("remove registry last result:{0}",removeRet?.Code);
this._logger.LogInformation(">>>>>>>> end remove registry to admin <<<<<<<<");
_logger.LogInformation("remove registry last result:{0}", removeRet?.Code);
_logger.LogInformation(">>>>>>>> end remove registry to admin <<<<<<<<");
}
}
}

@ -26,48 +26,49 @@ namespace DotXxlJob.Core.Queue
public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions<XxlJobExecutorOptions> optionsAccessor
, ILoggerFactory loggerFactory)
{
this._adminClient = adminClient;
this._jobLogger = jobLogger;
_adminClient = adminClient;
_jobLogger = jobLogger;
this._retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath,
_retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath,
Push,
loggerFactory.CreateLogger<RetryCallbackTaskQueue>());
this._logger = loggerFactory.CreateLogger<CallbackTaskQueue>();
_logger = loggerFactory.CreateLogger<CallbackTaskQueue>();
}
public void Push(HandleCallbackParam callbackParam)
{
this.taskQueue.Enqueue(callbackParam);
taskQueue.Enqueue(callbackParam);
StartCallBack();
}
public void Dispose()
{
this._stop = true;
this._retryQueue.Dispose();
this._runTask?.GetAwaiter().GetResult();
_stop = true;
_retryQueue.Dispose();
_runTask?.GetAwaiter().GetResult();
}
private void StartCallBack()
{
if ( this._isRunning)
if ( _isRunning)
{
return;
}
this._runTask = Task.Run(async () =>
_runTask = Task.Run(async () =>
{
this._logger.LogDebug("start to callback");
this._isRunning = true;
while (!this._stop)
_logger.LogDebug("start to callback");
_isRunning = true;
while (!_stop)
{
await DoCallBack();
await DoCallBack();
await Task.Delay(TimeSpan.FromSeconds(3));
}
this._logger.LogDebug("end to callback");
this._isRunning = false;
_logger.LogDebug("end to callback");
_isRunning = false;
});
}
@ -76,12 +77,12 @@ namespace DotXxlJob.Core.Queue
{
List<HandleCallbackParam> list = new List<HandleCallbackParam>();
while (list.Count < Constants.MaxCallbackRecordsPerRequest && this.taskQueue.TryDequeue(out var item))
while (list.Count < Constants.MaxCallbackRecordsPerRequest && taskQueue.TryDequeue(out var item))
{
list.Add(item);
}
if (!list.Any())
if (list.Count == 0)
{
return;
}
@ -92,9 +93,9 @@ namespace DotXxlJob.Core.Queue
result = await _adminClient.Callback(list);
}
catch (Exception ex){
this._logger.LogError(ex,"trigger callback error:{error}",ex.Message);
_logger.LogError(ex,"trigger callback error:{error}",ex.Message);
result = ReturnT.Failed(ex.Message);
this._retryQueue.Push(list);
_retryQueue.Push(list);
}
LogCallBackResult(result, list);
@ -104,7 +105,7 @@ namespace DotXxlJob.Core.Queue
{
foreach (var param in list)
{
this._jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Success");
_jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Success");
}
}

@ -52,7 +52,7 @@ namespace DotXxlJob.Core
{
if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0))
{
this._logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
_logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
return ReturnT.Failed("repeat job task!");
}
@ -65,12 +65,12 @@ namespace DotXxlJob.Core
public void Stop()
{
this._cancellationTokenSource?.Cancel();
this._cancellationTokenSource?.Dispose();
this._cancellationTokenSource = null;
_cancellationTokenSource?.Cancel();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
//wait for task completed
this._runTask?.GetAwaiter().GetResult();
_runTask?.GetAwaiter().GetResult();
}
public void Dispose()
@ -85,14 +85,14 @@ namespace DotXxlJob.Core
private void StartTask()
{
if (this._cancellationTokenSource != null )
if (_cancellationTokenSource != null )
{
return; //running
}
this._cancellationTokenSource = new CancellationTokenSource();
var ct = this._cancellationTokenSource.Token;
_cancellationTokenSource = new CancellationTokenSource();
var ct = _cancellationTokenSource.Token;
this._runTask = Task.Factory.StartNew(async () =>
_runTask = Task.Factory.StartNew(async () =>
{
//ct.ThrowIfCancellationRequested();
@ -101,6 +101,7 @@ namespace DotXxlJob.Core
{
if (TASK_QUEUE.IsEmpty)
{
//_logger.LogInformation("task queue is empty!");
break;
}
@ -113,27 +114,27 @@ namespace DotXxlJob.Core
{
if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _))
{
this._logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}"
_logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}"
,triggerParam.LogId,triggerParam.JobId,ID_IN_QUEUE.ContainsKey(triggerParam.LogId));
}
//set log file;
this._jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId);
_jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId);
this._jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}" ,triggerParam.ExecutorParams);
_jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}" ,triggerParam.ExecutorParams);
result = await this._executor.Execute(triggerParam);
result = await _executor.Execute(triggerParam);
this._jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
_jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
}
else
{
this._logger.LogWarning("Dequeue Task Failed");
_logger.LogWarning("Dequeue Task Failed");
}
}
catch (Exception ex)
{
result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message);
this._jobLogger.Log("<br>----------- JobThread Exception:" + ex.Message + "<br>----------- xxl-job job execute end(error) -----------");
_jobLogger.Log("<br>----------- JobThread Exception:" + ex.Message + "<br>----------- xxl-job job execute end(error) -----------");
}
if(triggerParam !=null)
@ -144,10 +145,10 @@ namespace DotXxlJob.Core
}
this._cancellationTokenSource.Dispose();
this._cancellationTokenSource = null;
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}, this._cancellationTokenSource.Token);
}, _cancellationTokenSource.Token);
}

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -23,9 +22,9 @@ namespace DotXxlJob.Core.Queue
public RetryCallbackTaskQueue(string backupPath,Action<HandleCallbackParam> actionDoCallback,ILogger<RetryCallbackTaskQueue> logger)
{
this._actionDoCallback = actionDoCallback;
this._logger = logger;
this._backupFile = Path.Combine(backupPath, Constants.XxlJobRetryLogsFile);
_actionDoCallback = actionDoCallback;
_logger = logger;
_backupFile = Path.Combine(backupPath, Constants.XxlJobRetryLogsFile);
var dir = Path.GetDirectoryName(backupPath);
if (!Directory.Exists(dir))
{
@ -37,9 +36,9 @@ namespace DotXxlJob.Core.Queue
private void StartQueue()
{
this._cancellation = new CancellationTokenSource();
_cancellation = new CancellationTokenSource();
var stopToken = this._cancellation.Token;
this._runTask = Task.Factory.StartNew(async () =>
_runTask = Task.Factory.StartNew(async () =>
{
while (!stopToken.IsCancellationRequested)
{
@ -54,7 +53,7 @@ namespace DotXxlJob.Core.Queue
{
var list = new List<HandleCallbackParam>();
if (!File.Exists(this._backupFile))
if (!File.Exists(_backupFile))
{
return;
}
@ -70,7 +69,7 @@ namespace DotXxlJob.Core.Queue
}
catch(Exception ex)
{
this._logger.LogError(ex,"read backup file error:{error}",ex.Message);
_logger.LogError(ex,"read backup file error:{error}",ex.Message);
}
}
@ -83,13 +82,13 @@ namespace DotXxlJob.Core.Queue
catch (Exception ex)
{
this._logger.LogError(ex, "delete backup file error:{error}", ex.Message);
_logger.LogError(ex, "delete backup file error:{error}", ex.Message);
}
if (list.Any())
if (list.Count > 0)
{
foreach (var item in list)
{
this._actionDoCallback(item);
_actionDoCallback(item);
}
}

Loading…
Cancel
Save