pull/1/head
wangshaoming 7 years ago
parent 92f054db57
commit 2d2dac9cb3
No known key found for this signature in database
GPG Key ID: 29F5223B4DB362B5
  1. 5
      samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj
  2. 0
      samples/ASPNetCoreExecutor/Extensions/ApplicationBuilderExtensions.cs
  3. 0
      samples/ASPNetCoreExecutor/Extensions/XxlJobExecutorMiddleware.cs
  4. 16
      samples/ASPNetCoreExecutor/Startup.cs
  5. 9
      samples/ASPNetCoreExecutor/appsettings.json
  6. 72
      samples/HessianReader/NewFile1.txt
  7. 19
      samples/HessianReader/Program.cs
  8. BIN
      samples/HessianReader/request.dat
  9. 25
      src/DotXxlJob.Core/AdminClient.cs
  10. 5
      src/DotXxlJob.Core/Attributes/JobHandlerAttribute.cs
  11. 13
      src/DotXxlJob.Core/CallbackTaskQueue.cs
  12. 15
      src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs
  13. 35
      src/DotXxlJob.Core/DateTimeExtensions.cs
  14. 16
      src/DotXxlJob.Core/DefaultHandlers/AbsJobHandler.cs
  15. 15
      src/DotXxlJob.Core/DefaultHandlers/HttpJobHandler.cs
  16. 2
      src/DotXxlJob.Core/DefaultJobHandlerFactory.cs
  17. 2
      src/DotXxlJob.Core/DotXxlJob.Core.csproj
  18. 44
      src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs
  19. 0
      src/DotXxlJob.Core/Hosted/JobExecuteHostedService.cs
  20. 18
      src/DotXxlJob.Core/HttpJobHandler.cs
  21. 20
      src/DotXxlJob.Core/IXxlJobExecutor.cs
  22. 4
      src/DotXxlJob.Core/Internal/Constants.cs
  23. 122
      src/DotXxlJob.Core/Internal/HessianSerializer.cs
  24. 42
      src/DotXxlJob.Core/JobDispatcher.cs
  25. 7
      src/DotXxlJob.Core/JobLogger.cs
  26. 64
      src/DotXxlJob.Core/Json/ProjectDefaultResolver.cs
  27. 23
      src/DotXxlJob.Core/Logger/IJobLogger.cs
  28. 185
      src/DotXxlJob.Core/Logger/JobLogger.cs
  29. 10
      src/DotXxlJob.Core/Model/CallbackParam.cs
  30. 28
      src/DotXxlJob.Core/Model/HandleCallbackParam.cs
  31. 8
      src/DotXxlJob.Core/Model/JobExecuteContext.cs
  32. 9
      src/DotXxlJob.Core/Model/LogResult.cs
  33. 113
      src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs
  34. 59
      src/DotXxlJob.Core/Queue/JobTaskQueue.cs
  35. 125
      src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs
  36. 21
      src/DotXxlJob.Core/ServiceCollectionExtensions.cs
  37. 12
      src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs
  38. 0
      src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs
  39. 30
      src/DotXxlJob.Core/XxlRpcServiceHandler.cs
  40. 6
      src/Hessian/DateTimeExtension.cs
  41. 87
      src/Hessian/Deserializer.cs

@ -5,7 +5,10 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.Extensions.Logging.Console">
<Version>2.2.0</Version>
</PackageReference>
</ItemGroup>
<ItemGroup>

@ -1,22 +1,34 @@
using DotXxlJob.Core;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace ASPNetCoreExecutor
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
private IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddXxlJobExecutor();
services.AddXxlJobExecutor(Configuration);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
public void Configure(IApplicationBuilder app,ILoggerFactory loggerFactory, IHostingEnvironment env)
{
loggerFactory.AddConsole();
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();

@ -4,5 +4,12 @@
"Default": "Warning"
}
},
"AllowedHosts": "*"
"xxlJob": {
"adminAddresses":"http://127.0.0.1:8101",
"appName": "ASPNetCoreExecutor",
"port": 5000,
"accessToken": "",
"logRetentionDays": 30
}
}

@ -0,0 +1,72 @@
------------ 0x43----------------
ReadClassDefinition
Hessian.ClassDef={"Name":"com.xxl.rpc.remoting.net.params.XxlRpcRequest","Fields":["requestId","createMillisTime","accessToken","className","methodName","version","parameterTypes","parameters"]}
------------------------------------------------------------
------------ 0x60----------------
ReadObjectCompact
------------ 0x00----------------
ReadShortString requestId
------------ 0x4c----------------
ReadLongFull createMillisTime
------------ 0x00----------------
ReadShortString accessToken
------------ 0x00----------------
ReadShortString className
------------ 0x08----------------
ReadShortString methodName
------------ 0x4e----------------
ReadNull version
------------ 0x71----------------
ReadCompactFixList parameterTypes
------------ 0x43----------------
ReadClassDefinition
------------ 0x61----------------
ReadObjectCompact
------------ 0x0e----------------
ReadShortString
Hessian.HessianObject=[{"Item1":"requestId","Item2":""},{"Item1":"createMillisTime","Item2":1547809331509},{"Item1":"accessToken","Item2":""},{"Item1":"className","Item2":""},{"Item1":"methodName","Item2":"callback"}
,{"Item1":"version","Item2":null},{"Item1":"parameterTypes","Item2":[{"Name":"java.lang.Clas
s","Fields":["name"]}]},{"Item1":"parameters","Item2":[{"Item1":"name","Item2":"java.util.List"}]}]
------------------------------------------------------------
------------ 0x72----------------
ReadCompactFixList List
------------ 0x43----------------
ReadClassDefinition HandleCallbackParam
------------ 0x62----------------
ReadObjectCompact HandleCallbackParam
------------ 0xcc----------------
ReadIntegerTwoBytes logId
------------ 0x4c----------------
ReadLongFull logDateTim
------------ 0x43----------------
ReadClassDefinition executeResult
System.Collections.Generic.List`1[System.Object]=[{"Name":"com.xxl.job.core.biz.model.HandleCallbackParam","Fields":["logId","logDateTim","executeResult"]},[{"Item1":"logId","Item2":1111},{"Item1":"logDateTim","Item2":1547809331507},{"Item1":"executeResult","Item2":{"Name":"com.xxl.job.core.biz.model.Return
T","Fields":["code","msg","content"]}}]]
------------------------------------------------------------
------------ 0x63----------------
ReadObjectCompact ReturnT
------------ 0xc8----------------
ReadIntegerTwoBytes code
------------ 0x4e----------------
ReadNull msg
------------ 0x07----------------
ReadShortString aaaaaaa
Hessian.HessianObject=[{"Item1":"code","Item2":200},{"Item1":"msg","Item2":null},{"Item1":"content","Item2":"aaaaaaa"}]
------------------------------------------------------------
------------ 0x62----------------
ReadObjectCompact HandleCallbackParam
------------ 0xd7----------------
ReadIntegerThreeBytes logId
------------ 0x4c----------------
ReadLongFull logDateTim
------------ 0x63----------------
ReadObjectCompact executeResult
------------ 0xc8----------------
ReadIntegerTwoBytes code
------------ 0x4e----------------
ReadNull msg
------------ 0x06----------------
ReadShortString content
Hessian.HessianObject=[{"Item1":"logId","Item2":222222},{"Item1":"logDateTim","Item2":1547809331507},{"Item1":"executeResult","Item2":[{"Item1":"code","Item2":200},{"Item1":"msg","Item2":null},{"Item1":"content","Item2":"bbbbbb"}]}]
------------------------------------------------------------

@ -12,7 +12,7 @@ namespace HessianReader
{
static void Main(string[] args)
{
byte[] myBinary = File.ReadAllBytes("log.dat");
byte[] myBinary = File.ReadAllBytes("request.dat");
foreach (var i in myBinary)
{
@ -28,12 +28,23 @@ namespace HessianReader
using (var stream1 = new MemoryStream(myBinary))
{
var s1 = HessianSerializer.DeserializeRequest(stream1);
Console.WriteLine(JsonConvert.SerializeObject(s1));
//var s1 = HessianSerializer.DeserializeRequest(stream1);
var s = new Deserializer(stream1);
while ( stream1.CanRead)
{
var o = s.ReadValue();
Console.WriteLine("{0}={1}",o.GetType(),JsonConvert.SerializeObject(o));
Console.WriteLine("------------------------------------------------------------");
}
}
Console.WriteLine("------------------------------------------------------------");
return;
RpcResponse response = new RpcResponse {
RequestId = Guid.NewGuid().ToString("N"), Result = ReturnT.Failed("ABCDEFG")

Binary file not shown.

@ -2,10 +2,10 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Hessian;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
@ -29,7 +29,7 @@ namespace DotXxlJob.Core
,ILogger<AdminClient> logger)
{
this._options = optionsAccessor.Value;
_clientFactory = clientFactory;
this._clientFactory = clientFactory;
this._logger = logger;
InitAddress();
}
@ -37,7 +37,7 @@ namespace DotXxlJob.Core
private void InitAddress()
{
this._addresses = new List<AddressEntry>();
foreach (var item in this._options.AdminAddresses)
foreach (var item in this._options.AdminAddresses.Split(';'))
{
try
{
@ -73,7 +73,7 @@ namespace DotXxlJob.Core
object parameters)
{
var request = new RpcRequest {
CreateMillisTime = DateTime.Now.ToUnixTimeSeconds(),
CreateMillisTime = DateTime.Now.GetTotalMilliseconds(),
AccessToken = this._options.AccessToken,
ClassName = "com.xxl.job.core.biz.AdminBiz",
MethodName = methodName,
@ -93,23 +93,23 @@ namespace DotXxlJob.Core
using (var client = this._clientFactory.CreateClient())
{
while (triedTimes++ < _addresses.Count)
while (triedTimes++ < this._addresses.Count)
{
var address = _addresses[_currentIndex];
_currentIndex = (_currentIndex + 1) % _addresses.Count;
var address = this._addresses[this._currentIndex];
this._currentIndex = (this._currentIndex + 1) % this._addresses.Count;
if (!address.CheckAccessable())
continue;
Stream resStream;
try
{
resStream =await DoPost(client, address, postBuf);
resStream = await DoPost(client, address, postBuf);
address.Reset();
}
catch (Exception ex)
{
_logger.LogError(ex, "request admin error.");
this._logger.LogError(ex, "request admin error.");
address.SetFail();
continue;
}
@ -121,13 +121,13 @@ namespace DotXxlJob.Core
}
catch (Exception ex)
{
_logger.LogError(ex,"DeserializeResponse error:"+ex.Message);
this._logger.LogError(ex,"DeserializeResponse error:"+ex.Message);
}
if (res == null)
{
return ReturnT.Failed("response is nul");
return ReturnT.Failed("response is null");
}
@ -139,7 +139,7 @@ namespace DotXxlJob.Core
return res.Result as ReturnT;
}
}
throw new Exception("xxl-rpc server address not accessable.");
throw new Exception("xxl-rpc server address not accessible.");
}
@ -149,6 +149,7 @@ namespace DotXxlJob.Core
var content = new ByteArrayContent(postBuf);
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
var responseMessage = await client.PostAsync(address.RequestUri, content);
responseMessage.EnsureSuccessStatusCode();
return await responseMessage.Content.ReadAsStreamAsync();
}

@ -10,5 +10,10 @@ namespace DotXxlJob.Core
}
public string Name { get; }
/// <summary>
/// set Ignore
/// </summary>
public bool Ignore { get; set; }
}
}

@ -1,13 +0,0 @@
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core
{
public class CallbackTaskQueue
{
public void Push(CallbackParam callbackParam)
{
//throw new System.NotImplementedException();
}
}
}

@ -1,3 +1,6 @@
using System;
using System.IO;
namespace DotXxlJob.Core.Config
{
public class XxlJobExecutorOptions
@ -5,8 +8,8 @@ namespace DotXxlJob.Core.Config
public string AdminAddresses { get; set; }
public string AppName { get; set; }
public string AppName { get; set; } = "DotXxlJob";
public string SpecialBindAddress { get; set; }
@ -17,11 +20,11 @@ namespace DotXxlJob.Core.Config
public string AccessToken { get; set; }
public string LogPath { get; set; }
public int LogRetentionDays { get; set; }
public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs");
public int LogRetentionDays { get; set; } = 30;
}
}

@ -1,35 +0,0 @@
using System;
namespace DotXxlJob.Core
{
public static class DateTimeExtensions
{
private const long UnixEpochTicks = 621355968000000000;
private const long UnixEpochSeconds = 62135596800;
private const long UnixEpochMilliseconds = 62135596800000;
public static DateTimeOffset FromUnixTimeSeconds(this long seconds)
{
long ticks = seconds * TimeSpan.TicksPerSecond + UnixEpochTicks;
return new DateTime(ticks, DateTimeKind.Utc);
}
public static DateTime FromUnixTimeMilliseconds(this long milliseconds)
{
long ticks = milliseconds * TimeSpan.TicksPerMillisecond + UnixEpochTicks;
return new DateTime(ticks, DateTimeKind.Utc);
}
public static long ToUnixTimeSeconds(this DateTime dateTime)
{
long seconds = dateTime.ToUniversalTime().Ticks / TimeSpan.TicksPerSecond;
return seconds - UnixEpochSeconds;
}
public static long ToUnixTimeMilliseconds(this DateTime dateTime)
{
long milliseconds = dateTime.ToUniversalTime().Ticks / TimeSpan.TicksPerMillisecond;
return milliseconds - UnixEpochMilliseconds;
}
}
}

@ -0,0 +1,16 @@
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core.DefaultHandlers
{
public abstract class AbsJobHandler:IJobHandler
{
public virtual void Dispose()
{
}
public abstract Task<ReturnT> Execute(JobExecuteContext context);
}
}

@ -0,0 +1,15 @@
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core.DefaultHandlers
{
[JobHandler("httpJobHandler")]
public class HttpJobHandler:AbsJobHandler
{
public override Task<ReturnT> Execute(JobExecuteContext context)
{
context.JobLogger.Log("Get Request Data:{0}",context.JobParameter);
return Task.FromResult(ReturnT.SUCCESS);
}
}
}

@ -1,3 +1,5 @@
using DotXxlJob.Core.DefaultHandlers;
namespace DotXxlJob.Core
{
public class DefaultJobHandlerFactory:IJobHandlerFactory

@ -7,6 +7,8 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Utf8Json" Version="1.3.7" />
</ItemGroup>
<ItemGroup>

@ -0,0 +1,44 @@
using System;
using DotXxlJob.Core.Config;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core
{
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,IConfiguration configuration)
{
services.AddLogging();
services.AddOptions();
services.Configure<XxlJobExecutorOptions>(configuration.GetSection("xxlJob"))
.AddXxlJobExecutorServiceDependency();
return services;
}
public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,Action<XxlJobExecutorOptions> configAction)
{
services.AddLogging();
services.AddOptions();
services.Configure(configAction).AddXxlJobExecutorServiceDependency();
return services;
}
private static IServiceCollection AddXxlJobExecutorServiceDependency(this IServiceCollection services)
{
services.AddHttpClient("DotXxlJobClient");
services.AddSingleton<IJobLogger, JobLogger>();
services.AddSingleton<ITaskExecutor, TaskExecutors.BeanTaskExecutor>();
services.AddSingleton<IJobHandlerFactory,DefaultJobHandlerFactory >();
services.AddSingleton<JobDispatcher>();
services.AddSingleton<TaskExecutorFactory>();
services.AddSingleton<XxlRpcServiceHandler>();
services.AddSingleton<CallbackTaskQueue>();
services.AddSingleton<AdminClient>();
services.AddSingleton<IExecutorRegistry, ExecutorRegistry>();
return services;
}
}
}

@ -1,18 +0,0 @@
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core
{
public class HttpJobHandler:IJobHandler
{
public void Dispose()
{
}
public Task<ReturnT> Execute(JobExecuteContext context)
{
return Task.FromResult(ReturnT.SUCCESS);
}
}
}

@ -1,20 +0,0 @@
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core
{
public interface IXxlJobExecutor
{
ReturnT Beat();
ReturnT IdleBeat(int jobId);
ReturnT Kill(int jobId);
ReturnT Log(long logDateTim, int logId, int fromLineNum);
ReturnT Run(TriggerParam triggerParam);
}
}

@ -16,7 +16,7 @@ namespace DotXxlJob.Core
public const int MaxCallbackRetryTimes = 10;
//每次回调最多发送几条记录
public const int MaxCallbackRecordsPerRequest = 100;
public const int MaxCallbackRecordsPerRequest = 20;
public static TimeSpan CallbackRetryInterval = TimeSpan.FromSeconds(600);
//Admin集群机器请求默认超时时间
@ -26,7 +26,7 @@ namespace DotXxlJob.Core
//Admin集群中的某台机器请求失败多少次后熔断
public const int AdminServerCircuitFailedTimes = 3;
public static TimeSpan JobThreadWaitTime = TimeSpan.FromSeconds(90);
public static class GlueType
{

@ -23,64 +23,30 @@ namespace DotXxlJob.Core
private static readonly Dictionary<string, PropertyInfo> returnProperties =
new Dictionary<string, PropertyInfo>();
private static readonly Dictionary<string, PropertyInfo> callbackProperties =
new Dictionary<string, PropertyInfo>();
static HessianSerializer()
{
var typeInfo = typeof(RpcRequest).GetTypeInfo();
foreach (var property in typeInfo.DeclaredProperties)
{
var attribute = property.GetCustomAttribute<DataMemberAttribute>();
if (null == attribute)
{
continue;
}
if (!property.CanRead || !property.CanWrite)
{
continue;
}
requestProperties.Add(attribute.Name,property);
}
InitProperties(typeof(RpcRequest), requestProperties);
var triggerTypeInfo = typeof(TriggerParam).GetTypeInfo();
foreach (var property in triggerTypeInfo.DeclaredProperties)
{
var attribute = property.GetCustomAttribute<DataMemberAttribute>();
if (null == attribute)
{
continue;
}
if (!property.CanRead || !property.CanWrite)
{
continue;
}
triggerProperties.Add(attribute.Name,property);
}
InitProperties(typeof(TriggerParam), triggerProperties);
var rspTypeInfo = typeof(RpcResponse).GetTypeInfo();
foreach (var property in rspTypeInfo.DeclaredProperties)
{
var attribute = property.GetCustomAttribute<DataMemberAttribute>();
InitProperties(typeof(RpcResponse), responseProperties);
if (null == attribute)
{
continue;
}
InitProperties(typeof(ReturnT), returnProperties);
InitProperties(typeof(HandleCallbackParam), callbackProperties);
}
if (!property.CanRead || !property.CanWrite)
{
continue;
}
responseProperties.Add(attribute.Name,property);
}
var retTypeInfo = typeof(ReturnT).GetTypeInfo();
foreach (var property in retTypeInfo.DeclaredProperties)
private static void InitProperties(Type type, Dictionary<string, PropertyInfo> propertyInfos)
{
var typeInfo = type.GetTypeInfo();
foreach (var property in typeInfo.DeclaredProperties)
{
var attribute = property.GetCustomAttribute<DataMemberAttribute>();
@ -94,7 +60,7 @@ namespace DotXxlJob.Core
continue;
}
returnProperties.Add(attribute.Name,property);
propertyInfos.Add(attribute.Name,property);
}
}
@ -173,30 +139,56 @@ namespace DotXxlJob.Core
return;
}
if (itemType == typeof(ClassDef))
if (item is ClassDef paramClass)
{
var triggerClass = item as ClassDef;
//TODO:这里要做成动态的话 ,可以注册所有的实体到对应的字典中,不过这里只有这个类型哦
if (triggerClass.Name != "com.xxl.job.core.biz.model.TriggerParam")
if (paramClass.Name != "com.xxl.job.core.biz.model.TriggerParam")
{
throw new HessianException($"not expected parameter type [{triggerClass.Name}]");
throw new HessianException($"not expected parameter type [{paramClass.Name}]");
}
if (!(deserializer.ReadValue() is HessianObject triggerData))
if (!(deserializer.ReadValue() is HessianObject paramData))
{
throw new HessianException("not expected parameter type ,data is null");
}
TriggerParam param = new TriggerParam();
foreach (var field in triggerData)
object val ;
if (paramClass.Name == "com.xxl.job.core.biz.model.TriggerParam")
{
val =new TriggerParam();
}
else
{
val =new HandleCallbackParam();
}
foreach (var (key,value) in paramData)
{
if (triggerProperties.TryGetValue(field.Item1, out var tgPropertyInfo))
if (triggerProperties.TryGetValue(key, out var tgPropertyInfo))
{
tgPropertyInfo.SetValue(param,field.Item2);
tgPropertyInfo.SetValue(val,value);
}
}
list.Add(val);
}
else if (item is HessianObject hessianObject)
{
if (hessianObject.TypeName == "com.xxl.job.core.biz.model.HandleCallbackParam")
{
var val =new HandleCallbackParam();
foreach (var (key,value) in hessianObject)
{
if (triggerProperties.TryGetValue(key, out var tgPropertyInfo))
{
tgPropertyInfo.SetValue(val,value);
}
}
list.Add(val);
}
}
else
{
throw new HessianException($"unsupported list item type =[{itemType}]");
}
@ -271,12 +263,12 @@ namespace DotXxlJob.Core
{
throw new HessianException("not expected parameter type ,data is null");
}
ReturnT data = new ReturnT();
foreach (var field in resultData)
var data = new ReturnT();
foreach (var (key, value) in resultData)
{
if (returnProperties.TryGetValue(field.Item1, out var tgPropertyInfo))
if (returnProperties.TryGetValue(key, out var tgPropertyInfo))
{
tgPropertyInfo.SetValue(data,field.Item2);
tgPropertyInfo.SetValue(data,value);
}
}

@ -12,23 +12,26 @@ namespace DotXxlJob.Core
{
private readonly TaskExecutorFactory _executorFactory;
private readonly CallbackTaskQueue _callbackTaskQueue;
private readonly ConcurrentDictionary<int,JobQueue> RUNNING_QUEUE = new ConcurrentDictionary<int, JobQueue>();
private readonly IJobLogger _jobLogger;
private readonly ConcurrentDictionary<int,JobTaskQueue> RUNNING_QUEUE = new ConcurrentDictionary<int, JobTaskQueue>();
private readonly ILogger<JobQueue> _jobQueueLogger;
private readonly ILogger<JobTaskQueue> _jobQueueLogger;
private readonly ILogger<JobDispatcher> _logger;
public JobDispatcher(
TaskExecutorFactory executorFactory,
CallbackTaskQueue callbackTaskQueue,
IJobLogger jobLogger,
ILoggerFactory loggerFactory
)
{
this. _executorFactory = executorFactory;
this. _callbackTaskQueue = callbackTaskQueue;
this._jobLogger = jobLogger;
this._jobQueueLogger = loggerFactory.CreateLogger<JobQueue>();
this._jobQueueLogger = loggerFactory.CreateLogger<JobTaskQueue>();
this._logger = loggerFactory.CreateLogger<JobDispatcher>();
}
@ -93,7 +96,7 @@ namespace DotXxlJob.Core
/// <summary>
/// 等待检查
/// IdleBeat
/// </summary>
/// <param name="jobId"></param>
/// <returns></returns>
@ -103,11 +106,23 @@ namespace DotXxlJob.Core
new ReturnT(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.")
: ReturnT.SUCCESS;
}
private void TriggerCallback(object sender, HandleCallbackParam callbackParam)
{
this._callbackTaskQueue.Push(callbackParam);
}
private ReturnT PushJobQueue(TriggerParam triggerParam, ITaskExecutor executor)
{
JobQueue jobQueue = new JobQueue ( executor, this._callbackTaskQueue,this._jobQueueLogger);
if (RUNNING_QUEUE.TryGetValue(triggerParam.JobId,out var jobQueue))
{
return jobQueue.Push(triggerParam);
}
//NewJobId
jobQueue = new JobTaskQueue ( executor,this._jobLogger, this._jobQueueLogger);
jobQueue.CallBack += TriggerCallback;
if (RUNNING_QUEUE.TryAdd(triggerParam.JobId, jobQueue))
{
return jobQueue.Push(triggerParam);
@ -117,9 +132,16 @@ namespace DotXxlJob.Core
private ReturnT ChangeJobQueue(TriggerParam triggerParam, ITaskExecutor executor)
{
if (RUNNING_QUEUE.TryRemove(triggerParam.JobId, out var oldJobTask))
{
oldJobTask.CallBack -= TriggerCallback;
oldJobTask.Dispose(); //释放原来的资源
}
JobQueue jobQueue = new JobQueue ( executor, this._callbackTaskQueue,this._jobQueueLogger);
if (RUNNING_QUEUE.TryUpdate(triggerParam.JobId, jobQueue, null))
JobTaskQueue jobQueue = new JobTaskQueue ( executor,this._jobLogger, this._jobQueueLogger);
jobQueue.CallBack += TriggerCallback;
if (RUNNING_QUEUE.TryAdd(triggerParam.JobId, jobQueue))
{
return jobQueue.Push(triggerParam);
}

@ -1,7 +0,0 @@
namespace DotXxlJob.Core
{
public class JobLogger
{
}
}

@ -0,0 +1,64 @@
using System.Reflection;
using Utf8Json;
using Utf8Json.Formatters;
using Utf8Json.Resolvers;
namespace DotXxlJob.Core.Json
{
public class ProjectDefaultResolver : IJsonFormatterResolver
{
public static IJsonFormatterResolver Instance = new ProjectDefaultResolver();
// configure your resolver and formatters.
static readonly IJsonFormatter[] formatters = {
new DateTimeFormatter("yyyy-MM-dd HH:mm:ss"),
new NullableDateTimeFormatter("yyyy-MM-dd HH:mm:ss")
};
static readonly IJsonFormatterResolver[] resolvers = new[]
{
EnumResolver.UnderlyingValue,
StandardResolver.AllowPrivateExcludeNullSnakeCase
};
ProjectDefaultResolver()
{
}
public IJsonFormatter<T> GetFormatter<T>()
{
return FormatterCache<T>.formatter;
}
static class FormatterCache<T>
{
public static readonly IJsonFormatter<T> formatter;
static FormatterCache()
{
foreach (var item in formatters)
{
foreach (var implInterface in item.GetType().GetTypeInfo().ImplementedInterfaces)
{
var ti = implInterface.GetTypeInfo();
if (ti.IsGenericType && ti.GenericTypeArguments[0] == typeof(T))
{
formatter = (IJsonFormatter<T>)item;
return;
}
}
}
foreach (var item in resolvers)
{
var f = item.GetFormatter<T>();
if (f != null)
{
formatter = f;
return;
}
}
}
}
}
}

@ -0,0 +1,23 @@
using System;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core
{
public interface IJobLogger
{
void SetLogFile(long logTime, int logId);
void Log(string pattern, params object[] format);
void LogError(Exception ex);
LogResult ReadLog(long logTime, int logId, int fromLine);
void LogSpecialFile(long logTime, int logId, string pattern, params object[] format);
}
}

@ -0,0 +1,185 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Hessian;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotXxlJob.Core
{
public class JobLogger:IJobLogger
{
private readonly ILogger<JobLogger> _logger;
private readonly AsyncLocal<string> LogFileName = new AsyncLocal<string>();
private readonly XxlJobExecutorOptions _options;
public JobLogger(IOptions<XxlJobExecutorOptions> optionsAccessor,ILogger<JobLogger> logger)
{
this._logger = logger;
this._options = optionsAccessor.Value;
}
public void SetLogFile(long logTime, int logId)
{
try
{
var filePath = MakeLogFileName(logTime, logId);
var dir = Path.GetDirectoryName(filePath);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
CleanOldLogs();
}
LogFileName.Value = filePath;
}
catch (Exception ex)
{
_logger.LogError(ex, "SetLogFileName error.");
}
}
public void Log(string pattern, params object[] format)
{
var appendLog = string.Format(pattern, format);
var callInfo = new StackTrace(true).GetFrame(1);
LogDetail(GetLogFileName(), callInfo, appendLog);
}
public void LogError(Exception ex)
{
var callInfo = new StackTrace(true).GetFrame(1);
LogDetail(GetLogFileName(), callInfo, ex.Message + ex.StackTrace);
}
public LogResult ReadLog(long logTime, int logId, int fromLine)
{
var filePath = MakeLogFileName(logTime, logId);
if (string.IsNullOrEmpty(filePath))
{
return new LogResult(fromLine, 0, "readLog fail, logFile not found", true);
}
if (!File.Exists(filePath))
{
return new LogResult(fromLine, 0, "readLog fail, logFile not exists", true);
}
// read file
var logContentBuffer = new StringBuilder();
int toLineNum = 0;
try
{
using (var reader = new StreamReader(filePath, Encoding.UTF8))
{
string line;
while ((line = reader.ReadLine()) != null)
{
toLineNum++;
if (toLineNum >= fromLine)
{
logContentBuffer.AppendLine(line);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "ReadLog error.");
}
// result
var logResult = new LogResult(fromLine, toLineNum, logContentBuffer.ToString(), false);
return logResult;
}
public void LogSpecialFile(long logTime, int logId, string pattern, params object[] format)
{
var filePath = MakeLogFileName(logTime, logId);
var callInfo = new StackTrace(true).GetFrame(1);
var content = string.Format(pattern, format);
LogDetail(filePath, callInfo, content);
}
private string GetLogFileName()
{
return LogFileName.Value;
}
private string MakeLogFileName(long logDateTime, int logId)
{
//log fileName like: logPath/HandlerLogs/yyyy-MM-dd/9999.log
return Path.Combine(_options.LogPath, Constants.HandleLogsDirectory,
logDateTime.FromMilliseconds().ToString("yyyy-MM-dd"), $"{logId}.log");
}
private void LogDetail(string logFileName, StackFrame callInfo, string appendLog)
{
if (string.IsNullOrEmpty(logFileName))
{
return;
}
var stringBuffer = new StringBuilder();
stringBuffer
.Append(DateTime.Now.ToString("s")).Append(" ")
.Append("[" + callInfo.GetMethod().DeclaringType.FullName + "#" + callInfo.GetMethod().Name + "]").Append("-")
.Append("[line " + callInfo.GetFileLineNumber() + "]").Append("-")
.Append("[thread " + Thread.CurrentThread.ManagedThreadId + "]").Append(" ")
.Append(appendLog ?? string.Empty)
.AppendLine();
var formatAppendLog = stringBuffer.ToString();
try
{
File.AppendAllText(logFileName, formatAppendLog, Encoding.UTF8);
}
catch (Exception ex)
{
this._logger.LogError(ex, "LogDetail error");
}
}
private void CleanOldLogs()
{
if (_options.LogRetentionDays <= 0)
{
return;
}
Task.Run(() =>
{
try
{
var handlerLogsDir = new DirectoryInfo(Path.Combine(_options.LogPath, Constants.HandleLogsDirectory));
if (!handlerLogsDir.Exists)
{
return;
}
var today = DateTime.UtcNow.Date;
foreach (var dir in handlerLogsDir.GetDirectories())
{
if (DateTime.TryParse(dir.Name, out var dirDate))
{
if (today.Subtract(dirDate.Date).Days > _options.LogRetentionDays)
{
dir.Delete(true);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "CleanOldLogs error.");
}
});
}
}
}

@ -1,10 +0,0 @@
namespace DotXxlJob.Core.Model
{
public class CallbackParam
{
public CallbackParam(TriggerParam triggerParam, ReturnT result)
{
throw new System.NotImplementedException();
}
}
}

@ -5,13 +5,25 @@ namespace DotXxlJob.Core.Model
[DataContract(Name = "com.xxl.job.core.biz.model.HandleCallbackParam")]
public class HandleCallbackParam
{
[DataMember(Name = "callbackRetryTimes",Order = 1)]
public int CallbackRetryTimes;
[DataMember(Name = "logId",Order = 2)]
public int LogId;
[DataMember(Name = "logDateTim",Order = 3)]
public long LogDateTim;
[DataMember(Name = "executeResult",Order = 4)]
public ReturnT ExecuteResult;
public HandleCallbackParam()
{
}
public HandleCallbackParam(TriggerParam triggerParam, ReturnT result)
{
this.LogId = triggerParam.LogId;
this.LogDateTime = triggerParam.LogDateTime;
this.ExecuteResult = result;
}
public int CallbackRetryTimes { get; set; }
[DataMember(Name = "logId",Order = 1)]
public int LogId { get; set; }
[DataMember(Name = "logDateTim",Order = 2)]
public long LogDateTime { get; set; }
[DataMember(Name = "executeResult",Order = 3)]
public ReturnT ExecuteResult { get; set; }
}
}

@ -2,6 +2,12 @@ namespace DotXxlJob.Core.Model
{
public class JobExecuteContext
{
public JobExecuteContext(IJobLogger jobLogger,string jobParameter)
{
this.JobLogger = jobLogger;
this.JobParameter = jobParameter;
}
public string JobParameter { get; }
public IJobLogger JobLogger { get; }
}
}

@ -5,6 +5,15 @@ namespace DotXxlJob.Core.Model
[DataContract(Name = "com.xxl.job.core.biz.model.LogResult")]
public class LogResult
{
public LogResult(int fromLine ,int toLine,string content,bool isEnd)
{
this.FromLineNum = fromLine;
this.ToLineNum = toLine;
this.LogContent = content;
this.IsEnd = isEnd;
}
[DataMember(Name = "fromLineNum",Order = 1)]
public int FromLineNum { get; set; }
[DataMember(Name = "toLineNum",Order = 2)]

@ -0,0 +1,113 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotXxlJob.Core
{
public class CallbackTaskQueue:IDisposable
{
private readonly AdminClient _adminClient;
private readonly IJobLogger _jobLogger;
private readonly RetryCallbackTaskQueue _retryQueue;
private readonly ILogger<CallbackTaskQueue> _logger;
private readonly ConcurrentQueue<HandleCallbackParam> TASK_QUEUE = new ConcurrentQueue<HandleCallbackParam>();
private bool _stop;
private bool _isRunning;
private Task _runTask;
public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions<XxlJobExecutorOptions> optionsAccessor
, ILoggerFactory loggerFactory)
{
this._adminClient = adminClient;
this._jobLogger = jobLogger;
this._retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath,
Push,
loggerFactory.CreateLogger<RetryCallbackTaskQueue>());
this._logger = loggerFactory.CreateLogger<CallbackTaskQueue>();
}
public void Push(HandleCallbackParam callbackParam)
{
TASK_QUEUE.Enqueue(callbackParam);
StartCallBack();
}
public void Dispose()
{
this._stop = true;
this._retryQueue.Dispose();
this._runTask?.GetAwaiter().GetResult();
}
private void StartCallBack()
{
if ( this._isRunning)
{
return;
}
this._runTask = Task.Run(async () =>
{
this._logger.LogDebug("start to callback");
this._isRunning = true;
while (!this._stop)
{
await DoCallBack();
}
this._logger.LogDebug("end to callback");
this._isRunning = false;
});
}
private async Task DoCallBack()
{
List<HandleCallbackParam> list = new List<HandleCallbackParam>();
while (list.Count < Constants.MaxCallbackRecordsPerRequest && this.TASK_QUEUE.TryDequeue(out var item))
{
list.Add(item);
}
if (!list.Any())
{
return;
}
ReturnT result;
try
{
result = await this._adminClient.Callback(list);
}
catch (Exception ex){
this._logger.LogError(ex,"trigger callback error:{error}",ex.Message);
result = ReturnT.Failed(ex.Message);
this._retryQueue.Push(list);
}
LogCallBackResult(result, list);
}
private void LogCallBackResult(ReturnT result,List<HandleCallbackParam> list)
{
foreach (var param in list)
{
this._jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Empty");
}
}
}
}

@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
@ -7,23 +8,28 @@ using Microsoft.Extensions.Logging;
namespace DotXxlJob.Core
{
public class JobQueue
public class JobTaskQueue:IDisposable
{
private readonly ITaskExecutor _executor;
private readonly CallbackTaskQueue _callbackTaskQueue;
private readonly ILogger<JobQueue> _logger;
private readonly IJobLogger _jobLogger;
private readonly ILogger<JobTaskQueue> _logger;
private readonly ConcurrentQueue<TriggerParam> TASK_QUEUE = new ConcurrentQueue<TriggerParam>();
public JobQueue(ITaskExecutor executor,CallbackTaskQueue callbackTaskQueue,ILogger<JobQueue> logger)
private readonly ConcurrentDictionary<int, byte> ID_IN_QUEUE = new ConcurrentDictionary<int, byte>();
private CancellationTokenSource _cancellationTokenSource;
private Task _runTask;
public JobTaskQueue(ITaskExecutor executor,IJobLogger jobLogger,ILogger<JobTaskQueue> logger)
{
_executor = executor;
_callbackTaskQueue = callbackTaskQueue;
_logger = logger;
this._executor = executor;
this._jobLogger = jobLogger;
this._logger = logger;
}
public ITaskExecutor Executor => this._executor;
private CancellationTokenSource _cancellationTokenSource;
public event EventHandler<HandleCallbackParam> CallBack;
/// <summary>
/// 覆盖之前的队列
@ -37,12 +43,18 @@ namespace DotXxlJob.Core
{
TASK_QUEUE.TryDequeue(out _);
}
ID_IN_QUEUE.Clear();
return Push(triggerParam);
}
public ReturnT Push(TriggerParam triggerParam)
{
if(!ID_IN_QUEUE.TryAdd(triggerParam.LogId,0))
{
_logger.LogWarning("repeat job task,logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
return ReturnT.Failed("repeat job task!");
}
this.TASK_QUEUE.Enqueue(triggerParam);
StartTask();
return ReturnT.SUCCESS;
@ -53,8 +65,20 @@ namespace DotXxlJob.Core
this._cancellationTokenSource?.Cancel();
this._cancellationTokenSource?.Dispose();
this._cancellationTokenSource = null;
//wait for task completed
this._runTask?.GetAwaiter().GetResult();
}
public void Dispose()
{
Stop();
while (!TASK_QUEUE.IsEmpty)
{
TASK_QUEUE.TryDequeue(out _);
}
ID_IN_QUEUE.Clear();
}
private void StartTask()
{
@ -65,7 +89,7 @@ namespace DotXxlJob.Core
this._cancellationTokenSource =new CancellationTokenSource();
CancellationToken ct = _cancellationTokenSource.Token;
Task.Factory.StartNew(async () =>
this._runTask = Task.Factory.StartNew(async () =>
{
//ct.ThrowIfCancellationRequested();
@ -84,7 +108,19 @@ namespace DotXxlJob.Core
if (TASK_QUEUE.TryDequeue(out triggerParam))
{
result = await this._executor.Execute(triggerParam);
if (ID_IN_QUEUE.TryRemove(triggerParam.LogId,out _))
{
this._logger.LogWarning("remove id in queue failed,logId={logId},jobId={jobId}"
,triggerParam.LogId,triggerParam.JobId);
}
//set log file;
this._jobLogger.SetLogFile(triggerParam.LogDateTime,triggerParam.LogId);
this._jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}" ,triggerParam.ExecutorParams);
result = await this._executor.Execute(triggerParam);
this._jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
}
else
{
@ -94,11 +130,12 @@ namespace DotXxlJob.Core
catch (Exception ex)
{
result = ReturnT.Failed("Dequeue Task Failed:"+ex.Message);
this._jobLogger.Log("<br>----------- JobThread Exception:" + ex.Message + "<br>----------- xxl-job job execute end(error) -----------");
}
if(triggerParam !=null)
{
this._callbackTaskQueue.Push(new CallbackParam(triggerParam, result));
CallBack?.Invoke(this,new HandleCallbackParam(triggerParam, result??ReturnT.FAIL));
}
}

@ -0,0 +1,125 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using DotXxlJob.Core.Json;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
namespace DotXxlJob.Core
{
public class RetryCallbackTaskQueue:IDisposable
{
private readonly Action<HandleCallbackParam> _actionDoCallback;
private readonly ILogger<RetryCallbackTaskQueue> _logger;
private bool _stop;
private Task _runTask;
private readonly string _backupFile;
public RetryCallbackTaskQueue(string backupPath,Action<HandleCallbackParam> actionDoCallback,ILogger<RetryCallbackTaskQueue> logger)
{
this._actionDoCallback = actionDoCallback;
this._logger = logger;
this._backupFile = Path.Combine(backupPath, "xxl-job-callback.log");
var dir = Path.GetDirectoryName(backupPath);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
StartQueue();
}
private void StartQueue()
{
this._runTask = Task.Factory.StartNew(async () =>
{
while (!this._stop)
{
await LoadFromFile();
await Task.Delay(Constants.CallbackRetryInterval);
}
}, TaskCreationOptions.LongRunning);
}
private async Task LoadFromFile()
{
var list = new List<HandleCallbackParam>();
if (!File.Exists(_backupFile))
{
return;
}
var nextLine = string.Empty;
using (StreamReader reader = new StreamReader(this._backupFile))
{
while ((nextLine = await reader.ReadLineAsync()) != null)
{
try
{
list.Add(Utf8Json.JsonSerializer.Deserialize<HandleCallbackParam>(nextLine, ProjectDefaultResolver.Instance));
}
catch(Exception ex)
{
this._logger.LogError(ex,"de error:{error}",ex.Message);
}
}
}
if (list.Any())
{
foreach (var item in list)
{
this._actionDoCallback(item);
}
}
}
public void Push(List<HandleCallbackParam> list)
{
if (!list.Any())
{
return;
}
try
{
using (var writer = new StreamWriter(this._backupFile, true, Encoding.UTF8))
{
foreach (var item in list)
{
if (item.CallbackRetryTimes >= Constants.MaxCallbackRetryTimes)
{
_logger.LogInformation("callback too many times and will be abandon,logId {logId}", item.LogId);
}
else
{
item.CallbackRetryTimes++;
byte[] buffer = Utf8Json.JsonSerializer.Serialize(item,ProjectDefaultResolver.Instance);
writer.WriteLine(Encoding.UTF8.GetString(buffer));
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "SaveCallbackParams error.");
}
}
public void Dispose()
{
this._stop = true;
this._runTask?.GetAwaiter().GetResult();
}
}
}

@ -1,21 +0,0 @@
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core
{
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services)
{
services.AddSingleton<ITaskExecutor, TaskExecutors.BeanTaskExecutor>();
services.AddSingleton<IJobHandlerFactory,DefaultJobHandlerFactory >();
services.AddSingleton<JobDispatcher>();
services.AddSingleton<TaskExecutorFactory>();
services.AddSingleton<XxlRpcServiceHandler>();
services.AddSingleton<CallbackTaskQueue>();
services.AddSingleton<AdminClient>();
services.AddSingleton<IExecutorRegistry, ExecutorRegistry>();
return services;
}
}
}

@ -9,10 +9,12 @@ namespace DotXxlJob.Core.TaskExecutors
public class BeanTaskExecutor:ITaskExecutor
{
private readonly IJobHandlerFactory _handlerFactory;
private readonly IJobLogger _jobLogger;
public BeanTaskExecutor(IJobHandlerFactory handlerFactory)
public BeanTaskExecutor(IJobHandlerFactory handlerFactory,IJobLogger jobLogger)
{
_handlerFactory = handlerFactory;
this._handlerFactory = handlerFactory;
this._jobLogger = jobLogger;
}
public string GlueType { get; } = Constants.GlueType.BEAN;
@ -23,12 +25,10 @@ namespace DotXxlJob.Core.TaskExecutors
if (handler == null)
{
return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found."));
}
return Task.FromResult(ReturnT.Success("OK"));
//return handler.Execute(new JobExecuteContext());
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams);
return handler.Execute(context);
}
}
}

@ -4,6 +4,7 @@ using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Hessian;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
@ -19,6 +20,7 @@ namespace DotXxlJob.Core
{
private readonly JobDispatcher _jobDispatcher;
private readonly IJobLogger _jobLogger;
private readonly ILogger<XxlRpcServiceHandler> _logger;
private readonly XxlJobExecutorOptions _options;
@ -27,10 +29,12 @@ namespace DotXxlJob.Core
public XxlRpcServiceHandler(IOptions<XxlJobExecutorOptions> optionsAccessor,
JobDispatcher jobDispatcher,
IJobLogger jobLogger,
ILogger<XxlRpcServiceHandler> logger)
{
_jobDispatcher = jobDispatcher;
this._jobDispatcher = jobDispatcher;
this._jobLogger = jobLogger;
this._logger = logger;
this._options = optionsAccessor.Value;
@ -46,20 +50,24 @@ namespace DotXxlJob.Core
/// </summary>
/// <param name="reqStream"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public async Task<byte[]> HandlerAsync(Stream reqStream)
{
var req = HessianSerializer.DeserializeRequest(reqStream);
var res = new RpcResponse { RequestId = req.RequestId};
if (!ValidRequest(req, out var error))
{
this._logger.LogWarning("job task request is not valid:{error}",error);
res.ErrorMsg = error;
}
else
{
this._logger.LogDebug("receive job task ,req.RequestId={requestId},method={methodName}"
,req.RequestId,req.MethodName);
await Invoke(req, res);
this._logger.LogDebug("completed receive job task ,req.RequestId={requestId},method={methodName},IsError={IsError}"
,req.RequestId,req.MethodName,res.IsError);
}
using (var outputStream = new MemoryStream())
@ -91,7 +99,7 @@ namespace DotXxlJob.Core
return false;
}
if (DateTime.UtcNow.Subtract(req.CreateMillisTime.FromUnixTimeMilliseconds()) > Constants.RpcRequestExpireTimeSpan)
if (DateTime.UtcNow.Subtract(req.CreateMillisTime.FromMilliseconds()) > Constants.RpcRequestExpireTimeSpan)
{
error = "request is timeout!";
return false;
@ -120,6 +128,7 @@ namespace DotXxlJob.Core
if (method == null)
{
res.ErrorMsg = $"The method{req.MethodName} is not defined.";
this._logger.LogWarning( $"The method{req.MethodName} is not defined.");
}
else
{
@ -131,7 +140,8 @@ namespace DotXxlJob.Core
}
catch (Exception ex)
{
res.ErrorMsg = ex.ToString();
res.ErrorMsg = ex.Message +"\n--------------\n"+ ex.StackTrace;
this._logger.LogError(ex,"invoke method error:{0}",ex.Message);
}
return Task.CompletedTask;
@ -177,7 +187,7 @@ namespace DotXxlJob.Core
}
/// <summary>
/// TODO:获取执行日志
/// read Log
/// </summary>
/// <param name="logDateTime"></param>
/// <param name="logId"></param>
@ -185,9 +195,9 @@ namespace DotXxlJob.Core
/// <returns></returns>
private ReturnT Log(long logDateTime, int logId, int fromLineNum)
{
//var logResult = JobLogger.ReadLog(logDateTime, logId, fromLineNum);
Console.WriteLine("{0} ---{1} --{2}",logDateTime,logId,fromLineNum);
return ReturnT.Success(null);
var ret = ReturnT.Success(null);
ret.Content = this._jobLogger.ReadLog(logDateTime, logId, fromLineNum);
return ret;
}
/// <summary>

@ -5,7 +5,7 @@ namespace Hessian
/// <summary>
///
/// </summary>
internal static class DateTimeExtension
public static class DateTimeExtension
{
private const long Era = 62135596800000L;
private const long Millis = 60000;
@ -36,7 +36,7 @@ namespace Hessian
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
public static DateTime FromMinutes(int value)
public static DateTime FromMinutes(this int value)
{
var ticks = (value * Millis + Era) * 10000;
return new DateTime(ticks, DateTimeKind.Utc);
@ -47,7 +47,7 @@ namespace Hessian
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
public static DateTime FromMilliseconds(long value)
public static DateTime FromMilliseconds(this long value)
{
var ticks = (value + Era) * 10000;
return new DateTime(ticks, DateTimeKind.Utc);

@ -31,6 +31,13 @@ namespace Hessian
typeNameRefs = new ListRefMap<string>();
}
public bool CanRead()
{
var tag = reader.Peek ();
return tag.HasValue;
}
#region ReadValue
public object ReadValue ()
@ -40,127 +47,127 @@ namespace Hessian
if (!tag.HasValue) {
throw new EndOfStreamException();
}
Console.WriteLine("------------ 0x{0:x2}----------------",tag.Value);
switch (tag.Value) {
case 0x00: case 0x01: case 0x02: case 0x03: case 0x04: case 0x05: case 0x06: case 0x07:
case 0x08: case 0x09: case 0x0A: case 0x0B: case 0x0C: case 0x0D: case 0x0E: case 0x0F:
case 0x10: case 0x11: case 0x12: case 0x13: case 0x14: case 0x15: case 0x16: case 0x17:
case 0x18: case 0x19: case 0x1A: case 0x1B: case 0x1C: case 0x1D: case 0x1E: case 0x1F:
Console.WriteLine("ReadShortString");
return ReadShortString();
case 0x20: case 0x21: case 0x22: case 0x23: case 0x24: case 0x25: case 0x26: case 0x27:
case 0x28: case 0x29: case 0x2A: case 0x2B: case 0x2C: case 0x2D: case 0x2E: case 0x2F:
Console.WriteLine("ReadShortBinary");
return ReadShortBinary();
case 0x30: case 0x31: case 0x32: case 0x33:
Console.WriteLine("ReadMediumString");
return ReadMediumString();
case 0x34: case 0x35: case 0x36: case 0x37:
Console.WriteLine("ReadMediumBinary");
return ReadMediumBinary();
case 0x38: case 0x39: case 0x3A: case 0x3B: case 0x3C: case 0x3D: case 0x3E: case 0x3F:
Console.WriteLine("ReadLongThreeBytes");
return ReadLongThreeBytes();
case 0x40:
Console.WriteLine("Reserved");
return Reserved();
case 0x41: case 0x42:
Console.WriteLine("ReadChunkedBinary");
return ReadChunkedBinary();
case 0x43:
Console.WriteLine("ReadClassDefinition");
return ReadClassDefinition();
case 0x44:
Console.WriteLine("ReadFullDouble");
return ReadFullDouble();
case 0x45:
Console.WriteLine("Reserved");
return Reserved();
case 0x46:
Console.WriteLine("ReadBoolean");
return ReadBoolean();
case 0x47:
Console.WriteLine("Reserved");
return Reserved();
case 0x48:
Console.WriteLine("ReadUntypedMap");
return ReadUntypedMap();
case 0x49:
Console.WriteLine("ReadInteger");
return ReadInteger();
case 0x4A:
Console.WriteLine("ReadDateInMillis");
return ReadDateInMillis();
case 0x4B:
Console.WriteLine("ReadDateInMinutes");
return ReadDateInMinutes();
case 0x4C:
Console.WriteLine("ReadLongFull");
return ReadLongFull();
case 0x4D:
Console.WriteLine("ReadTypedMap");
return ReadTypedMap();
case 0x4E:
Console.WriteLine("ReadNull");
return ReadNull();
case 0x4F:
Console.WriteLine("ReadObject");
return ReadObject();
case 0x50:
Console.WriteLine("Reserved");
return Reserved();
case 0x51:
Console.WriteLine("ReadRef");
return ReadRef();
case 0x52: case 0x53:
Console.WriteLine("ReadChunkedString");
return ReadChunkedString();
case 0x54:
Console.WriteLine("ReadBoolean");
return ReadBoolean();
case 0x55:
Console.WriteLine("ReadVarList");
return ReadVarList();
case 0x56:
Console.WriteLine("ReadFixList");
return ReadFixList();
case 0x57:
Console.WriteLine("ReadVarListUntyped");
return ReadVarListUntyped();
case 0x58:
Console.WriteLine("ReadFixListUntyped");
return ReadFixListUntyped();
case 0x59:
Console.WriteLine("ReadLongFourBytes");
return ReadLongFourBytes();
case 0x5A:
@ -168,30 +175,32 @@ namespace Hessian
throw new UnexpectedTagException(0x5A, "value");
case 0x5B: case 0x5C:
Console.WriteLine("ReadDoubleOneByte");
return ReadDoubleOneByte();
case 0x5D:
Console.WriteLine("ReadDoubleOneByte");
return ReadDoubleOneByte();
case 0x5E:
Console.WriteLine("ReadDoubleTwoBytes");
return ReadDoubleTwoBytes();
case 0x5F:
Console.WriteLine("ReadDoubleFourBytes");
return ReadDoubleFourBytes();
case 0x60: case 0x61: case 0x62: case 0x63: case 0x64: case 0x65: case 0x66: case 0x67:
case 0x68: case 0x69: case 0x6A: case 0x6B: case 0x6C: case 0x6D: case 0x6E: case 0x6F:
Console.WriteLine("ReadObjectCompact");
return ReadObjectCompact();
case 0x70: case 0x71: case 0x72: case 0x73: case 0x74: case 0x75: case 0x76: case 0x77:
Console.WriteLine("ReadCompactFixList");
return ReadCompactFixList();
case 0x78: case 0x79: case 0x7A: case 0x7B: case 0x7C: case 0x7D: case 0x7E: case 0x7F:
Console.WriteLine("ReadCompactFixListUntyped");
return ReadCompactFixListUntyped();
case 0x80: case 0x81: case 0x82: case 0x83: case 0x84: case 0x85: case 0x86: case 0x87:
@ -202,28 +211,30 @@ namespace Hessian
case 0xA8: case 0xA9: case 0xAA: case 0xAB: case 0xAC: case 0xAD: case 0xAE: case 0xAF:
case 0xB0: case 0xB1: case 0xB2: case 0xB3: case 0xB4: case 0xB5: case 0xB6: case 0xB7:
case 0xB8: case 0xB9: case 0xBA: case 0xBB: case 0xBC: case 0xBD: case 0xBE: case 0xBF:
Console.WriteLine("ReadIntegerSingleByte");
return ReadIntegerSingleByte();
case 0xC0: case 0xC1: case 0xC2: case 0xC3: case 0xC4: case 0xC5: case 0xC6: case 0xC7:
case 0xC8: case 0xC9: case 0xCA: case 0xCB: case 0xCC: case 0xCD: case 0xCE: case 0xCF:
Console.WriteLine("ReadIntegerTwoBytes");
return ReadIntegerTwoBytes();
case 0xD0: case 0xD1: case 0xD2: case 0xD3: case 0xD4: case 0xD5: case 0xD6: case 0xD7:
Console.WriteLine("ReadIntegerThreeBytes");
return ReadIntegerThreeBytes();
case 0xD8: case 0xD9: case 0xDA: case 0xDB: case 0xDC: case 0xDD: case 0xDE: case 0xDF:
case 0xE0: case 0xE1: case 0xE2: case 0xE3: case 0xE4: case 0xE5: case 0xE6: case 0xE7:
case 0xE8: case 0xE9: case 0xEA: case 0xEB: case 0xEC: case 0xED: case 0xEE: case 0xEF:
Console.WriteLine("ReadLongOneByte");
return ReadLongOneByte();
case 0xF0: case 0xF1: case 0xF2: case 0xF3: case 0xF4: case 0xF5: case 0xF6: case 0xF7:
case 0xF8: case 0xF9: case 0xFA: case 0xFB: case 0xFC: case 0xFD: case 0xFE: case 0xFF:
Console.WriteLine("ReadLongTwoBytes");
return ReadLongTwoBytes();
}
throw new Exception("WTF: byte value " + tag.Value + " not accounted for!");
}

Loading…
Cancel
Save