Compare commits

...

31 Commits

Author SHA1 Message Date
lztkdr ad792ec329 1. Transient功能修复 2、JobLogger 增加 LogId 3、打包 Kingo.XxlJob.Core v1.0.0 2 months ago
Xuanye Wong 727232a743 update sample dotnet to version6 1 year ago
xuanye wong 29965eb4f2
Merge pull request #38 from fanhousanbu/master 3 years ago
徐超越 db7cf6acfb
Update XxlRestfulServiceHandler.cs 3 years ago
xuanye wong e7adc08b5b
Merge pull request #37 from fanhousanbu/master 3 years ago
徐超越 fbe3e1a5cc
Update XxlRestfulServiceHandler.cs 3 years ago
xuanye wong 942efdba4b
Merge pull request #27 from StandHeo/master 4 years ago
leiyuan 63ebc19b17 修复idlebeat路由无法处理到的bug 4 years ago
xuanye wong d8e527b7e7
Merge pull request #26 from StandHeo/master 4 years ago
leiyuan 1fd77e8f23 Revert "修改swager配置" 4 years ago
leiyuan 834933aa17 修改swager配置 4 years ago
leiyuan 9a4260fa0d 修改超时控制 4 years ago
leiyuan eff0d3f065 新增任务超时自动取消功能 4 years ago
假正经哥哥 bee8db3813 publish :任务取消支持 (pullrequest#24) 4 years ago
xuanye wong c9d382722a
Merge pull request #24 from guochen2/cg/fix/task_cancel 4 years ago
guochen2 a57721cf90 1.增加CancellationToken用于任务取消 4 years ago
xuanye wong 0f0eaafbb9
Merge pull request #22 from FatuityCookie/patch-1 4 years ago
FatuityCookie 040ec60e8f
Update XxlRestfulServiceHandler.cs 4 years ago
Xuanye Wong e7cd2bcd9f 新增配置项 SpecialBindUrl ,支持配置特定的URL注册到admin 4 years ago
Xuanye Wong 99e4a5982f 🎨 格式化代码 5 years ago
Xuanye Wong f689dce846 修复阻塞处理策略设置为“丢弃后续调度“,只有第一次调度正常执行,后面的所有调度都被丢弃了的问题 5 years ago
Xuanye Wong 279916ae07 修复阻塞处理策略设置为“丢弃后续调度“,只有第一次调度正常执行,后面的所有调度都被丢弃了的问题 5 years ago
Xuanye Wong 617ee68288 发布版本 5 years ago
Xuanye Wong 50f4ffb7f2 兼容2.3版本 回调参数变更的问题 5 years ago
Xuanye Wong 8ddc99be82 Merge branch 'master' of github.com:xuanye/DotXxlJob 5 years ago
Xuanye Wong cc8f7d218f 🚧 合并冲突 5 years ago
Xuanye Wong c5cbf8230f 异步读取RequestBody 5 years ago
xuanye wong d1008dc495
Update README.md 5 years ago
xuanye wong 062b45870b
Update README.md 5 years ago
Xuanye Wong 79c3d000fc 修正TriggerParam类中logId的类型和admin 不一致的问题 5 years ago
Xuanye Wong e2d82ae952 新增适配xxl-jobv2.2+以上版本 6 years ago
  1. 51
      DotXxlJob.sln
  2. 167
      README.md
  3. 168
      README_SRC.md
  4. 8
      build/releasenotes.props
  5. 3
      build/version.props
  6. 13
      samples/ASPNetCoreExecutor/.config/dotnet-tools.json
  7. 3
      samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj
  8. 8
      samples/ASPNetCoreExecutor/DemoJobHandler.cs
  9. 23
      samples/ASPNetCoreExecutor/Extensions/XxlJobExecutorMiddleware.cs
  10. 23
      samples/ASPNetCoreExecutor/Properties/PublishProfiles/FolderProfile.pubxml
  11. 17
      samples/ASPNetCoreExecutor/Properties/launchSettings.json
  12. 9
      samples/ASPNetCoreExecutor/Startup.cs
  13. 9
      samples/ASPNetCoreExecutor/appsettings.json
  14. 23
      samples/HessianReader/HessianReader.csproj
  15. 73
      samples/HessianReader/NewFile1.txt
  16. 74
      samples/HessianReader/NewFile2.txt
  17. 158
      samples/HessianReader/Program.cs
  18. BIN
      samples/HessianReader/kill.dat
  19. BIN
      samples/HessianReader/log.dat
  20. BIN
      samples/HessianReader/request.dat
  21. BIN
      samples/HessianReader/run.dat
  22. 18
      scripts/nuget-hessian.sh
  23. 4
      scripts/nuget.sh
  24. 2
      scripts/package.sh
  25. 117
      src/DotXxlJob.Core/AdminClient.cs
  26. 6
      src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs
  27. 33
      src/DotXxlJob.Core/DefaultJobHandlerFactory.cs
  28. 39
      src/DotXxlJob.Core/DotXxlJob.Core.csproj
  29. 3
      src/DotXxlJob.Core/ExecutorRegistry.cs
  30. 7
      src/DotXxlJob.Core/Extensions/DateTimeExtension.cs
  31. 10
      src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs
  32. 132
      src/DotXxlJob.Core/Internal/HessianObjectConvert.cs
  33. 79
      src/DotXxlJob.Core/Internal/HessianSerializer.cs
  34. 5
      src/DotXxlJob.Core/JobDispatcher.cs
  35. 9
      src/DotXxlJob.Core/Logger/IJobLogger.cs
  36. 45
      src/DotXxlJob.Core/Logger/JobLogger.cs
  37. 2
      src/DotXxlJob.Core/Model/AddressEntity.cs
  38. 31
      src/DotXxlJob.Core/Model/HandleCallbackParam.cs
  39. 15
      src/DotXxlJob.Core/Model/IdleBeatRequest.cs
  40. 6
      src/DotXxlJob.Core/Model/JobExecuteContext.cs
  41. 16
      src/DotXxlJob.Core/Model/KillRequest.cs
  42. 21
      src/DotXxlJob.Core/Model/LogRequest.cs
  43. 2
      src/DotXxlJob.Core/Model/RegistryParam.cs
  44. 6
      src/DotXxlJob.Core/Model/TriggerParam.cs
  45. 31
      src/DotXxlJob.Core/Queue/JobTaskQueue.cs
  46. 5
      src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs
  47. 3
      src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs
  48. 166
      src/DotXxlJob.Core/XxlRestfulServiceHandler.cs
  49. 217
      src/DotXxlJob.Core/XxlRpcServiceHandler.cs
  50. 59
      src/Hessian/ClassDef.cs
  51. 11
      src/Hessian/ClassElement.cs
  52. 91
      src/Hessian/Collections/ForwardingDictionary.cs
  53. 32
      src/Hessian/Collections/IRefMap.cs
  54. 59
      src/Hessian/Collections/ITwoWayDictionary.cs
  55. 34
      src/Hessian/Collections/ListRefMap.cs
  56. 123
      src/Hessian/Collections/TwoWayDictionary.cs
  57. 34
      src/Hessian/Collections/TwoWayDictionaryRefMap.cs
  58. 114
      src/Hessian/Conditions.cs
  59. 1015
      src/Hessian/Deserializer.cs
  60. 46
      src/Hessian/DictionaryTypeResolver.cs
  61. 24
      src/Hessian/Hessian.csproj
  62. 24
      src/Hessian/HessianException.cs
  63. 66
      src/Hessian/HessianObject.cs
  64. 27
      src/Hessian/HessianSerializationContext.cs
  65. 15
      src/Hessian/InvalidRefException.cs
  66. 66
      src/Hessian/ListTypeResolver.cs
  67. 39
      src/Hessian/Marker.cs
  68. 142
      src/Hessian/PeekStream.cs
  69. 25
      src/Hessian/Platform/BigEndianBitConverter.cs
  70. 215
      src/Hessian/Platform/EndianBitConverter.cs
  71. 25
      src/Hessian/Platform/LittleEndianBitConverter.cs
  72. 31
      src/Hessian/PropertyElement.cs
  73. 523
      src/Hessian/Serializer.cs
  74. 39
      src/Hessian/StringBuilderExtensions.cs
  75. 18
      src/Hessian/UnexpectedTagException.cs
  76. 163
      src/Hessian/ValueReader.cs
  77. 19
      tests/Hessian.Tests/Hessian.Tests.csproj
  78. 13
      tests/Hessian.Tests/UnitTest1.cs

@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
# Visual Studio Version 16
VisualStudioVersion = 16.0.30104.148
MinimumVisualStudioVersion = 15.0.26124.0
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{97756BA5-1E7C-4536-A49E-AE2190C0E6A5}"
EndProject
@ -13,13 +13,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{E959
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ASPNetCoreExecutor", "samples\ASPNetCoreExecutor\ASPNetCoreExecutor.csproj", "{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HessianReader", "samples\HessianReader\HessianReader.csproj", "{F822B528-95FD-40B4-9EE0-3AE8878075AC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Hessian", "src\Hessian\Hessian.csproj", "{BD9B8108-6528-430F-AD28-6F8434A29F55}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Hessian.Tests", "tests\Hessian.Tests\Hessian.Tests.csproj", "{A51FACF0-C90F-43D1-898D-CD171977C1A1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotXxlJob.Core.Tests", "tests\DotXxlJob.Core.Tests\DotXxlJob.Core.Tests.csproj", "{81C60471-7C1C-48CE-98C0-F252C267AC9F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotXxlJob.Core.Tests", "tests\DotXxlJob.Core.Tests\DotXxlJob.Core.Tests.csproj", "{81C60471-7C1C-48CE-98C0-F252C267AC9F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -55,42 +49,6 @@ Global
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|x64.Build.0 = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|x86.ActiveCfg = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|x86.Build.0 = Release|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Debug|x64.ActiveCfg = Debug|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Debug|x64.Build.0 = Debug|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Debug|x86.ActiveCfg = Debug|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Debug|x86.Build.0 = Debug|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Release|Any CPU.Build.0 = Release|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Release|x64.ActiveCfg = Release|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Release|x64.Build.0 = Release|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Release|x86.ActiveCfg = Release|Any CPU
{F822B528-95FD-40B4-9EE0-3AE8878075AC}.Release|x86.Build.0 = Release|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Debug|x64.ActiveCfg = Debug|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Debug|x64.Build.0 = Debug|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Debug|x86.ActiveCfg = Debug|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Debug|x86.Build.0 = Debug|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Release|Any CPU.Build.0 = Release|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Release|x64.ActiveCfg = Release|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Release|x64.Build.0 = Release|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Release|x86.ActiveCfg = Release|Any CPU
{BD9B8108-6528-430F-AD28-6F8434A29F55}.Release|x86.Build.0 = Release|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Debug|x64.ActiveCfg = Debug|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Debug|x64.Build.0 = Debug|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Debug|x86.ActiveCfg = Debug|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Debug|x86.Build.0 = Debug|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Release|Any CPU.Build.0 = Release|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Release|x64.ActiveCfg = Release|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Release|x64.Build.0 = Release|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Release|x86.ActiveCfg = Release|Any CPU
{A51FACF0-C90F-43D1-898D-CD171977C1A1}.Release|x86.Build.0 = Release|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|x64.ActiveCfg = Debug|Any CPU
@ -110,9 +68,6 @@ Global
GlobalSection(NestedProjects) = preSolution
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65} = {97756BA5-1E7C-4536-A49E-AE2190C0E6A5}
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5} = {E959F9B5-F3EB-48B1-B842-2CDDFDB01900}
{F822B528-95FD-40B4-9EE0-3AE8878075AC} = {E959F9B5-F3EB-48B1-B842-2CDDFDB01900}
{BD9B8108-6528-430F-AD28-6F8434A29F55} = {97756BA5-1E7C-4536-A49E-AE2190C0E6A5}
{A51FACF0-C90F-43D1-898D-CD171977C1A1} = {352EC932-F112-4A2F-9DC3-F0761C85E068}
{81C60471-7C1C-48CE-98C0-F252C267AC9F} = {352EC932-F112-4A2F-9DC3-F0761C85E068}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution

@ -1,166 +1,13 @@
# DotXxlJob
xxl-job的dotnet core 执行器实现,支持XXL-JOB 2.0+
# Kingo.XxlJob.Core
## 1 XXL-JOB概述
[XXL-JOB][1]是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。以下是它的架构图
![架构图](https://raw.githubusercontent.com/xuxueli/xxl-job/master/doc/images/img_Qohm.png)
该仓库,基于 https://github.com/xuanye/DotXxlJob 源码基础上,进行扩展改造,原始项目说明,见 [README_SRC.md](README_SRC.md) 。
更新内容:
1. 修复IJobHandler 注册 AddTransient瞬时服务注册,瞬时不起作用的情况。(2025.2.26)
## 2. 关于DotXxlJob产生
在工作中调研过多个任务调度平台,如Hangfire、基于Quatz.NET的第三方扩展,都与实际的需求有一点差距。 之前一直使用Hangfire,Hangfire的执行器在同步调用业务服务时,如果即时业务服务正在重新部署或者重启,有一定概率会出现死锁,导致CPU100%,后来全部调整为异步,但是这样就无法获得执行结果,这样的设计有蛮大问题,XxlJob的回调机制很好的解决了这个问题。本身如果通过http的方式调用,只要部署springbootd的一个执行器就可以解决问题,但是扩展性较差。所以萌生了实现DotNet版本的执行器的想法,为避免重复造轮子,开始之前也进行过调研,以下仓库[https://github.com/yuniansheng/xxl-job-dotnet][2]给了较大的启发,但是该库只支持1.9版本的xxljob,还有一些其他小问题,所以还是自力更生。
示例:GTYPackageJobService 服务的 Push_Common_Job 、GetJobNames 重写,原先 GetJobNames 虽然是Transient服务,任务Execute会执行多次,但是GetJobNames 之前只会执行一次,调整后会执行多次。
## 3. 如何使用
2. JobExecuteContext下 的 IJobLogger 增加 LogId 属性,用于通过 LogId 查看Job日志。(2025.2.26)
目前只实现了BEAN的方式,即直接实现IJobHandler调用的方式,Glue源码的方式实际上实现起来也并不复杂(有需求再说把),或者各位有需求Fork 实现一下
可参考sample
安装:
> dotnet add package DotXxlJob.Core
### 3.1 在AspNetCore中使用
1. 声明一个AspNet的Middleware中间件,并扩展ApplicationBuilder,本质是拦截Post请求,解析Body中的流信息
```
public class XxlJobExecutorMiddleware
{
private readonly IServiceProvider _provider;
private readonly RequestDelegate _next;
private readonly XxlRpcServiceHandler _rpcService;
public XxlJobExecutorMiddleware(IServiceProvider provider, RequestDelegate next)
{
this._provider = provider;
this._next = next;
this._rpcService = _provider.GetRequiredService<XxlRpcServiceHandler>();
}
public async Task Invoke(HttpContext context)
{
if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase) &&
"application/octet-stream".Equals(context.Request.ContentType, StringComparison.OrdinalIgnoreCase))
{
var rsp = await _rpcService.HandlerAsync(context.Request.Body);
context.Response.StatusCode = (int) HttpStatusCode.OK;
context.Response.ContentType = "text/plain;utf-8";
await context.Response.Body.WriteAsync(rsp,0,rsp.Length);
return;
}
await _next.Invoke(context);
}
}
```
扩展ApplicationBuilderExtensions,可根据实际情况绑定在特殊的Url Path上
```
public static class ApplicationBuilderExtensions
{
public static IApplicationBuilder UseXxlJobExecutor(this IApplicationBuilder @this)
{
return @this.UseMiddleware<XxlJobExecutorMiddleware>();
}
}
```
在Startup中添加必要的引用,其中自动注册。
```
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
private IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddXxlJobExecutor(Configuration);
services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定义的jobHandler
services.AddAutoRegistry(); // 自动注册
}
public void Configure(IApplicationBuilder app,IHostingEnvironment env)
{
//启用XxlExecutor
app.UseXxlJobExecutor();
}
}
```
编写JobHandler,继承AbstractJobHandler或者直接实现接口IJobHandler,通过context.JobLogger 记录执行过程和结果,在AdminWeb上可查看的哦
```
[JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler
{
public override Task<ReturnT> Execute(JobExecuteContext context)
{
context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter);
return Task.FromResult(ReturnT.SUCCESS);
}
}
```
## 3.2 配置信息
管理端地址和端口是必填信息,其他根据实际情况,选择配置,配置项说明见下代码中的注释
```
public class XxlJobExecutorOptions
{
/// <summary>
/// 管理端地址,多个以;分隔
/// </summary>
public string AdminAddresses { get; set; }
/// <summary>
/// appName自动注册时要去管理端配置一致
/// </summary>
public string AppName { get; set; } = "xxl-job-executor-dotnet";
/// <summary>
/// 自动注册时提交的地址,为空会自动获取内网地址
/// </summary>
public string SpecialBindAddress { get; set; }
/// <summary>
/// 绑定端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 是否自动注册
/// </summary>
public bool AutoRegistry { get; set; }
/// <summary>
/// 认证票据
/// </summary>
public string AccessToken { get; set; }
/// <summary>
/// 日志目录,默认为执行目录的logs子目录下,请配置绝对路径
/// </summary>
public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs");
/// <summary>
/// 日志保留天数
/// </summary>
public int LogRetentionDays { get; set; } = 30;
}
```
## 在其他Http服务中使用
只需要实现Http请求的拦截,并判断post请求中content-Type="application/octet-stream",并使用XxlRpcServiceHandler来处理流 即可。
## 其他说明
XXL-JOB内置的RPC是使用Hessian协议,这个有点坑。很多都是java特有的属性和标识,比如类名什么的。在本项目中,并没有实现完整的Hessian2协议,只实现了使用到的类型,当然扩展起来也非常方便。如果有人要单独使用Hessian 这个类库的话,要特别注意这个问题。
有任何问题,可Issue反馈 ,最后感谢 xxl-job
[1]: http://www.xuxueli.com/xxl-job
[2]: https://github.com/yuniansheng/xxl-job-dotnet
示例:任务执行后,可能有异常,有时找不到在哪次执行,可记录LogId,后续通过访问链接地址查看当时执行的日志。如 LogId 是 478571085,可访问对应的xjob服务的日志链接地址:https://xxx/xxl-job-admin/joblog/logDetailPage?id=478571085

@ -0,0 +1,168 @@
# DotXxlJob
xxl-job的dotnet core 最新执行器实现,支持XXL-JOB 2.2+
> 注意XXL-JOB 2.0.1版本请使用 1.0.8的执行器实现 ,*xxl-job* 从 2.0.2 到2.2版本又使用了xxl-rpc的新协议,本执行器不做支持,确实需要的朋友请自行fork..
## 1 XXL-JOB概述
[XXL-JOB][1]是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。以下是它的架构图
![架构图](https://raw.githubusercontent.com/xuxueli/xxl-job/master/doc/images/img_Qohm.png)
## 2. 关于DotXxlJob产生
在工作中调研过多个任务调度平台,如Hangfire、基于Quatz.NET的第三方扩展,都与实际的需求有一点差距。 之前一直使用Hangfire,Hangfire的执行器在同步调用业务服务时,如果即时业务服务正在重新部署或者重启,有一定概率会出现死锁,导致CPU100%,后来全部调整为异步,但是这样就无法获得执行结果,这样的设计有蛮大问题,XxlJob的回调机制很好的解决了这个问题。本身如果通过http的方式调用,只要部署springbootd的一个执行器就可以解决问题,但是扩展性较差。所以萌生了实现DotNet版本的执行器的想法,为避免重复造轮子,开始之前也进行过调研,以下仓库[https://github.com/yuniansheng/xxl-job-dotnet][2]给了较大的启发,但是该库只支持1.9版本的xxljob,还有一些其他小问题,所以还是自力更生。
## 3. 如何使用
目前只实现了BEAN的方式,即直接实现IJobHandler调用的方式,Glue源码的方式实际上实现起来也并不复杂(有需求再说把),或者各位有需求Fork 实现一下
可参考sample
安装:
> dotnet add package DotXxlJob.Core
### 3.1 在AspNetCore中使用
1. 声明一个AspNet的Middleware中间件,并扩展ApplicationBuilder,本质是拦截Post请求,解析Body中的流信息
```
public class XxlJobExecutorMiddleware
{
private readonly IServiceProvider _provider;
private readonly RequestDelegate _next;
private readonly XxlRestfulServiceHandler _rpcService;
public XxlJobExecutorMiddleware(IServiceProvider provider, RequestDelegate next)
{
this._provider = provider;
this._next = next;
this._rpcService = _provider.GetRequiredService<XxlRestfulServiceHandler>();
}
public async Task Invoke(HttpContext context)
{
string contentType = context.Request.ContentType;
if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase)
&& !string.IsNullOrEmpty(contentType)
&& contentType.ToLower().StartsWith("application/json"))
{
await _rpcService.HandlerAsync(context.Request,context.Response);
return;
}
await _next.Invoke(context);
}
}
```
扩展ApplicationBuilderExtensions,可根据实际情况绑定在特殊的Url Path上
```
public static class ApplicationBuilderExtensions
{
public static IApplicationBuilder UseXxlJobExecutor(this IApplicationBuilder @this)
{
return @this.UseMiddleware<XxlJobExecutorMiddleware>();
}
}
```
在Startup中添加必要的引用,其中自动注册。
```
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
private IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddXxlJobExecutor(Configuration);
services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定义的jobHandler
services.AddAutoRegistry(); // 自动注册
}
public void Configure(IApplicationBuilder app,IHostingEnvironment env)
{
//启用XxlExecutor
app.UseXxlJobExecutor();
}
}
```
编写JobHandler,继承AbstractJobHandler或者直接实现接口IJobHandler,通过context.JobLogger 记录执行过程和结果,在AdminWeb上可查看的哦
```
[JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler
{
public override Task<ReturnT> Execute(JobExecuteContext context)
{
context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter);
return Task.FromResult(ReturnT.SUCCESS);
}
}
```
## 3.2 配置信息
管理端地址和端口是必填信息,其他根据实际情况,选择配置,配置项说明见下代码中的注释
```
public class XxlJobExecutorOptions
{
/// <summary>
/// 管理端地址,多个以;分隔
/// </summary>
public string AdminAddresses { get; set; }
/// <summary>
/// appName自动注册时要去管理端配置一致
/// </summary>
public string AppName { get; set; } = "xxl-job-executor-dotnet";
/// <summary>
/// 自动注册时提交的地址,为空会自动获取内网地址
/// </summary>
public string SpecialBindAddress { get; set; }
/// <summary>
/// 绑定端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 是否自动注册
/// </summary>
public bool AutoRegistry { get; set; }
/// <summary>
/// 认证票据
/// </summary>
public string AccessToken { get; set; }
/// <summary>
/// 日志目录,默认为执行目录的logs子目录下,请配置绝对路径
/// </summary>
public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs");
/// <summary>
/// 日志保留天数
/// </summary>
public int LogRetentionDays { get; set; } = 30;
}
```
## 其他说明
注意XXL-JOB 2.0.1版本请使用 1.0.8的执行器实现
有任何问题,可Issue反馈 ,最后感谢 xxl-job
[1]: http://www.xuxueli.com/xxl-job
[2]: https://github.com/yuniansheng/xxl-job-dotnet

@ -1,10 +1,10 @@
<Project>
<PropertyGroup>
<DotXxlJobPackageNotes>
1. 修复回调一次过多的问题
1. 修改异步方式读取RequestBody
2. 兼容xxl-jobv2.3版本回调函数参数变更的问题
3. 修复阻塞处理策略设置为“丢弃后续调度“,只有第一次调度正常执行,后面的所有调度都被丢弃了的问题
4. 任务取消支持 (pullrequest#24)
</DotXxlJobPackageNotes>
<HessianPackageNotes>
1. 实现基本的Hessian协议
</HessianPackageNotes>
</PropertyGroup>
</Project>

@ -1,6 +1,5 @@
<Project>
<PropertyGroup>
<DotXxlJobPackageVersion>1.0.8</DotXxlJobPackageVersion>
<HessianPackageVersion>1.0.1</HessianPackageVersion>
<DotXxlJobPackageVersion>2.3.2</DotXxlJobPackageVersion>
</PropertyGroup>
</Project>

@ -0,0 +1,13 @@
{
"version": 1,
"isRoot": true,
"tools": {
"dotnet-ef": {
"version": "8.0.7",
"commands": [
"dotnet-ef"
],
"rollForward": false
}
}
}

@ -1,11 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.Extensions.Logging.Console">
<Version>2.2.0</Version>
</PackageReference>

@ -10,11 +10,13 @@ namespace ASPNetCoreExecutor
[JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler
{
public override Task<ReturnT> Execute(JobExecuteContext context)
public override async Task<ReturnT> Execute(JobExecuteContext context)
{
context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter);
return Task.FromResult(ReturnT.SUCCESS);
context.JobLogger.Log("开始休眠10秒");
await Task.Delay(10 * 1000);
context.JobLogger.Log("休眠10秒结束");
return ReturnT.SUCCESS;
}
}
}

@ -13,35 +13,26 @@ namespace ASPNetCoreExecutor
private readonly IServiceProvider _provider;
private readonly RequestDelegate _next;
private readonly XxlRpcServiceHandler _rpcService;
private readonly XxlRestfulServiceHandler _rpcService;
public XxlJobExecutorMiddleware(IServiceProvider provider, RequestDelegate next)
{
this._provider = provider;
this._next = next;
this._rpcService = _provider.GetRequiredService<XxlRpcServiceHandler>();
this._rpcService = _provider.GetRequiredService<XxlRestfulServiceHandler>();
}
public async Task Invoke(HttpContext context)
{
string contentType = context.Request.ContentType;
if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase) &&
"application/octet-stream".Equals(context.Request.ContentType, StringComparison.OrdinalIgnoreCase))
if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase)
&& !string.IsNullOrEmpty(contentType)
&& contentType.ToLower().StartsWith("application/json"))
{
/*
using (Stream file = File.Create("./"+DateTime.Now.ToUnixTimeSeconds()+".data"))
{
context.Request.Body.CopyTo(file);
}
return;
*/
var rsp = await _rpcService.HandlerAsync(context.Request.Body);
await _rpcService.HandlerAsync(context.Request,context.Response);
context.Response.StatusCode = (int) HttpStatusCode.OK;
context.Response.ContentType = "text/plain;utf-8";
await context.Response.Body.WriteAsync(rsp,0,rsp.Length);
return;
}

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
https://go.microsoft.com/fwlink/?LinkID=208121.
-->
<Project>
<PropertyGroup>
<DeleteExistingFiles>true</DeleteExistingFiles>
<ExcludeApp_Data>false</ExcludeApp_Data>
<LaunchSiteAfterPublish>true</LaunchSiteAfterPublish>
<LastUsedBuildConfiguration>Release</LastUsedBuildConfiguration>
<LastUsedPlatform>Any CPU</LastUsedPlatform>
<PublishProvider>FileSystem</PublishProvider>
<PublishUrl>bin\Release\net6.0\publish\</PublishUrl>
<WebPublishMethod>FileSystem</WebPublishMethod>
<_TargetId>Folder</_TargetId>
<SiteUrlToLaunchAfterPublish />
<TargetFramework>net6.0</TargetFramework>
<RuntimeIdentifier>linux-x64</RuntimeIdentifier>
<PublishSingleFile>true</PublishSingleFile>
<ProjectGuid>dc9e5af3-18ff-4713-bdb4-672e47ada4e5</ProjectGuid>
<SelfContained>false</SelfContained>
</PropertyGroup>
</Project>

@ -1,27 +1,12 @@
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:60087/",
"sslPort": 0
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"ASPNetCoreExecutor": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "http://localhost:60088/"
"applicationUrl": "http://localhost:6662/"
}
}
}

@ -4,6 +4,7 @@ using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Server.Kestrel.Core;
namespace ASPNetCoreExecutor
{
@ -27,12 +28,16 @@ namespace ASPNetCoreExecutor
services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定义的jobHandler
services.AddAutoRegistry(); // 自动注册
//services.Configure<KestrelServerOptions>(x => x.AllowSynchronousIO = true)
// .Configure<IISServerOptions>(x=> x.AllowSynchronousIO = true);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app,IHostingEnvironment env)
public void Configure(IApplicationBuilder app,IWebHostEnvironment env)
{
if (env.IsDevelopment())
if (env.EnvironmentName !="Production")
{
app.UseDeveloperExceptionPage();
}

@ -1,17 +1,16 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning"
"Default": "Information"
}
},
"xxlJob": {
"adminAddresses":"http://127.0.0.1:8080",
"adminAddresses": "https://jobs.xuanye.wang/xxl-job-admin",
"appName": "xxl-job-executor-dotnet",
"specialBindAddress": "127.0.0.1",
"port": 5000,
"autoRegistry":false,
"port": 6662,
"autoRegistry": true,
"accessToken": "",
"logRetentionDays": 30
}
}

@ -1,23 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotXxlJob.Core\DotXxlJob.Core.csproj" />
<ProjectReference Include="..\..\src\Hessian\Hessian.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="run.dat">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

@ -1,73 +0,0 @@
------------ 0x43----------------
ReadClassDefinition
Hessian.ClassDef={"Name":"com.xxl.rpc.remoting.net.params.XxlRpcRequest","Fields":["requestId","createMillisTime","accessToken","className","methodName","version","parameterTypes","parameters"]}
------------------------------------------------------------
------------ 0x60----------------
ReadObjectCompact XxlRpcRequest
------------ 0x30----------------
ReadMediumString requestId
------------ 0x4c----------------
ReadLongFull createMillisTime
------------ 0x00----------------
ReadShortString accessToken
------------ 0x1d----------------
ReadShortString className
------------ 0x08----------------
ReadShortString methodName
------------ 0x4e----------------
ReadNull version
------------ 0x71----------------
ReadCompactFixList parameterTypes
------------ 0x43----------------
ReadClassDefinition java.lang.Class
------------ 0x61----------------
ReadObjectCompact java.lang.Class
------------ 0x0e----------------
ReadShortString name
Hessian.HessianObject=[{"Item1":"requestId","Item2":"e24123be4a76417ca6f41f227532b235"},{"Item1":"createMillisTime","Item2":1547819469003},{"Item1":"accessToken","Item2":""},{"Item1":"className","Item2":"com.xxl.job.core.biz.AdminBiz"},{"It
em1":"methodName","Item2":"callback"},{"Item1":"version","Item2":null},{"Item1":"parameterTypes","Item2":[{"Name":"java.lang.Class","Fields":["name"]}]},{"Item1":"parameters","Item2":[{"Item1":"name","Item2":"java.util.List"}]}]
------------------------------------------------------------
------------ 0x71----------------
ReadCompactFixList parameters
------------ 0x72----------------
ReadCompactFixList List
------------ 0x43----------------
ReadClassDefinition HandleCallbackParam
------------ 0x62----------------
ReadObjectCompact HandleCallbackParam
------------ 0x9b----------------
ReadIntegerSingleByte logId
------------ 0x4c----------------
ReadLongFull logDateTim
------------ 0x43----------------
ReadClassDefinition executeResult
System.Collections.Generic.List`1[System.Object]=[[{"Name":"com.xxl.job.core.biz.model.HandleCallbackParam","Fields":["logId","logDateTim","executeResult"]},[{"Item1":"logId","Item2":11},{"Item1":"logDateTim","Item2":1547819469000},{"Item1"
:"executeResult","Item2":{"Name":"com.xxl.job.core.biz.model.ReturnT","Fields":["code","msg","content"]}}]]]
------------------------------------------------------------
------------ 0x63----------------
ReadObjectCompact executeResult
------------ 0xc8----------------
ReadIntegerTwoBytes code
------------ 0x03----------------
ReadShortString msg
------------ 0x07----------------
ReadShortString content
Hessian.HessianObject=[{"Item1":"code","Item2":200},{"Item1":"msg","Item2":"1bc"},{"Item1":"content","Item2":"acd3323"}]
------------------------------------------------------------
------------ 0x62----------------
ReadObjectCompact HandleCallbackParam
------------ 0xa6----------------
ReadIntegerSingleByte logId
------------ 0x4c----------------
ReadLongFull logDateTim
------------ 0x63----------------
ReadObjectCompact executeResult
------------ 0xc9----------------
ReadIntegerTwoBytes code
------------ 0x03----------------
ReadShortString msg
------------ 0x03----------------
ReadShortString content
Hessian.HessianObject=[{"Item1":"logId","Item2":22},{"Item1":"logDateTim","Item2":1547819469000},{"Item1":"executeResult","Item2":[{"Item1":"code","Item2":500},{"Item1":"msg","Item2":"cac"},{"Item1":"content","Item2":"aad"}]}]
------------------------------------------------------------
------------------------------------------------------------

@ -1,74 +0,0 @@
---------------------------------------------------------------
------------ 0x43----------------
ReadClassDefinition
Hessian.ClassDef={"Name":"com.xxl.rpc.remoting.net.params.XxlRpcRequest","Fields":["requestId","createMillisTime","accessToken","className","methodName","version","parameterTypes","parameters"]}
------------------------------------------------------------
------------ 0x60----------------
ReadObjectCompact
------------ 0x30----------------
ReadMediumString
------------ 0x4c----------------
ReadLongFull
------------ 0x00----------------
ReadShortString
------------ 0x1d----------------
ReadShortString
------------ 0x08----------------
ReadShortString
------------ 0x4e----------------
ReadNull
------------ 0x71----------------
ReadCompactFixList
------------ 0x43----------------
ReadClassDefinition
------------ 0x61----------------
ReadObjectCompact
------------ 0x0e----------------
ReadShortString
Hessian.HessianObject=[{"Item1":"requestId","Item2":"e24123be4a76417ca6f41f227532b235"},{"Item1":"createMillisTime","Item2":1547819469003},{"Item1":"accessToken","Item2":""},{"Item1":"className","Item2":"com.xxl.job.core.biz.AdminBiz"},{"I
tem1":"methodName","Item2":"callback"},{"Item1":"version","Item2":null},{"Item1":"parameterTypes","Item2":[{"Name":"java.lang.Class","Fields":["name"]}]},{"Item1":"parameters","Item2":[{"Item1":"name","Item2":"java.util.List"}]}]
------------------------------------------------------------
------------ 0x71----------------
ReadCompactFixList
------------ 0x72----------------
ReadCompactFixList
------------ 0x43----------------
ReadClassDefinition
------------ 0x62----------------
ReadObjectCompact
------------ 0x9b----------------
ReadIntegerSingleByte
------------ 0x4c----------------
ReadLongFull
------------ 0x43----------------
ReadClassDefinition
System.Collections.Generic.List`1[System.Object]=[[{"Name":"com.xxl.job.core.biz.model.HandleCallbackParam","Fields":["logId","logDateTim","executeResult"]},[{"Item1":"logId","Item2":11},{"Item1":"logDateTim","Item2":1547819469000},{"Item1
":"executeResult","Item2":{"Name":"com.xxl.job.core.biz.model.ReturnT","Fields":["code","msg","content"]}}]]]
------------------------------------------------------------
------------ 0x63----------------
ReadObjectCompact
------------ 0xc8----------------
ReadIntegerTwoBytes
------------ 0x03----------------
ReadShortString
------------ 0x07----------------
ReadShortString
Hessian.HessianObject=[{"Item1":"code","Item2":200},{"Item1":"msg","Item2":"1bc"},{"Item1":"content","Item2":"acd3323"}]
------------------------------------------------------------
------------ 0x62----------------
ReadObjectCompact
------------ 0xa6----------------
ReadIntegerSingleByte
------------ 0x4c----------------
ReadLongFull
------------ 0x63----------------
ReadObjectCompact
------------ 0xc9----------------
ReadIntegerTwoBytes
------------ 0x03----------------
ReadShortString
------------ 0x03----------------
ReadShortString
Hessian.HessianObject=[{"Item1":"logId","Item2":22},{"Item1":"logDateTim","Item2":1547819469000},{"Item1":"executeResult","Item2":[{"Item1":"code","Item2":500},{"Item1":"msg","Item2":"aad"},{"Item1":"content","Item2":"cac"}]}]
------------------------------------------------------------
------------------------------------------------------------

@ -1,158 +0,0 @@
using System;
using System.Collections.Generic;
using System.IO;
using DotXxlJob.Core;
using DotXxlJob.Core.Model;
using Hessian;
using Newtonsoft.Json;
namespace HessianReader
{
class Program
{
static void Main(string[] args)
{
/* */
byte[] myBinary = File.ReadAllBytes("run.dat");
foreach (var i in myBinary)
{
Console.Write("0x");
Console.Write(i.ToString("x2"));
Console.Write(",");
}
Console.WriteLine(Environment.NewLine);
Console.WriteLine("---------------------------------------------------------------");
/*
byte[] myBinary;
var callbackParamList = new List<HandleCallbackParam> {
new HandleCallbackParam {
LogId = 11,
LogDateTime = 1547819469000L,
ExecuteResult =new ReturnT { Code = 200,Content ="acd3323",Msg ="1bc" }
},
new HandleCallbackParam {
LogId = 22,
LogDateTime = 1547819469000L,
ExecuteResult =new ReturnT { Code = 500,Content ="cac",Msg ="aad" }
}
};
var request = new RpcRequest {
RequestId ="e24123be4a76417ca6f41f227532b235",
CreateMillisTime = 1547819469003L,
AccessToken = "",
ClassName = "com.xxl.job.core.biz.AdminBiz",
MethodName = "callback",
ParameterTypes = new List<object> {new JavaClass {Name = "java.util.List"}},
Parameters = new List<object> {callbackParamList}
};
using (var stream = new MemoryStream())
{
HessianSerializer.SerializeRequest(stream,request);
myBinary = stream.ToArray();
}
*/
using (var stream1 = new MemoryStream(myBinary))
{
var s1 = HessianSerializer.DeserializeRequest(stream1);
Console.WriteLine("{0}={1}",s1.GetType(),JsonConvert.SerializeObject(s1));
/*
var s = new Deserializer(stream1);
while ( s.CanRead())
{
var o = s.ReadValue();
Console.WriteLine("{0}={1}",o.GetType(),JsonConvert.SerializeObject(o));
Console.WriteLine("------------------------------------------------------------");
}
*/
}
Console.WriteLine("------------------------------------------------------------");
Console.ReadKey();
/**
*
* Console.WriteLine("---------------------------------------------------------------");
RpcRequest req = new RpcRequest {
RequestId = "71565f61-94e8-4dcf-9760-f2fb73a6886a",
CreateMillisTime = 1547621183585,
AccessToken = "",
ClassName = "com.xxl.job.core.biz.ExecutorBiz",
MethodName = "run",
ParameterTypes = new List<object> {new JavaClass{ Name = "com.xxl.job.core.biz.model.TriggerParam"}},
Version = null,
Parameters = new List<object>()
};
var p =new TriggerParam {
JobId=1,
ExecutorHandler="demoJobHandler",
ExecutorParams="111",
ExecutorBlockStrategy="SERIAL_EXECUTION",
ExecutorTimeout=0,
LogId=5,
LogDateTime=1547621183414L,
GlueType="BEAN",
GlueSource="",
GlueUpdateTime=1541254891000,
BroadcastIndex=0,
BroadcastTotal=1
};
req.Parameters.Add(p);
using (var stream2 = new MemoryStream())
{
var serializer = new Serializer(stream2);
serializer.WriteObject(req);
Console.WriteLine("-----------------------------序列化成功---{0}-------------------------------",stream2.Length);
stream2.Position = 0;
var s2 = HessianSerializer.DeserializeRequest(stream2);
Console.WriteLine(JsonConvert.SerializeObject(s2));
}
* [{"Item1":"requestId","Item2":"71565f61-94e8-4dcf-9760-f2fb73a6886a"},{"Item1":"createMillisTime","Item2":1432957289},{"Item1":"accessToken","Item2":""},{"Item1":"className","Item2":"com.xxl.job.core.biz.ExecutorBiz"},{"Item1":"methodName","Item2":"run"},{"Item1":"version","Item2":null},{"Item1":"parameterT
ypes","Item2":[{"Name":"java.lang.Class","Fields":["name"]}]},{"Item1":"parameters","Item2":[{"Item1":"name","Item2":"com.xxl.job.core.biz.model.TriggerParam"}]}]
System.Collections.Generic.List`1[System.Object]
[{"Name":"com.xxl.job.core.biz.model.TriggerParam","Fields":["jobId","executorHandler","executorParams","executorBlockStrategy","executorTimeout","logId","logDateTim","glueType","glueSource","glueUpdatetime","broadcastIndex","broadcastTotal"]}]
Hessian.HessianObject
[{"Item1":"jobId","Item2":1},{"Item1":"executorHandler","Item2":"demoJobHandler"},{"Item1":"executorParams","Item2":"111"},{"Item1":"executorBlockStrategy","Item2":"SERIAL_EXECUTION"},{"Item1":"executorTimeout","Item2":0},{"Item1":"logId","Item2":5},{"Item1":"logDateTim","Item2":1432956926},{"Item1":"glueTy
pe","Item2":"BEAN"},{"Item1":"glueSource","Item2":""},{"Item1":"glueUpdatetime","Item2":-638368258},{"Item1":"broadcastIndex","Item2":0},{"Item1":"broadcastTotal","Item2":1}]
* requestId='71565f61-94e8-4dcf-9760-f2fb73a6886a',
* createMillisTime=1547621183585,
* accessToken='',
* className='com.xxl.job.core.biz.ExecutorBiz',
* methodName='run',
* parameterTypes=[class com.xxl.job.core.biz.model.TriggerParam],
* parameters=[
* TriggerParam{
* jobId=1,
* executorHandler='demoJobHandler',
* executorParams='111',
* executorBlockStrategy='SERIAL_EXECUTION',
* executorTimeout=0,
* logId=5,
* logDateTim=1547621183414,
* glueType='BEAN',
* glueSource='',
* glueUpdatetime=1541254891000,
* broadcastIndex=0,
* broadcastTotal=1
* }
* ], version='null'
*
*/
}
}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -1,18 +0,0 @@
set -ex
cd $(dirname $0)/../
artifactsFolder="./artifacts"
if [ -d $artifactsFolder ]; then
rm -R $artifactsFolder
fi
mkdir -p $artifactsFolder
dotnet build ./src/Hessian/Hessian.csproj -c Release
dotnet pack ./src/Hessian/Hessian.csproj -c Release -o ../../$artifactsFolder
dotnet nuget push ./$artifactsFolder/Hessian.*.nupkg -k $NUGET_KEY -s https://www.nuget.org

@ -13,6 +13,6 @@ mkdir -p $artifactsFolder
dotnet build ./src/DotXxlJob.Core/DotXxlJob.Core.csproj -c Release
dotnet pack ./src/DotXxlJob.Core/DotXxlJob.Core.csproj -c Release -o ../../$artifactsFolder
dotnet pack ./src/DotXxlJob.Core/DotXxlJob.Core.csproj -c Release -o $artifactsFolder
dotnet nuget push ./$artifactsFolder/DotXxlJob.Core.*.nupkg -k $NUGET_KEY -s https://www.nuget.org
dotnet nuget push $artifactsFolder/DotXxlJob.Core.*.nupkg -k $NUGET_KEY -s https://www.nuget.org

@ -14,4 +14,4 @@ dotnet restore ./DotXxlJob.sln
dotnet build ./DotXxlJob.sln -c Release
dotnet pack ./src/DotXxlJob.Core/DotXxlJob.Core.csproj -c Release -o ../../$artifactsFolder
dotnet pack ./src/DotXxlJob.Core/DotXxlJob.Core.csproj -c Release -o $artifactsFolder

@ -7,31 +7,28 @@ using System.Net.Http.Headers;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Hessian;
using Flurl;
using Flurl.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace DotXxlJob.Core
{
public class AdminClient
{
private static readonly string MAPPING = "/api";
private readonly XxlJobExecutorOptions _options;
private readonly IHttpClientFactory _clientFactory;
private readonly ILogger<AdminClient> _logger;
private List<AddressEntry> _addresses;
private int _currentIndex;
private static readonly string MAPPING = "/api";
public AdminClient(IOptions<XxlJobExecutorOptions> optionsAccessor
, IHttpClientFactory clientFactory
, ILogger<AdminClient> logger)
{
Preconditions.CheckNotNull(optionsAccessor?.Value, "XxlJobExecutorOptions");
this._options = optionsAccessor?.Value;
this._clientFactory = clientFactory;
this._logger = logger;
InitAddress();
}
@ -43,8 +40,7 @@ namespace DotXxlJob.Core
{
try
{
var uri = new Uri(item + MAPPING);
var entry = new AddressEntry { RequestUri = uri };
var entry = new AddressEntry { RequestUri = item+ MAPPING };
this._addresses.Add(entry);
}
catch (Exception ex)
@ -56,55 +52,43 @@ namespace DotXxlJob.Core
public Task<ReturnT> Callback(List<HandleCallbackParam> callbackParamList)
{
return InvokeRpcService("callback", new List<object> { new JavaClass { Name = Constants.JavaListFulName } }, callbackParamList);
return InvokeRpcService("callback", callbackParamList);
}
public Task<ReturnT> Registry(RegistryParam registryParam)
{
return InvokeRpcService("registry", new List<object> { new JavaClass { Name = "com.xxl.job.core.biz.model.RegistryParam" } }, registryParam, true);
return InvokeRpcService("registry", registryParam);
}
public Task<ReturnT> RegistryRemove(RegistryParam registryParam)
{
return InvokeRpcService("registryRemove", new List<object> { new JavaClass { Name = "com.xxl.job.core.biz.model.RegistryParam" } }, registryParam, true);
return InvokeRpcService("registryRemove", registryParam);
}
private async Task<ReturnT> InvokeRpcService(string methodName, List<object> parameterTypes,
object parameters, bool polling = false)
{
var request = new RpcRequest {
RequestId = Guid.NewGuid().ToString("N"),
CreateMillisTime = DateTime.Now.GetTotalMilliseconds(),
AccessToken = _options.AccessToken,
ClassName = "com.xxl.job.core.biz.AdminBiz",
MethodName = methodName,
ParameterTypes = parameterTypes,
Parameters = new List<object> { parameters }
};
byte[] postBuf;
using (var stream = new MemoryStream())
{
HessianSerializer.SerializeRequest(stream, request);
postBuf = stream.ToArray();
}
private async Task<ReturnT> InvokeRpcService(string methodName, object jsonObject)
{
var triedTimes = 0;
var retList = new List<ReturnT>();
ReturnT ret = null;
using (var client = this._clientFactory.CreateClient(Constants.DefaultHttpClientName))
{
while (triedTimes++ < this._addresses.Count)
{
var address = this._addresses[this._currentIndex];
this._currentIndex = (this._currentIndex + 1) % this._addresses.Count;
if (!address.CheckAccessible())
continue;
Stream resStream;
try
{
resStream = await DoPost(client, address, postBuf);
var json = await address.RequestUri.AppendPathSegment(methodName)
.WithHeader("XXL-JOB-ACCESS-TOKEN", this._options.AccessToken)
.PostJsonAsync(jsonObject)
.ReceiveString();
//.ReceiveJson<ReturnT>();
ret = JsonConvert.DeserializeObject<ReturnT>(json);
address.Reset();
}
catch (Exception ex)
@ -113,64 +97,13 @@ namespace DotXxlJob.Core
address.SetFail();
continue;
}
RpcResponse res = null;
try
{
/*
using (StreamReader reader = new StreamReader(resStream))
{
string content = await reader.ReadToEndAsync();
this._logger.LogWarning(content);
}
*/
res = HessianSerializer.DeserializeResponse(resStream);
}
catch (Exception ex)
{
this._logger.LogError(ex, "DeserializeResponse error:{errorMessage}", ex.Message);
}
if (res == null)
{
retList.Add(ReturnT.Failed("response is null"));
}
else if (res.IsError)
{
retList.Add(ReturnT.Failed(res.ErrorMsg));
}
else if (res.Result is ReturnT ret)
{
retList.Add(ret);
}
else
{
retList.Add(ReturnT.Failed("response is null"));
}
if (!polling)
if(ret == null)
{
return retList[0];
ret = ReturnT.Failed("call admin fail");
}
return ret;
}
if (retList.Count > 0)
{
return retList.Last();
}
}
throw new Exception("xxl-rpc server address not accessible.");
}
private async Task<Stream> DoPost(HttpClient client, AddressEntry address, byte[] postBuf)
{
var content = new ByteArrayContent(postBuf);
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
var responseMessage = await client.PostAsync(address.RequestUri, content);
responseMessage.EnsureSuccessStatusCode();
return await responseMessage.Content.ReadAsStreamAsync();
}
}
}

@ -18,6 +18,12 @@ namespace DotXxlJob.Core.Config
public string AppName { get; set; } = "xxl-job-executor-dotnet";
/// <summary>
/// 绑定的特殊的URL,如果该项配置存在,则忽略SpecialBindAddress和Port
/// </summary>
public string SpecialBindUrl { get; set; }
/// <summary>
/// 自动注册时提交的地址,为空会自动获取内网地址
/// </summary>

@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.Design;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core
@ -9,16 +11,24 @@ namespace DotXxlJob.Core
public class DefaultJobHandlerFactory : IJobHandlerFactory
{
private readonly IServiceProvider _provider;
private readonly IServiceCollection _services;
private readonly Dictionary<string, IJobHandler> handlersCache = new Dictionary<string, IJobHandler>();
public DefaultJobHandlerFactory(IServiceProvider provider)
private readonly Dictionary<string, Type> NoSingleTypeCache = new Dictionary<string, Type>();
public DefaultJobHandlerFactory(IServiceProvider provider, IServiceCollection services)
{
this._provider = provider;
this._services = services;
Initialize();
}
private void Initialize()
{
var list = this._provider.GetServices<IJobHandler>();
var svcDprs = this._services.Where(v => v.ServiceType == typeof(IJobHandler)).ToList();
if (list == null || !list.Any())
{
throw new TypeLoadException("IJobHandlers are not found in IServiceCollection");
@ -26,15 +36,24 @@ namespace DotXxlJob.Core
foreach (var handler in list)
{
var jobHandlerAttr = handler.GetType().GetCustomAttribute<JobHandlerAttribute>();
var handlerName = jobHandlerAttr == null ? handler.GetType().Name : jobHandlerAttr.Name;
var handlerType = handler.GetType();
var svcDpr = this._services.FirstOrDefault(v => v.ServiceType == typeof(IJobHandler) && v.ImplementationType == handlerType);
var jobHandlerAttr = handlerType.GetCustomAttribute<JobHandlerAttribute>();
var handlerName = jobHandlerAttr == null ? handlerType.Name : jobHandlerAttr.Name;
if (handlersCache.ContainsKey(handlerName))
{
throw new Exception($"same IJobHandler' name: [{handlerName}]");
}
if (svcDpr.Lifetime == ServiceLifetime.Singleton)
{
handlersCache.Add(handlerName, handler);
}
else
{
NoSingleTypeCache.Add(handlerName, svcDpr.ImplementationType);
}
}
}
public IJobHandler GetJobHandler(string handlerName)
@ -43,7 +62,11 @@ namespace DotXxlJob.Core
{
return handlersCache[handlerName];
}
return null;
else
{
var handler = NoSingleTypeCache[handlerName];
return this._provider.GetService(handler) as IJobHandler;
}
}
}
}

@ -1,36 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../../build/version.props" />
<Import Project="../../build/releasenotes.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<DefineConstants>$(DefineConstants);DOTNETCORE</DefineConstants>
<Description>XxlJobExecutor DotNet port</Description>
<Copyright>Xuanye @ 2019</Copyright>
<Authors>Xuanye</Authors>
<AssemblyTitle>XxlJobExecutor DotNet port</AssemblyTitle>
<AssemblyName>DotXxlJob.Core</AssemblyName>
<PackageId>DotXxlJob.Core</PackageId>
<Version>$(DotXxlJobPackageVersion)</Version>
<PackageTags>Hession,xxl-job,DotXxlJob</PackageTags>
<PackageReleaseNotes>
$(DotXxlJobPackageNotes)
</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/xuanye/DotXxlJob</PackageProjectUrl>
<PackageLicense>https://github.com/xuanye/DotXxlJob/blob/master/LICENSE</PackageLicense>
<Copyright>Kingo @ 2025</Copyright>
<AssemblyName>Kingo.XxlJob.Core</AssemblyName>
<Description>Xxl-Job中间件。</Description>
<PackageId>Kingo.XxlJob.Core</PackageId>
<Version>1.0.0</Version>
<Authors>Kingo</Authors>
<PackageTags>xxl,job,xxl-job,DotXxlJob,xxljob</PackageTags>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/xuanye/DotXxlJob</RepositoryUrl>
<Company>http://www.kingoit.com</Company>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.1.0" />
<PackageReference Include="Utf8Json" Version="1.3.7" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Hessian\Hessian.csproj" />
<PackageReference Include="Flurl.Http" Version="2.4.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Utf8Json" Version="1.3.7" />
</ItemGroup>
</Project>

@ -35,7 +35,8 @@ namespace DotXxlJob.Core
var registryParam = new RegistryParam {
RegistryGroup = "EXECUTOR",
RegistryKey = _options.AppName,
RegistryValue = $"{_options.SpecialBindAddress}:{_options.Port}"
RegistryValue = string.IsNullOrEmpty(_options.SpecialBindUrl)?
$"http://{_options.SpecialBindAddress}:{_options.Port}/" : _options.SpecialBindUrl
};
_logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<");

@ -1,10 +1,9 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Hessian
namespace DotXxlJob.Core.Extensions
{
/// <summary>
///
/// </summary>
public static class DateTimeExtension
{
private const long Era = 62135596800000L;

@ -1,4 +1,5 @@
using System;
using System.Linq;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.DefaultHandlers;
using DotXxlJob.Core.Queue;
@ -15,6 +16,7 @@ namespace DotXxlJob.Core
{
services.AddLogging();
services.AddOptions();
services.AddSingleton(services);
services.Configure<XxlJobExecutorOptions>(configuration.GetSection("xxlJob"))
.AddXxlJobExecutorServiceDependency();
@ -37,6 +39,12 @@ namespace DotXxlJob.Core
{
services.AddSingleton<IExecutorRegistry,ExecutorRegistry>()
.AddSingleton<IHostedService,JobsExecuteHostedService>();
var descriptors = services.Where(v => v.Lifetime != ServiceLifetime.Singleton && v.ServiceType == typeof(IJobHandler)).ToList();
foreach (var desc in descriptors)
{
services.AddTransient(desc.ImplementationType);
}
return services;
}
@ -51,7 +59,7 @@ namespace DotXxlJob.Core
services.AddHttpClient("DotXxlJobClient");
services.AddSingleton<JobDispatcher>();
services.AddSingleton<TaskExecutorFactory>();
services.AddSingleton<XxlRpcServiceHandler>();
services.AddSingleton<XxlRestfulServiceHandler>();
services.AddSingleton<CallbackTaskQueue>();
services.AddSingleton<AdminClient>();
services.AddSingleton<ITaskExecutor, TaskExecutors.BeanTaskExecutor>();

@ -1,132 +0,0 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.Serialization;
using DotXxlJob.Core.Model;
using Hessian;
namespace DotXxlJob.Core
{
public class HessianObjectHelper
{
private static readonly Dictionary<string,Dictionary<string,PropertyInfo>> TransferObjCache
= new Dictionary<string, Dictionary<string, PropertyInfo>>();
private static readonly Dictionary<string,Type> TransferTypeCache
= new Dictionary<string, Type>();
static HessianObjectHelper()
{
InitProperties(typeof(RpcRequest));
InitProperties(typeof(TriggerParam));
InitProperties(typeof(RpcResponse));
InitProperties(typeof(ReturnT));
InitProperties(typeof(HandleCallbackParam));
InitProperties(typeof(JavaClass));
InitProperties(typeof(RegistryParam));
InitProperties(typeof(LogResult));
}
private static void InitProperties(Type type)
{
var propertyInfos = new Dictionary<string, PropertyInfo>();
var typeInfo = type.GetTypeInfo();
var classAttr = type.GetCustomAttribute<DataContractAttribute>();
if (classAttr == null)
{
return;
}
foreach (var property in typeInfo.DeclaredProperties)
{
var attribute = property.GetCustomAttribute<DataMemberAttribute>();
if (null == attribute)
{
continue;
}
if (!property.CanRead || !property.CanWrite)
{
continue;
}
propertyInfos.Add(attribute.Name,property);
}
TransferTypeCache.Add(classAttr.Name,type);
TransferObjCache.Add(classAttr.Name,propertyInfos);
}
public static object GetRealObjectValue(Deserializer deserializer,object value)
{
if (value == null || IsSimpleType(value.GetType()))
{
return value;
}
if (value is HessianObject hessianObject)
{
if(TransferObjCache.TryGetValue(hessianObject.TypeName,out var properties))
{
var instance = Activator.CreateInstance(TransferTypeCache[hessianObject.TypeName]);
foreach (var (k, v) in hessianObject)
{
if (properties.TryGetValue(k, out var p))
{
p.SetValue(instance,GetRealObjectValue(deserializer,v));
}
}
return instance;
}
}
if (value is ClassDef)
{
return GetRealObjectValue(deserializer, deserializer.ReadValue());
}
if (IsListType(value.GetType()))
{
var listData = new List<object>();
var cList = value as List<object>;
foreach (var cItem in cList)
{
listData.Add(GetRealObjectValue(deserializer,cItem));
}
return listData;
}
throw new HessianException($"unknown item:{value.GetType()}");
}
private static bool IsListType(Type type)
{
return typeof(ICollection).IsAssignableFrom(type);
}
private static bool IsSimpleType(Type typeInfo)
{
if (typeInfo.IsValueType || typeInfo.IsEnum || typeInfo.IsPrimitive)
{
return true;
}
if (typeof (string) == typeInfo)
{
return true;
}
return false;
}
}
}

@ -1,79 +0,0 @@
using System;
using System.IO;
using DotXxlJob.Core.Model;
using Hessian;
namespace DotXxlJob.Core
{
public static class HessianSerializer
{
public static RpcRequest DeserializeRequest(Stream stream)
{
RpcRequest request = null;
try
{
var deserializer = new Deserializer(stream);
var classDef = deserializer.ReadValue() as ClassDef;
if (!Constants.RpcRequestJavaFullName.Equals(classDef.Name))
{
throw new HessianException($"unknown class :{classDef.Name}");
}
request = HessianObjectHelper.GetRealObjectValue(deserializer,deserializer.ReadValue()) as RpcRequest;
}
catch (EndOfStreamException)
{
//没有数据可读了
}
return request;
}
public static void SerializeRequest(Stream stream,RpcRequest req)
{
var serializer = new Serializer(stream);
serializer.WriteObject(req);
}
public static void SerializeResponse(Stream stream,RpcResponse res)
{
var serializer = new Serializer(stream);
serializer.WriteObject(res);
}
public static RpcResponse DeserializeResponse(Stream resStream)
{
RpcResponse rsp = null;
try
{
var deserializer = new Deserializer(resStream);
var classDef = deserializer.ReadValue() as ClassDef;
if (!Constants.RpcResponseJavaFullName.Equals(classDef.Name))
{
throw new HessianException($"unknown class :{classDef.Name}");
}
rsp = HessianObjectHelper.GetRealObjectValue(deserializer,deserializer.ReadValue()) as RpcResponse;
}
catch (EndOfStreamException)
{
//没有数据可读了
}
catch
{
//TODO: do something?
}
return rsp;
}
}
}

@ -81,9 +81,14 @@ namespace DotXxlJob.Core
{
//丢弃后续的
if (Constants.ExecutorBlockStrategy.DISCARD_LATER == triggerParam.ExecutorBlockStrategy)
{
//存在还没执行完成的任务
if (taskQueue.IsRunning())
{
return ReturnT.Failed($"block strategy effect:{triggerParam.ExecutorBlockStrategy}");
}
//否则还是继续做
}
//覆盖较早的
if (Constants.ExecutorBlockStrategy.COVER_EARLY == triggerParam.ExecutorBlockStrategy)
{

@ -5,8 +5,11 @@ namespace DotXxlJob.Core
{
public interface IJobLogger
{
long LogId { get; }
void SetLogFile(long logTime, int logId);
string LogPath { get; }
void SetLogFile(long logTime, long logId);
void Log(string pattern, params object[] format);
@ -14,10 +17,10 @@ namespace DotXxlJob.Core
void LogError(Exception ex);
LogResult ReadLog(long logTime, int logId, int fromLine);
LogResult ReadLog(long logTime, long logId, int fromLine);
void LogSpecialFile(long logTime, int logId, string pattern, params object[] format);
void LogSpecialFile(long logTime, long logId, string pattern, params object[] format);
}
}

@ -5,8 +5,8 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Extensions;
using DotXxlJob.Core.Model;
using Hessian;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@ -19,13 +19,43 @@ namespace DotXxlJob.Core
private readonly AsyncLocal<string> LogFileName = new AsyncLocal<string>();
private readonly XxlJobExecutorOptions _options;
private static readonly object locker = new object();
public JobLogger(IOptions<XxlJobExecutorOptions> optionsAccessor, ILogger<JobLogger> logger)
{
this._logger = logger;
this._options = optionsAccessor.Value;
}
public void SetLogFile(long logTime, int logId)
public long _LogId = 0;
public long LogId
{
get
{
return _LogId;
}
private set
{
_LogId = value;
}
}
public string _LocalPath = null;
public string LogPath
{
get
{
return _LocalPath;
}
private set
{
_LocalPath = value;
}
}
void IJobLogger.SetLogFile(long logTime, long logId)
{
try
{
@ -37,6 +67,8 @@ namespace DotXxlJob.Core
CleanOldLogs();
}
LogFileName.Value = filePath;
this.LogId = logId;
this.LogPath = filePath;
}
catch (Exception ex)
{
@ -67,7 +99,7 @@ namespace DotXxlJob.Core
LogDetail(GetLogFileName(), callInfo, ex.Message + ex.StackTrace);
}
public LogResult ReadLog(long logTime, int logId, int fromLine)
public LogResult ReadLog(long logTime, long logId, int fromLine)
{
var filePath = MakeLogFileName(logTime, logId);
if (string.IsNullOrEmpty(filePath))
@ -107,7 +139,7 @@ namespace DotXxlJob.Core
return logResult;
}
public void LogSpecialFile(long logTime, int logId, string pattern, params object[] format)
public void LogSpecialFile(long logTime, long logId, string pattern, params object[] format)
{
var filePath = MakeLogFileName(logTime, logId);
var callInfo = new StackTrace(true).GetFrame(1);
@ -120,13 +152,15 @@ namespace DotXxlJob.Core
{
return LogFileName.Value;
}
private string MakeLogFileName(long logDateTime, int logId)
private string MakeLogFileName(long logDateTime, long logId)
{
//log fileName like: logPath/HandlerLogs/yyyy-MM-dd/9999.log
return Path.Combine(this._options.LogPath, Constants.HandleLogsDirectory,
logDateTime.FromMilliseconds().ToString("yyyy-MM-dd"), $"{logId}.log");
}
private void LogDetail(string logFileName, StackFrame callInfo, string appendLog)
{
lock (locker)
{
if (string.IsNullOrEmpty(logFileName))
{
@ -153,6 +187,7 @@ namespace DotXxlJob.Core
this._logger.LogError(ex, "LogDetail error");
}
}
}
private void CleanOldLogs()
{

@ -4,7 +4,7 @@ namespace DotXxlJob.Core.Model
{
public class AddressEntry
{
public Uri RequestUri { get; set; }
public string RequestUri { get; set; }
private DateTime? LastFailedTime { get; set; }

@ -20,10 +20,39 @@ namespace DotXxlJob.Core.Model
public int CallbackRetryTimes { get; set; }
[DataMember(Name = "logId",Order = 1)]
public int LogId { get; set; }
public long LogId { get; set; }
[DataMember(Name = "logDateTim",Order = 2)]
public long LogDateTime { get; set; }
/// <summary>
/// 2.3.0以前版本
/// </summary>
[DataMember(Name = "executeResult",Order = 3)]
public ReturnT ExecuteResult { get; set; }
/// <summary>
/// 2.3.0版本使用的参数
/// </summary>
[DataMember(Name = "handleCode", Order = 4)]
public int HandleCode {
get {
if(this.ExecuteResult != null)
{
return this.ExecuteResult.Code;
}
return 500;
}
}
/// <summary>
/// 2.3.0版本使用的参数
/// </summary>
[DataMember(Name = "handleMsg", Order = 5)]
public string HandleMsg {
get {
return this.ExecuteResult?.Msg;
}
}
}
}

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace DotXxlJob.Core.Model
{
[DataContract]
public class IdleBeatRequest
{
[DataMember(Name = "jobId", Order = 1)]
public int JobId { get; set; }
}
}

@ -1,13 +1,17 @@
using System.Threading;
namespace DotXxlJob.Core.Model
{
public class JobExecuteContext
{
public JobExecuteContext(IJobLogger jobLogger,string jobParameter)
public JobExecuteContext(IJobLogger jobLogger, string jobParameter, CancellationToken cancellationToken)
{
this.JobLogger = jobLogger;
this.JobParameter = jobParameter;
this.cancellationToken = cancellationToken;
}
public string JobParameter { get; }
public IJobLogger JobLogger { get; }
public CancellationToken cancellationToken { get; }
}
}

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace DotXxlJob.Core.Model
{
[DataContract]
public class KillRequest
{
[DataMember(Name = "jobId", Order = 1)]
public int JobId { get; set; }
}
}

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace DotXxlJob.Core.Model
{
[DataContract]
public class LogRequest
{
[DataMember(Name = "logDateTim", Order =1)]
public long LogDateTime { get; set; }
[DataMember(Name = "logId", Order = 2)]
public int LogId { get; set; }
[DataMember(Name = "fromLineNum", Order = 3)]
public int FromLineNum { get; set; }
}
}

@ -5,7 +5,7 @@ namespace DotXxlJob.Core.Model
[DataContract(Name = Constants.RegistryParamJavaFullName)]
public class RegistryParam
{
[DataMember(Name = "registGroup",Order = 1)]
[DataMember(Name = "registryGroup", Order = 1)]
public string RegistryGroup { get; set; }
[DataMember(Name = "registryKey", Order = 2)]

@ -3,7 +3,7 @@ using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract(Name = Constants.TriggerParamJavaFullName)]
[DataContract]
public class TriggerParam
{
//static readonly long SerialVersionUID = 42L;
@ -23,8 +23,8 @@ namespace DotXxlJob.Core.Model
public int ExecutorTimeout{ get; set; }
[DataMember(Name = "logId",Order = 5)]
public int LogId{ get; set; }
[DataMember(Name = "logDateTim",Order = 6)]
public long LogId { get; set; }
[DataMember(Name = "logDateTime", Order = 6)]
public long LogDateTime{ get; set; }

@ -13,7 +13,7 @@ namespace DotXxlJob.Core
private readonly IJobLogger _jobLogger;
private readonly ILogger<JobTaskQueue> _logger;
private readonly ConcurrentQueue<TriggerParam> TASK_QUEUE = new ConcurrentQueue<TriggerParam>();
private readonly ConcurrentDictionary<int, byte> ID_IN_QUEUE = new ConcurrentDictionary<int, byte>();
private readonly ConcurrentDictionary<long, byte> ID_IN_QUEUE = new ConcurrentDictionary<long, byte>();
private CancellationTokenSource _cancellationTokenSource;
private Task _runTask;
public JobTaskQueue(ITaskExecutor executor, IJobLogger jobLogger, ILogger<JobTaskQueue> logger)
@ -30,6 +30,13 @@ namespace DotXxlJob.Core
public bool IsRunning()
{
return _cancellationTokenSource != null;
}
/// <summary>
/// 覆盖之前的队列
/// </summary>
@ -37,11 +44,11 @@ namespace DotXxlJob.Core
/// <returns></returns>
public ReturnT Replace(TriggerParam triggerParam)
{
Stop();
while (!TASK_QUEUE.IsEmpty)
{
TASK_QUEUE.TryDequeue(out _);
}
Stop();
ID_IN_QUEUE.Clear();
return Push(triggerParam);
@ -74,12 +81,12 @@ namespace DotXxlJob.Core
public void Dispose()
{
Stop();
while (!TASK_QUEUE.IsEmpty)
{
TASK_QUEUE.TryDequeue(out _);
}
ID_IN_QUEUE.Clear();
Stop();
}
private void StartTask()
@ -121,7 +128,20 @@ namespace DotXxlJob.Core
_jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}", triggerParam.ExecutorParams);
result = await Executor.Execute(triggerParam);
var exectorToken = ct;
CancellationTokenSource timeoutCts = null;
if (triggerParam.ExecutorTimeout > 0)
{
timeoutCts = new CancellationTokenSource(triggerParam.ExecutorTimeout * 1000);
exectorToken = CancellationTokenSource.CreateLinkedTokenSource(exectorToken, timeoutCts.Token).Token;
}
result = await Executor.Execute(triggerParam, exectorToken);
if(timeoutCts != null && timeoutCts.IsCancellationRequested)
{
result = ReturnT.FAIL_TIMEOUT;
timeoutCts.Dispose();
timeoutCts = null;
}
_jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
}
@ -144,9 +164,8 @@ namespace DotXxlJob.Core
}
_cancellationTokenSource.Dispose();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}, _cancellationTokenSource.Token);

@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
@ -19,7 +20,7 @@ namespace DotXxlJob.Core.TaskExecutors
public string GlueType { get; } = Constants.GlueType.BEAN;
public Task<ReturnT> Execute(TriggerParam triggerParam)
public Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken)
{
var handler = _handlerFactory.GetJobHandler(triggerParam.ExecutorHandler);
@ -27,7 +28,7 @@ namespace DotXxlJob.Core.TaskExecutors
{
return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found."));
}
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams);
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams, cancellationToken);
return handler.Execute(context);
}
}

@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
@ -7,6 +8,6 @@ namespace DotXxlJob.Core
{
string GlueType { get; }
Task<ReturnT> Execute(TriggerParam triggerParam);
Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken);
}
}

@ -0,0 +1,166 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Cache;
using System.Text;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace DotXxlJob.Core
{
public class XxlRestfulServiceHandler
{
private readonly JobDispatcher _jobDispatcher;
private readonly IJobLogger _jobLogger;
private readonly ILogger<XxlRestfulServiceHandler> _logger;
private readonly XxlJobExecutorOptions _options;
public XxlRestfulServiceHandler(IOptions<XxlJobExecutorOptions> optionsAccessor,
JobDispatcher jobDispatcher,
IJobLogger jobLogger,
ILogger<XxlRestfulServiceHandler> logger)
{
this._jobDispatcher = jobDispatcher;
this._jobLogger = jobLogger;
this._logger = logger;
this._options = optionsAccessor.Value;
if (this._options == null)
{
throw new ArgumentNullException(nameof(XxlJobExecutorOptions));
}
}
public async Task HandlerAsync(HttpRequest request,HttpResponse response)
{
var path = request.Path.Value ;
ReturnT ret = null;
var arrParts = path.Split('/');
var method = arrParts[arrParts.Length - 1].ToLower();
if (!string.IsNullOrEmpty(this._options.AccessToken))
{
var reqToken = "";
if (request.Headers.TryGetValue("XXL-JOB-ACCESS-TOKEN", out var tokenValues))
{
reqToken = tokenValues[0].ToString();
}
if(this._options.AccessToken != reqToken)
{
ret = ReturnT.Failed("ACCESS-TOKEN Auth Fail");
response.ContentType = "application/json;charset=utf-8";
await response.WriteAsync(JsonConvert.SerializeObject(ret));
return;
}
}
try
{
string json = await CollectBody(request.Body);
switch (method)
{
case "beat":
ret = Beat();
break;
case "idlebeat":
ret = IdleBeat(JsonConvert.DeserializeObject<IdleBeatRequest>(json));
break;
case "run":
ret = Run(JsonConvert.DeserializeObject<TriggerParam>(json));
break;
case "kill":
ret = Kill(JsonConvert.DeserializeObject<KillRequest>(json));
break;
case "log":
ret = Log(JsonConvert.DeserializeObject<LogRequest>(json));
break;
}
}
catch(Exception ex)
{
this._logger.LogError(ex,"响应出错"+ ex.Message);
ret = ReturnT.Failed("执行器内部错误");
}
if(ret == null)
{
ret = ReturnT.Failed($"method {method} is not impl");
}
response.ContentType = "application/json;charset=utf-8";
await response.WriteAsync(JsonConvert.SerializeObject(ret, new JsonSerializerSettings() { StringEscapeHandling = StringEscapeHandling.EscapeNonAscii }));
}
private async Task<string> CollectBody(Stream body)
{
string bodyText;
using (var reader = new StreamReader(body))
{
bodyText = await reader.ReadToEndAsync();
}
return bodyText??string.Empty;
}
#region rpc service
private ReturnT Beat()
{
return ReturnT.SUCCESS;
}
private ReturnT IdleBeat(IdleBeatRequest req)
{
if(req == null)
{
return ReturnT.Failed("IdleBeat Error");
}
return this._jobDispatcher.IdleBeat(req.JobId);
}
private ReturnT Kill(KillRequest req)
{
if (req == null)
{
return ReturnT.Failed("Kill Error");
}
return this._jobDispatcher.TryRemoveJobTask(req.JobId) ?
ReturnT.SUCCESS
:
ReturnT.Success("job thread already killed.");
}
/// <summary>
/// read Log
/// </summary>
/// <returns></returns>
private ReturnT Log(LogRequest req)
{
if (req == null)
{
return ReturnT.Failed("Log Error");
}
var ret = ReturnT.Success(null);
ret.Content = this._jobLogger.ReadLog(req.LogDateTime,req.LogId, req.FromLineNum);
return ret;
}
/// <summary>
/// 执行
/// </summary>
/// <param name="triggerParam"></param>
/// <returns></returns>
private ReturnT Run(TriggerParam triggerParam)
{
return this._jobDispatcher.Execute(triggerParam);
}
#endregion
}
}

@ -1,217 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Hessian;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotXxlJob.Core
{
/// <summary>
/// 负责执行Http请求,序列化和反序列化并发送响应
/// </summary>
public class XxlRpcServiceHandler
{
private readonly JobDispatcher _jobDispatcher;
private readonly IJobLogger _jobLogger;
private readonly ILogger<XxlRpcServiceHandler> _logger;
private readonly XxlJobExecutorOptions _options;
private readonly ConcurrentDictionary<string, MethodInfo> METHOD_CACHE =
new ConcurrentDictionary<string, MethodInfo>();
public XxlRpcServiceHandler(IOptions<XxlJobExecutorOptions> optionsAccessor,
JobDispatcher jobDispatcher,
IJobLogger jobLogger,
ILogger<XxlRpcServiceHandler> logger)
{
this._jobDispatcher = jobDispatcher;
this._jobLogger = jobLogger;
this._logger = logger;
this._options = optionsAccessor.Value;
if (this._options == null)
{
throw new ArgumentNullException(nameof(XxlJobExecutorOptions));
}
}
/// <summary>
/// 处理XxlRpc请求流
/// </summary>
/// <param name="reqStream"></param>
/// <returns></returns>
public async Task<byte[]> HandlerAsync(Stream reqStream)
{
var req = HessianSerializer.DeserializeRequest(reqStream);
var res = new RpcResponse { RequestId = req.RequestId};
if (!ValidRequest(req, out var error))
{
this._logger.LogWarning("job task request is not valid:{error}",error);
res.ErrorMsg = error;
}
else
{
this._logger.LogDebug("receive job task ,req.RequestId={requestId},method={methodName}"
,req.RequestId,req.MethodName);
await Invoke(req, res);
this._logger.LogDebug("completed receive job task ,req.RequestId={requestId},method={methodName},IsError={IsError}"
,req.RequestId,req.MethodName,res.IsError);
}
using (var outputStream = new MemoryStream())
{
HessianSerializer.SerializeResponse(outputStream,res);
return outputStream.GetBuffer();
}
}
/// <summary>
/// 校验请求信息
/// </summary>
/// <param name="req"></param>
/// <param name="error"></param>
/// <returns></returns>
private bool ValidRequest(RpcRequest req,out string error)
{
error = string.Empty;
if (req == null)
{
error = "unknown request stream data,codec fail";
return false;
}
if (!"com.xxl.job.core.biz.ExecutorBiz".Equals(req.ClassName)) //
{
error = "not supported request!";
return false;
}
if (DateTime.UtcNow.Subtract(req.CreateMillisTime.FromMilliseconds()) > Constants.RpcRequestExpireTimeSpan)
{
error = "request is timeout!";
return false;
}
if (!string.IsNullOrEmpty(this._options.AccessToken) && this._options.AccessToken != req.AccessToken)
{
error = "need authorize";
return false;
}
return true;
}
/// <summary>
/// 执行请求,获取执行函数
/// </summary>
/// <param name="req"></param>
/// <param name="res"></param>
/// <returns></returns>
private Task Invoke(RpcRequest req, RpcResponse res)
{
try
{
var method = GetMethodInfo(req.MethodName);
if (method == null)
{
res.ErrorMsg = $"The method{req.MethodName} is not defined.";
this._logger.LogWarning( $"The method{req.MethodName} is not defined.");
}
else
{
var result = method.Invoke(this, req.Parameters.ToArray());
res.Result = result;
}
}
catch (Exception ex)
{
res.ErrorMsg = ex.Message +"\n--------------\n"+ ex.StackTrace;
this._logger.LogError(ex,"invoke method error:{0}",ex.Message);
}
return Task.CompletedTask;
}
private MethodInfo GetMethodInfo(string methodName)
{
if (METHOD_CACHE.TryGetValue(methodName, out var method))
{
return method;
}
var type = GetType();
method = type.GetMethod( methodName, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.IgnoreCase);
if (method != null)
{
METHOD_CACHE.TryAdd(methodName, method);
}
return method;
}
#region rpc service
private ReturnT Beat()
{
return ReturnT.SUCCESS;
}
private ReturnT IdleBeat(int jobId)
{
return this._jobDispatcher.IdleBeat(jobId);
}
private ReturnT Kill(int jobId)
{
return this._jobDispatcher.TryRemoveJobTask(jobId) ?
ReturnT.SUCCESS
:
ReturnT.Success("job thread already killed.");
}
/// <summary>
/// read Log
/// </summary>
/// <param name="logDateTime"></param>
/// <param name="logId"></param>
/// <param name="fromLineNum"></param>
/// <returns></returns>
private ReturnT Log(long logDateTime, int logId, int fromLineNum)
{
var ret = ReturnT.Success(null);
ret.Content = this._jobLogger.ReadLog(logDateTime, logId, fromLineNum);
return ret;
}
/// <summary>
/// 执行
/// </summary>
/// <param name="triggerParam"></param>
/// <returns></returns>
private ReturnT Run(TriggerParam triggerParam)
{
return this._jobDispatcher.Execute(triggerParam);
}
#endregion
}
}

@ -1,59 +0,0 @@
using System;
namespace Hessian
{
public class ClassDef : IEquatable<ClassDef>
{
public string Name { get; private set; }
public string[] Fields { get; private set; }
public ClassDef(string name, string[] fields)
{
Name = Conditions.CheckNotNull(name, "name");
Fields = Conditions.CheckNotNull(fields, "fields");
}
public override int GetHashCode()
{
unchecked {
const uint prime = 16777619;
var hash = 2166136261;
hash *= prime;
hash ^= (uint)Name.GetHashCode();
for (var i = 0; i < Fields.Length; ++i) {
hash *= prime;
hash ^= (uint)Fields[i].GetHashCode();
}
return (int)hash;
}
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) {
return false;
}
if (ReferenceEquals(this, obj)) {
return true;
}
if (obj.GetType() != GetType()) {
return false;
}
return Equals((ClassDef) obj);
}
public bool Equals(ClassDef other)
{
if (ReferenceEquals(null, other)) {
return false;
}
if (ReferenceEquals(this, other)) {
return true;
}
return string.Equals(Name, other.Name) && Fields.Equals(other.Fields);
}
}
}

@ -1,11 +0,0 @@
using System.Collections.Generic;
namespace Hessian
{
public class ClassElement
{
public string ClassName { get; set; }
public List<PropertyElement> Fields { get; set; }
}
}

@ -1,91 +0,0 @@
using System.Collections;
using System.Collections.Generic;
namespace Hessian.Collections
{
public abstract class ForwardingDictionary<TKey, TValue> : IDictionary<TKey, TValue>
{
protected abstract IDictionary<TKey, TValue> Delegate { get; }
public virtual ICollection<TKey> Keys
{
get { return Delegate.Keys; }
}
public virtual ICollection<TValue> Values
{
get { return Delegate.Values; }
}
public virtual IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
{
return Delegate.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public virtual void Add(KeyValuePair<TKey, TValue> item)
{
Delegate.Add(item);
}
public virtual void Clear()
{
Delegate.Clear();
}
public virtual bool Contains(KeyValuePair<TKey, TValue> item)
{
return Delegate.Contains(item);
}
public virtual void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
{
Delegate.CopyTo(array, arrayIndex);
}
public virtual bool Remove(KeyValuePair<TKey, TValue> item)
{
return Delegate.Remove(item);
}
public virtual int Count
{
get { return Delegate.Count; }
}
public virtual bool IsReadOnly
{
get { return Delegate.IsReadOnly; }
}
public virtual bool ContainsKey(TKey key)
{
return Delegate.ContainsKey(key);
}
public virtual void Add(TKey key, TValue value)
{
Delegate.Add(key, value);
}
public virtual bool Remove(TKey key)
{
return Delegate.Remove(key);
}
public virtual bool TryGetValue(TKey key, out TValue value)
{
return Delegate.TryGetValue(key, out value);
}
public virtual TValue this[TKey key]
{
get { return Delegate[key]; }
set { Delegate[key] = value; }
}
}
}

@ -1,32 +0,0 @@
namespace Hessian.Collections
{
/// <summary>
/// Represents a map that associates objects with a zero-based integer
/// index. Specified in Hessian 2.0.
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IRefMap<T>
{
/// <summary>
/// Adds an element to the ref map and returns its ID.
/// </summary>
/// <param name="entry"></param>
/// <returns></returns>
int Add(T entry);
/// <summary>
/// Retrieves the element identified by the given ID.
/// </summary>
/// <param name="refId"></param>
/// <returns></returns>
T Get(int refId);
/// <summary>
/// Looks up an element in the ref map and, if present, returns its ID.
/// </summary>
/// <remarks>
/// Performance of this method is not guaranteed and is implementation-specific.
/// </remarks>
int? Lookup(T entry);
}
}

@ -1,59 +0,0 @@
using System.Collections.Generic;
namespace Hessian.Collections
{
/// <summary>
/// Represents a dictionary which maintains uniqueness of both keys and values,
/// allowing for the lookup of a key by its value in additon to the normal
/// operations expected of a dictionary.
/// </summary>
public interface ITwoWayDictionary<TKey, TValue> : IDictionary<TKey, TValue>
{
/// <summary>
/// Exposes a view of the current dictionary that reverses keys and
/// values. Note that the same underlying data is shared.
/// </summary>
ITwoWayDictionary<TValue, TKey> Inverse { get; }
/// <summary>
/// Gets or sets a key by the given value.
/// </summary>
/// <param name="value">
/// The value with which to get or set a key.
/// </param>
/// <returns>
/// Returns the key indexing the given <paramref name="value"/>.
/// </returns>
TKey this[TValue valueKey] { get; set; }
/// <summary>
/// Gets a value indicating whether the given <paramref name="value"/>
/// is contained in this dictionary.
/// </summary>
/// <param name="value">
/// The value whose presence is to be determined.
/// </param>
/// <returns>
/// Returns <see langword="true"/> if <paramref name="value"/> is in
/// this dictionary, and <see langword="false"/> otherwise.
/// </returns>
bool ContainsValue(TValue value);
/// <summary>
/// Attempts to look up a key by the given value. A return value
/// indicates whether the lookip is successful.
/// </summary>
/// <param name="value">
/// The value whose corresponding key is to be retrieved.
/// </param>
/// <param name="key">
/// When the method returns, contains the looked-up key if the lookup
/// succeeded.
/// </param>
/// <returns>
/// Returns <see langword="true"/> if the lookup succeeded, and
/// <see langword="false"/> otherwise.
/// </returns>
bool TryGetKey(TValue value, out TKey key);
}
}

@ -1,34 +0,0 @@
using System.Collections.Generic;
namespace Hessian.Collections
{
public class ListRefMap<T> : IRefMap<T>
{
private readonly List<T> list = new List<T>();
public int Add(T entry)
{
list.Add(entry);
return list.Count - 1;
}
public T Get(int refId)
{
if (refId < 0 || refId >= list.Count) {
throw new InvalidRefException(refId);
}
return list[refId];
}
public int? Lookup(T entry)
{
for (var i = 0; i < list.Count; ++i) {
if (entry.Equals(list[i])) {
return i;
}
}
return null;
}
}
}

@ -1,123 +0,0 @@
using System.Collections.Generic;
namespace Hessian.Collections
{
public class TwoWayDictionary<TKey, TValue> : ForwardingDictionary<TKey, TValue>, ITwoWayDictionary<TKey, TValue>
{
private readonly IDictionary<TKey, TValue> dict;
private readonly TwoWayDictionary<TValue, TKey> inverse;
protected override IDictionary<TKey, TValue> Delegate {
get { return dict; }
}
public override bool IsReadOnly {
get { return false; }
}
public ITwoWayDictionary<TValue, TKey> Inverse {
get { return inverse; }
}
public override TValue this[TKey key] {
set { UpdateDictAndInverse(key, value, false); }
}
public TKey this[TValue valueKey] {
get { return inverse[valueKey]; }
set { inverse[valueKey] = value; }
}
public override ICollection<TValue> Values {
get { return inverse.dict.Keys; }
}
public TwoWayDictionary()
: this(new Dictionary<TKey, TValue>(), new Dictionary<TValue, TKey>())
{
}
public TwoWayDictionary(IDictionary<TKey, TValue> forwards, IDictionary<TValue, TKey> backwards)
{
dict = Conditions.CheckNotNull(forwards, "forwards");
inverse = new TwoWayDictionary<TValue, TKey>(backwards, this);
}
private TwoWayDictionary(IDictionary<TKey, TValue> dict, TwoWayDictionary<TValue, TKey> inverse)
{
this.dict = Conditions.CheckNotNull(dict, "dict");
this.inverse = inverse;
}
public bool ContainsValue(TValue value)
{
return inverse.ContainsKey(value);
}
public bool TryGetKey(TValue value, out TKey key)
{
return inverse.TryGetValue(value, out key);
}
public override void Add(TKey key, TValue value)
{
UpdateDictAndInverse(key, value, true);
}
public override bool Remove(TKey key)
{
return RemoveFromDictAndInverse(key);
}
private void UpdateDictAndInverse(TKey key, TValue value, bool throwIfContained)
{
if (!throwIfContained) {
dict.Remove(key);
inverse.dict.Remove(value);
}
dict.Add(key, value);
inverse.dict.Add(value, key);
}
private bool RemoveFromDictAndInverse(TKey key)
{
TValue value;
if (!TryGetValue(key, out value)) {
return false;
}
return RemoveFromDictAndInverse(key, value);
}
private bool RemoveFromDictAndInverse(TKey key, TValue value)
{
if (!ContainsKey(key) || !ContainsValue(value)) {
return false;
}
return dict.Remove(key) && inverse.dict.Remove(value);
}
#region ICollection<KeyValuePair<TKey, TValue>>
public override bool Remove(KeyValuePair<TKey, TValue> kvp)
{
return RemoveFromDictAndInverse(kvp.Key, kvp.Value);
}
public override void Clear()
{
dict.Clear();
inverse.dict.Clear();
}
public override bool Contains(KeyValuePair<TKey, TValue> kvp)
{
return ContainsKey(kvp.Key) && ContainsValue(kvp.Value);
}
#endregion
}
}

@ -1,34 +0,0 @@
namespace Hessian.Collections
{
public class TwoWayDictionaryRefMap<T> : IRefMap<T>
{
private readonly TwoWayDictionary<T, int> map = new TwoWayDictionary<T, int>();
public int Add(T value)
{
var refid = map.Count;
map.Add(value, refid);
return refid;
}
public T Get(int refid)
{
T entry;
if (map.TryGetKey(refid, out entry)) {
return entry;
}
throw new InvalidRefException(refid);
}
public int? Lookup(T entry)
{
int refId;
if (map.TryGetValue(entry, out refId)) {
return refId;
}
return null;
}
}
}

@ -1,114 +0,0 @@
using System;
namespace Hessian
{
public class Conditions
{
protected Conditions()
{
}
public static void CheckArgument(bool condition, string message, params object[] args)
{
if (condition) {
return;
}
if (args.Length > 0) {
message = String.Format(message, args);
}
throw new ArgumentException(message);
}
public static T CheckNotNull<T>(T value, string name)
where T : class
{
if (!ReferenceEquals(value, null)) {
return value;
}
throw new ArgumentNullException(name);
}
public static TComparable CheckGreater<TComparable, TComparand>(TComparable value, TComparand bounds,
string name)
where TComparable : IComparable<TComparand>
{
if (value.CompareTo(bounds) > 0) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
public static TComparable CheckLess<TComparable, TComparand>(TComparable value, TComparand bounds,
string name)
where TComparable : IComparable<TComparand>
{
if (value.CompareTo(bounds) < 0) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
public static TComparable CheckGreaterOrEqual<TComparable, TComparand>(TComparable value, TComparand bounds,
string name)
where TComparable : IComparable<TComparand>
{
if (value.CompareTo(bounds) >= 0) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
public static TComparable CheckLessOrEqual<TComparable, TComparand>(TComparable value, TComparand bounds,
string name)
where TComparable : IComparable<TComparand>
{
if (value.CompareTo(bounds) <= 0) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
public static int CheckGreater(int value, int bounds, string name)
{
if (value > bounds) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
public static int CheckLess(int value, int bounds, string name)
{
if (value < bounds) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
public static int CheckGreaterOrEqual(int value, int bounds, string name)
{
if (value >= bounds) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
public static int CheckLessOrEqual(int value, int bounds, string name)
{
if (value <= bounds) {
return value;
}
throw new ArgumentOutOfRangeException(name);
}
}
}

File diff suppressed because it is too large Load Diff

@ -1,46 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Hessian
{
public class DictionaryTypeResolver
{
private readonly Dictionary<string, Func<IDictionary<object, object>>> constructors;
public DictionaryTypeResolver()
{
constructors = new Dictionary<string, Func<IDictionary<object, object>>> {
{"System.Collections.Hashtable", DefaultCtor},
{"System.Collections.Generic.IDictionary`2", DefaultCtor},
{"System.Collections.Generic.Dictionary`2", DefaultCtor},
{"System.Collections.IDictionary", DefaultCtor},
{"java.lang.Map", DefaultCtor},
{"java.util.HashMap", DefaultCtor},
{"java.util.EnumMap", DefaultCtor},
{"java.util.TreeMap", DefaultCtor},
{"java.util.concurrent.ConcurrentHashMap", DefaultCtor}
};
}
public bool TryGetInstance(string type, out IDictionary<object, object> instance)
{
instance = null;
if (!constructors.TryGetValue(type, out var ctor)) {
return false;
}
instance = ctor();
return true;
}
private static IDictionary<object, object> DefaultCtor()
{
return new Dictionary<object, object>();
}
}
}

@ -1,24 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../../build/version.props" />
<Import Project="../../build/releasenotes.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<DefineConstants>$(DefineConstants);DOTNETCORE</DefineConstants>
<Description>Hessian的编解码实现,并不完全哦</Description>
<Copyright>Xuanye @ 2018</Copyright>
<Authors>Xuanye</Authors>
<AssemblyTitle>Hession DotNet port</AssemblyTitle>
<AssemblyName>Hessian</AssemblyName>
<PackageId>Hessian</PackageId>
<Version>$(HessianPackageVersion)</Version>
<PackageTags>Hessian,xxl-job,protocol</PackageTags>
<PackageReleaseNotes>
$(HessianPackageNotes)
</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/xuanye/DotXxlJob</PackageProjectUrl>
<PackageLicense>https://github.com/xuanye/DotXxlJob/blob/master/LICENSE</PackageLicense>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/xuanye/DotXxlJob</RepositoryUrl>
</PropertyGroup>
</Project>

@ -1,24 +0,0 @@
using System;
namespace Hessian
{
public class HessianException : ApplicationException
{
public HessianException()
{
}
public HessianException(string message)
: base(message)
{
}
public HessianException(string message, Exception innerException)
: base(message, innerException)
{
}
}
}

@ -1,66 +0,0 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
namespace Hessian
{
public class HessianObject : IReadOnlyCollection<Tuple<string, object>>
{
private readonly string typeName;
private readonly IDictionary<string, object> fields;
public string TypeName => typeName;
public object this[string key] => fields[key];
public int Count => fields.Count;
private HessianObject(string typeName)
{
this.typeName = Conditions.CheckNotNull(typeName, "typeName");
fields = new Dictionary<string, object>();
}
public IEnumerator<Tuple<string, object>> GetEnumerator()
{
return fields.Select(kvp => Tuple.Create(kvp.Key, kvp.Value)).GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public class Builder
{
private readonly HessianObject obj;
public HessianObject Object
{
get { return obj; }
}
private Builder(string typeName)
{
obj = new HessianObject(typeName);
}
public static Builder New(string typeName)
{
return new Builder(typeName);
}
public Builder Add(string field, object value)
{
obj.fields.Add(field, value);
return this;
}
public HessianObject Create()
{
return obj;
}
}
}
}

@ -1,27 +0,0 @@
using System;
using System.Collections;
using System.Collections.Generic;
namespace Hessian
{
public class HessianSerializationContext
{
public IList<Type> Classes
{
get;
private set;
}
public IList Instances
{
get;
private set;
}
public HessianSerializationContext()
{
Classes = new List<Type>();
Instances = new List<object>();
}
}
}

@ -1,15 +0,0 @@
using System;
namespace Hessian
{
public class InvalidRefException : HessianException
{
public int RefId { get; private set; }
public InvalidRefException(int refId)
: base(String.Format("Invalid ref ID: {0}", refId))
{
RefId = refId;
}
}
}

@ -1,66 +0,0 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Hessian
{
public class ListTypeResolver
{
private readonly Dictionary<string, Func<IList<object>>> constructors = new Dictionary<string, Func<IList<object>>>();
private readonly Dictionary<string, Func<int, IList<object>>> length_constructors = new Dictionary<string, Func<int, IList<object>>>();
private readonly Func<IList<object>> empty_list_ctor = () => new List<object>();
private readonly Func<int, IList<object>> empty_list_ctor_with_length = length => new List<object>(length);
public ListTypeResolver()
{
constructors.Add("System.Collections.ArrayList", empty_list_ctor);
constructors.Add("System.Collections.List", empty_list_ctor);
constructors.Add("System.Collections.IList", empty_list_ctor);
constructors.Add("System.Collections.Generic.List`1", empty_list_ctor);
constructors.Add("System.Collections.Generic.IList`1", empty_list_ctor);
constructors.Add("System.Collections.ObjectModel.Collection`1", () => new Collection<object>());
constructors.Add("java.util.List", empty_list_ctor);
constructors.Add("java.util.Vector", empty_list_ctor);
constructors.Add("java.util.ArrayList", empty_list_ctor);
constructors.Add("java.util.LinkedList", empty_list_ctor);
length_constructors.Add("System.Collections.List", empty_list_ctor_with_length);
length_constructors.Add("System.Collections.IList", empty_list_ctor_with_length);
length_constructors.Add("System.Collections.Generic.List`1", empty_list_ctor_with_length);
length_constructors.Add("System.Collections.Generic.IList`1", empty_list_ctor_with_length);
length_constructors.Add("java.util.List", empty_list_ctor_with_length);
length_constructors.Add("java.util.Vector", empty_list_ctor_with_length);
length_constructors.Add("java.util.ArrayList", empty_list_ctor_with_length);
length_constructors.Add("java.util.LinkedList", empty_list_ctor_with_length);
}
public bool TryGetListInstance(string type, out IList<object> list)
{
list = null;
Func<IList<object>> ctor;
if (!constructors.TryGetValue(type, out ctor)) {
return false;
}
list = ctor();
return true;
}
public bool TryGetListInstance(string type, int length, out IList<object> list)
{
list = null;
Func<int, IList<object>> ctor;
if (!length_constructors.TryGetValue(type, out ctor)) {
return false;
}
list = ctor(length);
return true;
}
}
}

@ -1,39 +0,0 @@
namespace Hessian
{
internal static class Marker
{
public const byte True = (byte) 'T';//0x54;
public const byte False = (byte) 'F';// 0x46;
public const byte Null = (byte) 'N';//0x4E;
public const byte BinaryNonfinalChunk = (byte) 'b';//0x41;
public const byte BinaryFinalChunk = (byte) 'B';//0x42;
public const byte ClassDefinition = (byte) 'C';//0x43;
public const byte DateTimeLong = 0x4A;
public const byte DateTimeCompact = 0x4B;
public const byte Double = 0x5A;
public const byte DoubleZero = 0x5B;
public const byte DoubleOne = 0x5C;
public const byte DoubleOctet = 0x5D;
public const byte DoubleShort = 0x5E;
public const byte DoubleFloat = 0x5F;
public const byte UnpackedInteger = (byte) 'I';// 0x49;
public const byte PackedLong = (byte) 'Y';// 0x59;
public const byte UnpackedLong = (byte) 'L';// 0x4C;
public const byte StringNonfinalChunk = 0x52;
public const byte StringFinalChunk = 0x53;
public const byte VarList = 0x55;
public const byte FixedList = 0x56;
public const byte VarListUntyped = 0x57;
public const byte FixListUntyped = 0x58;
public const byte CompactFixListStart = 0x70;
public const byte CompactFixListEnd = 0x77;
public const byte CompactFixListUntypedStart = 0x78;
public const byte CompactFixListUntypedEnd = 0x7F;
public const byte ClassReference = (byte) 'O';//0x4F
public const byte InstanceReference = (byte) 'Q'; //0x51;
}
}

@ -1,142 +0,0 @@
using System;
using System.IO;
namespace Hessian
{
public class PeekStream : Stream
{
private Stream inner;
private byte? peek;
public PeekStream(Stream inner)
{
if (inner == null) {
throw new ArgumentNullException("inner");
}
this.inner = inner;
this.peek = null;
}
public override bool CanRead {
get {
return inner.CanRead;
}
}
public override bool CanSeek {
get {
return false;
}
}
public override bool CanWrite {
get {
return false;
}
}
public override long Length {
get {
return inner.Length;
}
}
public override long Position {
get {
return inner.Position - (peek.HasValue ? 1 : 0);
}
set {
throw new NotSupportedException("Seeking not supported.");
}
}
public byte? Peek ()
{
if (!peek.HasValue) {
var b = inner.ReadByte();
if (b == -1) {
return null;
}
peek = (byte) b;
}
return peek;
}
public override int ReadByte ()
{
if (peek.HasValue) {
var val = peek.Value;
peek = null;
return val;
}
return inner.ReadByte();
}
public override int Read (byte[] buffer, int offset, int count)
{
Conditions.CheckNotNull(buffer, "buffer");
Conditions.CheckGreaterOrEqual(offset, 0, "offset");
Conditions.CheckLess(offset, buffer.Length, "offset");
Conditions.CheckGreaterOrEqual(count, 0, "count");
Conditions.CheckArgument(
offset + count <= buffer.Length,
"Buffer is not big enough to contain the requested amount of data at the given offset.");
if (count == 0) {
return 0;
}
var bytesToRead = count;
if (peek.HasValue) {
buffer[offset++] = peek.Value;
peek = null;
--bytesToRead;
}
int bytesRead;
while (bytesToRead > 0 && (bytesRead = inner.Read (buffer, offset, bytesToRead)) != 0) {
offset += bytesRead;
bytesToRead -= bytesRead;
}
return count - bytesToRead;
}
public override void Write (byte[] buffer, int offset, int count)
{
throw new NotSupportedException("Writes not supported.");
}
public override void SetLength (long value)
{
throw new NotSupportedException("Seeking not supported.");
}
public override long Seek (long offset, SeekOrigin origin)
{
throw new NotSupportedException("Seeking not supported.");
}
public override void Flush ()
{
throw new NotSupportedException("Writes not supported.");
}
protected override void Dispose (bool disposing)
{
if (inner != null) {
inner.Dispose ();
inner = null;
}
base.Dispose (disposing);
}
}
}

@ -1,25 +0,0 @@
namespace Hessian.Platform
{
public class BigEndianBitConverter : EndianBitConverter
{
protected override long FromBytes(byte[] bytes, int offset, int count)
{
var result = 0L;
for (var i = 0; i < count; ++i)
{
result = (result << 8) | bytes[offset + i];
}
return result;
}
protected override void CopyBytes(long source, byte[] buffer, int offset, int count)
{
var end = offset + count - 1;
for (var i = 0; i < count; ++i)
{
buffer[end - i] = (byte)(source & 0xFF);
source >>= 8;
}
}
}
}

@ -1,215 +0,0 @@
using System;
using System.Runtime.InteropServices;
namespace Hessian.Platform
{
public abstract class EndianBitConverter
{
#region T -> byte[]
public byte[] GetBytes(bool value)
{
// One byte, no endianness
return BitConverter.GetBytes(value);
}
public byte[] GetBytes(char value)
{
return GetBytes(value, sizeof (char));
}
public byte[] GetBytes(short value)
{
return GetBytes(value, sizeof (short));
}
public byte[] GetBytes(ushort value)
{
return GetBytes(value, sizeof (ushort));
}
public byte[] GetBytes(int value)
{
return GetBytes(value, sizeof (int));
}
public byte[] GetBytes(uint value)
{
return GetBytes(value, sizeof (uint));
}
public byte[] GetBytes(long value)
{
return GetBytes(value, sizeof (long));
}
public byte[] GetBytes(ulong value)
{
return GetBytes((long)value, sizeof (ulong));
}
public byte[] GetBytes(float value)
{
return GetBytes(SingleToInt32(value), sizeof (int));
}
public byte[] GetBytes(double value)
{
return GetBytes(DoubleToInt64(value), sizeof (long));
}
private byte[] GetBytes(long value, int size)
{
var buffer = new byte[size];
CopyBytes(value, buffer, 0, size);
return buffer;
}
#endregion
#region byte[] -> T
public bool ToBoolean(byte[] value, int index)
{
// one byte, no endianness
return BitConverter.ToBoolean(value, index);
}
public char ToChar(byte[] value, int index)
{
return (char) FromBytes(value, index, sizeof (char));
}
public short ToInt16(byte[] value, int index)
{
return (short) FromBytes(value, index, sizeof (short));
}
public ushort ToUInt16(byte[] value, int index)
{
return (ushort) FromBytes(value, index, sizeof (ushort));
}
public int ToInt32(byte[] value, int index)
{
return (int) FromBytes(value, index, sizeof (int));
}
public uint ToUInt32(byte[] value, int index)
{
return (uint) FromBytes(value, index, sizeof (uint));
}
public long ToInt64(byte[] value, int index)
{
return FromBytes(value, index, sizeof (long));
}
public ulong ToUInt64(byte[] value, int index)
{
return (ulong) FromBytes(value, index, sizeof (ulong));
}
public float ToSingle(byte[] value, int index)
{
var int32 = (int) FromBytes(value, index, sizeof (int));
return Int32ToSingle(int32);
}
public double ToDouble(byte[] value, int index)
{
var int64 = FromBytes(value, index, sizeof (long));
return Int64ToDouble(int64);
}
#endregion
protected abstract long FromBytes(byte[] bytes, int offset, int count);
protected abstract void CopyBytes(long source, byte[] buffer, int index, int count);
private static int SingleToInt32(float value)
{
return new JonSkeetUnion32(value).AsInt;
}
private static float Int32ToSingle(int value)
{
return new JonSkeetUnion32(value).AsFloat;
}
private static long DoubleToInt64(double value)
{
return new JonSkeetUnion64(value).AsLong;
}
private static double Int64ToDouble(long value)
{
return new JonSkeetUnion64(value).AsDouble;
}
[StructLayout(LayoutKind.Explicit)]
private struct JonSkeetUnion32
{
[FieldOffset(0)]
private readonly int i;
[FieldOffset(0)]
private readonly float f;
public int AsInt
{
get { return i; }
}
public float AsFloat
{
get { return f; }
}
public JonSkeetUnion32(int value)
{
f = 0;
i = value;
}
public JonSkeetUnion32(float value)
{
i = 0;
f = value;
}
}
[StructLayout(LayoutKind.Explicit)]
private struct JonSkeetUnion64
{
[FieldOffset(0)]
private readonly long l;
[FieldOffset(0)]
private readonly double d;
public long AsLong
{
get { return l; }
}
public double AsDouble
{
get { return d; }
}
public JonSkeetUnion64(long value)
{
d = 0;
l = value;
}
public JonSkeetUnion64(double value)
{
l = 0;
d = value;
}
}
}
}

@ -1,25 +0,0 @@
namespace Hessian.Platform
{
public class LittleEndianBitConverter : EndianBitConverter
{
protected override long FromBytes(byte[] bytes, int offset, int count)
{
var result = 0L;
var end = offset + count - 1;
for (var i = 0; i < count; ++i)
{
result = (result << 8) | bytes[end - i];
}
return result;
}
protected override void CopyBytes(long source, byte[] buffer, int offset, int count)
{
for (var i = 0; i < count; ++i)
{
buffer[offset + i] = (byte)(source & 0xFF);
source >>= 8;
}
}
}
}

@ -1,31 +0,0 @@
using System;
using System.Collections.Generic;
using System.Reflection;
namespace Hessian
{
public class PropertyElement
{
public string Name { get; set; }
public int Order { get; set; }
public PropertyInfo PropertyInfo { get; set; }
}
internal class PropertyComparer : IComparer<PropertyElement>
{
private readonly IComparer<int> comparer;
public PropertyComparer()
{
comparer = Comparer<int>.Default;
}
public int Compare(PropertyElement x, PropertyElement y)
{
var eq = comparer.Compare(x.Order, y.Order);
return 0 == eq
? String.Compare(x.Name, y.Name, StringComparison.Ordinal)
: eq;
}
}
}

@ -1,523 +0,0 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
using System.Text;
namespace Hessian
{
public class Serializer
{
static readonly ConcurrentDictionary<Type,ClassElement> ClassDefCache =new ConcurrentDictionary<Type, ClassElement>();
private readonly Stream _stream;
private readonly HessianSerializationContext _context;
public Serializer (Stream stream)
{
this._stream = stream ?? throw new ArgumentNullException(nameof(stream));
_context = new HessianSerializationContext();
}
public void WriteObject(object graph)
{
var objectType = graph.GetType();
if (!ClassDefCache.TryGetValue(objectType, out var classDef))
{
classDef = GetClassDef(objectType.GetTypeInfo());
}
var index = this._context.Instances.IndexOf(graph);
if (index > -1)
{
WriteInstanceReference(index);
return;
}
this._context.Instances.Add(graph);
index = this._context.Classes.IndexOf(objectType);
if (index < 0)
{
BeginClassDefinition();
WriteString(classDef.ClassName);
WriteInt32(classDef.Fields.Count);
foreach (var property in classDef.Fields)
{
WriteString(property.Name);
}
EndClassDefinition();
index = this._context.Classes.Count;
this._context.Classes.Add(objectType);
}
WriteObjectReference(index);
foreach (var item in classDef.Fields)
{
var value = item.PropertyInfo.GetValue(graph);
WriteValue(value);
}
}
public void WriteValue(object val)
{
if (val == null)
{
WriteNull();
return;
}
var valType = val.GetType();
var typeInfo = valType.GetTypeInfo();
if (IsSimpleType(typeInfo))
{
WriteSimpleValue(val);
return;
}
if (IsListType(typeInfo))
{
WriteListValue(val);
return;
}
WriteObject(val);
}
private void WriteListValue(object val)
{
var tag = (int)Marker.CompactFixListStart;
if (!(val is ICollection eVal))
{
throw new HessianException("write list data error");
}
tag += eVal.Count;
if (tag > Marker.CompactFixListEnd)
{
throw new HessianException("write list data error,tag too large");
}
this._stream.WriteByte((byte)tag);
int index = 1;
foreach (var item in eVal)
{
if (index == 1)
{
WriteString("["+GetItemTypeName(item.GetType()));
}
WriteValue(item);
index++;
}
}
private string GetItemTypeName(Type type)
{
var classType = type.GetCustomAttribute<DataContractAttribute>();
if (classType != null)
{
return classType.Name;
}
if (IsListType(type.GetTypeInfo()))
{
return "java.util.List";
}
return type.Name;
}
private void WriteSimpleValue(object val)
{
var valType = val.GetType();
if (valType == typeof(int))
{
WriteInt32((int)val);
}
else if (valType == typeof(bool))
{
WriteBoolean((bool)val);
}
else if (valType == typeof(long))
{
WriteInt64((long)val);
}
else if (valType == typeof(double))
{
WriteDouble((double)val);
}
else if (valType == typeof(string))
{
WriteString((string)val);
}
else if (valType == typeof(DateTime))
{
WriteDateTime((DateTime)val);
}
}
private ClassElement GetClassDef(TypeInfo typeInfo)
{
var classAttr = typeInfo.GetCustomAttribute<DataContractAttribute>();
if (classAttr == null)
{
throw new HessianException("DataContract must be set");
}
ClassElement ce = new ClassElement {ClassName = classAttr.Name, Fields = new List<PropertyElement>()};
//ClassDef def = new ClassDef(classAttr.Name);
foreach (var property in typeInfo.DeclaredProperties)
{
var attribute = property.GetCustomAttribute<DataMemberAttribute>();
if (null == attribute)
{
continue;
}
if (!property.CanRead || !property.CanWrite)
{
continue;
}
PropertyElement p = new PropertyElement {
Name = attribute.Name,
Order = attribute.Order,
PropertyInfo = property
};
ce.Fields .Add( p );
}
ce.Fields.Sort(new PropertyComparer());
return ce;
}
/// <summary>
/// Writes NULL token into stream
/// </summary>
public void WriteNull()
{
this._stream.WriteByte(Marker.Null);
}
/// <summary>
/// Writes <see cref="System.Boolean" /> value into output stream.
/// </summary>
/// <param name="value">The value.</param>
public void WriteBoolean(bool value)
{
this._stream.WriteByte(value ? Marker.True : Marker.False);
}
/// <summary>
/// Writes array of <see cref="System.Byte" /> into output stream.
/// </summary>
/// <param name="buffer">The value.</param>
public void WriteBytes(byte[] buffer)
{
if (null == buffer)
{
WriteNull();
return;
}
WriteBytes(buffer, 0, buffer.Length);
}
public void WriteBytes(byte[] buffer, int offset, int count)
{
if (offset < 0)
{
throw new ArgumentException("", nameof(offset));
}
if (null == buffer)
{
WriteNull();
return;
}
if (count < 0x10)
{
this._stream.WriteByte((byte)(0x20 + (count & 0x0F)));
this._stream.Write(buffer, offset, count);
return;
}
const int chunkSize = 0x8000;
while (count > chunkSize)
{
this._stream.WriteByte(Marker.BinaryNonfinalChunk);
this._stream.WriteByte(chunkSize >> 8);
this._stream.WriteByte(chunkSize & 0xFF);
this._stream.Write(buffer, offset, chunkSize);
count -= chunkSize;
offset += chunkSize;
}
this._stream.WriteByte(Marker.BinaryFinalChunk);
this._stream.WriteByte((byte)(count >> 8));
this._stream.WriteByte((byte)(count & 0xFF));
this._stream.Write(buffer, offset, count);
}
public void WriteDateTime(DateTime value)
{
if (value.Second == 0)
{
var s = value.GetTotalMinutes();
this._stream.WriteByte(Marker.DateTimeCompact);
this._stream.WriteByte((byte)(s >> 24));
this._stream.WriteByte((byte)(s >> 16));
this._stream.WriteByte((byte)(s >> 8));
this._stream.WriteByte((byte)s);
return;
}
var dt = value.GetTotalMilliseconds();
this._stream.WriteByte(Marker.DateTimeLong);
this._stream.WriteByte((byte)(dt >> 56));
this._stream.WriteByte((byte)(dt >> 48));
this._stream.WriteByte((byte)(dt >> 40));
this._stream.WriteByte((byte)(dt >> 32));
this._stream.WriteByte((byte)(dt >> 24));
this._stream.WriteByte((byte)(dt >> 16));
this._stream.WriteByte((byte)(dt >> 8));
this._stream.WriteByte((byte)dt);
}
public void WriteDouble(double value)
{
if (value.Equals(0.0d))
{
this._stream.WriteByte(Marker.DoubleZero);
return;
}
if (value.Equals(1.0d))
{
this._stream.WriteByte(Marker.DoubleOne);
return;
}
var fraction = Math.Abs(value - Math.Truncate(value));
if (Double.Epsilon >= fraction)
{
if (Byte.MinValue <= value && value <= Byte.MaxValue)
{
this._stream.WriteByte(Marker.DoubleOctet);
this._stream.WriteByte(Convert.ToByte(value));
return;
}
if (Int16.MinValue <= value && value <= Int16.MaxValue)
{
var val = Convert.ToInt16(value);
this._stream.WriteByte(Marker.DoubleShort);
this._stream.WriteByte((byte)(val >> 8));
this._stream.WriteByte((byte)val);
return;
}
}
if (Single.MinValue <= value && value <= Single.MaxValue)
{
var bytes = BitConverter.GetBytes((float) value);
this._stream.WriteByte(Marker.DoubleFloat);
for (var index = bytes.Length - 1; index >= 0; index--)
{
this._stream.WriteByte(bytes[index]);
}
return;
}
var temp = BitConverter.DoubleToInt64Bits(value);
this._stream.WriteByte(Marker.Double);
for (var index = 56; index >= 0; index -= 8)
{
this._stream.WriteByte((byte) (temp >> index));
}
}
public void WriteInt32(int value)
{
if (-16 <= value && value < 48)
{
this._stream.WriteByte((byte)(0x90 + value));
}
else if (-2048 <= value && value < 2048)
{
this._stream.WriteByte((byte)(0xC8 + (byte)(value >> 8)));
this._stream.WriteByte((byte)value);
}
else if (-262144 <= value && value < 262144)
{
this._stream.WriteByte((byte)(0xD4 + (byte)(value >> 16)));
this._stream.WriteByte((byte)(value >> 8));
this._stream.WriteByte((byte)value);
}
else
{
this._stream.WriteByte(Marker.UnpackedInteger);
this._stream.WriteByte((byte)(value >> 24));
this._stream.WriteByte((byte)(value >> 16));
this._stream.WriteByte((byte)(value >> 8));
this._stream.WriteByte((byte)value);
}
}
public void WriteInt64(long value)
{
if (-8 <= value && value < 16)
{
this._stream.WriteByte((byte)(0xE0 + value));
}
else if (-2048 <= value && value < 2048)
{
this._stream.WriteByte((byte)(0xF8 + (byte)(value >> 8)));
this._stream.WriteByte((byte)value);
}
else if (-262144 <= value && value < 262144)
{
this._stream.WriteByte((byte)(0x3C + (byte)(value >> 16)));
this._stream.WriteByte((byte)(value >> 8));
this._stream.WriteByte((byte)value);
}
else if (Int32.MinValue <= value && value <= Int32.MaxValue)
{
this._stream.WriteByte(Marker.PackedLong);
this._stream.WriteByte((byte)(value >> 24));
this._stream.WriteByte((byte)(value >> 16));
this._stream.WriteByte((byte)(value >> 8));
this._stream.WriteByte((byte)value);
}
else
{
this._stream.WriteByte(Marker.UnpackedLong);
this._stream.WriteByte((byte)(value >> 56));
this._stream.WriteByte((byte)(value >> 48));
this._stream.WriteByte((byte)(value >> 40));
this._stream.WriteByte((byte)(value >> 32));
this._stream.WriteByte((byte)(value >> 24));
this._stream.WriteByte((byte)(value >> 16));
this._stream.WriteByte((byte)(value >> 8));
this._stream.WriteByte((byte)value);
}
}
public void WriteString(string value)
{
if (string.IsNullOrEmpty(value))
{
this._stream.WriteByte(0x00);
return;
}
var length = value.Length;
if (1024 > length)
{
var bytes = Encoding.UTF8.GetBytes(value.ToCharArray());
if (32 > length)
{
this._stream.WriteByte((byte) length);
}
else
{
this._stream.WriteByte((byte) (0x30 + (byte) (length >> 8)));
this._stream.WriteByte((byte) length);
}
this._stream.Write(bytes, 0, bytes.Length);
return;
}
const int maxChunkLength = 1024;
var position = 0;
while (position < length)
{
var count = Math.Min(length - position, maxChunkLength);
var final = length == (position + count);
var chunk = value.Substring(position, count);
var bytes = Encoding.UTF8.GetBytes(chunk.ToCharArray());
this._stream.WriteByte(final ? Marker.StringFinalChunk : Marker.StringNonfinalChunk);
this._stream.WriteByte((byte)(count >> 8));
this._stream.WriteByte((byte)count);
this._stream.Write(bytes, 0, bytes.Length);
position += count;
}
}
public void BeginClassDefinition()
{
this._stream.WriteByte(Marker.ClassDefinition);
}
public void EndClassDefinition()
{
}
public void WriteObjectReference(int index)
{
if (index < 0x10)
{
this._stream.WriteByte((byte)(0x60 + index));
}
else
{
this._stream.WriteByte(Marker.ClassReference);
WriteInt32(index);
}
}
public void WriteInstanceReference(int index)
{
this._stream.WriteByte(Marker.InstanceReference);
WriteInt32(index);
}
private static bool IsSimpleType(TypeInfo typeInfo)
{
if (typeInfo.IsValueType || typeInfo.IsEnum || typeInfo.IsPrimitive)
{
return true;
}
if (typeof (String) == typeInfo.AsType())
{
return true;
}
return false;
}
private static bool IsListType(TypeInfo typeInfo)
{
return typeof(ICollection).IsAssignableFrom(typeInfo);
}
}
}

@ -1,39 +0,0 @@
using System;
using System.IO;
using System.Text;
namespace Hessian
{
public static class StringBuilderExtensions
{
public static StringBuilder AppendCodepoint(this StringBuilder sb, uint codepoint)
{
if (codepoint < 0x10000) {
return sb.Append((char)codepoint);
}
var n = codepoint - 0x10000;
var high = (char)((n >> 10) + 0xD800);
var low = (char)((n & 0x3FF) + 0xDC00);
AssertValidSurrogates(high, low);
return sb
.Append (high)
.Append (low);
}
[System.Diagnostics.Conditional("DEBUG")]
private static void AssertValidSurrogates (char high, char low)
{
if (!Char.IsHighSurrogate (high)) {
throw new InvalidDataException ("Invalid high surrogate");
}
if (!Char.IsLowSurrogate (low)) {
throw new InvalidDataException ("Invalid low surrogate");
}
}
}
}

@ -1,18 +0,0 @@
namespace Hessian
{
public class UnexpectedTagException : HessianException
{
public byte Tag { get; private set; }
public UnexpectedTagException(byte tag, string expectedType)
: base(FormatErrorMessage(tag, expectedType))
{
Tag = tag;
}
private static string FormatErrorMessage(byte tag, string expectedType)
{
return string.Format("{0:X} is not a valid {1} tag.", tag, expectedType);
}
}
}

@ -1,163 +0,0 @@
using System;
using System.IO;
namespace Hessian
{
public class ValueReader
{
private byte[] buffer = new byte[8];
private PeekStream stream;
public ValueReader (Stream stream)
{
this.stream = stream as PeekStream ?? new PeekStream(stream);
}
public byte? Peek ()
{
return stream.Peek ();
}
public short ReadShort ()
{
Read (buffer, 0, 2);
return BitConverter.ToInt16(buffer, 0);
}
public int ReadInt()
{
Read (buffer, 0, 4);
return BitConverter.ToInt32(buffer, 0);
}
public uint ReadUtf8Codepoint ()
{
const uint replacementChar = 0xFFFD;
byte b0, b1, b2, b3;
b0 = ReadByte ();
if (b0 < 0x80) {
return b0;
}
if (b0 < 0xC2) {
return replacementChar;
}
if (b0 < 0xE0) {
b1 = ReadByte ();
if ((b1 ^ 0x80) >= 0x40) {
return replacementChar;
}
return (b1 & 0x3Fu) | ((b0 & 0x1Fu) << 6);
}
if (b0 < 0xF0) {
b1 = ReadByte ();
b2 = ReadByte ();
// Valid range: E0 A0..BF 80..BF
if (b0 == 0xE0 && (b1 ^ 0xA0) >= 0x20) {
return replacementChar;
}
// Valid range: ED 80..9F 80..BF
if (b0 == 0xED && (b1 ^ 0x80) >= 0x20) {
return replacementChar;
}
// Valid range: E1..EC 80..BF 80..BF
if ((b1 ^ 0x80) >= 0x40 || (b2 ^ 0x80) >= 0x40) {
return replacementChar;
}
return (b2 & 0x3Fu)
| ((b1 & 0x3Fu) << 6)
| ((b0 & 0x0Fu) << 12);
}
if (b0 < 0xF1) {
b1 = ReadByte();
if ((b1 ^ 0x90) < 0x30) {
return replacementChar;
}
b2 = ReadByte();
b3 = ReadByte();
if ((b2 & 0xC0) != 0x80 || (b3 & 0xC0) != 0x80) {
return replacementChar;
}
return (b3 & 0x3Fu)
| ((b2 & 0x3Fu) << 6)
| ((b1 & 0x3Fu) << 12)
| ((b0 & 0x07u) << 18);
}
if (b0 < 0xF4) {
b1 = ReadByte ();
b2 = ReadByte ();
b3 = ReadByte ();
// Valid range: F1..F3 80..BF 80..BF 80..BF
if ((b1 & 0xC0) != 0x80 || (b2 & 0xC0) != 0x80 || (b3 & 0xC0) != 0x80)
{
return replacementChar;
}
return (b3 & 0x3Fu)
| ((b2 & 0x3Fu) << 6)
| ((b1 & 0x3Fu) << 12)
| ((b0 & 0x07u) << 18);
}
if (b0 < 0xF5) {
b1 = ReadByte ();
// Valid range: F4 80..8F 80..BF 80..BF
if ((b1 ^ 0x80) >= 0x10) {
return replacementChar;
}
b2 = ReadByte();
b3 = ReadByte();
if ((b2 & 0xC0) != 0x80 || (b3 & 0xC0) != 0x80)
{
return replacementChar;
}
return (b3 & 0x3Fu)
| ((b2 & 0x3Fu) << 6)
| ((b1 & 0x3Fu) << 12)
| ((b0 & 0x07u) << 18);
}
return replacementChar;
}
public byte ReadByte()
{
var b = stream.ReadByte();
if (b == -1) throw new EndOfStreamException();
return (byte)b;
}
public void Read(byte[] buffer, int count)
{
Read (buffer, 0, count);
}
private void Read(byte[] buffer, int offset, int count)
{
var bytesRead = stream.Read (buffer, offset, count);
if (bytesRead != count) throw new EndOfStreamException();
}
}
}

@ -1,19 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Hessian\Hessian.csproj" />
</ItemGroup>
</Project>

@ -1,13 +0,0 @@
using System;
using Xunit;
namespace Hessian.Tests
{
public class UnitTest1
{
[Fact]
public void Test1()
{
}
}
}
Loading…
Cancel
Save