using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Aliyun.Acs.Core.Logging;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using NCC.Dependency;
using NCC.Extend.Entitys;
using NCC.Extend.Interfaces.MqttPublisher;
using NCC.Extend.Interfaces.UavOrder;
using Serilog;
using SqlSugar;
using Yitter.IdGenerator;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net;
using System.Linq;
namespace NCC.Extend;
///
/// MQTT 消息信息
///
public class MqttMessageInfo
{
///
/// 消息主题
///
public string Topic { get; set; }
///
/// 消息内容
///
public string Payload { get; set; }
///
/// 接收时间
///
public DateTime ReceivedTime { get; set; }
///
/// 设备ID
///
public string DeviceId { get; set; }
}
///
/// MQTT 发布与订阅统一服务
///
public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable
{
///
/// MQTT 客户端对象(用于连接、发布、订阅等操作)
///
private readonly IMqttClient _mqttClient;
///
/// MQTT 连接参数配置
///
private readonly IMqttClientOptions _mqttOptions;
private readonly IServiceProvider _serviceProvider;
private readonly ISqlSugarClient _db;
private readonly Dictionary _onlineStatusCache = new Dictionary();
private readonly object _cacheLock = new object();
private readonly Timer _healthCheckTimer; // 健康检查定时器
private readonly Timer _subscriptionMonitorTimer; // 订阅监控定时器
private readonly int _healthCheckInterval = 3 * 60 * 1000; // 3分钟检查一次
private readonly int _subscriptionMonitorInterval = 30 * 1000; // 30秒监控一次订阅状态
// 消息队列处理
private readonly ConcurrentQueue _messageQueue = new ConcurrentQueue();
private readonly SemaphoreSlim _messageProcessingSemaphore = new SemaphoreSlim(5, 5); // 最多5个并发处理
private readonly Timer _messageProcessingTimer; // 消息处理定时器
private readonly int _messageProcessingInterval = 100; // 100ms处理一次队列
private readonly int _maxQueueSize = 10000; // 最大队列大小,防止内存溢出
private volatile int _currentQueueSize = 0; // 当前队列大小
// 订阅状态监控
private DateTime _lastMessageReceivedTime = DateTime.Now;
private bool _subscriptionVerified = false;
private DateTime _lastRetryTime = DateTime.MinValue; // 上次重试时间
private readonly object _subscriptionLock = new object();
private const string SUBSCRIPTION_TOPIC = "device/+/response";
private const int RETRY_INTERVAL_MINUTES = 5; // 重试间隔5分钟
///
/// 获取客户端ID,区分生产环境和开发环境
///
/// 客户端ID
private string GetClientId()
{
// 手动设置:true为生产环境,false为开发环境
bool isProduction = false; // 这里可以手动调整
if (isProduction)
{
// 生产环境使用固定ID
return "server_publisher";
}
else
{
// 开发环境使用带机器名的ID,避免冲突
var machineName = Environment.MachineName;
return $"dev_publisher_{machineName}";
}
}
///
/// 构造函数:初始化客户端和配置、注册事件
///
public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db)
{
_serviceProvider = serviceProvider;
_db = db;
// 创建 MQTT 客户端实例
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
// 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID)
var clientId = GetClientId();
_mqttOptions = new MqttClientOptionsBuilder()
.WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址
.WithCredentials("wrjservice", "P@ssw0rd") // 账号密码
.WithClientId(clientId) // 客户端 ID,必须唯一
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) // 保持连接心跳
.WithCleanSession(false) // 保持会话状态
.Build();
// 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response)
_mqttClient.UseConnectedHandler(async e =>
{
Log.Information($"MQTT 已连接成功,客户端ID: {_mqttOptions.ClientId}");
_subscriptionVerified = false; // 重置订阅验证状态
// 订阅所有设备的响应主题(+ 代表通配符)
await EnsureSubscriptionAsync();
});
// 连接断开事件
_mqttClient.UseDisconnectedHandler(e =>
{
Log.Warning($"MQTT 已断开连接,原因: {e.Reason}");
_subscriptionVerified = false; // 重置订阅验证状态
// 使用Task.Run避免阻塞MQTT客户端线程
_ = Task.Run(async () =>
{
int retryInterval = 5; // 秒
int maxRetries = 10; // 最大重试次数
int retryCount = 0;
while (!_mqttClient.IsConnected && retryCount < maxRetries)
{
try
{
retryCount++;
Log.Information($"尝试第 {retryCount} 次重新连接 MQTT,{retryInterval} 秒后开始...");
await Task.Delay(TimeSpan.FromSeconds(retryInterval));
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
Log.Information("MQTT 重连成功");
// 重连成功后重新订阅主题
await EnsureSubscriptionAsync();
break; // 重连成功后跳出循环
}
catch (Exception ex)
{
Log.Error($"MQTT 第 {retryCount} 次重连失败: {ex.Message}");
if (retryCount >= maxRetries)
{
Log.Error($"MQTT 重连失败次数达到上限 ({maxRetries}),停止重连");
break;
}
// 重连失败后增加延迟,避免频繁重试
retryInterval = Math.Min(retryInterval * 2, 60); // 最大延迟60秒
await Task.Delay(TimeSpan.FromSeconds(retryInterval));
}
}
});
});
// 收到消息事件:快速入队,避免阻塞
_mqttClient.UseApplicationMessageReceivedHandler(e =>
{
try
{
// 更新最后收到消息的时间和验证状态
lock (_subscriptionLock)
{
_lastMessageReceivedTime = DateTime.Now;
_subscriptionVerified = true; // 只有收到实际消息才标记为已验证
}
// 获取 topic 和 payload 内容
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty());
// 从 topic 中提取设备 ID
var topicParts = topic.Split('/');
var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown";
// 检查队列大小,防止内存溢出
if (_currentQueueSize >= _maxQueueSize)
{
Log.Warning($"消息队列已满({_maxQueueSize}),丢弃消息: Topic: {topic}, DeviceId: {deviceId}");
// 记录丢弃的消息(异步执行,不阻塞)
_ = Task.Run(async () =>
{
await LogFailedMessageAsync(new MqttMessageInfo
{
Topic = topic,
Payload = payload,
ReceivedTime = DateTime.Now,
DeviceId = deviceId
}, "队列已满,消息被丢弃");
});
return;
}
// 快速入队,避免阻塞MQTT接收
var messageInfo = new MqttMessageInfo
{
Topic = topic,
Payload = payload,
ReceivedTime = DateTime.Now,
DeviceId = deviceId
};
_messageQueue.Enqueue(messageInfo);
Interlocked.Increment(ref _currentQueueSize);
Log.Debug($"MQTT消息已入队,Topic: {topic}, DeviceId: {deviceId}, 队列长度: {_currentQueueSize}");
}
catch (Exception ex)
{
Log.Error($"消息入队异常: {ex.Message}");
}
});
// 初始化健康检查定时器
_healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval);
Log.Information("MQTT健康检查定时器已启动,每3分钟检查一次");
// 初始化订阅监控定时器
_subscriptionMonitorTimer = new Timer(SubscriptionMonitorCallback, null, _subscriptionMonitorInterval, _subscriptionMonitorInterval);
Log.Information("MQTT订阅监控定时器已启动,每30秒监控一次");
// 初始化消息处理定时器
_messageProcessingTimer = new Timer(MessageProcessingCallback, null, _messageProcessingInterval, _messageProcessingInterval);
Log.Information("MQTT消息处理定时器已启动,每100ms处理一次队列");
}
///
/// 健康检查定时器回调方法
///
private async void HealthCheckCallback(object state)
{
try
{
Log.Debug("执行定时MQTT健康检查...");
await PerformHealthCheckAndRepairAsync();
}
catch (Exception ex)
{
Log.Error($"定时健康检查执行失败: {ex.Message}");
}
}
///
/// 订阅监控定时器回调方法
///
private async void SubscriptionMonitorCallback(object state)
{
try
{
Log.Debug("执行定时MQTT订阅监控...");
await VerifySubscriptionAsync();
}
catch (Exception ex)
{
Log.Error($"订阅监控执行失败: {ex.Message}");
}
}
///
/// 消息处理定时器回调方法
///
private void MessageProcessingCallback(object state)
{
try
{
// 处理队列中的消息,最多同时处理5个
var tasks = new List();
for (int i = 0; i < 5 && _currentQueueSize > 0; i++)
{
if (_messageQueue.TryDequeue(out var messageInfo))
{
Interlocked.Decrement(ref _currentQueueSize);
tasks.Add(ProcessMessageAsync(messageInfo));
}
}
if (tasks.Count > 0)
{
// 使用Task.Run避免阻塞定时器线程
Task.Run(async () =>
{
try
{
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
Log.Error($"消息处理任务执行失败: {ex.Message}");
}
});
}
}
catch (Exception ex)
{
Log.Error($"消息处理定时器执行失败: {ex.Message}");
}
}
///
/// 异步处理单个消息
///
/// 消息信息
///
private async Task ProcessMessageAsync(MqttMessageInfo messageInfo)
{
await _messageProcessingSemaphore.WaitAsync();
try
{
Log.Information($"开始处理MQTT消息,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
// 验证消息格式
if (string.IsNullOrWhiteSpace(messageInfo.Payload))
{
Log.Warning($"消息内容为空,跳过处理: Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
return;
}
// 反序列化 JSON 内容为 MqttContent 实体
MqttContent content = null;
try
{
content = JsonSerializer.Deserialize(messageInfo.Payload);
}
catch (JsonException jsonEx)
{
Log.Error($"JSON反序列化失败: {jsonEx.Message}, Payload: {messageInfo.Payload}");
// 即使JSON解析失败,也要记录到日志
await LogFailedMessageAsync(messageInfo, "JSON解析失败");
return;
}
if (content != null)
{
UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
{
Id = YitIdHelper.NextId().ToString(),
MessageType = "response",
Topic = messageInfo.Topic,
Payload = messageInfo.Payload,
DeviceId = messageInfo.DeviceId,
Processed = 1,
Status = "success",
CreatedAt = messageInfo.ReceivedTime,
};
if (!messageInfo.Payload.Contains("\"status\":\"received\""))
{
// 判断两秒内,Payload一样的话,就不处理了
var twoSecondsAgo = DateTime.Now.AddSeconds(-2);
bool lastLog = false;
try
{
lastLog = await _db.Queryable()
.Where(p => p.DeviceId == messageInfo.DeviceId &&
p.Payload == messageInfo.Payload &&
p.CreatedAt >= twoSecondsAgo)
.AnyAsync();
}
catch (Exception dbEx)
{
Log.Error($"数据库查询失败: {dbEx.Message}, DeviceId: {messageInfo.DeviceId}");
// 数据库查询失败时,为了安全起见,不处理消息
await LogFailedMessageAsync(messageInfo, "数据库查询失败");
return;
}
if (!lastLog)
{
Log.Information($"[订阅消息] 处理新消息:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
try
{
var orderService = _serviceProvider.GetRequiredService();
await Task.Run(() => orderService.HandleSubscribeMessage(messageInfo.Payload));
}
catch (Exception businessEx)
{
Log.Error($"业务逻辑处理失败: {businessEx.Message}, DeviceId: {messageInfo.DeviceId}");
logEntity.Status = "business_error";
logEntity.Processed = 0;
}
}
else
{
Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
logEntity.Status = "duplicate";
}
// 保存消息到日志
try
{
await _db.Insertable(logEntity).ExecuteCommandAsync();
}
catch (Exception logEx)
{
Log.Error($"保存消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}");
}
}
else
{
Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
// 保存确认消息到日志
try
{
await _db.Insertable(logEntity).ExecuteCommandAsync();
}
catch (Exception logEx)
{
Log.Error($"保存确认消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}");
}
}
}
else
{
Log.Warning($"JSON反序列化结果为空: Payload: {messageInfo.Payload}");
await LogFailedMessageAsync(messageInfo, "JSON反序列化结果为空");
}
Log.Debug($"MQTT消息处理完成,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
}
catch (Exception ex)
{
Log.Error($"处理MQTT消息异常: {ex.Message}, Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
await LogFailedMessageAsync(messageInfo, $"处理异常: {ex.Message}");
}
finally
{
_messageProcessingSemaphore.Release();
}
}
///
/// 记录失败的消息
///
/// 消息信息
/// 失败原因
///
private async Task LogFailedMessageAsync(MqttMessageInfo messageInfo, string errorReason)
{
try
{
var logEntity = new UavMqttMessageLogEntity
{
Id = YitIdHelper.NextId().ToString(),
MessageType = "response",
Topic = messageInfo.Topic,
Payload = messageInfo.Payload,
DeviceId = messageInfo.DeviceId,
Processed = 0,
Status = "failed",
CreatedAt = messageInfo.ReceivedTime,
};
await _db.Insertable(logEntity).ExecuteCommandAsync();
Log.Information($"失败消息已记录: DeviceId: {messageInfo.DeviceId}, 原因: {errorReason}");
}
catch (Exception ex)
{
Log.Error($"记录失败消息时出错: {ex.Message}, DeviceId: {messageInfo.DeviceId}");
}
}
///
/// 向指定 Topic 发布 MQTT 消息
///
/// 主题路径,例如 device/{id}/command
/// 发送的 JSON 字符串内容
/// 是否成功发送
public async Task PublishAsync(string topic, string payload)
{
try
{
// 如果未连接则先连接
if (!_mqttClient.IsConnected)
{
Log.Warning($"MQTT 未连接,尝试重新连接...");
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
// 等待连接稳定
int retryCount = 0;
while (!_mqttClient.IsConnected && retryCount < 3)
{
await Task.Delay(1000);
retryCount++;
}
if (!_mqttClient.IsConnected)
{
Log.Error("MQTT 连接失败,无法发送消息");
return false;
}
}
// 构建 MQTT 消息
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithAtLeastOnceQoS() // QoS 1:至少送达一次
.WithRetainFlag(false) // 不保留历史消息
.Build();
// 异步发送
await _mqttClient.PublishAsync(message);
Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}");
var topicParts = topic.Split('/');
UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
{
Id = YitIdHelper.NextId().ToString(),
MessageType = "send",
Topic = topic,
Payload = payload,
DeviceId = topicParts[1],
Processed = 1,
Status = "success",
CreatedAt = DateTime.Now,
};
_db.Insertable(logEntity).ExecuteCommand();
return true;
}
catch (Exception ex)
{
Log.Error($"MQTT 发送失败:{ex.Message}");
// 记录失败的消息到数据库
try
{
var topicParts = topic.Split('/');
UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
{
Id = YitIdHelper.NextId().ToString(),
MessageType = "send",
Topic = topic,
Payload = payload,
DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown",
Processed = 0,
Status = "failed",
CreatedAt = DateTime.Now,
};
_db.Insertable(logEntity).ExecuteCommand();
}
catch (Exception logEx)
{
Log.Error($"记录失败消息到数据库时出错:{logEx.Message}");
}
return false;
}
}
///
/// 手动启动 MQTT 客户端(推荐在程序启动时调用)
///
public async Task StartAsync()
{
try
{
Log.Information("启动MQTT客户端...");
if (!_mqttClient.IsConnected)
{
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
Log.Information("MQTT客户端连接成功");
}
// 确保订阅状态正常
await EnsureSubscriptionAsync();
// 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
Log.Information("MQTT客户端启动完成,等待实际消息验证订阅状态");
}
catch (Exception ex)
{
Log.Error($"启动MQTT客户端失败: {ex.Message}");
throw;
}
}
///
/// 确保订阅状态正常,如果订阅丢失则重新订阅
///
private async Task EnsureSubscriptionAsync()
{
try
{
if (_mqttClient.IsConnected)
{
// 检查订阅状态,如果订阅丢失则重新订阅
Log.Information("检查MQTT订阅状态...");
await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC);
Log.Information("MQTT订阅状态正常: device/+/response");
// 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
}
}
catch (Exception ex)
{
Log.Warning($"检查订阅状态时出错: {ex.Message}");
// 订阅失败,尝试重新订阅
await RetrySubscriptionAsync();
}
}
///
/// 验证订阅是否真的生效
///
private async Task VerifySubscriptionAsync()
{
try
{
lock (_subscriptionLock)
{
// 检查是否长时间没有收到消息
var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
if (timeSinceLastMessage.TotalMinutes > 2 && _subscriptionVerified)
{
Log.Warning($"订阅可能失效,已 {timeSinceLastMessage.TotalMinutes:F1} 分钟未收到消息");
_subscriptionVerified = false;
}
}
// 如果订阅未验证,且距离上次重试超过5分钟,才尝试重新订阅
if (!_subscriptionVerified)
{
var timeSinceLastRetry = DateTime.Now - _lastRetryTime;
if (timeSinceLastRetry.TotalMinutes >= RETRY_INTERVAL_MINUTES)
{
Log.Warning("订阅验证失败,尝试重新订阅...");
await RetrySubscriptionAsync();
_lastRetryTime = DateTime.Now;
}
else
{
Log.Debug($"订阅未验证,但距离上次重试仅 {timeSinceLastRetry.TotalMinutes:F1} 分钟,跳过重试");
}
}
}
catch (Exception ex)
{
Log.Error($"验证订阅状态失败: {ex.Message}");
}
}
///
/// 重试订阅
///
private async Task RetrySubscriptionAsync()
{
try
{
if (_mqttClient.IsConnected)
{
Log.Information("尝试重新订阅...");
// 先取消现有订阅
try
{
await _mqttClient.UnsubscribeAsync(SUBSCRIPTION_TOPIC);
Log.Information("已取消现有订阅");
}
catch (Exception ex)
{
Log.Warning($"取消订阅失败: {ex.Message}");
}
// 重新订阅
await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC);
Log.Information("重新订阅成功");
// 重置验证状态,等待实际消息接收来验证订阅是否真正生效
_subscriptionVerified = false;
_lastMessageReceivedTime = DateTime.Now;
// 注意:不立即标记为已验证,等待实际消息接收来验证
}
}
catch (Exception ex)
{
Log.Error($"重试订阅失败: {ex.Message}");
}
}
///
/// 查询指定clientId是否在线(通过EMQX管理API,带缓存)
///
/// 要查询的客户端ID
/// 在线返回true,不在线返回false
public async Task IsClientOnlineAsync(string clientId)
{
if (string.IsNullOrEmpty(clientId))
{
Log.Warning("设备ID为空,无法查询在线状态");
return false;
}
// 检查缓存 - 增加缓存时间到60秒,减少API调用
lock (_cacheLock)
{
if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
{
var (isOnline, lastCheck) = cacheEntry;
var cacheAge = DateTime.Now - lastCheck;
// 如果缓存时间小于60秒,直接返回缓存结果
if (cacheAge.TotalSeconds < 60)
{
Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}");
return isOnline;
}
}
}
// 使用静态HttpClient以提高性能
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3秒
// 使用你的EMQX管理账号密码
var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
// 减少重试次数,提高响应速度
int maxRetries = 1;
for (int retry = 0; retry <= maxRetries; retry++)
{
try
{
// 查询指定客户端
var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
var response = await client.GetAsync(clientUrl);
bool isOnline = false;
if (response.StatusCode == HttpStatusCode.OK)
{
Log.Debug($"设备 {clientId} 在线状态查询成功");
isOnline = true;
}
else if (response.StatusCode == HttpStatusCode.NotFound)
{
Log.Debug($"设备 {clientId} 不在线");
isOnline = false;
}
else
{
Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}");
if (retry == maxRetries)
{
Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}");
isOnline = false;
}
else
{
await Task.Delay(500); // 减少等待时间到500ms
continue;
}
}
// 更新缓存
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (isOnline, DateTime.Now);
}
return isOnline;
}
catch (TaskCanceledException)
{
Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}");
if (retry == maxRetries)
{
Log.Error($"查询设备 {clientId} 在线状态最终超时");
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return false;
}
await Task.Delay(500); // 减少等待时间到500ms
}
catch (Exception ex)
{
Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}");
if (retry == maxRetries)
{
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return false;
}
await Task.Delay(500); // 减少等待时间到500ms
}
}
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return false;
}
}
///
/// 批量查询设备在线状态(提高效率)
///
/// 要查询的客户端ID列表
/// 在线状态字典
public async Task> BatchCheckOnlineStatusAsync(List clientIds)
{
if (clientIds == null || clientIds.Count == 0)
{
return new Dictionary();
}
var result = new Dictionary();
var needQueryIds = new List();
// 先检查缓存
lock (_cacheLock)
{
foreach (var clientId in clientIds)
{
if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
{
var (isOnline, lastCheck) = cacheEntry;
var cacheAge = DateTime.Now - lastCheck;
// 如果缓存时间小于60秒,使用缓存结果
if (cacheAge.TotalSeconds < 60)
{
result[clientId] = isOnline;
continue;
}
}
needQueryIds.Add(clientId);
}
}
// 如果没有需要查询的设备,直接返回缓存结果
if (needQueryIds.Count == 0)
{
return result;
}
// 批量查询在线状态
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromSeconds(5);
var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
// 并行查询多个设备
var tasks = needQueryIds.Select(async clientId =>
{
try
{
var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
var response = await client.GetAsync(clientUrl);
bool isOnline = response.StatusCode == HttpStatusCode.OK;
// 更新缓存
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (isOnline, DateTime.Now);
}
return new { ClientId = clientId, IsOnline = isOnline };
}
catch (Exception ex)
{
Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}");
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return new { ClientId = clientId, IsOnline = false };
}
});
var queryResults = await Task.WhenAll(tasks);
// 合并结果
foreach (var queryResult in queryResults)
{
result[queryResult.ClientId] = queryResult.IsOnline;
}
}
return result;
}
///
/// 清理过期的缓存数据(建议定期调用)
///
public void CleanupExpiredCache()
{
lock (_cacheLock)
{
var now = DateTime.Now;
var expiredKeys = new List();
foreach (var kvp in _onlineStatusCache)
{
var (isOnline, lastCheck) = kvp.Value;
var cacheAge = now - lastCheck;
// 如果缓存时间超过5分钟,标记为过期
if (cacheAge.TotalMinutes > 5)
{
expiredKeys.Add(kvp.Key);
}
}
// 删除过期的缓存
foreach (var key in expiredKeys)
{
_onlineStatusCache.Remove(key);
}
if (expiredKeys.Count > 0)
{
Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存");
}
}
}
#region 私有回调逻辑
#endregion
///
/// 检查MQTT连接健康状态
///
/// 连接状态信息
public async Task GetConnectionHealthAsync()
{
try
{
if (_mqttClient.IsConnected)
{
try
{
await _mqttClient.PingAsync(CancellationToken.None);
Log.Debug("MQTT Ping 成功");
return new
{
IsConnected = true,
ConnectionTime = DateTime.Now,
LastPing = DateTime.Now,
ClientId = _mqttOptions.ClientId,
BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
};
}
catch (Exception pingEx)
{
Log.Warning($"MQTT Ping 失败: {pingEx.Message}");
return new
{
IsConnected = false,
ConnectionTime = (DateTime?)null,
LastPing = DateTime.Now,
ClientId = _mqttOptions.ClientId,
BrokerAddress = "mqtt.cqjiangzhichao.cn:1883",
PingError = pingEx.Message
};
}
}
else
{
return new
{
IsConnected = false,
ConnectionTime = (DateTime?)null,
LastPing = DateTime.Now,
ClientId = _mqttOptions.ClientId,
BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
};
}
}
catch (Exception ex)
{
Log.Error($"获取连接健康状态失败: {ex.Message}");
return new
{
IsConnected = false,
Error = ex.Message,
Timestamp = DateTime.Now
};
}
}
///
/// 强制重新连接MQTT
///
/// 重连是否成功
public async Task ForceReconnectAsync()
{
try
{
Log.Information("开始强制重连MQTT...");
if (_mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
Log.Information("MQTT 主动断开连接");
}
// 重置订阅状态
lock (_subscriptionLock)
{
_subscriptionVerified = false;
_lastMessageReceivedTime = DateTime.Now;
}
await Task.Delay(2000); // 等待2秒后重连
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
Log.Information("MQTT 强制重连成功");
// 重连后重新订阅并验证
await EnsureSubscriptionAsync();
// 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
Log.Information("MQTT 强制重连完成,等待实际消息验证订阅状态");
return true;
}
catch (Exception ex)
{
Log.Error($"MQTT 强制重连失败: {ex.Message}");
return false;
}
}
///
/// 定期健康检查和自动修复(建议每3分钟调用一次)
///
/// 修复是否成功
public async Task PerformHealthCheckAndRepairAsync()
{
try
{
Log.Information("开始MQTT健康检查...");
// 检查连接状态
if (!_mqttClient.IsConnected)
{
Log.Warning("MQTT连接已断开,尝试重新连接...");
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
await EnsureSubscriptionAsync();
return true;
}
// 检查连接质量(发送Ping)
try
{
await _mqttClient.PingAsync(CancellationToken.None);
Log.Debug("MQTT Ping 成功,连接质量良好");
}
catch (Exception pingEx)
{
Log.Warning($"MQTT Ping 失败,连接可能有问题: {pingEx.Message}");
// Ping失败,尝试重新连接
Log.Information("尝试重新建立连接...");
await _mqttClient.DisconnectAsync();
await Task.Delay(1000);
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
await EnsureSubscriptionAsync();
return true;
}
// 检查订阅状态和消息接收情况
lock (_subscriptionLock)
{
var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
if (timeSinceLastMessage.TotalMinutes > 10)
{
Log.Warning($"长时间未收到消息({timeSinceLastMessage.TotalMinutes:F1}分钟),订阅可能有问题");
}
}
// 验证订阅状态
await VerifySubscriptionAsync();
// 检查订阅状态
await EnsureSubscriptionAsync();
Log.Information("MQTT健康检查完成,状态正常");
return true;
}
catch (Exception ex)
{
Log.Error($"MQTT健康检查失败: {ex.Message}");
return false;
}
}
///
/// 获取当前订阅的主题列表
///
/// 订阅的主题列表
public Task> GetSubscribedTopicsAsync()
{
try
{
// 注意:MQTTnet可能不直接提供获取订阅列表的方法
// 这里返回我们已知的订阅主题
return Task.FromResult(new List { SUBSCRIPTION_TOPIC });
}
catch (Exception ex)
{
Log.Error($"获取订阅主题列表失败: {ex.Message}");
return Task.FromResult(new List());
}
}
///
/// 获取订阅状态信息
///
/// 订阅状态信息
public dynamic GetSubscriptionStatus()
{
try
{
lock (_subscriptionLock)
{
var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
return new
{
IsConnected = _mqttClient.IsConnected,
SubscriptionVerified = _subscriptionVerified,
LastMessageReceived = _lastMessageReceivedTime,
TimeSinceLastMessage = $"{timeSinceLastMessage.TotalMinutes:F1} 分钟",
SubscriptionTopic = SUBSCRIPTION_TOPIC,
IsHealthy = timeSinceLastMessage.TotalMinutes < 10,
QueueSize = _currentQueueSize,
MaxQueueSize = _maxQueueSize
};
}
}
catch (Exception ex)
{
Log.Error($"获取订阅状态失败: {ex.Message}");
return new { Error = ex.Message };
}
}
///
/// 检查连接稳定性
///
/// 连接稳定性信息
public async Task CheckConnectionStabilityAsync()
{
try
{
// 如果连接正常,进行Ping测试
if (_mqttClient.IsConnected)
{
try
{
var pingStartTime = DateTime.Now;
await _mqttClient.PingAsync(CancellationToken.None);
var pingDuration = DateTime.Now - pingStartTime;
Log.Information($"连接稳定性检查通过,Ping耗时: {pingDuration.TotalMilliseconds:F2}ms");
return new
{
IsConnected = true,
PingSuccess = true,
PingDuration = pingDuration.TotalMilliseconds,
ConnectionTime = DateTime.Now,
LastMessageReceived = _lastMessageReceivedTime,
SubscriptionVerified = _subscriptionVerified,
QueueSize = _currentQueueSize,
MaxQueueSize = _maxQueueSize,
HealthCheckInterval = _healthCheckInterval,
SubscriptionMonitorInterval = _subscriptionMonitorInterval,
MessageProcessingInterval = _messageProcessingInterval
};
}
catch (Exception pingEx)
{
Log.Warning($"连接稳定性检查失败,Ping失败: {pingEx.Message}");
return new
{
IsConnected = false,
PingSuccess = false,
PingError = pingEx.Message,
ConnectionTime = DateTime.Now,
LastMessageReceived = _lastMessageReceivedTime,
SubscriptionVerified = _subscriptionVerified,
QueueSize = _currentQueueSize,
MaxQueueSize = _maxQueueSize,
HealthCheckInterval = _healthCheckInterval,
SubscriptionMonitorInterval = _subscriptionMonitorInterval,
MessageProcessingInterval = _messageProcessingInterval
};
}
}
else
{
Log.Warning("连接稳定性检查失败,MQTT客户端未连接");
return new
{
IsConnected = false,
ConnectionTime = DateTime.Now,
LastMessageReceived = _lastMessageReceivedTime,
SubscriptionVerified = _subscriptionVerified,
QueueSize = _currentQueueSize,
MaxQueueSize = _maxQueueSize,
HealthCheckInterval = _healthCheckInterval,
SubscriptionMonitorInterval = _subscriptionMonitorInterval,
MessageProcessingInterval = _messageProcessingInterval
};
}
}
catch (Exception ex)
{
Log.Error($"检查连接稳定性时发生异常: {ex.Message}");
return new
{
IsConnected = false,
Error = ex.Message,
ConnectionTime = DateTime.Now
};
}
}
///
/// 测试数据格式兼容性
///
/// 测试数据
/// 测试结果
public dynamic TestDataFormatCompatibility(string testPayload)
{
try
{
Log.Information($"开始测试数据格式兼容性: {testPayload}");
// 测试JSON反序列化
MqttContent content = null;
try
{
content = JsonSerializer.Deserialize(testPayload);
Log.Information("JSON反序列化成功");
}
catch (JsonException jsonEx)
{
Log.Error($"JSON反序列化失败: {jsonEx.Message}");
return new
{
Success = false,
Error = "JSON反序列化失败",
Details = jsonEx.Message,
TestPayload = testPayload
};
}
if (content == null)
{
return new
{
Success = false,
Error = "JSON反序列化结果为空",
TestPayload = testPayload
};
}
// 检查关键字段
var fieldChecks = new Dictionary
{
["id"] = content.id,
["lane"] = content.lane,
["rfid_1"] = content.rfid_1,
["rfid_2"] = content.rfid_2,
["batteryCapacity"] = content.batteryCapacity,
["isOpenSuccess"] = content.isOpenSuccess,
["isUav"] = content.isUav,
["uavcode"] = content.uavcode
};
Log.Information($"数据格式兼容性测试成功,字段值: {string.Join(", ", fieldChecks.Select(kv => $"{kv.Key}={kv.Value}"))}");
return new
{
Success = true,
Message = "数据格式兼容性测试通过",
DeserializedContent = content,
FieldChecks = fieldChecks,
TestPayload = testPayload
};
}
catch (Exception ex)
{
Log.Error($"数据格式兼容性测试失败: {ex.Message}");
return new
{
Success = false,
Error = "测试过程中发生异常",
Details = ex.Message,
TestPayload = testPayload
};
}
}
///
/// 释放资源
///
public void Dispose()
{
try
{
_healthCheckTimer?.Dispose();
_subscriptionMonitorTimer?.Dispose();
_messageProcessingTimer?.Dispose();
_messageProcessingSemaphore?.Dispose();
Log.Information("MQTT定时器和信号量已释放");
}
catch (Exception ex)
{
Log.Error($"释放MQTT资源时出错: {ex.Message}");
}
}
}