允许在Scoped环境里执行Job

pull/19/head
彭伟 4 years ago
parent e7cd2bcd9f
commit 63b46caeea
  1. 4
      samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj
  2. 8
      samples/ASPNetCoreExecutor/DemoJobHandler.cs
  3. 8
      samples/ASPNetCoreExecutor/Startup.cs
  4. 51
      src/DotXxlJob.Core/DefaultJobHandlerFactory.cs
  5. 76
      src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs
  6. 5
      src/DotXxlJob.Core/IJobHandlerFactory.cs
  7. 62
      src/DotXxlJob.Core/JobHandlerCache.cs
  8. 34
      src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs
  9. 59
      tests/DotXxlJob.Core.Tests/BeanTaskExecutorTest.cs
  10. 42
      tests/DotXxlJob.Core.Tests/DefaultJobHandlerFactory.cs
  11. 27
      tests/DotXxlJob.Core.Tests/DotXxlJob.Core.Tests.csproj
  12. 31
      tests/DotXxlJob.Core.Tests/JobHandlerCacheTest.cs
  13. 13
      tests/DotXxlJob.Core.Tests/UnitTest1.cs

@ -1,13 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk.Web"> <Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework> <TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console"> <PackageReference Include="Microsoft.Extensions.Logging.Console">
<Version>2.2.0</Version> <Version>2.2.0</Version>
</PackageReference> </PackageReference>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

@ -8,12 +8,12 @@ namespace ASPNetCoreExecutor
/// 示例Job,只是写个日志 /// 示例Job,只是写个日志
/// </summary> /// </summary>
[JobHandler("demoJobHandler")] [JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler public class DemoJobHandler : AbstractJobHandler
{ {
public override async Task<ReturnT> Execute(JobExecuteContext context) public override async Task<ReturnT> Execute(JobExecuteContext context)
{ {
context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter); context.JobLogger.Log("receive demo job handler,parameter:{0}", context.JobParameter);
context.JobLogger.Log("开始休眠10秒"); context.JobLogger.Log("开始休眠10秒");
await Task.Delay(10 * 1000); await Task.Delay(10 * 1000);
context.JobLogger.Log("休眠10秒结束"); context.JobLogger.Log("休眠10秒结束");
return ReturnT.SUCCESS; return ReturnT.SUCCESS;

@ -16,17 +16,17 @@ namespace ASPNetCoreExecutor
} }
private IConfiguration Configuration { get; } private IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container. // This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services) public void ConfigureServices(IServiceCollection services)
{ {
services.AddXxlJobExecutor(Configuration); services.AddXxlJobExecutor(Configuration);
services.AddDefaultXxlJobHandlers();// add httpHandler; services.AddDefaultXxlJobHandlers();// add httpHandler;
services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定义的jobHandler services.AddJobHandler<DemoJobHandler>(); // 添加自定义的jobHandler
services.AddAutoRegistry(); // 自动注册 services.AddAutoRegistry(); // 自动注册

@ -1,49 +1,46 @@
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection; using System.Reflection;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core namespace DotXxlJob.Core
{ {
public class DefaultJobHandlerFactory:IJobHandlerFactory public class DefaultJobHandlerFactory : IJobHandlerFactory
{ {
private readonly IServiceProvider _provider; private readonly JobHandlerCache _handlerCache;
private readonly Dictionary<string, IJobHandler> handlersCache = new Dictionary<string, IJobHandler>();
public DefaultJobHandlerFactory(IServiceProvider provider) public DefaultJobHandlerFactory(IServiceProvider provider, JobHandlerCache handlerCache = null)
{ {
this._provider = provider; _handlerCache = handlerCache ?? new JobHandlerCache();
Initialize();
Initialize(provider);
} }
private void Initialize() private void Initialize(IServiceProvider provider)
{ {
var list = this._provider.GetServices<IJobHandler>(); foreach (var handler in provider.GetServices<IJobHandler>())
if (list == null || !list.Any())
{ {
throw new TypeLoadException("IJobHandlers are not found in IServiceCollection"); _handlerCache.AddJobHandler(handler);
} }
foreach (var handler in list) if (_handlerCache.HandlersCache.Count < 1)
{ {
var jobHandlerAttr = handler.GetType().GetCustomAttribute<JobHandlerAttribute>(); throw new TypeLoadException("IJobHandlers are not found in IServiceCollection");
var handlerName = jobHandlerAttr == null ? handler.GetType().Name : jobHandlerAttr.Name;
if (handlersCache.ContainsKey(handlerName))
{
throw new Exception($"same IJobHandler' name: [{handlerName}]");
}
handlersCache.Add(handlerName,handler);
} }
} }
public IJobHandler GetJobHandler(string handlerName) public IJobHandler GetJobHandler(IServiceScopeFactory scopeFactory, string handlerName, out IServiceScope serviceScope)
{ {
if (handlersCache.ContainsKey(handlerName)) serviceScope = null;
{
return handlersCache[handlerName]; var jobHandler = _handlerCache.Get(handlerName);
}
return null; if (jobHandler == null) return null;
if (jobHandler.JobHandler != null) return jobHandler.JobHandler;
serviceScope = scopeFactory.CreateScope();
return (IJobHandler)ActivatorUtilities.CreateInstance(serviceScope.ServiceProvider, jobHandler.JobHandlerType, jobHandler.JobHandlerConstructorParameters);
} }
} }
} }

@ -1,4 +1,5 @@
using System; using System;
using System.Linq;
using DotXxlJob.Core.Config; using DotXxlJob.Core.Config;
using DotXxlJob.Core.DefaultHandlers; using DotXxlJob.Core.DefaultHandlers;
using DotXxlJob.Core.Queue; using DotXxlJob.Core.Queue;
@ -11,43 +12,37 @@ namespace DotXxlJob.Core
{ {
public static class ServiceCollectionExtensions public static class ServiceCollectionExtensions
{ {
public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,IConfiguration configuration) public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services, IConfiguration configuration) =>
{ services.AddXxlJobExecutor(configuration.GetSection("xxlJob").Bind);
services.AddLogging();
services.AddOptions(); public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services, Action<XxlJobExecutorOptions> configAction)
services.Configure<XxlJobExecutorOptions>(configuration.GetSection("xxlJob"))
.AddXxlJobExecutorServiceDependency();
return services;
}
public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,Action<XxlJobExecutorOptions> configAction)
{ {
services.AddLogging(); services.AddLogging();
services.AddOptions(); services.AddOptions();
services.Configure(configAction).AddXxlJobExecutorServiceDependency(); services.Configure(configAction).AddXxlJobExecutorServiceDependency();
return services; return services;
} }
public static IServiceCollection AddDefaultXxlJobHandlers(this IServiceCollection services) public static IServiceCollection AddDefaultXxlJobHandlers(this IServiceCollection services)
{ {
services.AddSingleton<IJobHandler,SimpleHttpJobHandler>(); services.AddSingleton<IJobHandler, SimpleHttpJobHandler>();
return services; return services;
} }
public static IServiceCollection AddAutoRegistry(this IServiceCollection services) public static IServiceCollection AddAutoRegistry(this IServiceCollection services)
{ {
services.AddSingleton<IExecutorRegistry,ExecutorRegistry>() services.AddSingleton<IExecutorRegistry, ExecutorRegistry>()
.AddSingleton<IHostedService,JobsExecuteHostedService>(); .AddSingleton<IHostedService, JobsExecuteHostedService>();
return services; return services;
} }
private static IServiceCollection AddXxlJobExecutorServiceDependency(this IServiceCollection services) private static IServiceCollection AddXxlJobExecutorServiceDependency(this IServiceCollection services)
{ {
//可在外部提前注册对应实现,并替换默认实现 //可在外部提前注册对应实现,并替换默认实现
services.TryAddSingleton<IJobLogger, JobLogger>(); services.TryAddSingleton<IJobLogger, JobLogger>();
services.TryAddSingleton<IJobHandlerFactory,DefaultJobHandlerFactory >(); services.TryAddSingleton<IJobHandlerFactory, DefaultJobHandlerFactory>();
services.TryAddSingleton<IExecutorRegistry, ExecutorRegistry>(); services.TryAddSingleton<IExecutorRegistry, ExecutorRegistry>();
services.AddHttpClient("DotXxlJobClient"); services.AddHttpClient("DotXxlJobClient");
services.AddSingleton<JobDispatcher>(); services.AddSingleton<JobDispatcher>();
services.AddSingleton<TaskExecutorFactory>(); services.AddSingleton<TaskExecutorFactory>();
@ -55,9 +50,48 @@ namespace DotXxlJob.Core
services.AddSingleton<CallbackTaskQueue>(); services.AddSingleton<CallbackTaskQueue>();
services.AddSingleton<AdminClient>(); services.AddSingleton<AdminClient>();
services.AddSingleton<ITaskExecutor, TaskExecutors.BeanTaskExecutor>(); services.AddSingleton<ITaskExecutor, TaskExecutors.BeanTaskExecutor>();
services.AddSingleton(new JobHandlerCache());
return services;
}
/// <summary>允许创建Scoped实例</summary>
/// <typeparam name="TJob"></typeparam>
/// <param name="services"></param>
/// <param name="constructorParameters">用于创建实例的额外参数,比如字符串</param>
/// <returns></returns>
public static IServiceCollection AddJobHandler<TJob>(this IServiceCollection services,
params object[] constructorParameters) where TJob : IJobHandler
{
services.GetJobHandlerCache().AddJobHandler<TJob>(constructorParameters);
return services;
}
/// <summary>允许创建Scoped实例</summary>
/// <typeparam name="TJob"></typeparam>
/// <param name="services"></param>
/// <param name="handlerName"></param>
/// <param name="constructorParameters">用于创建实例的额外参数,比如字符串</param>
/// <returns></returns>
public static IServiceCollection AddJobHandler<TJob>(this IServiceCollection services,
string handlerName, params object[] constructorParameters) where TJob : IJobHandler
{
services.GetJobHandlerCache().AddJobHandler<TJob>(handlerName, constructorParameters);
return services; return services;
} }
private static JobHandlerCache GetJobHandlerCache(this IServiceCollection services)
{
var sd = services.FirstOrDefault(x => x.ImplementationInstance is JobHandlerCache);
if (sd != null) return (JobHandlerCache)sd.ImplementationInstance;
var cache = new JobHandlerCache();
services.AddSingleton(cache);
return cache;
}
} }
} }

@ -1,7 +1,10 @@
using System;
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core namespace DotXxlJob.Core
{ {
public interface IJobHandlerFactory public interface IJobHandlerFactory
{ {
IJobHandler GetJobHandler(string handlerName); IJobHandler GetJobHandler(IServiceScopeFactory scopeFactory, string handlerName, out IServiceScope serviceScope);
} }
} }

@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.InteropServices;
using DotXxlJob.Core.DefaultHandlers;
namespace DotXxlJob.Core
{
public class JobHandlerCache
{
internal Dictionary<string, JobHandlerItem> HandlersCache { get; } = new Dictionary<string, JobHandlerItem>();
public void AddJobHandler<TJob>(params object[] constructorParameters)
where TJob : IJobHandler =>
AddJobHandler<TJob>(typeof(TJob).GetCustomAttribute<JobHandlerAttribute>()?.Name ??
typeof(TJob).Name, constructorParameters);
public void AddJobHandler<TJob>(string handlerName, params object[] constructorParameters)
where TJob : IJobHandler =>
AddJobHandler(handlerName, new JobHandlerItem {
JobHandlerType = typeof(TJob),
JobHandlerConstructorParameters = constructorParameters,
});
public void AddJobHandler(IJobHandler jobHandler)
{
var jobHandlerType = jobHandler.GetType();
var handlerName = jobHandlerType.GetCustomAttribute<JobHandlerAttribute>()?.Name ?? jobHandlerType.Name;
AddJobHandler(handlerName, jobHandler);
}
public void AddJobHandler(string handlerName, IJobHandler jobHandler)
{
AddJobHandler(handlerName, new JobHandlerItem { JobHandler = jobHandler });
}
private void AddJobHandler(string handlerName, JobHandlerItem jobHandler)
{
if (HandlersCache.ContainsKey(handlerName))
{
throw new ArgumentException($"Same IJobHandler' name: [{handlerName}]", nameof(handlerName));
}
HandlersCache.Add(handlerName, jobHandler);
}
public JobHandlerItem Get(string handlerName) =>
HandlersCache.TryGetValue(handlerName, out var item) ? item : null;
public class JobHandlerItem
{
public IJobHandler JobHandler { get; set; }
public Type JobHandlerType { get; set; }
public object[] JobHandlerConstructorParameters { get; set; }
}
}
}

@ -1,31 +1,45 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using DotXxlJob.Core.Model; using DotXxlJob.Core.Model;
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core.TaskExecutors namespace DotXxlJob.Core.TaskExecutors
{ {
/// <summary> /// <summary>
/// 实现 IJobHandler的执行器 /// 实现 IJobHandler的执行器
/// </summary> /// </summary>
public class BeanTaskExecutor:ITaskExecutor public class BeanTaskExecutor : ITaskExecutor
{ {
private readonly IJobHandlerFactory _handlerFactory; private readonly IJobHandlerFactory _handlerFactory;
private readonly IJobLogger _jobLogger; private readonly IJobLogger _jobLogger;
private readonly IServiceScopeFactory _scopeFactory;
public BeanTaskExecutor(IJobHandlerFactory handlerFactory,IJobLogger jobLogger) public BeanTaskExecutor(IJobHandlerFactory handlerFactory, IJobLogger jobLogger, IServiceScopeFactory scopeFactory)
{ {
this._handlerFactory = handlerFactory; this._handlerFactory = handlerFactory;
this._jobLogger = jobLogger; this._jobLogger = jobLogger;
this._scopeFactory = scopeFactory;
} }
public string GlueType { get; } = Constants.GlueType.BEAN; public string GlueType { get; } = Constants.GlueType.BEAN;
public Task<ReturnT> Execute(TriggerParam triggerParam) public async Task<ReturnT> Execute(TriggerParam triggerParam)
{ {
var handler = _handlerFactory.GetJobHandler(triggerParam.ExecutorHandler); var handler = _handlerFactory.GetJobHandler(_scopeFactory, triggerParam.ExecutorHandler, out var scope);
if (scope == null) return await Execute(handler, triggerParam);
using (scope)
using (handler)
{
return await Execute(handler, triggerParam);
}
}
private Task<ReturnT> Execute(IJobHandler handler, TriggerParam triggerParam)
{
if (handler == null) if (handler == null)
{ {
return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found.")); return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found."));
} }
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams); var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams);
return handler.Execute(context); return handler.Execute(context);

@ -0,0 +1,59 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotXxlJob.Core.DefaultHandlers;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
namespace DotXxlJob.Core.Tests
{
public class BeanTaskExecutorTest
{
[Fact]
public async Task Repeated_Job_Handler()
{
var services = new ServiceCollection();
services.AddScoped<ScopedService>();
services.AddXxlJobExecutor(options => options.AdminAddresses = "http://localhost");
var list = new List<object>();
services.AddJobHandler<TestJobHandler>("test", list);
using (var provider = services.BuildServiceProvider(true))
{
await provider.GetRequiredService<ITaskExecutor>()
.Execute(new TriggerParam {
ExecutorHandler = "test"
});
}
Assert.Single(list);
}
private class TestJobHandler : IJobHandler
{
private readonly List<object> _list;
public TestJobHandler(List<object> list, ScopedService _) => _list = list;
public void Dispose()
{
}
public Task<ReturnT> Execute(JobExecuteContext context)
{
_list.Add(new object());
return Task.FromResult(ReturnT.SUCCESS);
}
}
private class ScopedService
{
}
}
}

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
namespace DotXxlJob.Core.Tests
{
public class DefaultJobHandlerFactory
{
[Fact]
public async Task Repeated_Job_Handler()
{
var services = new ServiceCollection();
services.AddXxlJobExecutor(options => options.AdminAddresses = "http://localhost");
services.AddDefaultXxlJobHandlers();
services.AddJobHandler<TestJobHandler>();
using (var provider = services.BuildServiceProvider())
{
Assert.Throws<ArgumentException>(() => provider.GetRequiredService<IJobHandlerFactory>());
}
}
[JobHandler("simpleHttpJobHandler")]
private class TestJobHandler : IJobHandler
{
public void Dispose()
{
}
public Task<ReturnT> Execute(JobExecuteContext context)
{
throw new NotImplementedException();
}
}
}
}

@ -1,19 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework> <TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable> <IsPackable>false</IsPackable>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
<PackageReference Include="xunit" Version="2.4.0" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
</ItemGroup> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\DotXxlJob.Core\DotXxlJob.Core.csproj" /> <ProjectReference Include="..\..\src\DotXxlJob.Core\DotXxlJob.Core.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using DotXxlJob.Core.DefaultHandlers;
using DotXxlJob.Core.Model;
using Xunit;
namespace DotXxlJob.Core.Tests
{
public class JobHandlerCacheTest
{
[Fact]
public void Repeated_Job_Handler()
{
var cache = new JobHandlerCache();
cache.AddJobHandler<SimpleHttpJobHandler>();
Assert.Throws<ArgumentException>(() => cache.AddJobHandler("simpleHttpJobHandler", new TestJobHandler()));
}
private class TestJobHandler : IJobHandler
{
public void Dispose() { }
public Task<ReturnT> Execute(JobExecuteContext context)
{
throw new NotImplementedException();
}
}
}
}

@ -1,13 +0,0 @@
using System;
using Xunit;
namespace DotXxlJob.Core.Tests
{
public class UnitTest1
{
[Fact]
public void Test1()
{
}
}
}
Loading…
Cancel
Save