Blame view

netcore/src/Infrastructure/NCC/IPCChannel/ChannelContext.cs 3.2 KB
de2bd2f9   “wangming”   项目初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
  using NCC.Dependency;
  using NCC.FriendlyException;
  using System;
  using System.Threading.Channels;
  using System.Threading.Tasks;
  
  namespace NCC.IPCChannel
  {
      /// <summary>
      /// 进程管道内通信上下文
      /// </summary>
      /// <typeparam name="TMessage"></typeparam>
      /// <typeparam name="THandler"></typeparam>
      /// <remarks>后续将通过 MemoryMapperFile 共享内存实现 IPC 通信:https://docs.microsoft.com/zh-cn/dotnet/api/system.io.memorymappedfiles.memorymappedfile?view=net-5.0 </remarks>
      [SuppressSniffer]
      public sealed class ChannelContext<TMessage, THandler>
          where THandler : ChannelHandler<TMessage>
      {
          /// <summary>
          /// 通过懒加载创建无限容量通道
          /// </summary>
          private static readonly Lazy<Channel<TMessage>> _unBoundedChannel = new(() =>
          {
              var channel = Channel.CreateUnbounded<TMessage>(new UnboundedChannelOptions
              {
                  SingleReader = false,   // 允许多个管道读写,提供管道吞吐量(无序操作)
                  SingleWriter = false
              });
  
              StartReader(channel);
              return channel;
          });
  
          /// <summary>
          /// 通过懒加载创建有限容量通道
          /// </summary>
          /// <remarks>默认容量为 1000</remarks>
          private static readonly Lazy<Channel<TMessage>> _boundedChannel = new(() =>
          {
              var channel = Channel.CreateBounded<TMessage>(new BoundedChannelOptions(1000)
              {
                  FullMode = BoundedChannelFullMode.Wait,
                  SingleReader = false,   // 允许多个管道读写,提供管道吞吐量(无序操作)
                  SingleWriter = false
              });
  
              StartReader(channel);
              return channel;
          });
  
          /// <summary>
          /// 无限容量通道
          /// </summary>
          public static Channel<TMessage> UnBoundedChannel => _unBoundedChannel.Value;
  
          /// <summary>
          /// 有限容量通道
          /// </summary>
          public static Channel<TMessage> BoundedChannel => _boundedChannel.Value;
  
          /// <summary>
          /// 私有构造函数
          /// </summary>
          private ChannelContext()
          {
          }
  
          /// <summary>
          /// 创建一个读取器
          /// </summary>
          /// <param name="channel"></param>
          private static void StartReader(Channel<TMessage> channel)
          {
              var reader = channel.Reader;
  
              // 创建长时间线程管道读取器
              _ = Task.Factory.StartNew(async () =>
              {
                  while (await reader.WaitToReadAsync())
                  {
                      if (!reader.TryRead(out var message)) continue;
  
                      // 并行执行(非等待)
                      var task = new Task(async () =>
                      {
                          // 默认重试 3 次(每次间隔 1s
                          await Retry.Invoke(async () => await Activator.CreateInstance<THandler>().InvokeAsync(message), 3, 1000, finalThrow: false);
                      });
                      task.Start();
                  }
              }, TaskCreationOptions.LongRunning);
          }
      }
  }