Commit 5244304259e6735ae2993be1ce2323a468254b85

Authored by “wangming”
1 parent 3c705a12

新增MQTT消息处理功能,优化订阅监控和消息队列管理,支持消息格式兼容性测试,更新文档以反映新功能。

netcore/src/Application/.DS_Store
No preview for this file type
netcore/src/Modularity/Extend/NCC.Extend.Entitys/Dto/MqttContent.cs
@@ -33,4 +33,40 @@ public class MqttContent @@ -33,4 +33,40 @@ public class MqttContent
33 /// 回调地址 33 /// 回调地址
34 /// </summary> 34 /// </summary>
35 public string tokenUrl { get; set; } 35 public string tokenUrl { get; set; }
  36 +
  37 + // 新增字段,支持无人机设备消息格式
  38 + /// <summary>
  39 + /// 设备ID
  40 + /// </summary>
  41 + public string id { get; set; }
  42 +
  43 + /// <summary>
  44 + /// 电池1 RFID
  45 + /// </summary>
  46 + public string rfid_1 { get; set; }
  47 +
  48 + /// <summary>
  49 + /// 电池2 RFID
  50 + /// </summary>
  51 + public string rfid_2 { get; set; }
  52 +
  53 + /// <summary>
  54 + /// 电池容量
  55 + /// </summary>
  56 + public string batteryCapacity { get; set; }
  57 +
  58 + /// <summary>
  59 + /// 是否开启成功
  60 + /// </summary>
  61 + public bool? isOpenSuccess { get; set; }
  62 +
  63 + /// <summary>
  64 + /// 是否为无人机
  65 + /// </summary>
  66 + public bool? isUav { get; set; }
  67 +
  68 + /// <summary>
  69 + /// 无人机编码
  70 + /// </summary>
  71 + public string uavcode { get; set; }
36 } 72 }
netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs
1 using System; 1 using System;
  2 +using System.Collections.Concurrent;
2 using System.Collections.Generic; 3 using System.Collections.Generic;
3 using System.Text; 4 using System.Text;
4 using System.Text.Json; 5 using System.Text.Json;
@@ -24,6 +25,32 @@ using System.Linq; @@ -24,6 +25,32 @@ using System.Linq;
24 namespace NCC.Extend; 25 namespace NCC.Extend;
25 26
26 /// <summary> 27 /// <summary>
  28 +/// MQTT 消息信息
  29 +/// </summary>
  30 +public class MqttMessageInfo
  31 +{
  32 + /// <summary>
  33 + /// 消息主题
  34 + /// </summary>
  35 + public string Topic { get; set; }
  36 +
  37 + /// <summary>
  38 + /// 消息内容
  39 + /// </summary>
  40 + public string Payload { get; set; }
  41 +
  42 + /// <summary>
  43 + /// 接收时间
  44 + /// </summary>
  45 + public DateTime ReceivedTime { get; set; }
  46 +
  47 + /// <summary>
  48 + /// 设备ID
  49 + /// </summary>
  50 + public string DeviceId { get; set; }
  51 +}
  52 +
  53 +/// <summary>
27 /// MQTT 发布与订阅统一服务 54 /// MQTT 发布与订阅统一服务
28 /// </summary> 55 /// </summary>
29 public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable 56 public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable
@@ -42,7 +69,26 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -42,7 +69,26 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
42 private readonly Dictionary<string, (bool isOnline, DateTime lastCheck)> _onlineStatusCache = new Dictionary<string, (bool, DateTime)>(); 69 private readonly Dictionary<string, (bool isOnline, DateTime lastCheck)> _onlineStatusCache = new Dictionary<string, (bool, DateTime)>();
43 private readonly object _cacheLock = new object(); 70 private readonly object _cacheLock = new object();
44 private readonly Timer _healthCheckTimer; // 健康检查定时器 71 private readonly Timer _healthCheckTimer; // 健康检查定时器
45 - private readonly int _healthCheckInterval = 5 * 60 * 1000; // 5分钟检查一次 72 + private readonly Timer _subscriptionMonitorTimer; // 订阅监控定时器
  73 + private readonly int _healthCheckInterval = 3 * 60 * 1000; // 3分钟检查一次
  74 + private readonly int _subscriptionMonitorInterval = 30 * 1000; // 30秒监控一次订阅状态
  75 +
  76 + // 消息队列处理
  77 + private readonly ConcurrentQueue<MqttMessageInfo> _messageQueue = new ConcurrentQueue<MqttMessageInfo>();
  78 + private readonly SemaphoreSlim _messageProcessingSemaphore = new SemaphoreSlim(5, 5); // 最多5个并发处理
  79 + private readonly Timer _messageProcessingTimer; // 消息处理定时器
  80 + private readonly int _messageProcessingInterval = 100; // 100ms处理一次队列
  81 + private readonly int _maxQueueSize = 10000; // 最大队列大小,防止内存溢出
  82 + private volatile int _currentQueueSize = 0; // 当前队列大小
  83 +
  84 + // 订阅状态监控
  85 + private DateTime _lastMessageReceivedTime = DateTime.Now;
  86 + private bool _subscriptionVerified = false;
  87 + private DateTime _lastRetryTime = DateTime.MinValue; // 上次重试时间
  88 + private readonly object _subscriptionLock = new object();
  89 + private const string SUBSCRIPTION_TOPIC = "device/+/response";
  90 + private const int RETRY_INTERVAL_MINUTES = 5; // 重试间隔5分钟
  91 +
46 /// <summary> 92 /// <summary>
47 /// 构造函数:初始化客户端和配置、注册事件 93 /// 构造函数:初始化客户端和配置、注册事件
48 /// </summary> 94 /// </summary>
@@ -59,115 +105,135 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -59,115 +105,135 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
59 .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址 105 .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址
60 .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码 106 .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码
61 .WithClientId("server_publisher") // 客户端 ID,必须唯一 107 .WithClientId("server_publisher") // 客户端 ID,必须唯一
  108 + .WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) // 保持连接心跳
  109 + .WithCleanSession(false) // 保持会话状态
62 .Build(); 110 .Build();
63 111
64 // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response) 112 // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response)
65 _mqttClient.UseConnectedHandler(async e => 113 _mqttClient.UseConnectedHandler(async e =>
66 { 114 {
67 Log.Information("MQTT 已连接成功"); 115 Log.Information("MQTT 已连接成功");
  116 + _subscriptionVerified = false; // 重置订阅验证状态
  117 +
68 // 订阅所有设备的响应主题(+ 代表通配符) 118 // 订阅所有设备的响应主题(+ 代表通配符)
69 - await _mqttClient.SubscribeAsync("device/+/response");  
70 - Log.Information("已订阅通用响应 topic: device/+/response"); 119 + await EnsureSubscriptionAsync();
71 }); 120 });
72 121
73 // 连接断开事件 122 // 连接断开事件
74 - _mqttClient.UseDisconnectedHandler(async e => 123 + _mqttClient.UseDisconnectedHandler(e =>
75 { 124 {
76 - Log.Warning("MQTT 已断开连接");  
77 - int retryInterval = 5; // 秒  
78 - while (!_mqttClient.IsConnected) 125 + Log.Warning($"MQTT 已断开连接,原因: {e.Reason}");
  126 + _subscriptionVerified = false; // 重置订阅验证状态
  127 +
  128 + // 使用Task.Run避免阻塞MQTT客户端线程
  129 + _ = Task.Run(async () =>
79 { 130 {
80 - try 131 + int retryInterval = 5; // 秒
  132 + int maxRetries = 10; // 最大重试次数
  133 + int retryCount = 0;
  134 +
  135 + while (!_mqttClient.IsConnected && retryCount < maxRetries)
81 { 136 {
82 - Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT...");  
83 - await Task.Delay(TimeSpan.FromSeconds(retryInterval));  
84 - await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);  
85 - Log.Information("MQTT 重连成功");  
86 -  
87 - // 重连成功后重新订阅主题  
88 try 137 try
89 { 138 {
90 - await _mqttClient.SubscribeAsync("device/+/response");  
91 - Log.Information("重连后重新订阅主题成功: device/+/response"); 139 + retryCount++;
  140 + Log.Information($"尝试第 {retryCount} 次重新连接 MQTT,{retryInterval} 秒后开始...");
  141 + await Task.Delay(TimeSpan.FromSeconds(retryInterval));
  142 +
  143 + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  144 + Log.Information("MQTT 重连成功");
  145 +
  146 + // 重连成功后重新订阅主题
  147 + await EnsureSubscriptionAsync();
  148 +
  149 + break; // 重连成功后跳出循环
92 } 150 }
93 - catch (Exception subEx) 151 + catch (Exception ex)
94 { 152 {
95 - Log.Error($"重连后重新订阅主题失败: {subEx.Message}"); 153 + Log.Error($"MQTT 第 {retryCount} 次重连失败: {ex.Message}");
  154 +
  155 + if (retryCount >= maxRetries)
  156 + {
  157 + Log.Error($"MQTT 重连失败次数达到上限 ({maxRetries}),停止重连");
  158 + break;
  159 + }
  160 +
  161 + // 重连失败后增加延迟,避免频繁重试
  162 + retryInterval = Math.Min(retryInterval * 2, 60); // 最大延迟60秒
  163 + await Task.Delay(TimeSpan.FromSeconds(retryInterval));
96 } 164 }
97 -  
98 - break; // 重连成功后跳出循环  
99 } 165 }
100 - catch (Exception ex)  
101 - {  
102 - Log.Error($"MQTT 重连失败: {ex.Message}");  
103 - // 重连失败后继续循环,但增加延迟避免频繁重试  
104 - await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2));  
105 - }  
106 - } 166 + });
107 }); 167 });
108 - // 收到消息事件:处理设备回传的消息 168 +
  169 + // 收到消息事件:快速入队,避免阻塞
109 _mqttClient.UseApplicationMessageReceivedHandler(e => 170 _mqttClient.UseApplicationMessageReceivedHandler(e =>
110 { 171 {
111 try 172 try
112 { 173 {
  174 + // 更新最后收到消息的时间和验证状态
  175 + lock (_subscriptionLock)
  176 + {
  177 + _lastMessageReceivedTime = DateTime.Now;
  178 + _subscriptionVerified = true; // 只有收到实际消息才标记为已验证
  179 + }
  180 +
113 // 获取 topic 和 payload 内容 181 // 获取 topic 和 payload 内容
114 var topic = e.ApplicationMessage.Topic; 182 var topic = e.ApplicationMessage.Topic;
115 var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>()); 183 var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());
116 - Log.Information($"收到 MQTT 消息,Topic: {topic}, Payload: {payload}");  
117 - // 反序列化 JSON 内容为 MqttContent 实体  
118 - var content = JsonSerializer.Deserialize<MqttContent>(payload);  
119 - if (content != null)  
120 - {  
121 - // 从 topic 中提取设备 ID,例如 device/abc123/response  
122 - var topicParts = topic.Split('/');  
123 - var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown";  
124 - Log.Information($"{topicParts[0]}: {deviceId})");  
125 - UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity  
126 - {  
127 - Id = YitIdHelper.NextId().ToString(),  
128 - MessageType = "response",  
129 - Topic = topic,  
130 - Payload = payload,  
131 - DeviceId = deviceId,  
132 - Processed = 1,  
133 - Status = "success",  
134 - CreatedAt = DateTime.Now,  
135 - };  
136 - if (!payload.Contains("\"status\":\"received\"")) 184 +
  185 + // 从 topic 中提取设备 ID
  186 + var topicParts = topic.Split('/');
  187 + var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown";
  188 +
  189 + // 检查队列大小,防止内存溢出
  190 + if (_currentQueueSize >= _maxQueueSize)
  191 + {
  192 + Log.Warning($"消息队列已满({_maxQueueSize}),丢弃消息: Topic: {topic}, DeviceId: {deviceId}");
  193 + // 记录丢弃的消息(异步执行,不阻塞)
  194 + _ = Task.Run(async () =>
137 { 195 {
138 - //判断两秒内,Payload一样的话,就不处理了  
139 - var twoSecondsAgo = DateTime.Now.AddSeconds(-2);  
140 - var lastLog = _db.Queryable<UavMqttMessageLogEntity>().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any();  
141 - if (lastLog == false) 196 + await LogFailedMessageAsync(new MqttMessageInfo
142 { 197 {
143 - Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}");  
144 - var orderService = _serviceProvider.GetRequiredService<IUavOrderService>();  
145 - orderService.HandleSubscribeMessage(payload);  
146 - }  
147 - else  
148 - {  
149 - Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}");  
150 - }  
151 - //只保存正常消息  
152 - _db.Insertable(logEntity).ExecuteCommand();  
153 - }  
154 - else  
155 - {  
156 - Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}");  
157 - // 保存确认消息到日志  
158 - _db.Insertable(logEntity).ExecuteCommand();  
159 - } 198 + Topic = topic,
  199 + Payload = payload,
  200 + ReceivedTime = DateTime.Now,
  201 + DeviceId = deviceId
  202 + }, "队列已满,消息被丢弃");
  203 + });
  204 + return;
160 } 205 }
  206 +
  207 + // 快速入队,避免阻塞MQTT接收
  208 + var messageInfo = new MqttMessageInfo
  209 + {
  210 + Topic = topic,
  211 + Payload = payload,
  212 + ReceivedTime = DateTime.Now,
  213 + DeviceId = deviceId
  214 + };
  215 +
  216 + _messageQueue.Enqueue(messageInfo);
  217 + Interlocked.Increment(ref _currentQueueSize);
  218 + Log.Debug($"MQTT消息已入队,Topic: {topic}, DeviceId: {deviceId}, 队列长度: {_currentQueueSize}");
161 } 219 }
162 catch (Exception ex) 220 catch (Exception ex)
163 { 221 {
164 - Log.Error($"消息处理异常: {ex.Message}"); 222 + Log.Error($"消息入队异常: {ex.Message}");
165 } 223 }
166 }); 224 });
167 225
168 // 初始化健康检查定时器 226 // 初始化健康检查定时器
169 _healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval); 227 _healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval);
170 - Log.Information("MQTT健康检查定时器已启动,每5分钟检查一次"); 228 + Log.Information("MQTT健康检查定时器已启动,每3分钟检查一次");
  229 +
  230 + // 初始化订阅监控定时器
  231 + _subscriptionMonitorTimer = new Timer(SubscriptionMonitorCallback, null, _subscriptionMonitorInterval, _subscriptionMonitorInterval);
  232 + Log.Information("MQTT订阅监控定时器已启动,每30秒监控一次");
  233 +
  234 + // 初始化消息处理定时器
  235 + _messageProcessingTimer = new Timer(MessageProcessingCallback, null, _messageProcessingInterval, _messageProcessingInterval);
  236 + Log.Information("MQTT消息处理定时器已启动,每100ms处理一次队列");
171 } 237 }
172 238
173 /// <summary> 239 /// <summary>
@@ -187,6 +253,225 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -187,6 +253,225 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
187 } 253 }
188 254
189 /// <summary> 255 /// <summary>
  256 + /// 订阅监控定时器回调方法
  257 + /// </summary>
  258 + private async void SubscriptionMonitorCallback(object state)
  259 + {
  260 + try
  261 + {
  262 + Log.Debug("执行定时MQTT订阅监控...");
  263 + await VerifySubscriptionAsync();
  264 + }
  265 + catch (Exception ex)
  266 + {
  267 + Log.Error($"订阅监控执行失败: {ex.Message}");
  268 + }
  269 + }
  270 +
  271 + /// <summary>
  272 + /// 消息处理定时器回调方法
  273 + /// </summary>
  274 + private void MessageProcessingCallback(object state)
  275 + {
  276 + try
  277 + {
  278 + // 处理队列中的消息,最多同时处理5个
  279 + var tasks = new List<Task>();
  280 + for (int i = 0; i < 5 && _currentQueueSize > 0; i++)
  281 + {
  282 + if (_messageQueue.TryDequeue(out var messageInfo))
  283 + {
  284 + Interlocked.Decrement(ref _currentQueueSize);
  285 + tasks.Add(ProcessMessageAsync(messageInfo));
  286 + }
  287 + }
  288 +
  289 + if (tasks.Count > 0)
  290 + {
  291 + // 使用Task.Run避免阻塞定时器线程
  292 + Task.Run(async () =>
  293 + {
  294 + try
  295 + {
  296 + await Task.WhenAll(tasks);
  297 + }
  298 + catch (Exception ex)
  299 + {
  300 + Log.Error($"消息处理任务执行失败: {ex.Message}");
  301 + }
  302 + });
  303 + }
  304 + }
  305 + catch (Exception ex)
  306 + {
  307 + Log.Error($"消息处理定时器执行失败: {ex.Message}");
  308 + }
  309 + }
  310 +
  311 + /// <summary>
  312 + /// 异步处理单个消息
  313 + /// </summary>
  314 + /// <param name="messageInfo">消息信息</param>
  315 + /// <returns></returns>
  316 + private async Task ProcessMessageAsync(MqttMessageInfo messageInfo)
  317 + {
  318 + await _messageProcessingSemaphore.WaitAsync();
  319 + try
  320 + {
  321 + Log.Information($"开始处理MQTT消息,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
  322 +
  323 + // 验证消息格式
  324 + if (string.IsNullOrWhiteSpace(messageInfo.Payload))
  325 + {
  326 + Log.Warning($"消息内容为空,跳过处理: Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
  327 + return;
  328 + }
  329 +
  330 + // 反序列化 JSON 内容为 MqttContent 实体
  331 + MqttContent content = null;
  332 + try
  333 + {
  334 + content = JsonSerializer.Deserialize<MqttContent>(messageInfo.Payload);
  335 + }
  336 + catch (JsonException jsonEx)
  337 + {
  338 + Log.Error($"JSON反序列化失败: {jsonEx.Message}, Payload: {messageInfo.Payload}");
  339 + // 即使JSON解析失败,也要记录到日志
  340 + await LogFailedMessageAsync(messageInfo, "JSON解析失败");
  341 + return;
  342 + }
  343 +
  344 + if (content != null)
  345 + {
  346 + UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
  347 + {
  348 + Id = YitIdHelper.NextId().ToString(),
  349 + MessageType = "response",
  350 + Topic = messageInfo.Topic,
  351 + Payload = messageInfo.Payload,
  352 + DeviceId = messageInfo.DeviceId,
  353 + Processed = 1,
  354 + Status = "success",
  355 + CreatedAt = messageInfo.ReceivedTime,
  356 + };
  357 +
  358 + if (!messageInfo.Payload.Contains("\"status\":\"received\""))
  359 + {
  360 + // 判断两秒内,Payload一样的话,就不处理了
  361 + var twoSecondsAgo = DateTime.Now.AddSeconds(-2);
  362 + bool lastLog = false;
  363 + try
  364 + {
  365 + lastLog = await _db.Queryable<UavMqttMessageLogEntity>()
  366 + .Where(p => p.DeviceId == messageInfo.DeviceId &&
  367 + p.Payload == messageInfo.Payload &&
  368 + p.CreatedAt >= twoSecondsAgo)
  369 + .AnyAsync();
  370 + }
  371 + catch (Exception dbEx)
  372 + {
  373 + Log.Error($"数据库查询失败: {dbEx.Message}, DeviceId: {messageInfo.DeviceId}");
  374 + // 数据库查询失败时,为了安全起见,不处理消息
  375 + await LogFailedMessageAsync(messageInfo, "数据库查询失败");
  376 + return;
  377 + }
  378 +
  379 + if (!lastLog)
  380 + {
  381 + Log.Information($"[订阅消息] 处理新消息:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
  382 + try
  383 + {
  384 + var orderService = _serviceProvider.GetRequiredService<IUavOrderService>();
  385 + await Task.Run(() => orderService.HandleSubscribeMessage(messageInfo.Payload));
  386 + }
  387 + catch (Exception businessEx)
  388 + {
  389 + Log.Error($"业务逻辑处理失败: {businessEx.Message}, DeviceId: {messageInfo.DeviceId}");
  390 + logEntity.Status = "business_error";
  391 + logEntity.Processed = 0;
  392 + }
  393 + }
  394 + else
  395 + {
  396 + Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
  397 + logEntity.Status = "duplicate";
  398 + }
  399 +
  400 + // 保存消息到日志
  401 + try
  402 + {
  403 + await _db.Insertable(logEntity).ExecuteCommandAsync();
  404 + }
  405 + catch (Exception logEx)
  406 + {
  407 + Log.Error($"保存消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}");
  408 + }
  409 + }
  410 + else
  411 + {
  412 + Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
  413 + // 保存确认消息到日志
  414 + try
  415 + {
  416 + await _db.Insertable(logEntity).ExecuteCommandAsync();
  417 + }
  418 + catch (Exception logEx)
  419 + {
  420 + Log.Error($"保存确认消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}");
  421 + }
  422 + }
  423 + }
  424 + else
  425 + {
  426 + Log.Warning($"JSON反序列化结果为空: Payload: {messageInfo.Payload}");
  427 + await LogFailedMessageAsync(messageInfo, "JSON反序列化结果为空");
  428 + }
  429 +
  430 + Log.Debug($"MQTT消息处理完成,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
  431 + }
  432 + catch (Exception ex)
  433 + {
  434 + Log.Error($"处理MQTT消息异常: {ex.Message}, Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
  435 + await LogFailedMessageAsync(messageInfo, $"处理异常: {ex.Message}");
  436 + }
  437 + finally
  438 + {
  439 + _messageProcessingSemaphore.Release();
  440 + }
  441 + }
  442 +
  443 + /// <summary>
  444 + /// 记录失败的消息
  445 + /// </summary>
  446 + /// <param name="messageInfo">消息信息</param>
  447 + /// <param name="errorReason">失败原因</param>
  448 + /// <returns></returns>
  449 + private async Task LogFailedMessageAsync(MqttMessageInfo messageInfo, string errorReason)
  450 + {
  451 + try
  452 + {
  453 + var logEntity = new UavMqttMessageLogEntity
  454 + {
  455 + Id = YitIdHelper.NextId().ToString(),
  456 + MessageType = "response",
  457 + Topic = messageInfo.Topic,
  458 + Payload = messageInfo.Payload,
  459 + DeviceId = messageInfo.DeviceId,
  460 + Processed = 0,
  461 + Status = "failed",
  462 + CreatedAt = messageInfo.ReceivedTime,
  463 + };
  464 +
  465 + await _db.Insertable(logEntity).ExecuteCommandAsync();
  466 + Log.Information($"失败消息已记录: DeviceId: {messageInfo.DeviceId}, 原因: {errorReason}");
  467 + }
  468 + catch (Exception ex)
  469 + {
  470 + Log.Error($"记录失败消息时出错: {ex.Message}, DeviceId: {messageInfo.DeviceId}");
  471 + }
  472 + }
  473 +
  474 + /// <summary>
190 /// 向指定 Topic 发布 MQTT 消息 475 /// 向指定 Topic 发布 MQTT 消息
191 /// </summary> 476 /// </summary>
192 /// <param name="topic">主题路径,例如 device/{id}/command</param> 477 /// <param name="topic">主题路径,例如 device/{id}/command</param>
@@ -278,13 +563,28 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -278,13 +563,28 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
278 /// </summary> 563 /// </summary>
279 public async Task StartAsync() 564 public async Task StartAsync()
280 { 565 {
281 - if (!_mqttClient.IsConnected) 566 + try
282 { 567 {
283 - await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); 568 + Log.Information("启动MQTT客户端...");
  569 +
  570 + if (!_mqttClient.IsConnected)
  571 + {
  572 + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
  573 + Log.Information("MQTT客户端连接成功");
  574 + }
  575 +
  576 + // 确保订阅状态正常
  577 + await EnsureSubscriptionAsync();
  578 +
  579 + // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
  580 +
  581 + Log.Information("MQTT客户端启动完成,等待实际消息验证订阅状态");
  582 + }
  583 + catch (Exception ex)
  584 + {
  585 + Log.Error($"启动MQTT客户端失败: {ex.Message}");
  586 + throw;
284 } 587 }
285 -  
286 - // 确保订阅状态正常  
287 - await EnsureSubscriptionAsync();  
288 } 588 }
289 589
290 /// <summary> 590 /// <summary>
@@ -298,17 +598,102 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -298,17 +598,102 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
298 { 598 {
299 // 检查订阅状态,如果订阅丢失则重新订阅 599 // 检查订阅状态,如果订阅丢失则重新订阅
300 Log.Information("检查MQTT订阅状态..."); 600 Log.Information("检查MQTT订阅状态...");
301 - await _mqttClient.SubscribeAsync("device/+/response"); 601 + await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC);
302 Log.Information("MQTT订阅状态正常: device/+/response"); 602 Log.Information("MQTT订阅状态正常: device/+/response");
  603 +
  604 + // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
303 } 605 }
304 } 606 }
305 catch (Exception ex) 607 catch (Exception ex)
306 { 608 {
307 Log.Warning($"检查订阅状态时出错: {ex.Message}"); 609 Log.Warning($"检查订阅状态时出错: {ex.Message}");
  610 + // 订阅失败,尝试重新订阅
  611 + await RetrySubscriptionAsync();
  612 + }
  613 + }
  614 +
  615 + /// <summary>
  616 + /// 验证订阅是否真的生效
  617 + /// </summary>
  618 + private async Task VerifySubscriptionAsync()
  619 + {
  620 + try
  621 + {
  622 + lock (_subscriptionLock)
  623 + {
  624 + // 检查是否长时间没有收到消息
  625 + var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
  626 + if (timeSinceLastMessage.TotalMinutes > 2 && _subscriptionVerified)
  627 + {
  628 + Log.Warning($"订阅可能失效,已 {timeSinceLastMessage.TotalMinutes:F1} 分钟未收到消息");
  629 + _subscriptionVerified = false;
  630 + }
  631 + }
  632 +
  633 + // 如果订阅未验证,且距离上次重试超过5分钟,才尝试重新订阅
  634 + if (!_subscriptionVerified)
  635 + {
  636 + var timeSinceLastRetry = DateTime.Now - _lastRetryTime;
  637 + if (timeSinceLastRetry.TotalMinutes >= RETRY_INTERVAL_MINUTES)
  638 + {
  639 + Log.Warning("订阅验证失败,尝试重新订阅...");
  640 + await RetrySubscriptionAsync();
  641 + _lastRetryTime = DateTime.Now;
  642 + }
  643 + else
  644 + {
  645 + Log.Debug($"订阅未验证,但距离上次重试仅 {timeSinceLastRetry.TotalMinutes:F1} 分钟,跳过重试");
  646 + }
  647 + }
  648 + }
  649 + catch (Exception ex)
  650 + {
  651 + Log.Error($"验证订阅状态失败: {ex.Message}");
308 } 652 }
309 } 653 }
310 654
311 /// <summary> 655 /// <summary>
  656 + /// 重试订阅
  657 + /// </summary>
  658 + private async Task RetrySubscriptionAsync()
  659 + {
  660 + try
  661 + {
  662 + if (_mqttClient.IsConnected)
  663 + {
  664 + Log.Information("尝试重新订阅...");
  665 +
  666 + // 先取消现有订阅
  667 + try
  668 + {
  669 + await _mqttClient.UnsubscribeAsync(SUBSCRIPTION_TOPIC);
  670 + Log.Information("已取消现有订阅");
  671 + }
  672 + catch (Exception ex)
  673 + {
  674 + Log.Warning($"取消订阅失败: {ex.Message}");
  675 + }
  676 +
  677 + // 重新订阅
  678 + await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC);
  679 + Log.Information("重新订阅成功");
  680 +
  681 + // 重置验证状态,等待实际消息接收来验证订阅是否真正生效
  682 + _subscriptionVerified = false;
  683 + _lastMessageReceivedTime = DateTime.Now;
  684 +
  685 + // 注意:不立即标记为已验证,等待实际消息接收来验证
  686 + }
  687 + }
  688 + catch (Exception ex)
  689 + {
  690 + Log.Error($"重试订阅失败: {ex.Message}");
  691 + }
  692 + }
  693 +
  694 +
  695 +
  696 + /// <summary>
312 /// 查询指定clientId是否在线(通过EMQX管理API,带缓存) 697 /// 查询指定clientId是否在线(通过EMQX管理API,带缓存)
313 /// </summary> 698 /// </summary>
314 /// <param name="clientId">要查询的客户端ID</param> 699 /// <param name="clientId">要查询的客户端ID</param>
@@ -390,7 +775,7 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -390,7 +775,7 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
390 } 775 }
391 return isOnline; 776 return isOnline;
392 } 777 }
393 - catch (TaskCanceledException ex) 778 + catch (TaskCanceledException)
394 { 779 {
395 Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}"); 780 Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}");
396 if (retry == maxRetries) 781 if (retry == maxRetries)
@@ -629,19 +1014,32 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -629,19 +1014,32 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
629 { 1014 {
630 try 1015 try
631 { 1016 {
  1017 + Log.Information("开始强制重连MQTT...");
  1018 +
632 if (_mqttClient.IsConnected) 1019 if (_mqttClient.IsConnected)
633 { 1020 {
634 await _mqttClient.DisconnectAsync(); 1021 await _mqttClient.DisconnectAsync();
635 Log.Information("MQTT 主动断开连接"); 1022 Log.Information("MQTT 主动断开连接");
636 } 1023 }
637 1024
  1025 + // 重置订阅状态
  1026 + lock (_subscriptionLock)
  1027 + {
  1028 + _subscriptionVerified = false;
  1029 + _lastMessageReceivedTime = DateTime.Now;
  1030 + }
  1031 +
638 await Task.Delay(2000); // 等待2秒后重连 1032 await Task.Delay(2000); // 等待2秒后重连
639 1033
640 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); 1034 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
641 Log.Information("MQTT 强制重连成功"); 1035 Log.Information("MQTT 强制重连成功");
642 1036
643 - // 重连后重新订阅 1037 + // 重连后重新订阅并验证
644 await EnsureSubscriptionAsync(); 1038 await EnsureSubscriptionAsync();
  1039 +
  1040 + // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
  1041 +
  1042 + Log.Information("MQTT 强制重连完成,等待实际消息验证订阅状态");
645 return true; 1043 return true;
646 } 1044 }
647 catch (Exception ex) 1045 catch (Exception ex)
@@ -652,7 +1050,7 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -652,7 +1050,7 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
652 } 1050 }
653 1051
654 /// <summary> 1052 /// <summary>
655 - /// 定期健康检查和自动修复(建议每5分钟调用一次) 1053 + /// 定期健康检查和自动修复(建议每3分钟调用一次)
656 /// </summary> 1054 /// </summary>
657 /// <returns>修复是否成功</returns> 1055 /// <returns>修复是否成功</returns>
658 public async Task<bool> PerformHealthCheckAndRepairAsync() 1056 public async Task<bool> PerformHealthCheckAndRepairAsync()
@@ -689,6 +1087,19 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -689,6 +1087,19 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
689 return true; 1087 return true;
690 } 1088 }
691 1089
  1090 + // 检查订阅状态和消息接收情况
  1091 + lock (_subscriptionLock)
  1092 + {
  1093 + var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
  1094 + if (timeSinceLastMessage.TotalMinutes > 10)
  1095 + {
  1096 + Log.Warning($"长时间未收到消息({timeSinceLastMessage.TotalMinutes:F1}分钟),订阅可能有问题");
  1097 + }
  1098 + }
  1099 +
  1100 + // 验证订阅状态
  1101 + await VerifySubscriptionAsync();
  1102 +
692 // 检查订阅状态 1103 // 检查订阅状态
693 await EnsureSubscriptionAsync(); 1104 await EnsureSubscriptionAsync();
694 1105
@@ -706,18 +1117,208 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -706,18 +1117,208 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
706 /// 获取当前订阅的主题列表 1117 /// 获取当前订阅的主题列表
707 /// </summary> 1118 /// </summary>
708 /// <returns>订阅的主题列表</returns> 1119 /// <returns>订阅的主题列表</returns>
709 - public async Task<List<string>> GetSubscribedTopicsAsync() 1120 + public Task<List<string>> GetSubscribedTopicsAsync()
710 { 1121 {
711 try 1122 try
712 { 1123 {
713 // 注意:MQTTnet可能不直接提供获取订阅列表的方法 1124 // 注意:MQTTnet可能不直接提供获取订阅列表的方法
714 // 这里返回我们已知的订阅主题 1125 // 这里返回我们已知的订阅主题
715 - return new List<string> { "device/+/response" }; 1126 + return Task.FromResult(new List<string> { SUBSCRIPTION_TOPIC });
716 } 1127 }
717 catch (Exception ex) 1128 catch (Exception ex)
718 { 1129 {
719 Log.Error($"获取订阅主题列表失败: {ex.Message}"); 1130 Log.Error($"获取订阅主题列表失败: {ex.Message}");
720 - return new List<string>(); 1131 + return Task.FromResult(new List<string>());
  1132 + }
  1133 + }
  1134 +
  1135 + /// <summary>
  1136 + /// 获取订阅状态信息
  1137 + /// </summary>
  1138 + /// <returns>订阅状态信息</returns>
  1139 + public dynamic GetSubscriptionStatus()
  1140 + {
  1141 + try
  1142 + {
  1143 + lock (_subscriptionLock)
  1144 + {
  1145 + var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
  1146 + return new
  1147 + {
  1148 + IsConnected = _mqttClient.IsConnected,
  1149 + SubscriptionVerified = _subscriptionVerified,
  1150 + LastMessageReceived = _lastMessageReceivedTime,
  1151 + TimeSinceLastMessage = $"{timeSinceLastMessage.TotalMinutes:F1} 分钟",
  1152 + SubscriptionTopic = SUBSCRIPTION_TOPIC,
  1153 + IsHealthy = timeSinceLastMessage.TotalMinutes < 10,
  1154 + QueueSize = _currentQueueSize,
  1155 + MaxQueueSize = _maxQueueSize
  1156 + };
  1157 + }
  1158 + }
  1159 + catch (Exception ex)
  1160 + {
  1161 + Log.Error($"获取订阅状态失败: {ex.Message}");
  1162 + return new { Error = ex.Message };
  1163 + }
  1164 + }
  1165 +
  1166 + /// <summary>
  1167 + /// 检查连接稳定性
  1168 + /// </summary>
  1169 + /// <returns>连接稳定性信息</returns>
  1170 + public async Task<dynamic> CheckConnectionStabilityAsync()
  1171 + {
  1172 + try
  1173 + {
  1174 + // 如果连接正常,进行Ping测试
  1175 + if (_mqttClient.IsConnected)
  1176 + {
  1177 + try
  1178 + {
  1179 + var pingStartTime = DateTime.Now;
  1180 + await _mqttClient.PingAsync(CancellationToken.None);
  1181 + var pingDuration = DateTime.Now - pingStartTime;
  1182 +
  1183 + Log.Information($"连接稳定性检查通过,Ping耗时: {pingDuration.TotalMilliseconds:F2}ms");
  1184 +
  1185 + return new
  1186 + {
  1187 + IsConnected = true,
  1188 + PingSuccess = true,
  1189 + PingDuration = pingDuration.TotalMilliseconds,
  1190 + ConnectionTime = DateTime.Now,
  1191 + LastMessageReceived = _lastMessageReceivedTime,
  1192 + SubscriptionVerified = _subscriptionVerified,
  1193 + QueueSize = _currentQueueSize,
  1194 + MaxQueueSize = _maxQueueSize,
  1195 + HealthCheckInterval = _healthCheckInterval,
  1196 + SubscriptionMonitorInterval = _subscriptionMonitorInterval,
  1197 + MessageProcessingInterval = _messageProcessingInterval
  1198 + };
  1199 + }
  1200 + catch (Exception pingEx)
  1201 + {
  1202 + Log.Warning($"连接稳定性检查失败,Ping失败: {pingEx.Message}");
  1203 + return new
  1204 + {
  1205 + IsConnected = false,
  1206 + PingSuccess = false,
  1207 + PingError = pingEx.Message,
  1208 + ConnectionTime = DateTime.Now,
  1209 + LastMessageReceived = _lastMessageReceivedTime,
  1210 + SubscriptionVerified = _subscriptionVerified,
  1211 + QueueSize = _currentQueueSize,
  1212 + MaxQueueSize = _maxQueueSize,
  1213 + HealthCheckInterval = _healthCheckInterval,
  1214 + SubscriptionMonitorInterval = _subscriptionMonitorInterval,
  1215 + MessageProcessingInterval = _messageProcessingInterval
  1216 + };
  1217 + }
  1218 + }
  1219 + else
  1220 + {
  1221 + Log.Warning("连接稳定性检查失败,MQTT客户端未连接");
  1222 + return new
  1223 + {
  1224 + IsConnected = false,
  1225 + ConnectionTime = DateTime.Now,
  1226 + LastMessageReceived = _lastMessageReceivedTime,
  1227 + SubscriptionVerified = _subscriptionVerified,
  1228 + QueueSize = _currentQueueSize,
  1229 + MaxQueueSize = _maxQueueSize,
  1230 + HealthCheckInterval = _healthCheckInterval,
  1231 + SubscriptionMonitorInterval = _subscriptionMonitorInterval,
  1232 + MessageProcessingInterval = _messageProcessingInterval
  1233 + };
  1234 + }
  1235 + }
  1236 + catch (Exception ex)
  1237 + {
  1238 + Log.Error($"检查连接稳定性时发生异常: {ex.Message}");
  1239 + return new
  1240 + {
  1241 + IsConnected = false,
  1242 + Error = ex.Message,
  1243 + ConnectionTime = DateTime.Now
  1244 + };
  1245 + }
  1246 + }
  1247 +
  1248 + /// <summary>
  1249 + /// 测试数据格式兼容性
  1250 + /// </summary>
  1251 + /// <param name="testPayload">测试数据</param>
  1252 + /// <returns>测试结果</returns>
  1253 + public dynamic TestDataFormatCompatibility(string testPayload)
  1254 + {
  1255 + try
  1256 + {
  1257 + Log.Information($"开始测试数据格式兼容性: {testPayload}");
  1258 +
  1259 + // 测试JSON反序列化
  1260 + MqttContent content = null;
  1261 + try
  1262 + {
  1263 + content = JsonSerializer.Deserialize<MqttContent>(testPayload);
  1264 + Log.Information("JSON反序列化成功");
  1265 + }
  1266 + catch (JsonException jsonEx)
  1267 + {
  1268 + Log.Error($"JSON反序列化失败: {jsonEx.Message}");
  1269 + return new
  1270 + {
  1271 + Success = false,
  1272 + Error = "JSON反序列化失败",
  1273 + Details = jsonEx.Message,
  1274 + TestPayload = testPayload
  1275 + };
  1276 + }
  1277 +
  1278 + if (content == null)
  1279 + {
  1280 + return new
  1281 + {
  1282 + Success = false,
  1283 + Error = "JSON反序列化结果为空",
  1284 + TestPayload = testPayload
  1285 + };
  1286 + }
  1287 +
  1288 + // 检查关键字段
  1289 + var fieldChecks = new Dictionary<string, object>
  1290 + {
  1291 + ["id"] = content.id,
  1292 + ["lane"] = content.lane,
  1293 + ["rfid_1"] = content.rfid_1,
  1294 + ["rfid_2"] = content.rfid_2,
  1295 + ["batteryCapacity"] = content.batteryCapacity,
  1296 + ["isOpenSuccess"] = content.isOpenSuccess,
  1297 + ["isUav"] = content.isUav,
  1298 + ["uavcode"] = content.uavcode
  1299 + };
  1300 +
  1301 + Log.Information($"数据格式兼容性测试成功,字段值: {string.Join(", ", fieldChecks.Select(kv => $"{kv.Key}={kv.Value}"))}");
  1302 +
  1303 + return new
  1304 + {
  1305 + Success = true,
  1306 + Message = "数据格式兼容性测试通过",
  1307 + DeserializedContent = content,
  1308 + FieldChecks = fieldChecks,
  1309 + TestPayload = testPayload
  1310 + };
  1311 + }
  1312 + catch (Exception ex)
  1313 + {
  1314 + Log.Error($"数据格式兼容性测试失败: {ex.Message}");
  1315 + return new
  1316 + {
  1317 + Success = false,
  1318 + Error = "测试过程中发生异常",
  1319 + Details = ex.Message,
  1320 + TestPayload = testPayload
  1321 + };
721 } 1322 }
722 } 1323 }
723 1324
@@ -729,7 +1330,10 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab @@ -729,7 +1330,10 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab
729 try 1330 try
730 { 1331 {
731 _healthCheckTimer?.Dispose(); 1332 _healthCheckTimer?.Dispose();
732 - Log.Information("MQTT健康检查定时器已释放"); 1333 + _subscriptionMonitorTimer?.Dispose();
  1334 + _messageProcessingTimer?.Dispose();
  1335 + _messageProcessingSemaphore?.Dispose();
  1336 + Log.Information("MQTT定时器和信号量已释放");
733 } 1337 }
734 catch (Exception ex) 1338 catch (Exception ex)
735 { 1339 {
netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml
@@ -117,6 +117,31 @@ @@ -117,6 +117,31 @@
117 <param name="input"></param> 117 <param name="input"></param>
118 <returns></returns> 118 <returns></returns>
119 </member> 119 </member>
  120 + <member name="T:NCC.Extend.MqttMessageInfo">
  121 + <summary>
  122 + MQTT 消息信息
  123 + </summary>
  124 + </member>
  125 + <member name="P:NCC.Extend.MqttMessageInfo.Topic">
  126 + <summary>
  127 + 消息主题
  128 + </summary>
  129 + </member>
  130 + <member name="P:NCC.Extend.MqttMessageInfo.Payload">
  131 + <summary>
  132 + 消息内容
  133 + </summary>
  134 + </member>
  135 + <member name="P:NCC.Extend.MqttMessageInfo.ReceivedTime">
  136 + <summary>
  137 + 接收时间
  138 + </summary>
  139 + </member>
  140 + <member name="P:NCC.Extend.MqttMessageInfo.DeviceId">
  141 + <summary>
  142 + 设备ID
  143 + </summary>
  144 + </member>
120 <member name="T:NCC.Extend.MqttPublisherService"> 145 <member name="T:NCC.Extend.MqttPublisherService">
121 <summary> 146 <summary>
122 MQTT 发布与订阅统一服务 147 MQTT 发布与订阅统一服务
@@ -142,6 +167,31 @@ @@ -142,6 +167,31 @@
142 健康检查定时器回调方法 167 健康检查定时器回调方法
143 </summary> 168 </summary>
144 </member> 169 </member>
  170 + <member name="M:NCC.Extend.MqttPublisherService.SubscriptionMonitorCallback(System.Object)">
  171 + <summary>
  172 + 订阅监控定时器回调方法
  173 + </summary>
  174 + </member>
  175 + <member name="M:NCC.Extend.MqttPublisherService.MessageProcessingCallback(System.Object)">
  176 + <summary>
  177 + 消息处理定时器回调方法
  178 + </summary>
  179 + </member>
  180 + <member name="M:NCC.Extend.MqttPublisherService.ProcessMessageAsync(NCC.Extend.MqttMessageInfo)">
  181 + <summary>
  182 + 异步处理单个消息
  183 + </summary>
  184 + <param name="messageInfo">消息信息</param>
  185 + <returns></returns>
  186 + </member>
  187 + <member name="M:NCC.Extend.MqttPublisherService.LogFailedMessageAsync(NCC.Extend.MqttMessageInfo,System.String)">
  188 + <summary>
  189 + 记录失败的消息
  190 + </summary>
  191 + <param name="messageInfo">消息信息</param>
  192 + <param name="errorReason">失败原因</param>
  193 + <returns></returns>
  194 + </member>
145 <member name="M:NCC.Extend.MqttPublisherService.PublishAsync(System.String,System.String)"> 195 <member name="M:NCC.Extend.MqttPublisherService.PublishAsync(System.String,System.String)">
146 <summary> 196 <summary>
147 向指定 Topic 发布 MQTT 消息 197 向指定 Topic 发布 MQTT 消息
@@ -160,6 +210,16 @@ @@ -160,6 +210,16 @@
160 确保订阅状态正常,如果订阅丢失则重新订阅 210 确保订阅状态正常,如果订阅丢失则重新订阅
161 </summary> 211 </summary>
162 </member> 212 </member>
  213 + <member name="M:NCC.Extend.MqttPublisherService.VerifySubscriptionAsync">
  214 + <summary>
  215 + 验证订阅是否真的生效
  216 + </summary>
  217 + </member>
  218 + <member name="M:NCC.Extend.MqttPublisherService.RetrySubscriptionAsync">
  219 + <summary>
  220 + 重试订阅
  221 + </summary>
  222 + </member>
163 <member name="M:NCC.Extend.MqttPublisherService.IsClientOnlineAsync(System.String)"> 223 <member name="M:NCC.Extend.MqttPublisherService.IsClientOnlineAsync(System.String)">
164 <summary> 224 <summary>
165 查询指定clientId是否在线(通过EMQX管理API,带缓存) 225 查询指定clientId是否在线(通过EMQX管理API,带缓存)
@@ -193,7 +253,7 @@ @@ -193,7 +253,7 @@
193 </member> 253 </member>
194 <member name="M:NCC.Extend.MqttPublisherService.PerformHealthCheckAndRepairAsync"> 254 <member name="M:NCC.Extend.MqttPublisherService.PerformHealthCheckAndRepairAsync">
195 <summary> 255 <summary>
196 - 定期健康检查和自动修复(建议每5分钟调用一次) 256 + 定期健康检查和自动修复(建议每3分钟调用一次)
197 </summary> 257 </summary>
198 <returns>修复是否成功</returns> 258 <returns>修复是否成功</returns>
199 </member> 259 </member>
@@ -203,6 +263,25 @@ @@ -203,6 +263,25 @@
203 </summary> 263 </summary>
204 <returns>订阅的主题列表</returns> 264 <returns>订阅的主题列表</returns>
205 </member> 265 </member>
  266 + <member name="M:NCC.Extend.MqttPublisherService.GetSubscriptionStatus">
  267 + <summary>
  268 + 获取订阅状态信息
  269 + </summary>
  270 + <returns>订阅状态信息</returns>
  271 + </member>
  272 + <member name="M:NCC.Extend.MqttPublisherService.CheckConnectionStabilityAsync">
  273 + <summary>
  274 + 检查连接稳定性
  275 + </summary>
  276 + <returns>连接稳定性信息</returns>
  277 + </member>
  278 + <member name="M:NCC.Extend.MqttPublisherService.TestDataFormatCompatibility(System.String)">
  279 + <summary>
  280 + 测试数据格式兼容性
  281 + </summary>
  282 + <param name="testPayload">测试数据</param>
  283 + <returns>测试结果</returns>
  284 + </member>
206 <member name="M:NCC.Extend.MqttPublisherService.Dispose"> 285 <member name="M:NCC.Extend.MqttPublisherService.Dispose">
207 <summary> 286 <summary>
208 释放资源 287 释放资源
@@ -784,6 +863,18 @@ @@ -784,6 +863,18 @@
784 <param name="input">查询参数</param> 863 <param name="input">查询参数</param>
785 <returns></returns> 864 <returns></returns>
786 </member> 865 </member>
  866 + <member name="M:NCC.Extend.UavDevice.UavDeviceService.UpdateChargingStatusToComplete">
  867 + <summary>
  868 + 将超过30分钟充电状态的无人机格子状态改为充电完成
  869 + </summary>
  870 + <returns></returns>
  871 + </member>
  872 + <member name="M:NCC.Extend.UavDevice.UavDeviceService.AutoUpdateChargingStatus">
  873 + <summary>
  874 + 定时任务:自动更新超过30分钟的充电状态
  875 + </summary>
  876 + <returns></returns>
  877 + </member>
787 <member name="T:NCC.Extend.UavFeeRule.UavFeeRuleService"> 878 <member name="T:NCC.Extend.UavFeeRule.UavFeeRuleService">
788 <summary> 879 <summary>
789 收费规则服务 880 收费规则服务
@@ -1125,6 +1216,17 @@ @@ -1125,6 +1216,17 @@
1125 <param name="input"></param> 1216 <param name="input"></param>
1126 <returns></returns> 1217 <returns></returns>
1127 </member> 1218 </member>
  1219 + <member name="M:NCC.Extend.UavOrder.UavOrderService.ClearDuplicateUavInfo(System.String,System.String,System.String,System.String,System.String)">
  1220 + <summary>
  1221 + 检测并清空重复的无人机信息
  1222 + </summary>
  1223 + <param name="uavCode">无人机编码</param>
  1224 + <param name="rfid1">电池1 RFID</param>
  1225 + <param name="rfid2">电池2 RFID</param>
  1226 + <param name="currentDeviceId">当前设备ID</param>
  1227 + <param name="currentLane">当前货道</param>
  1228 + <returns></returns>
  1229 + </member>
1128 <member name="M:NCC.Extend.UavOrder.UavOrderService.OrderCheckOpenDoor(NCC.Extend.Entitys.Dto.UavDevice.UavDeviceOpenCrInput,System.String)"> 1230 <member name="M:NCC.Extend.UavOrder.UavOrderService.OrderCheckOpenDoor(NCC.Extend.Entitys.Dto.UavDevice.UavDeviceOpenCrInput,System.String)">
1129 <summary> 1231 <summary>
1130 订单下单开门 1232 订单下单开门
netcore/src/Modularity/Extend/NCC.Extend/UavFeeRuleService.cs
@@ -299,7 +299,7 @@ namespace NCC.Extend.UavFeeRule @@ -299,7 +299,7 @@ namespace NCC.Extend.UavFeeRule
299 } 299 }
300 #endregion 300 #endregion
301 301
302 - #region 根据设备来获获取收费规则 302 + #region 根据设备来获获取收费规则
303 /// <summary> 303 /// <summary>
304 /// 根据设备来获获取收费规则 304 /// 根据设备来获获取收费规则
305 /// </summary> 305 /// </summary>
@@ -333,11 +333,11 @@ namespace NCC.Extend.UavFeeRule @@ -333,11 +333,11 @@ namespace NCC.Extend.UavFeeRule
333 entity.IsDelete = 99; 333 entity.IsDelete = 99;
334 var isOk = await _db.Updateable(entity).IgnoreColumns(ignoreAllNullColumns: true).ExecuteCommandAsync(); 334 var isOk = await _db.Updateable(entity).IgnoreColumns(ignoreAllNullColumns: true).ExecuteCommandAsync();
335 if (!(isOk > 0)) throw NCCException.Oh(ErrorCode.COM1002); 335 if (!(isOk > 0)) throw NCCException.Oh(ErrorCode.COM1002);
336 - //删除绑定管理  
337 - var relationEntity = await _db.Queryable<UavRelationEntity>().FirstAsync(p => p.FeeRuleId == id);  
338 - if (relationEntity != null) 336 + // 删除绑定管理(批量删除所有与该收费规则关联的绑定关系)
  337 + var hasRelations = await _db.Queryable<UavRelationEntity>().AnyAsync(p => p.FeeRuleId == id);
  338 + if (hasRelations)
339 { 339 {
340 - var deleteResult = await _db.Deleteable(relationEntity).ExecuteCommandAsync(); 340 + var deleteResult = await _db.Deleteable<UavRelationEntity>().Where(p => p.FeeRuleId == id).ExecuteCommandAsync();
341 if (deleteResult <= 0) 341 if (deleteResult <= 0)
342 { 342 {
343 throw NCCException.Oh(ErrorCode.COM1002); 343 throw NCCException.Oh(ErrorCode.COM1002);
netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs
@@ -587,7 +587,7 @@ namespace NCC.Extend.UavOrder @@ -587,7 +587,7 @@ namespace NCC.Extend.UavOrder
587 foreach (var item in terminalProfitSettingLIst) 587 foreach (var item in terminalProfitSettingLIst)
588 { 588 {
589 //计算终端人员分润 589 //计算终端人员分润
590 - var terminalProfitAmount = terminalFinalAmount *( item.ProfitRatio / 100); 590 + var terminalProfitAmount = terminalFinalAmount * (item.ProfitRatio / 100);
591 UavWalletFlowEntity uavWalletFlowEntity_Terminal = new UavWalletFlowEntity 591 UavWalletFlowEntity uavWalletFlowEntity_Terminal = new UavWalletFlowEntity
592 { 592 {
593 Id = YitIdHelper.NextId().ToString(), 593 Id = YitIdHelper.NextId().ToString(),
@@ -1170,13 +1170,6 @@ namespace NCC.Extend.UavOrder @@ -1170,13 +1170,6 @@ namespace NCC.Extend.UavOrder
1170 var result = await _db.Ado.UseTranAsync(async () => 1170 var result = await _db.Ado.UseTranAsync(async () =>
1171 { 1171 {
1172 var deviceInfo = await _db.Queryable<UavDeviceEntity>().FirstAsync(x => x.Id == input.id); 1172 var deviceInfo = await _db.Queryable<UavDeviceEntity>().FirstAsync(x => x.Id == input.id);
1173 - //添加电池1  
1174 - await HandleReplenishment(input.rfid_1, input, "电池", deviceInfo.BelongUserId);  
1175 - //添加电池2  
1176 - await HandleReplenishment(input.rfid_2, input, "电池", deviceInfo.BelongUserId);  
1177 - //添加无人机  
1178 - await HandleReplenishment(input.uavcode, input, "无人机", deviceInfo.BelongUserId);  
1179 - var cell = await _db.Queryable<UavDeviceCellEntity>().FirstAsync(x => x.DeviceId == input.id && x.CellCode == input.lane);  
1180 if (input.rfid_1 == "000000") 1173 if (input.rfid_1 == "000000")
1181 { 1174 {
1182 input.rfid_1 = ""; 1175 input.rfid_1 = "";
@@ -1185,6 +1178,15 @@ namespace NCC.Extend.UavOrder @@ -1185,6 +1178,15 @@ namespace NCC.Extend.UavOrder
1185 { 1178 {
1186 input.rfid_2 = ""; 1179 input.rfid_2 = "";
1187 } 1180 }
  1181 + // 检测并清空重复的无人机信息
  1182 + await ClearDuplicateUavInfo(input.uavcode, input.rfid_1, input.rfid_2, input.id, input.lane);
  1183 + //添加电池1
  1184 + await HandleReplenishment(input.rfid_1, input, "电池", deviceInfo.BelongUserId);
  1185 + //添加电池2
  1186 + await HandleReplenishment(input.rfid_2, input, "电池", deviceInfo.BelongUserId);
  1187 + //添加无人机
  1188 + await HandleReplenishment(input.uavcode, input, "无人机", deviceInfo.BelongUserId);
  1189 + var cell = await _db.Queryable<UavDeviceCellEntity>().FirstAsync(x => x.DeviceId == input.id && x.CellCode == input.lane);
1188 //更新货道信息 1190 //更新货道信息
1189 cell.Rfid1 = input.rfid_1; 1191 cell.Rfid1 = input.rfid_1;
1190 cell.Rfid2 = input.rfid_2; 1192 cell.Rfid2 = input.rfid_2;
@@ -1221,6 +1223,46 @@ namespace NCC.Extend.UavOrder @@ -1221,6 +1223,46 @@ namespace NCC.Extend.UavOrder
1221 await _db.Insertable(newUav).ExecuteCommandAsync(); 1223 await _db.Insertable(newUav).ExecuteCommandAsync();
1222 } 1224 }
1223 } 1225 }
  1226 +
  1227 + /// <summary>
  1228 + /// 检测并清空重复的无人机信息
  1229 + /// </summary>
  1230 + /// <param name="uavCode">无人机编码</param>
  1231 + /// <param name="rfid1">电池1 RFID</param>
  1232 + /// <param name="rfid2">电池2 RFID</param>
  1233 + /// <param name="currentDeviceId">当前设备ID</param>
  1234 + /// <param name="currentLane">当前货道</param>
  1235 + /// <returns></returns>
  1236 + [NonAction]
  1237 + private async Task ClearDuplicateUavInfo(string uavCode, string rfid1, string rfid2, string currentDeviceId, string currentLane)
  1238 + {
  1239 + try
  1240 + {
  1241 + // 主要检测无人机编码重复,如果重复则清空对应的RFID信息
  1242 + if (!string.IsNullOrWhiteSpace(uavCode))
  1243 + {
  1244 + var duplicateUavCells = await _db.Queryable<UavDeviceCellEntity>().Where(x => x.UavCode == uavCode && !(x.DeviceId == currentDeviceId && x.CellCode == currentLane)).ToListAsync();
  1245 + if (duplicateUavCells.Any())
  1246 + {
  1247 + foreach (var cell in duplicateUavCells)
  1248 + {
  1249 + // 清空无人机编码和对应的RFID信息
  1250 + cell.UavCode = "";
  1251 + cell.Rfid1 = "";
  1252 + cell.Rfid2 = "";
  1253 + cell.UpdateTime = DateTime.Now;
  1254 + }
  1255 + await _db.Updateable(duplicateUavCells).ExecuteCommandAsync();
  1256 + Log.Information($"清空了 {duplicateUavCells.Count} 个货道中重复的无人机编码及RFID信息: {uavCode}");
  1257 + }
  1258 + }
  1259 + }
  1260 + catch (Exception ex)
  1261 + {
  1262 + Log.Error($"清空重复无人机信息失败: {ex.Message}");
  1263 + throw new Exception($"清空重复无人机信息失败: {ex.Message}");
  1264 + }
  1265 + }
1224 #endregion 1266 #endregion
1225 1267
1226 #region 订单下单开门 1268 #region 订单下单开门