// EventDispatcher.cs
using NCC.Extensions;
using NCC.FriendlyException;
using NCC.IPCChannel;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
namespace NCC.EventBridge
{
/// <summary>
/// Event dispatcher: a <c>ChannelHandler</c> for <see cref="EventMessage"/> that resolves the
/// handler for an incoming message and invokes every handler method matching its event id.
/// </summary>
internal sealed class EventDispatcher : ChannelHandler<EventMessage>
{
    /// <summary>
    /// Retry count handed to <c>Retry.Invoke</c> for each handler method.
    /// </summary>
    private const int RetryCount = 3;

    /// <summary>
    /// Interval between retries handed to <c>Retry.Invoke</c> (1000 = 1s).
    /// </summary>
    private const int RetryIntervalMilliseconds = 1000;

    /// <summary>
    /// Core dispatch routine: looks up the message metadata, resolves the event handler and
    /// invokes all of its methods that match <paramref name="eventMessage"/>.
    /// </summary>
    /// <param name="eventMessage">The message to dispatch; its Category and EventId drive the lookup.</param>
    /// <returns>A task that completes once every matching handler method has been processed.</returns>
    public override async Task InvokeAsync(EventMessage eventMessage)
    {
        // Resolve the scope factory from the root services and create a scope for this dispatch.
        var serviceScopeFactory = App.GetService<IServiceScopeFactory>(App.RootServices);
        using var scoped = serviceScopeFactory.CreateScope();

        // Resolve the store provider. The check must be explicit: "await provider?.Method()"
        // awaits a null Task when the provider is missing and throws NullReferenceException.
        var eventStoreProvider = scoped.ServiceProvider.GetService<IEventStoreProvider>();
        if (eventStoreProvider == null) return;

        // Load the event message metadata; nothing to do when none is found.
        var eventMessageMetadata = await eventStoreProvider.GetEventMessageAsync(eventMessage.Category, eventMessage.EventId);
        if (eventMessageMetadata == null) return;

        // Resolve the handler factory delegate and create the event handler.
        // A missing delegate is treated the same way as a missing handler.
        var eventHandlerResolve = scoped.ServiceProvider.GetService<Func<EventMessageMetadata, IEventHandler>>();
        var eventHandler = eventHandlerResolve?.Invoke(eventMessageMetadata);
        if (eventHandler == null) return;

        // Find every matching instance method declared on the handler type:
        //  - its name (after ClearStringAffixes(1, "Async")) equals the event id, or it carries
        //    an [EventMessage] attribute with that event id;
        //  - it returns void or Task;
        //  - its first parameter type passes HasImplementedRawGeneric(typeof(EventMessage<>)).
        // The query is materialised once so the reflection work is not repeated per enumeration.
        var methods = eventHandler.GetType().GetTypeInfo().DeclaredMethods
            .Where(u => !u.IsStatic)
            .Where(u => u.Name.ClearStringAffixes(1, "Async") == eventMessage.EventId
                || (u.IsDefined(typeof(EventMessageAttribute), false) && u.GetCustomAttributes<EventMessageAttribute>(false).Any(e => e.EventId == eventMessage.EventId)))
            .Where(u => u.ReturnType == typeof(void) || u.ReturnType == typeof(Task))
            .Where(u =>
            {
                var candidateParameters = u.GetParameters();
                return candidateParameters.Length > 0
                    && candidateParameters[0].ParameterType.HasImplementedRawGeneric(typeof(EventMessage<>));
            })
            .ToList();
        if (methods.Count == 0) return;

        // Invoke the matching methods.
        await InvokeAsync(methods, eventMessage, eventMessageMetadata
            , scoped, eventStoreProvider, eventHandler);
    }

    /// <summary>
    /// Invokes each matching handler method with retry, then reports success or failure
    /// to the event store provider.
    /// </summary>
    /// <param name="methods">Handler methods to invoke; each must have the event message as its first parameter.</param>
    /// <param name="eventPayload">The message being dispatched.</param>
    /// <param name="eventMessageMetadata">Metadata passed to the success/failure callbacks.</param>
    /// <param name="scoped">Service scope used to resolve [FromServices] parameters.</param>
    /// <param name="eventStoreProvider">Provider notified of the outcome of each invocation.</param>
    /// <param name="eventHandler">The handler instance the methods are invoked on.</param>
    /// <returns>A task that completes once all methods have been processed.</returns>
    private static async Task InvokeAsync(IEnumerable<MethodInfo> methods
        , EventMessage eventPayload
        , EventMessageMetadata eventMessageMetadata
        , IServiceScope scoped
        , IEventStoreProvider eventStoreProvider
        , IEventHandler eventHandler)
    {
        foreach (var method in methods)
        {
            // First argument: the (possibly generic) event message.
            var parameters = new List<object> { ConvertGenericPayload(eventPayload, method) };

            // Remaining arguments: resolve those marked [FromServices] from the scope,
            // pass default (null) for everything else.
            foreach (var parameterInfo in method.GetParameters().Skip(1))
            {
                if (!parameterInfo.IsDefined(typeof(FromServicesAttribute), false)) parameters.Add(default);
                else parameters.Add(scoped.ServiceProvider.GetService(parameterInfo.ParameterType));
            }

            try
            {
                // Retry 3 times by default (1s between attempts).
                await Retry.Invoke(async () =>
                {
                    object result;
                    try
                    {
                        result = method.Invoke(eventHandler, parameters.ToArray());
                    }
                    catch (TargetInvocationException invocationException) when (invocationException.InnerException != null)
                    {
                        // Reflection wraps synchronous handler exceptions; rethrow the real one
                        // (stack trace preserved) so sync and async handlers fail the same way.
                        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(invocationException.InnerException).Throw();
                        throw; // Unreachable: Throw() never returns.
                    }

                    // Await whenever a Task actually came back (void methods return null).
                    if (result is Task task) await task;
                }, RetryCount, RetryIntervalMilliseconds, finalThrow: true);

                // Report the successful invocation.
                await eventStoreProvider.ExecuteSuccessfullyAsync(eventMessageMetadata);
            }
            catch (Exception exception)
            {
                // Report the failed invocation.
                await eventStoreProvider.ExecuteFaildedAsync(eventMessageMetadata, exception);
            }
        }
    }

    /// <summary>
    /// Builds the first argument for a handler method from the dispatched message.
    /// </summary>
    /// <param name="eventMessage">The message being dispatched.</param>
    /// <param name="method">The handler method; must declare at least one parameter.</param>
    /// <returns>
    /// <paramref name="eventMessage"/> itself when the method's first parameter type is not generic;
    /// otherwise a new instance of that parameter type constructed from the message's Category,
    /// EventId and its Payload converted to the parameter's first generic argument.
    /// </returns>
    private static object ConvertGenericPayload(EventMessage eventMessage, MethodInfo method)
    {
        var messageType = method.GetParameters()[0].ParameterType;
        if (!messageType.IsGenericType) return eventMessage;

        var payloadType = messageType.GetGenericArguments().First();
        return Activator.CreateInstance(messageType
            , new object[] { eventMessage.Category, eventMessage.EventId, eventMessage.Payload.ChangeType(payloadType) });
    }
}
}