♻️ refactor initial

develop
Xuanye Wong 11 months ago
parent 727232a743
commit 091cc9ca70
  1. 8
      .cursorignore
  2. 70
      .cursorrules
  3. 223
      .editorconfig
  4. 4
      .gitattributes
  5. 128
      .gitignore
  6. 75
      DotXxlJob.sln
  7. 13
      samples/ASPNetCoreExecutor/.config/dotnet-tools.json
  8. 22
      samples/ASPNetCoreExecutor/ASPNetCoreExecutor.csproj
  9. 32
      samples/ASPNetCoreExecutor/Controllers/WeatherForecastController.cs
  10. 22
      samples/ASPNetCoreExecutor/DemoJobHandler.cs
  11. 12
      samples/ASPNetCoreExecutor/Extensions/ApplicationBuilderExtensions.cs
  12. 42
      samples/ASPNetCoreExecutor/Extensions/XxlJobExecutorMiddleware.cs
  13. 50
      samples/ASPNetCoreExecutor/Program.cs
  14. 23
      samples/ASPNetCoreExecutor/Properties/PublishProfiles/FolderProfile.pubxml
  15. 25
      samples/ASPNetCoreExecutor/Properties/launchSettings.json
  16. 49
      samples/ASPNetCoreExecutor/Startup.cs
  17. 13
      samples/ASPNetCoreExecutor/WeatherForecast.cs
  18. 8
      samples/ASPNetCoreExecutor/appsettings.Development.json
  19. 13
      samples/ASPNetCoreExecutor/appsettings.json
  20. 109
      src/DotXxlJob.Core/AdminClient.cs
  21. 20
      src/DotXxlJob.Core/Attributes/JobHandlerAttribute.cs
  22. 65
      src/DotXxlJob.Core/Config/XxlJobExecutorOptions.cs
  23. 16
      src/DotXxlJob.Core/DefaultHandlers/AbsJobHandler.cs
  24. 66
      src/DotXxlJob.Core/DefaultHandlers/SimpleHttpJobHandler.cs
  25. 49
      src/DotXxlJob.Core/DefaultJobHandlerFactory.cs
  26. 9
      src/DotXxlJob.Core/DependencyExtensions.cs
  27. 32
      src/DotXxlJob.Core/DotXxlJob.Core.csproj
  28. 76
      src/DotXxlJob.Core/ExecutorRegistry.cs
  29. 55
      src/DotXxlJob.Core/Extensions/DateTimeExtension.cs
  30. 63
      src/DotXxlJob.Core/Extensions/ServiceCollectionExtensions.cs
  31. 25
      src/DotXxlJob.Core/Hosted/JobExecuteHostedService.cs
  32. 13
      src/DotXxlJob.Core/IExecutorRegistry.cs
  33. 26
      src/DotXxlJob.Core/IJobHandler.cs
  34. 7
      src/DotXxlJob.Core/IJobHandlerFactory.cs
  35. 60
      src/DotXxlJob.Core/Internal/Constants.cs
  36. 135
      src/DotXxlJob.Core/Internal/IPUtility.cs
  37. 87
      src/DotXxlJob.Core/Internal/Preconditions.cs
  38. 158
      src/DotXxlJob.Core/JobDispatcher.cs
  39. 64
      src/DotXxlJob.Core/Json/ProjectDefaultResolver.cs
  40. 23
      src/DotXxlJob.Core/Logger/IJobLogger.cs
  41. 194
      src/DotXxlJob.Core/Logger/JobLogger.cs
  42. 39
      src/DotXxlJob.Core/Model/AddressEntity.cs
  43. 58
      src/DotXxlJob.Core/Model/HandleCallbackParam.cs
  44. 15
      src/DotXxlJob.Core/Model/IdleBeatRequest.cs
  45. 11
      src/DotXxlJob.Core/Model/JavaClass.cs
  46. 17
      src/DotXxlJob.Core/Model/JobExecuteContext.cs
  47. 16
      src/DotXxlJob.Core/Model/KillRequest.cs
  48. 21
      src/DotXxlJob.Core/Model/LogRequest.cs
  49. 26
      src/DotXxlJob.Core/Model/LogResult.cs
  50. 19
      src/DotXxlJob.Core/Model/RegistryParam.cs
  51. 47
      src/DotXxlJob.Core/Model/ReturnT.cs
  52. 50
      src/DotXxlJob.Core/Model/RpcRequest.cs
  53. 18
      src/DotXxlJob.Core/Model/RpcResponse.cs
  54. 45
      src/DotXxlJob.Core/Model/TriggerParam.cs
  55. 118
      src/DotXxlJob.Core/Queue/CallbackTaskQueue.cs
  56. 174
      src/DotXxlJob.Core/Queue/JobTaskQueue.cs
  57. 136
      src/DotXxlJob.Core/Queue/RetryCallbackTaskQueue.cs
  58. 40
      src/DotXxlJob.Core/TaskExecutorFactory.cs
  59. 35
      src/DotXxlJob.Core/TaskExecutors/BeanTaskExecutor.cs
  60. 13
      src/DotXxlJob.Core/TaskExecutors/ITaskExecutor.cs
  61. 166
      src/DotXxlJob.Core/XxlRestfulServiceHandler.cs
  62. 19
      tests/DotXxlJob.Core.Tests/DotXxlJob.Core.Tests.csproj
  63. 13
      tests/DotXxlJob.Core.Tests/UnitTest1.cs

@ -0,0 +1,8 @@
# Ignore all files in the `dist` directory
dist/
# Ignore all `.log` files
*.log
# Ignore specific file `config.json`
config.json

@ -0,0 +1,70 @@
# .NET Development Rules
You are a senior .NET backend developer and an expert in C#, ASP.NET Core, and Entity Framework Core.
## Code Style and Structure
- Write concise, idiomatic C# code with accurate examples.
- Follow .NET and ASP.NET Core conventions and best practices.
- Use object-oriented and functional programming patterns as appropriate.
- Prefer LINQ and lambda expressions for collection operations.
- Use descriptive variable and method names (e.g., 'IsUserSignedIn', 'CalculateTotal').
- Structure files according to .NET conventions (Controllers, Models, Services, etc.).
## Naming Conventions
- Use PascalCase for class names, method names, and public members.
- Use camelCase for local variables and private fields.
- Use UPPERCASE for constants.
- Prefix interface names with "I" (e.g., 'IUserService').
## C# and .NET Usage
- Use C# 10+ features when appropriate (e.g., record types, pattern matching, null-coalescing assignment).
- Leverage built-in ASP.NET Core features and middleware.
- Use Entity Framework Core effectively for database operations.
## Syntax and Formatting
- Follow the C# Coding Conventions (https://docs.microsoft.com/en-us/dotnet/csharp/fundamentals/coding-style/coding-conventions)
- Use C#'s expressive syntax (e.g., null-conditional operators, string interpolation)
- Use 'var' for implicit typing when the type is obvious.
## Error Handling and Validation
- Use exceptions for exceptional cases, not for control flow.
- Implement proper error logging using built-in .NET logging or a third-party logger.
- Use Data Annotations or Fluent Validation for model validation.
- Implement global exception handling middleware.
- Return appropriate HTTP status codes and consistent error responses.
## API Design
- Follow RESTful API design principles.
- Use attribute routing in controllers.
- Implement versioning for your API.
- Use action filters for cross-cutting concerns.
## Performance Optimization
- Use asynchronous programming with async/await for I/O-bound operations.
- Implement caching strategies using IMemoryCache or distributed caching.
- Use efficient LINQ queries and avoid N+1 query problems.
- Implement pagination for large data sets.
## Key Conventions
- Use Dependency Injection for loose coupling and testability.
- Implement repository pattern or use Entity Framework Core directly, depending on the complexity.
- Use AutoMapper for object-to-object mapping if needed.
- Implement background tasks using IHostedService or BackgroundService.
## Testing
- Write unit tests using xUnit, NUnit, or MSTest.
- Use Moq or NSubstitute for mocking dependencies.
- Implement integration tests for API endpoints.
## Security
- Use Authentication and Authorization middleware.
- Implement JWT authentication for stateless API authentication.
- Use HTTPS and enforce SSL.
- Implement proper CORS policies.
## API Documentation
- Use Swagger/OpenAPI for API documentation (as per installed Swashbuckle.AspNetCore package).
- Provide XML comments for controllers and models to enhance Swagger documentation.
Follow the official Microsoft documentation and ASP.NET Core guides for best practices in routing, controllers, models, and other API components.

@ -1,37 +1,230 @@
# Rules in this file were initially inferred by Visual Studio IntelliCode from the NetEscapades.AspNetCore.SecurityHeaders codebase based on best match to current usage at 16/11/2018
# Rules in this file were initially inferred by Visual Studio IntelliCode from the C:\workspaces\opensource\uuac codebase based on best match to current usage at 1/18/2022
# There already existed an .editorconfig file in this directory. Copy rules from this .editorconfig.inferred file to the existing .editorconfig file as desired to have them take effect at this location.
# You can modify the rules from these initially generated values to suit your own policies
# You can learn more about editorconfig here: https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-code-style-settings-reference
[*.cs]
file_header_template = Copyright (c) Xuanye Wong. All rights reserved.\nLicensed under MIT license
dotnet_diagnostic.IDE0073.severity = warning
#Core editorconfig formatting - indentation
#use soft tabs (spaces) for indentation
indent_style = space
indent_size = 4
#Formatting - new line options
#Formatting - indentation options
#require members of object intializers to be on separate lines
csharp_new_line_before_members_in_object_initializers = true
#require braces to be on a new line for lambdas, types, control_blocks, and methods (also known as "Allman" style)
csharp_new_line_before_open_brace = all
#indent switch case contents.
csharp_indent_case_contents = true
#indent switch labels
csharp_indent_switch_labels = true
#Formatting - organize using options
#Formatting - new line options
#do not place System.* using directives before other using directives
dotnet_sort_system_directives_first = false
#require braces to be on a new line for types, object_collection, methods, control_blocks, and lambdas (also known as "Allman" style)
csharp_new_line_before_open_brace = types, object_collection, methods, control_blocks, lambdas
#Formatting - spacing options
#Formatting - organize using options
#require NO space between a cast and the value
csharp_space_after_cast = false
#require a space before the colon for bases or interfaces in a type declaration
csharp_space_after_colon_in_inheritance_clause = true
#require a space after a keyword in a control flow statement such as a for loop
csharp_space_after_keywords_in_control_flow_statements = true
#require a space before the colon for bases or interfaces in a type declaration
csharp_space_before_colon_in_inheritance_clause = true
#remove space within empty argument list parentheses
csharp_space_between_method_call_empty_parameter_list_parentheses = false
#remove space between method call name and opening parenthesis
csharp_space_between_method_call_name_and_opening_parenthesis = false
#do not place space characters after the opening parenthesis and before the closing parenthesis of a method call
csharp_space_between_method_call_parameter_list_parentheses = false
#place a space character after the opening parenthesis and before the closing parenthesis of a method declaration parameter list.
csharp_space_between_method_declaration_parameter_list_parentheses = false
#Formatting - wrapping options
#leave code block on separate lines
csharp_preserve_single_line_blocks = true
#Style - Code block preferences
#prefer curly braces even for one line of code
csharp_prefer_braces = when_multiline:suggestion
#Style - expression bodied member options
#prefer block bodies for accessors
csharp_style_expression_bodied_accessors = false:suggestion
#prefer block bodies for constructors
csharp_style_expression_bodied_constructors = false:suggestion
#prefer block bodies for methods
csharp_style_expression_bodied_methods = false:suggestion
#prefer block bodies for properties
csharp_style_expression_bodied_properties = true:suggestion
#Style - Expression-level preferences
#prefer objects to be initialized using object initializers when possible
dotnet_style_object_initializer = true:suggestion
#Style - implicit and explicit types
#prefer var over explicit type in all cases, unless overridden by another code style rule
csharp_style_var_elsewhere = true:suggestion
#prefer var when the type is already mentioned on the right-hand side of a declaration expression
csharp_style_var_when_type_is_apparent = true:suggestion
#Style - language keyword and framework type options
#prefer the language keyword for local variables, method parameters, and class members, instead of the type name, for types that have a keyword to represent them
dotnet_style_predefined_type_for_locals_parameters_members = true:suggestion
#Style - modifier options
#prefer accessibility modifiers to be declared except for public interface members. This will currently not differ from always and will act as future proofing for if C# adds default interface methods.
dotnet_style_require_accessibility_modifiers = for_non_interface_members:suggestion
#sort System.* using directives alphabetically, and place them before other usings
dotnet_sort_system_directives_first = true
#Style - Modifier preferences
...
#when this rule is set to a list of modifiers, prefer the specified ordering.
csharp_preferred_modifier_order = public,private,static,readonly:suggestion
#Style - qualification options
#prefer fields not to be prefaced with this. or Me. in Visual Basic
dotnet_style_qualification_for_field = false:suggestion
#prefer methods not to be prefaced with this. or Me. in Visual Basic
dotnet_style_qualification_for_method = false:suggestion
#prefer properties not to be prefaced with this. or Me. in Visual Basic
dotnet_style_qualification_for_property = false:suggestion
csharp_indent_labels = one_less_than_current
csharp_using_directive_placement = outside_namespace:silent
csharp_prefer_simple_using_statement = true:suggestion
csharp_style_namespace_declarations = block_scoped:silent
csharp_style_prefer_method_group_conversion = true:silent
csharp_style_prefer_top_level_statements = true:silent
csharp_style_expression_bodied_operators = false:silent
csharp_style_expression_bodied_indexers = true:silent
csharp_style_expression_bodied_lambdas = true:silent
csharp_style_expression_bodied_local_functions = false:silent
csharp_style_throw_expression = true:suggestion
csharp_style_prefer_null_check_over_type_check = true:suggestion
csharp_prefer_simple_default_expression = true:suggestion
csharp_style_prefer_local_over_anonymous_function = true:suggestion
csharp_style_prefer_index_operator = true:suggestion
csharp_style_prefer_range_operator = true:suggestion
csharp_style_implicit_object_creation_when_type_is_apparent = true:suggestion
csharp_style_prefer_tuple_swap = true:suggestion
csharp_style_prefer_utf8_string_literals = true:suggestion
csharp_style_inlined_variable_declaration = true:suggestion
csharp_style_unused_value_assignment_preference = discard_variable:suggestion
csharp_style_deconstructed_variable_declaration = true:suggestion
csharp_style_unused_value_expression_statement_preference = discard_variable:silent
csharp_prefer_static_local_function = true:suggestion
csharp_style_allow_embedded_statements_on_same_line_experimental = true:silent
csharp_style_allow_blank_lines_between_consecutive_braces_experimental = true:silent
csharp_style_allow_blank_line_after_colon_in_constructor_initializer_experimental = true:silent
csharp_style_conditional_delegate_call = true:suggestion
csharp_style_prefer_switch_expression = true:suggestion
csharp_style_prefer_pattern_matching = true:silent
csharp_style_pattern_matching_over_is_with_cast_check = true:suggestion
csharp_style_pattern_matching_over_as_with_null_check = true:suggestion
csharp_style_prefer_extended_property_pattern = true:suggestion
csharp_style_prefer_not_pattern = true:suggestion
csharp_style_var_for_built_in_types = false:silent
csharp_space_around_binary_operators = before_and_after
csharp_style_prefer_readonly_struct = true:suggestion
[*.{cs,vb}]
#### Naming styles ####
# Naming rules
dotnet_naming_rule.interface_should_be_begins_with_i.severity = suggestion
dotnet_naming_rule.interface_should_be_begins_with_i.symbols = interface
dotnet_naming_rule.interface_should_be_begins_with_i.style = begins_with_i
dotnet_naming_rule.types_should_be_pascal_case.severity = suggestion
dotnet_naming_rule.types_should_be_pascal_case.symbols = types
dotnet_naming_rule.types_should_be_pascal_case.style = pascal_case
dotnet_naming_rule.non_field_members_should_be_pascal_case.severity = suggestion
dotnet_naming_rule.non_field_members_should_be_pascal_case.symbols = non_field_members
dotnet_naming_rule.non_field_members_should_be_pascal_case.style = pascal_case
dotnet_naming_rule.private_or_internal_field_should_be__camelcasename.severity = suggestion
dotnet_naming_rule.private_or_internal_field_should_be__camelcasename.symbols = private_or_internal_field
dotnet_naming_rule.private_or_internal_field_should_be__camelcasename.style = _camelcasename
# Symbol specifications
dotnet_naming_symbols.interface.applicable_kinds = interface
dotnet_naming_symbols.interface.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.interface.required_modifiers =
dotnet_naming_symbols.types.applicable_kinds = class, struct, interface, enum
dotnet_naming_symbols.types.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.types.required_modifiers =
dotnet_naming_symbols.non_field_members.applicable_kinds = property, event, method
dotnet_naming_symbols.non_field_members.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.non_field_members.required_modifiers =
dotnet_naming_symbols.private_or_internal_field.applicable_kinds = field
dotnet_naming_symbols.private_or_internal_field.applicable_accessibilities = internal, private, private_protected
dotnet_naming_symbols.private_or_internal_field.required_modifiers =
# Naming styles
dotnet_naming_style.begins_with_i.required_prefix = I
dotnet_naming_style.begins_with_i.required_suffix =
dotnet_naming_style.begins_with_i.word_separator =
dotnet_naming_style.begins_with_i.capitalization = pascal_case
dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.capitalization = pascal_case
dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.capitalization = pascal_case
dotnet_naming_style._camelcasename.required_prefix = _
dotnet_naming_style._camelcasename.required_suffix =
dotnet_naming_style._camelcasename.word_separator =
dotnet_naming_style._camelcasename.capitalization = camel_case
dotnet_style_operator_placement_when_wrapping = beginning_of_line
tab_width = 4
indent_size = 4
end_of_line = crlf
dotnet_style_coalesce_expression = true:suggestion
dotnet_style_null_propagation = true:suggestion
dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion
dotnet_style_prefer_auto_properties = true:silent
dotnet_style_object_initializer = true:suggestion
dotnet_style_collection_initializer = true:suggestion
dotnet_style_prefer_simplified_boolean_expressions = true:suggestion
dotnet_style_prefer_conditional_expression_over_assignment = true:silent
dotnet_style_prefer_conditional_expression_over_return = true:silent
dotnet_style_explicit_tuple_names = true:suggestion
dotnet_style_prefer_inferred_tuple_names = true:suggestion
dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion
dotnet_style_prefer_compound_assignment = true:suggestion
dotnet_style_prefer_simplified_interpolation = true:suggestion
dotnet_style_namespace_match_folder = true:suggestion
dotnet_style_readonly_field = true:suggestion
dotnet_style_predefined_type_for_member_access = true:silent
dotnet_style_predefined_type_for_locals_parameters_members = true:suggestion
dotnet_style_require_accessibility_modifiers = for_non_interface_members:suggestion
dotnet_style_allow_multiple_blank_lines_experimental = true:silent
dotnet_style_allow_statement_immediately_after_block_experimental = true:silent
dotnet_code_quality_unused_parameters = all:suggestion
dotnet_style_parentheses_in_other_binary_operators = always_for_clarity:silent
dotnet_style_parentheses_in_arithmetic_binary_operators = always_for_clarity:silent
dotnet_style_parentheses_in_relational_binary_operators = always_for_clarity:silent
dotnet_style_parentheses_in_other_operators = never_if_unnecessary:silent
dotnet_style_qualification_for_field = false:suggestion
dotnet_style_qualification_for_method = false:silent
dotnet_style_qualification_for_property = false:suggestion
dotnet_style_qualification_for_event = false:silent

4
.gitattributes vendored

@ -8,11 +8,9 @@
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain
*.jpg binary
*.png binary
*.gif binary
*.cs text=auto diff=csharp
*.vb text=auto
*.resx text=auto
@ -42,8 +40,6 @@
*.fs text=auto
*.fsx text=auto
*.hs text=auto
*.txt eol=crlf
*.csproj text=auto
*.vbproj text=auto
*.fsproj text=auto

128
.gitignore vendored

@ -1,70 +1,70 @@
# Misc folders
[Bb]in/
[Oo]bj/
[Pp]ackages/
# Build related
tools/**
!tools/packages.config
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
# User-specific files
*.suo
[Bb]in/
TestResults/
.nuget/
_ReSharper.*/
packages/
artifacts/
PublishProfiles/
*.user
*.sln.docstates
*.sln.ide/
*.userprefs
*.GhostDoc.xml
# Build results
[Dd]ebug/
[Rr]elease/
x64/
*_i.c
*_p.c
*.ilk
*.meta
*.obj
*.pch
*.pdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.log
*.vspscc
*.vssscc
.builds
# Visual Studio profiler
*.suo
*.cache
*.docstates
_ReSharper.*
nuget.exe
*net45.csproj
*net451.csproj
*k10.csproj
*.psess
*.vsp
*.vspx
# ReSharper is a .NET coding add-in
_ReSharper*
# NCrunch
*.ncrunch*
.*crunch*.local.xml
_NCrunch_*
artifacts/
# NuGet Packages Directory
packages
# Windows
Thumbs.db
# NUnit
TestResult.xml
*.pidb
*.userprefs
*DS_Store
*.ncrunchsolution
*.*sdf
*.ipch
*.sln.ide
*.sublime-workspace
node_modules/
node_modules1/
node_modules2/
.build/
logs/
typings/
project.lock.json
classes
data
temp
doc/site
dist
# Logs
logs
*.log
npm-debug.log*
# Runtime data
pids
*.pid
*.seed
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# node-waf configuration
.lock-wscript
# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release
# Dependency directory
node_modules
# Optional npm cache directory
.npm
# Optional REPL history
.node_repl_history
cache/
.vs/
*.lnk
*.Production.json
.idea/
doc/_site/
.vscode/
workspace.xml

@ -1,19 +1,17 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.30104.148
# Visual Studio Version 17
VisualStudioVersion = 17.12.35527.113
MinimumVisualStudioVersion = 15.0.26124.0
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{97756BA5-1E7C-4536-A49E-AE2190C0E6A5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotXxlJob.Core", "src\DotXxlJob.Core\DotXxlJob.Core.csproj", "{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{352EC932-F112-4A2F-9DC3-F0761C85E068}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{E959F9B5-F3EB-48B1-B842-2CDDFDB01900}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ASPNetCoreExecutor", "samples\ASPNetCoreExecutor\ASPNetCoreExecutor.csproj", "{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AspNetCoreExecutor", "samples\AspNetCoreExecutor\AspNetCoreExecutor.csproj", "{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotXxlJob.Core.Tests", "tests\DotXxlJob.Core.Tests\DotXxlJob.Core.Tests.csproj", "{81C60471-7C1C-48CE-98C0-F252C267AC9F}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotXxlJob.Core", "src\DotXxlJob.Core\DotXxlJob.Core.csproj", "{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -25,50 +23,37 @@ Global
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Debug|x64.ActiveCfg = Debug|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Debug|x64.Build.0 = Debug|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Debug|x86.ActiveCfg = Debug|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Debug|x86.Build.0 = Debug|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Release|Any CPU.Build.0 = Release|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Release|x64.ActiveCfg = Release|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Release|x64.Build.0 = Release|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Release|x86.ActiveCfg = Release|Any CPU
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65}.Release|x86.Build.0 = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Debug|x64.ActiveCfg = Debug|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Debug|x64.Build.0 = Debug|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Debug|x86.ActiveCfg = Debug|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Debug|x86.Build.0 = Debug|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|Any CPU.Build.0 = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|x64.ActiveCfg = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|x64.Build.0 = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|x86.ActiveCfg = Release|Any CPU
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5}.Release|x86.Build.0 = Release|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|x64.ActiveCfg = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|x64.Build.0 = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|x86.ActiveCfg = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Debug|x86.Build.0 = Debug|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Release|Any CPU.Build.0 = Release|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Release|x64.ActiveCfg = Release|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Release|x64.Build.0 = Release|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Release|x86.ActiveCfg = Release|Any CPU
{81C60471-7C1C-48CE-98C0-F252C267AC9F}.Release|x86.Build.0 = Release|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Debug|x64.ActiveCfg = Debug|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Debug|x64.Build.0 = Debug|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Debug|x86.ActiveCfg = Debug|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Debug|x86.Build.0 = Debug|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Release|Any CPU.Build.0 = Release|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Release|x64.ActiveCfg = Release|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Release|x64.Build.0 = Release|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Release|x86.ActiveCfg = Release|Any CPU
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E}.Release|x86.Build.0 = Release|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Debug|x64.ActiveCfg = Debug|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Debug|x64.Build.0 = Debug|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Debug|x86.ActiveCfg = Debug|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Debug|x86.Build.0 = Debug|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Release|Any CPU.Build.0 = Release|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Release|x64.ActiveCfg = Release|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Release|x64.Build.0 = Release|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Release|x86.ActiveCfg = Release|Any CPU
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{FFFEEA78-CB09-4BFB-89B7-E9A46EC3ED65} = {97756BA5-1E7C-4536-A49E-AE2190C0E6A5}
{DC9E5AF3-18FF-4713-BDB4-672E47ADA4E5} = {E959F9B5-F3EB-48B1-B842-2CDDFDB01900}
{81C60471-7C1C-48CE-98C0-F252C267AC9F} = {352EC932-F112-4A2F-9DC3-F0761C85E068}
{942A8837-BBAB-4DC6-8ABB-4E3B7AD3EB4E} = {E959F9B5-F3EB-48B1-B842-2CDDFDB01900}
{4584B4D5-0DA9-425F-A4C7-7A19A75D3E73} = {97756BA5-1E7C-4536-A49E-AE2190C0E6A5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {F4A8B63E-6284-4D00-9719-BAB1D955DACF}

@ -1,13 +0,0 @@
{
"version": 1,
"isRoot": true,
"tools": {
"dotnet-ef": {
"version": "8.0.7",
"commands": [
"dotnet-ef"
],
"rollForward": false
}
}
}

@ -1,27 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console">
<Version>2.2.0</Version>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotXxlJob.Core\DotXxlJob.Core.csproj" />
</ItemGroup>
<ItemGroup>
<_ContentIncludedByDefault Remove="Properties\launchSettings.json" />
</ItemGroup>
<ItemGroup>
<Content Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0" />
</ItemGroup>
</Project>

@ -0,0 +1,32 @@
using Microsoft.AspNetCore.Mvc;
namespace AspNetCoreExecutor.Controllers
{
[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
private static readonly string[] Summaries = new[]
{
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
};
private readonly ILogger<WeatherForecastController> _logger;
public WeatherForecastController(ILogger<WeatherForecastController> logger)
{
_logger = logger;
}
[HttpGet(Name = "GetWeatherForecast")]
public IEnumerable<WeatherForecast> Get()
{
return Enumerable.Range(1, 5).Select(index => new WeatherForecast {
Date = DateTime.Now.AddDays(index),
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)]
})
.ToArray();
}
}
}

@ -1,22 +0,0 @@
using System.Threading.Tasks;
using DotXxlJob.Core;
using DotXxlJob.Core.Model;
namespace ASPNetCoreExecutor
{
/// <summary>
/// 示例Job,只是写个日志
/// </summary>
[JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler
{
public override async Task<ReturnT> Execute(JobExecuteContext context)
{
context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter);
context.JobLogger.Log("开始休眠10秒");
await Task.Delay(10 * 1000);
context.JobLogger.Log("休眠10秒结束");
return ReturnT.SUCCESS;
}
}
}

@ -1,12 +0,0 @@
using Microsoft.AspNetCore.Builder;
namespace ASPNetCoreExecutor
{
public static class ApplicationBuilderExtensions
{
public static IApplicationBuilder UseXxlJobExecutor(this IApplicationBuilder @this)
{
return @this.UseMiddleware<XxlJobExecutorMiddleware>();
}
}
}

@ -1,42 +0,0 @@
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using DotXxlJob.Core;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
namespace ASPNetCoreExecutor
{
public class XxlJobExecutorMiddleware
{
private readonly IServiceProvider _provider;
private readonly RequestDelegate _next;
private readonly XxlRestfulServiceHandler _rpcService;
public XxlJobExecutorMiddleware(IServiceProvider provider, RequestDelegate next)
{
this._provider = provider;
this._next = next;
this._rpcService = _provider.GetRequiredService<XxlRestfulServiceHandler>();
}
public async Task Invoke(HttpContext context)
{
string contentType = context.Request.ContentType;
if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase)
&& !string.IsNullOrEmpty(contentType)
&& contentType.ToLower().StartsWith("application/json"))
{
await _rpcService.HandlerAsync(context.Request,context.Response);
return;
}
await _next.Invoke(context);
}
}
}

@ -1,29 +1,25 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace ASPNetCoreExecutor
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
public class Program
{
public static void Main(string[] args)
{
CreateWebHostBuilder(args).Build().Run();
}
public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.ConfigureLogging((ctx, builder) =>
{
builder.AddConfiguration(ctx.Configuration);
builder.AddConsole();
})
.UseStartup<Startup>();
}
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();

@ -1,23 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
https://go.microsoft.com/fwlink/?LinkID=208121.
-->
<Project>
<PropertyGroup>
<DeleteExistingFiles>true</DeleteExistingFiles>
<ExcludeApp_Data>false</ExcludeApp_Data>
<LaunchSiteAfterPublish>true</LaunchSiteAfterPublish>
<LastUsedBuildConfiguration>Release</LastUsedBuildConfiguration>
<LastUsedPlatform>Any CPU</LastUsedPlatform>
<PublishProvider>FileSystem</PublishProvider>
<PublishUrl>bin\Release\net6.0\publish\</PublishUrl>
<WebPublishMethod>FileSystem</WebPublishMethod>
<_TargetId>Folder</_TargetId>
<SiteUrlToLaunchAfterPublish />
<TargetFramework>net6.0</TargetFramework>
<RuntimeIdentifier>linux-x64</RuntimeIdentifier>
<PublishSingleFile>true</PublishSingleFile>
<ProjectGuid>dc9e5af3-18ff-4713-bdb4-672e47ada4e5</ProjectGuid>
<SelfContained>false</SelfContained>
</PropertyGroup>
</Project>

@ -1,12 +1,31 @@
{
{
"$schema": "https://json.schemastore.org/launchsettings.json",
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:54508",
"sslPort": 44308
}
},
"profiles": {
"ASPNetCoreExecutor": {
"AspNetCoreExecutor": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
"applicationUrl": "https://localhost:7292;http://localhost:5036",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"applicationUrl": "http://localhost:6662/"
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "swagger",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}

@ -1,49 +0,0 @@
using DotXxlJob.Core;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Server.Kestrel.Core;
namespace ASPNetCoreExecutor
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
private IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddXxlJobExecutor(Configuration);
services.AddDefaultXxlJobHandlers();// add httpHandler;
services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定义的jobHandler
services.AddAutoRegistry(); // 自动注册
//services.Configure<KestrelServerOptions>(x => x.AllowSynchronousIO = true)
// .Configure<IISServerOptions>(x=> x.AllowSynchronousIO = true);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app,IWebHostEnvironment env)
{
if (env.EnvironmentName !="Production")
{
app.UseDeveloperExceptionPage();
}
//启用XxlExecutor
app.UseXxlJobExecutor();
}
}
}

@ -0,0 +1,13 @@
namespace AspNetCoreExecutor
{
public class WeatherForecast
{
public DateTime Date { get; set; }
public int TemperatureC { get; set; }
public int TemperatureF => 32 + (int)(TemperatureC / 0.5556);
public string? Summary { get; set; }
}
}

@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}

@ -1,16 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information"
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"xxlJob": {
"adminAddresses": "https://jobs.xuanye.wang/xxl-job-admin",
"appName": "xxl-job-executor-dotnet",
"specialBindAddress": "127.0.0.1",
"port": 6662,
"autoRegistry": true,
"accessToken": "",
"logRetentionDays": 30
}
"AllowedHosts": "*"
}

@ -1,109 +0,0 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Flurl;
using Flurl.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace DotXxlJob.Core
{
public class AdminClient
{
private readonly XxlJobExecutorOptions _options;
private readonly ILogger<AdminClient> _logger;
private List<AddressEntry> _addresses;
private int _currentIndex;
private static readonly string MAPPING = "/api";
public AdminClient(IOptions<XxlJobExecutorOptions> optionsAccessor
, ILogger<AdminClient> logger)
{
Preconditions.CheckNotNull(optionsAccessor?.Value, "XxlJobExecutorOptions");
this._options = optionsAccessor?.Value;
this._logger = logger;
InitAddress();
}
private void InitAddress()
{
this._addresses = new List<AddressEntry>();
foreach (var item in this._options.AdminAddresses.Split(';'))
{
try
{
var entry = new AddressEntry { RequestUri = item+ MAPPING };
this._addresses.Add(entry);
}
catch (Exception ex)
{
this._logger.LogError(ex, "init admin address error.");
}
}
}
public Task<ReturnT> Callback(List<HandleCallbackParam> callbackParamList)
{
return InvokeRpcService("callback", callbackParamList);
}
public Task<ReturnT> Registry(RegistryParam registryParam)
{
return InvokeRpcService("registry", registryParam);
}
public Task<ReturnT> RegistryRemove(RegistryParam registryParam)
{
return InvokeRpcService("registryRemove", registryParam);
}
private async Task<ReturnT> InvokeRpcService(string methodName, object jsonObject)
{
var triedTimes = 0;
ReturnT ret = null;
while (triedTimes++ < this._addresses.Count)
{
var address = this._addresses[this._currentIndex];
this._currentIndex = (this._currentIndex + 1) % this._addresses.Count;
if (!address.CheckAccessible())
continue;
try
{
var json = await address.RequestUri.AppendPathSegment(methodName)
.WithHeader("XXL-JOB-ACCESS-TOKEN", this._options.AccessToken)
.PostJsonAsync(jsonObject)
.ReceiveString();
//.ReceiveJson<ReturnT>();
ret = JsonConvert.DeserializeObject<ReturnT>(json);
address.Reset();
}
catch (Exception ex)
{
this._logger.LogError(ex, "request admin error.{0}", ex.Message);
address.SetFail();
continue;
}
}
if(ret == null)
{
ret = ReturnT.Failed("call admin fail");
}
return ret;
}
}
}

@ -1,20 +0,0 @@
using System;
namespace DotXxlJob.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class JobHandlerAttribute:Attribute
{
public JobHandlerAttribute(string name)
{
Name = name;
}
public string Name { get; }
/// <summary>
/// set Ignore
/// </summary>
public bool Ignore { get; set; }
}
}

@ -1,65 +0,0 @@
using System;
using System.IO;
namespace DotXxlJob.Core.Config
{
public class XxlJobExecutorOptions
{
/// <summary>
/// 管理端地址,多个以;分隔
/// </summary>
public string AdminAddresses { get; set; }
/// <summary>
/// appName自动注册时要去管理端配置一致
/// </summary>
public string AppName { get; set; } = "xxl-job-executor-dotnet";
/// <summary>
/// 绑定的特殊的URL,如果该项配置存在,则忽略SpecialBindAddress和Port
/// </summary>
public string SpecialBindUrl { get; set; }
/// <summary>
/// 自动注册时提交的地址,为空会自动获取内网地址
/// </summary>
public string SpecialBindAddress { get; set; }
/// <summary>
/// 绑定端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 是否自动注册
/// </summary>
public bool AutoRegistry { get; set; }
/// <summary>
/// 认证票据
/// </summary>
public string AccessToken { get; set; }
/// <summary>
/// 日志目录,默认为执行目录的logs子目录下,请配置绝对路径
/// </summary>
public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs");
/// <summary>
/// 日志保留天数
/// </summary>
public int LogRetentionDays { get; set; } = 30;
public int CallBackInterval { get; set; } = 500; //回调时间间隔 500毫秒
}
}

@ -1,16 +0,0 @@
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core.DefaultHandlers
{
public abstract class AbsJobHandler:IJobHandler
{
public virtual void Dispose()
{
}
public abstract Task<ReturnT> Execute(JobExecuteContext context);
}
}

@ -1,66 +0,0 @@
using System;
using System.Net;
using System.Net.Http;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core.DefaultHandlers
{
[JobHandler("simpleHttpJobHandler")]
public class SimpleHttpJobHandler:AbsJobHandler
{
private readonly IHttpClientFactory _httpClientFactory;
private static readonly Regex UrlPattern =
new Regex(@"(https?|ftp|file)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]");
public SimpleHttpJobHandler(IHttpClientFactory httpClientFactory)
{
this._httpClientFactory = httpClientFactory;
}
public override async Task<ReturnT> Execute(JobExecuteContext context)
{
if (string.IsNullOrEmpty(context.JobParameter))
{
return ReturnT.Failed("url is empty");
}
string url = context.JobParameter;
if (!UrlPattern.IsMatch(url))
{
return ReturnT.Failed("url format is not valid");
}
context.JobLogger.Log("Get Request Data:{0}",context.JobParameter);
using (var client = this._httpClientFactory.CreateClient(Constants.DefaultHttpClientName))
{
try
{
var response = await client.GetAsync(url);
if (response == null)
{
context.JobLogger.Log("call remote error,response is null");
return ReturnT.Failed("call remote error,response is null");
}
if (response.StatusCode != HttpStatusCode.OK)
{
context.JobLogger.Log("call remote error,response statusCode ={0}",response.StatusCode);
return ReturnT.Failed("call remote error,response statusCode ="+response.StatusCode);
}
string body = await response.Content.ReadAsStringAsync();
context.JobLogger.Log("<br/> call remote success ,response is : <br/> {0}",body);
return ReturnT.SUCCESS;
}
catch (Exception ex)
{
context.JobLogger.LogError(ex);
return ReturnT.Failed(ex.Message);
}
}
}
}
}

@ -1,49 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core
{
public class DefaultJobHandlerFactory:IJobHandlerFactory
{
private readonly IServiceProvider _provider;
private readonly Dictionary<string, IJobHandler> handlersCache = new Dictionary<string, IJobHandler>();
public DefaultJobHandlerFactory(IServiceProvider provider)
{
this._provider = provider;
Initialize();
}
private void Initialize()
{
var list = this._provider.GetServices<IJobHandler>();
if (list == null || !list.Any())
{
throw new TypeLoadException("IJobHandlers are not found in IServiceCollection");
}
foreach (var handler in list)
{
var jobHandlerAttr = handler.GetType().GetCustomAttribute<JobHandlerAttribute>();
var handlerName = jobHandlerAttr == null ? handler.GetType().Name : jobHandlerAttr.Name;
if (handlersCache.ContainsKey(handlerName))
{
throw new Exception($"same IJobHandler' name: [{handlerName}]");
}
handlersCache.Add(handlerName,handler);
}
}
public IJobHandler GetJobHandler(string handlerName)
{
if (handlersCache.ContainsKey(handlerName))
{
return handlersCache[handlerName];
}
return null;
}
}
}

@ -0,0 +1,9 @@
// Copyright (c) Xuanye Wong. All rights reserved.
// Licensed under MIT license
namespace DotXxlJob.Core
{
public static class DependencyExtensions
{
}
}

@ -1,34 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../../build/version.props" />
<Import Project="../../build/releasenotes.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<DefineConstants>$(DefineConstants);DOTNETCORE</DefineConstants>
<Description>XxlJobExecutor DotNet port</Description>
<Copyright>Xuanye @ 2019</Copyright>
<Authors>Xuanye</Authors>
<AssemblyTitle>XxlJobExecutor DotNet port</AssemblyTitle>
<AssemblyName>DotXxlJob.Core</AssemblyName>
<PackageId>DotXxlJob.Core</PackageId>
<Version>$(DotXxlJobPackageVersion)</Version>
<PackageTags>Hession,xxl-job,DotXxlJob</PackageTags>
<PackageReleaseNotes>
$(DotXxlJobPackageNotes)
</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/xuanye/DotXxlJob</PackageProjectUrl>
<PackageLicense>https://github.com/xuanye/DotXxlJob/blob/master/LICENSE</PackageLicense>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/xuanye/DotXxlJob</RepositoryUrl>
<TargetFramework>netstandard2.1</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Flurl.Http" Version="2.4.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Utf8Json" Version="1.3.7" />
</ItemGroup>
</Project>

@ -1,76 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotXxlJob.Core
{
/// <summary>
/// 执行器注册注册
/// </summary>
public class ExecutorRegistry : IExecutorRegistry
{
private readonly AdminClient _adminClient;
private readonly XxlJobExecutorOptions _options;
private readonly ILogger<ExecutorRegistry> _logger;
public ExecutorRegistry(AdminClient adminClient, IOptions<XxlJobExecutorOptions> optionsAccessor, ILogger<ExecutorRegistry> logger)
{
Preconditions.CheckNotNull(optionsAccessor, "XxlJobExecutorOptions");
Preconditions.CheckNotNull(optionsAccessor.Value, "XxlJobExecutorOptions");
_adminClient = adminClient;
_options = optionsAccessor.Value;
if (string.IsNullOrEmpty(_options.SpecialBindAddress))
{
_options.SpecialBindAddress = IPUtility.GetLocalIntranetIP().MapToIPv4().ToString();
}
_logger = logger;
}
public async Task RegistryAsync(CancellationToken cancellationToken)
{
var registryParam = new RegistryParam {
RegistryGroup = "EXECUTOR",
RegistryKey = _options.AppName,
RegistryValue = string.IsNullOrEmpty(_options.SpecialBindUrl)?
$"http://{_options.SpecialBindAddress}:{_options.Port}/" : _options.SpecialBindUrl
};
_logger.LogInformation(">>>>>>>> start registry to admin <<<<<<<<");
var errorTimes = 0;
while (!cancellationToken.IsCancellationRequested)
{
try
{
var ret = await _adminClient.Registry(registryParam);
_logger.LogDebug("registry last result:{0}", ret?.Code);
errorTimes = 0;
await Task.Delay(Constants.RegistryInterval, cancellationToken);
}
catch (TaskCanceledException)
{
_logger.LogInformation(">>>>> Application Stopping....<<<<<");
}
catch (Exception ex)
{
errorTimes++;
await Task.Delay(Constants.RegistryInterval, cancellationToken);
_logger.LogError(ex, "registry error:{0},{1} Times", ex.Message, errorTimes);
}
}
_logger.LogInformation(">>>>>>>> end registry to admin <<<<<<<<");
_logger.LogInformation(">>>>>>>> start remove registry to admin <<<<<<<<");
var removeRet = await this._adminClient.RegistryRemove(registryParam);
_logger.LogInformation("remove registry last result:{0}", removeRet?.Code);
_logger.LogInformation(">>>>>>>> end remove registry to admin <<<<<<<<");
}
}
}

@ -1,55 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace DotXxlJob.Core.Extensions
{
public static class DateTimeExtension
{
private const long Era = 62135596800000L;
private const long Millis = 60000;
/// <summary>
///
/// </summary>
/// <param name="dt"></param>
/// <returns></returns>
public static long GetTotalMilliseconds(this DateTime dt)
{
return dt.ToUniversalTime().Ticks / 10000 - Era;
}
/// <summary>
///
/// </summary>
/// <param name="dt"></param>
/// <returns></returns>
public static int GetTotalMinutes(this DateTime dt)
{
var val = GetTotalMilliseconds(dt);
return (int)(val / Millis);
}
/// <summary>
///
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
public static DateTime FromMinutes(this int value)
{
var ticks = (value * Millis + Era) * 10000;
return new DateTime(ticks, DateTimeKind.Utc);
}
/// <summary>
///
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
public static DateTime FromMilliseconds(this long value)
{
var ticks = (value + Era) * 10000;
return new DateTime(ticks, DateTimeKind.Utc);
}
}
}

@ -1,63 +0,0 @@
using System;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.DefaultHandlers;
using DotXxlJob.Core.Queue;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
namespace DotXxlJob.Core
{
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,IConfiguration configuration)
{
services.AddLogging();
services.AddOptions();
services.Configure<XxlJobExecutorOptions>(configuration.GetSection("xxlJob"))
.AddXxlJobExecutorServiceDependency();
return services;
}
public static IServiceCollection AddXxlJobExecutor(this IServiceCollection services,Action<XxlJobExecutorOptions> configAction)
{
services.AddLogging();
services.AddOptions();
services.Configure(configAction).AddXxlJobExecutorServiceDependency();
return services;
}
public static IServiceCollection AddDefaultXxlJobHandlers(this IServiceCollection services)
{
services.AddSingleton<IJobHandler,SimpleHttpJobHandler>();
return services;
}
public static IServiceCollection AddAutoRegistry(this IServiceCollection services)
{
services.AddSingleton<IExecutorRegistry,ExecutorRegistry>()
.AddSingleton<IHostedService,JobsExecuteHostedService>();
return services;
}
private static IServiceCollection AddXxlJobExecutorServiceDependency(this IServiceCollection services)
{
//可在外部提前注册对应实现,并替换默认实现
services.TryAddSingleton<IJobLogger, JobLogger>();
services.TryAddSingleton<IJobHandlerFactory,DefaultJobHandlerFactory >();
services.TryAddSingleton<IExecutorRegistry, ExecutorRegistry>();
services.AddHttpClient("DotXxlJobClient");
services.AddSingleton<JobDispatcher>();
services.AddSingleton<TaskExecutorFactory>();
services.AddSingleton<XxlRestfulServiceHandler>();
services.AddSingleton<CallbackTaskQueue>();
services.AddSingleton<AdminClient>();
services.AddSingleton<ITaskExecutor, TaskExecutors.BeanTaskExecutor>();
return services;
}
}
}

@ -1,25 +0,0 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
namespace DotXxlJob.Core
{
/// <summary>
/// NOTE: 负责启动Executor服务,和进行服务注册的宿主服务
/// </summary>
public class JobsExecuteHostedService:BackgroundService
{
private readonly IExecutorRegistry _registry;
public JobsExecuteHostedService(IExecutorRegistry registry)
{
this._registry = registry;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return this._registry.RegistryAsync(stoppingToken);
}
}
}

@ -1,13 +0,0 @@
using System.Threading;
using System.Threading.Tasks;
namespace DotXxlJob.Core
{
public interface IExecutorRegistry
{
Task RegistryAsync(CancellationToken cancellationToken);
}
}

@ -1,26 +0,0 @@
using System;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core
{
public abstract class AbstractJobHandler:IJobHandler
{
/// <summary>
///
/// </summary>
/// <param name="param"></param>
/// <returns></returns>
public abstract Task<ReturnT> Execute(JobExecuteContext context);
public virtual void Dispose()
{
}
}
public interface IJobHandler:IDisposable
{
Task<ReturnT> Execute(JobExecuteContext context);
}
}

@ -1,7 +0,0 @@
namespace DotXxlJob.Core
{
public interface IJobHandlerFactory
{
IJobHandler GetJobHandler(string handlerName);
}
}

@ -1,60 +0,0 @@
using System;
namespace DotXxlJob.Core
{
internal static class Constants
{
public const string RpcRequestJavaFullName = "com.xxl.rpc.remoting.net.params.XxlRpcRequest";
public const string RpcResponseJavaFullName = "com.xxl.rpc.remoting.net.params.XxlRpcResponse";
public const string RegistryParamJavaFullName = "com.xxl.job.core.biz.model.RegistryParam";
public const string ReturnTJavaFullName = "com.xxl.job.core.biz.model.ReturnT";
public const string TriggerParamJavaFullName = "com.xxl.job.core.biz.model.TriggerParam";
public const string HandleCallbackParamJavaFullName = "com.xxl.job.core.biz.model.HandleCallbackParam";
public const string LogResultJavaFullName = "com.xxl.job.core.biz.model.LogResult";
public const string JavaClassFulName = "java.lang.Class";
public const string JavaListFulName = "java.util.List";
public const string XxlJobRetryLogsFile = "xxl-job-retry.log";
public const string HandleLogsDirectory = "HandlerLogs";
public const string DefaultHttpClientName = "DotXxlJobHttpClient";
public const int DefaultLogRetentionDays = 30;
public static TimeSpan RpcRequestExpireTimeSpan = TimeSpan.FromMinutes(3);
public static TimeSpan RegistryInterval = TimeSpan.FromSeconds(60);
public const int MaxCallbackRetryTimes = 10;
//每次回调最多发送几条记录
public const int MaxCallbackRecordsPerRequest =5;
public static TimeSpan CallbackRetryInterval = TimeSpan.FromSeconds(600);
//Admin集群机器请求默认超时时间
//public static TimeSpan AdminServerDefaultTimeout = TimeSpan.FromSeconds(15);
//Admin集群中的某台机器熔断后间隔多长时间再重试
public static TimeSpan AdminServerReconnectInterval = TimeSpan.FromMinutes(3);
//Admin集群中的某台机器请求失败多少次后熔断
public const int AdminServerCircuitFailedTimes = 3;
public static class GlueType
{
public const string BEAN = "BEAN";
}
public static class ExecutorBlockStrategy
{
public const string SERIAL_EXECUTION = "SERIAL_EXECUTION";
public const string DISCARD_LATER = "DISCARD_LATER";
public const string COVER_EARLY = "COVER_EARLY";
}
}
}

@ -1,135 +0,0 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
namespace DotXxlJob.Core
{
/// <summary>
/// ip utility
/// </summary>
internal static class IPUtility
{
#region Private Members
/// <summary>
/// A类: 10.0.0.0-10.255.255.255
/// </summary>
private static long ipABegin, ipAEnd;
/// <summary>
/// B类: 172.16.0.0-172.31.255.255
/// </summary>
private static long ipBBegin, ipBEnd;
/// <summary>
/// C类: 192.168.0.0-192.168.255.255
/// </summary>
private static long ipCBegin, ipCEnd;
#endregion
#region Constructors
/// <summary>
/// static new
/// </summary>
static IPUtility()
{
ipABegin = ConvertToNumber("10.0.0.0");
ipAEnd = ConvertToNumber("10.255.255.255");
ipBBegin = ConvertToNumber("172.16.0.0");
ipBEnd = ConvertToNumber("172.31.255.255");
ipCBegin = ConvertToNumber("192.168.0.0");
ipCEnd = ConvertToNumber("192.168.255.255");
}
#endregion
#region Public Methods
/// <summary>
/// ip address convert to long
/// </summary>
/// <param name="ipAddress"></param>
/// <returns></returns>
private static long ConvertToNumber(string ipAddress)
{
return ConvertToNumber(IPAddress.Parse(ipAddress));
}
/// <summary>
/// ip address convert to long
/// </summary>
/// <param name="ipAddress"></param>
/// <returns></returns>
private static long ConvertToNumber(IPAddress ipAddress)
{
var bytes = ipAddress.GetAddressBytes();
return bytes[0] * 256 * 256 * 256 + bytes[1] * 256 * 256 + bytes[2] * 256 + bytes[3];
}
/// <summary>
/// true表示为内网IP
/// </summary>
/// <param name="ipAddress"></param>
/// <returns></returns>
public static bool IsIntranet(string ipAddress)
{
return IsIntranet(ConvertToNumber(ipAddress));
}
/// <summary>
/// true表示为内网IP
/// </summary>
/// <param name="ipAddress"></param>
/// <returns></returns>
private static bool IsIntranet(IPAddress ipAddress)
{
return IsIntranet(ConvertToNumber(ipAddress));
}
/// <summary>
/// true表示为内网IP
/// </summary>
/// <param name="longIP"></param>
/// <returns></returns>
private static bool IsIntranet(long longIP)
{
return ((longIP >= ipABegin) && (longIP <= ipAEnd) ||
(longIP >= ipBBegin) && (longIP <= ipBEnd) ||
(longIP >= ipCBegin) && (longIP <= ipCEnd));
}
/// <summary>
/// 获取本机内网IP
/// </summary>
/// <returns></returns>
public static IPAddress GetLocalIntranetIP()
{
return NetworkInterface
.GetAllNetworkInterfaces()
.Select(p => p.GetIPProperties())
.SelectMany(p =>
p.UnicastAddresses
).FirstOrDefault(p => p.Address.AddressFamily == AddressFamily.InterNetwork
&& !IPAddress.IsLoopback(p.Address)
&& IsIntranet(p.Address))?.Address;
}
/// <summary>
/// 获取本机内网IP列表
/// </summary>
/// <returns></returns>
public static List<IPAddress> GetLocalIntranetIPList()
{
var infList =NetworkInterface.GetAllNetworkInterfaces()
.Select(p => p.GetIPProperties())
.SelectMany(p => p.UnicastAddresses)
.Where(p =>
p.Address.AddressFamily == AddressFamily.InterNetwork
&& !IPAddress.IsLoopback(p.Address)
&& IsIntranet(p.Address)
);
var result = new List<IPAddress>();
foreach (var child in infList)
{
result.Add(child.Address);
}
return result;
}
#endregion
}
}

@ -1,87 +0,0 @@
using System;
namespace DotXxlJob.Core
{
/// <summary>
/// Utility methods to simplify checking preconditions in the code.
/// </summary>
internal static class Preconditions
{
/// <summary>
/// Throws <see cref="ArgumentException"/> if condition is false.
/// </summary>
/// <param name="condition">The condition.</param>
public static void CheckArgument(bool condition)
{
if (!condition)
{
throw new ArgumentException();
}
}
/// <summary>
/// Throws <see cref="ArgumentException"/> with given message if condition is false.
/// </summary>
/// <param name="condition">The condition.</param>
/// <param name="errorMessage">The error message.</param>
public static void CheckArgument(bool condition, string errorMessage)
{
if (!condition)
{
throw new ArgumentException(errorMessage);
}
}
/// <summary>
/// Throws <see cref="ArgumentNullException"/> if reference is null.
/// </summary>
/// <param name="reference">The reference.</param>
public static T CheckNotNull<T>(T reference)
{
if (reference == null)
{
throw new ArgumentNullException();
}
return reference;
}
/// <summary>
/// Throws <see cref="ArgumentNullException"/> if reference is null.
/// </summary>
/// <param name="reference">The reference.</param>
/// <param name="paramName">The parameter name.</param>
public static T CheckNotNull<T>(T reference, string paramName)
{
if (reference == null)
{
throw new ArgumentNullException(paramName);
}
return reference;
}
/// <summary>
/// Throws <see cref="InvalidOperationException"/> if condition is false.
/// </summary>
/// <param name="condition">The condition.</param>
public static void CheckState(bool condition)
{
if (!condition)
{
throw new InvalidOperationException();
}
}
/// <summary>
/// Throws <see cref="InvalidOperationException"/> with given message if condition is false.
/// </summary>
/// <param name="condition">The condition.</param>
/// <param name="errorMessage">The error message.</param>
public static void CheckState(bool condition, string errorMessage)
{
if (!condition)
{
throw new InvalidOperationException(errorMessage);
}
}
}
}

@ -1,158 +0,0 @@
using System;
using System.Collections.Concurrent;
using DotXxlJob.Core.Model;
using DotXxlJob.Core.Queue;
using Microsoft.Extensions.Logging;
namespace DotXxlJob.Core
{
/// <summary>
/// 负责实际的JOB轮询
/// </summary>
public class JobDispatcher
{
private readonly TaskExecutorFactory _executorFactory;
private readonly CallbackTaskQueue _callbackTaskQueue;
private readonly IJobLogger _jobLogger;
private readonly ConcurrentDictionary<int,JobTaskQueue> RUNNING_QUEUE = new ConcurrentDictionary<int, JobTaskQueue>();
private readonly ILogger<JobTaskQueue> _jobQueueLogger;
private readonly ILogger<JobDispatcher> _logger;
public JobDispatcher(
TaskExecutorFactory executorFactory,
CallbackTaskQueue callbackTaskQueue,
IJobLogger jobLogger,
ILoggerFactory loggerFactory
)
{
this. _executorFactory = executorFactory;
this. _callbackTaskQueue = callbackTaskQueue;
this._jobLogger = jobLogger;
this._jobQueueLogger = loggerFactory.CreateLogger<JobTaskQueue>();
this._logger = loggerFactory.CreateLogger<JobDispatcher>();
}
/// <summary>
/// 尝试移除JobTask
/// </summary>
/// <param name="jobId"></param>
/// <param name="reason"></param>
/// <returns></returns>
public bool TryRemoveJobTask(int jobId)
{
if (RUNNING_QUEUE.TryGetValue(jobId, out var jobQueue))
{
jobQueue.Stop();
return true;
}
return false;
}
/// <summary>
/// 执行队列,并快速返回结果
/// </summary>
/// <param name="triggerParam"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public ReturnT Execute(TriggerParam triggerParam)
{
var executor = this._executorFactory.GetTaskExecutor(triggerParam.GlueType);
if (executor == null)
{
return ReturnT.Failed($"glueType[{triggerParam.GlueType}] is not supported ");
}
// 1. 根据JobId 获取 TaskQueue; 用于判断是否有正在执行的任务
if (RUNNING_QUEUE.TryGetValue(triggerParam.JobId, out var taskQueue))
{
if (taskQueue.Executor != executor) //任务执行器变更
{
return ChangeJobQueue(triggerParam, executor);
}
}
if (taskQueue != null) //旧任务还在执行,判断执行策略
{
//丢弃后续的
if (Constants.ExecutorBlockStrategy.DISCARD_LATER == triggerParam.ExecutorBlockStrategy)
{
//存在还没执行完成的任务
if (taskQueue.IsRunning())
{
return ReturnT.Failed($"block strategy effect:{triggerParam.ExecutorBlockStrategy}");
}
//否则还是继续做
}
//覆盖较早的
if (Constants.ExecutorBlockStrategy.COVER_EARLY == triggerParam.ExecutorBlockStrategy)
{
return taskQueue.Replace(triggerParam);
}
}
return PushJobQueue(triggerParam, executor);
}
/// <summary>
/// IdleBeat
/// </summary>
/// <param name="jobId"></param>
/// <returns></returns>
public ReturnT IdleBeat(int jobId)
{
return RUNNING_QUEUE.ContainsKey(jobId) ?
new ReturnT(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.")
: ReturnT.SUCCESS;
}
private void TriggerCallback(object sender, HandleCallbackParam callbackParam)
{
this._callbackTaskQueue.Push(callbackParam);
}
private ReturnT PushJobQueue(TriggerParam triggerParam, ITaskExecutor executor)
{
if (RUNNING_QUEUE.TryGetValue(triggerParam.JobId,out var jobQueue))
{
return jobQueue.Push(triggerParam);
}
//NewJobId
jobQueue = new JobTaskQueue ( executor,this._jobLogger, this._jobQueueLogger);
jobQueue.CallBack += TriggerCallback;
if (RUNNING_QUEUE.TryAdd(triggerParam.JobId, jobQueue))
{
return jobQueue.Push(triggerParam);
}
return ReturnT.Failed("add running queue executor error");
}
private ReturnT ChangeJobQueue(TriggerParam triggerParam, ITaskExecutor executor)
{
if (RUNNING_QUEUE.TryRemove(triggerParam.JobId, out var oldJobTask))
{
oldJobTask.CallBack -= TriggerCallback;
oldJobTask.Dispose(); //释放原来的资源
}
JobTaskQueue jobQueue = new JobTaskQueue ( executor,this._jobLogger, this._jobQueueLogger);
jobQueue.CallBack += TriggerCallback;
if (RUNNING_QUEUE.TryAdd(triggerParam.JobId, jobQueue))
{
return jobQueue.Push(triggerParam);
}
return ReturnT.Failed(" replace running queue executor error");
}
}
}

@ -1,64 +0,0 @@
using System.Reflection;
using Utf8Json;
using Utf8Json.Formatters;
using Utf8Json.Resolvers;
namespace DotXxlJob.Core.Json
{
public class ProjectDefaultResolver : IJsonFormatterResolver
{
public static IJsonFormatterResolver Instance = new ProjectDefaultResolver();
// configure your resolver and formatters.
static readonly IJsonFormatter[] formatters = {
new DateTimeFormatter("yyyy-MM-dd HH:mm:ss"),
new NullableDateTimeFormatter("yyyy-MM-dd HH:mm:ss")
};
static readonly IJsonFormatterResolver[] resolvers = new[]
{
EnumResolver.UnderlyingValue,
StandardResolver.AllowPrivateExcludeNullSnakeCase
};
ProjectDefaultResolver()
{
}
public IJsonFormatter<T> GetFormatter<T>()
{
return FormatterCache<T>.formatter;
}
static class FormatterCache<T>
{
public static readonly IJsonFormatter<T> formatter;
static FormatterCache()
{
foreach (var item in formatters)
{
foreach (var implInterface in item.GetType().GetTypeInfo().ImplementedInterfaces)
{
var ti = implInterface.GetTypeInfo();
if (ti.IsGenericType && ti.GenericTypeArguments[0] == typeof(T))
{
formatter = (IJsonFormatter<T>)item;
return;
}
}
}
foreach (var item in resolvers)
{
var f = item.GetFormatter<T>();
if (f != null)
{
formatter = f;
return;
}
}
}
}
}
}

@ -1,23 +0,0 @@
using System;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core
{
public interface IJobLogger
{
void SetLogFile(long logTime, long logId);
void Log(string pattern, params object[] format);
void LogError(Exception ex);
LogResult ReadLog(long logTime, long logId, int fromLine);
void LogSpecialFile(long logTime, long logId, string pattern, params object[] format);
}
}

@ -1,194 +0,0 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Extensions;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotXxlJob.Core
{
public class JobLogger:IJobLogger
{
private readonly ILogger<JobLogger> _logger;
private readonly AsyncLocal<string> LogFileName = new AsyncLocal<string>();
private readonly XxlJobExecutorOptions _options;
public JobLogger(IOptions<XxlJobExecutorOptions> optionsAccessor,ILogger<JobLogger> logger)
{
this._logger = logger;
this._options = optionsAccessor.Value;
}
public void SetLogFile(long logTime, long logId)
{
try
{
var filePath = MakeLogFileName(logTime, logId);
var dir = Path.GetDirectoryName(filePath);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
CleanOldLogs();
}
LogFileName.Value = filePath;
}
catch (Exception ex)
{
_logger.LogError(ex, "SetLogFileName error.");
}
}
public void Log(string pattern, params object[] format)
{
string appendLog;
if (format == null || format.Length == 0)
{
appendLog = pattern;
}
else
{
appendLog = string.Format(pattern, format);
}
var callInfo = new StackTrace(true).GetFrame(1);
LogDetail(GetLogFileName(), callInfo, appendLog);
}
public void LogError(Exception ex)
{
var callInfo = new StackTrace(true).GetFrame(1);
LogDetail(GetLogFileName(), callInfo, ex.Message + ex.StackTrace);
}
public LogResult ReadLog(long logTime, long logId, int fromLine)
{
var filePath = MakeLogFileName(logTime, logId);
if (string.IsNullOrEmpty(filePath))
{
return new LogResult(fromLine, 0, "readLog fail, logFile not found", true);
}
if (!File.Exists(filePath))
{
return new LogResult(fromLine, 0, "readLog fail, logFile not exists", true);
}
// read file
var logContentBuffer = new StringBuilder();
int toLineNum = 0;
try
{
using (var reader = new StreamReader(filePath, Encoding.UTF8))
{
string line;
while ((line = reader.ReadLine()) != null)
{
toLineNum++;
if (toLineNum >= fromLine)
{
logContentBuffer.AppendLine(line);
}
}
}
}
catch (Exception ex)
{
this._logger.LogError(ex, "ReadLog error.");
}
// result
var logResult = new LogResult(fromLine, toLineNum, logContentBuffer.ToString(), false);
return logResult;
}
public void LogSpecialFile(long logTime, long logId, string pattern, params object[] format)
{
var filePath = MakeLogFileName(logTime, logId);
var callInfo = new StackTrace(true).GetFrame(1);
var content = string.Format(pattern, format);
LogDetail(filePath, callInfo, content);
}
private string GetLogFileName()
{
return LogFileName.Value;
}
private string MakeLogFileName(long logDateTime, long logId)
{
//log fileName like: logPath/HandlerLogs/yyyy-MM-dd/9999.log
return Path.Combine(this._options.LogPath, Constants.HandleLogsDirectory,
logDateTime.FromMilliseconds().ToString("yyyy-MM-dd"), $"{logId}.log");
}
private void LogDetail(string logFileName, StackFrame callInfo, string appendLog)
{
if (string.IsNullOrEmpty(logFileName))
{
return;
}
var stringBuffer = new StringBuilder();
stringBuffer
.Append(DateTime.Now.ToString("s")).Append(" ")
.Append("[" + callInfo.GetMethod().DeclaringType.FullName + "#" + callInfo.GetMethod().Name + "]").Append("-")
.Append("[line " + callInfo.GetFileLineNumber() + "]").Append("-")
.Append("[thread " + Thread.CurrentThread.ManagedThreadId + "]").Append(" ")
.Append(appendLog ?? string.Empty)
.AppendLine();
var formatAppendLog = stringBuffer.ToString();
try
{
File.AppendAllText(logFileName, formatAppendLog, Encoding.UTF8);
}
catch (Exception ex)
{
this._logger.LogError(ex, "LogDetail error");
}
}
private void CleanOldLogs()
{
if (this._options.LogRetentionDays <= 0)
{
this._options.LogRetentionDays = Constants.DefaultLogRetentionDays;
}
Task.Run(() =>
{
try
{
var handlerLogsDir = new DirectoryInfo(Path.Combine(this._options.LogPath, Constants.HandleLogsDirectory));
if (!handlerLogsDir.Exists)
{
return;
}
var today = DateTime.UtcNow.Date;
foreach (var dir in handlerLogsDir.GetDirectories())
{
if (DateTime.TryParse(dir.Name, out var dirDate))
{
if (today.Subtract(dirDate.Date).Days > this._options.LogRetentionDays)
{
dir.Delete(true);
}
}
}
}
catch (Exception ex)
{
this._logger.LogError(ex, "CleanOldLogs error.");
}
});
}
}
}

@ -1,39 +0,0 @@
using System;
namespace DotXxlJob.Core.Model
{
public class AddressEntry
{
public string RequestUri { get; set; }
private DateTime? LastFailedTime { get; set; }
private int FailedTimes { get; set; }
public bool CheckAccessible()
{
if (LastFailedTime == null)
return true;
if (DateTime.UtcNow.Subtract(LastFailedTime.Value) > Constants.AdminServerReconnectInterval)
return true;
if (FailedTimes < Constants.AdminServerCircuitFailedTimes)
return true;
return false;
}
public void Reset()
{
LastFailedTime = null;
FailedTimes = 0;
}
public void SetFail()
{
LastFailedTime = DateTime.UtcNow;
FailedTimes++;
}
}
}

@ -1,58 +0,0 @@
using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract(Name = Constants.HandleCallbackParamJavaFullName)]
public class HandleCallbackParam
{
public HandleCallbackParam()
{
}
public HandleCallbackParam(TriggerParam triggerParam, ReturnT result)
{
this.LogId = triggerParam.LogId;
this.LogDateTime = triggerParam.LogDateTime;
this.ExecuteResult = result;
}
public int CallbackRetryTimes { get; set; }
[DataMember(Name = "logId",Order = 1)]
public long LogId { get; set; }
[DataMember(Name = "logDateTim",Order = 2)]
public long LogDateTime { get; set; }
/// <summary>
/// 2.3.0以前版本
/// </summary>
[DataMember(Name = "executeResult",Order = 3)]
public ReturnT ExecuteResult { get; set; }
/// <summary>
/// 2.3.0版本使用的参数
/// </summary>
[DataMember(Name = "handleCode", Order = 4)]
public int HandleCode {
get {
if(this.ExecuteResult != null)
{
return this.ExecuteResult.Code;
}
return 500;
}
}
/// <summary>
/// 2.3.0版本使用的参数
/// </summary>
[DataMember(Name = "handleMsg", Order = 5)]
public string HandleMsg {
get {
return this.ExecuteResult?.Msg;
}
}
}
}

@ -1,15 +0,0 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace DotXxlJob.Core.Model
{
[DataContract]
public class IdleBeatRequest
{
[DataMember(Name = "jobId", Order = 1)]
public int JobId { get; set; }
}
}

@ -1,11 +0,0 @@
using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract(Name = Constants.JavaClassFulName)]
public class JavaClass
{
[DataMember(Name = "name",Order = 1)]
public string Name { get; set; }
}
}

@ -1,17 +0,0 @@
using System.Threading;
namespace DotXxlJob.Core.Model
{
public class JobExecuteContext
{
public JobExecuteContext(IJobLogger jobLogger, string jobParameter, CancellationToken cancellationToken)
{
this.JobLogger = jobLogger;
this.JobParameter = jobParameter;
this.cancellationToken = cancellationToken;
}
public string JobParameter { get; }
public IJobLogger JobLogger { get; }
public CancellationToken cancellationToken { get; }
}
}

@ -1,16 +0,0 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace DotXxlJob.Core.Model
{
[DataContract]
public class KillRequest
{
[DataMember(Name = "jobId", Order = 1)]
public int JobId { get; set; }
}
}

@ -1,21 +0,0 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
namespace DotXxlJob.Core.Model
{
[DataContract]
public class LogRequest
{
[DataMember(Name = "logDateTim", Order =1)]
public long LogDateTime { get; set; }
[DataMember(Name = "logId", Order = 2)]
public int LogId { get; set; }
[DataMember(Name = "fromLineNum", Order = 3)]
public int FromLineNum { get; set; }
}
}

@ -1,26 +0,0 @@
using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract(Name = Constants.LogResultJavaFullName)]
public class LogResult
{
public LogResult(int fromLine ,int toLine,string content,bool isEnd)
{
this.FromLineNum = fromLine;
this.ToLineNum = toLine;
this.LogContent = content;
this.IsEnd = isEnd;
}
[DataMember(Name = "fromLineNum",Order = 1)]
public int FromLineNum { get; set; }
[DataMember(Name = "toLineNum",Order = 2)]
public int ToLineNum { get; set; }
[DataMember(Name = "logContent",Order = 3)]
public string LogContent { get; set; }
[DataMember(Name = "isEnd",Order = 4)]
public bool IsEnd { get; set; }
}
}

@ -1,19 +0,0 @@
using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract(Name = Constants.RegistryParamJavaFullName)]
public class RegistryParam
{
[DataMember(Name = "registryGroup", Order = 1)]
public string RegistryGroup { get; set; }
[DataMember(Name = "registryKey", Order = 2)]
public string RegistryKey { get; set; }
[DataMember(Name = "registryValue", Order = 3)]
public string RegistryValue { get; set; }
}
}

@ -1,47 +0,0 @@
using System.Runtime.Serialization;
namespace DotXxlJob.Core
{
[DataContract(Name = Constants.ReturnTJavaFullName)]
public class ReturnT
{
public const int SUCCESS_CODE = 200;
public const int FAIL_CODE = 500;
public static readonly ReturnT SUCCESS = new ReturnT(SUCCESS_CODE, null);
public static readonly ReturnT FAIL = new ReturnT(FAIL_CODE, null);
public static readonly ReturnT FAIL_TIMEOUT = new ReturnT(502, null);
public ReturnT() { }
public ReturnT(int code, string msg)
{
Code = code;
Msg = msg;
}
[DataMember(Name = "code",Order = 1)]
public int Code { get; set; }
[DataMember(Name = "msg",Order = 2)]
public string Msg { get; set; }
[DataMember(Name = "content",Order = 3)]
public object Content { get; set; }
public static ReturnT Failed(string msg)
{
return new ReturnT(FAIL_CODE, msg);
}
public static ReturnT Success(string msg)
{
return new ReturnT(SUCCESS_CODE, msg);
}
}
}

@ -1,50 +0,0 @@
using System.Collections.Generic;
using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract(Name = Constants.RpcRequestJavaFullName)]
public class RpcRequest
{
/*
requestId
createMillisTime
accessToken
className
methodName
version
parameterTypes
parameters
*/
[DataMember(Name = "requestId",Order = 1)]
public string RequestId { get; set; }
//[DataMember(Name = "serverAddress")]
//public string ServerAddress{ get; set; }
[DataMember(Name = "createMillisTime" ,Order = 2)]
public long CreateMillisTime{ get; set; }
[DataMember(Name = "accessToken" ,Order = 3)]
public string AccessToken{ get; set; }
[DataMember(Name = "className" ,Order = 4)]
public string ClassName{ get; set; }
[DataMember(Name = "methodName" ,Order = 5)]
public string MethodName{ get; set; }
[DataMember(Name = "version" ,Order = 6)]
public string Version{ get; set; }
[DataMember(Name = "parameterTypes",Order = 7)]
public IList<object> ParameterTypes{ get; set; }
[DataMember(Name = "parameters",Order = 8)]
public IList<object> Parameters{ get; set; }
}
}

@ -1,18 +0,0 @@
using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract(Name = Constants.RpcResponseJavaFullName)]
public class RpcResponse
{
[DataMember(Name = "requestId",Order = 1)]
public string RequestId{ get; set; }
[DataMember(Name = "errorMsg",Order = 2)]
public string ErrorMsg { get; set; }
[DataMember(Name = "result",Order = 3)]
public object Result{ get; set; }
public bool IsError => this.ErrorMsg != null;
}
}

@ -1,45 +0,0 @@
using System.Runtime.Serialization;
namespace DotXxlJob.Core.Model
{
[DataContract]
public class TriggerParam
{
//static readonly long SerialVersionUID = 42L;
[DataMember(Name = "jobId", Order = 1)]
public int JobId { get; set; }
[DataMember(Name = "executorHandler", Order = 2)]
public string ExecutorHandler { get; set; }
[DataMember(Name = "executorParams", Order = 3)]
public string ExecutorParams{ get; set; }
[DataMember(Name = "executorBlockStrategy", Order = 4)]
public string ExecutorBlockStrategy{ get; set; }
[DataMember(Name = "executorTimeout", Order = 5)]
public int ExecutorTimeout{ get; set; }
[DataMember(Name = "logId",Order = 5)]
public long LogId { get; set; }
[DataMember(Name = "logDateTime", Order = 6)]
public long LogDateTime{ get; set; }
[DataMember(Name = "glueType",Order = 7)]
public string GlueType{ get; set; }
[DataMember(Name = "glueSource",Order = 8)]
public string GlueSource{ get; set; }
[DataMember(Name = "glueUpdatetime", Order = 9)]
public long GlueUpdateTime{ get; set; }
[DataMember(Name = "broadcastIndex",Order = 10)]
public int BroadcastIndex{ get; set; }
[DataMember(Name = "broadcastTotal",Order = 11)]
public int BroadcastTotal{ get; set; }
}
}

@ -1,118 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotXxlJob.Core.Queue
{
public class CallbackTaskQueue:IDisposable
{
private readonly AdminClient _adminClient;
private readonly IJobLogger _jobLogger;
private readonly RetryCallbackTaskQueue _retryQueue;
private readonly ILogger<CallbackTaskQueue> _logger;
private readonly ConcurrentQueue<HandleCallbackParam> taskQueue = new ConcurrentQueue<HandleCallbackParam>();
private bool _stop;
private bool _isRunning;
private int _callbackInterval;
private Task _runTask;
public CallbackTaskQueue(AdminClient adminClient,IJobLogger jobLogger,IOptions<XxlJobExecutorOptions> optionsAccessor
, ILoggerFactory loggerFactory)
{
_adminClient = adminClient;
_jobLogger = jobLogger;
_callbackInterval = optionsAccessor.Value.CallBackInterval;
_retryQueue = new RetryCallbackTaskQueue(optionsAccessor.Value.LogPath,
Push,
loggerFactory.CreateLogger<RetryCallbackTaskQueue>());
_logger = loggerFactory.CreateLogger<CallbackTaskQueue>();
}
public void Push(HandleCallbackParam callbackParam)
{
taskQueue.Enqueue(callbackParam);
StartCallBack();
}
public void Dispose()
{
_stop = true;
_retryQueue.Dispose();
_runTask?.GetAwaiter().GetResult();
}
private void StartCallBack()
{
if ( _isRunning)
{
return;
}
_runTask = Task.Run(async () =>
{
_logger.LogDebug("start to callback");
_isRunning = true;
while (!_stop)
{
await DoCallBack();
if (taskQueue.IsEmpty)
{
await Task.Delay(TimeSpan.FromMilliseconds(_callbackInterval));
}
}
_logger.LogDebug("end to callback");
_isRunning = false;
});
}
private async Task DoCallBack()
{
List<HandleCallbackParam> list = new List<HandleCallbackParam>();
if(!taskQueue.TryDequeue(out var item))
{
return;
}
list.Add(item);
ReturnT result;
try
{
result = await _adminClient.Callback(list).ConfigureAwait(false);
}
catch (Exception ex){
_logger.LogError(ex,"trigger callback error:{error}",ex.Message);
result = ReturnT.Failed(ex.Message);
_retryQueue.Push(list);
}
LogCallBackResult(result, list);
}
private void LogCallBackResult(ReturnT result,List<HandleCallbackParam> list)
{
foreach (var param in list)
{
_jobLogger.LogSpecialFile(param.LogDateTime, param.LogId, result.Msg??"Success");
}
}
}
}

@ -1,174 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
namespace DotXxlJob.Core
{
public class JobTaskQueue : IDisposable
{
private readonly IJobLogger _jobLogger;
private readonly ILogger<JobTaskQueue> _logger;
private readonly ConcurrentQueue<TriggerParam> TASK_QUEUE = new ConcurrentQueue<TriggerParam>();
private readonly ConcurrentDictionary<long, byte> ID_IN_QUEUE = new ConcurrentDictionary<long, byte>();
private CancellationTokenSource _cancellationTokenSource;
private Task _runTask;
public JobTaskQueue(ITaskExecutor executor, IJobLogger jobLogger, ILogger<JobTaskQueue> logger)
{
this.Executor = executor;
this._jobLogger = jobLogger;
this._logger = logger;
}
public ITaskExecutor Executor { get; }
public event EventHandler<HandleCallbackParam> CallBack;
public bool IsRunning()
{
return _cancellationTokenSource != null;
}
/// <summary>
/// 覆盖之前的队列
/// </summary>
/// <param name="triggerParam"></param>
/// <returns></returns>
public ReturnT Replace(TriggerParam triggerParam)
{
while (!TASK_QUEUE.IsEmpty)
{
TASK_QUEUE.TryDequeue(out _);
}
Stop();
ID_IN_QUEUE.Clear();
return Push(triggerParam);
}
public ReturnT Push(TriggerParam triggerParam)
{
if (!ID_IN_QUEUE.TryAdd(triggerParam.LogId, 0))
{
_logger.LogWarning("repeat job task,logId={logId},jobId={jobId}", triggerParam.LogId, triggerParam.JobId);
return ReturnT.Failed("repeat job task!");
}
//this._logger.LogWarning("add job with logId={logId},jobId={jobId}",triggerParam.LogId,triggerParam.JobId);
this.TASK_QUEUE.Enqueue(triggerParam);
StartTask();
return ReturnT.SUCCESS;
}
public void Stop()
{
_cancellationTokenSource?.Cancel();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
//wait for task completed
_runTask?.GetAwaiter().GetResult();
}
public void Dispose()
{
while (!TASK_QUEUE.IsEmpty)
{
TASK_QUEUE.TryDequeue(out _);
}
ID_IN_QUEUE.Clear();
Stop();
}
private void StartTask()
{
if (_cancellationTokenSource != null)
{
return; //running
}
_cancellationTokenSource = new CancellationTokenSource();
var ct = _cancellationTokenSource.Token;
_runTask = Task.Factory.StartNew(async () =>
{
//ct.ThrowIfCancellationRequested();
while (!ct.IsCancellationRequested)
{
if (TASK_QUEUE.IsEmpty)
{
//_logger.LogInformation("task queue is empty!");
break;
}
ReturnT result = null;
TriggerParam triggerParam = null;
try
{
if (TASK_QUEUE.TryDequeue(out triggerParam))
{
if (!ID_IN_QUEUE.TryRemove(triggerParam.LogId, out _))
{
_logger.LogWarning("remove queue failed,logId={logId},jobId={jobId},exists={exists}"
, triggerParam.LogId, triggerParam.JobId, ID_IN_QUEUE.ContainsKey(triggerParam.LogId));
}
//set log file;
_jobLogger.SetLogFile(triggerParam.LogDateTime, triggerParam.LogId);
_jobLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:{0}", triggerParam.ExecutorParams);
var exectorToken = ct;
CancellationTokenSource timeoutCts = null;
if (triggerParam.ExecutorTimeout > 0)
{
timeoutCts = new CancellationTokenSource(triggerParam.ExecutorTimeout * 1000);
exectorToken = CancellationTokenSource.CreateLinkedTokenSource(exectorToken, timeoutCts.Token).Token;
}
result = await Executor.Execute(triggerParam, exectorToken);
if(timeoutCts != null && timeoutCts.IsCancellationRequested)
{
result = ReturnT.FAIL_TIMEOUT;
timeoutCts.Dispose();
timeoutCts = null;
}
_jobLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + result.Code);
}
else
{
_logger.LogWarning("Dequeue Task Failed");
}
}
catch (Exception ex)
{
result = ReturnT.Failed("Dequeue Task Failed:" + ex.Message);
_jobLogger.Log("<br>----------- JobThread Exception:" + ex.Message + "<br>----------- xxl-job job execute end(error) -----------");
}
if (triggerParam != null)
{
CallBack?.Invoke(this, new HandleCallbackParam(triggerParam, result ?? ReturnT.FAIL));
}
}
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}, _cancellationTokenSource.Token);
}
}
}

@ -1,136 +0,0 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Json;
using DotXxlJob.Core.Model;
using Microsoft.Extensions.Logging;
namespace DotXxlJob.Core.Queue
{
public class RetryCallbackTaskQueue:IDisposable
{
private readonly Action<HandleCallbackParam> _actionDoCallback;
private readonly ILogger<RetryCallbackTaskQueue> _logger;
private CancellationTokenSource _cancellation;
private Task _runTask;
private readonly string _backupFile;
public RetryCallbackTaskQueue(string backupPath,Action<HandleCallbackParam> actionDoCallback,ILogger<RetryCallbackTaskQueue> logger)
{
_actionDoCallback = actionDoCallback;
_logger = logger;
_backupFile = Path.Combine(backupPath, Constants.XxlJobRetryLogsFile);
var dir = Path.GetDirectoryName(backupPath);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir ?? throw new Exception("logs path is empty"));
}
StartQueue();
}
private void StartQueue()
{
_cancellation = new CancellationTokenSource();
var stopToken = this._cancellation.Token;
_runTask = Task.Factory.StartNew(async () =>
{
while (!stopToken.IsCancellationRequested)
{
await LoadFromFile();
await Task.Delay(Constants.CallbackRetryInterval,stopToken);
}
}, TaskCreationOptions.LongRunning);
}
private async Task LoadFromFile()
{
var list = new List<HandleCallbackParam>();
if (!File.Exists(_backupFile))
{
return;
}
using (StreamReader reader = new StreamReader(this._backupFile))
{
string nextLine;
while ((nextLine = await reader.ReadLineAsync()) != null)
{
try
{
list.Add(Utf8Json.JsonSerializer.Deserialize<HandleCallbackParam>(nextLine, ProjectDefaultResolver.Instance));
}
catch(Exception ex)
{
_logger.LogError(ex,"read backup file error:{error}",ex.Message);
}
}
}
try
{
File.Delete(_backupFile); //ɾ³ý±¸·ÝÎļþ
}
catch (Exception ex)
{
_logger.LogError(ex, "delete backup file error:{error}", ex.Message);
}
if (list.Count > 0)
{
foreach (var item in list)
{
_actionDoCallback(item);
}
}
}
public void Push(List<HandleCallbackParam> list)
{
if (list?.Count == 0)
{
return;
}
try
{
using (var writer = new StreamWriter(this._backupFile, true, Encoding.UTF8))
{
foreach (var item in list)
{
if (item.CallbackRetryTimes >= Constants.MaxCallbackRetryTimes)
{
this._logger.LogInformation("callback too many times and will be abandon,logId {logId}", item.LogId);
}
else
{
item.CallbackRetryTimes++;
byte[] buffer = Utf8Json.JsonSerializer.Serialize(item,ProjectDefaultResolver.Instance);
writer.WriteLine(Encoding.UTF8.GetString(buffer));
}
}
}
}
catch (Exception ex)
{
this._logger.LogError(ex, "SaveCallbackParams error.");
}
}
public void Dispose()
{
this._cancellation.Cancel();
this._runTask?.GetAwaiter().GetResult();
}
}
}

@ -1,40 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
namespace DotXxlJob.Core
{
/// <summary>
/// 负责响应RPC请求,调度任务执行器的工厂类
/// </summary>
public class TaskExecutorFactory
{
private readonly IServiceProvider _provider;
private readonly Dictionary<string, ITaskExecutor> _cache = new Dictionary<string, ITaskExecutor>();
public TaskExecutorFactory(IServiceProvider provider)
{
this._provider = provider;
Initialize();
}
private void Initialize()
{
var executors = this._provider.GetServices<ITaskExecutor>();
var taskExecutors = executors as ITaskExecutor[] ?? executors.ToArray();
if (executors == null || !taskExecutors.Any()) return;
foreach (var item in taskExecutors)
{
this._cache.Add(item.GlueType,item);
}
}
public ITaskExecutor GetTaskExecutor(string glueType)
{
return this._cache.TryGetValue(glueType, out var executor) ? executor : null;
}
}
}

@ -1,35 +0,0 @@
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core.TaskExecutors
{
/// <summary>
/// 实现 IJobHandler的执行器
/// </summary>
public class BeanTaskExecutor : ITaskExecutor
{
private readonly IJobHandlerFactory _handlerFactory;
private readonly IJobLogger _jobLogger;
public BeanTaskExecutor(IJobHandlerFactory handlerFactory, IJobLogger jobLogger)
{
this._handlerFactory = handlerFactory;
this._jobLogger = jobLogger;
}
public string GlueType { get; } = Constants.GlueType.BEAN;
public Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken)
{
var handler = _handlerFactory.GetJobHandler(triggerParam.ExecutorHandler);
if (handler == null)
{
return Task.FromResult(ReturnT.Failed($"job handler [{triggerParam.ExecutorHandler} not found."));
}
var context = new JobExecuteContext(this._jobLogger, triggerParam.ExecutorParams, cancellationToken);
return handler.Execute(context);
}
}
}

@ -1,13 +0,0 @@
using System.Threading;
using System.Threading.Tasks;
using DotXxlJob.Core.Model;
namespace DotXxlJob.Core
{
public interface ITaskExecutor
{
string GlueType { get; }
Task<ReturnT> Execute(TriggerParam triggerParam, CancellationToken cancellationToken);
}
}

@ -1,166 +0,0 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Cache;
using System.Text;
using System.Threading.Tasks;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace DotXxlJob.Core
{
public class XxlRestfulServiceHandler
{
private readonly JobDispatcher _jobDispatcher;
private readonly IJobLogger _jobLogger;
private readonly ILogger<XxlRestfulServiceHandler> _logger;
private readonly XxlJobExecutorOptions _options;
public XxlRestfulServiceHandler(IOptions<XxlJobExecutorOptions> optionsAccessor,
JobDispatcher jobDispatcher,
IJobLogger jobLogger,
ILogger<XxlRestfulServiceHandler> logger)
{
this._jobDispatcher = jobDispatcher;
this._jobLogger = jobLogger;
this._logger = logger;
this._options = optionsAccessor.Value;
if (this._options == null)
{
throw new ArgumentNullException(nameof(XxlJobExecutorOptions));
}
}
public async Task HandlerAsync(HttpRequest request,HttpResponse response)
{
var path = request.Path.Value ;
ReturnT ret = null;
var arrParts = path.Split('/');
var method = arrParts[arrParts.Length - 1].ToLower();
if (!string.IsNullOrEmpty(this._options.AccessToken))
{
var reqToken = "";
if (request.Headers.TryGetValue("XXL-JOB-ACCESS-TOKEN", out var tokenValues))
{
reqToken = tokenValues[0].ToString();
}
if(this._options.AccessToken != reqToken)
{
ret = ReturnT.Failed("ACCESS-TOKEN Auth Fail");
response.ContentType = "application/json;charset=utf-8";
await response.WriteAsync(JsonConvert.SerializeObject(ret));
return;
}
}
try
{
string json = await CollectBody(request.Body);
switch (method)
{
case "beat":
ret = Beat();
break;
case "idlebeat":
ret = IdleBeat(JsonConvert.DeserializeObject<IdleBeatRequest>(json));
break;
case "run":
ret = Run(JsonConvert.DeserializeObject<TriggerParam>(json));
break;
case "kill":
ret = Kill(JsonConvert.DeserializeObject<KillRequest>(json));
break;
case "log":
ret = Log(JsonConvert.DeserializeObject<LogRequest>(json));
break;
}
}
catch(Exception ex)
{
this._logger.LogError(ex,"响应出错"+ ex.Message);
ret = ReturnT.Failed("执行器内部错误");
}
if(ret == null)
{
ret = ReturnT.Failed($"method {method} is not impl");
}
response.ContentType = "application/json;charset=utf-8";
await response.WriteAsync(JsonConvert.SerializeObject(ret, new JsonSerializerSettings() { StringEscapeHandling = StringEscapeHandling.EscapeNonAscii }));
}
private async Task<string> CollectBody(Stream body)
{
string bodyText;
using (var reader = new StreamReader(body))
{
bodyText = await reader.ReadToEndAsync();
}
return bodyText??string.Empty;
}
#region rpc service
private ReturnT Beat()
{
return ReturnT.SUCCESS;
}
private ReturnT IdleBeat(IdleBeatRequest req)
{
if(req == null)
{
return ReturnT.Failed("IdleBeat Error");
}
return this._jobDispatcher.IdleBeat(req.JobId);
}
private ReturnT Kill(KillRequest req)
{
if (req == null)
{
return ReturnT.Failed("Kill Error");
}
return this._jobDispatcher.TryRemoveJobTask(req.JobId) ?
ReturnT.SUCCESS
:
ReturnT.Success("job thread already killed.");
}
/// <summary>
/// read Log
/// </summary>
/// <returns></returns>
private ReturnT Log(LogRequest req)
{
if (req == null)
{
return ReturnT.Failed("Log Error");
}
var ret = ReturnT.Success(null);
ret.Content = this._jobLogger.ReadLog(req.LogDateTime,req.LogId, req.FromLineNum);
return ret;
}
/// <summary>
/// 执行
/// </summary>
/// <param name="triggerParam"></param>
/// <returns></returns>
private ReturnT Run(TriggerParam triggerParam)
{
return this._jobDispatcher.Execute(triggerParam);
}
#endregion
}
}

@ -1,19 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotXxlJob.Core\DotXxlJob.Core.csproj" />
</ItemGroup>
</Project>

@ -1,13 +0,0 @@
using System;
using Xunit;
namespace DotXxlJob.Core.Tests
{
public class UnitTest1
{
[Fact]
public void Test1()
{
}
}
}
Loading…
Cancel
Save