Commit 6232971783bee3df0ccd51b11ad0e81a6e366e78
1 parent
de2bd2f9
修复了一些BUG
Showing
10 changed files
with
168 additions
and
18 deletions
.DS_Store
No preview for this file type
netcore/.DS_Store
No preview for this file type
netcore/src/.DS_Store
No preview for this file type
netcore/src/Application/.DS_Store
No preview for this file type
netcore/src/Application/NCC.API/.DS_Store
No preview for this file type
netcore/src/Modularity/Extend/.DS_Store
No preview for this file type
netcore/src/Modularity/Extend/NCC.Extend.Entitys/uavDevice/Dto/UavDeviceCrInput.cs
netcore/src/Modularity/Extend/NCC.Extend/.DS_Store
No preview for this file type
netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs
| ... | ... | @@ -69,24 +69,27 @@ public class MqttPublisherService : IMqttPublisherService, ITransient |
| 69 | 69 | }); |
| 70 | 70 | |
| 71 | 71 | // 连接断开事件 |
| 72 | - _mqttClient.UseDisconnectedHandler(e => | |
| 72 | + _mqttClient.UseDisconnectedHandler(async e => | |
| 73 | 73 | { |
| 74 | 74 | Log.Warning("MQTT 已断开连接"); |
| 75 | 75 | int retryInterval = 5; // 秒 |
| 76 | - // while (!_mqttClient.IsConnected) | |
| 77 | - // { | |
| 78 | - // try | |
| 79 | - // { | |
| 80 | - // Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT..."); | |
| 81 | - // Task.Delay(TimeSpan.FromSeconds(retryInterval)); | |
| 82 | - // _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); | |
| 83 | - // Log.Information("MQTT 重连成功"); | |
| 84 | - // } | |
| 85 | - // catch (Exception ex) | |
| 86 | - // { | |
| 87 | - // Log.Error($"MQTT 重连失败: {ex.Message}"); | |
| 88 | - // } | |
| 89 | - // } | |
| 76 | + while (!_mqttClient.IsConnected) | |
| 77 | + { | |
| 78 | + try | |
| 79 | + { | |
| 80 | + Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT..."); | |
| 81 | + await Task.Delay(TimeSpan.FromSeconds(retryInterval)); | |
| 82 | + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); | |
| 83 | + Log.Information("MQTT 重连成功"); | |
| 84 | + break; // 重连成功后跳出循环 | |
| 85 | + } | |
| 86 | + catch (Exception ex) | |
| 87 | + { | |
| 88 | + Log.Error($"MQTT 重连失败: {ex.Message}"); | |
| 89 | + // 重连失败后继续循环,但增加延迟避免频繁重试 | |
| 90 | + await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2)); | |
| 91 | + } | |
| 92 | + } | |
| 90 | 93 | }); |
| 91 | 94 | // 收到消息事件:处理设备回传的消息 |
| 92 | 95 | _mqttClient.UseApplicationMessageReceivedHandler(e => |
| ... | ... | @@ -123,17 +126,23 @@ public class MqttPublisherService : IMqttPublisherService, ITransient |
| 123 | 126 | var lastLog = _db.Queryable<UavMqttMessageLogEntity>().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any(); |
| 124 | 127 | if (lastLog == false) |
| 125 | 128 | { |
| 126 | - | |
| 129 | + Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}"); | |
| 127 | 130 | var orderService = _serviceProvider.GetRequiredService<IUavOrderService>(); |
| 128 | 131 | orderService.HandleSubscribeMessage(payload); |
| 129 | 132 | } |
| 130 | 133 | else |
| 131 | 134 | { |
| 132 | - Log.Information($"[订阅消息] 接收到重复的消息,已忽略:{payload}"); | |
| 135 | + Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}"); | |
| 133 | 136 | } |
| 134 | 137 | //只保存正常消息 |
| 135 | 138 | _db.Insertable(logEntity).ExecuteCommand(); |
| 136 | 139 | } |
| 140 | + else | |
| 141 | + { | |
| 142 | + Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}"); | |
| 143 | + // 保存确认消息到日志 | |
| 144 | + _db.Insertable(logEntity).ExecuteCommand(); | |
| 145 | + } | |
| 137 | 146 | } |
| 138 | 147 | } |
| 139 | 148 | catch (Exception ex) |
| ... | ... | @@ -157,7 +166,22 @@ public class MqttPublisherService : IMqttPublisherService, ITransient |
| 157 | 166 | // 如果未连接则先连接 |
| 158 | 167 | if (!_mqttClient.IsConnected) |
| 159 | 168 | { |
| 169 | + Log.Warning($"MQTT 未连接,尝试重新连接..."); | |
| 160 | 170 | await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); |
| 171 | + | |
| 172 | + // 等待连接稳定 | |
| 173 | + int retryCount = 0; | |
| 174 | + while (!_mqttClient.IsConnected && retryCount < 3) | |
| 175 | + { | |
| 176 | + await Task.Delay(1000); | |
| 177 | + retryCount++; | |
| 178 | + } | |
| 179 | + | |
| 180 | + if (!_mqttClient.IsConnected) | |
| 181 | + { | |
| 182 | + Log.Error("MQTT 连接失败,无法发送消息"); | |
| 183 | + return false; | |
| 184 | + } | |
| 161 | 185 | } |
| 162 | 186 | |
| 163 | 187 | // 构建 MQTT 消息 |
| ... | ... | @@ -189,6 +213,29 @@ public class MqttPublisherService : IMqttPublisherService, ITransient |
| 189 | 213 | catch (Exception ex) |
| 190 | 214 | { |
| 191 | 215 | Log.Error($"MQTT 发送失败:{ex.Message}"); |
| 216 | + | |
| 217 | + // 记录失败的消息到数据库 | |
| 218 | + try | |
| 219 | + { | |
| 220 | + var topicParts = topic.Split('/'); | |
| 221 | + UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity | |
| 222 | + { | |
| 223 | + Id = YitIdHelper.NextId().ToString(), | |
| 224 | + MessageType = "send", | |
| 225 | + Topic = topic, | |
| 226 | + Payload = payload, | |
| 227 | + DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown", | |
| 228 | + Processed = 0, | |
| 229 | + Status = "failed", | |
| 230 | + CreatedAt = DateTime.Now, | |
| 231 | + }; | |
| 232 | + _db.Insertable(logEntity).ExecuteCommand(); | |
| 233 | + } | |
| 234 | + catch (Exception logEx) | |
| 235 | + { | |
| 236 | + Log.Error($"记录失败消息到数据库时出错:{logEx.Message}"); | |
| 237 | + } | |
| 238 | + | |
| 192 | 239 | return false; |
| 193 | 240 | } |
| 194 | 241 | } |
| ... | ... | @@ -454,7 +501,93 @@ public class MqttPublisherService : IMqttPublisherService, ITransient |
| 454 | 501 | |
| 455 | 502 | #region 私有回调逻辑 |
| 456 | 503 | |
| 504 | + #endregion | |
| 457 | 505 | |
| 506 | + /// <summary> | |
| 507 | + /// 检查MQTT连接健康状态 | |
| 508 | + /// </summary> | |
| 509 | + /// <returns>连接状态信息</returns> | |
| 510 | + public async Task<dynamic> GetConnectionHealthAsync() | |
| 511 | + { | |
| 512 | + try | |
| 513 | + { | |
| 514 | + if (_mqttClient.IsConnected) | |
| 515 | + { | |
| 516 | + try | |
| 517 | + { | |
| 518 | + await _mqttClient.PingAsync(CancellationToken.None); | |
| 519 | + Log.Debug("MQTT Ping 成功"); | |
| 520 | + return new | |
| 521 | + { | |
| 522 | + IsConnected = true, | |
| 523 | + ConnectionTime = DateTime.Now, | |
| 524 | + LastPing = DateTime.Now, | |
| 525 | + ClientId = _mqttOptions.ClientId, | |
| 526 | + BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" | |
| 527 | + }; | |
| 528 | + } | |
| 529 | + catch (Exception pingEx) | |
| 530 | + { | |
| 531 | + Log.Warning($"MQTT Ping 失败: {pingEx.Message}"); | |
| 532 | + return new | |
| 533 | + { | |
| 534 | + IsConnected = false, | |
| 535 | + ConnectionTime = (DateTime?)null, | |
| 536 | + LastPing = DateTime.Now, | |
| 537 | + ClientId = _mqttOptions.ClientId, | |
| 538 | + BrokerAddress = "mqtt.cqjiangzhichao.cn:1883", | |
| 539 | + PingError = pingEx.Message | |
| 540 | + }; | |
| 541 | + } | |
| 542 | + } | |
| 543 | + else | |
| 544 | + { | |
| 545 | + return new | |
| 546 | + { | |
| 547 | + IsConnected = false, | |
| 548 | + ConnectionTime = (DateTime?)null, | |
| 549 | + LastPing = DateTime.Now, | |
| 550 | + ClientId = _mqttOptions.ClientId, | |
| 551 | + BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" | |
| 552 | + }; | |
| 553 | + } | |
| 554 | + } | |
| 555 | + catch (Exception ex) | |
| 556 | + { | |
| 557 | + Log.Error($"获取连接健康状态失败: {ex.Message}"); | |
| 558 | + return new | |
| 559 | + { | |
| 560 | + IsConnected = false, | |
| 561 | + Error = ex.Message, | |
| 562 | + Timestamp = DateTime.Now | |
| 563 | + }; | |
| 564 | + } | |
| 565 | + } | |
| 458 | 566 | |
| 459 | - #endregion | |
| 567 | + /// <summary> | |
| 568 | + /// 强制重新连接MQTT | |
| 569 | + /// </summary> | |
| 570 | + /// <returns>重连是否成功</returns> | |
| 571 | + public async Task<bool> ForceReconnectAsync() | |
| 572 | + { | |
| 573 | + try | |
| 574 | + { | |
| 575 | + if (_mqttClient.IsConnected) | |
| 576 | + { | |
| 577 | + await _mqttClient.DisconnectAsync(); | |
| 578 | + Log.Information("MQTT 主动断开连接"); | |
| 579 | + } | |
| 580 | + | |
| 581 | + await Task.Delay(2000); // 等待2秒后重连 | |
| 582 | + | |
| 583 | + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); | |
| 584 | + Log.Information("MQTT 强制重连成功"); | |
| 585 | + return true; | |
| 586 | + } | |
| 587 | + catch (Exception ex) | |
| 588 | + { | |
| 589 | + Log.Error($"MQTT 强制重连失败: {ex.Message}"); | |
| 590 | + return false; | |
| 591 | + } | |
| 592 | + } | |
| 460 | 593 | } |
| 461 | 594 | \ No newline at end of file | ... | ... |
netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml
| ... | ... | @@ -169,6 +169,18 @@ |
| 169 | 169 | 清理过期的缓存数据(建议定期调用) |
| 170 | 170 | </summary> |
| 171 | 171 | </member> |
| 172 | + <member name="M:NCC.Extend.MqttPublisherService.GetConnectionHealthAsync"> | |
| 173 | + <summary> | |
| 174 | + 检查MQTT连接健康状态 | |
| 175 | + </summary> | |
| 176 | + <returns>连接状态信息</returns> | |
| 177 | + </member> | |
| 178 | + <member name="M:NCC.Extend.MqttPublisherService.ForceReconnectAsync"> | |
| 179 | + <summary> | |
| 180 | + 强制重新连接MQTT | |
| 181 | + </summary> | |
| 182 | + <returns>重连是否成功</returns> | |
| 183 | + </member> | |
| 172 | 184 | <member name="T:NCC.Extend.UavAdvertisement.UavAdvertisementService"> |
| 173 | 185 | <summary> |
| 174 | 186 | 广告管理服务 | ... | ... |