diff --git a/netcore/src/Application/.DS_Store b/netcore/src/Application/.DS_Store index 109f47a..c599f52 100644 --- a/netcore/src/Application/.DS_Store +++ b/netcore/src/Application/.DS_Store diff --git a/netcore/src/Modularity/Extend/NCC.Extend.Entitys/Dto/MqttContent.cs b/netcore/src/Modularity/Extend/NCC.Extend.Entitys/Dto/MqttContent.cs index 54e3694..c70d0d9 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend.Entitys/Dto/MqttContent.cs +++ b/netcore/src/Modularity/Extend/NCC.Extend.Entitys/Dto/MqttContent.cs @@ -33,4 +33,40 @@ public class MqttContent /// 回调地址 /// public string tokenUrl { get; set; } + + // 新增字段,支持无人机设备消息格式 + /// + /// 设备ID + /// + public string id { get; set; } + + /// + /// 电池1 RFID + /// + public string rfid_1 { get; set; } + + /// + /// 电池2 RFID + /// + public string rfid_2 { get; set; } + + /// + /// 电池容量 + /// + public string batteryCapacity { get; set; } + + /// + /// 是否开启成功 + /// + public bool? isOpenSuccess { get; set; } + + /// + /// 是否为无人机 + /// + public bool? isUav { get; set; } + + /// + /// 无人机编码 + /// + public string uavcode { get; set; } } diff --git a/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs b/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs index f71fd2d..e2c7d23 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs +++ b/netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Text; using System.Text.Json; @@ -24,6 +25,32 @@ using System.Linq; namespace NCC.Extend; /// +/// MQTT 消息信息 +/// +public class MqttMessageInfo +{ + /// + /// 消息主题 + /// + public string Topic { get; set; } + + /// + /// 消息内容 + /// + public string Payload { get; set; } + + /// + /// 接收时间 + /// + public DateTime ReceivedTime { get; set; } + + /// + /// 设备ID + /// + public string DeviceId { get; set; } +} + +/// /// MQTT 发布与订阅统一服务 /// public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable @@ -42,7 +69,26 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab private readonly Dictionary _onlineStatusCache = new Dictionary(); private readonly object _cacheLock = new object(); private readonly Timer _healthCheckTimer; // 健康检查定时器 - private readonly int _healthCheckInterval = 5 * 60 * 1000; // 5分钟检查一次 + private readonly Timer _subscriptionMonitorTimer; // 订阅监控定时器 + private readonly int _healthCheckInterval = 3 * 60 * 1000; // 3分钟检查一次 + private readonly int _subscriptionMonitorInterval = 30 * 1000; // 30秒监控一次订阅状态 + + // 消息队列处理 + private readonly ConcurrentQueue _messageQueue = new ConcurrentQueue(); + private readonly SemaphoreSlim _messageProcessingSemaphore = new SemaphoreSlim(5, 5); // 最多5个并发处理 + private readonly Timer _messageProcessingTimer; // 消息处理定时器 + private readonly int _messageProcessingInterval = 100; // 100ms处理一次队列 + private readonly int _maxQueueSize = 10000; // 最大队列大小,防止内存溢出 + private volatile int _currentQueueSize = 0; // 当前队列大小 + + // 订阅状态监控 + private DateTime _lastMessageReceivedTime = DateTime.Now; + private bool _subscriptionVerified = false; + private DateTime _lastRetryTime = DateTime.MinValue; // 上次重试时间 + private readonly object _subscriptionLock = new object(); + private const string SUBSCRIPTION_TOPIC = "device/+/response"; + private const int RETRY_INTERVAL_MINUTES = 5; // 重试间隔5分钟 + /// /// 构造函数:初始化客户端和配置、注册事件 /// @@ -59,115 +105,135 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址 .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码 .WithClientId("server_publisher") // 客户端 ID,必须唯一 + .WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) // 保持连接心跳 + .WithCleanSession(false) // 保持会话状态 .Build(); // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response) _mqttClient.UseConnectedHandler(async e => { Log.Information("MQTT 已连接成功"); + _subscriptionVerified = false; // 重置订阅验证状态 + // 订阅所有设备的响应主题(+ 代表通配符) - await _mqttClient.SubscribeAsync("device/+/response"); - Log.Information("已订阅通用响应 topic: device/+/response"); + await EnsureSubscriptionAsync(); }); // 连接断开事件 - _mqttClient.UseDisconnectedHandler(async e => + _mqttClient.UseDisconnectedHandler(e => { - Log.Warning("MQTT 已断开连接"); - int retryInterval = 5; // 秒 - while (!_mqttClient.IsConnected) + Log.Warning($"MQTT 已断开连接,原因: {e.Reason}"); + _subscriptionVerified = false; // 重置订阅验证状态 + + // 使用Task.Run避免阻塞MQTT客户端线程 + _ = Task.Run(async () => { - try + int retryInterval = 5; // 秒 + int maxRetries = 10; // 最大重试次数 + int retryCount = 0; + + while (!_mqttClient.IsConnected && retryCount < maxRetries) { - Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT..."); - await Task.Delay(TimeSpan.FromSeconds(retryInterval)); - await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); - Log.Information("MQTT 重连成功"); - - // 重连成功后重新订阅主题 try { - await _mqttClient.SubscribeAsync("device/+/response"); - Log.Information("重连后重新订阅主题成功: device/+/response"); + retryCount++; + Log.Information($"尝试第 {retryCount} 次重新连接 MQTT,{retryInterval} 秒后开始..."); + await Task.Delay(TimeSpan.FromSeconds(retryInterval)); + + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); + Log.Information("MQTT 重连成功"); + + // 重连成功后重新订阅主题 + await EnsureSubscriptionAsync(); + + break; // 重连成功后跳出循环 } - catch (Exception subEx) + catch (Exception ex) { - Log.Error($"重连后重新订阅主题失败: {subEx.Message}"); + Log.Error($"MQTT 第 {retryCount} 次重连失败: {ex.Message}"); + + if (retryCount >= maxRetries) + { + Log.Error($"MQTT 重连失败次数达到上限 ({maxRetries}),停止重连"); + break; + } + + // 重连失败后增加延迟,避免频繁重试 + retryInterval = Math.Min(retryInterval * 2, 60); // 最大延迟60秒 + await Task.Delay(TimeSpan.FromSeconds(retryInterval)); } - - break; // 重连成功后跳出循环 } - catch (Exception ex) - { - Log.Error($"MQTT 重连失败: {ex.Message}"); - // 重连失败后继续循环,但增加延迟避免频繁重试 - await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2)); - } - } + }); }); - // 收到消息事件:处理设备回传的消息 + + // 收到消息事件:快速入队,避免阻塞 _mqttClient.UseApplicationMessageReceivedHandler(e => { try { + // 更新最后收到消息的时间和验证状态 + lock (_subscriptionLock) + { + _lastMessageReceivedTime = DateTime.Now; + _subscriptionVerified = true; // 只有收到实际消息才标记为已验证 + } + // 获取 topic 和 payload 内容 var topic = e.ApplicationMessage.Topic; var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty()); - Log.Information($"收到 MQTT 消息,Topic: {topic}, Payload: {payload}"); - // 反序列化 JSON 内容为 MqttContent 实体 - var content = JsonSerializer.Deserialize(payload); - if (content != null) - { - // 从 topic 中提取设备 ID,例如 device/abc123/response - var topicParts = topic.Split('/'); - var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown"; - Log.Information($"{topicParts[0]}: {deviceId})"); - UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity - { - Id = YitIdHelper.NextId().ToString(), - MessageType = "response", - Topic = topic, - Payload = payload, - DeviceId = deviceId, - Processed = 1, - Status = "success", - CreatedAt = DateTime.Now, - }; - if (!payload.Contains("\"status\":\"received\"")) + + // 从 topic 中提取设备 ID + var topicParts = topic.Split('/'); + var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown"; + + // 检查队列大小,防止内存溢出 + if (_currentQueueSize >= _maxQueueSize) + { + Log.Warning($"消息队列已满({_maxQueueSize}),丢弃消息: Topic: {topic}, DeviceId: {deviceId}"); + // 记录丢弃的消息(异步执行,不阻塞) + _ = Task.Run(async () => { - //判断两秒内,Payload一样的话,就不处理了 - var twoSecondsAgo = DateTime.Now.AddSeconds(-2); - var lastLog = _db.Queryable().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any(); - if (lastLog == false) + await LogFailedMessageAsync(new MqttMessageInfo { - Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}"); - var orderService = _serviceProvider.GetRequiredService(); - orderService.HandleSubscribeMessage(payload); - } - else - { - Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}"); - } - //只保存正常消息 - _db.Insertable(logEntity).ExecuteCommand(); - } - else - { - Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}"); - // 保存确认消息到日志 - _db.Insertable(logEntity).ExecuteCommand(); - } + Topic = topic, + Payload = payload, + ReceivedTime = DateTime.Now, + DeviceId = deviceId + }, "队列已满,消息被丢弃"); + }); + return; } + + // 快速入队,避免阻塞MQTT接收 + var messageInfo = new MqttMessageInfo + { + Topic = topic, + Payload = payload, + ReceivedTime = DateTime.Now, + DeviceId = deviceId + }; + + _messageQueue.Enqueue(messageInfo); + Interlocked.Increment(ref _currentQueueSize); + Log.Debug($"MQTT消息已入队,Topic: {topic}, DeviceId: {deviceId}, 队列长度: {_currentQueueSize}"); } catch (Exception ex) { - Log.Error($"消息处理异常: {ex.Message}"); + Log.Error($"消息入队异常: {ex.Message}"); } }); // 初始化健康检查定时器 _healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval); - Log.Information("MQTT健康检查定时器已启动,每5分钟检查一次"); + Log.Information("MQTT健康检查定时器已启动,每3分钟检查一次"); + + // 初始化订阅监控定时器 + _subscriptionMonitorTimer = new Timer(SubscriptionMonitorCallback, null, _subscriptionMonitorInterval, _subscriptionMonitorInterval); + Log.Information("MQTT订阅监控定时器已启动,每30秒监控一次"); + + // 初始化消息处理定时器 + _messageProcessingTimer = new Timer(MessageProcessingCallback, null, _messageProcessingInterval, _messageProcessingInterval); + Log.Information("MQTT消息处理定时器已启动,每100ms处理一次队列"); } /// @@ -187,6 +253,225 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab } /// + /// 订阅监控定时器回调方法 + /// + private async void SubscriptionMonitorCallback(object state) + { + try + { + Log.Debug("执行定时MQTT订阅监控..."); + await VerifySubscriptionAsync(); + } + catch (Exception ex) + { + Log.Error($"订阅监控执行失败: {ex.Message}"); + } + } + + /// + /// 消息处理定时器回调方法 + /// + private void MessageProcessingCallback(object state) + { + try + { + // 处理队列中的消息,最多同时处理5个 + var tasks = new List(); + for (int i = 0; i < 5 && _currentQueueSize > 0; i++) + { + if (_messageQueue.TryDequeue(out var messageInfo)) + { + Interlocked.Decrement(ref _currentQueueSize); + tasks.Add(ProcessMessageAsync(messageInfo)); + } + } + + if (tasks.Count > 0) + { + // 使用Task.Run避免阻塞定时器线程 + Task.Run(async () => + { + try + { + await Task.WhenAll(tasks); + } + catch (Exception ex) + { + Log.Error($"消息处理任务执行失败: {ex.Message}"); + } + }); + } + } + catch (Exception ex) + { + Log.Error($"消息处理定时器执行失败: {ex.Message}"); + } + } + + /// + /// 异步处理单个消息 + /// + /// 消息信息 + /// + private async Task ProcessMessageAsync(MqttMessageInfo messageInfo) + { + await _messageProcessingSemaphore.WaitAsync(); + try + { + Log.Information($"开始处理MQTT消息,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); + + // 验证消息格式 + if (string.IsNullOrWhiteSpace(messageInfo.Payload)) + { + Log.Warning($"消息内容为空,跳过处理: Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); + return; + } + + // 反序列化 JSON 内容为 MqttContent 实体 + MqttContent content = null; + try + { + content = JsonSerializer.Deserialize(messageInfo.Payload); + } + catch (JsonException jsonEx) + { + Log.Error($"JSON反序列化失败: {jsonEx.Message}, Payload: {messageInfo.Payload}"); + // 即使JSON解析失败,也要记录到日志 + await LogFailedMessageAsync(messageInfo, "JSON解析失败"); + return; + } + + if (content != null) + { + UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity + { + Id = YitIdHelper.NextId().ToString(), + MessageType = "response", + Topic = messageInfo.Topic, + Payload = messageInfo.Payload, + DeviceId = messageInfo.DeviceId, + Processed = 1, + Status = "success", + CreatedAt = messageInfo.ReceivedTime, + }; + + if (!messageInfo.Payload.Contains("\"status\":\"received\"")) + { + // 判断两秒内,Payload一样的话,就不处理了 + var twoSecondsAgo = DateTime.Now.AddSeconds(-2); + bool lastLog = false; + try + { + lastLog = await _db.Queryable() + .Where(p => p.DeviceId == messageInfo.DeviceId && + p.Payload == messageInfo.Payload && + p.CreatedAt >= twoSecondsAgo) + .AnyAsync(); + } + catch (Exception dbEx) + { + Log.Error($"数据库查询失败: {dbEx.Message}, DeviceId: {messageInfo.DeviceId}"); + // 数据库查询失败时,为了安全起见,不处理消息 + await LogFailedMessageAsync(messageInfo, "数据库查询失败"); + return; + } + + if (!lastLog) + { + Log.Information($"[订阅消息] 处理新消息:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}"); + try + { + var orderService = _serviceProvider.GetRequiredService(); + await Task.Run(() => orderService.HandleSubscribeMessage(messageInfo.Payload)); + } + catch (Exception businessEx) + { + Log.Error($"业务逻辑处理失败: {businessEx.Message}, DeviceId: {messageInfo.DeviceId}"); + logEntity.Status = "business_error"; + logEntity.Processed = 0; + } + } + else + { + Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}"); + logEntity.Status = "duplicate"; + } + + // 保存消息到日志 + try + { + await _db.Insertable(logEntity).ExecuteCommandAsync(); + } + catch (Exception logEx) + { + Log.Error($"保存消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}"); + } + } + else + { + Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}"); + // 保存确认消息到日志 + try + { + await _db.Insertable(logEntity).ExecuteCommandAsync(); + } + catch (Exception logEx) + { + Log.Error($"保存确认消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}"); + } + } + } + else + { + Log.Warning($"JSON反序列化结果为空: Payload: {messageInfo.Payload}"); + await LogFailedMessageAsync(messageInfo, "JSON反序列化结果为空"); + } + + Log.Debug($"MQTT消息处理完成,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); + } + catch (Exception ex) + { + Log.Error($"处理MQTT消息异常: {ex.Message}, Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}"); + await LogFailedMessageAsync(messageInfo, $"处理异常: {ex.Message}"); + } + finally + { + _messageProcessingSemaphore.Release(); + } + } + + /// + /// 记录失败的消息 + /// + /// 消息信息 + /// 失败原因 + /// + private async Task LogFailedMessageAsync(MqttMessageInfo messageInfo, string errorReason) + { + try + { + var logEntity = new UavMqttMessageLogEntity + { + Id = YitIdHelper.NextId().ToString(), + MessageType = "response", + Topic = messageInfo.Topic, + Payload = messageInfo.Payload, + DeviceId = messageInfo.DeviceId, + Processed = 0, + Status = "failed", + CreatedAt = messageInfo.ReceivedTime, + }; + + await _db.Insertable(logEntity).ExecuteCommandAsync(); + Log.Information($"失败消息已记录: DeviceId: {messageInfo.DeviceId}, 原因: {errorReason}"); + } + catch (Exception ex) + { + Log.Error($"记录失败消息时出错: {ex.Message}, DeviceId: {messageInfo.DeviceId}"); + } + } + + /// /// 向指定 Topic 发布 MQTT 消息 /// /// 主题路径,例如 device/{id}/command @@ -278,13 +563,28 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab /// public async Task StartAsync() { - if (!_mqttClient.IsConnected) + try { - await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); + Log.Information("启动MQTT客户端..."); + + if (!_mqttClient.IsConnected) + { + await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); + Log.Information("MQTT客户端连接成功"); + } + + // 确保订阅状态正常 + await EnsureSubscriptionAsync(); + + // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效 + + Log.Information("MQTT客户端启动完成,等待实际消息验证订阅状态"); + } + catch (Exception ex) + { + Log.Error($"启动MQTT客户端失败: {ex.Message}"); + throw; } - - // 确保订阅状态正常 - await EnsureSubscriptionAsync(); } /// @@ -298,17 +598,102 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab { // 检查订阅状态,如果订阅丢失则重新订阅 Log.Information("检查MQTT订阅状态..."); - await _mqttClient.SubscribeAsync("device/+/response"); + await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC); Log.Information("MQTT订阅状态正常: device/+/response"); + + // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效 } } catch (Exception ex) { Log.Warning($"检查订阅状态时出错: {ex.Message}"); + // 订阅失败,尝试重新订阅 + await RetrySubscriptionAsync(); + } + } + + /// + /// 验证订阅是否真的生效 + /// + private async Task VerifySubscriptionAsync() + { + try + { + lock (_subscriptionLock) + { + // 检查是否长时间没有收到消息 + var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime; + if (timeSinceLastMessage.TotalMinutes > 2 && _subscriptionVerified) + { + Log.Warning($"订阅可能失效,已 {timeSinceLastMessage.TotalMinutes:F1} 分钟未收到消息"); + _subscriptionVerified = false; + } + } + + // 如果订阅未验证,且距离上次重试超过5分钟,才尝试重新订阅 + if (!_subscriptionVerified) + { + var timeSinceLastRetry = DateTime.Now - _lastRetryTime; + if (timeSinceLastRetry.TotalMinutes >= RETRY_INTERVAL_MINUTES) + { + Log.Warning("订阅验证失败,尝试重新订阅..."); + await RetrySubscriptionAsync(); + _lastRetryTime = DateTime.Now; + } + else + { + Log.Debug($"订阅未验证,但距离上次重试仅 {timeSinceLastRetry.TotalMinutes:F1} 分钟,跳过重试"); + } + } + } + catch (Exception ex) + { + Log.Error($"验证订阅状态失败: {ex.Message}"); } } /// + /// 重试订阅 + /// + private async Task RetrySubscriptionAsync() + { + try + { + if (_mqttClient.IsConnected) + { + Log.Information("尝试重新订阅..."); + + // 先取消现有订阅 + try + { + await _mqttClient.UnsubscribeAsync(SUBSCRIPTION_TOPIC); + Log.Information("已取消现有订阅"); + } + catch (Exception ex) + { + Log.Warning($"取消订阅失败: {ex.Message}"); + } + + // 重新订阅 + await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC); + Log.Information("重新订阅成功"); + + // 重置验证状态,等待实际消息接收来验证订阅是否真正生效 + _subscriptionVerified = false; + _lastMessageReceivedTime = DateTime.Now; + + // 注意:不立即标记为已验证,等待实际消息接收来验证 + } + } + catch (Exception ex) + { + Log.Error($"重试订阅失败: {ex.Message}"); + } + } + + + + /// /// 查询指定clientId是否在线(通过EMQX管理API,带缓存) /// /// 要查询的客户端ID @@ -390,7 +775,7 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab } return isOnline; } - catch (TaskCanceledException ex) + catch (TaskCanceledException) { Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}"); if (retry == maxRetries) @@ -629,19 +1014,32 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab { try { + Log.Information("开始强制重连MQTT..."); + if (_mqttClient.IsConnected) { await _mqttClient.DisconnectAsync(); Log.Information("MQTT 主动断开连接"); } + // 重置订阅状态 + lock (_subscriptionLock) + { + _subscriptionVerified = false; + _lastMessageReceivedTime = DateTime.Now; + } + await Task.Delay(2000); // 等待2秒后重连 await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); Log.Information("MQTT 强制重连成功"); - // 重连后重新订阅 + // 重连后重新订阅并验证 await EnsureSubscriptionAsync(); + + // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效 + + Log.Information("MQTT 强制重连完成,等待实际消息验证订阅状态"); return true; } catch (Exception ex) @@ -652,7 +1050,7 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab } /// - /// 定期健康检查和自动修复(建议每5分钟调用一次) + /// 定期健康检查和自动修复(建议每3分钟调用一次) /// /// 修复是否成功 public async Task PerformHealthCheckAndRepairAsync() @@ -689,6 +1087,19 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab return true; } + // 检查订阅状态和消息接收情况 + lock (_subscriptionLock) + { + var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime; + if (timeSinceLastMessage.TotalMinutes > 10) + { + Log.Warning($"长时间未收到消息({timeSinceLastMessage.TotalMinutes:F1}分钟),订阅可能有问题"); + } + } + + // 验证订阅状态 + await VerifySubscriptionAsync(); + // 检查订阅状态 await EnsureSubscriptionAsync(); @@ -706,18 +1117,208 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab /// 获取当前订阅的主题列表 /// /// 订阅的主题列表 - public async Task> GetSubscribedTopicsAsync() + public Task> GetSubscribedTopicsAsync() { try { // 注意:MQTTnet可能不直接提供获取订阅列表的方法 // 这里返回我们已知的订阅主题 - return new List { "device/+/response" }; + return Task.FromResult(new List { SUBSCRIPTION_TOPIC }); } catch (Exception ex) { Log.Error($"获取订阅主题列表失败: {ex.Message}"); - return new List(); + return Task.FromResult(new List()); + } + } + + /// + /// 获取订阅状态信息 + /// + /// 订阅状态信息 + public dynamic GetSubscriptionStatus() + { + try + { + lock (_subscriptionLock) + { + var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime; + return new + { + IsConnected = _mqttClient.IsConnected, + SubscriptionVerified = _subscriptionVerified, + LastMessageReceived = _lastMessageReceivedTime, + TimeSinceLastMessage = $"{timeSinceLastMessage.TotalMinutes:F1} 分钟", + SubscriptionTopic = SUBSCRIPTION_TOPIC, + IsHealthy = timeSinceLastMessage.TotalMinutes < 10, + QueueSize = _currentQueueSize, + MaxQueueSize = _maxQueueSize + }; + } + } + catch (Exception ex) + { + Log.Error($"获取订阅状态失败: {ex.Message}"); + return new { Error = ex.Message }; + } + } + + /// + /// 检查连接稳定性 + /// + /// 连接稳定性信息 + public async Task CheckConnectionStabilityAsync() + { + try + { + // 如果连接正常,进行Ping测试 + if (_mqttClient.IsConnected) + { + try + { + var pingStartTime = DateTime.Now; + await _mqttClient.PingAsync(CancellationToken.None); + var pingDuration = DateTime.Now - pingStartTime; + + Log.Information($"连接稳定性检查通过,Ping耗时: {pingDuration.TotalMilliseconds:F2}ms"); + + return new + { + IsConnected = true, + PingSuccess = true, + PingDuration = pingDuration.TotalMilliseconds, + ConnectionTime = DateTime.Now, + LastMessageReceived = _lastMessageReceivedTime, + SubscriptionVerified = _subscriptionVerified, + QueueSize = _currentQueueSize, + MaxQueueSize = _maxQueueSize, + HealthCheckInterval = _healthCheckInterval, + SubscriptionMonitorInterval = _subscriptionMonitorInterval, + MessageProcessingInterval = _messageProcessingInterval + }; + } + catch (Exception pingEx) + { + Log.Warning($"连接稳定性检查失败,Ping失败: {pingEx.Message}"); + return new + { + IsConnected = false, + PingSuccess = false, + PingError = pingEx.Message, + ConnectionTime = DateTime.Now, + LastMessageReceived = _lastMessageReceivedTime, + SubscriptionVerified = _subscriptionVerified, + QueueSize = _currentQueueSize, + MaxQueueSize = _maxQueueSize, + HealthCheckInterval = _healthCheckInterval, + SubscriptionMonitorInterval = _subscriptionMonitorInterval, + MessageProcessingInterval = _messageProcessingInterval + }; + } + } + else + { + Log.Warning("连接稳定性检查失败,MQTT客户端未连接"); + return new + { + IsConnected = false, + ConnectionTime = DateTime.Now, + LastMessageReceived = _lastMessageReceivedTime, + SubscriptionVerified = _subscriptionVerified, + QueueSize = _currentQueueSize, + MaxQueueSize = _maxQueueSize, + HealthCheckInterval = _healthCheckInterval, + SubscriptionMonitorInterval = _subscriptionMonitorInterval, + MessageProcessingInterval = _messageProcessingInterval + }; + } + } + catch (Exception ex) + { + Log.Error($"检查连接稳定性时发生异常: {ex.Message}"); + return new + { + IsConnected = false, + Error = ex.Message, + ConnectionTime = DateTime.Now + }; + } + } + + /// + /// 测试数据格式兼容性 + /// + /// 测试数据 + /// 测试结果 + public dynamic TestDataFormatCompatibility(string testPayload) + { + try + { + Log.Information($"开始测试数据格式兼容性: {testPayload}"); + + // 测试JSON反序列化 + MqttContent content = null; + try + { + content = JsonSerializer.Deserialize(testPayload); + Log.Information("JSON反序列化成功"); + } + catch (JsonException jsonEx) + { + Log.Error($"JSON反序列化失败: {jsonEx.Message}"); + return new + { + Success = false, + Error = "JSON反序列化失败", + Details = jsonEx.Message, + TestPayload = testPayload + }; + } + + if (content == null) + { + return new + { + Success = false, + Error = "JSON反序列化结果为空", + TestPayload = testPayload + }; + } + + // 检查关键字段 + var fieldChecks = new Dictionary + { + ["id"] = content.id, + ["lane"] = content.lane, + ["rfid_1"] = content.rfid_1, + ["rfid_2"] = content.rfid_2, + ["batteryCapacity"] = content.batteryCapacity, + ["isOpenSuccess"] = content.isOpenSuccess, + ["isUav"] = content.isUav, + ["uavcode"] = content.uavcode + }; + + Log.Information($"数据格式兼容性测试成功,字段值: {string.Join(", ", fieldChecks.Select(kv => $"{kv.Key}={kv.Value}"))}"); + + return new + { + Success = true, + Message = "数据格式兼容性测试通过", + DeserializedContent = content, + FieldChecks = fieldChecks, + TestPayload = testPayload + }; + } + catch (Exception ex) + { + Log.Error($"数据格式兼容性测试失败: {ex.Message}"); + return new + { + Success = false, + Error = "测试过程中发生异常", + Details = ex.Message, + TestPayload = testPayload + }; } } @@ -729,7 +1330,10 @@ public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposab try { _healthCheckTimer?.Dispose(); - Log.Information("MQTT健康检查定时器已释放"); + _subscriptionMonitorTimer?.Dispose(); + _messageProcessingTimer?.Dispose(); + _messageProcessingSemaphore?.Dispose(); + Log.Information("MQTT定时器和信号量已释放"); } catch (Exception ex) { diff --git a/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml b/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml index 6fefd09..42eee6b 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml +++ b/netcore/src/Modularity/Extend/NCC.Extend/NCC.Extend.xml @@ -117,6 +117,31 @@ + + + MQTT 消息信息 + + + + + 消息主题 + + + + + 消息内容 + + + + + 接收时间 + + + + + 设备ID + + MQTT 发布与订阅统一服务 @@ -142,6 +167,31 @@ 健康检查定时器回调方法 + + + 订阅监控定时器回调方法 + + + + + 消息处理定时器回调方法 + + + + + 异步处理单个消息 + + 消息信息 + + + + + 记录失败的消息 + + 消息信息 + 失败原因 + + 向指定 Topic 发布 MQTT 消息 @@ -160,6 +210,16 @@ 确保订阅状态正常,如果订阅丢失则重新订阅 + + + 验证订阅是否真的生效 + + + + + 重试订阅 + + 查询指定clientId是否在线(通过EMQX管理API,带缓存) @@ -193,7 +253,7 @@ - 定期健康检查和自动修复(建议每5分钟调用一次) + 定期健康检查和自动修复(建议每3分钟调用一次) 修复是否成功 @@ -203,6 +263,25 @@ 订阅的主题列表 + + + 获取订阅状态信息 + + 订阅状态信息 + + + + 检查连接稳定性 + + 连接稳定性信息 + + + + 测试数据格式兼容性 + + 测试数据 + 测试结果 + 释放资源 @@ -784,6 +863,18 @@ 查询参数 + + + 将超过30分钟充电状态的无人机格子状态改为充电完成 + + + + + + 定时任务:自动更新超过30分钟的充电状态 + + + 收费规则服务 @@ -1125,6 +1216,17 @@ + + + 检测并清空重复的无人机信息 + + 无人机编码 + 电池1 RFID + 电池2 RFID + 当前设备ID + 当前货道 + + 订单下单开门 diff --git a/netcore/src/Modularity/Extend/NCC.Extend/UavFeeRuleService.cs b/netcore/src/Modularity/Extend/NCC.Extend/UavFeeRuleService.cs index 06c58f8..3706762 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend/UavFeeRuleService.cs +++ b/netcore/src/Modularity/Extend/NCC.Extend/UavFeeRuleService.cs @@ -299,7 +299,7 @@ namespace NCC.Extend.UavFeeRule } #endregion - #region 根据设备来获获取收费规则 + #region 根据设备来获获取收费规则 /// /// 根据设备来获获取收费规则 /// @@ -333,11 +333,11 @@ namespace NCC.Extend.UavFeeRule entity.IsDelete = 99; var isOk = await _db.Updateable(entity).IgnoreColumns(ignoreAllNullColumns: true).ExecuteCommandAsync(); if (!(isOk > 0)) throw NCCException.Oh(ErrorCode.COM1002); - //删除绑定管理 - var relationEntity = await _db.Queryable().FirstAsync(p => p.FeeRuleId == id); - if (relationEntity != null) + // 删除绑定管理(批量删除所有与该收费规则关联的绑定关系) + var hasRelations = await _db.Queryable().AnyAsync(p => p.FeeRuleId == id); + if (hasRelations) { - var deleteResult = await _db.Deleteable(relationEntity).ExecuteCommandAsync(); + var deleteResult = await _db.Deleteable().Where(p => p.FeeRuleId == id).ExecuteCommandAsync(); if (deleteResult <= 0) { throw NCCException.Oh(ErrorCode.COM1002); diff --git a/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs b/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs index 8a64a28..a3433bf 100644 --- a/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs +++ b/netcore/src/Modularity/Extend/NCC.Extend/UavOrderService.cs @@ -587,7 +587,7 @@ namespace NCC.Extend.UavOrder foreach (var item in terminalProfitSettingLIst) { //计算终端人员分润 - var terminalProfitAmount = terminalFinalAmount *( item.ProfitRatio / 100); + var terminalProfitAmount = terminalFinalAmount * (item.ProfitRatio / 100); UavWalletFlowEntity uavWalletFlowEntity_Terminal = new UavWalletFlowEntity { Id = YitIdHelper.NextId().ToString(), @@ -1170,13 +1170,6 @@ namespace NCC.Extend.UavOrder var result = await _db.Ado.UseTranAsync(async () => { var deviceInfo = await _db.Queryable().FirstAsync(x => x.Id == input.id); - //添加电池1 - await HandleReplenishment(input.rfid_1, input, "电池", deviceInfo.BelongUserId); - //添加电池2 - await HandleReplenishment(input.rfid_2, input, "电池", deviceInfo.BelongUserId); - //添加无人机 - await HandleReplenishment(input.uavcode, input, "无人机", deviceInfo.BelongUserId); - var cell = await _db.Queryable().FirstAsync(x => x.DeviceId == input.id && x.CellCode == input.lane); if (input.rfid_1 == "000000") { input.rfid_1 = ""; @@ -1185,6 +1178,15 @@ namespace NCC.Extend.UavOrder { input.rfid_2 = ""; } + // 检测并清空重复的无人机信息 + await ClearDuplicateUavInfo(input.uavcode, input.rfid_1, input.rfid_2, input.id, input.lane); + //添加电池1 + await HandleReplenishment(input.rfid_1, input, "电池", deviceInfo.BelongUserId); + //添加电池2 + await HandleReplenishment(input.rfid_2, input, "电池", deviceInfo.BelongUserId); + //添加无人机 + await HandleReplenishment(input.uavcode, input, "无人机", deviceInfo.BelongUserId); + var cell = await _db.Queryable().FirstAsync(x => x.DeviceId == input.id && x.CellCode == input.lane); //更新货道信息 cell.Rfid1 = input.rfid_1; cell.Rfid2 = input.rfid_2; @@ -1221,6 +1223,46 @@ namespace NCC.Extend.UavOrder await _db.Insertable(newUav).ExecuteCommandAsync(); } } + + /// + /// 检测并清空重复的无人机信息 + /// + /// 无人机编码 + /// 电池1 RFID + /// 电池2 RFID + /// 当前设备ID + /// 当前货道 + /// + [NonAction] + private async Task ClearDuplicateUavInfo(string uavCode, string rfid1, string rfid2, string currentDeviceId, string currentLane) + { + try + { + // 主要检测无人机编码重复,如果重复则清空对应的RFID信息 + if (!string.IsNullOrWhiteSpace(uavCode)) + { + var duplicateUavCells = await _db.Queryable().Where(x => x.UavCode == uavCode && !(x.DeviceId == currentDeviceId && x.CellCode == currentLane)).ToListAsync(); + if (duplicateUavCells.Any()) + { + foreach (var cell in duplicateUavCells) + { + // 清空无人机编码和对应的RFID信息 + cell.UavCode = ""; + cell.Rfid1 = ""; + cell.Rfid2 = ""; + cell.UpdateTime = DateTime.Now; + } + await _db.Updateable(duplicateUavCells).ExecuteCommandAsync(); + Log.Information($"清空了 {duplicateUavCells.Count} 个货道中重复的无人机编码及RFID信息: {uavCode}"); + } + } + } + catch (Exception ex) + { + Log.Error($"清空重复无人机信息失败: {ex.Message}"); + throw new Exception($"清空重复无人机信息失败: {ex.Message}"); + } + } #endregion #region 订单下单开门