diff --git a/samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj b/samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj index 99a0127..91cf196 100644 --- a/samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj +++ b/samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj @@ -5,7 +5,10 @@ - + + + 2.2.0 + diff --git a/samples/ASPNetCoreExecutor/ApplicationBuilderExtensions.cs b/samples/ASPNetCoreExecutor/Extensions/ApplicationBuilderExtensions.cs similarity index 100% rename from samples/ASPNetCoreExecutor/ApplicationBuilderExtensions.cs rename to samples/ASPNetCoreExecutor/Extensions/ApplicationBuilderExtensions.cs diff --git a/samples/ASPNetCoreExecutor/XxlJobExecutorMiddleware.cs b/samples/ASPNetCoreExecutor/Extensions/XxlJobExecutorMiddleware.cs similarity index 100% rename from samples/ASPNetCoreExecutor/XxlJobExecutorMiddleware.cs rename to samples/ASPNetCoreExecutor/Extensions/XxlJobExecutorMiddleware.cs diff --git a/samples/ASPNetCoreExecutor/Startup.cs b/samples/ASPNetCoreExecutor/Startup.cs index dd413a2..240ea8b 100644 --- a/samples/ASPNetCoreExecutor/Startup.cs +++ b/samples/ASPNetCoreExecutor/Startup.cs @@ -1,22 +1,34 @@ using DotXxlJob.Core; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; namespace ASPNetCoreExecutor { public class Startup { + public Startup(IConfiguration configuration) + { + Configuration = configuration; + } + + private IConfiguration Configuration { get; } + // This method gets called by the runtime. Use this method to add services to the container. // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 public void ConfigureServices(IServiceCollection services) { - services.AddXxlJobExecutor(); + + services.AddXxlJobExecutor(Configuration); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. - public void Configure(IApplicationBuilder app, IHostingEnvironment env) + public void Configure(IApplicationBuilder app,ILoggerFactory loggerFactory, IHostingEnvironment env) { + loggerFactory.AddConsole(); + if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); diff --git a/samples/ASPNetCoreExecutor/appsettings.json b/samples/ASPNetCoreExecutor/appsettings.json index def9159..46addc9 100644 --- a/samples/ASPNetCoreExecutor/appsettings.json +++ b/samples/ASPNetCoreExecutor/appsettings.json @@ -4,5 +4,12 @@ "Default": "Warning" } }, - "AllowedHosts": "*" + "xxlJob": { + "adminAddresses":"http://127.0.0.1:8101", + "appName": "ASPNetCoreExecutor", + "port": 5000, + "accessToken": "", + "logRetentionDays": 30 + } + } diff --git a/samples/HessianReader/NewFile1.txt b/samples/HessianReader/NewFile1.txt new file mode 100644 index 0000000..bc3c576 --- /dev/null +++ b/samples/HessianReader/NewFile1.txt @@ -0,0 +1,72 @@ +------------ 0x43---------------- +ReadClassDefinition +Hessian.ClassDef={"Name":"com.xxl.rpc.remoting.net.params.XxlRpcRequest","Fields":["requestId","createMillisTime","accessToken","className","methodName","version","parameterTypes","parameters"]} +------------------------------------------------------------ +------------ 0x60---------------- +ReadObjectCompact +------------ 0x00---------------- +ReadShortString requestId +------------ 0x4c---------------- +ReadLongFull createMillisTime +------------ 0x00---------------- +ReadShortString accessToken +------------ 0x00---------------- +ReadShortString className +------------ 0x08---------------- +ReadShortString methodName +------------ 0x4e---------------- +ReadNull version +------------ 0x71---------------- +ReadCompactFixList parameterTypes +------------ 0x43---------------- +ReadClassDefinition +------------ 0x61---------------- +ReadObjectCompact +------------ 0x0e---------------- +ReadShortString +Hessian.HessianObject=[{"Item1":"requestId","Item2":""},{"Item1":"createMillisTime","Item2":1547809331509},{"Item1":"accessToken","Item2":""},{"Item1":"className","Item2":""},{"Item1":"methodName","Item2":"callback"} +,{"Item1":"version","Item2":null},{"Item1":"parameterTypes","Item2":[{"Name":"java.lang.Clas +s","Fields":["name"]}]},{"Item1":"parameters","Item2":[{"Item1":"name","Item2":"java.util.List"}]}] +------------------------------------------------------------ +------------ 0x72---------------- +ReadCompactFixList List +------------ 0x43---------------- +ReadClassDefinition HandleCallbackParam +------------ 0x62---------------- +ReadObjectCompact HandleCallbackParam +------------ 0xcc---------------- +ReadIntegerTwoBytes logId +------------ 0x4c---------------- +ReadLongFull logDateTim +------------ 0x43---------------- +ReadClassDefinition executeResult +System.Collections.Generic.List`1[System.Object]=[{"Name":"com.xxl.job.core.biz.model.HandleCallbackParam","Fields":["logId","logDateTim","executeResult"]},[{"Item1":"logId","Item2":1111},{"Item1":"logDateTim","Item2":1547809331507},{"Item1":"executeResult","Item2":{"Name":"com.xxl.job.core.biz.model.Return +T","Fields":["code","msg","content"]}}]] +------------------------------------------------------------ +------------ 0x63---------------- +ReadObjectCompact ReturnT +------------ 0xc8---------------- +ReadIntegerTwoBytes code +------------ 0x4e---------------- +ReadNull msg +------------ 0x07---------------- +ReadShortString aaaaaaa +Hessian.HessianObject=[{"Item1":"code","Item2":200},{"Item1":"msg","Item2":null},{"Item1":"content","Item2":"aaaaaaa"}] +------------------------------------------------------------ +------------ 0x62---------------- +ReadObjectCompact HandleCallbackParam +------------ 0xd7---------------- +ReadIntegerThreeBytes logId +------------ 0x4c---------------- +ReadLongFull logDateTim +------------ 0x63---------------- +ReadObjectCompact executeResult +------------ 0xc8---------------- +ReadIntegerTwoBytes code +------------ 0x4e---------------- +ReadNull msg +------------ 0x06---------------- +ReadShortString content +Hessian.HessianObject=[{"Item1":"logId","Item2":222222},{"Item1":"logDateTim","Item2":1547809331507},{"Item1":"executeResult","Item2":[{"Item1":"code","Item2":200},{"Item1":"msg","Item2":null},{"Item1":"content","Item2":"bbbbbb"}]}] +------------------------------------------------------------ + diff --git a/samples/HessianReader/Program.cs b/samples/HessianReader/Program.cs index 0bffc92..7908d15 100644 --- a/samples/HessianReader/Program.cs +++ b/samples/HessianReader/Program.cs @@ -12,7 +12,7 @@ namespace HessianReader { static void Main(string[] args) { - byte[] myBinary = File.ReadAllBytes("log.dat"); + byte[] myBinary = File.ReadAllBytes("request.dat"); foreach (var i in myBinary) { @@ -28,12 +28,23 @@ namespace HessianReader using (var stream1 = new MemoryStream(myBinary)) { - var s1 = HessianSerializer.DeserializeRequest(stream1); - Console.WriteLine(JsonConvert.SerializeObject(s1)); + //var s1 = HessianSerializer.DeserializeRequest(stream1); + var s = new Deserializer(stream1); + + + while ( stream1.CanRead) + { + var o = s.ReadValue(); + Console.WriteLine("{0}={1}",o.GetType(),JsonConvert.SerializeObject(o)); + Console.WriteLine("------------------------------------------------------------"); + } + + } Console.WriteLine("------------------------------------------------------------"); - + + return; RpcResponse response = new RpcResponse { RequestId = Guid.NewGuid().ToString("N"), Result = ReturnT.Failed("ABCDEFG") diff --git a/samples/HessianReader/request.dat b/samples/HessianReader/request.dat new file mode 100644 index 0000000..fd4af53 Binary files /dev/null and b/samples/HessianReader/request.dat differ diff --git a/src/DotXxlJob.Core/AdminClient.cs b/src/DotXxlJob.Core/AdminClient.cs index 3474865..a6e52f4 100644 --- a/src/DotXxlJob.Core/AdminClient.cs +++ b/src/DotXxlJob.Core/AdminClient.cs @@ -2,10 +2,10 @@ using System; using System.Collections.Generic; using System.IO; using System.Linq; -using System.Linq.Expressions; using System.Net.Http; using System.Net.Http.Headers; using System.Threading.Tasks; +using Hessian; using DotXxlJob.Core.Config; using DotXxlJob.Core.Model; using Microsoft.Extensions.Logging; @@ -29,7 +29,7 @@ namespace DotXxlJob.Core ,ILogger logger) { this._options = optionsAccessor.Value; - _clientFactory = clientFactory; + this._clientFactory = clientFactory; this._logger = logger; InitAddress(); } @@ -37,7 +37,7 @@ namespace DotXxlJob.Core private void InitAddress() { this._addresses = new List(); - foreach (var item in this._options.AdminAddresses) + foreach (var item in this._options.AdminAddresses.Split(';')) { try { @@ -73,7 +73,7 @@ namespace DotXxlJob.Core object parameters) { var request = new RpcRequest { - CreateMillisTime = DateTime.Now.ToUnixTimeSeconds(), + CreateMillisTime = DateTime.Now.GetTotalMilliseconds(), AccessToken = this._options.AccessToken, ClassName = "com.xxl.job.core.biz.AdminBiz", MethodName = methodName, @@ -93,23 +93,23 @@ namespace DotXxlJob.Core using (var client = this._clientFactory.CreateClient()) { - while (triedTimes++ < _addresses.Count) + while (triedTimes++ < this._addresses.Count) { - var address = _addresses[_currentIndex]; - _currentIndex = (_currentIndex + 1) % _addresses.Count; + var address = this._addresses[this._currentIndex]; + this._currentIndex = (this._currentIndex + 1) % this._addresses.Count; if (!address.CheckAccessable()) continue; Stream resStream; try { - resStream =await DoPost(client, address, postBuf); + resStream = await DoPost(client, address, postBuf); address.Reset(); } catch (Exception ex) { - _logger.LogError(ex, "request admin error."); + this._logger.LogError(ex, "request admin error."); address.SetFail(); continue; } @@ -121,13 +121,13 @@ namespace DotXxlJob.Core } catch (Exception ex) { - _logger.LogError(ex,"DeserializeResponse error:"+ex.Message); + this._logger.LogError(ex,"DeserializeResponse error:"+ex.Message); } if (res == null) { - return ReturnT.Failed("response is nul"); + return ReturnT.Failed("response is null"); } @@ -139,7 +139,7 @@ namespace DotXxlJob.Core return res.Result as ReturnT; } } - throw new Exception("xxl-rpc server address not accessable."); + throw new Exception("xxl-rpc server address not accessible."); } @@ -149,6 +149,7 @@ namespace DotXxlJob.Core var content = new ByteArrayContent(postBuf); content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); var responseMessage = await client.PostAsync(address.RequestUri, content); + responseMessage.EnsureSuccessStatusCode(); return await responseMessage.Content.ReadAsStreamAsync(); } diff --git a/src/DotXxlJob.Core/JobHandlerAttribute.cs b/src/DotXxlJob.Core/Attributes/JobHandlerAttribute.cs similarity index 67% rename from src/DotXxlJob.Core/JobHandlerAttribute.cs rename to src/DotXxlJob.Core/Attributes/JobHandlerAttribute.cs index 7bc13ed..0f05d93 100644 --- a/src/DotXxlJob.Core/JobHandlerAttribute.cs +++ b/src/DotXxlJob.Core/Attributes/JobHandlerAttribute.cs @@ -10,5 +10,10 @@ namespace DotXxlJob.Core } public string Name { get; } + + /// + /// set Ignore + /// + public bool Ignore { get; set; } } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/CallbackTaskQueue.cs b/src/DotXxlJob.Core/CallbackTaskQueue.cs deleted file mode 100644 index ef50149..0000000 --- a/src/DotXxlJob.Core/CallbackTaskQueue.cs +++ /dev/null @@ -1,13 +0,0 @@ -using DotXxlJob.Core.Model; - -namespace DotXxlJob.Core -{ - public class CallbackTaskQueue - { - public void Push(CallbackParam callbackParam) - { - - //throw new System.NotImplementedException(); - } - } -} \ No newline at end of file diff --git a/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs b/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs index 84e76cd..f17e4d0 100644 --- a/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs +++ b/src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs @@ -1,3 +1,6 @@ +using System; +using System.IO; + namespace DotXxlJob.Core.Config { public class XxlJobExecutorOptions @@ -5,8 +8,8 @@ namespace DotXxlJob.Core.Config public string AdminAddresses { get; set; } - - public string AppName { get; set; } + + public string AppName { get; set; } = "DotXxlJob"; public string SpecialBindAddress { get; set; } @@ -17,11 +20,11 @@ namespace DotXxlJob.Core.Config public string AccessToken { get; set; } - - public string LogPath { get; set; } - - public int LogRetentionDays { get; set; } + public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs"); + + + public int LogRetentionDays { get; set; } = 30; } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/DateTimeExtensions.cs b/src/DotXxlJob.Core/DateTimeExtensions.cs deleted file mode 100644 index 5ba3014..0000000 --- a/src/DotXxlJob.Core/DateTimeExtensions.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; - -namespace DotXxlJob.Core -{ - public static class DateTimeExtensions - { - private const long UnixEpochTicks = 621355968000000000; - private const long UnixEpochSeconds = 62135596800; - private const long UnixEpochMilliseconds = 62135596800000; - - public static DateTimeOffset FromUnixTimeSeconds(this long seconds) - { - long ticks = seconds * TimeSpan.TicksPerSecond + UnixEpochTicks; - return new DateTime(ticks, DateTimeKind.Utc); - } - - public static DateTime FromUnixTimeMilliseconds(this long milliseconds) - { - long ticks = milliseconds * TimeSpan.TicksPerMillisecond + UnixEpochTicks; - return new DateTime(ticks, DateTimeKind.Utc); - } - - public static long ToUnixTimeSeconds(this DateTime dateTime) - { - long seconds = dateTime.ToUniversalTime().Ticks / TimeSpan.TicksPerSecond; - return seconds - UnixEpochSeconds; - } - - public static long ToUnixTimeMilliseconds(this DateTime dateTime) - { - long milliseconds = dateTime.ToUniversalTime().Ticks / TimeSpan.TicksPerMillisecond; - return milliseconds - UnixEpochMilliseconds; - } - } -} \ No newline at end of file diff --git a/src/DotXxlJob.Core/DefaultHandlers/AbsJobHandler.cs b/src/DotXxlJob.Core/DefaultHandlers/AbsJobHandler.cs new file mode 100644 index 0000000..308f41c --- /dev/null +++ b/src/DotXxlJob.Core/DefaultHandlers/AbsJobHandler.cs @@ -0,0 +1,16 @@ +using System.Threading.Tasks; +using DotXxlJob.Core.Model; + +namespace DotXxlJob.Core.DefaultHandlers +{ + public abstract class AbsJobHandler:IJobHandler + { + public virtual void Dispose() + { + + } + + public abstract Task Execute(JobExecuteContext context); + + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/DefaultHandlers/HttpJobHandler.cs b/src/DotXxlJob.Core/DefaultHandlers/HttpJobHandler.cs new file mode 100644 index 0000000..d98d839 --- /dev/null +++ b/src/DotXxlJob.Core/DefaultHandlers/HttpJobHandler.cs @@ -0,0 +1,15 @@ +using System.Threading.Tasks; +using DotXxlJob.Core.Model; + +namespace DotXxlJob.Core.DefaultHandlers +{ + [JobHandler("httpJobHandler")] + public class HttpJobHandler:AbsJobHandler + { + public override Task Execute(JobExecuteContext context) + { + context.JobLogger.Log("Get Request Data:{0}",context.JobParameter); + return Task.FromResult(ReturnT.SUCCESS); + } + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/DefaultJobHandlerFactory.cs b/src/DotXxlJob.Core/DefaultJobHandlerFactory.cs index a3ebab6..71f3b15 100644 --- a/src/DotXxlJob.Core/DefaultJobHandlerFactory.cs +++ b/src/DotXxlJob.Core/DefaultJobHandlerFactory.cs @@ -1,3 +1,5 @@ +using DotXxlJob.Core.DefaultHandlers; + namespace DotXxlJob.Core { public class DefaultJobHandlerFactory:IJobHandlerFactory diff --git a/src/DotXxlJob.Core/DotXxlJob.Core.csproj b/src/DotXxlJob.Core/DotXxlJob.Core.csproj index cb6e128..6a74dc3 100644 --- a/src/DotXxlJob.Core/DotXxlJob.Core.csproj +++ b/src/DotXxlJob.Core/DotXxlJob.Core.csproj @@ -7,6 +7,8 @@ + + diff --git a/src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs b/src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..dad9ce1 --- /dev/null +++ b/src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs @@ -0,0 +1,44 @@ +using System; +using DotXxlJob.Core.Config; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace DotXxlJob.Core +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,IConfiguration configuration) + { + services.AddLogging(); + services.AddOptions(); + services.Configure(configuration.GetSection("xxlJob")) + .AddXxlJobExecutorServiceDependency(); + + return services; + } + public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,Action configAction) + { + services.AddLogging(); + services.AddOptions(); + services.Configure(configAction).AddXxlJobExecutorServiceDependency(); + return services; + } + private static IServiceCollection AddXxlJobExecutorServiceDependency(this IServiceCollection services) + { + + services.AddHttpClient("DotXxlJobClient"); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + return services; + } + + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/JobExecuteHostedService.cs b/src/DotXxlJob.Core/Hosted/JobExecuteHostedService.cs similarity index 100% rename from src/DotXxlJob.Core/JobExecuteHostedService.cs rename to src/DotXxlJob.Core/Hosted/JobExecuteHostedService.cs diff --git a/src/DotXxlJob.Core/HttpJobHandler.cs b/src/DotXxlJob.Core/HttpJobHandler.cs deleted file mode 100644 index 9228d74..0000000 --- a/src/DotXxlJob.Core/HttpJobHandler.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System.Threading.Tasks; -using DotXxlJob.Core.Model; - -namespace DotXxlJob.Core -{ - public class HttpJobHandler:IJobHandler - { - public void Dispose() - { - - } - - public Task Execute(JobExecuteContext context) - { - return Task.FromResult(ReturnT.SUCCESS); - } - } -} \ No newline at end of file diff --git a/src/DotXxlJob.Core/IXxlJobExecutor.cs b/src/DotXxlJob.Core/IXxlJobExecutor.cs deleted file mode 100644 index 07254b5..0000000 --- a/src/DotXxlJob.Core/IXxlJobExecutor.cs +++ /dev/null @@ -1,20 +0,0 @@ -using DotXxlJob.Core.Model; - -namespace DotXxlJob.Core -{ - public interface IXxlJobExecutor - { - ReturnT Beat(); - - - ReturnT IdleBeat(int jobId); - - - ReturnT Kill(int jobId); - - ReturnT Log(long logDateTim, int logId, int fromLineNum); - - - ReturnT Run(TriggerParam triggerParam); - } -} \ No newline at end of file diff --git a/src/DotXxlJob.Core/Constants.cs b/src/DotXxlJob.Core/Internal/Constants.cs similarity index 92% rename from src/DotXxlJob.Core/Constants.cs rename to src/DotXxlJob.Core/Internal/Constants.cs index 80de582..e711daa 100644 --- a/src/DotXxlJob.Core/Constants.cs +++ b/src/DotXxlJob.Core/Internal/Constants.cs @@ -16,7 +16,7 @@ namespace DotXxlJob.Core public const int MaxCallbackRetryTimes = 10; //每次回调最多发送几条记录 - public const int MaxCallbackRecordsPerRequest = 100; + public const int MaxCallbackRecordsPerRequest = 20; public static TimeSpan CallbackRetryInterval = TimeSpan.FromSeconds(600); //Admin集群机器请求默认超时时间 @@ -26,7 +26,7 @@ namespace DotXxlJob.Core //Admin集群中的某台机器请求失败多少次后熔断 public const int AdminServerCircuitFailedTimes = 3; - public static TimeSpan JobThreadWaitTime = TimeSpan.FromSeconds(90); + public static class GlueType { diff --git a/src/DotXxlJob.Core/HessianSerializer.cs b/src/DotXxlJob.Core/Internal/HessianSerializer.cs similarity index 79% rename from src/DotXxlJob.Core/HessianSerializer.cs rename to src/DotXxlJob.Core/Internal/HessianSerializer.cs index c14342c..9dead15 100644 --- a/src/DotXxlJob.Core/HessianSerializer.cs +++ b/src/DotXxlJob.Core/Internal/HessianSerializer.cs @@ -23,64 +23,30 @@ namespace DotXxlJob.Core private static readonly Dictionary returnProperties = new Dictionary(); + + private static readonly Dictionary callbackProperties = + new Dictionary(); static HessianSerializer() { - var typeInfo = typeof(RpcRequest).GetTypeInfo(); - foreach (var property in typeInfo.DeclaredProperties) - { - var attribute = property.GetCustomAttribute(); - - if (null == attribute) - { - continue; - } - - if (!property.CanRead || !property.CanWrite) - { - continue; - } - - requestProperties.Add(attribute.Name,property); - } + + InitProperties(typeof(RpcRequest), requestProperties); - var triggerTypeInfo = typeof(TriggerParam).GetTypeInfo(); - foreach (var property in triggerTypeInfo.DeclaredProperties) - { - var attribute = property.GetCustomAttribute(); - - if (null == attribute) - { - continue; - } - - if (!property.CanRead || !property.CanWrite) - { - continue; - } - - triggerProperties.Add(attribute.Name,property); - } + InitProperties(typeof(TriggerParam), triggerProperties); - var rspTypeInfo = typeof(RpcResponse).GetTypeInfo(); - foreach (var property in rspTypeInfo.DeclaredProperties) - { - var attribute = property.GetCustomAttribute(); + InitProperties(typeof(RpcResponse), responseProperties); - if (null == attribute) - { - continue; - } + InitProperties(typeof(ReturnT), returnProperties); + + + InitProperties(typeof(HandleCallbackParam), callbackProperties); + + } - if (!property.CanRead || !property.CanWrite) - { - continue; - } - responseProperties.Add(attribute.Name,property); - } - - var retTypeInfo = typeof(ReturnT).GetTypeInfo(); - foreach (var property in retTypeInfo.DeclaredProperties) + private static void InitProperties(Type type, Dictionary propertyInfos) + { + var typeInfo = type.GetTypeInfo(); + foreach (var property in typeInfo.DeclaredProperties) { var attribute = property.GetCustomAttribute(); @@ -94,7 +60,7 @@ namespace DotXxlJob.Core continue; } - returnProperties.Add(attribute.Name,property); + propertyInfos.Add(attribute.Name,property); } } @@ -173,30 +139,56 @@ namespace DotXxlJob.Core return; } - if (itemType == typeof(ClassDef)) + if (item is ClassDef paramClass) { - var triggerClass = item as ClassDef; //TODO:这里要做成动态的话 ,可以注册所有的实体到对应的字典中,不过这里只有这个类型哦 - if (triggerClass.Name != "com.xxl.job.core.biz.model.TriggerParam") + if (paramClass.Name != "com.xxl.job.core.biz.model.TriggerParam") { - throw new HessianException($"not expected parameter type [{triggerClass.Name}]"); + throw new HessianException($"not expected parameter type [{paramClass.Name}]"); } - if (!(deserializer.ReadValue() is HessianObject triggerData)) + if (!(deserializer.ReadValue() is HessianObject paramData)) { throw new HessianException("not expected parameter type ,data is null"); } - TriggerParam param = new TriggerParam(); - foreach (var field in triggerData) + + object val ; + if (paramClass.Name == "com.xxl.job.core.biz.model.TriggerParam") + { + val =new TriggerParam(); + } + else + { + val =new HandleCallbackParam(); + } + + foreach (var (key,value) in paramData) { - if (triggerProperties.TryGetValue(field.Item1, out var tgPropertyInfo)) + if (triggerProperties.TryGetValue(key, out var tgPropertyInfo)) { - tgPropertyInfo.SetValue(param,field.Item2); + tgPropertyInfo.SetValue(val,value); + } + } + list.Add(val); + } + else if (item is HessianObject hessianObject) + { + if (hessianObject.TypeName == "com.xxl.job.core.biz.model.HandleCallbackParam") + { + var val =new HandleCallbackParam(); + foreach (var (key,value) in hessianObject) + { + if (triggerProperties.TryGetValue(key, out var tgPropertyInfo)) + { + tgPropertyInfo.SetValue(val,value); + } } + list.Add(val); } } else { + throw new HessianException($"unsupported list item type =[{itemType}]"); } @@ -271,12 +263,12 @@ namespace DotXxlJob.Core { throw new HessianException("not expected parameter type ,data is null"); } - ReturnT data = new ReturnT(); - foreach (var field in resultData) + var data = new ReturnT(); + foreach (var (key, value) in resultData) { - if (returnProperties.TryGetValue(field.Item1, out var tgPropertyInfo)) + if (returnProperties.TryGetValue(key, out var tgPropertyInfo)) { - tgPropertyInfo.SetValue(data,field.Item2); + tgPropertyInfo.SetValue(data,value); } } diff --git a/src/DotXxlJob.Core/JobDispatcher.cs b/src/DotXxlJob.Core/JobDispatcher.cs index 3cad564..bee51a7 100644 --- a/src/DotXxlJob.Core/JobDispatcher.cs +++ b/src/DotXxlJob.Core/JobDispatcher.cs @@ -12,23 +12,26 @@ namespace DotXxlJob.Core { private readonly TaskExecutorFactory _executorFactory; private readonly CallbackTaskQueue _callbackTaskQueue; - - private readonly ConcurrentDictionary RUNNING_QUEUE = new ConcurrentDictionary(); + private readonly IJobLogger _jobLogger; + private readonly ConcurrentDictionary RUNNING_QUEUE = new ConcurrentDictionary(); - private readonly ILogger _jobQueueLogger; + + private readonly ILogger _jobQueueLogger; private readonly ILogger _logger; public JobDispatcher( TaskExecutorFactory executorFactory, CallbackTaskQueue callbackTaskQueue, + IJobLogger jobLogger, ILoggerFactory loggerFactory ) { this. _executorFactory = executorFactory; this. _callbackTaskQueue = callbackTaskQueue; - + this._jobLogger = jobLogger; + - this._jobQueueLogger = loggerFactory.CreateLogger(); + this._jobQueueLogger = loggerFactory.CreateLogger(); this._logger = loggerFactory.CreateLogger(); } @@ -93,7 +96,7 @@ namespace DotXxlJob.Core /// - /// 等待检查 + /// IdleBeat /// /// /// @@ -103,11 +106,23 @@ namespace DotXxlJob.Core new ReturnT(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.") : ReturnT.SUCCESS; } - + + private void TriggerCallback(object sender, HandleCallbackParam callbackParam) + { + this._callbackTaskQueue.Push(callbackParam); + } private ReturnT PushJobQueue(TriggerParam triggerParam, ITaskExecutor executor) { - JobQueue jobQueue = new JobQueue ( executor, this._callbackTaskQueue,this._jobQueueLogger); + + if (RUNNING_QUEUE.TryGetValue(triggerParam.JobId,out var jobQueue)) + { + return jobQueue.Push(triggerParam); + } + + //NewJobId + jobQueue = new JobTaskQueue ( executor,this._jobLogger, this._jobQueueLogger); + jobQueue.CallBack += TriggerCallback; if (RUNNING_QUEUE.TryAdd(triggerParam.JobId, jobQueue)) { return jobQueue.Push(triggerParam); @@ -117,9 +132,16 @@ namespace DotXxlJob.Core private ReturnT ChangeJobQueue(TriggerParam triggerParam, ITaskExecutor executor) { + + if (RUNNING_QUEUE.TryRemove(triggerParam.JobId, out var oldJobTask)) + { + oldJobTask.CallBack -= TriggerCallback; + oldJobTask.Dispose(); //释放原来的资源 + } - JobQueue jobQueue = new JobQueue ( executor, this._callbackTaskQueue,this._jobQueueLogger); - if (RUNNING_QUEUE.TryUpdate(triggerParam.JobId, jobQueue, null)) + JobTaskQueue jobQueue = new JobTaskQueue ( executor,this._jobLogger, this._jobQueueLogger); + jobQueue.CallBack += TriggerCallback; + if (RUNNING_QUEUE.TryAdd(triggerParam.JobId, jobQueue)) { return jobQueue.Push(triggerParam); } diff --git a/src/DotXxlJob.Core/JobLogger.cs b/src/DotXxlJob.Core/JobLogger.cs deleted file mode 100644 index e5133da..0000000 --- a/src/DotXxlJob.Core/JobLogger.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace DotXxlJob.Core -{ - public class JobLogger - { - - } -} \ No newline at end of file diff --git a/src/DotXxlJob.Core/Json/ProjectDefaultResolver.cs b/src/DotXxlJob.Core/Json/ProjectDefaultResolver.cs new file mode 100644 index 0000000..be3e1dc --- /dev/null +++ b/src/DotXxlJob.Core/Json/ProjectDefaultResolver.cs @@ -0,0 +1,64 @@ +using System.Reflection; +using Utf8Json; +using Utf8Json.Formatters; +using Utf8Json.Resolvers; + +namespace DotXxlJob.Core.Json +{ + public class ProjectDefaultResolver : IJsonFormatterResolver + { + public static IJsonFormatterResolver Instance = new ProjectDefaultResolver(); + + // configure your resolver and formatters. + static readonly IJsonFormatter[] formatters = { + new DateTimeFormatter("yyyy-MM-dd HH:mm:ss"), + new NullableDateTimeFormatter("yyyy-MM-dd HH:mm:ss") + }; + + static readonly IJsonFormatterResolver[] resolvers = new[] + { + EnumResolver.UnderlyingValue, + StandardResolver.AllowPrivateExcludeNullSnakeCase + }; + + ProjectDefaultResolver() + { + } + + public IJsonFormatter GetFormatter() + { + return FormatterCache.formatter; + } + + static class FormatterCache + { + public static readonly IJsonFormatter formatter; + + static FormatterCache() + { + foreach (var item in formatters) + { + foreach (var implInterface in item.GetType().GetTypeInfo().ImplementedInterfaces) + { + var ti = implInterface.GetTypeInfo(); + if (ti.IsGenericType && ti.GenericTypeArguments[0] == typeof(T)) + { + formatter = (IJsonFormatter)item; + return; + } + } + } + + foreach (var item in resolvers) + { + var f = item.GetFormatter(); + if (f != null) + { + formatter = f; + return; + } + } + } + } + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/Logger/IJobLogger.cs b/src/DotXxlJob.Core/Logger/IJobLogger.cs new file mode 100644 index 0000000..970b026 --- /dev/null +++ b/src/DotXxlJob.Core/Logger/IJobLogger.cs @@ -0,0 +1,23 @@ +using System; +using DotXxlJob.Core.Model; + +namespace DotXxlJob.Core +{ + public interface IJobLogger + { + + void SetLogFile(long logTime, int logId); + + void Log(string pattern, params object[] format); + + + void LogError(Exception ex); + + + LogResult ReadLog(long logTime, int logId, int fromLine); + + + void LogSpecialFile(long logTime, int logId, string pattern, params object[] format); + + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/Logger/JobLogger.cs b/src/DotXxlJob.Core/Logger/JobLogger.cs new file mode 100644 index 0000000..43d88f7 --- /dev/null +++ b/src/DotXxlJob.Core/Logger/JobLogger.cs @@ -0,0 +1,185 @@ +using System; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using DotXxlJob.Core.Config; +using DotXxlJob.Core.Model; +using Hessian; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotXxlJob.Core +{ + public class JobLogger:IJobLogger + { + private readonly ILogger _logger; + + private readonly AsyncLocal LogFileName = new AsyncLocal(); + + private readonly XxlJobExecutorOptions _options; + public JobLogger(IOptions optionsAccessor,ILogger logger) + { + this._logger = logger; + this._options = optionsAccessor.Value; + } + + public void SetLogFile(long logTime, int logId) + { + try + { + var filePath = MakeLogFileName(logTime, logId); + var dir = Path.GetDirectoryName(filePath); + if (!Directory.Exists(dir)) + { + Directory.CreateDirectory(dir); + CleanOldLogs(); + } + LogFileName.Value = filePath; + } + catch (Exception ex) + { + _logger.LogError(ex, "SetLogFileName error."); + } + } + + + public void Log(string pattern, params object[] format) + { + var appendLog = string.Format(pattern, format); + var callInfo = new StackTrace(true).GetFrame(1); + LogDetail(GetLogFileName(), callInfo, appendLog); + } + + public void LogError(Exception ex) + { + var callInfo = new StackTrace(true).GetFrame(1); + LogDetail(GetLogFileName(), callInfo, ex.Message + ex.StackTrace); + } + + public LogResult ReadLog(long logTime, int logId, int fromLine) + { + var filePath = MakeLogFileName(logTime, logId); + if (string.IsNullOrEmpty(filePath)) + { + return new LogResult(fromLine, 0, "readLog fail, logFile not found", true); + } + if (!File.Exists(filePath)) + { + return new LogResult(fromLine, 0, "readLog fail, logFile not exists", true); + } + + // read file + var logContentBuffer = new StringBuilder(); + int toLineNum = 0; + try + { + using (var reader = new StreamReader(filePath, Encoding.UTF8)) + { + string line; + while ((line = reader.ReadLine()) != null) + { + toLineNum++; + if (toLineNum >= fromLine) + { + logContentBuffer.AppendLine(line); + } + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "ReadLog error."); + } + + // result + var logResult = new LogResult(fromLine, toLineNum, logContentBuffer.ToString(), false); + return logResult; + } + + public void LogSpecialFile(long logTime, int logId, string pattern, params object[] format) + { + var filePath = MakeLogFileName(logTime, logId); + var callInfo = new StackTrace(true).GetFrame(1); + var content = string.Format(pattern, format); + LogDetail(filePath, callInfo, content); + } + + + private string GetLogFileName() + { + return LogFileName.Value; + } + private string MakeLogFileName(long logDateTime, int logId) + { + //log fileName like: logPath/HandlerLogs/yyyy-MM-dd/9999.log + return Path.Combine(_options.LogPath, Constants.HandleLogsDirectory, + logDateTime.FromMilliseconds().ToString("yyyy-MM-dd"), $"{logId}.log"); + } + private void LogDetail(string logFileName, StackFrame callInfo, string appendLog) + { + if (string.IsNullOrEmpty(logFileName)) + { + return; + } + + var stringBuffer = new StringBuilder(); + stringBuffer + .Append(DateTime.Now.ToString("s")).Append(" ") + .Append("[" + callInfo.GetMethod().DeclaringType.FullName + "#" + callInfo.GetMethod().Name + "]").Append("-") + .Append("[line " + callInfo.GetFileLineNumber() + "]").Append("-") + .Append("[thread " + Thread.CurrentThread.ManagedThreadId + "]").Append(" ") + .Append(appendLog ?? string.Empty) + .AppendLine(); + + var formatAppendLog = stringBuffer.ToString(); + + try + { + File.AppendAllText(logFileName, formatAppendLog, Encoding.UTF8); + } + catch (Exception ex) + { + this._logger.LogError(ex, "LogDetail error"); + } + } + + private void CleanOldLogs() + { + if (_options.LogRetentionDays <= 0) + { + return; + } + + Task.Run(() => + { + try + { + var handlerLogsDir = new DirectoryInfo(Path.Combine(_options.LogPath, Constants.HandleLogsDirectory)); + if (!handlerLogsDir.Exists) + { + return; + } + + var today = DateTime.UtcNow.Date; + foreach (var dir in handlerLogsDir.GetDirectories()) + { + if (DateTime.TryParse(dir.Name, out var dirDate)) + { + if (today.Subtract(dirDate.Date).Days > _options.LogRetentionDays) + { + dir.Delete(true); + } + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "CleanOldLogs error."); + } + }); + } + + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/Model/CallbackParam.cs b/src/DotXxlJob.Core/Model/CallbackParam.cs deleted file mode 100644 index 24aa640..0000000 --- a/src/DotXxlJob.Core/Model/CallbackParam.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace DotXxlJob.Core.Model -{ - public class CallbackParam - { - public CallbackParam(TriggerParam triggerParam, ReturnT result) - { - throw new System.NotImplementedException(); - } - } -} \ No newline at end of file diff --git a/src/DotXxlJob.Core/Model/HandleCallbackParam.cs b/src/DotXxlJob.Core/Model/HandleCallbackParam.cs index 865ef60..b4b6243 100644 --- a/src/DotXxlJob.Core/Model/HandleCallbackParam.cs +++ b/src/DotXxlJob.Core/Model/HandleCallbackParam.cs @@ -5,13 +5,25 @@ namespace DotXxlJob.Core.Model [DataContract(Name = "com.xxl.job.core.biz.model.HandleCallbackParam")] public class HandleCallbackParam { - [DataMember(Name = "callbackRetryTimes",Order = 1)] - public int CallbackRetryTimes; - [DataMember(Name = "logId",Order = 2)] - public int LogId; - [DataMember(Name = "logDateTim",Order = 3)] - public long LogDateTim; - [DataMember(Name = "executeResult",Order = 4)] - public ReturnT ExecuteResult; + public HandleCallbackParam() + { + + } + public HandleCallbackParam(TriggerParam triggerParam, ReturnT result) + { + this.LogId = triggerParam.LogId; + this.LogDateTime = triggerParam.LogDateTime; + this.ExecuteResult = result; + } + + + public int CallbackRetryTimes { get; set; } + + [DataMember(Name = "logId",Order = 1)] + public int LogId { get; set; } + [DataMember(Name = "logDateTim",Order = 2)] + public long LogDateTime { get; set; } + [DataMember(Name = "executeResult",Order = 3)] + public ReturnT ExecuteResult { get; set; } } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/Model/JobExecuteContext.cs b/src/DotXxlJob.Core/Model/JobExecuteContext.cs index 39ff717..253ec81 100644 --- a/src/DotXxlJob.Core/Model/JobExecuteContext.cs +++ b/src/DotXxlJob.Core/Model/JobExecuteContext.cs @@ -2,6 +2,12 @@ namespace DotXxlJob.Core.Model { public class JobExecuteContext { - + public JobExecuteContext(IJobLogger jobLogger,string jobParameter) + { + this.JobLogger = jobLogger; + this.JobParameter = jobParameter; + } + public string JobParameter { get; } + public IJobLogger JobLogger { get; } } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/Model/LogResult.cs b/src/DotXxlJob.Core/Model/LogResult.cs index a4c855c..c181dfb 100644 --- a/src/DotXxlJob.Core/Model/LogResult.cs +++ b/src/DotXxlJob.Core/Model/LogResult.cs @@ -5,6 +5,15 @@ namespace DotXxlJob.Core.Model [DataContract(Name = "com.xxl.job.core.biz.model.LogResult")] public class LogResult { + + public LogResult(int fromLine ,int toLine,string content,bool isEnd) + { + this.FromLineNum = fromLine; + this.ToLineNum = toLine; + this.LogContent = content; + this.IsEnd = isEnd; + } + [DataMember(Name = "fromLineNum",Order = 1)] public int FromLineNum { get; set; } [DataMember(Name = "toLineNum",Order = 2)] diff --git a/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs new file mode 100644 index 0000000..f996cf0 --- /dev/null +++ b/src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs @@ -0,0 +1,113 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using DotXxlJob.Core.Config; +using DotXxlJob.Core.Model; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotXxlJob.Core +{ + public class CallbackTaskQueue:IDisposable + { + private readonly AdminClient _adminClient; + private readonly IJobLogger _jobLogger; + private readonly RetryCallbackTaskQueue _retryQueue; + private readonly ILogger _logger; + private readonly ConcurrentQueue TASK_QUEUE = new ConcurrentQueue(); + + private bool _stop; + + private bool _isRunning; + + private Task _runTask; + public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions optionsAccessor + , ILoggerFactory loggerFactory) + { + this._adminClient = adminClient; + this._jobLogger = jobLogger; + + this._retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath, + Push, + loggerFactory.CreateLogger()); + + this._logger = loggerFactory.CreateLogger(); + } + + public void Push(HandleCallbackParam callbackParam) + { + TASK_QUEUE.Enqueue(callbackParam); + StartCallBack(); + } + + + public void Dispose() + { + this._stop = true; + this._retryQueue.Dispose(); + this._runTask?.GetAwaiter().GetResult(); + } + + + private void StartCallBack() + { + if ( this._isRunning) + { + return; + } + + this._runTask = Task.Run(async () => + { + this._logger.LogDebug("start to callback"); + this._isRunning = true; + while (!this._stop) + { + await DoCallBack(); + } + this._logger.LogDebug("end to callback"); + this._isRunning = false; + }); + + } + + private async Task DoCallBack() + { + List list = new List(); + + while (list.Count < Constants.MaxCallbackRecordsPerRequest && this.TASK_QUEUE.TryDequeue(out var item)) + { + list.Add(item); + } + + if (!list.Any()) + { + return; + } + + ReturnT result; + try + { + result = await this._adminClient.Callback(list); + } + catch (Exception ex){ + this._logger.LogError(ex,"trigger callback error:{error}",ex.Message); + result = ReturnT.Failed(ex.Message); + this._retryQueue.Push(list); + } + + LogCallBackResult(result, list); + } + + private void LogCallBackResult(ReturnT result,List list) + { + foreach (var param in list) + { + this._jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Empty"); + } + } + + + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/TaskQueue.cs b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs similarity index 53% rename from src/DotXxlJob.Core/TaskQueue.cs rename to src/DotXxlJob.Core/Queue/JobTaskQueue.cs index 9275bfa..d115c30 100644 --- a/src/DotXxlJob.Core/TaskQueue.cs +++ b/src/DotXxlJob.Core/Queue/JobTaskQueue.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Net.NetworkInformation; using System.Threading; using System.Threading.Tasks; using DotXxlJob.Core.Model; @@ -7,23 +8,28 @@ using Microsoft.Extensions.Logging; namespace DotXxlJob.Core { - public class JobQueue + public class JobTaskQueue:IDisposable { private readonly ITaskExecutor _executor; - private readonly CallbackTaskQueue _callbackTaskQueue; - private readonly ILogger _logger; + private readonly IJobLogger _jobLogger; + private readonly ILogger _logger; private readonly ConcurrentQueue TASK_QUEUE = new ConcurrentQueue(); - public JobQueue(ITaskExecutor executor,CallbackTaskQueue callbackTaskQueue,ILogger logger) + private readonly ConcurrentDictionary ID_IN_QUEUE = new ConcurrentDictionary(); + private CancellationTokenSource _cancellationTokenSource; + private Task _runTask; + public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger logger) { - _executor = executor; - _callbackTaskQueue = callbackTaskQueue; - _logger = logger; + this._executor = executor; + this._jobLogger = jobLogger; + this._logger = logger; } public ITaskExecutor Executor => this._executor; - private CancellationTokenSource _cancellationTokenSource; + public event EventHandler CallBack; + + /// /// 覆盖之前的队列 @@ -37,12 +43,18 @@ namespace DotXxlJob.Core { TASK_QUEUE.TryDequeue(out _); } + ID_IN_QUEUE.Clear(); return Push(triggerParam); } public ReturnT Push(TriggerParam triggerParam) { + if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0)) + { + _logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId); + return ReturnT.Failed("repeat job task!"); + } this.TASK_QUEUE.Enqueue(triggerParam); StartTask(); return ReturnT.SUCCESS; @@ -53,8 +65,20 @@ namespace DotXxlJob.Core this._cancellationTokenSource?.Cancel(); this._cancellationTokenSource?.Dispose(); this._cancellationTokenSource = null; + + //wait for task completed + this._runTask?.GetAwaiter().GetResult(); } + public void Dispose() + { + Stop(); + while (!TASK_QUEUE.IsEmpty) + { + TASK_QUEUE.TryDequeue(out _); + } + ID_IN_QUEUE.Clear(); + } private void StartTask() { @@ -65,7 +89,7 @@ namespace DotXxlJob.Core this._cancellationTokenSource =new CancellationTokenSource(); CancellationToken ct = _cancellationTokenSource.Token; - Task.Factory.StartNew(async () => + this._runTask = Task.Factory.StartNew(async () => { //ct.ThrowIfCancellationRequested(); @@ -84,7 +108,19 @@ namespace DotXxlJob.Core if (TASK_QUEUE.TryDequeue(out triggerParam)) { - result = await this._executor.Execute(triggerParam); + if (ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _)) + { + this._logger.LogWarning("remove id in queue failed,logId={logId},jobId={jobId}" + ,triggerParam.LogId,triggerParam.JobId); + } + //set log file; + this._jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId); + + this._jobLogger.Log("
----------- xxl-job job execute start -----------
----------- Param:{0}" ,triggerParam.ExecutorParams); + + result = await this._executor.Execute(triggerParam); + + this._jobLogger.Log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + result.Code); } else { @@ -94,11 +130,12 @@ namespace DotXxlJob.Core catch (Exception ex) { result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message); + this._jobLogger.Log("
----------- JobThread Exception:" + ex.Message + "
----------- xxl-job job execute end(error) -----------"); } if(triggerParam !=null) { - this._callbackTaskQueue.Push(new CallbackParam(triggerParam, result)); + CallBack?.Invoke(this,new HandleCallbackParam(triggerParam, result??ReturnT.FAIL)); } } diff --git a/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs b/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs new file mode 100644 index 0000000..ecdcdf5 --- /dev/null +++ b/src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs @@ -0,0 +1,125 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using DotXxlJob.Core.Json; +using DotXxlJob.Core.Model; +using Microsoft.Extensions.Logging; + +namespace DotXxlJob.Core +{ + public class RetryCallbackTaskQueue:IDisposable + { + + private readonly Action _actionDoCallback; + private readonly ILogger _logger; + + private bool _stop; + private Task _runTask; + private readonly string _backupFile; + public RetryCallbackTaskQueue(string backupPath,Action actionDoCallback,ILogger logger) + { + + this._actionDoCallback = actionDoCallback; + this._logger = logger; + this._backupFile = Path.Combine(backupPath, "xxl-job-callback.log"); + var dir = Path.GetDirectoryName(backupPath); + if (!Directory.Exists(dir)) + { + Directory.CreateDirectory(dir); + } + + StartQueue(); + } + + private void StartQueue() + { + this._runTask = Task.Factory.StartNew(async () => + { + while (!this._stop) + { + await LoadFromFile(); + await Task.Delay(Constants.CallbackRetryInterval); + } + + }, TaskCreationOptions.LongRunning); + } + + private async Task LoadFromFile() + { + var list = new List(); + + if (!File.Exists(_backupFile)) + { + return; + } + + var nextLine = string.Empty; + using (StreamReader reader = new StreamReader(this._backupFile)) + { + while ((nextLine = await reader.ReadLineAsync()) != null) + { + try + { + list.Add(Utf8Json.JsonSerializer.Deserialize(nextLine, ProjectDefaultResolver.Instance)); + } + catch(Exception ex) + { + this._logger.LogError(ex,"de error:{error}",ex.Message); + } + + } + } + + if (list.Any()) + { + foreach (var item in list) + { + this._actionDoCallback(item); + } + } + + } + + public void Push(List list) + { + if (!list.Any()) + { + return; + } + + try + { + + using (var writer = new StreamWriter(this._backupFile, true, Encoding.UTF8)) + { + foreach (var item in list) + { + if (item.CallbackRetryTimes >= Constants.MaxCallbackRetryTimes) + { + _logger.LogInformation("callback too many times and will be abandon,logId {logId}", item.LogId); + } + else + { + item.CallbackRetryTimes++; + byte[] buffer = Utf8Json.JsonSerializer.Serialize(item,ProjectDefaultResolver.Instance); + writer.WriteLine(Encoding.UTF8.GetString(buffer)); + } + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "SaveCallbackParams error."); + } + } + + public void Dispose() + { + this._stop = true; + this._runTask?.GetAwaiter().GetResult(); + } + } +} \ No newline at end of file diff --git a/src/DotXxlJob.Core/ServiceCollectionExtensions.cs b/src/DotXxlJob.Core/ServiceCollectionExtensions.cs deleted file mode 100644 index 5f35362..0000000 --- a/src/DotXxlJob.Core/ServiceCollectionExtensions.cs +++ /dev/null @@ -1,21 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; - -namespace DotXxlJob.Core -{ - public static class ServiceCollectionExtensions - { - public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services) - { - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - - return services; - } - } -} \ No newline at end of file diff --git a/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs b/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs index b41134c..3c85c06 100644 --- a/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs +++ b/src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs @@ -9,10 +9,12 @@ namespace DotXxlJob.Core.TaskExecutors public class BeanTaskExecutor:ITaskExecutor { private readonly IJobHandlerFactory _handlerFactory; + private readonly IJobLogger _jobLogger; - public BeanTaskExecutor(IJobHandlerFactory handlerFactory) + public BeanTaskExecutor(IJobHandlerFactory handlerFactory,IJobLogger jobLogger) { - _handlerFactory = handlerFactory; + this._handlerFactory = handlerFactory; + this._jobLogger = jobLogger; } public string GlueType { get; } = Constants.GlueType.BEAN; @@ -23,12 +25,10 @@ namespace DotXxlJob.Core.TaskExecutors if (handler == null) { - return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found.")); } - - return Task.FromResult(ReturnT.Success("OK")); - //return handler.Execute(new JobExecuteContext()); + var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams); + return handler.Execute(context); } } } \ No newline at end of file diff --git a/src/DotXxlJob.Core/ITaskExecutor.cs b/src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs similarity index 100% rename from src/DotXxlJob.Core/ITaskExecutor.cs rename to src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs diff --git a/src/DotXxlJob.Core/XxlRpcServiceHandler.cs b/src/DotXxlJob.Core/XxlRpcServiceHandler.cs index fb0a6f2..0825155 100644 --- a/src/DotXxlJob.Core/XxlRpcServiceHandler.cs +++ b/src/DotXxlJob.Core/XxlRpcServiceHandler.cs @@ -4,6 +4,7 @@ using System.IO; using System.Linq; using System.Reflection; using System.Threading.Tasks; +using Hessian; using DotXxlJob.Core.Config; using DotXxlJob.Core.Model; using Microsoft.Extensions.Logging; @@ -19,6 +20,7 @@ namespace DotXxlJob.Core { private readonly JobDispatcher _jobDispatcher; + private readonly IJobLogger _jobLogger; private readonly ILogger _logger; private readonly XxlJobExecutorOptions _options; @@ -27,10 +29,12 @@ namespace DotXxlJob.Core public XxlRpcServiceHandler(IOptions optionsAccessor, JobDispatcher jobDispatcher, + IJobLogger jobLogger, ILogger logger) { - _jobDispatcher = jobDispatcher; + this._jobDispatcher = jobDispatcher; + this._jobLogger = jobLogger; this._logger = logger; this._options = optionsAccessor.Value; @@ -46,20 +50,24 @@ namespace DotXxlJob.Core ///
/// /// - /// public async Task HandlerAsync(Stream reqStream) { var req = HessianSerializer.DeserializeRequest(reqStream); - - + var res = new RpcResponse { RequestId = req.RequestId}; + if (!ValidRequest(req, out var error)) { + this._logger.LogWarning("job task request is not valid:{error}",error); res.ErrorMsg = error; } else { + this._logger.LogDebug("receive job task ,req.RequestId={requestId},method={methodName}" + ,req.RequestId,req.MethodName); await Invoke(req, res); + this._logger.LogDebug("completed receive job task ,req.RequestId={requestId},method={methodName},IsError={IsError}" + ,req.RequestId,req.MethodName,res.IsError); } using (var outputStream = new MemoryStream()) @@ -91,7 +99,7 @@ namespace DotXxlJob.Core return false; } - if (DateTime.UtcNow.Subtract(req.CreateMillisTime.FromUnixTimeMilliseconds()) > Constants.RpcRequestExpireTimeSpan) + if (DateTime.UtcNow.Subtract(req.CreateMillisTime.FromMilliseconds()) > Constants.RpcRequestExpireTimeSpan) { error = "request is timeout!"; return false; @@ -120,6 +128,7 @@ namespace DotXxlJob.Core if (method == null) { res.ErrorMsg = $"The method{req.MethodName} is not defined."; + this._logger.LogWarning( $"The method{req.MethodName} is not defined."); } else { @@ -131,7 +140,8 @@ namespace DotXxlJob.Core } catch (Exception ex) { - res.ErrorMsg = ex.ToString(); + res.ErrorMsg = ex.Message +"\n--------------\n"+ ex.StackTrace; + this._logger.LogError(ex,"invoke method error:{0}",ex.Message); } return Task.CompletedTask; @@ -177,7 +187,7 @@ namespace DotXxlJob.Core } /// - /// TODO:获取执行日志 + /// read Log /// /// /// @@ -185,9 +195,9 @@ namespace DotXxlJob.Core /// private ReturnT Log(long logDateTime, int logId, int fromLineNum) { - //var logResult = JobLogger.ReadLog(logDateTime, logId, fromLineNum); - Console.WriteLine("{0} ---{1} --{2}",logDateTime,logId,fromLineNum); - return ReturnT.Success(null); + var ret = ReturnT.Success(null); + ret.Content = this._jobLogger.ReadLog(logDateTime, logId, fromLineNum); + return ret; } /// diff --git a/src/Hessian/DateTimeExtension.cs b/src/Hessian/DateTimeExtension.cs index 03e952b..7a7800b 100644 --- a/src/Hessian/DateTimeExtension.cs +++ b/src/Hessian/DateTimeExtension.cs @@ -5,7 +5,7 @@ namespace Hessian /// /// /// - internal static class DateTimeExtension + public static class DateTimeExtension { private const long Era = 62135596800000L; private const long Millis = 60000; @@ -36,7 +36,7 @@ namespace Hessian /// /// /// - public static DateTime FromMinutes(int value) + public static DateTime FromMinutes(this int value) { var ticks = (value * Millis + Era) * 10000; return new DateTime(ticks, DateTimeKind.Utc); @@ -47,7 +47,7 @@ namespace Hessian /// /// /// - public static DateTime FromMilliseconds(long value) + public static DateTime FromMilliseconds(this long value) { var ticks = (value + Era) * 10000; return new DateTime(ticks, DateTimeKind.Utc); diff --git a/src/Hessian/Deserializer.cs b/src/Hessian/Deserializer.cs index fb73d7c..bdc9f60 100644 --- a/src/Hessian/Deserializer.cs +++ b/src/Hessian/Deserializer.cs @@ -31,6 +31,13 @@ namespace Hessian typeNameRefs = new ListRefMap(); } + + public bool CanRead() + { + var tag = reader.Peek (); + return tag.HasValue; + } + #region ReadValue public object ReadValue () @@ -40,127 +47,127 @@ namespace Hessian if (!tag.HasValue) { throw new EndOfStreamException(); } - + Console.WriteLine("------------ 0x{0:x2}----------------",tag.Value); switch (tag.Value) { case 0x00: case 0x01: case 0x02: case 0x03: case 0x04: case 0x05: case 0x06: case 0x07: case 0x08: case 0x09: case 0x0A: case 0x0B: case 0x0C: case 0x0D: case 0x0E: case 0x0F: case 0x10: case 0x11: case 0x12: case 0x13: case 0x14: case 0x15: case 0x16: case 0x17: case 0x18: case 0x19: case 0x1A: case 0x1B: case 0x1C: case 0x1D: case 0x1E: case 0x1F: - + Console.WriteLine("ReadShortString"); return ReadShortString(); case 0x20: case 0x21: case 0x22: case 0x23: case 0x24: case 0x25: case 0x26: case 0x27: case 0x28: case 0x29: case 0x2A: case 0x2B: case 0x2C: case 0x2D: case 0x2E: case 0x2F: - + Console.WriteLine("ReadShortBinary"); return ReadShortBinary(); case 0x30: case 0x31: case 0x32: case 0x33: - + Console.WriteLine("ReadMediumString"); return ReadMediumString(); case 0x34: case 0x35: case 0x36: case 0x37: - + Console.WriteLine("ReadMediumBinary"); return ReadMediumBinary(); case 0x38: case 0x39: case 0x3A: case 0x3B: case 0x3C: case 0x3D: case 0x3E: case 0x3F: - + Console.WriteLine("ReadLongThreeBytes"); return ReadLongThreeBytes(); case 0x40: - + Console.WriteLine("Reserved"); return Reserved(); case 0x41: case 0x42: - + Console.WriteLine("ReadChunkedBinary"); return ReadChunkedBinary(); case 0x43: - + Console.WriteLine("ReadClassDefinition"); return ReadClassDefinition(); case 0x44: - + Console.WriteLine("ReadFullDouble"); return ReadFullDouble(); case 0x45: - + Console.WriteLine("Reserved"); return Reserved(); case 0x46: - + Console.WriteLine("ReadBoolean"); return ReadBoolean(); case 0x47: - + Console.WriteLine("Reserved"); return Reserved(); case 0x48: - + Console.WriteLine("ReadUntypedMap"); return ReadUntypedMap(); case 0x49: - + Console.WriteLine("ReadInteger"); return ReadInteger(); case 0x4A: - + Console.WriteLine("ReadDateInMillis"); return ReadDateInMillis(); case 0x4B: - + Console.WriteLine("ReadDateInMinutes"); return ReadDateInMinutes(); case 0x4C: - + Console.WriteLine("ReadLongFull"); return ReadLongFull(); case 0x4D: - + Console.WriteLine("ReadTypedMap"); return ReadTypedMap(); case 0x4E: - + Console.WriteLine("ReadNull"); return ReadNull(); case 0x4F: - + Console.WriteLine("ReadObject"); return ReadObject(); case 0x50: - + Console.WriteLine("Reserved"); return Reserved(); case 0x51: - + Console.WriteLine("ReadRef"); return ReadRef(); case 0x52: case 0x53: - + Console.WriteLine("ReadChunkedString"); return ReadChunkedString(); case 0x54: - + Console.WriteLine("ReadBoolean"); return ReadBoolean(); case 0x55: - + Console.WriteLine("ReadVarList"); return ReadVarList(); case 0x56: - + Console.WriteLine("ReadFixList"); return ReadFixList(); case 0x57: - + Console.WriteLine("ReadVarListUntyped"); return ReadVarListUntyped(); case 0x58: - + Console.WriteLine("ReadFixListUntyped"); return ReadFixListUntyped(); case 0x59: - + Console.WriteLine("ReadLongFourBytes"); return ReadLongFourBytes(); case 0x5A: @@ -168,30 +175,32 @@ namespace Hessian throw new UnexpectedTagException(0x5A, "value"); case 0x5B: case 0x5C: - + Console.WriteLine("ReadDoubleOneByte"); return ReadDoubleOneByte(); case 0x5D: - + Console.WriteLine("ReadDoubleOneByte"); return ReadDoubleOneByte(); case 0x5E: - + Console.WriteLine("ReadDoubleTwoBytes"); return ReadDoubleTwoBytes(); case 0x5F: - + Console.WriteLine("ReadDoubleFourBytes"); return ReadDoubleFourBytes(); case 0x60: case 0x61: case 0x62: case 0x63: case 0x64: case 0x65: case 0x66: case 0x67: case 0x68: case 0x69: case 0x6A: case 0x6B: case 0x6C: case 0x6D: case 0x6E: case 0x6F: - + Console.WriteLine("ReadObjectCompact"); return ReadObjectCompact(); case 0x70: case 0x71: case 0x72: case 0x73: case 0x74: case 0x75: case 0x76: case 0x77: + Console.WriteLine("ReadCompactFixList"); return ReadCompactFixList(); case 0x78: case 0x79: case 0x7A: case 0x7B: case 0x7C: case 0x7D: case 0x7E: case 0x7F: + Console.WriteLine("ReadCompactFixListUntyped"); return ReadCompactFixListUntyped(); case 0x80: case 0x81: case 0x82: case 0x83: case 0x84: case 0x85: case 0x86: case 0x87: @@ -202,28 +211,30 @@ namespace Hessian case 0xA8: case 0xA9: case 0xAA: case 0xAB: case 0xAC: case 0xAD: case 0xAE: case 0xAF: case 0xB0: case 0xB1: case 0xB2: case 0xB3: case 0xB4: case 0xB5: case 0xB6: case 0xB7: case 0xB8: case 0xB9: case 0xBA: case 0xBB: case 0xBC: case 0xBD: case 0xBE: case 0xBF: + Console.WriteLine("ReadIntegerSingleByte"); return ReadIntegerSingleByte(); case 0xC0: case 0xC1: case 0xC2: case 0xC3: case 0xC4: case 0xC5: case 0xC6: case 0xC7: case 0xC8: case 0xC9: case 0xCA: case 0xCB: case 0xCC: case 0xCD: case 0xCE: case 0xCF: - + Console.WriteLine("ReadIntegerTwoBytes"); return ReadIntegerTwoBytes(); case 0xD0: case 0xD1: case 0xD2: case 0xD3: case 0xD4: case 0xD5: case 0xD6: case 0xD7: - + Console.WriteLine("ReadIntegerThreeBytes"); return ReadIntegerThreeBytes(); case 0xD8: case 0xD9: case 0xDA: case 0xDB: case 0xDC: case 0xDD: case 0xDE: case 0xDF: case 0xE0: case 0xE1: case 0xE2: case 0xE3: case 0xE4: case 0xE5: case 0xE6: case 0xE7: case 0xE8: case 0xE9: case 0xEA: case 0xEB: case 0xEC: case 0xED: case 0xEE: case 0xEF: + Console.WriteLine("ReadLongOneByte"); return ReadLongOneByte(); case 0xF0: case 0xF1: case 0xF2: case 0xF3: case 0xF4: case 0xF5: case 0xF6: case 0xF7: case 0xF8: case 0xF9: case 0xFA: case 0xFB: case 0xFC: case 0xFD: case 0xFE: case 0xFF: + Console.WriteLine("ReadLongTwoBytes"); return ReadLongTwoBytes(); } - throw new Exception("WTF: byte value " + tag.Value + " not accounted for!"); }