ChannelContext.cs
3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
using NCC.Dependency;
using NCC.FriendlyException;
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace NCC.IPCChannel
{
/// <summary>
/// In-process channel communication context: owns one lazily created unbounded channel and one
/// lazily created bounded channel per <typeparamref name="TMessage"/>/<typeparamref name="THandler"/>
/// pair, and dispatches every message written to them to a new <typeparamref name="THandler"/> instance.
/// </summary>
/// <typeparam name="TMessage">Type of the messages carried by the channels.</typeparam>
/// <typeparam name="THandler">Handler type instantiated (via its parameterless constructor) for each message.</typeparam>
/// <remarks>Planned: IPC through MemoryMappedFile shared memory: https://docs.microsoft.com/zh-cn/dotnet/api/system.io.memorymappedfiles.memorymappedfile?view=net-5.0 </remarks>
[SuppressSniffer]
public sealed class ChannelContext<TMessage, THandler>
    where THandler : ChannelHandler<TMessage>
{
    /// <summary>
    /// Capacity of the bounded channel.
    /// </summary>
    private const int BoundedCapacity = 1000;

    /// <summary>
    /// Retry count passed to <c>Retry.Invoke</c> for each message.
    /// </summary>
    private const int HandlerRetryCount = 3;

    /// <summary>
    /// Retry interval (the original comment states 1s, i.e. milliseconds) passed to <c>Retry.Invoke</c>.
    /// </summary>
    private const int HandlerRetryIntervalMilliseconds = 1000;

    /// <summary>
    /// Lazily creates the unbounded channel and starts its reader loop on first access.
    /// </summary>
    private static readonly Lazy<Channel<TMessage>> _unBoundedChannel = new(() =>
    {
        var channel = Channel.CreateUnbounded<TMessage>(new UnboundedChannelOptions
        {
            // Allow multiple concurrent readers/writers for throughput (processing order is not guaranteed).
            SingleReader = false,
            SingleWriter = false
        });

        StartReader(channel);
        return channel;
    });

    /// <summary>
    /// Lazily creates the bounded channel and starts its reader loop on first access.
    /// </summary>
    /// <remarks>The capacity is <see cref="BoundedCapacity"/> (1000); writers wait when the channel is full.</remarks>
    private static readonly Lazy<Channel<TMessage>> _boundedChannel = new(() =>
    {
        var channel = Channel.CreateBounded<TMessage>(new BoundedChannelOptions(BoundedCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            // Allow multiple concurrent readers/writers for throughput (processing order is not guaranteed).
            SingleReader = false,
            SingleWriter = false
        });

        StartReader(channel);
        return channel;
    });

    /// <summary>
    /// Unbounded channel.
    /// </summary>
    public static Channel<TMessage> UnBoundedChannel => _unBoundedChannel.Value;

    /// <summary>
    /// Bounded channel.
    /// </summary>
    public static Channel<TMessage> BoundedChannel => _boundedChannel.Value;

    /// <summary>
    /// Private constructor: the type only exposes static members.
    /// </summary>
    private ChannelContext()
    {
    }

    /// <summary>
    /// Starts a background loop that reads messages from <paramref name="channel"/> and dispatches
    /// each one to a handler without waiting for the handler to finish.
    /// </summary>
    /// <param name="channel">Channel whose reader is consumed by the loop.</param>
    private static void StartReader(Channel<TMessage> channel)
    {
        var reader = channel.Reader;

        // Task.Run understands async delegates (Func<Task>). The previous
        // Task.Factory.StartNew(..., LongRunning) spun up a dedicated thread that was
        // released at the first await, so the LongRunning hint bought nothing.
        _ = Task.Run(async () =>
        {
            // WaitToReadAsync yields false once the channel is completed, ending the loop.
            while (await reader.WaitToReadAsync())
            {
                // Drain everything currently available before waiting again.
                while (reader.TryRead(out var message))
                {
                    var current = message;

                    // Fire-and-forget so messages are handled in parallel. The delegate returns
                    // a Task (not async void), so a failure is captured in the task instead of
                    // being rethrown on a thread-pool thread.
                    _ = Task.Run(() => HandleAsync(current));
                }
            }
        });
    }

    /// <summary>
    /// Creates a new <typeparamref name="THandler"/> and invokes it for <paramref name="message"/>
    /// through <c>Retry.Invoke</c>.
    /// </summary>
    /// <param name="message">Message taken from the channel.</param>
    /// <returns>A task that completes when <c>Retry.Invoke</c> completes.</returns>
    /// <remarks>
    /// NOTE(review): finalThrow: false presumably suppresses the last failure after the retries
    /// are exhausted — confirm against Retry.Invoke.
    /// </remarks>
    private static async Task HandleAsync(TMessage message)
    {
        // Retries 3 times with a 1s interval by default.
        await Retry.Invoke(
            async () => await Activator.CreateInstance<THandler>().InvokeAsync(message),
            HandlerRetryCount,
            HandlerRetryIntervalMilliseconds,
            finalThrow: false);
    }
}
}