// using System; // using System.Collections.Generic; // using System.Text; // using System.Text.Json; // using System.Threading; // using System.Threading.Tasks; // using Aliyun.Acs.Core.Logging; // using Microsoft.Extensions.DependencyInjection; // using MQTTnet; // using MQTTnet.Client; // using MQTTnet.Client.Options; // using NCC.Dependency; // using NCC.Extend.Entitys; // using NCC.Extend.Interfaces.MqttPublisher; // using NCC.Extend.Interfaces.UavOrder; // using Serilog; // using SqlSugar; // using Yitter.IdGenerator; // using System.Net.Http; // using System.Net.Http.Headers; // using System.Net; // using System.Linq; // namespace NCC.Extend; // /// // /// MQTT 发布与订阅统一服务 // /// // public class MqttPublisherService : IMqttPublisherService, ITransient // { // /// // /// MQTT 客户端对象(用于连接、发布、订阅等操作) // /// // private readonly IMqttClient _mqttClient; // /// // /// MQTT 连接参数配置 // /// // private readonly IMqttClientOptions _mqttOptions; // private readonly IServiceProvider _serviceProvider; // private readonly ISqlSugarClient _db; // private readonly Dictionary _onlineStatusCache = new Dictionary(); // private readonly object _cacheLock = new object(); // /// // /// 构造函数:初始化客户端和配置、注册事件 // /// // public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db) // { // _serviceProvider = serviceProvider; // _db = db; // // 创建 MQTT 客户端实例 // var factory = new MqttFactory(); // _mqttClient = factory.CreateMqttClient(); // // 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID) // _mqttOptions = new MqttClientOptionsBuilder() // .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址 // .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码 // .WithClientId("server_publisher") // 客户端 ID,必须唯一 // .Build(); // // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response) // _mqttClient.UseConnectedHandler(async e => // { // Log.Information("MQTT 已连接成功"); // // 订阅所有设备的响应主题(+ 代表通配符) // await _mqttClient.SubscribeAsync("device/+/response"); // Log.Information("已订阅通用响应 topic: device/+/response"); // }); // // 连接断开事件 // _mqttClient.UseDisconnectedHandler(async e => // { // Log.Warning("MQTT 已断开连接"); // int retryInterval = 5; // 秒 // while (!_mqttClient.IsConnected) // { // try // { // Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT..."); // await Task.Delay(TimeSpan.FromSeconds(retryInterval)); // await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); // Log.Information("MQTT 重连成功"); // break; // 重连成功后跳出循环 // } // catch (Exception ex) // { // Log.Error($"MQTT 重连失败: {ex.Message}"); // // 重连失败后继续循环,但增加延迟避免频繁重试 // await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2)); // } // } // }); // // 收到消息事件:处理设备回传的消息 // _mqttClient.UseApplicationMessageReceivedHandler(e => // { // try // { // // 获取 topic 和 payload 内容 // var topic = e.ApplicationMessage.Topic; // var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty()); // Log.Information($"收到 MQTT 消息,Topic: {topic}, Payload: {payload}"); // // 反序列化 JSON 内容为 MqttContent 实体 // var content = JsonSerializer.Deserialize(payload); // if (content != null) // { // // 从 topic 中提取设备 ID,例如 device/abc123/response // var topicParts = topic.Split('/'); // var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown"; // Log.Information($"{topicParts[0]}: {deviceId})"); // UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity // { // Id = YitIdHelper.NextId().ToString(), // MessageType = "response", // Topic = topic, // Payload = payload, // DeviceId = deviceId, // Processed = 1, // Status = "success", // CreatedAt = DateTime.Now, // }; // if (!payload.Contains("\"status\":\"received\"")) // { // //判断两秒内,Payload一样的话,就不处理了 // var twoSecondsAgo = DateTime.Now.AddSeconds(-2); // var lastLog = _db.Queryable().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any(); // if (lastLog == false) // { // Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}"); // var orderService = _serviceProvider.GetRequiredService(); // orderService.HandleSubscribeMessage(payload); // } // else // { // Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}"); // } // //只保存正常消息 // _db.Insertable(logEntity).ExecuteCommand(); // } // else // { // Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}"); // // 保存确认消息到日志 // _db.Insertable(logEntity).ExecuteCommand(); // } // } // } // catch (Exception ex) // { // Log.Error($"消息处理异常: {ex.Message}"); // } // }); // } // /// // /// 向指定 Topic 发布 MQTT 消息 // /// // /// 主题路径,例如 device/{id}/command // /// 发送的 JSON 字符串内容 // /// 是否成功发送 // public async Task PublishAsync(string topic, string payload) // { // try // { // // 如果未连接则先连接 // if (!_mqttClient.IsConnected) // { // Log.Warning($"MQTT 未连接,尝试重新连接..."); // await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); // // 等待连接稳定 // int retryCount = 0; // while (!_mqttClient.IsConnected && retryCount < 3) // { // await Task.Delay(1000); // retryCount++; // } // if (!_mqttClient.IsConnected) // { // Log.Error("MQTT 连接失败,无法发送消息"); // return false; // } // } // // 构建 MQTT 消息 // var message = new MqttApplicationMessageBuilder() // .WithTopic(topic) // .WithPayload(payload) // .WithAtLeastOnceQoS() // QoS 1:至少送达一次 // .WithRetainFlag(false) // 不保留历史消息 // .Build(); // // 异步发送 // await _mqttClient.PublishAsync(message); // Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}"); // var topicParts = topic.Split('/'); // UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity // { // Id = YitIdHelper.NextId().ToString(), // MessageType = "send", // Topic = topic, // Payload = payload, // DeviceId = topicParts[1], // Processed = 1, // Status = "success", // CreatedAt = DateTime.Now, // }; // _db.Insertable(logEntity).ExecuteCommand(); // return true; // } // catch (Exception ex) // { // Log.Error($"MQTT 发送失败:{ex.Message}"); // // 记录失败的消息到数据库 // try // { // var topicParts = topic.Split('/'); // UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity // { // Id = YitIdHelper.NextId().ToString(), // MessageType = "send", // Topic = topic, // Payload = payload, // DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown", // Processed = 0, // Status = "failed", // CreatedAt = DateTime.Now, // }; // _db.Insertable(logEntity).ExecuteCommand(); // } // catch (Exception logEx) // { // Log.Error($"记录失败消息到数据库时出错:{logEx.Message}"); // } // return false; // } // } // /// // /// 手动启动 MQTT 客户端(推荐在程序启动时调用) // /// // public async Task StartAsync() // { // if (!_mqttClient.IsConnected) // { // await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); // } // } // /// // /// 查询指定clientId是否在线(通过EMQX管理API,带缓存) // /// // /// 要查询的客户端ID // /// 在线返回true,不在线返回false // public async Task IsClientOnlineAsync(string clientId) // { // if (string.IsNullOrEmpty(clientId)) // { // Log.Warning("设备ID为空,无法查询在线状态"); // return false; // } // // 检查缓存 - 增加缓存时间到60秒,减少API调用 // lock (_cacheLock) // { // if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry)) // { // var (isOnline, lastCheck) = cacheEntry; // var cacheAge = DateTime.Now - lastCheck; // // 如果缓存时间小于60秒,直接返回缓存结果 // if (cacheAge.TotalSeconds < 60) // { // Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}"); // return isOnline; // } // } // } // // 使用静态HttpClient以提高性能 // using (var client = new HttpClient()) // { // client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3秒 // // 使用你的EMQX管理账号密码 // var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI"); // client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray)); // // 减少重试次数,提高响应速度 // int maxRetries = 1; // for (int retry = 0; retry <= maxRetries; retry++) // { // try // { // // 查询指定客户端 // var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}"; // var response = await client.GetAsync(clientUrl); // bool isOnline = false; // if (response.StatusCode == HttpStatusCode.OK) // { // Log.Debug($"设备 {clientId} 在线状态查询成功"); // isOnline = true; // } // else if (response.StatusCode == HttpStatusCode.NotFound) // { // Log.Debug($"设备 {clientId} 不在线"); // isOnline = false; // } // else // { // Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}"); // if (retry == maxRetries) // { // Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}"); // isOnline = false; // } // else // { // await Task.Delay(500); // 减少等待时间到500ms // continue; // } // } // // 更新缓存 // lock (_cacheLock) // { // _onlineStatusCache[clientId] = (isOnline, DateTime.Now); // } // return isOnline; // } // catch (TaskCanceledException ex) // { // Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}"); // if (retry == maxRetries) // { // Log.Error($"查询设备 {clientId} 在线状态最终超时"); // // 更新缓存为false // lock (_cacheLock) // { // _onlineStatusCache[clientId] = (false, DateTime.Now); // } // return false; // } // await Task.Delay(500); // 减少等待时间到500ms // } // catch (Exception ex) // { // Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}"); // if (retry == maxRetries) // { // // 更新缓存为false // lock (_cacheLock) // { // _onlineStatusCache[clientId] = (false, DateTime.Now); // } // return false; // } // await Task.Delay(500); // 减少等待时间到500ms // } // } // // 更新缓存为false // lock (_cacheLock) // { // _onlineStatusCache[clientId] = (false, DateTime.Now); // } // return false; // } // } // /// // /// 批量查询设备在线状态(提高效率) // /// // /// 要查询的客户端ID列表 // /// 在线状态字典 // public async Task> BatchCheckOnlineStatusAsync(List clientIds) // { // if (clientIds == null || clientIds.Count == 0) // { // return new Dictionary(); // } // var result = new Dictionary(); // var needQueryIds = new List(); // // 先检查缓存 // lock (_cacheLock) // { // foreach (var clientId in clientIds) // { // if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry)) // { // var (isOnline, lastCheck) = cacheEntry; // var cacheAge = DateTime.Now - lastCheck; // // 如果缓存时间小于60秒,使用缓存结果 // if (cacheAge.TotalSeconds < 60) // { // result[clientId] = isOnline; // continue; // } // } // needQueryIds.Add(clientId); // } // } // // 如果没有需要查询的设备,直接返回缓存结果 // if (needQueryIds.Count == 0) // { // return result; // } // // 批量查询在线状态 // using (var client = new HttpClient()) // { // client.Timeout = TimeSpan.FromSeconds(5); // var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI"); // client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray)); // // 并行查询多个设备 // var tasks = needQueryIds.Select(async clientId => // { // try // { // var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}"; // var response = await client.GetAsync(clientUrl); // bool isOnline = response.StatusCode == HttpStatusCode.OK; // // 更新缓存 // lock (_cacheLock) // { // _onlineStatusCache[clientId] = (isOnline, DateTime.Now); // } // return new { ClientId = clientId, IsOnline = isOnline }; // } // catch (Exception ex) // { // Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}"); // // 更新缓存为false // lock (_cacheLock) // { // _onlineStatusCache[clientId] = (false, DateTime.Now); // } // return new { ClientId = clientId, IsOnline = false }; // } // }); // var queryResults = await Task.WhenAll(tasks); // // 合并结果 // foreach (var queryResult in queryResults) // { // result[queryResult.ClientId] = queryResult.IsOnline; // } // } // return result; // } // /// // /// 清理过期的缓存数据(建议定期调用) // /// // public void CleanupExpiredCache() // { // lock (_cacheLock) // { // var now = DateTime.Now; // var expiredKeys = new List(); // foreach (var kvp in _onlineStatusCache) // { // var (isOnline, lastCheck) = kvp.Value; // var cacheAge = now - lastCheck; // // 如果缓存时间超过5分钟,标记为过期 // if (cacheAge.TotalMinutes > 5) // { // expiredKeys.Add(kvp.Key); // } // } // // 删除过期的缓存 // foreach (var key in expiredKeys) // { // _onlineStatusCache.Remove(key); // } // if (expiredKeys.Count > 0) // { // Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存"); // } // } // } // #region 私有回调逻辑 // #endregion // /// // /// 检查MQTT连接健康状态 // /// // /// 连接状态信息 // public async Task GetConnectionHealthAsync() // { // try // { // if (_mqttClient.IsConnected) // { // try // { // await _mqttClient.PingAsync(CancellationToken.None); // Log.Debug("MQTT Ping 成功"); // return new // { // IsConnected = true, // ConnectionTime = DateTime.Now, // LastPing = DateTime.Now, // ClientId = _mqttOptions.ClientId, // BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" // }; // } // catch (Exception pingEx) // { // Log.Warning($"MQTT Ping 失败: {pingEx.Message}"); // return new // { // IsConnected = false, // ConnectionTime = (DateTime?)null, // LastPing = DateTime.Now, // ClientId = _mqttOptions.ClientId, // BrokerAddress = "mqtt.cqjiangzhichao.cn:1883", // PingError = pingEx.Message // }; // } // } // else // { // return new // { // IsConnected = false, // ConnectionTime = (DateTime?)null, // LastPing = DateTime.Now, // ClientId = _mqttOptions.ClientId, // BrokerAddress = "mqtt.cqjiangzhichao.cn:1883" // }; // } // } // catch (Exception ex) // { // Log.Error($"获取连接健康状态失败: {ex.Message}"); // return new // { // IsConnected = false, // Error = ex.Message, // Timestamp = DateTime.Now // }; // } // } // /// // /// 强制重新连接MQTT // /// // /// 重连是否成功 // public async Task ForceReconnectAsync() // { // try // { // if (_mqttClient.IsConnected) // { // await _mqttClient.DisconnectAsync(); // Log.Information("MQTT 主动断开连接"); // } // await Task.Delay(2000); // 等待2秒后重连 // await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None); // Log.Information("MQTT 强制重连成功"); // return true; // } // catch (Exception ex) // { // Log.Error($"MQTT 强制重连失败: {ex.Message}"); // return false; // } // } // }