pull/1/head
wangshaoming 7 years ago
parent cba469867d
commit e2e7c35224
No known key found for this signature in database
GPG Key ID: 29F5223B4DB362B5
  1. 18
      scripts/nuget-hessian.sh
  2. 91
      src/DotXxlJob.Core/AdminClient.cs
  3. 3
      src/DotXxlJob.Core/Attributes/JobHandlerAttribute.cs
  4. 2
      src/DotXxlJob.Core/Internal/HessianSerializer.cs
  5. 11
      src/DotXxlJob.Core/Logger/JobLogger.cs

@ -0,0 +1,18 @@
set -ex
cd $(dirname $0)/../
artifactsFolder="./artifacts"
if [ -d $artifactsFolder ]; then
rm -R $artifactsFolder
fi
mkdir -p $artifactsFolder
dotnet build ./src/Hessian/Hessian.csproj -c Release
dotnet pack ./src/Hessian/Hessian.csproj -c Release -o ../../$artifactsFolder
dotnet nuget push ./$artifactsFolder/Hessian.*.nupkg -k $NUGET_KEY -s https://www.nuget.org

@ -1,14 +1,13 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.SymbolStore;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Hessian;
using DotXxlJob.Core.Config;
using DotXxlJob.Core.Model;
using Hessian;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@ -16,22 +15,21 @@ namespace DotXxlJob.Core
{
public class AdminClient
{
static readonly string MAPPING = "/api";
private static readonly string MAPPING = "/api";
private readonly XxlJobExecutorOptions _options;
private readonly IHttpClientFactory _clientFactory;
private readonly ILogger<AdminClient> _logger;
private List<AddressEntry> _addresses;
private int _currentIndex;
public AdminClient(IOptions<XxlJobExecutorOptions> optionsAccessor
,IHttpClientFactory clientFactory
,ILogger<AdminClient> logger)
, IHttpClientFactory clientFactory
, ILogger<AdminClient> logger)
{
Preconditions.CheckNotNull(optionsAccessor?.Value, "XxlJobExecutorOptions");
this._options = optionsAccessor?.Value;
this._clientFactory = clientFactory;
this._logger = logger;
@ -43,59 +41,57 @@ namespace DotXxlJob.Core
this._addresses = new List<AddressEntry>();
foreach (var item in this._options.AdminAddresses.Split(';'))
{
try
{
var uri = new Uri(item + MAPPING);
var entry = new AddressEntry { RequestUri = uri };
this._addresses.Add(entry);
}
catch (Exception ex)
{
this._logger.LogError(ex, "init admin address error.");
}
try
{
var uri = new Uri(item + MAPPING);
var entry = new AddressEntry { RequestUri = uri };
this._addresses.Add(entry);
}
catch (Exception ex)
{
this._logger.LogError(ex, "init admin address error.");
}
}
}
public Task<ReturnT> Callback(List<HandleCallbackParam> callbackParamList)
{
return InvokeRpcService("callback", new List<object> {new JavaClass {Name = Constants.JavaListFulName}}, callbackParamList);
return InvokeRpcService("callback", new List<object> { new JavaClass { Name = Constants.JavaListFulName } }, callbackParamList);
}
public Task<ReturnT> Registry(RegistryParam registryParam)
{
return InvokeRpcService("registry", new List<object> {new JavaClass {Name = "com.xxl.job.core.biz.model.RegistryParam"}}, registryParam,true);
public Task<ReturnT> Registry(RegistryParam registryParam)
{
return InvokeRpcService("registry", new List<object> { new JavaClass { Name = "com.xxl.job.core.biz.model.RegistryParam" } }, registryParam, true);
}
public Task<ReturnT> RegistryRemove(RegistryParam registryParam)
public Task<ReturnT> RegistryRemove(RegistryParam registryParam)
{
return InvokeRpcService("registryRemove", new List<object> {new JavaClass {Name = "com.xxl.job.core.biz.model.RegistryParam"}}, registryParam,true);
return InvokeRpcService("registryRemove", new List<object> { new JavaClass { Name = "com.xxl.job.core.biz.model.RegistryParam" } }, registryParam, true);
}
private async Task<ReturnT> InvokeRpcService(string methodName, List<object> parameterTypes,
object parameters,bool polling=false)
object parameters, bool polling = false)
{
var request = new RpcRequest {
RequestId = Guid.NewGuid().ToString("N"),
CreateMillisTime = DateTime.Now.GetTotalMilliseconds(),
AccessToken = this._options.AccessToken,
AccessToken = _options.AccessToken,
ClassName = "com.xxl.job.core.biz.AdminBiz",
MethodName = methodName,
ParameterTypes = parameterTypes,
Parameters = new List<object> {parameters}
Parameters = new List<object> { parameters }
};
byte[] postBuf;
using (var stream = new MemoryStream())
{
HessianSerializer.SerializeRequest(stream,request);
HessianSerializer.SerializeRequest(stream, request);
postBuf = stream.ToArray();
postBuf = stream.ToArray();
}
var triedTimes = 0;
var retList = new List<ReturnT>();
using (var client = this._clientFactory.CreateClient(Constants.DefaultHttpClientName))
{
while (triedTimes++ < this._addresses.Count)
@ -104,7 +100,7 @@ namespace DotXxlJob.Core
this._currentIndex = (this._currentIndex + 1) % this._addresses.Count;
if (!address.CheckAccessible())
continue;
Stream resStream;
try
{
@ -113,41 +109,38 @@ namespace DotXxlJob.Core
}
catch (Exception ex)
{
this._logger.LogError(ex, "request admin error.{0}",ex.Message);
this._logger.LogError(ex, "request admin error.{0}", ex.Message);
address.SetFail();
continue;
}
RpcResponse res = null;
try
{
/*
{
/*
using (StreamReader reader = new StreamReader(resStream))
{
string content = await reader.ReadToEndAsync();
this._logger.LogWarning(content);
}
*/
res = HessianSerializer.DeserializeResponse(resStream);
res = HessianSerializer.DeserializeResponse(resStream);
}
catch (Exception ex)
{
this._logger.LogError(ex,"DeserializeResponse error:{errorMessage}",ex.Message);
this._logger.LogError(ex, "DeserializeResponse error:{errorMessage}", ex.Message);
}
if (res == null)
{
retList.Add(ReturnT.Failed("response is null"));
}
else if (res.IsError)
else if (res.IsError)
{
retList.Add(ReturnT.Failed(res.ErrorMsg));
}
else if(res.Result is ReturnT ret)
else if (res.Result is ReturnT ret)
{
retList.Add(ret);
}
@ -160,7 +153,6 @@ namespace DotXxlJob.Core
{
return retList[0];
}
}
if (retList.Count > 0)
@ -169,19 +161,16 @@ namespace DotXxlJob.Core
}
}
throw new Exception("xxl-rpc server address not accessible.");
}
private async Task<Stream> DoPost(HttpClient client,AddressEntry address, byte[] postBuf)
private async Task<Stream> DoPost(HttpClient client, AddressEntry address, byte[] postBuf)
{
var content = new ByteArrayContent(postBuf);
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
var responseMessage = await client.PostAsync(address.RequestUri, content);
responseMessage.EnsureSuccessStatusCode();
return await responseMessage.Content.ReadAsStreamAsync();
}
}
}

@ -2,11 +2,12 @@ using System;
namespace DotXxlJob.Core
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class JobHandlerAttribute:Attribute
{
public JobHandlerAttribute(string name)
{
this.Name = name;
Name = name;
}
public string Name { get; }

@ -64,7 +64,7 @@ namespace DotXxlJob.Core
{
//没有数据可读了
}
catch (Exception)
catch
{
//TODO: do something?
}

@ -47,7 +47,16 @@ namespace DotXxlJob.Core
public void Log(string pattern, params object[] format)
{
var appendLog = string.Format(pattern, format);
string appendLog;
if (format == null || format.Length == 0)
{
appendLog = pattern;
}
else
{
appendLog = string.Format(pattern, format);
}
var callInfo = new StackTrace(true).GetFrame(1);
LogDetail(GetLogFileName(), callInfo, appendLog);
}

Loading…
Cancel
Save