Commit a0ec3cdb1c9934ae4e55a0a99a6300b2e5046368

Authored by “wangming”
1 parent 62329717

优化了MQTT连接

.DS_Store
No preview for this file type
netcore/.DS_Store
No preview for this file type
netcore/src/.DS_Store
No preview for this file type
netcore/src/Application/.DS_Store
No preview for this file type
netcore/src/Application/NCC.API/.DS_Store
No preview for this file type
netcore/src/Modularity/.DS_Store
No preview for this file type
netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService copy.cs 0 → 100644
  1 +// using System;
  2 +// using System.Collections.Generic;
  3 +// using System.Text;
  4 +// using System.Text.Json;
  5 +// using System.Threading;
  6 +// using System.Threading.Tasks;
  7 +// using Aliyun.Acs.Core.Logging;
  8 +// using Microsoft.Extensions.DependencyInjection;
  9 +// using MQTTnet;
  10 +// using MQTTnet.Client;
  11 +// using MQTTnet.Client.Options;
  12 +// using NCC.Dependency;
  13 +// using NCC.Extend.Entitys;
  14 +// using NCC.Extend.Interfaces.MqttPublisher;
  15 +// using NCC.Extend.Interfaces.UavOrder;
  16 +// using Serilog;
  17 +// using SqlSugar;
  18 +// using Yitter.IdGenerator;
  19 +// using System.Net.Http;
  20 +// using System.Net.Http.Headers;
  21 +// using System.Net;
  22 +// using System.Linq;
  23 +
  24 +// namespace NCC.Extend;
  25 +
  26 +// /// <summary>
  27 +// /// MQTT 发布与订阅统一服务
  28 +// /// </summary>
  29 +// public class MqttPublisherService : IMqttPublisherService, ITransient
  30 +// {
  31 +// /// <summary>
  32 +// /// MQTT 客户端对象(用于连接、发布、订阅等操作)
  33 +// /// </summary>
  34 +// private readonly IMqttClient _mqttClient;
  35 +
  36 +// /// <summary>
  37 +// /// MQTT 连接参数配置
  38 +// /// </summary>
  39 +// private readonly IMqttClientOptions _mqttOptions;
  40 +// private readonly IServiceProvider _serviceProvider;
  41 +// private readonly ISqlSugarClient _db;
  42 +// private readonly Dictionary<string, (bool isOnline, DateTime lastCheck)> _onlineStatusCache = new Dictionary<string, (bool, DateTime)>();
  43 +// private readonly object _cacheLock = new object();
  44 +// /// <summary>
  45 +// /// 构造函数:初始化客户端和配置、注册事件
  46 +// /// </summary>
  47 +// public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db)
  48 +// {
  49 +// _serviceProvider = serviceProvider;
  50 +// _db = db;
  51 +// // 创建 MQTT 客户端实例
  52 +// var factory = new MqttFactory();
  53 +// _mqttClient = factory.CreateMqttClient();
  54 +
  55 +// // 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID)
  56 +// _mqttOptions = new MqttClientOptionsBuilder()
  57 +// .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址
  58 +// .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码
  59 +// .WithClientId("server_publisher") // 客户端 ID,必须唯一
  60 +// .Build();
  61 +
  62 +// // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response)
  63 +// _mqttClient.UseConnectedHandler(async e =>
  64 +// {
  65 +// Log.Information("MQTT 已连接成功");
  66 +// // 订阅所有设备的响应主题(+ 代表通配符)
  67 +// await _mqttClient.SubscribeAsync("device/+/response");
  68 +// Log.Information("已订阅通用响应 topic: device/+/response");
  69 +// });
  70 +
  71 +// // 连接断开事件
  72 +// _mqttClient.UseDisconnectedHandler(async e =>
  73 +// {
  74 +// Log.Warning("MQTT 已断开连接");
  75 +// int retryInterval = 5; // 秒
  76 +// while (!_mqttClient.IsConnected)
  77 +// {
  78 +// try
  79 +// {
  80 +// Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT...");
  81 +// await Task.Delay(TimeSpan.FromSeconds(retryInterval));
  82 +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  83 +// Log.Information("MQTT 重连成功");
  84 +// break; // 重连成功后跳出循环
  85 +// }
  86 +// catch (Exception ex)
  87 +// {
  88 +// Log.Error($"MQTT 重连失败: {ex.Message}");
  89 +// // 重连失败后继续循环,但增加延迟避免频繁重试
  90 +// await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2));
  91 +// }
  92 +// }
  93 +// });
  94 +// // 收到消息事件:处理设备回传的消息
  95 +// _mqttClient.UseApplicationMessageReceivedHandler(e =>
  96 +// {
  97 +// try
  98 +// {
  99 +// // 获取 topic 和 payload 内容
  100 +// var topic = e.ApplicationMessage.Topic;
  101 +// var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());
  102 +// Log.Information($"收到 MQTT 消息,Topic: {topic}, Payload: {payload}");
  103 +// // 反序列化 JSON 内容为 MqttContent 实体
  104 +// var content = JsonSerializer.Deserialize<MqttContent>(payload);
  105 +// if (content != null)
  106 +// {
  107 +// // 从 topic 中提取设备 ID,例如 device/abc123/response
  108 +// var topicParts = topic.Split('/');
  109 +// var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown";
  110 +// Log.Information($"{topicParts[0]}: {deviceId})");
  111 +// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
  112 +// {
  113 +// Id = YitIdHelper.NextId().ToString(),
  114 +// MessageType = "response",
  115 +// Topic = topic,
  116 +// Payload = payload,
  117 +// DeviceId = deviceId,
  118 +// Processed = 1,
  119 +// Status = "success",
  120 +// CreatedAt = DateTime.Now,
  121 +// };
  122 +// if (!payload.Contains("\"status\":\"received\""))
  123 +// {
  124 +// //判断两秒内,Payload一样的话,就不处理了
  125 +// var twoSecondsAgo = DateTime.Now.AddSeconds(-2);
  126 +// var lastLog = _db.Queryable<UavMqttMessageLogEntity>().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any();
  127 +// if (lastLog == false)
  128 +// {
  129 +// Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}");
  130 +// var orderService = _serviceProvider.GetRequiredService<IUavOrderService>();
  131 +// orderService.HandleSubscribeMessage(payload);
  132 +// }
  133 +// else
  134 +// {
  135 +// Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}");
  136 +// }
  137 +// //只保存正常消息
  138 +// _db.Insertable(logEntity).ExecuteCommand();
  139 +// }
  140 +// else
  141 +// {
  142 +// Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}");
  143 +// // 保存确认消息到日志
  144 +// _db.Insertable(logEntity).ExecuteCommand();
  145 +// }
  146 +// }
  147 +// }
  148 +// catch (Exception ex)
  149 +// {
  150 +// Log.Error($"消息处理异常: {ex.Message}");
  151 +// }
  152 +// });
  153 +
  154 +// }
  155 +
  156 +// /// <summary>
  157 +// /// 向指定 Topic 发布 MQTT 消息
  158 +// /// </summary>
  159 +// /// <param name="topic">主题路径,例如 device/{id}/command</param>
  160 +// /// <param name="payload">发送的 JSON 字符串内容</param>
  161 +// /// <returns>是否成功发送</returns>
  162 +// public async Task<bool> PublishAsync(string topic, string payload)
  163 +// {
  164 +// try
  165 +// {
  166 +// // 如果未连接则先连接
  167 +// if (!_mqttClient.IsConnected)
  168 +// {
  169 +// Log.Warning($"MQTT 未连接,尝试重新连接...");
  170 +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  171 +
  172 +// // 等待连接稳定
  173 +// int retryCount = 0;
  174 +// while (!_mqttClient.IsConnected && retryCount < 3)
  175 +// {
  176 +// await Task.Delay(1000);
  177 +// retryCount++;
  178 +// }
  179 +
  180 +// if (!_mqttClient.IsConnected)
  181 +// {
  182 +// Log.Error("MQTT 连接失败,无法发送消息");
  183 +// return false;
  184 +// }
  185 +// }
  186 +
  187 +// // 构建 MQTT 消息
  188 +// var message = new MqttApplicationMessageBuilder()
  189 +// .WithTopic(topic)
  190 +// .WithPayload(payload)
  191 +// .WithAtLeastOnceQoS() // QoS 1:至少送达一次
  192 +// .WithRetainFlag(false) // 不保留历史消息
  193 +// .Build();
  194 +
  195 +// // 异步发送
  196 +// await _mqttClient.PublishAsync(message);
  197 +// Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}");
  198 +// var topicParts = topic.Split('/');
  199 +// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
  200 +// {
  201 +// Id = YitIdHelper.NextId().ToString(),
  202 +// MessageType = "send",
  203 +// Topic = topic,
  204 +// Payload = payload,
  205 +// DeviceId = topicParts[1],
  206 +// Processed = 1,
  207 +// Status = "success",
  208 +// CreatedAt = DateTime.Now,
  209 +// };
  210 +// _db.Insertable(logEntity).ExecuteCommand();
  211 +// return true;
  212 +// }
  213 +// catch (Exception ex)
  214 +// {
  215 +// Log.Error($"MQTT 发送失败:{ex.Message}");
  216 +
  217 +// // 记录失败的消息到数据库
  218 +// try
  219 +// {
  220 +// var topicParts = topic.Split('/');
  221 +// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
  222 +// {
  223 +// Id = YitIdHelper.NextId().ToString(),
  224 +// MessageType = "send",
  225 +// Topic = topic,
  226 +// Payload = payload,
  227 +// DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown",
  228 +// Processed = 0,
  229 +// Status = "failed",
  230 +// CreatedAt = DateTime.Now,
  231 +// };
  232 +// _db.Insertable(logEntity).ExecuteCommand();
  233 +// }
  234 +// catch (Exception logEx)
  235 +// {
  236 +// Log.Error($"记录失败消息到数据库时出错:{logEx.Message}");
  237 +// }
  238 +
  239 +// return false;
  240 +// }
  241 +// }
  242 +
  243 +// /// <summary>
  244 +// /// 手动启动 MQTT 客户端(推荐在程序启动时调用)
  245 +// /// </summary>
  246 +// public async Task StartAsync()
  247 +// {
  248 +// if (!_mqttClient.IsConnected)
  249 +// {
  250 +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  251 +// }
  252 +// }
  253 +
  254 +// /// <summary>
  255 +// /// 查询指定clientId是否在线(通过EMQX管理API,带缓存)
  256 +// /// </summary>
  257 +// /// <param name="clientId">要查询的客户端ID</param>
  258 +// /// <returns>在线返回true,不在线返回false</returns>
  259 +// public async Task<dynamic> IsClientOnlineAsync(string clientId)
  260 +// {
  261 +// if (string.IsNullOrEmpty(clientId))
  262 +// {
  263 +// Log.Warning("设备ID为空,无法查询在线状态");
  264 +// return false;
  265 +// }
  266 +
  267 +// // 检查缓存 - 增加缓存时间到60秒,减少API调用
  268 +// lock (_cacheLock)
  269 +// {
  270 +// if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
  271 +// {
  272 +// var (isOnline, lastCheck) = cacheEntry;
  273 +// var cacheAge = DateTime.Now - lastCheck;
  274 +
  275 +// // 如果缓存时间小于60秒,直接返回缓存结果
  276 +// if (cacheAge.TotalSeconds < 60)
  277 +// {
  278 +// Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}");
  279 +// return isOnline;
  280 +// }
  281 +// }
  282 +// }
  283 +
  284 +// // 使用静态HttpClient以提高性能
  285 +// using (var client = new HttpClient())
  286 +// {
  287 +// client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3秒
  288 +
  289 +// // 使用你的EMQX管理账号密码
  290 +// var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
  291 +// client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
  292 +
  293 +// // 减少重试次数,提高响应速度
  294 +// int maxRetries = 1;
  295 +// for (int retry = 0; retry <= maxRetries; retry++)
  296 +// {
  297 +// try
  298 +// {
  299 +// // 查询指定客户端
  300 +// var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
  301 +// var response = await client.GetAsync(clientUrl);
  302 +// bool isOnline = false;
  303 +
  304 +// if (response.StatusCode == HttpStatusCode.OK)
  305 +// {
  306 +// Log.Debug($"设备 {clientId} 在线状态查询成功");
  307 +// isOnline = true;
  308 +// }
  309 +// else if (response.StatusCode == HttpStatusCode.NotFound)
  310 +// {
  311 +// Log.Debug($"设备 {clientId} 不在线");
  312 +// isOnline = false;
  313 +// }
  314 +// else
  315 +// {
  316 +// Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}");
  317 +// if (retry == maxRetries)
  318 +// {
  319 +// Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}");
  320 +// isOnline = false;
  321 +// }
  322 +// else
  323 +// {
  324 +// await Task.Delay(500); // 减少等待时间到500ms
  325 +// continue;
  326 +// }
  327 +// }
  328 +
  329 +// // 更新缓存
  330 +// lock (_cacheLock)
  331 +// {
  332 +// _onlineStatusCache[clientId] = (isOnline, DateTime.Now);
  333 +// }
  334 +// return isOnline;
  335 +// }
  336 +// catch (TaskCanceledException ex)
  337 +// {
  338 +// Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}");
  339 +// if (retry == maxRetries)
  340 +// {
  341 +// Log.Error($"查询设备 {clientId} 在线状态最终超时");
  342 +// // 更新缓存为false
  343 +// lock (_cacheLock)
  344 +// {
  345 +// _onlineStatusCache[clientId] = (false, DateTime.Now);
  346 +// }
  347 +// return false;
  348 +// }
  349 +// await Task.Delay(500); // 减少等待时间到500ms
  350 +// }
  351 +// catch (Exception ex)
  352 +// {
  353 +// Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}");
  354 +// if (retry == maxRetries)
  355 +// {
  356 +// // 更新缓存为false
  357 +// lock (_cacheLock)
  358 +// {
  359 +// _onlineStatusCache[clientId] = (false, DateTime.Now);
  360 +// }
  361 +// return false;
  362 +// }
  363 +// await Task.Delay(500); // 减少等待时间到500ms
  364 +// }
  365 +// }
  366 +
  367 +// // 更新缓存为false
  368 +// lock (_cacheLock)
  369 +// {
  370 +// _onlineStatusCache[clientId] = (false, DateTime.Now);
  371 +// }
  372 +// return false;
  373 +// }
  374 +// }
  375 +
  376 +// /// <summary>
  377 +// /// 批量查询设备在线状态(提高效率)
  378 +// /// </summary>
  379 +// /// <param name="clientIds">要查询的客户端ID列表</param>
  380 +// /// <returns>在线状态字典</returns>
  381 +// public async Task<Dictionary<string, bool>> BatchCheckOnlineStatusAsync(List<string> clientIds)
  382 +// {
  383 +// if (clientIds == null || clientIds.Count == 0)
  384 +// {
  385 +// return new Dictionary<string, bool>();
  386 +// }
  387 +
  388 +// var result = new Dictionary<string, bool>();
  389 +// var needQueryIds = new List<string>();
  390 +
  391 +// // 先检查缓存
  392 +// lock (_cacheLock)
  393 +// {
  394 +// foreach (var clientId in clientIds)
  395 +// {
  396 +// if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
  397 +// {
  398 +// var (isOnline, lastCheck) = cacheEntry;
  399 +// var cacheAge = DateTime.Now - lastCheck;
  400 +
  401 +// // 如果缓存时间小于60秒,使用缓存结果
  402 +// if (cacheAge.TotalSeconds < 60)
  403 +// {
  404 +// result[clientId] = isOnline;
  405 +// continue;
  406 +// }
  407 +// }
  408 +// needQueryIds.Add(clientId);
  409 +// }
  410 +// }
  411 +
  412 +// // 如果没有需要查询的设备,直接返回缓存结果
  413 +// if (needQueryIds.Count == 0)
  414 +// {
  415 +// return result;
  416 +// }
  417 +
  418 +// // 批量查询在线状态
  419 +// using (var client = new HttpClient())
  420 +// {
  421 +// client.Timeout = TimeSpan.FromSeconds(5);
  422 +
  423 +// var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
  424 +// client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
  425 +
  426 +// // 并行查询多个设备
  427 +// var tasks = needQueryIds.Select(async clientId =>
  428 +// {
  429 +// try
  430 +// {
  431 +// var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
  432 +// var response = await client.GetAsync(clientUrl);
  433 +// bool isOnline = response.StatusCode == HttpStatusCode.OK;
  434 +
  435 +// // 更新缓存
  436 +// lock (_cacheLock)
  437 +// {
  438 +// _onlineStatusCache[clientId] = (isOnline, DateTime.Now);
  439 +// }
  440 +
  441 +// return new { ClientId = clientId, IsOnline = isOnline };
  442 +// }
  443 +// catch (Exception ex)
  444 +// {
  445 +// Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}");
  446 +// // 更新缓存为false
  447 +// lock (_cacheLock)
  448 +// {
  449 +// _onlineStatusCache[clientId] = (false, DateTime.Now);
  450 +// }
  451 +// return new { ClientId = clientId, IsOnline = false };
  452 +// }
  453 +// });
  454 +
  455 +// var queryResults = await Task.WhenAll(tasks);
  456 +
  457 +// // 合并结果
  458 +// foreach (var queryResult in queryResults)
  459 +// {
  460 +// result[queryResult.ClientId] = queryResult.IsOnline;
  461 +// }
  462 +// }
  463 +
  464 +// return result;
  465 +// }
  466 +
  467 +// /// <summary>
  468 +// /// 清理过期的缓存数据(建议定期调用)
  469 +// /// </summary>
  470 +// public void CleanupExpiredCache()
  471 +// {
  472 +// lock (_cacheLock)
  473 +// {
  474 +// var now = DateTime.Now;
  475 +// var expiredKeys = new List<string>();
  476 +
  477 +// foreach (var kvp in _onlineStatusCache)
  478 +// {
  479 +// var (isOnline, lastCheck) = kvp.Value;
  480 +// var cacheAge = now - lastCheck;
  481 +
  482 +// // 如果缓存时间超过5分钟,标记为过期
  483 +// if (cacheAge.TotalMinutes > 5)
  484 +// {
  485 +// expiredKeys.Add(kvp.Key);
  486 +// }
  487 +// }
  488 +
  489 +// // 删除过期的缓存
  490 +// foreach (var key in expiredKeys)
  491 +// {
  492 +// _onlineStatusCache.Remove(key);
  493 +// }
  494 +
  495 +// if (expiredKeys.Count > 0)
  496 +// {
  497 +// Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存");
  498 +// }
  499 +// }
  500 +// }
  501 +
  502 +// #region 私有回调逻辑
  503 +
  504 +// #endregion
  505 +
  506 +// /// <summary>
  507 +// /// 检查MQTT连接健康状态
  508 +// /// </summary>
  509 +// /// <returns>连接状态信息</returns>
  510 +// public async Task<dynamic> GetConnectionHealthAsync()
  511 +// {
  512 +// try
  513 +// {
  514 +// if (_mqttClient.IsConnected)
  515 +// {
  516 +// try
  517 +// {
  518 +// await _mqttClient.PingAsync(CancellationToken.None);
  519 +// Log.Debug("MQTT Ping 成功");
  520 +// return new
  521 +// {
  522 +// IsConnected = true,
  523 +// ConnectionTime = DateTime.Now,
  524 +// LastPing = DateTime.Now,
  525 +// ClientId = _mqttOptions.ClientId,
  526 +// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
  527 +// };
  528 +// }
  529 +// catch (Exception pingEx)
  530 +// {
  531 +// Log.Warning($"MQTT Ping 失败: {pingEx.Message}");
  532 +// return new
  533 +// {
  534 +// IsConnected = false,
  535 +// ConnectionTime = (DateTime?)null,
  536 +// LastPing = DateTime.Now,
  537 +// ClientId = _mqttOptions.ClientId,
  538 +// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883",
  539 +// PingError = pingEx.Message
  540 +// };
  541 +// }
  542 +// }
  543 +// else
  544 +// {
  545 +// return new
  546 +// {
  547 +// IsConnected = false,
  548 +// ConnectionTime = (DateTime?)null,
  549 +// LastPing = DateTime.Now,
  550 +// ClientId = _mqttOptions.ClientId,
  551 +// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
  552 +// };
  553 +// }
  554 +// }
  555 +// catch (Exception ex)
  556 +// {
  557 +// Log.Error($"获取连接健康状态失败: {ex.Message}");
  558 +// return new
  559 +// {
  560 +// IsConnected = false,
  561 +// Error = ex.Message,
  562 +// Timestamp = DateTime.Now
  563 +// };
  564 +// }
  565 +// }
  566 +
  567 +// /// <summary>
  568 +// /// 强制重新连接MQTT
  569 +// /// </summary>
  570 +// /// <returns>重连是否成功</returns>
  571 +// public async Task<bool> ForceReconnectAsync()
  572 +// {
  573 +// try
  574 +// {
  575 +// if (_mqttClient.IsConnected)
  576 +// {
  577 +// await _mqttClient.DisconnectAsync();
  578 +// Log.Information("MQTT 主动断开连接");
  579 +// }
  580 +
  581 +// await Task.Delay(2000); // 等待2秒后重连
  582 +
  583 +// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  584 +// Log.Information("MQTT 强制重连成功");
  585 +// return true;
  586 +// }
  587 +// catch (Exception ex)
  588 +// {
  589 +// Log.Error($"MQTT 强制重连失败: {ex.Message}");
  590 +// return false;
  591 +// }
  592 +// }
  593 +// }
0 \ No newline at end of file 594 \ No newline at end of file
netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs
@@ -26,7 +26,7 @@ namespace NCC.Extend; @@ -26,7 +26,7 @@ namespace NCC.Extend;
26 /// <summary> 26 /// <summary>
27 /// MQTT 发布与订阅统一服务 27 /// MQTT 发布与订阅统一服务
28 /// </summary> 28 /// </summary>
29 -public class MqttPublisherService : IMqttPublisherService, ITransient 29 +public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable
30 { 30 {
31 /// <summary> 31 /// <summary>
32 /// MQTT 客户端对象(用于连接、发布、订阅等操作) 32 /// MQTT 客户端对象(用于连接、发布、订阅等操作)
@@ -41,6 +41,8 @@ public class MqttPublisherService : IMqttPublisherService, ITransient @@ -41,6 +41,8 @@ public class MqttPublisherService : IMqttPublisherService, ITransient
41 private readonly ISqlSugarClient _db; 41 private readonly ISqlSugarClient _db;
42 private readonly Dictionary<string, (bool isOnline, DateTime lastCheck)> _onlineStatusCache = new Dictionary<string, (bool, DateTime)>(); 42 private readonly Dictionary<string, (bool isOnline, DateTime lastCheck)> _onlineStatusCache = new Dictionary<string, (bool, DateTime)>();
43 private readonly object _cacheLock = new object(); 43 private readonly object _cacheLock = new object();
  44 + private readonly Timer _healthCheckTimer; // 健康检查定时器
  45 + private readonly int _healthCheckInterval = 5 * 60 * 1000; // 5分钟检查一次
44 /// <summary> 46 /// <summary>
45 /// 构造函数:初始化客户端和配置、注册事件 47 /// 构造函数:初始化客户端和配置、注册事件
46 /// </summary> 48 /// </summary>
@@ -81,6 +83,18 @@ public class MqttPublisherService : IMqttPublisherService, ITransient @@ -81,6 +83,18 @@ public class MqttPublisherService : IMqttPublisherService, ITransient
81 await Task.Delay(TimeSpan.FromSeconds(retryInterval)); 83 await Task.Delay(TimeSpan.FromSeconds(retryInterval));
82 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); 84 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
83 Log.Information("MQTT 重连成功"); 85 Log.Information("MQTT 重连成功");
  86 +
  87 + // 重连成功后重新订阅主题
  88 + try
  89 + {
  90 + await _mqttClient.SubscribeAsync("device/+/response");
  91 + Log.Information("重连后重新订阅主题成功: device/+/response");
  92 + }
  93 + catch (Exception subEx)
  94 + {
  95 + Log.Error($"重连后重新订阅主题失败: {subEx.Message}");
  96 + }
  97 +
84 break; // 重连成功后跳出循环 98 break; // 重连成功后跳出循环
85 } 99 }
86 catch (Exception ex) 100 catch (Exception ex)
@@ -151,6 +165,25 @@ public class MqttPublisherService : IMqttPublisherService, ITransient @@ -151,6 +165,25 @@ public class MqttPublisherService : IMqttPublisherService, ITransient
151 } 165 }
152 }); 166 });
153 167
  168 + // 初始化健康检查定时器
  169 + _healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval);
  170 + Log.Information("MQTT健康检查定时器已启动,每5分钟检查一次");
  171 + }
  172 +
  173 + /// <summary>
  174 + /// 健康检查定时器回调方法
  175 + /// </summary>
  176 + private async void HealthCheckCallback(object state)
  177 + {
  178 + try
  179 + {
  180 + Log.Debug("执行定时MQTT健康检查...");
  181 + await PerformHealthCheckAndRepairAsync();
  182 + }
  183 + catch (Exception ex)
  184 + {
  185 + Log.Error($"定时健康检查执行失败: {ex.Message}");
  186 + }
154 } 187 }
155 188
156 /// <summary> 189 /// <summary>
@@ -249,6 +282,30 @@ public class MqttPublisherService : IMqttPublisherService, ITransient @@ -249,6 +282,30 @@ public class MqttPublisherService : IMqttPublisherService, ITransient
249 { 282 {
250 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); 283 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
251 } 284 }
  285 +
  286 + // 确保订阅状态正常
  287 + await EnsureSubscriptionAsync();
  288 + }
  289 +
  290 + /// <summary>
  291 + /// 确保订阅状态正常,如果订阅丢失则重新订阅
  292 + /// </summary>
  293 + private async Task EnsureSubscriptionAsync()
  294 + {
  295 + try
  296 + {
  297 + if (_mqttClient.IsConnected)
  298 + {
  299 + // 检查订阅状态,如果订阅丢失则重新订阅
  300 + Log.Information("检查MQTT订阅状态...");
  301 + await _mqttClient.SubscribeAsync("device/+/response");
  302 + Log.Information("MQTT订阅状态正常: device/+/response");
  303 + }
  304 + }
  305 + catch (Exception ex)
  306 + {
  307 + Log.Warning($"检查订阅状态时出错: {ex.Message}");
  308 + }
252 } 309 }
253 310
254 /// <summary> 311 /// <summary>
@@ -582,6 +639,9 @@ public class MqttPublisherService : IMqttPublisherService, ITransient @@ -582,6 +639,9 @@ public class MqttPublisherService : IMqttPublisherService, ITransient
582 639
583 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); 640 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
584 Log.Information("MQTT 强制重连成功"); 641 Log.Information("MQTT 强制重连成功");
  642 +
  643 + // 重连后重新订阅
  644 + await EnsureSubscriptionAsync();
585 return true; 645 return true;
586 } 646 }
587 catch (Exception ex) 647 catch (Exception ex)
@@ -590,4 +650,90 @@ public class MqttPublisherService : IMqttPublisherService, ITransient @@ -590,4 +650,90 @@ public class MqttPublisherService : IMqttPublisherService, ITransient
590 return false; 650 return false;
591 } 651 }
592 } 652 }
  653 +
  654 + /// <summary>
  655 + /// 定期健康检查和自动修复(建议每5分钟调用一次)
  656 + /// </summary>
  657 + /// <returns>修复是否成功</returns>
  658 + public async Task<bool> PerformHealthCheckAndRepairAsync()
  659 + {
  660 + try
  661 + {
  662 + Log.Information("开始MQTT健康检查...");
  663 +
  664 + // 检查连接状态
  665 + if (!_mqttClient.IsConnected)
  666 + {
  667 + Log.Warning("MQTT连接已断开,尝试重新连接...");
  668 + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  669 + await EnsureSubscriptionAsync();
  670 + return true;
  671 + }
  672 +
  673 + // 检查连接质量(发送Ping)
  674 + try
  675 + {
  676 + await _mqttClient.PingAsync(CancellationToken.None);
  677 + Log.Debug("MQTT Ping 成功,连接质量良好");
  678 + }
  679 + catch (Exception pingEx)
  680 + {
  681 + Log.Warning($"MQTT Ping 失败,连接可能有问题: {pingEx.Message}");
  682 +
  683 + // Ping失败,尝试重新连接
  684 + Log.Information("尝试重新建立连接...");
  685 + await _mqttClient.DisconnectAsync();
  686 + await Task.Delay(1000);
  687 + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  688 + await EnsureSubscriptionAsync();
  689 + return true;
  690 + }
  691 +
  692 + // 检查订阅状态
  693 + await EnsureSubscriptionAsync();
  694 +
  695 + Log.Information("MQTT健康检查完成,状态正常");
  696 + return true;
  697 + }
  698 + catch (Exception ex)
  699 + {
  700 + Log.Error($"MQTT健康检查失败: {ex.Message}");
  701 + return false;
  702 + }
  703 + }
  704 +
  705 + /// <summary>
  706 + /// 获取当前订阅的主题列表
  707 + /// </summary>
  708 + /// <returns>订阅的主题列表</returns>
  709 + public async Task<List<string>> GetSubscribedTopicsAsync()
  710 + {
  711 + try
  712 + {
  713 + // 注意:MQTTnet可能不直接提供获取订阅列表的方法
  714 + // 这里返回我们已知的订阅主题
  715 + return new List<string> { "device/+/response" };
  716 + }
  717 + catch (Exception ex)
  718 + {
  719 + Log.Error($"获取订阅主题列表失败: {ex.Message}");
  720 + return new List<string>();
  721 + }
  722 + }
  723 +
  724 + /// <summary>
  725 + /// 释放资源
  726 + /// </summary>
  727 + public void Dispose()
  728 + {
  729 + try
  730 + {
  731 + _healthCheckTimer?.Dispose();
  732 + Log.Information("MQTT健康检查定时器已释放");
  733 + }
  734 + catch (Exception ex)
  735 + {
  736 + Log.Error($"释放MQTT资源时出错: {ex.Message}");
  737 + }
  738 + }
593 } 739 }
594 \ No newline at end of file 740 \ No newline at end of file
netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml
@@ -137,6 +137,11 @@ @@ -137,6 +137,11 @@
137 构造函数:初始化客户端和配置、注册事件 137 构造函数:初始化客户端和配置、注册事件
138 </summary> 138 </summary>
139 </member> 139 </member>
  140 + <member name="M:NCC.Extend.MqttPublisherService.HealthCheckCallback(System.Object)">
  141 + <summary>
  142 + 健康检查定时器回调方法
  143 + </summary>
  144 + </member>
140 <member name="M:NCC.Extend.MqttPublisherService.PublishAsync(System.String,System.String)"> 145 <member name="M:NCC.Extend.MqttPublisherService.PublishAsync(System.String,System.String)">
141 <summary> 146 <summary>
142 向指定 Topic 发布 MQTT 消息 147 向指定 Topic 发布 MQTT 消息
@@ -150,6 +155,11 @@ @@ -150,6 +155,11 @@
150 手动启动 MQTT 客户端(推荐在程序启动时调用) 155 手动启动 MQTT 客户端(推荐在程序启动时调用)
151 </summary> 156 </summary>
152 </member> 157 </member>
  158 + <member name="M:NCC.Extend.MqttPublisherService.EnsureSubscriptionAsync">
  159 + <summary>
  160 + 确保订阅状态正常,如果订阅丢失则重新订阅
  161 + </summary>
  162 + </member>
153 <member name="M:NCC.Extend.MqttPublisherService.IsClientOnlineAsync(System.String)"> 163 <member name="M:NCC.Extend.MqttPublisherService.IsClientOnlineAsync(System.String)">
154 <summary> 164 <summary>
155 查询指定clientId是否在线(通过EMQX管理API,带缓存) 165 查询指定clientId是否在线(通过EMQX管理API,带缓存)
@@ -181,6 +191,23 @@ @@ -181,6 +191,23 @@
181 </summary> 191 </summary>
182 <returns>重连是否成功</returns> 192 <returns>重连是否成功</returns>
183 </member> 193 </member>
  194 + <member name="M:NCC.Extend.MqttPublisherService.PerformHealthCheckAndRepairAsync">
  195 + <summary>
  196 + 定期健康检查和自动修复(建议每5分钟调用一次)
  197 + </summary>
  198 + <returns>修复是否成功</returns>
  199 + </member>
  200 + <member name="M:NCC.Extend.MqttPublisherService.GetSubscribedTopicsAsync">
  201 + <summary>
  202 + 获取当前订阅的主题列表
  203 + </summary>
  204 + <returns>订阅的主题列表</returns>
  205 + </member>
  206 + <member name="M:NCC.Extend.MqttPublisherService.Dispose">
  207 + <summary>
  208 + 释放资源
  209 + </summary>
  210 + </member>
184 <member name="T:NCC.Extend.UavAdvertisement.UavAdvertisementService"> 211 <member name="T:NCC.Extend.UavAdvertisement.UavAdvertisementService">
185 <summary> 212 <summary>
186 广告管理服务 213 广告管理服务
netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs
@@ -274,6 +274,7 @@ namespace NCC.Extend.UavOrder @@ -274,6 +274,7 @@ namespace NCC.Extend.UavOrder
274 var RerutnEntity = await _db.UseTranAsync(async () => 274 var RerutnEntity = await _db.UseTranAsync(async () =>
275 { 275 {
276 var halfHourAgo = DateTime.Now.AddMinutes(-30); 276 var halfHourAgo = DateTime.Now.AddMinutes(-30);
  277 + // var halfHourAgo = DateTime.Now.AddMinutes(-0);
277 try 278 try
278 { 279 {
279 //通过设备去找到设备货道,使用With(SqlSugar.SqlWith.RowLock)添加行锁 280 //通过设备去找到设备货道,使用With(SqlSugar.SqlWith.RowLock)添加行锁