// using System;
// using System.Collections.Generic;
// using System.Text;
// using System.Text.Json;
// using System.Threading;
// using System.Threading.Tasks;
// using Aliyun.Acs.Core.Logging;
// using Microsoft.Extensions.DependencyInjection;
// using MQTTnet;
// using MQTTnet.Client;
// using MQTTnet.Client.Options;
// using NCC.Dependency;
// using NCC.Extend.Entitys;
// using NCC.Extend.Interfaces.MqttPublisher;
// using NCC.Extend.Interfaces.UavOrder;
// using Serilog;
// using SqlSugar;
// using Yitter.IdGenerator;
// using System.Net.Http;
// using System.Net.Http.Headers;
// using System.Net;
// using System.Linq;
// namespace NCC.Extend;
// ///
// /// MQTT 发布与订阅统一服务
// ///
// public class MqttPublisherService : IMqttPublisherService, ITransient
// {
// ///
// /// MQTT 客户端对象(用于连接、发布、订阅等操作)
// ///
// private readonly IMqttClient _mqttClient;
// ///
// /// MQTT 连接参数配置
// ///
// private readonly IMqttClientOptions _mqttOptions;
// private readonly IServiceProvider _serviceProvider;
// private readonly ISqlSugarClient _db;
// private readonly Dictionary _onlineStatusCache = new Dictionary();
// private readonly object _cacheLock = new object();
// ///
// /// 构造函数:初始化客户端和配置、注册事件
// ///
// public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db)
// {
// _serviceProvider = serviceProvider;
// _db = db;
// // 创建 MQTT 客户端实例
// var factory = new MqttFactory();
// _mqttClient = factory.CreateMqttClient();
// // 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID)
// _mqttOptions = new MqttClientOptionsBuilder()
// .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址
// .WithCredentials("wrjservice", "P@ssw0rd") // 账号密码
// .WithClientId("server_publisher") // 客户端 ID,必须唯一
// .Build();
// // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response)
// _mqttClient.UseConnectedHandler(async e =>
// {
// Log.Information("MQTT 已连接成功");
// // 订阅所有设备的响应主题(+ 代表通配符)
// await _mqttClient.SubscribeAsync("device/+/response");
// Log.Information("已订阅通用响应 topic: device/+/response");
// });
// // 连接断开事件
// _mqttClient.UseDisconnectedHandler(async e =>
// {
// Log.Warning("MQTT 已断开连接");
// int retryInterval = 5; // 秒
// while (!_mqttClient.IsConnected)
// {
// try
// {
// Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT...");
// await Task.Delay(TimeSpan.FromSeconds(retryInterval));
// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
// Log.Information("MQTT 重连成功");
// break; // 重连成功后跳出循环
// }
// catch (Exception ex)
// {
// Log.Error($"MQTT 重连失败: {ex.Message}");
// // 重连失败后继续循环,但增加延迟避免频繁重试
// await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2));
// }
// }
// });
// // 收到消息事件:处理设备回传的消息
// _mqttClient.UseApplicationMessageReceivedHandler(e =>
// {
// try
// {
// // 获取 topic 和 payload 内容
// var topic = e.ApplicationMessage.Topic;
// var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty());
// Log.Information($"收到 MQTT 消息,Topic: {topic}, Payload: {payload}");
// // 反序列化 JSON 内容为 MqttContent 实体
// var content = JsonSerializer.Deserialize(payload);
// if (content != null)
// {
// // 从 topic 中提取设备 ID,例如 device/abc123/response
// var topicParts = topic.Split('/');
// var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown";
// Log.Information($"{topicParts[0]}: {deviceId})");
// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
// {
// Id = YitIdHelper.NextId().ToString(),
// MessageType = "response",
// Topic = topic,
// Payload = payload,
// DeviceId = deviceId,
// Processed = 1,
// Status = "success",
// CreatedAt = DateTime.Now,
// };
// if (!payload.Contains("\"status\":\"received\""))
// {
// //判断两秒内,Payload一样的话,就不处理了
// var twoSecondsAgo = DateTime.Now.AddSeconds(-2);
// var lastLog = _db.Queryable().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any();
// if (lastLog == false)
// {
// Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}");
// var orderService = _serviceProvider.GetRequiredService();
// orderService.HandleSubscribeMessage(payload);
// }
// else
// {
// Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}");
// }
// //只保存正常消息
// _db.Insertable(logEntity).ExecuteCommand();
// }
// else
// {
// Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}");
// // 保存确认消息到日志
// _db.Insertable(logEntity).ExecuteCommand();
// }
// }
// }
// catch (Exception ex)
// {
// Log.Error($"消息处理异常: {ex.Message}");
// }
// });
// }
// ///
// /// 向指定 Topic 发布 MQTT 消息
// ///
// /// 主题路径,例如 device/{id}/command
// /// 发送的 JSON 字符串内容
// /// 是否成功发送
// public async Task PublishAsync(string topic, string payload)
// {
// try
// {
// // 如果未连接则先连接
// if (!_mqttClient.IsConnected)
// {
// Log.Warning($"MQTT 未连接,尝试重新连接...");
// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
// // 等待连接稳定
// int retryCount = 0;
// while (!_mqttClient.IsConnected && retryCount < 3)
// {
// await Task.Delay(1000);
// retryCount++;
// }
// if (!_mqttClient.IsConnected)
// {
// Log.Error("MQTT 连接失败,无法发送消息");
// return false;
// }
// }
// // 构建 MQTT 消息
// var message = new MqttApplicationMessageBuilder()
// .WithTopic(topic)
// .WithPayload(payload)
// .WithAtLeastOnceQoS() // QoS 1:至少送达一次
// .WithRetainFlag(false) // 不保留历史消息
// .Build();
// // 异步发送
// await _mqttClient.PublishAsync(message);
// Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}");
// var topicParts = topic.Split('/');
// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
// {
// Id = YitIdHelper.NextId().ToString(),
// MessageType = "send",
// Topic = topic,
// Payload = payload,
// DeviceId = topicParts[1],
// Processed = 1,
// Status = "success",
// CreatedAt = DateTime.Now,
// };
// _db.Insertable(logEntity).ExecuteCommand();
// return true;
// }
// catch (Exception ex)
// {
// Log.Error($"MQTT 发送失败:{ex.Message}");
// // 记录失败的消息到数据库
// try
// {
// var topicParts = topic.Split('/');
// UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
// {
// Id = YitIdHelper.NextId().ToString(),
// MessageType = "send",
// Topic = topic,
// Payload = payload,
// DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown",
// Processed = 0,
// Status = "failed",
// CreatedAt = DateTime.Now,
// };
// _db.Insertable(logEntity).ExecuteCommand();
// }
// catch (Exception logEx)
// {
// Log.Error($"记录失败消息到数据库时出错:{logEx.Message}");
// }
// return false;
// }
// }
// ///
// /// 手动启动 MQTT 客户端(推荐在程序启动时调用)
// ///
// public async Task StartAsync()
// {
// if (!_mqttClient.IsConnected)
// {
// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
// }
// }
// ///
// /// 查询指定clientId是否在线(通过EMQX管理API,带缓存)
// ///
// /// 要查询的客户端ID
// /// 在线返回true,不在线返回false
// public async Task IsClientOnlineAsync(string clientId)
// {
// if (string.IsNullOrEmpty(clientId))
// {
// Log.Warning("设备ID为空,无法查询在线状态");
// return false;
// }
// // 检查缓存 - 增加缓存时间到60秒,减少API调用
// lock (_cacheLock)
// {
// if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
// {
// var (isOnline, lastCheck) = cacheEntry;
// var cacheAge = DateTime.Now - lastCheck;
// // 如果缓存时间小于60秒,直接返回缓存结果
// if (cacheAge.TotalSeconds < 60)
// {
// Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}");
// return isOnline;
// }
// }
// }
// // 使用静态HttpClient以提高性能
// using (var client = new HttpClient())
// {
// client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3秒
// // 使用你的EMQX管理账号密码
// var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
// client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
// // 减少重试次数,提高响应速度
// int maxRetries = 1;
// for (int retry = 0; retry <= maxRetries; retry++)
// {
// try
// {
// // 查询指定客户端
// var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
// var response = await client.GetAsync(clientUrl);
// bool isOnline = false;
// if (response.StatusCode == HttpStatusCode.OK)
// {
// Log.Debug($"设备 {clientId} 在线状态查询成功");
// isOnline = true;
// }
// else if (response.StatusCode == HttpStatusCode.NotFound)
// {
// Log.Debug($"设备 {clientId} 不在线");
// isOnline = false;
// }
// else
// {
// Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}");
// if (retry == maxRetries)
// {
// Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}");
// isOnline = false;
// }
// else
// {
// await Task.Delay(500); // 减少等待时间到500ms
// continue;
// }
// }
// // 更新缓存
// lock (_cacheLock)
// {
// _onlineStatusCache[clientId] = (isOnline, DateTime.Now);
// }
// return isOnline;
// }
// catch (TaskCanceledException ex)
// {
// Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}");
// if (retry == maxRetries)
// {
// Log.Error($"查询设备 {clientId} 在线状态最终超时");
// // 更新缓存为false
// lock (_cacheLock)
// {
// _onlineStatusCache[clientId] = (false, DateTime.Now);
// }
// return false;
// }
// await Task.Delay(500); // 减少等待时间到500ms
// }
// catch (Exception ex)
// {
// Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}");
// if (retry == maxRetries)
// {
// // 更新缓存为false
// lock (_cacheLock)
// {
// _onlineStatusCache[clientId] = (false, DateTime.Now);
// }
// return false;
// }
// await Task.Delay(500); // 减少等待时间到500ms
// }
// }
// // 更新缓存为false
// lock (_cacheLock)
// {
// _onlineStatusCache[clientId] = (false, DateTime.Now);
// }
// return false;
// }
// }
// ///
// /// 批量查询设备在线状态(提高效率)
// ///
// /// 要查询的客户端ID列表
// /// 在线状态字典
// public async Task> BatchCheckOnlineStatusAsync(List clientIds)
// {
// if (clientIds == null || clientIds.Count == 0)
// {
// return new Dictionary();
// }
// var result = new Dictionary();
// var needQueryIds = new List();
// // 先检查缓存
// lock (_cacheLock)
// {
// foreach (var clientId in clientIds)
// {
// if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
// {
// var (isOnline, lastCheck) = cacheEntry;
// var cacheAge = DateTime.Now - lastCheck;
// // 如果缓存时间小于60秒,使用缓存结果
// if (cacheAge.TotalSeconds < 60)
// {
// result[clientId] = isOnline;
// continue;
// }
// }
// needQueryIds.Add(clientId);
// }
// }
// // 如果没有需要查询的设备,直接返回缓存结果
// if (needQueryIds.Count == 0)
// {
// return result;
// }
// // 批量查询在线状态
// using (var client = new HttpClient())
// {
// client.Timeout = TimeSpan.FromSeconds(5);
// var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
// client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
// // 并行查询多个设备
// var tasks = needQueryIds.Select(async clientId =>
// {
// try
// {
// var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
// var response = await client.GetAsync(clientUrl);
// bool isOnline = response.StatusCode == HttpStatusCode.OK;
// // 更新缓存
// lock (_cacheLock)
// {
// _onlineStatusCache[clientId] = (isOnline, DateTime.Now);
// }
// return new { ClientId = clientId, IsOnline = isOnline };
// }
// catch (Exception ex)
// {
// Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}");
// // 更新缓存为false
// lock (_cacheLock)
// {
// _onlineStatusCache[clientId] = (false, DateTime.Now);
// }
// return new { ClientId = clientId, IsOnline = false };
// }
// });
// var queryResults = await Task.WhenAll(tasks);
// // 合并结果
// foreach (var queryResult in queryResults)
// {
// result[queryResult.ClientId] = queryResult.IsOnline;
// }
// }
// return result;
// }
// ///
// /// 清理过期的缓存数据(建议定期调用)
// ///
// public void CleanupExpiredCache()
// {
// lock (_cacheLock)
// {
// var now = DateTime.Now;
// var expiredKeys = new List();
// foreach (var kvp in _onlineStatusCache)
// {
// var (isOnline, lastCheck) = kvp.Value;
// var cacheAge = now - lastCheck;
// // 如果缓存时间超过5分钟,标记为过期
// if (cacheAge.TotalMinutes > 5)
// {
// expiredKeys.Add(kvp.Key);
// }
// }
// // 删除过期的缓存
// foreach (var key in expiredKeys)
// {
// _onlineStatusCache.Remove(key);
// }
// if (expiredKeys.Count > 0)
// {
// Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存");
// }
// }
// }
// #region 私有回调逻辑
// #endregion
// ///
// /// 检查MQTT连接健康状态
// ///
// /// 连接状态信息
// public async Task GetConnectionHealthAsync()
// {
// try
// {
// if (_mqttClient.IsConnected)
// {
// try
// {
// await _mqttClient.PingAsync(CancellationToken.None);
// Log.Debug("MQTT Ping 成功");
// return new
// {
// IsConnected = true,
// ConnectionTime = DateTime.Now,
// LastPing = DateTime.Now,
// ClientId = _mqttOptions.ClientId,
// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
// };
// }
// catch (Exception pingEx)
// {
// Log.Warning($"MQTT Ping 失败: {pingEx.Message}");
// return new
// {
// IsConnected = false,
// ConnectionTime = (DateTime?)null,
// LastPing = DateTime.Now,
// ClientId = _mqttOptions.ClientId,
// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883",
// PingError = pingEx.Message
// };
// }
// }
// else
// {
// return new
// {
// IsConnected = false,
// ConnectionTime = (DateTime?)null,
// LastPing = DateTime.Now,
// ClientId = _mqttOptions.ClientId,
// BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
// };
// }
// }
// catch (Exception ex)
// {
// Log.Error($"获取连接健康状态失败: {ex.Message}");
// return new
// {
// IsConnected = false,
// Error = ex.Message,
// Timestamp = DateTime.Now
// };
// }
// }
// ///
// /// 强制重新连接MQTT
// ///
// /// 重连是否成功
// public async Task ForceReconnectAsync()
// {
// try
// {
// if (_mqttClient.IsConnected)
// {
// await _mqttClient.DisconnectAsync();
// Log.Information("MQTT 主动断开连接");
// }
// await Task.Delay(2000); // 等待2秒后重连
// await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
// Log.Information("MQTT 强制重连成功");
// return true;
// }
// catch (Exception ex)
// {
// Log.Error($"MQTT 强制重连失败: {ex.Message}");
// return false;
// }
// }
// }