diff --git a/.DS_Store b/.DS_Store index c4f9490..de1aaf0 100644 --- a/.DS_Store +++ b/.DS_Store diff --git a/netcore/.DS_Store b/netcore/.DS_Store index 334dc2f..121bf43 100644 --- a/netcore/.DS_Store +++ b/netcore/.DS_Store diff --git a/netcore/src/.DS_Store b/netcore/src/.DS_Store index 863dc7b..24c4e10 100644 --- a/netcore/src/.DS_Store +++ b/netcore/src/.DS_Store diff --git a/netcore/src/Application/.DS_Store b/netcore/src/Application/.DS_Store index 2cbe18c..109f47a 100644 --- a/netcore/src/Application/.DS_Store +++ b/netcore/src/Application/.DS_Store diff --git a/netcore/src/Application/NCC.API/.DS_Store b/netcore/src/Application/NCC.API/.DS_Store index 21b3a62..783dc51 100644 --- a/netcore/src/Application/NCC.API/.DS_Store +++ b/netcore/src/Application/NCC.API/.DS_Store diff --git a/netcore/src/Modularity/.DS_Store b/netcore/src/Modularity/.DS_Store index 988ef62..1a33f84 100644 --- a/netcore/src/Modularity/.DS_Store +++ b/netcore/src/Modularity/.DS_Store diff --git a/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService copy.cs b/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService copy.cs new file mode 100644 index 0000000..32732b3 --- /dev/null +++ b/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService copy.cs @@ -0,0 +1,593 @@ +// using System; +// using System.Collections.Generic; +// using System.Text; +// using System.Text.Json; +// using System.Threading; +// using System.Threading.Tasks; +// using Aliyun.Acs.Core.Logging; +// using Microsoft.Extensions.DependencyInjection; +// using MQTTnet; +// using MQTTnet.Client; +// using MQTTnet.Client.Options; +// using NCC.Dependency; +// using NCC.Extend.Entitys; +// using NCC.Extend.Interfaces.MqttPublisher; +// using NCC.Extend.Interfaces.UavOrder; +// using Serilog; +// using SqlSugar; +// using Yitter.IdGenerator; +// using System.Net.Http; +// using System.Net.Http.Headers; +// using System.Net; +// using System.Linq; + +// namespace NCC.Extend; + +// /// +// /// MQTT 发布与订阅统一服务 +// /// +// public class MqttPublisherService : IMqttPublisherService, ITransient +// { +// /// +// /// MQTT 客户端对象(用于连接、发布、订阅等操作) +// /// +// private readonly IMqttClient _mqttClient; + +// /// +// /// MQTT 连接参数配置 +// /// +// private readonly IMqttClientOptions _mqttOptions; +// private readonly IServiceProvider _serviceProvider; +// private readonly ISqlSugarClient _db; +// private readonly Dictionary _onlineStatusCache = new Dictionary(); +// private readonly object _cacheLock = new object(); +// /// +// /// 构造函数:初始化客户端和配置、注册事件 +// /// +// public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db) +// { +// _serviceProvider = serviceProvider; +// _db = db; +// // 创建 MQTT 客户端实例 +// var factory = new MqttFactory(); +// _mqttClient = factory.CreateMqttClient(); + +// // 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID) +// _mqttOptions = new MqttClientOptionsBuilder() +// .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址 +// .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码 +// .WithClientId("server_publisher") // 客户端 ID,必须唯一 +// .Build(); + +// // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response) +// _mqttClient.UseConnectedHandler(async e => +// { +// Log.Information("MQTT 已连接成功"); +// // 订阅所有设备的响应主题(+ 代表通配符) +// await _mqttClient.SubscribeAsync("device/+/response"); +// Log.Information("已订阅通用响应 topic: device/+/response"); +// }); + +// // 连接断开事件 +// _mqttClient.UseDisconnectedHandler(async e => +// { +// Log.Warning("MQTT 已断开连接"); +// int retryInterval = 5; // 秒 +// while (!_mqttClient.IsConnected) +// { +// try +// { +// Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT..."); +// await Task.Delay(TimeSpan.FromSeconds(retryInterval)); +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); +// Log.Information("MQTT 重连成功"); +// break; // 重连成功后跳出循环 +// } +// catch (Exception ex) +// { +// Log.Error($"MQTT 重连失败: {ex.Message}"); +// // 重连失败后继续循环,但增加延迟避免频繁重试 +// await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2)); +// } +// } +// }); +// // 收到消息事件:处理设备回传的消息 +// _mqttClient.UseApplicationMessageReceivedHandler(e => +// { +// try +// { +// // 获取 topic 和 payload 内容 +// var topic = e.ApplicationMessage.Topic; +// var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty()); +// Log.Information($"收到 MQTT 消息,Topic: {topic}, Payload: {payload}"); +// // 反序列化 JSON 内容为 MqttContent 实体 +// var content = JsonSerializer.Deserialize(payload); +// if (content != null) +// { +// // 从 topic 中提取设备 ID,例如 device/abc123/response +// var topicParts = topic.Split('/'); +// var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown"; +// Log.Information($"{topicParts[0]}: {deviceId})"); +// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity +// { +// Id = YitIdHelper.NextId().ToString(), +// MessageType = "response", +// Topic = topic, +// Payload = payload, +// DeviceId = deviceId, +// Processed = 1, +// Status = "success", +// CreatedAt = DateTime.Now, +// }; +// if (!payload.Contains("\"status\":\"received\"")) +// { +// //判断两秒内,Payload一样的话,就不处理了 +// var twoSecondsAgo = DateTime.Now.AddSeconds(-2); +// var lastLog = _db.Queryable().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any(); +// if (lastLog == false) +// { +// Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}"); +// var orderService = _serviceProvider.GetRequiredService(); +// orderService.HandleSubscribeMessage(payload); +// } +// else +// { +// Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}"); +// } +// //只保存正常消息 +// _db.Insertable(logEntity).ExecuteCommand(); +// } +// else +// { +// Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}"); +// // 保存确认消息到日志 +// _db.Insertable(logEntity).ExecuteCommand(); +// } +// } +// } +// catch (Exception ex) +// { +// Log.Error($"消息处理异常: {ex.Message}"); +// } +// }); + +// } + +// /// +// /// 向指定 Topic 发布 MQTT 消息 +// /// +// /// 主题路径,例如 device/{id}/command +// /// 发送的 JSON 字符串内容 +// /// 是否成功发送 +// public async Task PublishAsync(string topic, string payload) +// { +// try +// { +// // 如果未连接则先连接 +// if (!_mqttClient.IsConnected) +// { +// Log.Warning($"MQTT 未连接,尝试重新连接..."); +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); + +// // 等待连接稳定 +// int retryCount = 0; +// while (!_mqttClient.IsConnected && retryCount < 3) +// { +// await Task.Delay(1000); +// retryCount++; +// } + +// if (!_mqttClient.IsConnected) +// { +// Log.Error("MQTT 连接失败,无法发送消息"); +// return false; +// } +// } + +// // 构建 MQTT 消息 +// var message = new MqttApplicationMessageBuilder() +// .WithTopic(topic) +// .WithPayload(payload) +// .WithAtLeastOnceQoS() // QoS 1:至少送达一次 +// .WithRetainFlag(false) // 不保留历史消息 +// .Build(); + +// // 异步发送 +// await _mqttClient.PublishAsync(message); +// Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}"); +// var topicParts = topic.Split('/'); +// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity +// { +// Id = YitIdHelper.NextId().ToString(), +// MessageType = "send", +// Topic = topic, +// Payload = payload, +// DeviceId = topicParts[1], +// Processed = 1, +// Status = "success", +// CreatedAt = DateTime.Now, +// }; +// _db.Insertable(logEntity).ExecuteCommand(); +// return true; +// } +// catch (Exception ex) +// { +// Log.Error($"MQTT 发送失败:{ex.Message}"); + +// // 记录失败的消息到数据库 +// try +// { +// var topicParts = topic.Split('/'); +// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity +// { +// Id = YitIdHelper.NextId().ToString(), +// MessageType = "send", +// Topic = topic, +// Payload = payload, +// DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown", +// Processed = 0, +// Status = "failed", +// CreatedAt = DateTime.Now, +// }; +// _db.Insertable(logEntity).ExecuteCommand(); +// } +// catch (Exception logEx) +// { +// Log.Error($"记录失败消息到数据库时出错:{logEx.Message}"); +// } + +// return false; +// } +// } + +// /// +// /// 手动启动 MQTT 客户端(推荐在程序启动时调用) +// /// +// public async Task StartAsync() +// { +// if (!_mqttClient.IsConnected) +// { +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); +// } +// } + +// /// +// /// 查询指定clientId是否在线(通过EMQX管理API,带缓存) +// /// +// /// 要查询的客户端ID +// /// 在线返回true,不在线返回false +// public async Task IsClientOnlineAsync(string clientId) +// { +// if (string.IsNullOrEmpty(clientId)) +// { +// Log.Warning("设备ID为空,无法查询在线状态"); +// return false; +// } + +// // 检查缓存 - 增加缓存时间到60秒,减少API调用 +// lock (_cacheLock) +// { +// if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry)) +// { +// var (isOnline, lastCheck) = cacheEntry; +// var cacheAge = DateTime.Now - lastCheck; + +// // 如果缓存时间小于60秒,直接返回缓存结果 +// if (cacheAge.TotalSeconds < 60) +// { +// Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}"); +// return isOnline; +// } +// } +// } + +// // 使用静态HttpClient以提高性能 +// using (var client = new HttpClient()) +// { +// client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3秒 + +// // 使用你的EMQX管理账号密码 +// var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI"); +// client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray)); + +// // 减少重试次数,提高响应速度 +// int maxRetries = 1; +// for (int retry = 0; retry <= maxRetries; retry++) +// { +// try +// { +// // 查询指定客户端 +// var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}"; +// var response = await client.GetAsync(clientUrl); +// bool isOnline = false; + +// if (response.StatusCode == HttpStatusCode.OK) +// { +// Log.Debug($"设备 {clientId} 在线状态查询成功"); +// isOnline = true; +// } +// else if (response.StatusCode == HttpStatusCode.NotFound) +// { +// Log.Debug($"设备 {clientId} 不在线"); +// isOnline = false; +// } +// else +// { +// Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}"); +// if (retry == maxRetries) +// { +// Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}"); +// isOnline = false; +// } +// else +// { +// await Task.Delay(500); // 减少等待时间到500ms +// continue; +// } +// } + +// // 更新缓存 +// lock (_cacheLock) +// { +// _onlineStatusCache[clientId] = (isOnline, DateTime.Now); +// } +// return isOnline; +// } +// catch (TaskCanceledException ex) +// { +// Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}"); +// if (retry == maxRetries) +// { +// Log.Error($"查询设备 {clientId} 在线状态最终超时"); +// // 更新缓存为false +// lock (_cacheLock) +// { +// _onlineStatusCache[clientId] = (false, DateTime.Now); +// } +// return false; +// } +// await Task.Delay(500); // 减少等待时间到500ms +// } +// catch (Exception ex) +// { +// Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}"); +// if (retry == maxRetries) +// { +// // 更新缓存为false +// lock (_cacheLock) +// { +// _onlineStatusCache[clientId] = (false, DateTime.Now); +// } +// return false; +// } +// await Task.Delay(500); // 减少等待时间到500ms +// } +// } + +// // 更新缓存为false +// lock (_cacheLock) +// { +// _onlineStatusCache[clientId] = (false, DateTime.Now); +// } +// return false; +// } +// } + +// /// +// /// 批量查询设备在线状态(提高效率) +// /// +// /// 要查询的客户端ID列表 +// /// 在线状态字典 +// public async Task> BatchCheckOnlineStatusAsync(List clientIds) +// { +// if (clientIds == null || clientIds.Count == 0) +// { +// return new Dictionary(); +// } + +// var result = new Dictionary(); +// var needQueryIds = new List(); + +// // 先检查缓存 +// lock (_cacheLock) +// { +// foreach (var clientId in clientIds) +// { +// if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry)) +// { +// var (isOnline, lastCheck) = cacheEntry; +// var cacheAge = DateTime.Now - lastCheck; + +// // 如果缓存时间小于60秒,使用缓存结果 +// if (cacheAge.TotalSeconds < 60) +// { +// result[clientId] = isOnline; +// continue; +// } +// } +// needQueryIds.Add(clientId); +// } +// } + +// // 如果没有需要查询的设备,直接返回缓存结果 +// if (needQueryIds.Count == 0) +// { +// return result; +// } + +// // 批量查询在线状态 +// using (var client = new HttpClient()) +// { +// client.Timeout = TimeSpan.FromSeconds(5); + +// var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI"); +// client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray)); + +// // 并行查询多个设备 +// var tasks = needQueryIds.Select(async clientId => +// { +// try +// { +// var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}"; +// var response = await client.GetAsync(clientUrl); +// bool isOnline = response.StatusCode == HttpStatusCode.OK; + +// // 更新缓存 +// lock (_cacheLock) +// { +// _onlineStatusCache[clientId] = (isOnline, DateTime.Now); +// } + +// return new { ClientId = clientId, IsOnline = isOnline }; +// } +// catch (Exception ex) +// { +// Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}"); +// // 更新缓存为false +// lock (_cacheLock) +// { +// _onlineStatusCache[clientId] = (false, DateTime.Now); +// } +// return new { ClientId = clientId, IsOnline = false }; +// } +// }); + +// var queryResults = await Task.WhenAll(tasks); + +// // 合并结果 +// foreach (var queryResult in queryResults) +// { +// result[queryResult.ClientId] = queryResult.IsOnline; +// } +// } + +// return result; +// } + +// /// +// /// 清理过期的缓存数据(建议定期调用) +// /// +// public void CleanupExpiredCache() +// { +// lock (_cacheLock) +// { +// var now = DateTime.Now; +// var expiredKeys = new List(); + +// foreach (var kvp in _onlineStatusCache) +// { +// var (isOnline, lastCheck) = kvp.Value; +// var cacheAge = now - lastCheck; + +// // 如果缓存时间超过5分钟,标记为过期 +// if (cacheAge.TotalMinutes > 5) +// { +// expiredKeys.Add(kvp.Key); +// } +// } + +// // 删除过期的缓存 +// foreach (var key in expiredKeys) +// { +// _onlineStatusCache.Remove(key); +// } + +// if (expiredKeys.Count > 0) +// { +// Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存"); +// } +// } +// } + +// #region 私有回调逻辑 + +// #endregion + +// /// +// /// 检查MQTT连接健康状态 +// /// +// /// 连接状态信息 +// public async Task GetConnectionHealthAsync() +// { +// try +// { +// if (_mqttClient.IsConnected) +// { +// try +// { +// await _mqttClient.PingAsync(CancellationToken.None); +// Log.Debug("MQTT Ping 成功"); +// return new +// { +// IsConnected = true, +// ConnectionTime = DateTime.Now, +// LastPing = DateTime.Now, +// ClientId = _mqttOptions.ClientId, +// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" +// }; +// } +// catch (Exception pingEx) +// { +// Log.Warning($"MQTT Ping 失败: {pingEx.Message}"); +// return new +// { +// IsConnected = false, +// ConnectionTime = (DateTime?)null, +// LastPing = DateTime.Now, +// ClientId = _mqttOptions.ClientId, +// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883", +// PingError = pingEx.Message +// }; +// } +// } +// else +// { +// return new +// { +// IsConnected = false, +// ConnectionTime = (DateTime?)null, +// LastPing = DateTime.Now, +// ClientId = _mqttOptions.ClientId, +// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" +// }; +// } +// } +// catch (Exception ex) +// { +// Log.Error($"获取连接健康状态失败: {ex.Message}"); +// return new +// { +// IsConnected = false, +// Error = ex.Message, +// Timestamp = DateTime.Now +// }; +// } +// } + +// /// +// /// 强制重新连接MQTT +// /// +// /// 重连是否成功 +// public async Task ForceReconnectAsync() +// { +// try +// { +// if (_mqttClient.IsConnected) +// { +// await _mqttClient.DisconnectAsync(); +// Log.Information("MQTT 主动断开连接"); +// } + +// await Task.Delay(2000); // 等待2秒后重连 + +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); +// Log.Information("MQTT 强制重连成功"); +// return true; +// } +// catch (Exception ex) +// { +// Log.Error($"MQTT 强制重连失败: {ex.Message}"); +// return false; +// } +// } +// } \ No newline at end of file diff --git a/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs b/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs index c8e77fb..f71fd2d 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs +++ b/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs @@ -26,7 +26,7 @@ namespace NCC.Extend; /// /// MQTT 发布与订阅统一服务 /// -public class MqttPublisherService : IMqttPublisherService, ITransient +public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable { /// /// MQTT 客户端对象(用于连接、发布、订阅等操作) @@ -41,6 +41,8 @@ public class MqttPublisherService : IMqttPublisherService, ITransient private readonly ISqlSugarClient _db; private readonly Dictionary _onlineStatusCache = new Dictionary(); private readonly object _cacheLock = new object(); + private readonly Timer _healthCheckTimer; // 健康检查定时器 + private readonly int _healthCheckInterval = 5 * 60 * 1000; // 5分钟检查一次 /// /// 构造函数:初始化客户端和配置、注册事件 /// @@ -81,6 +83,18 @@ public class MqttPublisherService : IMqttPublisherService, ITransient await Task.Delay(TimeSpan.FromSeconds(retryInterval)); await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); Log.Information("MQTT 重连成功"); + + // 重连成功后重新订阅主题 + try + { + await _mqttClient.SubscribeAsync("device/+/response"); + Log.Information("重连后重新订阅主题成功: device/+/response"); + } + catch (Exception subEx) + { + Log.Error($"重连后重新订阅主题失败: {subEx.Message}"); + } + break; // 重连成功后跳出循环 } catch (Exception ex) @@ -151,6 +165,25 @@ public class MqttPublisherService : IMqttPublisherService, ITransient } }); + // 初始化健康检查定时器 + _healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval); + Log.Information("MQTT健康检查定时器已启动,每5分钟检查一次"); + } + + /// + /// 健康检查定时器回调方法 + /// + private async void HealthCheckCallback(object state) + { + try + { + Log.Debug("执行定时MQTT健康检查..."); + await PerformHealthCheckAndRepairAsync(); + } + catch (Exception ex) + { + Log.Error($"定时健康检查执行失败: {ex.Message}"); + } } /// @@ -249,6 +282,30 @@ public class MqttPublisherService : IMqttPublisherService, ITransient { await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); } + + // 确保订阅状态正常 + await EnsureSubscriptionAsync(); + } + + /// + /// 确保订阅状态正常,如果订阅丢失则重新订阅 + /// + private async Task EnsureSubscriptionAsync() + { + try + { + if (_mqttClient.IsConnected) + { + // 检查订阅状态,如果订阅丢失则重新订阅 + Log.Information("检查MQTT订阅状态..."); + await _mqttClient.SubscribeAsync("device/+/response"); + Log.Information("MQTT订阅状态正常: device/+/response"); + } + } + catch (Exception ex) + { + Log.Warning($"检查订阅状态时出错: {ex.Message}"); + } } /// @@ -582,6 +639,9 @@ public class MqttPublisherService : IMqttPublisherService, ITransient await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); Log.Information("MQTT 强制重连成功"); + + // 重连后重新订阅 + await EnsureSubscriptionAsync(); return true; } catch (Exception ex) @@ -590,4 +650,90 @@ public class MqttPublisherService : IMqttPublisherService, ITransient return false; } } + + /// + /// 定期健康检查和自动修复(建议每5分钟调用一次) + /// + /// 修复是否成功 + public async Task PerformHealthCheckAndRepairAsync() + { + try + { + Log.Information("开始MQTT健康检查..."); + + // 检查连接状态 + if (!_mqttClient.IsConnected) + { + Log.Warning("MQTT连接已断开,尝试重新连接..."); + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); + await EnsureSubscriptionAsync(); + return true; + } + + // 检查连接质量(发送Ping) + try + { + await _mqttClient.PingAsync(CancellationToken.None); + Log.Debug("MQTT Ping 成功,连接质量良好"); + } + catch (Exception pingEx) + { + Log.Warning($"MQTT Ping 失败,连接可能有问题: {pingEx.Message}"); + + // Ping失败,尝试重新连接 + Log.Information("尝试重新建立连接..."); + await _mqttClient.DisconnectAsync(); + await Task.Delay(1000); + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); + await EnsureSubscriptionAsync(); + return true; + } + + // 检查订阅状态 + await EnsureSubscriptionAsync(); + + Log.Information("MQTT健康检查完成,状态正常"); + return true; + } + catch (Exception ex) + { + Log.Error($"MQTT健康检查失败: {ex.Message}"); + return false; + } + } + + /// + /// 获取当前订阅的主题列表 + /// + /// 订阅的主题列表 + public async Task> GetSubscribedTopicsAsync() + { + try + { + // 注意:MQTTnet可能不直接提供获取订阅列表的方法 + // 这里返回我们已知的订阅主题 + return new List { "device/+/response" }; + } + catch (Exception ex) + { + Log.Error($"获取订阅主题列表失败: {ex.Message}"); + return new List(); + } + } + + /// + /// 释放资源 + /// + public void Dispose() + { + try + { + _healthCheckTimer?.Dispose(); + Log.Information("MQTT健康检查定时器已释放"); + } + catch (Exception ex) + { + Log.Error($"释放MQTT资源时出错: {ex.Message}"); + } + } } \ No newline at end of file diff --git a/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml b/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml index 6961022..6fefd09 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml +++ b/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml @@ -137,6 +137,11 @@ 构造函数:初始化客户端和配置、注册事件 + + + 健康检查定时器回调方法 + + 向指定 Topic 发布 MQTT 消息 @@ -150,6 +155,11 @@ 手动启动 MQTT 客户端(推荐在程序启动时调用) + + + 确保订阅状态正常,如果订阅丢失则重新订阅 + + 查询指定clientId是否在线(通过EMQX管理API,带缓存) @@ -181,6 +191,23 @@ 重连是否成功 + + + 定期健康检查和自动修复(建议每5分钟调用一次) + + 修复是否成功 + + + + 获取当前订阅的主题列表 + + 订阅的主题列表 + + + + 释放资源 + + 广告管理服务 diff --git a/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs b/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs index c73e464..8a64a28 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs +++ b/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs @@ -274,6 +274,7 @@ namespace NCC.Extend.UavOrder var RerutnEntity = await _db.UseTranAsync(async () => { var halfHourAgo = DateTime.Now.AddMinutes(-30); + // var halfHourAgo = DateTime.Now.AddMinutes(-0); try { //通过设备去找到设备货道,使用With(SqlSugar.SqlWith.RowLock)添加行锁