using NCC.Dependency; using NCC.FriendlyException; using System; using System.Threading.Channels; using System.Threading.Tasks; namespace NCC.IPCChannel { /// /// 进程管道内通信上下文 /// /// /// /// 后续将通过 MemoryMapperFile 共享内存实现 IPC 通信:https://docs.microsoft.com/zh-cn/dotnet/api/system.io.memorymappedfiles.memorymappedfile?view=net-5.0 [SuppressSniffer] public sealed class ChannelContext where THandler : ChannelHandler { /// /// 通过懒加载创建无限容量通道 /// private static readonly Lazy> _unBoundedChannel = new(() => { var channel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = false, // 允许多个管道读写,提供管道吞吐量(无序操作) SingleWriter = false }); StartReader(channel); return channel; }); /// /// 通过懒加载创建有限容量通道 /// /// 默认容量为 1000 private static readonly Lazy> _boundedChannel = new(() => { var channel = Channel.CreateBounded(new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait, SingleReader = false, // 允许多个管道读写,提供管道吞吐量(无序操作) SingleWriter = false }); StartReader(channel); return channel; }); /// /// 无限容量通道 /// public static Channel UnBoundedChannel => _unBoundedChannel.Value; /// /// 有限容量通道 /// public static Channel BoundedChannel => _boundedChannel.Value; /// /// 私有构造函数 /// private ChannelContext() { } /// /// 创建一个读取器 /// /// private static void StartReader(Channel channel) { var reader = channel.Reader; // 创建长时间线程管道读取器 _ = Task.Factory.StartNew(async () => { while (await reader.WaitToReadAsync()) { if (!reader.TryRead(out var message)) continue; // 并行执行(非等待) var task = new Task(async () => { // 默认重试 3 次(每次间隔 1s) await Retry.Invoke(async () => await Activator.CreateInstance().InvokeAsync(message), 3, 1000, finalThrow: false); }); task.Start(); } }, TaskCreationOptions.LongRunning); } } }