Blame view

netcore/src/Modularity/Extend/NCC.Extend/MqttPublisherService.cs 50.1 KB
de2bd2f9   “wangming”   项目初始化
1
  using System;
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
2
  using System.Collections.Concurrent;
de2bd2f9   “wangming”   项目初始化
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  using System.Collections.Generic;
  using System.Text;
  using System.Text.Json;
  using System.Threading;
  using System.Threading.Tasks;
  using Aliyun.Acs.Core.Logging;
  using Microsoft.Extensions.DependencyInjection;
  using MQTTnet;
  using MQTTnet.Client;
  using MQTTnet.Client.Options;
  using NCC.Dependency;
  using NCC.Extend.Entitys;
  using NCC.Extend.Interfaces.MqttPublisher;
  using NCC.Extend.Interfaces.UavOrder;
  using Serilog;
  using SqlSugar;
  using Yitter.IdGenerator;
  using System.Net.Http;
  using System.Net.Http.Headers;
  using System.Net;
  using System.Linq;
  
  namespace NCC.Extend;
  
  /// <summary>
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  /// MQTT 消息信息
  /// </summary>
  public class MqttMessageInfo
  {
      /// <summary>
      /// 消息主题
      /// </summary>
      public string Topic { get; set; }
      
      /// <summary>
      /// 消息内容
      /// </summary>
      public string Payload { get; set; }
      
      /// <summary>
      /// 接收时间
      /// </summary>
      public DateTime ReceivedTime { get; set; }
      
      /// <summary>
      /// 设备ID
      /// </summary>
      public string DeviceId { get; set; }
  }
  
  /// <summary>
de2bd2f9   “wangming”   项目初始化
54
55
  /// MQTT 发布与订阅统一服务
  /// </summary>
a0ec3cdb   “wangming”   优化了MQTT连接
56
  public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable
de2bd2f9   “wangming”   项目初始化
57
58
59
60
61
62
63
64
65
66
67
68
69
70
  {
      /// <summary>
      /// MQTT 客户端对象(用于连接、发布、订阅等操作)
      /// </summary>
      private readonly IMqttClient _mqttClient;
  
      /// <summary>
      /// MQTT 连接参数配置
      /// </summary>
      private readonly IMqttClientOptions _mqttOptions;
      private readonly IServiceProvider _serviceProvider;
      private readonly ISqlSugarClient _db;
      private readonly Dictionary<string, (bool isOnline, DateTime lastCheck)> _onlineStatusCache = new Dictionary<string, (bool, DateTime)>();
      private readonly object _cacheLock = new object();
a0ec3cdb   “wangming”   优化了MQTT连接
71
      private readonly Timer _healthCheckTimer; // 健康检查定时器
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
      private readonly Timer _subscriptionMonitorTimer; // 订阅监控定时器
      private readonly int _healthCheckInterval = 3 * 60 * 1000; // 3分钟检查一次
      private readonly int _subscriptionMonitorInterval = 30 * 1000; // 30秒监控一次订阅状态
      
      // 消息队列处理
      private readonly ConcurrentQueue<MqttMessageInfo> _messageQueue = new ConcurrentQueue<MqttMessageInfo>();
      private readonly SemaphoreSlim _messageProcessingSemaphore = new SemaphoreSlim(5, 5); // 最多5个并发处理
      private readonly Timer _messageProcessingTimer; // 消息处理定时器
      private readonly int _messageProcessingInterval = 100; // 100ms处理一次队列
      private readonly int _maxQueueSize = 10000; // 最大队列大小,防止内存溢出
      private volatile int _currentQueueSize = 0; // 当前队列大小
      
      // 订阅状态监控
      private DateTime _lastMessageReceivedTime = DateTime.Now;
      private bool _subscriptionVerified = false;
      private DateTime _lastRetryTime = DateTime.MinValue; // 上次重试时间
      private readonly object _subscriptionLock = new object();
      private const string SUBSCRIPTION_TOPIC = "device/+/response";
      private const int RETRY_INTERVAL_MINUTES = 5; // 重试间隔5分钟
      
de2bd2f9   “wangming”   项目初始化
92
      /// <summary>
69379643   “wangming”   新增获取客户端ID功能,区分生产环...
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
      /// 获取客户端ID,区分生产环境和开发环境
      /// </summary>
      /// <returns>客户端ID</returns>
      private string GetClientId()
      {
          // 手动设置:true为生产环境,false为开发环境
          bool isProduction = false; // 这里可以手动调整
          
          if (isProduction)
          {
              // 生产环境使用固定ID
              return "server_publisher";
          }
          else
          {
              // 开发环境使用带机器名的ID,避免冲突
              var machineName = Environment.MachineName;
              return $"dev_publisher_{machineName}";
          }
      }
  
      /// <summary>
de2bd2f9   “wangming”   项目初始化
115
116
117
118
119
120
121
122
123
124
125
      /// 构造函数:初始化客户端和配置、注册事件
      /// </summary>
      public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db)
      {
          _serviceProvider = serviceProvider;
          _db = db;
          // 创建 MQTT 客户端实例
          var factory = new MqttFactory();
          _mqttClient = factory.CreateMqttClient();
  
          // 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID
69379643   “wangming”   新增获取客户端ID功能,区分生产环...
126
          var clientId = GetClientId();
de2bd2f9   “wangming”   项目初始化
127
128
129
          _mqttOptions = new MqttClientOptionsBuilder()
              .WithTcpServer("mqtt.cqjiangzhichao.cn", 1883)     // Broker 地址
              .WithCredentials("wrjservice", "P@ssw0rd")         // 账号密码
69379643   “wangming”   新增获取客户端ID功能,区分生产环...
130
              .WithClientId(clientId)                            // 客户端 ID,必须唯一
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
131
132
              .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))     // 保持连接心跳
              .WithCleanSession(false)                           // 保持会话状态
de2bd2f9   “wangming”   项目初始化
133
134
135
136
137
              .Build();
  
          // 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response
          _mqttClient.UseConnectedHandler(async e =>
          {
69379643   “wangming”   新增获取客户端ID功能,区分生产环...
138
              Log.Information($"MQTT 已连接成功,客户端ID: {_mqttOptions.ClientId}");
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
139
140
              _subscriptionVerified = false; // 重置订阅验证状态
              
de2bd2f9   “wangming”   项目初始化
141
              // 订阅所有设备的响应主题(+ 代表通配符)
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
142
              await EnsureSubscriptionAsync();
de2bd2f9   “wangming”   项目初始化
143
144
145
          });
  
          // 连接断开事件
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
146
          _mqttClient.UseDisconnectedHandler(e =>
de2bd2f9   “wangming”   项目初始化
147
          {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
148
149
150
151
152
              Log.Warning($"MQTT 已断开连接,原因: {e.Reason}");
              _subscriptionVerified = false; // 重置订阅验证状态
              
              // 使用Task.Run避免阻塞MQTT客户端线程
              _ = Task.Run(async () =>
62329717   “wangming”   修复了一些BUG
153
              {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
154
155
156
157
158
                  int retryInterval = 5; // 
                  int maxRetries = 10; // 最大重试次数
                  int retryCount = 0;
                  
                  while (!_mqttClient.IsConnected && retryCount < maxRetries)
62329717   “wangming”   修复了一些BUG
159
                  {
a0ec3cdb   “wangming”   优化了MQTT连接
160
161
                      try
                      {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
162
163
164
165
166
167
168
169
170
171
172
                          retryCount++;
                          Log.Information($"尝试第 {retryCount} 次重新连接 MQTT,{retryInterval} 秒后开始...");
                          await Task.Delay(TimeSpan.FromSeconds(retryInterval));
                          
                          await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
                          Log.Information("MQTT 重连成功");
                          
                          // 重连成功后重新订阅主题
                          await EnsureSubscriptionAsync();
                          
                          break; // 重连成功后跳出循环
a0ec3cdb   “wangming”   优化了MQTT连接
173
                      }
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
174
                      catch (Exception ex)
a0ec3cdb   “wangming”   优化了MQTT连接
175
                      {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
176
177
178
179
180
181
182
183
184
185
186
                          Log.Error($"MQTT 第 {retryCount} 次重连失败: {ex.Message}");
                          
                          if (retryCount >= maxRetries)
                          {
                              Log.Error($"MQTT 重连失败次数达到上限 ({maxRetries}),停止重连");
                              break;
                          }
                          
                          // 重连失败后增加延迟,避免频繁重试
                          retryInterval = Math.Min(retryInterval * 2, 60); // 最大延迟60
                          await Task.Delay(TimeSpan.FromSeconds(retryInterval));
a0ec3cdb   “wangming”   优化了MQTT连接
187
                      }
62329717   “wangming”   修复了一些BUG
188
                  }
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
189
              });
de2bd2f9   “wangming”   项目初始化
190
          });
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
191
192
          
          // 收到消息事件:快速入队,避免阻塞
de2bd2f9   “wangming”   项目初始化
193
194
195
196
          _mqttClient.UseApplicationMessageReceivedHandler(e =>
          {
              try
              {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
197
198
199
200
201
202
203
                  // 更新最后收到消息的时间和验证状态
                  lock (_subscriptionLock)
                  {
                      _lastMessageReceivedTime = DateTime.Now;
                      _subscriptionVerified = true; // 只有收到实际消息才标记为已验证
                  }
                  
de2bd2f9   “wangming”   项目初始化
204
205
206
                  // 获取 topic  payload 内容
                  var topic = e.ApplicationMessage.Topic;
                  var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
207
208
209
210
211
212
213
214
215
216
217
                  
                  //  topic 中提取设备 ID
                  var topicParts = topic.Split('/');
                  var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown";
                  
                  // 检查队列大小,防止内存溢出
                  if (_currentQueueSize >= _maxQueueSize)
                  {
                      Log.Warning($"消息队列已满({_maxQueueSize}),丢弃消息: Topic: {topic}, DeviceId: {deviceId}");
                      // 记录丢弃的消息(异步执行,不阻塞)
                      _ = Task.Run(async () =>
de2bd2f9   “wangming”   项目初始化
218
                      {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
219
                          await LogFailedMessageAsync(new MqttMessageInfo
de2bd2f9   “wangming”   项目初始化
220
                          {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
221
222
223
224
225
226
227
                              Topic = topic,
                              Payload = payload,
                              ReceivedTime = DateTime.Now,
                              DeviceId = deviceId
                          }, "队列已满,消息被丢弃");
                      });
                      return;
de2bd2f9   “wangming”   项目初始化
228
                  }
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
229
230
231
232
233
234
235
236
237
238
239
240
241
                  
                  // 快速入队,避免阻塞MQTT接收
                  var messageInfo = new MqttMessageInfo
                  {
                      Topic = topic,
                      Payload = payload,
                      ReceivedTime = DateTime.Now,
                      DeviceId = deviceId
                  };
                  
                  _messageQueue.Enqueue(messageInfo);
                  Interlocked.Increment(ref _currentQueueSize);
                  Log.Debug($"MQTT消息已入队,Topic: {topic}, DeviceId: {deviceId}, 队列长度: {_currentQueueSize}");
de2bd2f9   “wangming”   项目初始化
242
243
244
              }
              catch (Exception ex)
              {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
245
                  Log.Error($"消息入队异常: {ex.Message}");
de2bd2f9   “wangming”   项目初始化
246
247
248
              }
          });
  
a0ec3cdb   “wangming”   优化了MQTT连接
249
250
          // 初始化健康检查定时器
          _healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval);
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
251
252
253
254
255
256
257
258
259
          Log.Information("MQTT健康检查定时器已启动,每3分钟检查一次");
          
          // 初始化订阅监控定时器
          _subscriptionMonitorTimer = new Timer(SubscriptionMonitorCallback, null, _subscriptionMonitorInterval, _subscriptionMonitorInterval);
          Log.Information("MQTT订阅监控定时器已启动,每30秒监控一次");
          
          // 初始化消息处理定时器
          _messageProcessingTimer = new Timer(MessageProcessingCallback, null, _messageProcessingInterval, _messageProcessingInterval);
          Log.Information("MQTT消息处理定时器已启动,每100ms处理一次队列");
a0ec3cdb   “wangming”   优化了MQTT连接
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
      }
  
      /// <summary>
      /// 健康检查定时器回调方法
      /// </summary>
      private async void HealthCheckCallback(object state)
      {
          try
          {
              Log.Debug("执行定时MQTT健康检查...");
              await PerformHealthCheckAndRepairAsync();
          }
          catch (Exception ex)
          {
              Log.Error($"定时健康检查执行失败: {ex.Message}");
          }
de2bd2f9   “wangming”   项目初始化
276
277
278
      }
  
      /// <summary>
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
      /// 订阅监控定时器回调方法
      /// </summary>
      private async void SubscriptionMonitorCallback(object state)
      {
          try
          {
              Log.Debug("执行定时MQTT订阅监控...");
              await VerifySubscriptionAsync();
          }
          catch (Exception ex)
          {
              Log.Error($"订阅监控执行失败: {ex.Message}");
          }
      }
  
      /// <summary>
      /// 消息处理定时器回调方法
      /// </summary>
      private void MessageProcessingCallback(object state)
      {
          try
          {
              // 处理队列中的消息,最多同时处理5
              var tasks = new List<Task>();
              for (int i = 0; i < 5 && _currentQueueSize > 0; i++)
              {
                  if (_messageQueue.TryDequeue(out var messageInfo))
                  {
                      Interlocked.Decrement(ref _currentQueueSize);
                      tasks.Add(ProcessMessageAsync(messageInfo));
                  }
              }
              
              if (tasks.Count > 0)
              {
                  // 使用Task.Run避免阻塞定时器线程
                  Task.Run(async () =>
                  {
                      try
                      {
                          await Task.WhenAll(tasks);
                      }
                      catch (Exception ex)
                      {
                          Log.Error($"消息处理任务执行失败: {ex.Message}");
                      }
                  });
              }
          }
          catch (Exception ex)
          {
              Log.Error($"消息处理定时器执行失败: {ex.Message}");
          }
      }
  
      /// <summary>
      /// 异步处理单个消息
      /// </summary>
      /// <param name="messageInfo">消息信息</param>
      /// <returns></returns>
      private async Task ProcessMessageAsync(MqttMessageInfo messageInfo)
      {
          await _messageProcessingSemaphore.WaitAsync();
          try
          {
              Log.Information($"开始处理MQTT消息,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
              
              // 验证消息格式
              if (string.IsNullOrWhiteSpace(messageInfo.Payload))
              {
                  Log.Warning($"消息内容为空,跳过处理: Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
                  return;
              }
              
              // 反序列化 JSON 内容为 MqttContent 实体
              MqttContent content = null;
              try
              {
                  content = JsonSerializer.Deserialize<MqttContent>(messageInfo.Payload);
              }
              catch (JsonException jsonEx)
              {
                  Log.Error($"JSON反序列化失败: {jsonEx.Message}, Payload: {messageInfo.Payload}");
                  // 即使JSON解析失败,也要记录到日志
                  await LogFailedMessageAsync(messageInfo, "JSON解析失败");
                  return;
              }
              
              if (content != null)
              {
                  UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
                  {
                      Id = YitIdHelper.NextId().ToString(),
                      MessageType = "response",
                      Topic = messageInfo.Topic,
                      Payload = messageInfo.Payload,
                      DeviceId = messageInfo.DeviceId,
                      Processed = 1,
                      Status = "success",
                      CreatedAt = messageInfo.ReceivedTime,
                  };
                  
                  if (!messageInfo.Payload.Contains("\"status\":\"received\""))
                  {
                      // 判断两秒内,Payload一样的话,就不处理了
                      var twoSecondsAgo = DateTime.Now.AddSeconds(-2);
                      bool lastLog = false;
                      try
                      {
                          lastLog = await _db.Queryable<UavMqttMessageLogEntity>()
                              .Where(p => p.DeviceId == messageInfo.DeviceId && 
                                         p.Payload == messageInfo.Payload && 
                                         p.CreatedAt >= twoSecondsAgo)
                              .AnyAsync();
                      }
                      catch (Exception dbEx)
                      {
                          Log.Error($"数据库查询失败: {dbEx.Message}, DeviceId: {messageInfo.DeviceId}");
                          // 数据库查询失败时,为了安全起见,不处理消息
                          await LogFailedMessageAsync(messageInfo, "数据库查询失败");
                          return;
                      }
                      
                      if (!lastLog)
                      {
                          Log.Information($"[订阅消息] 处理新消息:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
                          try
                          {
                              var orderService = _serviceProvider.GetRequiredService<IUavOrderService>();
                              await Task.Run(() => orderService.HandleSubscribeMessage(messageInfo.Payload));
                          }
                          catch (Exception businessEx)
                          {
                              Log.Error($"业务逻辑处理失败: {businessEx.Message}, DeviceId: {messageInfo.DeviceId}");
                              logEntity.Status = "business_error";
                              logEntity.Processed = 0;
                          }
                      }
                      else
                      {
                          Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
                          logEntity.Status = "duplicate";
                      }
                      
                      // 保存消息到日志
                      try
                      {
                          await _db.Insertable(logEntity).ExecuteCommandAsync();
                      }
                      catch (Exception logEx)
                      {
                          Log.Error($"保存消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}");
                      }
                  }
                  else
                  {
                      Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {messageInfo.DeviceId},内容:{messageInfo.Payload}");
                      // 保存确认消息到日志
                      try
                      {
                          await _db.Insertable(logEntity).ExecuteCommandAsync();
                      }
                      catch (Exception logEx)
                      {
                          Log.Error($"保存确认消息日志失败: {logEx.Message}, DeviceId: {messageInfo.DeviceId}");
                      }
                  }
              }
              else
              {
                  Log.Warning($"JSON反序列化结果为空: Payload: {messageInfo.Payload}");
                  await LogFailedMessageAsync(messageInfo, "JSON反序列化结果为空");
              }
              
              Log.Debug($"MQTT消息处理完成,Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
          }
          catch (Exception ex)
          {
              Log.Error($"处理MQTT消息异常: {ex.Message}, Topic: {messageInfo.Topic}, DeviceId: {messageInfo.DeviceId}");
              await LogFailedMessageAsync(messageInfo, $"处理异常: {ex.Message}");
          }
          finally
          {
              _messageProcessingSemaphore.Release();
          }
      }
      
      /// <summary>
      /// 记录失败的消息
      /// </summary>
      /// <param name="messageInfo">消息信息</param>
      /// <param name="errorReason">失败原因</param>
      /// <returns></returns>
      private async Task LogFailedMessageAsync(MqttMessageInfo messageInfo, string errorReason)
      {
          try
          {
              var logEntity = new UavMqttMessageLogEntity
              {
                  Id = YitIdHelper.NextId().ToString(),
                  MessageType = "response",
                  Topic = messageInfo.Topic,
                  Payload = messageInfo.Payload,
                  DeviceId = messageInfo.DeviceId,
                  Processed = 0,
                  Status = "failed",
                  CreatedAt = messageInfo.ReceivedTime,
              };
              
              await _db.Insertable(logEntity).ExecuteCommandAsync();
              Log.Information($"失败消息已记录: DeviceId: {messageInfo.DeviceId}, 原因: {errorReason}");
          }
          catch (Exception ex)
          {
              Log.Error($"记录失败消息时出错: {ex.Message}, DeviceId: {messageInfo.DeviceId}");
          }
      }
  
      /// <summary>
de2bd2f9   “wangming”   项目初始化
498
499
500
501
502
503
504
505
506
507
508
509
      /// 向指定 Topic 发布 MQTT 消息
      /// </summary>
      /// <param name="topic">主题路径,例如 device/{id}/command</param>
      /// <param name="payload">发送的 JSON 字符串内容</param>
      /// <returns>是否成功发送</returns>
      public async Task<bool> PublishAsync(string topic, string payload)
      {
          try
          {
              // 如果未连接则先连接
              if (!_mqttClient.IsConnected)
              {
62329717   “wangming”   修复了一些BUG
510
                  Log.Warning($"MQTT 未连接,尝试重新连接...");
de2bd2f9   “wangming”   项目初始化
511
                  await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
62329717   “wangming”   修复了一些BUG
512
513
514
515
516
517
518
519
520
521
522
523
524
525
                  
                  // 等待连接稳定
                  int retryCount = 0;
                  while (!_mqttClient.IsConnected && retryCount < 3)
                  {
                      await Task.Delay(1000);
                      retryCount++;
                  }
                  
                  if (!_mqttClient.IsConnected)
                  {
                      Log.Error("MQTT 连接失败,无法发送消息");
                      return false;
                  }
de2bd2f9   “wangming”   项目初始化
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
              }
  
              // 构建 MQTT 消息
              var message = new MqttApplicationMessageBuilder()
                  .WithTopic(topic)
                  .WithPayload(payload)
                  .WithAtLeastOnceQoS()     // QoS 1:至少送达一次
                  .WithRetainFlag(false)    // 不保留历史消息
                  .Build();
  
              // 异步发送
              await _mqttClient.PublishAsync(message);
              Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}");
              var topicParts = topic.Split('/');
              UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
              {
                      Id = YitIdHelper.NextId().ToString(),
                  MessageType = "send",
                  Topic = topic,
                  Payload = payload,
                  DeviceId = topicParts[1],
                  Processed = 1,
                  Status = "success",
                  CreatedAt = DateTime.Now,
              };
              _db.Insertable(logEntity).ExecuteCommand();
              return true;
          }
          catch (Exception ex)
          {
              Log.Error($"MQTT 发送失败:{ex.Message}");
62329717   “wangming”   修复了一些BUG
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
              
              // 记录失败的消息到数据库
              try
              {
                  var topicParts = topic.Split('/');
                  UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
                  {
                      Id = YitIdHelper.NextId().ToString(),
                      MessageType = "send",
                      Topic = topic,
                      Payload = payload,
                      DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown",
                      Processed = 0,
                      Status = "failed",
                      CreatedAt = DateTime.Now,
                  };
                  _db.Insertable(logEntity).ExecuteCommand();
              }
              catch (Exception logEx)
              {
                  Log.Error($"记录失败消息到数据库时出错:{logEx.Message}");
              }
              
de2bd2f9   “wangming”   项目初始化
580
581
582
583
584
585
586
587
588
              return false;
          }
      }
  
      /// <summary>
      /// 手动启动 MQTT 客户端(推荐在程序启动时调用)
      /// </summary>
      public async Task StartAsync()
      {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
589
          try
de2bd2f9   “wangming”   项目初始化
590
          {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
              Log.Information("启动MQTT客户端...");
              
              if (!_mqttClient.IsConnected)
              {
                  await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
                  Log.Information("MQTT客户端连接成功");
              }
              
              // 确保订阅状态正常
              await EnsureSubscriptionAsync();
              
              // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
              
              Log.Information("MQTT客户端启动完成,等待实际消息验证订阅状态");
          }
          catch (Exception ex)
          {
              Log.Error($"启动MQTT客户端失败: {ex.Message}");
              throw;
de2bd2f9   “wangming”   项目初始化
610
          }
a0ec3cdb   “wangming”   优化了MQTT连接
611
612
613
614
615
616
617
618
619
620
621
622
623
      }
      
      /// <summary>
      /// 确保订阅状态正常,如果订阅丢失则重新订阅
      /// </summary>
      private async Task EnsureSubscriptionAsync()
      {
          try
          {
              if (_mqttClient.IsConnected)
              {
                  // 检查订阅状态,如果订阅丢失则重新订阅
                  Log.Information("检查MQTT订阅状态...");
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
624
                  await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC);
a0ec3cdb   “wangming”   优化了MQTT连接
625
                  Log.Information("MQTT订阅状态正常: device/+/response");
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
626
627
                  
                  // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
a0ec3cdb   “wangming”   优化了MQTT连接
628
629
630
631
632
              }
          }
          catch (Exception ex)
          {
              Log.Warning($"检查订阅状态时出错: {ex.Message}");
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
              // 订阅失败,尝试重新订阅
              await RetrySubscriptionAsync();
          }
      }
  
      /// <summary>
      /// 验证订阅是否真的生效
      /// </summary>
      private async Task VerifySubscriptionAsync()
      {
          try
          {
              lock (_subscriptionLock)
              {
                  // 检查是否长时间没有收到消息
                  var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
                  if (timeSinceLastMessage.TotalMinutes > 2 && _subscriptionVerified)
                  {
                      Log.Warning($"订阅可能失效,已 {timeSinceLastMessage.TotalMinutes:F1} 分钟未收到消息");
                      _subscriptionVerified = false;
                  }
              }
  
              // 如果订阅未验证,且距离上次重试超过5分钟,才尝试重新订阅
              if (!_subscriptionVerified)
              {
                  var timeSinceLastRetry = DateTime.Now - _lastRetryTime;
                  if (timeSinceLastRetry.TotalMinutes >= RETRY_INTERVAL_MINUTES)
                  {
                      Log.Warning("订阅验证失败,尝试重新订阅...");
                      await RetrySubscriptionAsync();
                      _lastRetryTime = DateTime.Now;
                  }
                  else
                  {
                      Log.Debug($"订阅未验证,但距离上次重试仅 {timeSinceLastRetry.TotalMinutes:F1} 分钟,跳过重试");
                  }
              }
          }
          catch (Exception ex)
          {
              Log.Error($"验证订阅状态失败: {ex.Message}");
a0ec3cdb   “wangming”   优化了MQTT连接
675
          }
de2bd2f9   “wangming”   项目初始化
676
677
678
      }
  
      /// <summary>
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
      /// 重试订阅
      /// </summary>
      private async Task RetrySubscriptionAsync()
      {
          try
          {
              if (_mqttClient.IsConnected)
              {
                  Log.Information("尝试重新订阅...");
                  
                  // 先取消现有订阅
                  try
                  {
                      await _mqttClient.UnsubscribeAsync(SUBSCRIPTION_TOPIC);
                      Log.Information("已取消现有订阅");
                  }
                  catch (Exception ex)
                  {
                      Log.Warning($"取消订阅失败: {ex.Message}");
                  }
                  
                  // 重新订阅
                  await _mqttClient.SubscribeAsync(SUBSCRIPTION_TOPIC);
                  Log.Information("重新订阅成功");
                  
                  // 重置验证状态,等待实际消息接收来验证订阅是否真正生效
                  _subscriptionVerified = false;
                  _lastMessageReceivedTime = DateTime.Now;
                  
                  // 注意:不立即标记为已验证,等待实际消息接收来验证
              }
          }
          catch (Exception ex)
          {
              Log.Error($"重试订阅失败: {ex.Message}");
          }
      }
  
  
  
      /// <summary>
de2bd2f9   “wangming”   项目初始化
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
      /// 查询指定clientId是否在线(通过EMQX管理API,带缓存)
      /// </summary>
      /// <param name="clientId">要查询的客户端ID</param>
      /// <returns>在线返回true,不在线返回false</returns>
      public async Task<dynamic> IsClientOnlineAsync(string clientId)
      {
          if (string.IsNullOrEmpty(clientId))
          {
              Log.Warning("设备ID为空,无法查询在线状态");
              return false;
          }
  
          // 检查缓存 - 增加缓存时间到60秒,减少API调用
          lock (_cacheLock)
          {
              if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
              {
                  var (isOnline, lastCheck) = cacheEntry;
                  var cacheAge = DateTime.Now - lastCheck;
                  
                  // 如果缓存时间小于60秒,直接返回缓存结果
                  if (cacheAge.TotalSeconds < 60)
                  {
                      Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}");
                      return isOnline;
                  }
              }
          }
  
          // 使用静态HttpClient以提高性能
          using (var client = new HttpClient())
          {
              client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3
              
              // 使用你的EMQX管理账号密码
              var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
              client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
              
              // 减少重试次数,提高响应速度
              int maxRetries = 1;
              for (int retry = 0; retry <= maxRetries; retry++)
              {
                  try
                  {
                      // 查询指定客户端
                      var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
                      var response = await client.GetAsync(clientUrl);
                      bool isOnline = false;
                      
                      if (response.StatusCode == HttpStatusCode.OK)
                      {
                          Log.Debug($"设备 {clientId} 在线状态查询成功");
                          isOnline = true;
                      }
                      else if (response.StatusCode == HttpStatusCode.NotFound)
                      {
                          Log.Debug($"设备 {clientId} 不在线");
                          isOnline = false;
                      }
                      else
                      {
                          Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}");
                          if (retry == maxRetries)
                          {
                              Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}");
                              isOnline = false;
                          }
                          else
                          {
                              await Task.Delay(500); // 减少等待时间到500ms
                              continue;
                          }
                      }
                      
                      // 更新缓存
                      lock (_cacheLock)
                      {
                          _onlineStatusCache[clientId] = (isOnline, DateTime.Now);
                      }
                      return isOnline;
                  }
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
801
                  catch (TaskCanceledException)
de2bd2f9   “wangming”   项目初始化
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
                  {
                      Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}");
                      if (retry == maxRetries)
                      {
                          Log.Error($"查询设备 {clientId} 在线状态最终超时");
                          // 更新缓存为false
                          lock (_cacheLock)
                          {
                              _onlineStatusCache[clientId] = (false, DateTime.Now);
                          }
                          return false;
                      }
                      await Task.Delay(500); // 减少等待时间到500ms
                  }
                  catch (Exception ex)
                  {
                      Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}");
                      if (retry == maxRetries)
                      {
                          // 更新缓存为false
                          lock (_cacheLock)
                          {
                              _onlineStatusCache[clientId] = (false, DateTime.Now);
                          }
                          return false;
                      }
                      await Task.Delay(500); // 减少等待时间到500ms
                  }
              }
              
              // 更新缓存为false
              lock (_cacheLock)
              {
                  _onlineStatusCache[clientId] = (false, DateTime.Now);
              }
              return false;
          }
      }
  
      /// <summary>
      /// 批量查询设备在线状态(提高效率)
      /// </summary>
      /// <param name="clientIds">要查询的客户端ID列表</param>
      /// <returns>在线状态字典</returns>
      public async Task<Dictionary<string, bool>> BatchCheckOnlineStatusAsync(List<string> clientIds)
      {
          if (clientIds == null || clientIds.Count == 0)
          {
              return new Dictionary<string, bool>();
          }
  
          var result = new Dictionary<string, bool>();
          var needQueryIds = new List<string>();
  
          // 先检查缓存
          lock (_cacheLock)
          {
              foreach (var clientId in clientIds)
              {
                  if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
                  {
                      var (isOnline, lastCheck) = cacheEntry;
                      var cacheAge = DateTime.Now - lastCheck;
                      
                      // 如果缓存时间小于60秒,使用缓存结果
                      if (cacheAge.TotalSeconds < 60)
                      {
                          result[clientId] = isOnline;
                          continue;
                      }
                  }
                  needQueryIds.Add(clientId);
              }
          }
  
          // 如果没有需要查询的设备,直接返回缓存结果
          if (needQueryIds.Count == 0)
          {
              return result;
          }
  
          // 批量查询在线状态
          using (var client = new HttpClient())
          {
              client.Timeout = TimeSpan.FromSeconds(5);
              
              var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
              client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
  
              // 并行查询多个设备
              var tasks = needQueryIds.Select(async clientId =>
              {
                  try
                  {
                      var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
                      var response = await client.GetAsync(clientUrl);
                      bool isOnline = response.StatusCode == HttpStatusCode.OK;
                      
                      // 更新缓存
                      lock (_cacheLock)
                      {
                          _onlineStatusCache[clientId] = (isOnline, DateTime.Now);
                      }
                      
                      return new { ClientId = clientId, IsOnline = isOnline };
                  }
                  catch (Exception ex)
                  {
                      Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}");
                      // 更新缓存为false
                      lock (_cacheLock)
                      {
                          _onlineStatusCache[clientId] = (false, DateTime.Now);
                      }
                      return new { ClientId = clientId, IsOnline = false };
                  }
              });
  
              var queryResults = await Task.WhenAll(tasks);
              
              // 合并结果
              foreach (var queryResult in queryResults)
              {
                  result[queryResult.ClientId] = queryResult.IsOnline;
              }
          }
  
          return result;
      }
  
      /// <summary>
      /// 清理过期的缓存数据(建议定期调用)
      /// </summary>
      public void CleanupExpiredCache()
      {
          lock (_cacheLock)
          {
              var now = DateTime.Now;
              var expiredKeys = new List<string>();
              
              foreach (var kvp in _onlineStatusCache)
              {
                  var (isOnline, lastCheck) = kvp.Value;
                  var cacheAge = now - lastCheck;
                  
                  // 如果缓存时间超过5分钟,标记为过期
                  if (cacheAge.TotalMinutes > 5)
                  {
                      expiredKeys.Add(kvp.Key);
                  }
              }
              
              // 删除过期的缓存
              foreach (var key in expiredKeys)
              {
                  _onlineStatusCache.Remove(key);
              }
              
              if (expiredKeys.Count > 0)
              {
                  Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存");
              }
          }
      }
  
      #region 私有回调逻辑
  
62329717   “wangming”   修复了一些BUG
969
      #endregion
de2bd2f9   “wangming”   项目初始化
970
  
62329717   “wangming”   修复了一些BUG
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
      /// <summary>
      /// 检查MQTT连接健康状态
      /// </summary>
      /// <returns>连接状态信息</returns>
      public async Task<dynamic> GetConnectionHealthAsync()
      {
          try
          {
              if (_mqttClient.IsConnected)
              {
                  try
                  {
                      await _mqttClient.PingAsync(CancellationToken.None);
                      Log.Debug("MQTT Ping 成功");
                      return new
                      {
                          IsConnected = true,
                          ConnectionTime = DateTime.Now,
                          LastPing = DateTime.Now,
                          ClientId = _mqttOptions.ClientId,
                          BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
                      };
                  }
                  catch (Exception pingEx)
                  {
                      Log.Warning($"MQTT Ping 失败: {pingEx.Message}");
                      return new
                      {
                          IsConnected = false,
                          ConnectionTime = (DateTime?)null,
                          LastPing = DateTime.Now,
                          ClientId = _mqttOptions.ClientId,
                          BrokerAddress = "mqtt.cqjiangzhichao.cn:1883",
                          PingError = pingEx.Message
                      };
                  }
              }
              else
              {
                  return new
                  {
                      IsConnected = false,
                      ConnectionTime = (DateTime?)null,
                      LastPing = DateTime.Now,
                      ClientId = _mqttOptions.ClientId,
                      BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
                  };
              }
          }
          catch (Exception ex)
          {
              Log.Error($"获取连接健康状态失败: {ex.Message}");
              return new
              {
                  IsConnected = false,
                  Error = ex.Message,
                  Timestamp = DateTime.Now
              };
          }
      }
de2bd2f9   “wangming”   项目初始化
1031
  
62329717   “wangming”   修复了一些BUG
1032
1033
1034
1035
1036
1037
1038
1039
      /// <summary>
      /// 强制重新连接MQTT
      /// </summary>
      /// <returns>重连是否成功</returns>
      public async Task<bool> ForceReconnectAsync()
      {
          try
          {
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1040
1041
              Log.Information("开始强制重连MQTT...");
              
62329717   “wangming”   修复了一些BUG
1042
1043
1044
1045
1046
1047
              if (_mqttClient.IsConnected)
              {
                  await _mqttClient.DisconnectAsync();
                  Log.Information("MQTT 主动断开连接");
              }
  
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1048
1049
1050
1051
1052
1053
1054
              // 重置订阅状态
              lock (_subscriptionLock)
              {
                  _subscriptionVerified = false;
                  _lastMessageReceivedTime = DateTime.Now;
              }
              
62329717   “wangming”   修复了一些BUG
1055
1056
1057
1058
              await Task.Delay(2000); // 等待2秒后重连
              
              await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
              Log.Information("MQTT 强制重连成功");
a0ec3cdb   “wangming”   优化了MQTT连接
1059
              
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1060
              // 重连后重新订阅并验证
a0ec3cdb   “wangming”   优化了MQTT连接
1061
              await EnsureSubscriptionAsync();
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1062
1063
1064
1065
              
              // 注意:不立即标记为已验证,等待实际消息接收来验证订阅是否真正生效
              
              Log.Information("MQTT 强制重连完成,等待实际消息验证订阅状态");
62329717   “wangming”   修复了一些BUG
1066
1067
1068
1069
1070
1071
1072
1073
              return true;
          }
          catch (Exception ex)
          {
              Log.Error($"MQTT 强制重连失败: {ex.Message}");
              return false;
          }
      }
a0ec3cdb   “wangming”   优化了MQTT连接
1074
1075
  
      /// <summary>
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1076
      /// 定期健康检查和自动修复(建议每3分钟调用一次)
a0ec3cdb   “wangming”   优化了MQTT连接
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
      /// </summary>
      /// <returns>修复是否成功</returns>
      public async Task<bool> PerformHealthCheckAndRepairAsync()
      {
          try
          {
              Log.Information("开始MQTT健康检查...");
              
              // 检查连接状态
              if (!_mqttClient.IsConnected)
              {
                  Log.Warning("MQTT连接已断开,尝试重新连接...");
                  await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
                  await EnsureSubscriptionAsync();
                  return true;
              }
              
              // 检查连接质量(发送Ping
              try
              {
                  await _mqttClient.PingAsync(CancellationToken.None);
                  Log.Debug("MQTT Ping 成功,连接质量良好");
              }
              catch (Exception pingEx)
              {
                  Log.Warning($"MQTT Ping 失败,连接可能有问题: {pingEx.Message}");
                  
                  // Ping失败,尝试重新连接
                  Log.Information("尝试重新建立连接...");
                  await _mqttClient.DisconnectAsync();
                  await Task.Delay(1000);
                  await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
                  await EnsureSubscriptionAsync();
                  return true;
              }
              
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
              // 检查订阅状态和消息接收情况
              lock (_subscriptionLock)
              {
                  var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
                  if (timeSinceLastMessage.TotalMinutes > 10)
                  {
                      Log.Warning($"长时间未收到消息({timeSinceLastMessage.TotalMinutes:F1}分钟),订阅可能有问题");
                  }
              }
              
              // 验证订阅状态
              await VerifySubscriptionAsync();
              
a0ec3cdb   “wangming”   优化了MQTT连接
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
              // 检查订阅状态
              await EnsureSubscriptionAsync();
              
              Log.Information("MQTT健康检查完成,状态正常");
              return true;
          }
          catch (Exception ex)
          {
              Log.Error($"MQTT健康检查失败: {ex.Message}");
              return false;
          }
      }
  
      /// <summary>
      /// 获取当前订阅的主题列表
      /// </summary>
      /// <returns>订阅的主题列表</returns>
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1143
      public Task<List<string>> GetSubscribedTopicsAsync()
a0ec3cdb   “wangming”   优化了MQTT连接
1144
1145
1146
1147
1148
      {
          try
          {
              // 注意:MQTTnet可能不直接提供获取订阅列表的方法
              // 这里返回我们已知的订阅主题
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1149
              return Task.FromResult(new List<string> { SUBSCRIPTION_TOPIC });
a0ec3cdb   “wangming”   优化了MQTT连接
1150
1151
1152
1153
          }
          catch (Exception ex)
          {
              Log.Error($"获取订阅主题列表失败: {ex.Message}");
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
              return Task.FromResult(new List<string>());
          }
      }
  
      /// <summary>
      /// 获取订阅状态信息
      /// </summary>
      /// <returns>订阅状态信息</returns>
      public dynamic GetSubscriptionStatus()
      {
          try
          {
              lock (_subscriptionLock)
              {
                  var timeSinceLastMessage = DateTime.Now - _lastMessageReceivedTime;
                  return new
                  {
                      IsConnected = _mqttClient.IsConnected,
                      SubscriptionVerified = _subscriptionVerified,
                      LastMessageReceived = _lastMessageReceivedTime,
                      TimeSinceLastMessage = $"{timeSinceLastMessage.TotalMinutes:F1} 分钟",
                      SubscriptionTopic = SUBSCRIPTION_TOPIC,
                      IsHealthy = timeSinceLastMessage.TotalMinutes < 10,
                      QueueSize = _currentQueueSize,
                      MaxQueueSize = _maxQueueSize
                  };
              }
          }
          catch (Exception ex)
          {
              Log.Error($"获取订阅状态失败: {ex.Message}");
              return new { Error = ex.Message };
          }
      }
      
      /// <summary>
      /// 检查连接稳定性
      /// </summary>
      /// <returns>连接稳定性信息</returns>
      public async Task<dynamic> CheckConnectionStabilityAsync()
      {
          try
          {
              // 如果连接正常,进行Ping测试
              if (_mqttClient.IsConnected)
              {
                  try
                  {
                      var pingStartTime = DateTime.Now;
                      await _mqttClient.PingAsync(CancellationToken.None);
                      var pingDuration = DateTime.Now - pingStartTime;
                      
                      Log.Information($"连接稳定性检查通过,Ping耗时: {pingDuration.TotalMilliseconds:F2}ms");
                      
                      return new
                      {
                          IsConnected = true,
                          PingSuccess = true,
                          PingDuration = pingDuration.TotalMilliseconds,
                          ConnectionTime = DateTime.Now,
                          LastMessageReceived = _lastMessageReceivedTime,
                          SubscriptionVerified = _subscriptionVerified,
                          QueueSize = _currentQueueSize,
                          MaxQueueSize = _maxQueueSize,
                          HealthCheckInterval = _healthCheckInterval,
                          SubscriptionMonitorInterval = _subscriptionMonitorInterval,
                          MessageProcessingInterval = _messageProcessingInterval
                      };
                  }
                  catch (Exception pingEx)
                  {
                      Log.Warning($"连接稳定性检查失败,Ping失败: {pingEx.Message}");
                      return new
                      {
                          IsConnected = false,
                          PingSuccess = false,
                          PingError = pingEx.Message,
                          ConnectionTime = DateTime.Now,
                          LastMessageReceived = _lastMessageReceivedTime,
                          SubscriptionVerified = _subscriptionVerified,
                          QueueSize = _currentQueueSize,
                          MaxQueueSize = _maxQueueSize,
                          HealthCheckInterval = _healthCheckInterval,
                          SubscriptionMonitorInterval = _subscriptionMonitorInterval,
                          MessageProcessingInterval = _messageProcessingInterval
                      };
                  }
              }
              else
              {
                  Log.Warning("连接稳定性检查失败,MQTT客户端未连接");
                  return new
                  {
                      IsConnected = false,
                      ConnectionTime = DateTime.Now,
                      LastMessageReceived = _lastMessageReceivedTime,
                      SubscriptionVerified = _subscriptionVerified,
                      QueueSize = _currentQueueSize,
                      MaxQueueSize = _maxQueueSize,
                      HealthCheckInterval = _healthCheckInterval,
                      SubscriptionMonitorInterval = _subscriptionMonitorInterval,
                      MessageProcessingInterval = _messageProcessingInterval
                  };
              }
          }
          catch (Exception ex)
          {
              Log.Error($"检查连接稳定性时发生异常: {ex.Message}");
              return new
              {
                  IsConnected = false,
                  Error = ex.Message,
                  ConnectionTime = DateTime.Now
              };
          }
      }
      
      /// <summary>
      /// 测试数据格式兼容性
      /// </summary>
      /// <param name="testPayload">测试数据</param>
      /// <returns>测试结果</returns>
      public dynamic TestDataFormatCompatibility(string testPayload)
      {
          try
          {
              Log.Information($"开始测试数据格式兼容性: {testPayload}");
              
              // 测试JSON反序列化
              MqttContent content = null;
              try
              {
                  content = JsonSerializer.Deserialize<MqttContent>(testPayload);
                  Log.Information("JSON反序列化成功");
              }
              catch (JsonException jsonEx)
              {
                  Log.Error($"JSON反序列化失败: {jsonEx.Message}");
                  return new
                  {
                      Success = false,
                      Error = "JSON反序列化失败",
                      Details = jsonEx.Message,
                      TestPayload = testPayload
                  };
              }
              
              if (content == null)
              {
                  return new
                  {
                      Success = false,
                      Error = "JSON反序列化结果为空",
                      TestPayload = testPayload
                  };
              }
              
              // 检查关键字段
              var fieldChecks = new Dictionary<string, object>
              {
                  ["id"] = content.id,
                  ["lane"] = content.lane,
                  ["rfid_1"] = content.rfid_1,
                  ["rfid_2"] = content.rfid_2,
                  ["batteryCapacity"] = content.batteryCapacity,
                  ["isOpenSuccess"] = content.isOpenSuccess,
                  ["isUav"] = content.isUav,
                  ["uavcode"] = content.uavcode
              };
              
              Log.Information($"数据格式兼容性测试成功,字段值: {string.Join(", ", fieldChecks.Select(kv => $"{kv.Key}={kv.Value}"))}");
              
              return new
              {
                  Success = true,
                  Message = "数据格式兼容性测试通过",
                  DeserializedContent = content,
                  FieldChecks = fieldChecks,
                  TestPayload = testPayload
              };
          }
          catch (Exception ex)
          {
              Log.Error($"数据格式兼容性测试失败: {ex.Message}");
              return new
              {
                  Success = false,
                  Error = "测试过程中发生异常",
                  Details = ex.Message,
                  TestPayload = testPayload
              };
a0ec3cdb   “wangming”   优化了MQTT连接
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
          }
      }
  
      /// <summary>
      /// 释放资源
      /// </summary>
      public void Dispose()
      {
          try
          {
              _healthCheckTimer?.Dispose();
52443042   “wangming”   新增MQTT消息处理功能,优化订阅...
1356
1357
1358
1359
              _subscriptionMonitorTimer?.Dispose();
              _messageProcessingTimer?.Dispose();
              _messageProcessingSemaphore?.Dispose();
              Log.Information("MQTT定时器和信号量已释放");
a0ec3cdb   “wangming”   优化了MQTT连接
1360
1361
1362
1363
1364
1365
          }
          catch (Exception ex)
          {
              Log.Error($"释放MQTT资源时出错: {ex.Message}");
          }
      }
de2bd2f9   “wangming”   项目初始化
1366
  }