Blame view

netcore/src/Infrastructure/NCC/EventBridge/Internal/EventDispatcher.cs 5.49 KB
de2bd2f9   “wangming”   项目初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
  using NCC.Extensions;
  using NCC.FriendlyException;
  using NCC.IPCChannel;
  using Microsoft.AspNetCore.Mvc;
  using Microsoft.Extensions.DependencyInjection;
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Reflection;
  using System.Threading.Tasks;
  
  namespace NCC.EventBridge
  {
      /// <summary>
      /// 事件分发调度器
      /// </summary>
      internal sealed class EventDispatcher : ChannelHandler<EventMessage>
      {
          /// <summary>
          /// 调度核心代码
          /// </summary>
          /// <param name="eventMessage"></param>
          /// <returns></returns>
          public async override Task InvokeAsync(EventMessage eventMessage)
          {
              // 解析服务工厂及创建作用域
              var serviceScopeFactory = App.GetService<IServiceScopeFactory>(App.RootServices);
              using var scoped = serviceScopeFactory.CreateScope();
  
              // 解析事件消息元数据
              var eventStoreProvider = scoped.ServiceProvider.GetService<IEventStoreProvider>();
              var eventMessageMetadata = await eventStoreProvider?.GetEventMessageAsync(eventMessage.Category, eventMessage.EventId);
              if (eventMessageMetadata == null) return;
  
              // 获取解析事件处理程序服务委托并创建事件处理程序
              var eventHandlerResolve = scoped.ServiceProvider.GetService<Func<EventMessageMetadata, IEventHandler>>();
              var eventHandler = eventHandlerResolve(eventMessageMetadata);
              if (eventHandler == null) return;
  
              // 查找所有符合的处理方法,贴了 [EventId]  方法名相等的
              var methods = eventHandler.GetType().GetTypeInfo().DeclaredMethods
                                     .Where(u => !u.IsStatic)
                                     .Where(u => u.Name.ClearStringAffixes(1, "Async") == eventMessage.EventId
                                         || (u.IsDefined(typeof(EventMessageAttribute), false) && u.GetCustomAttributes<EventMessageAttribute>(false).Any(e => e.EventId == eventMessage.EventId)))
                                     .Where(u => u.ReturnType == typeof(void) || u.ReturnType == typeof(Task))
                                     .Where(u => u.GetParameters().Length > 0 && u.GetParameters()[0].ParameterType.HasImplementedRawGeneric(typeof(EventMessage<>)));
  
              if (!methods.Any()) return;
  
              // 调用方法
              await InvokeAsync(methods, eventMessage, eventMessageMetadata
                  , scoped, eventStoreProvider, eventHandler);
          }
  
          /// <summary>
          /// 调用符合规则的方法
          /// </summary>
          /// <param name="methods"></param>
          /// <param name="eventPayload"></param>
          /// <param name="eventMessageMetadata"></param>
          /// <param name="scoped"></param>
          /// <param name="eventStoreProvider"></param>
          /// <param name="eventHandler"></param>
          /// <returns></returns>
          private static async Task InvokeAsync(IEnumerable<MethodInfo> methods
              , EventMessage eventPayload
              , EventMessageMetadata eventMessageMetadata
              , IServiceScope scoped
              , IEventStoreProvider eventStoreProvider
              , IEventHandler eventHandler)
          {
              foreach (var method in methods)
              {
                  // 处理泛型事件消息承载数据
                  var parameters = new List<object> { ConvertGenericPayload(eventPayload, method) };
  
                  // 解析贴了 [FromServices] 特性的服务
                  var otherParameters = method.GetParameters().Skip(1);
                  foreach (var parameterInfo in otherParameters)
                  {
                      if (!parameterInfo.IsDefined(typeof(FromServicesAttribute), false)) parameters.Add(default);
                      else parameters.Add(scoped.ServiceProvider.GetService(parameterInfo.ParameterType));
                  }
  
                  try
                  {
                      // 默认重试 3 次(每次间隔 1s
                      await Retry.Invoke(async () =>
                      {
                          var result = method.Invoke(eventHandler, parameters.ToArray());
                          if (method.IsAsync()) await (Task)result;
                      }, 3, 1000, finalThrow: true);
  
                      // 触发调用成功处理方法
                      await eventStoreProvider.ExecuteSuccessfullyAsync(eventMessageMetadata);
                  }
                  catch (Exception exception)
                  {
                      // 触发调用失败处理方法
                      await eventStoreProvider.ExecuteFaildedAsync(eventMessageMetadata, exception);
                  }
              }
          }
  
          /// <summary>
          /// 处理泛型消息承载数据
          /// </summary>
          /// <param name="eventMessage"></param>
          /// <param name="method"></param>
          /// <returns></returns>
          private static object ConvertGenericPayload(EventMessage eventMessage, MethodInfo method)
          {
              object payload;
              if (method.GetParameters()[0].ParameterType.IsGenericType)
              {
                  var payloadType = method.GetParameters()[0].ParameterType.GetGenericArguments().First();
                  payload = Activator.CreateInstance(method.GetParameters()[0].ParameterType
                      , new object[] { eventMessage.Category, eventMessage.EventId, eventMessage.Payload.ChangeType(payloadType) });
              }
              else payload = eventMessage;
              return payload;
          }
      }
  }