using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Aliyun.Acs.Core.Logging; using Microsoft.Extensions.DependencyInjection; using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Options; using NCC.Dependency; using NCC.Extend.Entitys; using NCC.Extend.Interfaces.MqttPublisher; using NCC.Extend.Interfaces.UavOrder; using Serilog; using SqlSugar; using Yitter.IdGenerator; using System.Net.Http; using System.Net.Http.Headers; using System.Net; using System.Linq; namespace NCC.Extend; /// /// MQTT 消息信息 /// public class MqttMessageInfo { /// /// 消息主题 /// public string Topic { get; set; } /// /// 消息内容 /// public string Payload { get; set; } /// /// 接收时间 /// public DateTime ReceivedTime { get; set; } /// /// 设备ID /// public string DeviceId { get; set; } } /// /// MQTT 发布与订阅统一服务 /// public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable { /// /// MQTT 客户端对象(用于连接、发布、订阅等操作) /// private readonly IMqttClient _mqttClient; /// /// MQTT 连接参数配置 /// private readonly IMqttClientOptions _mqttOptions; private readonly IServiceProvider _serviceProvider; private readonly ISqlSugarClient _db; private readonly Dictionary _onlineStatusCache = new Dictionary(); private readonly object _cacheLock = new object(); private readonly Timer _healthCheckTimer; // 健康检查定时器 private readonly Timer _subscriptionMonitorTimer; // 订阅监控定时器 private readonly int _healthCheckInterval = 3 * 60 * 1000; // 3分钟检查一次 private readonly int _subscriptionMonitorInterval = 30 * 1000; // 30秒监控一次订阅状态 // 消息队列处理 private readonly ConcurrentQueue _messageQueue = new ConcurrentQueue(); private readonly SemaphoreSlim _messageProcessingSemaphore = new SemaphoreSlim(5, 5); // 最多5个并发处理 private readonly Timer _messageProcessingTimer; // 消息处理定时器 private readonly int _messageProcessingInterval = 100; // 100ms处理一次队列 private readonly int _maxQueueSize = 10000; // 最大队列大小,防止内存溢出 private volatile int _currentQueueSize = 0; // 当前队列大小 // 订阅状态监控 private DateTime _lastMessageReceivedTime = DateTime.Now; private bool _subscriptionVerified = false; private DateTime _lastRetryTime = DateTime.MinValue; // 上次重试时间 private readonly object _subscriptionLock = new object(); private const string SUBSCRIPTION_TOPIC = "device/+/response"; private const int RETRY_INTERVAL_MINUTES = 5; // 重试间隔5分钟 /// /// 构造函数:初始化客户端和配置、注册事件 /// public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db) { _serviceProvider = serviceProvider; _db = db; // 创建 MQTT 客户端实例 var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); // 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID) _mqttOptions = new MqttClientOptionsBuilder() .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址 .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码 .WithClientId("server_publisher") // 客户端 ID,必须唯一 .WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) // 保持连接心跳 .WithCleanSession(false) // 保持会话状态 .Build(); // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response) _mqttClient.UseConnectedHandler(async e => { Log.Information("MQTT 已连接成功"); _subscriptionVerified = false; // 重置订阅验证状态 // 订阅所有设备的响应主题(+ 代表通配符) await EnsureSubscriptionAsync(); }); // 连接断开事件 _mqttClient.UseDisconnectedHandler(e => { Log.Warning($"MQTT 已断开连接,原因: {e.Reason}"); _subscriptionVerified = false; // 重置订阅验证状态 // 使用Task.Run避免阻塞MQTT客户端线程 _ = Task.Run(async () => { int retryInterval = 5; // 秒 int maxRetries = 10; // 最大重试次数 int retryCount = 0; while (!_mqttClient.IsConnected && retryCount < maxRetries) { try { retryCount++; Log.Information($"尝试第 {retryCount} 次重新连接 MQTT,{retryInterval} 秒后开始..."); await Task.Delay(TimeSpan.FromSeconds(retryInterval)); await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); Log.Information("MQTT 重连成功"); // 重连成功后重新订阅主题 await EnsureSubscriptionAsync(); break; // 重连成功后跳出循环 } catch (Exception ex) { Log.Error($"MQTT 第 {retryCount} 次重连失败: {ex.Message}"); if (retryCount >= maxRetries) { Log.Error($"MQTT 重连失败次数达到上限 ({maxRetries}),停止重连"); break; } // 重连失败后增加延迟,避免频繁重试 retryInterval = Math.Min(retryInterval * 2, 60); // 最大延迟60秒 await Task.Delay(TimeSpan.FromSeconds(retryInterval)); } } }); }); // 收到消息事件:快速入队,避免阻塞 _mqttClient.UseApplicationMessageReceivedHandler(e => { try { // 更新最后收到消息的时间和验证状态 lock (_subscriptionLock) { _lastMessageReceivedTime = DateTime.Now; _subscriptionVerified = true; // 只有收到实际消息才标记为已验证 } // 获取 topic 和 payload 内容 var topic = e.ApplicationMessage.Topic; var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty()); // 从 topic 中提取设备 ID var topicParts = topic.Split('/'); var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown"; // 检查队列大小,防止内存溢出 if (_currentQueueSize >= _maxQueueSize) { Log.Warning($"消息队列已满({_maxQueueSize}),丢弃消息: Topic: {topic}, DeviceId: {deviceId}"); // 记录丢弃的消息(异步执行,不阻塞) _ = Task.Run(async () => { await LogFailedMessageAsync(new MqttMessageInfo { Topic = topic, Payload = payload, ReceivedTime = DateTime.Now, DeviceId = deviceId }, "队列已满,消息被丢弃"); }); return; } // 快速入队,避免阻塞MQTT接收 var messageInfo = new MqttMessageInfo { Topic = topic, Payload = payload, ReceivedTime = DateTime.Now, DeviceId = deviceId }; _messageQueue.Enqueue(messageInfo); Interlocked.Increment(ref _currentQueueSize); Log.Debug($"MQTT消息已入队,Topic: {topic}, DeviceId: {deviceId}, 队列长度: {_currentQueueSize}"); } catch (Exception ex) { Log.Error($"消息入队异常: {ex.Message}"); } }); // 初始化健康检查定时器 _healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval); Log.Information("MQTT健康检查定时器已启动,每3分钟检查一次"); // 初始化订阅监控定时器 _subscriptionMonitorTimer = new Timer(SubscriptionMonitorCallback, null, _subscriptionMonitorInterval, _subscriptionMonitorInterval); Log.Information("MQTT订阅监控定时器已启动,每30秒监控一次"); // 初始化消息处理定时器 _messageProcessingTimer = new Timer(MessageProcessingCallback, null, _messageProcessingInterval, _messageProcessingInterval); Log.Information("MQTT消息处理定时器已启动,每100ms处理一次队列"); } /// /// 健康检查定时器回调方法 /// private async void HealthCheckCallback(object state) { try { Log.Debug("执行定时MQTT健康检查..."); await PerformHealthCheckAndRepairAsync(); } catch (Exception ex) { Log.Error($"定时健康检查执行失败: {ex.Message}"); } } /// /// 订阅监控定时器回调方法 /// private async void SubscriptionMonitorCallback(object state) { try { Log.Debug("执行定时MQTT订阅监控..."); await VerifySubscriptionAsync(); } catch (Exception ex) { Log.Error($"订阅监控执行失败: {ex.Message}"); } } /// /// 消息处理定时器回调方法 /// private void MessageProcessingCallback(object state) { try { // 处理队列中的消息,最多同时处理5个 var tasks = new List(); for (int i = 0; i < 5 && _currentQueueSize > 0; i++) { if (_messageQueue.TryDequeue(out var messageInfo)) { Interlocked.Decrement(ref _currentQueueSize); tasks.Add(ProcessMessageAsync(messageInfo)); } } if (tasks.Count > 0) { // 使用Task.Run避免阻塞定时器线程 Task.Run(async () => { try { await Task.WhenAll(tasks); } catch (Exception ex) { Log.Error($"消息处理任务执行失败: {ex.Message}"); } }); } } catch (Exception ex) { Log.Error($"消息处理定时器执行失败: {ex.Message}"); } } /// /// 异步处理单个消息 /// /// 消息信息 /// private async Task ProcessMessageAsync(MqttMessageInfo messageInfo) { await _messageProcessingSemaphore.WaitAsync(); try { Log.Information($"开始处理MQTT消息,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); // 验证消息格式 if (string.IsNullOrWhiteSpace(messageInfo.Payload)) { Log.Warning($"消息内容为空,跳过处理: Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); return; } // 反序列化 JSON 内容为 MqttContent 实体 MqttContent content = null; try { content = JsonSerializer.Deserialize(messageInfo.Payload); } catch (JsonException jsonEx) { Log.Error($"JSON反序列化失败: {jsonEx.Message}, Payload: {messageInfo.Payload}"); // 即使JSON解析失败,也要记录到日志 await LogFailedMessageAsync(messageInfo, "JSON解析失败"); return; } if (content != null) { UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity { Id = YitIdHelper.NextId().ToString(), MessageType = "response", Topic = messageInfo.Topic, Payload = messageInfo.Payload, DeviceId = messageInfo.DeviceId, Processed = 1, Status = "success", CreatedAt = messageInfo.ReceivedTime, }; if (!messageInfo.Payload.Contains("\"status\":\"received\"")) { // 判断两秒内,Payload一样的话,就不处理了 var twoSecondsAgo = DateTime.Now.AddSeconds(-2); bool lastLog = false; try { lastLog = await _db.Queryable() .Where(p => p.DeviceId == messageInfo.DeviceId && p.Payload == messageInfo.Payload && p.CreatedAt >= twoSecondsAgo) .AnyAsync(); } catch (Exception dbEx) { Log.Error($"数据库查询失败: {dbEx.Message}, DeviceId: {messageInfo.DeviceId}"); // 数据库查询失败时,为了安全起见,不处理消息 await LogFailedMessageAsync(messageInfo, "数据库查询失败"); return; } if (!lastLog) { Log.Information($"[订阅消息] 处理新消息:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}"); try { var orderService = _serviceProvider.GetRequiredService(); await Task.Run(() => orderService.HandleSubscribeMessage(messageInfo.Payload)); } catch (Exception businessEx) { Log.Error($"业务逻辑处理失败: {businessEx.Message}, DeviceId: {messageInfo.DeviceId}"); logEntity.Status = "business_error"; logEntity.Processed = 0; } } else { Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}"); logEntity.Status = "duplicate"; } // 保存消息到日志 try { await _db.Insertable(logEntity).ExecuteCommandAsync(); } catch (Exception logEx) { Log.Error($"保存消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}"); } } else { Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}"); // 保存确认消息到日志 try { await _db.Insertable(logEntity).ExecuteCommandAsync(); } catch (Exception logEx) { Log.Error($"保存确认消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}"); } } } else { Log.Warning($"JSON反序列化结果为空: Payload: {messageInfo.Payload}"); await LogFailedMessageAsync(messageInfo, "JSON反序列化结果为空"); } Log.Debug($"MQTT消息处理完成,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); } catch (Exception ex) { Log.Error($"处理MQTT消息异常: {ex.Message}, Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); await LogFailedMessageAsync(messageInfo, $"处理异常: {ex.Message}"); } finally { _messageProcessingSemaphore.Release(); } } /// /// 记录失败的消息 /// /// 消息信息 /// 失败原因 /// private async Task LogFailedMessageAsync(MqttMessageInfo messageInfo, string errorReason) { try { var logEntity = new UavMqttMessageLogEntity { Id = YitIdHelper.NextId().ToString(), MessageType = "response", Topic = messageInfo.Topic, Payload = messageInfo.Payload, DeviceId = messageInfo.DeviceId, Processed = 0, Status = "failed", CreatedAt = messageInfo.ReceivedTime, }; await _db.Insertable(logEntity).ExecuteCommandAsync(); Log.Information($"失败消息已记录: DeviceId: {messageInfo.DeviceId}, 原因: {errorReason}"); } catch (Exception ex) { Log.Error($"记录失败消息时出错: {ex.Message}, DeviceId: {messageInfo.DeviceId}"); } } /// /// 向指定 Topic 发布 MQTT 消息 /// /// 主题路径,例如 device/{id}/command /// 发送的 JSON 字符串内容 /// 是否成功发送 public async Task PublishAsync(string topic, string payload) { try { // 如果未连接则先连接 if (!_mqttClient.IsConnected) { Log.Warning($"MQTT 未连接,尝试重新连接..."); await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); // 等待连接稳定 int retryCount = 0; while (!_mqttClient.IsConnected && retryCount < 3) { await Task.Delay(1000); retryCount++; } if (!_mqttClient.IsConnected) { Log.Error("MQTT 连接失败,无法发送消息"); return false; } } // 构建 MQTT 消息 var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithAtLeastOnceQoS() // QoS 1:至少送达一次 .WithRetainFlag(false) // 不保留历史消息 .Build(); // 异步发送 await _mqttClient.PublishAsync(message); Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}"); var topicParts = topic.Split('/'); UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity { Id = YitIdHelper.NextId().ToString(), MessageType = "send", Topic = topic, Payload = payload, DeviceId = topicParts[1], Processed = 1, Status = "success", CreatedAt = DateTime.Now, }; _db.Insertable(logEntity).ExecuteCommand(); return true; } catch (Exception ex) { Log.Error($"MQTT 发送失败:{ex.Message}"); // 记录失败的消息到数据库 try { var topicParts = topic.Split('/'); UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity { Id = YitIdHelper.NextId().ToString(), MessageType = "send", Topic = topic, Payload = payload, DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown", Processed = 0, Status = "failed", CreatedAt = DateTime.Now, }; _db.Insertable(logEntity).ExecuteCommand(); } catch (Exception logEx) { Log.Error($"记录失败消息到数据库时出错:{logEx.Message}"); } return false; } } /// /// 手动启动 MQTT 客户端(推荐在程序启动时调用) /// public async Task StartAsync() { try { Log.Information("启动MQTT客户端..."); if (!_mqttClient.IsConnected) { await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); Log.Information("MQTT客户端连接成功"); } // 确保订阅状态正常 await EnsureSubscriptionAsync(); // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效 Log.Information("MQTT客户端启动完成,等待实际消息验证订阅状态"); } catch (Exception ex) { Log.Error($"启动MQTT客户端失败: {ex.Message}"); throw; } } /// /// 确保订阅状态正常,如果订阅丢失则重新订阅 /// private async Task EnsureSubscriptionAsync() { try { if (_mqttClient.IsConnected) { // 检查订阅状态,如果订阅丢失则重新订阅 Log.Information("检查MQTT订阅状态..."); await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC); Log.Information("MQTT订阅状态正常: device/+/response"); // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效 } } catch (Exception ex) { Log.Warning($"检查订阅状态时出错: {ex.Message}"); // 订阅失败,尝试重新订阅 await RetrySubscriptionAsync(); } } /// /// 验证订阅是否真的生效 /// private async Task VerifySubscriptionAsync() { try { lock (_subscriptionLock) { // 检查是否长时间没有收到消息 var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime; if (timeSinceLastMessage.TotalMinutes > 2 && _subscriptionVerified) { Log.Warning($"订阅可能失效,已 {timeSinceLastMessage.TotalMinutes:F1} 分钟未收到消息"); _subscriptionVerified = false; } } // 如果订阅未验证,且距离上次重试超过5分钟,才尝试重新订阅 if (!_subscriptionVerified) { var timeSinceLastRetry = DateTime.Now - _lastRetryTime; if (timeSinceLastRetry.TotalMinutes >= RETRY_INTERVAL_MINUTES) { Log.Warning("订阅验证失败,尝试重新订阅..."); await RetrySubscriptionAsync(); _lastRetryTime = DateTime.Now; } else { Log.Debug($"订阅未验证,但距离上次重试仅 {timeSinceLastRetry.TotalMinutes:F1} 分钟,跳过重试"); } } } catch (Exception ex) { Log.Error($"验证订阅状态失败: {ex.Message}"); } } /// /// 重试订阅 /// private async Task RetrySubscriptionAsync() { try { if (_mqttClient.IsConnected) { Log.Information("尝试重新订阅..."); // 先取消现有订阅 try { await _mqttClient.UnsubscribeAsync(SUBSCRIPTION_TOPIC); Log.Information("已取消现有订阅"); } catch (Exception ex) { Log.Warning($"取消订阅失败: {ex.Message}"); } // 重新订阅 await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC); Log.Information("重新订阅成功"); // 重置验证状态,等待实际消息接收来验证订阅是否真正生效 _subscriptionVerified = false; _lastMessageReceivedTime = DateTime.Now; // 注意:不立即标记为已验证,等待实际消息接收来验证 } } catch (Exception ex) { Log.Error($"重试订阅失败: {ex.Message}"); } } /// /// 查询指定clientId是否在线(通过EMQX管理API,带缓存) /// /// 要查询的客户端ID /// 在线返回true,不在线返回false public async Task IsClientOnlineAsync(string clientId) { if (string.IsNullOrEmpty(clientId)) { Log.Warning("设备ID为空,无法查询在线状态"); return false; } // 检查缓存 - 增加缓存时间到60秒,减少API调用 lock (_cacheLock) { if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry)) { var (isOnline, lastCheck) = cacheEntry; var cacheAge = DateTime.Now - lastCheck; // 如果缓存时间小于60秒,直接返回缓存结果 if (cacheAge.TotalSeconds < 60) { Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}"); return isOnline; } } } // 使用静态HttpClient以提高性能 using (var client = new HttpClient()) { client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3秒 // 使用你的EMQX管理账号密码 var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI"); client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray)); // 减少重试次数,提高响应速度 int maxRetries = 1; for (int retry = 0; retry <= maxRetries; retry++) { try { // 查询指定客户端 var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}"; var response = await client.GetAsync(clientUrl); bool isOnline = false; if (response.StatusCode == HttpStatusCode.OK) { Log.Debug($"设备 {clientId} 在线状态查询成功"); isOnline = true; } else if (response.StatusCode == HttpStatusCode.NotFound) { Log.Debug($"设备 {clientId} 不在线"); isOnline = false; } else { Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}"); if (retry == maxRetries) { Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}"); isOnline = false; } else { await Task.Delay(500); // 减少等待时间到500ms continue; } } // 更新缓存 lock (_cacheLock) { _onlineStatusCache[clientId] = (isOnline, DateTime.Now); } return isOnline; } catch (TaskCanceledException) { Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}"); if (retry == maxRetries) { Log.Error($"查询设备 {clientId} 在线状态最终超时"); // 更新缓存为false lock (_cacheLock) { _onlineStatusCache[clientId] = (false, DateTime.Now); } return false; } await Task.Delay(500); // 减少等待时间到500ms } catch (Exception ex) { Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}"); if (retry == maxRetries) { // 更新缓存为false lock (_cacheLock) { _onlineStatusCache[clientId] = (false, DateTime.Now); } return false; } await Task.Delay(500); // 减少等待时间到500ms } } // 更新缓存为false lock (_cacheLock) { _onlineStatusCache[clientId] = (false, DateTime.Now); } return false; } } /// /// 批量查询设备在线状态(提高效率) /// /// 要查询的客户端ID列表 /// 在线状态字典 public async Task> BatchCheckOnlineStatusAsync(List clientIds) { if (clientIds == null || clientIds.Count == 0) { return new Dictionary(); } var result = new Dictionary(); var needQueryIds = new List(); // 先检查缓存 lock (_cacheLock) { foreach (var clientId in clientIds) { if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry)) { var (isOnline, lastCheck) = cacheEntry; var cacheAge = DateTime.Now - lastCheck; // 如果缓存时间小于60秒,使用缓存结果 if (cacheAge.TotalSeconds < 60) { result[clientId] = isOnline; continue; } } needQueryIds.Add(clientId); } } // 如果没有需要查询的设备,直接返回缓存结果 if (needQueryIds.Count == 0) { return result; } // 批量查询在线状态 using (var client = new HttpClient()) { client.Timeout = TimeSpan.FromSeconds(5); var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI"); client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray)); // 并行查询多个设备 var tasks = needQueryIds.Select(async clientId => { try { var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}"; var response = await client.GetAsync(clientUrl); bool isOnline = response.StatusCode == HttpStatusCode.OK; // 更新缓存 lock (_cacheLock) { _onlineStatusCache[clientId] = (isOnline, DateTime.Now); } return new { ClientId = clientId, IsOnline = isOnline }; } catch (Exception ex) { Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}"); // 更新缓存为false lock (_cacheLock) { _onlineStatusCache[clientId] = (false, DateTime.Now); } return new { ClientId = clientId, IsOnline = false }; } }); var queryResults = await Task.WhenAll(tasks); // 合并结果 foreach (var queryResult in queryResults) { result[queryResult.ClientId] = queryResult.IsOnline; } } return result; } /// /// 清理过期的缓存数据(建议定期调用) /// public void CleanupExpiredCache() { lock (_cacheLock) { var now = DateTime.Now; var expiredKeys = new List(); foreach (var kvp in _onlineStatusCache) { var (isOnline, lastCheck) = kvp.Value; var cacheAge = now - lastCheck; // 如果缓存时间超过5分钟,标记为过期 if (cacheAge.TotalMinutes > 5) { expiredKeys.Add(kvp.Key); } } // 删除过期的缓存 foreach (var key in expiredKeys) { _onlineStatusCache.Remove(key); } if (expiredKeys.Count > 0) { Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存"); } } } #region 私有回调逻辑 #endregion /// /// 检查MQTT连接健康状态 /// /// 连接状态信息 public async Task GetConnectionHealthAsync() { try { if (_mqttClient.IsConnected) { try { await _mqttClient.PingAsync(CancellationToken.None); Log.Debug("MQTT Ping 成功"); return new { IsConnected = true, ConnectionTime = DateTime.Now, LastPing = DateTime.Now, ClientId = _mqttOptions.ClientId, BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" }; } catch (Exception pingEx) { Log.Warning($"MQTT Ping 失败: {pingEx.Message}"); return new { IsConnected = false, ConnectionTime = (DateTime?)null, LastPing = DateTime.Now, ClientId = _mqttOptions.ClientId, BrokerAddress = "mqtt.cqjiangzhichao.cn:1883", PingError = pingEx.Message }; } } else { return new { IsConnected = false, ConnectionTime = (DateTime?)null, LastPing = DateTime.Now, ClientId = _mqttOptions.ClientId, BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" }; } } catch (Exception ex) { Log.Error($"获取连接健康状态失败: {ex.Message}"); return new { IsConnected = false, Error = ex.Message, Timestamp = DateTime.Now }; } } /// /// 强制重新连接MQTT /// /// 重连是否成功 public async Task ForceReconnectAsync() { try { Log.Information("开始强制重连MQTT..."); if (_mqttClient.IsConnected) { await _mqttClient.DisconnectAsync(); Log.Information("MQTT 主动断开连接"); } // 重置订阅状态 lock (_subscriptionLock) { _subscriptionVerified = false; _lastMessageReceivedTime = DateTime.Now; } await Task.Delay(2000); // 等待2秒后重连 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); Log.Information("MQTT 强制重连成功"); // 重连后重新订阅并验证 await EnsureSubscriptionAsync(); // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效 Log.Information("MQTT 强制重连完成,等待实际消息验证订阅状态"); return true; } catch (Exception ex) { Log.Error($"MQTT 强制重连失败: {ex.Message}"); return false; } } /// /// 定期健康检查和自动修复(建议每3分钟调用一次) /// /// 修复是否成功 public async Task PerformHealthCheckAndRepairAsync() { try { Log.Information("开始MQTT健康检查..."); // 检查连接状态 if (!_mqttClient.IsConnected) { Log.Warning("MQTT连接已断开,尝试重新连接..."); await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); await EnsureSubscriptionAsync(); return true; } // 检查连接质量(发送Ping) try { await _mqttClient.PingAsync(CancellationToken.None); Log.Debug("MQTT Ping 成功,连接质量良好"); } catch (Exception pingEx) { Log.Warning($"MQTT Ping 失败,连接可能有问题: {pingEx.Message}"); // Ping失败,尝试重新连接 Log.Information("尝试重新建立连接..."); await _mqttClient.DisconnectAsync(); await Task.Delay(1000); await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); await EnsureSubscriptionAsync(); return true; } // 检查订阅状态和消息接收情况 lock (_subscriptionLock) { var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime; if (timeSinceLastMessage.TotalMinutes > 10) { Log.Warning($"长时间未收到消息({timeSinceLastMessage.TotalMinutes:F1}分钟),订阅可能有问题"); } } // 验证订阅状态 await VerifySubscriptionAsync(); // 检查订阅状态 await EnsureSubscriptionAsync(); Log.Information("MQTT健康检查完成,状态正常"); return true; } catch (Exception ex) { Log.Error($"MQTT健康检查失败: {ex.Message}"); return false; } } /// /// 获取当前订阅的主题列表 /// /// 订阅的主题列表 public Task> GetSubscribedTopicsAsync() { try { // 注意:MQTTnet可能不直接提供获取订阅列表的方法 // 这里返回我们已知的订阅主题 return Task.FromResult(new List { SUBSCRIPTION_TOPIC }); } catch (Exception ex) { Log.Error($"获取订阅主题列表失败: {ex.Message}"); return Task.FromResult(new List()); } } /// /// 获取订阅状态信息 /// /// 订阅状态信息 public dynamic GetSubscriptionStatus() { try { lock (_subscriptionLock) { var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime; return new { IsConnected = _mqttClient.IsConnected, SubscriptionVerified = _subscriptionVerified, LastMessageReceived = _lastMessageReceivedTime, TimeSinceLastMessage = $"{timeSinceLastMessage.TotalMinutes:F1} 分钟", SubscriptionTopic = SUBSCRIPTION_TOPIC, IsHealthy = timeSinceLastMessage.TotalMinutes < 10, QueueSize = _currentQueueSize, MaxQueueSize = _maxQueueSize }; } } catch (Exception ex) { Log.Error($"获取订阅状态失败: {ex.Message}"); return new { Error = ex.Message }; } } /// /// 检查连接稳定性 /// /// 连接稳定性信息 public async Task CheckConnectionStabilityAsync() { try { // 如果连接正常,进行Ping测试 if (_mqttClient.IsConnected) { try { var pingStartTime = DateTime.Now; await _mqttClient.PingAsync(CancellationToken.None); var pingDuration = DateTime.Now - pingStartTime; Log.Information($"连接稳定性检查通过,Ping耗时: {pingDuration.TotalMilliseconds:F2}ms"); return new { IsConnected = true, PingSuccess = true, PingDuration = pingDuration.TotalMilliseconds, ConnectionTime = DateTime.Now, LastMessageReceived = _lastMessageReceivedTime, SubscriptionVerified = _subscriptionVerified, QueueSize = _currentQueueSize, MaxQueueSize = _maxQueueSize, HealthCheckInterval = _healthCheckInterval, SubscriptionMonitorInterval = _subscriptionMonitorInterval, MessageProcessingInterval = _messageProcessingInterval }; } catch (Exception pingEx) { Log.Warning($"连接稳定性检查失败,Ping失败: {pingEx.Message}"); return new { IsConnected = false, PingSuccess = false, PingError = pingEx.Message, ConnectionTime = DateTime.Now, LastMessageReceived = _lastMessageReceivedTime, SubscriptionVerified = _subscriptionVerified, QueueSize = _currentQueueSize, MaxQueueSize = _maxQueueSize, HealthCheckInterval = _healthCheckInterval, SubscriptionMonitorInterval = _subscriptionMonitorInterval, MessageProcessingInterval = _messageProcessingInterval }; } } else { Log.Warning("连接稳定性检查失败,MQTT客户端未连接"); return new { IsConnected = false, ConnectionTime = DateTime.Now, LastMessageReceived = _lastMessageReceivedTime, SubscriptionVerified = _subscriptionVerified, QueueSize = _currentQueueSize, MaxQueueSize = _maxQueueSize, HealthCheckInterval = _healthCheckInterval, SubscriptionMonitorInterval = _subscriptionMonitorInterval, MessageProcessingInterval = _messageProcessingInterval }; } } catch (Exception ex) { Log.Error($"检查连接稳定性时发生异常: {ex.Message}"); return new { IsConnected = false, Error = ex.Message, ConnectionTime = DateTime.Now }; } } /// /// 测试数据格式兼容性 /// /// 测试数据 /// 测试结果 public dynamic TestDataFormatCompatibility(string testPayload) { try { Log.Information($"开始测试数据格式兼容性: {testPayload}"); // 测试JSON反序列化 MqttContent content = null; try { content = JsonSerializer.Deserialize(testPayload); Log.Information("JSON反序列化成功"); } catch (JsonException jsonEx) { Log.Error($"JSON反序列化失败: {jsonEx.Message}"); return new { Success = false, Error = "JSON反序列化失败", Details = jsonEx.Message, TestPayload = testPayload }; } if (content == null) { return new { Success = false, Error = "JSON反序列化结果为空", TestPayload = testPayload }; } // 检查关键字段 var fieldChecks = new Dictionary { ["id"] = content.id, ["lane"] = content.lane, ["rfid_1"] = content.rfid_1, ["rfid_2"] = content.rfid_2, ["batteryCapacity"] = content.batteryCapacity, ["isOpenSuccess"] = content.isOpenSuccess, ["isUav"] = content.isUav, ["uavcode"] = content.uavcode }; Log.Information($"数据格式兼容性测试成功,字段值: {string.Join(", ", fieldChecks.Select(kv => $"{kv.Key}={kv.Value}"))}"); return new { Success = true, Message = "数据格式兼容性测试通过", DeserializedContent = content, FieldChecks = fieldChecks, TestPayload = testPayload }; } catch (Exception ex) { Log.Error($"数据格式兼容性测试失败: {ex.Message}"); return new { Success = false, Error = "测试过程中发生异常", Details = ex.Message, TestPayload = testPayload }; } } /// /// 释放资源 /// public void Dispose() { try { _healthCheckTimer?.Dispose(); _subscriptionMonitorTimer?.Dispose(); _messageProcessingTimer?.Dispose(); _messageProcessingSemaphore?.Dispose(); Log.Information("MQTT定时器和信号量已释放"); } catch (Exception ex) { Log.Error($"释放MQTT资源时出错: {ex.Message}"); } } }