1. Transient功能修复 2、JobLogger 增加 LogId 3、打包 Kingo.XxlJob.Core v1.0.0

master
lztkdr 1 month ago
parent 727232a743
commit ad792ec329
  1. 169
      README.md
  2. 168
      README_SRC.md
  3. 41
      src/DotXxlJob.Core/DefaultJobHandlerFactory.cs
  4. 51
      src/DotXxlJob.Core/DotXxlJob.Core.csproj
  5. 8
      src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs
  6. 7
      src/DotXxlJob.Core/Logger/IJobLogger.cs
  7. 81
      src/DotXxlJob.Core/Logger/JobLogger.cs

@ -1,168 +1,13 @@
# DotXxlJob
xxl-job的dotnet core 最新执行器实现,支持XXL-JOB 2.2+
> 注意XXL-JOB 2.0.1版本请使用 1.0.8的执行器实现 ,*xxl-job* 从 2.0.2 到2.2版本又使用了xxl-rpc的新协议,本执行器不做支持,确实需要的朋友请自行fork..
# Kingo.XxlJob.Core
## 1 XXL-JOB概述
[XXL-JOB][1]是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。以下是它的架构图
![架构图](https://raw.githubusercontent.com/xuxueli/xxl-job/master/doc/images/img_Qohm.png)
该仓库,基于 https://github.com/xuanye/DotXxlJob 源码基础上,进行扩展改造,原始项目说明,见 [README_SRC.md](README_SRC.md) 。
更新内容:
1. 修复IJobHandler 注册 AddTransient瞬时服务注册,瞬时不起作用的情况。(2025.2.26)
## 2. 关于DotXxlJob产生
在工作中调研过多个任务调度平台,如Hangfire、基于Quatz.NET的第三方扩展,都与实际的需求有一点差距。 之前一直使用Hangfire,Hangfire的执行器在同步调用业务服务时,如果即时业务服务正在重新部署或者重启,有一定概率会出现死锁,导致CPU100%,后来全部调整为异步,但是这样就无法获得执行结果,这样的设计有蛮大问题,XxlJob的回调机制很好的解决了这个问题。本身如果通过http的方式调用,只要部署springbootd的一个执行器就可以解决问题,但是扩展性较差。所以萌生了实现DotNet版本的执行器的想法,为避免重复造轮子,开始之前也进行过调研,以下仓库[https://github.com/yuniansheng/xxl-job-dotnet][2]给了较大的启发,但是该库只支持1.9版本的xxljob,还有一些其他小问题,所以还是自力更生。
示例:GTYPackageJobService 服务的 Push_Common_Job 、GetJobNames 重写,原先 GetJobNames 虽然是Transient服务,任务Execute会执行多次,但是GetJobNames 之前只会执行一次,调整后会执行多次。
## 3. 如何使用
2. JobExecuteContext下 的 IJobLogger 增加 LogId 属性,用于通过 LogId 查看Job日志。(2025.2.26)
目前只实现了BEAN的方式,即直接实现IJobHandler调用的方式,Glue源码的方式实际上实现起来也并不复杂(有需求再说把),或者各位有需求Fork 实现一下
可参考sample
安装:
> dotnet add package DotXxlJob.Core
### 3.1 在AspNetCore中使用
1. 声明一个AspNet的Middleware中间件,并扩展ApplicationBuilder,本质是拦截Post请求,解析Body中的流信息
```
public class XxlJobExecutorMiddleware
{
private readonly IServiceProvider _provider;
private readonly RequestDelegate _next;
private readonly XxlRestfulServiceHandler _rpcService;
public XxlJobExecutorMiddleware(IServiceProvider provider, RequestDelegate next)
{
this._provider = provider;
this._next = next;
this._rpcService = _provider.GetRequiredService<XxlRestfulServiceHandler>();
}
public async Task Invoke(HttpContext context)
{
string contentType = context.Request.ContentType;
if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase)
&& !string.IsNullOrEmpty(contentType)
&& contentType.ToLower().StartsWith("application/json"))
{
await _rpcService.HandlerAsync(context.Request,context.Response);
return;
}
await _next.Invoke(context);
}
}
```
扩展ApplicationBuilderExtensions,可根据实际情况绑定在特殊的Url Path上
```
public static class ApplicationBuilderExtensions
{
public static IApplicationBuilder UseXxlJobExecutor(this IApplicationBuilder @this)
{
return @this.UseMiddleware<XxlJobExecutorMiddleware>();
}
}
```
在Startup中添加必要的引用,其中自动注册。
```
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
private IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddXxlJobExecutor(Configuration);
services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定义的jobHandler
services.AddAutoRegistry(); // 自动注册
}
public void Configure(IApplicationBuilder app,IHostingEnvironment env)
{
//启用XxlExecutor
app.UseXxlJobExecutor();
}
}
```
编写JobHandler,继承AbstractJobHandler或者直接实现接口IJobHandler,通过context.JobLogger 记录执行过程和结果,在AdminWeb上可查看的哦
```
[JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler
{
public override Task<ReturnT> Execute(JobExecuteContext context)
{
context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter);
return Task.FromResult(ReturnT.SUCCESS);
}
}
```
## 3.2 配置信息
管理端地址和端口是必填信息,其他根据实际情况,选择配置,配置项说明见下代码中的注释
```
public class XxlJobExecutorOptions
{
/// <summary>
/// 管理端地址,多个以;分隔
/// </summary>
public string AdminAddresses { get; set; }
/// <summary>
/// appName自动注册时要去管理端配置一致
/// </summary>
public string AppName { get; set; } = "xxl-job-executor-dotnet";
/// <summary>
/// 自动注册时提交的地址,为空会自动获取内网地址
/// </summary>
public string SpecialBindAddress { get; set; }
/// <summary>
/// 绑定端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 是否自动注册
/// </summary>
public bool AutoRegistry { get; set; }
/// <summary>
/// 认证票据
/// </summary>
public string AccessToken { get; set; }
/// <summary>
/// 日志目录,默认为执行目录的logs子目录下,请配置绝对路径
/// </summary>
public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs");
/// <summary>
/// 日志保留天数
/// </summary>
public int LogRetentionDays { get; set; } = 30;
}
```
## 其他说明
注意XXL-JOB 2.0.1版本请使用 1.0.8的执行器实现
有任何问题,可Issue反馈 ,最后感谢 xxl-job
[1]: http://www.xuxueli.com/xxl-job
[2]: https://github.com/yuniansheng/xxl-job-dotnet
示例:任务执行后,可能有异常,有时找不到在哪次执行,可记录LogId,后续通过访问链接地址查看当时执行的日志。如 LogId 是 478571085,可访问对应的xjob服务的日志链接地址:https://xxx/xxl-job-admin/joblog/logDetailPage?id=478571085

@ -0,0 +1,168 @@
# DotXxlJob
xxl-job的dotnet core 最新执行器实现,支持XXL-JOB 2.2+
> 注意XXL-JOB 2.0.1版本请使用 1.0.8的执行器实现 ,*xxl-job* 从 2.0.2 到2.2版本又使用了xxl-rpc的新协议,本执行器不做支持,确实需要的朋友请自行fork..
## 1 XXL-JOB概述
[XXL-JOB][1]是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。以下是它的架构图
![架构图](https://raw.githubusercontent.com/xuxueli/xxl-job/master/doc/images/img_Qohm.png)
## 2. 关于DotXxlJob产生
在工作中调研过多个任务调度平台,如Hangfire、基于Quatz.NET的第三方扩展,都与实际的需求有一点差距。 之前一直使用Hangfire,Hangfire的执行器在同步调用业务服务时,如果即时业务服务正在重新部署或者重启,有一定概率会出现死锁,导致CPU100%,后来全部调整为异步,但是这样就无法获得执行结果,这样的设计有蛮大问题,XxlJob的回调机制很好的解决了这个问题。本身如果通过http的方式调用,只要部署springbootd的一个执行器就可以解决问题,但是扩展性较差。所以萌生了实现DotNet版本的执行器的想法,为避免重复造轮子,开始之前也进行过调研,以下仓库[https://github.com/yuniansheng/xxl-job-dotnet][2]给了较大的启发,但是该库只支持1.9版本的xxljob,还有一些其他小问题,所以还是自力更生。
## 3. 如何使用
目前只实现了BEAN的方式,即直接实现IJobHandler调用的方式,Glue源码的方式实际上实现起来也并不复杂(有需求再说把),或者各位有需求Fork 实现一下
可参考sample
安装:
> dotnet add package DotXxlJob.Core
### 3.1 在AspNetCore中使用
1. 声明一个AspNet的Middleware中间件,并扩展ApplicationBuilder,本质是拦截Post请求,解析Body中的流信息
```
public class XxlJobExecutorMiddleware
{
private readonly IServiceProvider _provider;
private readonly RequestDelegate _next;
private readonly XxlRestfulServiceHandler _rpcService;
public XxlJobExecutorMiddleware(IServiceProvider provider, RequestDelegate next)
{
this._provider = provider;
this._next = next;
this._rpcService = _provider.GetRequiredService<XxlRestfulServiceHandler>();
}
public async Task Invoke(HttpContext context)
{
string contentType = context.Request.ContentType;
if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase)
&& !string.IsNullOrEmpty(contentType)
&& contentType.ToLower().StartsWith("application/json"))
{
await _rpcService.HandlerAsync(context.Request,context.Response);
return;
}
await _next.Invoke(context);
}
}
```
扩展ApplicationBuilderExtensions,可根据实际情况绑定在特殊的Url Path上
```
public static class ApplicationBuilderExtensions
{
public static IApplicationBuilder UseXxlJobExecutor(this IApplicationBuilder @this)
{
return @this.UseMiddleware<XxlJobExecutorMiddleware>();
}
}
```
在Startup中添加必要的引用,其中自动注册。
```
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
private IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddXxlJobExecutor(Configuration);
services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定义的jobHandler
services.AddAutoRegistry(); // 自动注册
}
public void Configure(IApplicationBuilder app,IHostingEnvironment env)
{
//启用XxlExecutor
app.UseXxlJobExecutor();
}
}
```
编写JobHandler,继承AbstractJobHandler或者直接实现接口IJobHandler,通过context.JobLogger 记录执行过程和结果,在AdminWeb上可查看的哦
```
[JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler
{
public override Task<ReturnT> Execute(JobExecuteContext context)
{
context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter);
return Task.FromResult(ReturnT.SUCCESS);
}
}
```
## 3.2 配置信息
管理端地址和端口是必填信息,其他根据实际情况,选择配置,配置项说明见下代码中的注释
```
public class XxlJobExecutorOptions
{
/// <summary>
/// 管理端地址,多个以;分隔
/// </summary>
public string AdminAddresses { get; set; }
/// <summary>
/// appName自动注册时要去管理端配置一致
/// </summary>
public string AppName { get; set; } = "xxl-job-executor-dotnet";
/// <summary>
/// 自动注册时提交的地址,为空会自动获取内网地址
/// </summary>
public string SpecialBindAddress { get; set; }
/// <summary>
/// 绑定端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 是否自动注册
/// </summary>
public bool AutoRegistry { get; set; }
/// <summary>
/// 认证票据
/// </summary>
public string AccessToken { get; set; }
/// <summary>
/// 日志目录,默认为执行目录的logs子目录下,请配置绝对路径
/// </summary>
public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs");
/// <summary>
/// 日志保留天数
/// </summary>
public int LogRetentionDays { get; set; } = 30;
}
```
## 其他说明
注意XXL-JOB 2.0.1版本请使用 1.0.8的执行器实现
有任何问题,可Issue反馈 ,最后感谢 xxl-job
[1]: http://www.xuxueli.com/xxl-job
[2]: https://github.com/yuniansheng/xxl-job-dotnet

@ -1,24 +1,34 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.Design;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core
{
public class DefaultJobHandlerFactory:IJobHandlerFactory
public class DefaultJobHandlerFactory : IJobHandlerFactory
{
private readonly IServiceProvider _provider;
private readonly IServiceCollection _services;
private readonly Dictionary<string, IJobHandler> handlersCache = new Dictionary<string, IJobHandler>();
public DefaultJobHandlerFactory(IServiceProvider provider)
private readonly Dictionary<string, Type> NoSingleTypeCache = new Dictionary<string, Type>();
public DefaultJobHandlerFactory(IServiceProvider provider, IServiceCollection services)
{
this._provider = provider;
this._services = services;
Initialize();
}
private void Initialize()
{
var list = this._provider.GetServices<IJobHandler>();
var svcDprs = this._services.Where(v => v.ServiceType == typeof(IJobHandler)).ToList();
if (list == null || !list.Any())
{
throw new TypeLoadException("IJobHandlers are not found in IServiceCollection");
@ -26,24 +36,37 @@ namespace DotXxlJob.Core
foreach (var handler in list)
{
var jobHandlerAttr = handler.GetType().GetCustomAttribute<JobHandlerAttribute>();
var handlerName = jobHandlerAttr == null ? handler.GetType().Name : jobHandlerAttr.Name;
var handlerType = handler.GetType();
var svcDpr = this._services.FirstOrDefault(v => v.ServiceType == typeof(IJobHandler) && v.ImplementationType == handlerType);
var jobHandlerAttr = handlerType.GetCustomAttribute<JobHandlerAttribute>();
var handlerName = jobHandlerAttr == null ? handlerType.Name : jobHandlerAttr.Name;
if (handlersCache.ContainsKey(handlerName))
{
throw new Exception($"same IJobHandler' name: [{handlerName}]");
throw new Exception($"same IJobHandler' name: [{handlerName}]");
}
if (svcDpr.Lifetime == ServiceLifetime.Singleton)
{
handlersCache.Add(handlerName, handler);
}
else
{
NoSingleTypeCache.Add(handlerName, svcDpr.ImplementationType);
}
handlersCache.Add(handlerName,handler);
}
}
public IJobHandler GetJobHandler(string handlerName)
{
if (handlersCache.ContainsKey(handlerName))
{
return handlersCache[handlerName];
return handlersCache[handlerName];
}
else
{
var handler = NoSingleTypeCache[handlerName];
return this._provider.GetService(handler) as IJobHandler;
}
return null;
}
}
}

@ -1,34 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../../build/version.props" />
<Import Project="../../build/releasenotes.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<DefineConstants>$(DefineConstants);DOTNETCORE</DefineConstants>
<Description>XxlJobExecutor DotNet port</Description>
<Copyright>Xuanye @ 2019</Copyright>
<Authors>Xuanye</Authors>
<AssemblyTitle>XxlJobExecutor DotNet port</AssemblyTitle>
<AssemblyName>DotXxlJob.Core</AssemblyName>
<PackageId>DotXxlJob.Core</PackageId>
<Version>$(DotXxlJobPackageVersion)</Version>
<PackageTags>Hession,xxl-job,DotXxlJob</PackageTags>
<PackageReleaseNotes>
$(DotXxlJobPackageNotes)
</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/xuanye/DotXxlJob</PackageProjectUrl>
<PackageLicense>https://github.com/xuanye/DotXxlJob/blob/master/LICENSE</PackageLicense>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/xuanye/DotXxlJob</RepositoryUrl>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<Copyright>Kingo @ 2025</Copyright>
<AssemblyName>Kingo.XxlJob.Core</AssemblyName>
<Description>Xxl-Job中间件。</Description>
<PackageId>Kingo.XxlJob.Core</PackageId>
<Version>1.0.0</Version>
<Authors>Kingo</Authors>
<PackageTags>xxl,job,xxl-job,DotXxlJob,xxljob</PackageTags>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<Company>http://www.kingoit.com</Company>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Flurl.Http" Version="2.4.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Utf8Json" Version="1.3.7" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Flurl.Http" Version="2.4.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Utf8Json" Version="1.3.7" />
</ItemGroup>
</Project>

@ -1,4 +1,5 @@
using System;
using System.Linq;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.DefaultHandlers;
using DotXxlJob.Core.Queue;
@ -15,6 +16,7 @@ namespace DotXxlJob.Core
{
services.AddLogging();
services.AddOptions();
services.AddSingleton(services);
services.Configure<XxlJobExecutorOptions>(configuration.GetSection("xxlJob"))
.AddXxlJobExecutorServiceDependency();
@ -37,6 +39,12 @@ namespace DotXxlJob.Core
{
services.AddSingleton<IExecutorRegistry,ExecutorRegistry>()
.AddSingleton<IHostedService,JobsExecuteHostedService>();
var descriptors = services.Where(v => v.Lifetime != ServiceLifetime.Singleton && v.ServiceType == typeof(IJobHandler)).ToList();
foreach (var desc in descriptors)
{
services.AddTransient(desc.ImplementationType);
}
return services;
}

@ -5,9 +5,12 @@ namespace DotXxlJob.Core
{
public interface IJobLogger
{
long LogId { get; }
string LogPath { get; }
void SetLogFile(long logTime, long logId);
void Log(string pattern, params object[] format);
@ -16,7 +19,7 @@ namespace DotXxlJob.Core
LogResult ReadLog(long logTime, long logId, int fromLine);
void LogSpecialFile(long logTime, long logId, string pattern, params object[] format);
}

@ -12,20 +12,50 @@ using Microsoft.Extensions.Options;
namespace DotXxlJob.Core
{
public class JobLogger:IJobLogger
public class JobLogger : IJobLogger
{
private readonly ILogger<JobLogger> _logger;
private readonly AsyncLocal<string> LogFileName = new AsyncLocal<string>();
private readonly XxlJobExecutorOptions _options;
public JobLogger(IOptions<XxlJobExecutorOptions> optionsAccessor,ILogger<JobLogger> logger)
private static readonly object locker = new object();
public JobLogger(IOptions<XxlJobExecutorOptions> optionsAccessor, ILogger<JobLogger> logger)
{
this._logger = logger;
this._options = optionsAccessor.Value;
}
public void SetLogFile(long logTime, long logId)
public long _LogId = 0;
public long LogId
{
get
{
return _LogId;
}
private set
{
_LogId = value;
}
}
public string _LocalPath = null;
public string LogPath
{
get
{
return _LocalPath;
}
private set
{
_LocalPath = value;
}
}
void IJobLogger.SetLogFile(long logTime, long logId)
{
try
{
@ -37,6 +67,8 @@ namespace DotXxlJob.Core
CleanOldLogs();
}
LogFileName.Value = filePath;
this.LogId = logId;
this.LogPath = filePath;
}
catch (Exception ex)
{
@ -128,29 +160,32 @@ namespace DotXxlJob.Core
}
private void LogDetail(string logFileName, StackFrame callInfo, string appendLog)
{
if (string.IsNullOrEmpty(logFileName))
lock (locker)
{
return;
}
if (string.IsNullOrEmpty(logFileName))
{
return;
}
var stringBuffer = new StringBuilder();
stringBuffer
.Append(DateTime.Now.ToString("s")).Append(" ")
.Append("[" + callInfo.GetMethod().DeclaringType.FullName + "#" + callInfo.GetMethod().Name + "]").Append("-")
.Append("[line " + callInfo.GetFileLineNumber() + "]").Append("-")
.Append("[thread " + Thread.CurrentThread.ManagedThreadId + "]").Append(" ")
.Append(appendLog ?? string.Empty)
.AppendLine();
var formatAppendLog = stringBuffer.ToString();
var stringBuffer = new StringBuilder();
stringBuffer
.Append(DateTime.Now.ToString("s")).Append(" ")
.Append("[" + callInfo.GetMethod().DeclaringType.FullName + "#" + callInfo.GetMethod().Name + "]").Append("-")
.Append("[line " + callInfo.GetFileLineNumber() + "]").Append("-")
.Append("[thread " + Thread.CurrentThread.ManagedThreadId + "]").Append(" ")
.Append(appendLog ?? string.Empty)
.AppendLine();
try
{
File.AppendAllText(logFileName, formatAppendLog, Encoding.UTF8);
}
catch (Exception ex)
{
this._logger.LogError(ex, "LogDetail error");
var formatAppendLog = stringBuffer.ToString();
try
{
File.AppendAllText(logFileName, formatAppendLog, Encoding.UTF8);
}
catch (Exception ex)
{
this._logger.LogError(ex, "LogDetail error");
}
}
}

Loading…
Cancel
Save