From 6ba9658aa390f0a042905100af0f6ea72e72a0b7 Mon Sep 17 00:00:00 2001 From: wangshaoming Date: Tue, 9 Apr 2019 13:15:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9B=9E=E8=B0=83=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BB=B6=E8=BF=9F=EF=BC=8C=E9=81=BF=E5=85=8D?= =?UTF-8?q?=E9=95=BF=E6=9C=9F=E5=8D=A0=E7=94=A8CPU?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/DotXxlJob.Core/ExecutorRegistry.cs | 50 +++++++++---------- src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs | 43 ++++++++-------- src/DotXxlJob.Core/Queue/JobTaskQueue.cs | 39 ++++++++------- .../Queue/RetryCallbackTaskQueue.cs | 21 ++++---- 4 files changed, 76 insertions(+), 77 deletions(-) diff --git a/src/DotXxlJob.Core/ExecutorRegistry.cs b/src/DotXxlJob.Core/ExecutorRegistry.cs index 7240d76..84398cd 100644 --- a/src/DotXxlJob.Core/ExecutorRegistry.cs +++ b/src/DotXxlJob.Core/ExecutorRegistry.cs @@ -11,67 +11,65 @@ namespace DotXxlJob.Core /// /// 执行器注册注册 /// - public class ExecutorRegistry:IExecutorRegistry + public class ExecutorRegistry : IExecutorRegistry { private readonly AdminClient _adminClient; private readonly XxlJobExecutorOptions _options; private readonly ILogger _logger; - public ExecutorRegistry(AdminClient adminClient,IOptions optionsAccessor,ILogger logger) + public ExecutorRegistry(AdminClient adminClient, IOptions optionsAccessor, ILogger logger) { Preconditions.CheckNotNull(optionsAccessor, "XxlJobExecutorOptions"); Preconditions.CheckNotNull(optionsAccessor.Value, "XxlJobExecutorOptions"); - this._adminClient = adminClient; - this._options = optionsAccessor.Value; - if (string.IsNullOrEmpty(this._options.SpecialBindAddress)) + _adminClient = adminClient; + _options = optionsAccessor.Value; + if (string.IsNullOrEmpty(_options.SpecialBindAddress)) { - this._options.SpecialBindAddress = IPUtility.GetLocalIntranetIP().MapToIPv4().ToString(); + _options.SpecialBindAddress = IPUtility.GetLocalIntranetIP().MapToIPv4().ToString(); } - this._logger = logger; + _logger = logger; } - + public async Task RegistryAsync(CancellationToken cancellationToken) { var registryParam = new RegistryParam { RegistryGroup = "EXECUTOR", - RegistryKey = this._options.AppName, - RegistryValue = $"{this._options.SpecialBindAddress}:{this._options.Port}" + RegistryKey = _options.AppName, + RegistryValue = $"{_options.SpecialBindAddress}:{_options.Port}" }; - this._logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<"); + _logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<"); var errorTimes = 0; - + while (!cancellationToken.IsCancellationRequested) { try { - var ret = await this._adminClient.Registry(registryParam); - this._logger.LogDebug("registry last result:{0}", ret?.Code); + var ret = await _adminClient.Registry(registryParam); + _logger.LogDebug("registry last result:{0}", ret?.Code); errorTimes = 0; await Task.Delay(Constants.RegistryInterval, cancellationToken); } catch (TaskCanceledException) { - this._logger.LogInformation(">>>>> Application Stopping....<<<<<"); + _logger.LogInformation(">>>>> Application Stopping....<<<<<"); } catch (Exception ex) { errorTimes++; await Task.Delay(Constants.RegistryInterval, cancellationToken); - this._logger.LogError(ex,"registry error:{0},{1} Times",ex.Message,errorTimes); + _logger.LogError(ex, "registry error:{0},{1} Times", ex.Message, errorTimes); } - } - - this._logger.LogInformation(">>>>>>>> end registry to admin <<<<<<<<"); - - this._logger.LogInformation(">>>>>>>> start remove registry to admin <<<<<<<<"); - + } + + _logger.LogInformation(">>>>>>>> end registry to admin <<<<<<<<"); + + _logger.LogInformation(">>>>>>>> start remove registry to admin <<<<<<<<"); + var removeRet = await this._adminClient.RegistryRemove(registryParam); - this._logger.LogInformation("remove registry last result:{0}",removeRet?.Code); - this._logger.LogInformation(">>>>>>>> end remove registry to admin <<<<<<<<"); + _logger.LogInformation("remove registry last result:{0}", removeRet?.Code); + _logger.LogInformation(">>>>>>>> end remove registry to admin <<<<<<<<"); } - - } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs index fb16d7c..16caa75 100644 --- a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs @@ -26,48 +26,49 @@ namespace DotXxlJob.Core.Queue public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions optionsAccessor , ILoggerFactory loggerFactory) { - this._adminClient = adminClient; - this._jobLogger = jobLogger; + _adminClient = adminClient; + _jobLogger = jobLogger; - this._retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath, + _retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath, Push, loggerFactory.CreateLogger()); - this._logger = loggerFactory.CreateLogger(); + _logger = loggerFactory.CreateLogger(); } public void Push(HandleCallbackParam callbackParam) { - this.taskQueue.Enqueue(callbackParam); + taskQueue.Enqueue(callbackParam); StartCallBack(); } public void Dispose() { - this._stop = true; - this._retryQueue.Dispose(); - this._runTask?.GetAwaiter().GetResult(); + _stop = true; + _retryQueue.Dispose(); + _runTask?.GetAwaiter().GetResult(); } private void StartCallBack() { - if ( this._isRunning) + if ( _isRunning) { return; } - this._runTask = Task.Run(async () => + _runTask = Task.Run(async () => { - this._logger.LogDebug("start to callback"); - this._isRunning = true; - while (!this._stop) + _logger.LogDebug("start to callback"); + _isRunning = true; + while (!_stop) { - await DoCallBack(); + await DoCallBack(); + await Task.Delay(TimeSpan.FromSeconds(3)); } - this._logger.LogDebug("end to callback"); - this._isRunning = false; + _logger.LogDebug("end to callback"); + _isRunning = false; }); } @@ -76,12 +77,12 @@ namespace DotXxlJob.Core.Queue { List list = new List(); - while (list.Count < Constants.MaxCallbackRecordsPerRequest && this.taskQueue.TryDequeue(out var item)) + while (list.Count < Constants.MaxCallbackRecordsPerRequest && taskQueue.TryDequeue(out var item)) { list.Add(item); } - if (!list.Any()) + if (list.Count == 0) { return; } @@ -92,9 +93,9 @@ namespace DotXxlJob.Core.Queue result = await _adminClient.Callback(list); } catch (Exception ex){ - this._logger.LogError(ex,"trigger callback error:{error}",ex.Message); + _logger.LogError(ex,"trigger callback error:{error}",ex.Message); result = ReturnT.Failed(ex.Message); - this._retryQueue.Push(list); + _retryQueue.Push(list); } LogCallBackResult(result, list); @@ -104,7 +105,7 @@ namespace DotXxlJob.Core.Queue { foreach (var param in list) { - this._jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Success"); + _jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Success"); } } diff --git a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs index 8395f9a..5453918 100644 --- a/src/DotXxlJob.Core/Queue/JobTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs @@ -52,7 +52,7 @@ namespace DotXxlJob.Core { if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0)) { - this._logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); + _logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); return ReturnT.Failed("repeat job task!"); } @@ -65,12 +65,12 @@ namespace DotXxlJob.Core public void Stop() { - this._cancellationTokenSource?.Cancel(); - this._cancellationTokenSource?.Dispose(); - this._cancellationTokenSource = null; + _cancellationTokenSource?.Cancel(); + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; //wait for task completed - this._runTask?.GetAwaiter().GetResult(); + _runTask?.GetAwaiter().GetResult(); } public void Dispose() @@ -85,14 +85,14 @@ namespace DotXxlJob.Core private void StartTask() { - if (this._cancellationTokenSource != null ) + if (_cancellationTokenSource != null ) { return; //running } - this._cancellationTokenSource = new CancellationTokenSource(); - var ct = this._cancellationTokenSource.Token; + _cancellationTokenSource = new CancellationTokenSource(); + var ct = _cancellationTokenSource.Token; - this._runTask = Task.Factory.StartNew(async () => + _runTask = Task.Factory.StartNew(async () => { //ct.ThrowIfCancellationRequested(); @@ -101,6 +101,7 @@ namespace DotXxlJob.Core { if (TASK_QUEUE.IsEmpty) { + //_logger.LogInformation("task queue is empty!"); break; } @@ -113,27 +114,27 @@ namespace DotXxlJob.Core { if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _)) { - this._logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}" + _logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}" ,triggerParam.LogId,triggerParam.JobId,ID_IN_QUEUE.ContainsKey(triggerParam.LogId)); } //set log file; - this._jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId); + _jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId); - this._jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams); + _jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams); - result = await this._executor.Execute(triggerParam); + result = await _executor.Execute(triggerParam); - this._jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code); + _jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code); } else { - this._logger.LogWarning("Dequeue Task Failed"); + _logger.LogWarning("Dequeue Task Failed"); } } catch (Exception ex) { result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message); - this._jobLogger.Log("
----------- JobThread Exception:" + ex.Message + "
----------- xxl-job job execute end(error) -----------"); + _jobLogger.Log("
----------- JobThread Exception:" + ex.Message + "
----------- xxl-job job execute end(error) -----------"); } if(triggerParam !=null) @@ -144,10 +145,10 @@ namespace DotXxlJob.Core } - this._cancellationTokenSource.Dispose(); - this._cancellationTokenSource = null; + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = null; - }, this._cancellationTokenSource.Token); + }, _cancellationTokenSource.Token); } diff --git a/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs index e7a2ec4..5a7bd67 100644 --- a/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.IO; -using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -23,9 +22,9 @@ namespace DotXxlJob.Core.Queue public RetryCallbackTaskQueue(string backupPath,Action actionDoCallback,ILogger logger) { - this._actionDoCallback = actionDoCallback; - this._logger = logger; - this._backupFile = Path.Combine(backupPath, Constants.XxlJobRetryLogsFile); + _actionDoCallback = actionDoCallback; + _logger = logger; + _backupFile = Path.Combine(backupPath, Constants.XxlJobRetryLogsFile); var dir = Path.GetDirectoryName(backupPath); if (!Directory.Exists(dir)) { @@ -37,9 +36,9 @@ namespace DotXxlJob.Core.Queue private void StartQueue() { - this._cancellation = new CancellationTokenSource(); + _cancellation = new CancellationTokenSource(); var stopToken = this._cancellation.Token; - this._runTask = Task.Factory.StartNew(async () => + _runTask = Task.Factory.StartNew(async () => { while (!stopToken.IsCancellationRequested) { @@ -54,7 +53,7 @@ namespace DotXxlJob.Core.Queue { var list = new List(); - if (!File.Exists(this._backupFile)) + if (!File.Exists(_backupFile)) { return; } @@ -70,7 +69,7 @@ namespace DotXxlJob.Core.Queue } catch(Exception ex) { - this._logger.LogError(ex,"read backup file error:{error}",ex.Message); + _logger.LogError(ex,"read backup file error:{error}",ex.Message); } } @@ -83,13 +82,13 @@ namespace DotXxlJob.Core.Queue catch (Exception ex) { - this._logger.LogError(ex, "delete backup file error:{error}", ex.Message); + _logger.LogError(ex, "delete backup file error:{error}", ex.Message); } - if (list.Any()) + if (list.Count > 0) { foreach (var item in list) { - this._actionDoCallback(item); + _actionDoCallback(item); } }