using NCC.Dependency;
using NCC.FriendlyException;
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace NCC.IPCChannel
{
///
/// 进程管道内通信上下文
///
///
///
/// 后续将通过 MemoryMapperFile 共享内存实现 IPC 通信:https://docs.microsoft.com/zh-cn/dotnet/api/system.io.memorymappedfiles.memorymappedfile?view=net-5.0
[SuppressSniffer]
public sealed class ChannelContext
where THandler : ChannelHandler
{
///
/// 通过懒加载创建无限容量通道
///
private static readonly Lazy> _unBoundedChannel = new(() =>
{
var channel = Channel.CreateUnbounded(new UnboundedChannelOptions
{
SingleReader = false, // 允许多个管道读写,提供管道吞吐量(无序操作)
SingleWriter = false
});
StartReader(channel);
return channel;
});
///
/// 通过懒加载创建有限容量通道
///
/// 默认容量为 1000
private static readonly Lazy> _boundedChannel = new(() =>
{
var channel = Channel.CreateBounded(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false, // 允许多个管道读写,提供管道吞吐量(无序操作)
SingleWriter = false
});
StartReader(channel);
return channel;
});
///
/// 无限容量通道
///
public static Channel UnBoundedChannel => _unBoundedChannel.Value;
///
/// 有限容量通道
///
public static Channel BoundedChannel => _boundedChannel.Value;
///
/// 私有构造函数
///
private ChannelContext()
{
}
///
/// 创建一个读取器
///
///
private static void StartReader(Channel channel)
{
var reader = channel.Reader;
// 创建长时间线程管道读取器
_ = Task.Factory.StartNew(async () =>
{
while (await reader.WaitToReadAsync())
{
if (!reader.TryRead(out var message)) continue;
// 并行执行(非等待)
var task = new Task(async () =>
{
// 默认重试 3 次(每次间隔 1s)
await Retry.Invoke(async () => await Activator.CreateInstance().InvokeAsync(message), 3, 1000, finalThrow: false);
});
task.Start();
}
}, TaskCreationOptions.LongRunning);
}
}
}