diff --git a/src/DotXxlJob.Core/ExecutorRegistry.cs b/src/DotXxlJob.Core/ExecutorRegistry.cs
index 7240d76..84398cd 100644
--- a/src/DotXxlJob.Core/ExecutorRegistry.cs
+++ b/src/DotXxlJob.Core/ExecutorRegistry.cs
@@ -11,67 +11,65 @@ namespace DotXxlJob.Core
///
/// 执行器注册注册
///
- public class ExecutorRegistry:IExecutorRegistry
+ public class ExecutorRegistry : IExecutorRegistry
{
private readonly AdminClient _adminClient;
private readonly XxlJobExecutorOptions _options;
private readonly ILogger _logger;
- public ExecutorRegistry(AdminClient adminClient,IOptions optionsAccessor,ILogger logger)
+ public ExecutorRegistry(AdminClient adminClient, IOptions optionsAccessor, ILogger logger)
{
Preconditions.CheckNotNull(optionsAccessor, "XxlJobExecutorOptions");
Preconditions.CheckNotNull(optionsAccessor.Value, "XxlJobExecutorOptions");
- this._adminClient = adminClient;
- this._options = optionsAccessor.Value;
- if (string.IsNullOrEmpty(this._options.SpecialBindAddress))
+ _adminClient = adminClient;
+ _options = optionsAccessor.Value;
+ if (string.IsNullOrEmpty(_options.SpecialBindAddress))
{
- this._options.SpecialBindAddress = IPUtility.GetLocalIntranetIP().MapToIPv4().ToString();
+ _options.SpecialBindAddress = IPUtility.GetLocalIntranetIP().MapToIPv4().ToString();
}
- this._logger = logger;
+ _logger = logger;
}
-
+
public async Task RegistryAsync(CancellationToken cancellationToken)
{
var registryParam = new RegistryParam {
RegistryGroup = "EXECUTOR",
- RegistryKey = this._options.AppName,
- RegistryValue = $"{this._options.SpecialBindAddress}:{this._options.Port}"
+ RegistryKey = _options.AppName,
+ RegistryValue = $"{_options.SpecialBindAddress}:{_options.Port}"
};
- this._logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<");
+ _logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<");
var errorTimes = 0;
-
+
while (!cancellationToken.IsCancellationRequested)
{
try
{
- var ret = await this._adminClient.Registry(registryParam);
- this._logger.LogDebug("registry last result:{0}", ret?.Code);
+ var ret = await _adminClient.Registry(registryParam);
+ _logger.LogDebug("registry last result:{0}", ret?.Code);
errorTimes = 0;
await Task.Delay(Constants.RegistryInterval, cancellationToken);
}
catch (TaskCanceledException)
{
- this._logger.LogInformation(">>>>> Application Stopping....<<<<<");
+ _logger.LogInformation(">>>>> Application Stopping....<<<<<");
}
catch (Exception ex)
{
errorTimes++;
await Task.Delay(Constants.RegistryInterval, cancellationToken);
- this._logger.LogError(ex,"registry error:{0},{1} Times",ex.Message,errorTimes);
+ _logger.LogError(ex, "registry error:{0},{1} Times", ex.Message, errorTimes);
}
- }
-
- this._logger.LogInformation(">>>>>>>> end registry to admin <<<<<<<<");
-
- this._logger.LogInformation(">>>>>>>> start remove registry to admin <<<<<<<<");
-
+ }
+
+ _logger.LogInformation(">>>>>>>> end registry to admin <<<<<<<<");
+
+ _logger.LogInformation(">>>>>>>> start remove registry to admin <<<<<<<<");
+
var removeRet = await this._adminClient.RegistryRemove(registryParam);
- this._logger.LogInformation("remove registry last result:{0}",removeRet?.Code);
- this._logger.LogInformation(">>>>>>>> end remove registry to admin <<<<<<<<");
+ _logger.LogInformation("remove registry last result:{0}", removeRet?.Code);
+ _logger.LogInformation(">>>>>>>> end remove registry to admin <<<<<<<<");
}
-
-
}
}
\ No newline at end of file
diff --git a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs
index fb16d7c..16caa75 100644
--- a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs
+++ b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs
@@ -26,48 +26,49 @@ namespace DotXxlJob.Core.Queue
public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions optionsAccessor
, ILoggerFactory loggerFactory)
{
- this._adminClient = adminClient;
- this._jobLogger = jobLogger;
+ _adminClient = adminClient;
+ _jobLogger = jobLogger;
- this._retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath,
+ _retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath,
Push,
loggerFactory.CreateLogger());
- this._logger = loggerFactory.CreateLogger();
+ _logger = loggerFactory.CreateLogger();
}
public void Push(HandleCallbackParam callbackParam)
{
- this.taskQueue.Enqueue(callbackParam);
+ taskQueue.Enqueue(callbackParam);
StartCallBack();
}
public void Dispose()
{
- this._stop = true;
- this._retryQueue.Dispose();
- this._runTask?.GetAwaiter().GetResult();
+ _stop = true;
+ _retryQueue.Dispose();
+ _runTask?.GetAwaiter().GetResult();
}
private void StartCallBack()
{
- if ( this._isRunning)
+ if ( _isRunning)
{
return;
}
- this._runTask = Task.Run(async () =>
+ _runTask = Task.Run(async () =>
{
- this._logger.LogDebug("start to callback");
- this._isRunning = true;
- while (!this._stop)
+ _logger.LogDebug("start to callback");
+ _isRunning = true;
+ while (!_stop)
{
- await DoCallBack();
+ await DoCallBack();
+ await Task.Delay(TimeSpan.FromSeconds(3));
}
- this._logger.LogDebug("end to callback");
- this._isRunning = false;
+ _logger.LogDebug("end to callback");
+ _isRunning = false;
});
}
@@ -76,12 +77,12 @@ namespace DotXxlJob.Core.Queue
{
List list = new List();
- while (list.Count < Constants.MaxCallbackRecordsPerRequest && this.taskQueue.TryDequeue(out var item))
+ while (list.Count < Constants.MaxCallbackRecordsPerRequest && taskQueue.TryDequeue(out var item))
{
list.Add(item);
}
- if (!list.Any())
+ if (list.Count == 0)
{
return;
}
@@ -92,9 +93,9 @@ namespace DotXxlJob.Core.Queue
result = await _adminClient.Callback(list);
}
catch (Exception ex){
- this._logger.LogError(ex,"trigger callback error:{error}",ex.Message);
+ _logger.LogError(ex,"trigger callback error:{error}",ex.Message);
result = ReturnT.Failed(ex.Message);
- this._retryQueue.Push(list);
+ _retryQueue.Push(list);
}
LogCallBackResult(result, list);
@@ -104,7 +105,7 @@ namespace DotXxlJob.Core.Queue
{
foreach (var param in list)
{
- this._jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Success");
+ _jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Success");
}
}
diff --git a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs
index 8395f9a..5453918 100644
--- a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs
+++ b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs
@@ -52,7 +52,7 @@ namespace DotXxlJob.Core
{
if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0))
{
- this._logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
+ _logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
return ReturnT.Failed("repeat job task!");
}
@@ -65,12 +65,12 @@ namespace DotXxlJob.Core
public void Stop()
{
- this._cancellationTokenSource?.Cancel();
- this._cancellationTokenSource?.Dispose();
- this._cancellationTokenSource = null;
+ _cancellationTokenSource?.Cancel();
+ _cancellationTokenSource?.Dispose();
+ _cancellationTokenSource = null;
//wait for task completed
- this._runTask?.GetAwaiter().GetResult();
+ _runTask?.GetAwaiter().GetResult();
}
public void Dispose()
@@ -85,14 +85,14 @@ namespace DotXxlJob.Core
private void StartTask()
{
- if (this._cancellationTokenSource != null )
+ if (_cancellationTokenSource != null )
{
return; //running
}
- this._cancellationTokenSource = new CancellationTokenSource();
- var ct = this._cancellationTokenSource.Token;
+ _cancellationTokenSource = new CancellationTokenSource();
+ var ct = _cancellationTokenSource.Token;
- this._runTask = Task.Factory.StartNew(async () =>
+ _runTask = Task.Factory.StartNew(async () =>
{
//ct.ThrowIfCancellationRequested();
@@ -101,6 +101,7 @@ namespace DotXxlJob.Core
{
if (TASK_QUEUE.IsEmpty)
{
+ //_logger.LogInformation("task queue is empty!");
break;
}
@@ -113,27 +114,27 @@ namespace DotXxlJob.Core
{
if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _))
{
- this._logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}"
+ _logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}"
,triggerParam.LogId,triggerParam.JobId,ID_IN_QUEUE.ContainsKey(triggerParam.LogId));
}
//set log file;
- this._jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId);
+ _jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId);
- this._jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams);
+ _jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams);
- result = await this._executor.Execute(triggerParam);
+ result = await _executor.Execute(triggerParam);
- this._jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code);
+ _jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code);
}
else
{
- this._logger.LogWarning("Dequeue Task Failed");
+ _logger.LogWarning("Dequeue Task Failed");
}
}
catch (Exception ex)
{
result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message);
- this._jobLogger.Log("
----------- JobThread Exception:" + ex.Message + "
----------- xxl-job job execute end(error) -----------");
+ _jobLogger.Log("
----------- JobThread Exception:" + ex.Message + "
----------- xxl-job job execute end(error) -----------");
}
if(triggerParam !=null)
@@ -144,10 +145,10 @@ namespace DotXxlJob.Core
}
- this._cancellationTokenSource.Dispose();
- this._cancellationTokenSource = null;
+ _cancellationTokenSource.Dispose();
+ _cancellationTokenSource = null;
- }, this._cancellationTokenSource.Token);
+ }, _cancellationTokenSource.Token);
}
diff --git a/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs
index e7a2ec4..5a7bd67 100644
--- a/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs
+++ b/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
-using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -23,9 +22,9 @@ namespace DotXxlJob.Core.Queue
public RetryCallbackTaskQueue(string backupPath,Action actionDoCallback,ILogger logger)
{
- this._actionDoCallback = actionDoCallback;
- this._logger = logger;
- this._backupFile = Path.Combine(backupPath, Constants.XxlJobRetryLogsFile);
+ _actionDoCallback = actionDoCallback;
+ _logger = logger;
+ _backupFile = Path.Combine(backupPath, Constants.XxlJobRetryLogsFile);
var dir = Path.GetDirectoryName(backupPath);
if (!Directory.Exists(dir))
{
@@ -37,9 +36,9 @@ namespace DotXxlJob.Core.Queue
private void StartQueue()
{
- this._cancellation = new CancellationTokenSource();
+ _cancellation = new CancellationTokenSource();
var stopToken = this._cancellation.Token;
- this._runTask = Task.Factory.StartNew(async () =>
+ _runTask = Task.Factory.StartNew(async () =>
{
while (!stopToken.IsCancellationRequested)
{
@@ -54,7 +53,7 @@ namespace DotXxlJob.Core.Queue
{
var list = new List();
- if (!File.Exists(this._backupFile))
+ if (!File.Exists(_backupFile))
{
return;
}
@@ -70,7 +69,7 @@ namespace DotXxlJob.Core.Queue
}
catch(Exception ex)
{
- this._logger.LogError(ex,"read backup file error:{error}",ex.Message);
+ _logger.LogError(ex,"read backup file error:{error}",ex.Message);
}
}
@@ -83,13 +82,13 @@ namespace DotXxlJob.Core.Queue
catch (Exception ex)
{
- this._logger.LogError(ex, "delete backup file error:{error}", ex.Message);
+ _logger.LogError(ex, "delete backup file error:{error}", ex.Message);
}
- if (list.Any())
+ if (list.Count > 0)
{
foreach (var item in list)
{
- this._actionDoCallback(item);
+ _actionDoCallback(item);
}
}