de2bd2f9
“wangming”
项目初始化
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Aliyun.Acs.Core.Logging;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using NCC.Dependency;
using NCC.Extend.Entitys;
using NCC.Extend.Interfaces.MqttPublisher;
using NCC.Extend.Interfaces.UavOrder;
using Serilog;
using SqlSugar;
using Yitter.IdGenerator;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net;
using System.Linq;
namespace NCC.Extend;
/// <summary>
/// MQTT 发布与订阅统一服务
/// </summary>
|
a0ec3cdb
“wangming”
优化了MQTT连接
|
29
|
public class MqttPublisherService : IMqttPublisherService, ITransient, IDisposable
|
de2bd2f9
“wangming”
项目初始化
|
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
{
/// <summary>
/// MQTT 客户端对象(用于连接、发布、订阅等操作)
/// </summary>
private readonly IMqttClient _mqttClient;
/// <summary>
/// MQTT 连接参数配置
/// </summary>
private readonly IMqttClientOptions _mqttOptions;
private readonly IServiceProvider _serviceProvider;
private readonly ISqlSugarClient _db;
private readonly Dictionary<string, (bool isOnline, DateTime lastCheck)> _onlineStatusCache = new Dictionary<string, (bool, DateTime)>();
private readonly object _cacheLock = new object();
|
a0ec3cdb
“wangming”
优化了MQTT连接
|
44
45
|
private readonly Timer _healthCheckTimer; // 健康检查定时器
private readonly int _healthCheckInterval = 5 * 60 * 1000; // 5分钟检查一次
|
de2bd2f9
“wangming”
项目初始化
|
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
/// <summary>
/// 构造函数:初始化客户端和配置、注册事件
/// </summary>
public MqttPublisherService(IServiceProvider serviceProvider, ISqlSugarClient db)
{
_serviceProvider = serviceProvider;
_db = db;
// 创建 MQTT 客户端实例
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
// 构建连接配置(MQTT 服务器地址、端口、用户名密码、客户端 ID)
_mqttOptions = new MqttClientOptionsBuilder()
.WithTcpServer("mqtt.cqjiangzhichao.cn", 1883) // Broker 地址
.WithCredentials("wrjservice", "P@ssw0rd") // 账号密码
.WithClientId("server_publisher") // 客户端 ID,必须唯一
.Build();
// 连接成功事件:订阅所有设备的响应主题(如 device/xxx/response)
_mqttClient.UseConnectedHandler(async e =>
{
Log.Information("MQTT 已连接成功");
// 订阅所有设备的响应主题(+ 代表通配符)
await _mqttClient.SubscribeAsync("device/+/response");
Log.Information("已订阅通用响应 topic: device/+/response");
});
// 连接断开事件
|
62329717
“wangming”
修复了一些BUG
|
74
|
_mqttClient.UseDisconnectedHandler(async e =>
|
de2bd2f9
“wangming”
项目初始化
|
75
76
77
|
{
Log.Warning("MQTT 已断开连接");
int retryInterval = 5; // 秒
|
62329717
“wangming”
修复了一些BUG
|
78
79
80
81
82
83
84
85
|
while (!_mqttClient.IsConnected)
{
try
{
Log.Information($"尝试在 {retryInterval} 秒后重新连接 MQTT...");
await Task.Delay(TimeSpan.FromSeconds(retryInterval));
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
Log.Information("MQTT 重连成功");
|
a0ec3cdb
“wangming”
优化了MQTT连接
|
86
87
88
89
90
91
92
93
94
95
96
97
|
// 重连成功后重新订阅主题
try
{
await _mqttClient.SubscribeAsync("device/+/response");
Log.Information("重连后重新订阅主题成功: device/+/response");
}
catch (Exception subEx)
{
Log.Error($"重连后重新订阅主题失败: {subEx.Message}");
}
|
62329717
“wangming”
修复了一些BUG
|
98
99
100
101
102
103
104
105
106
|
break; // 重连成功后跳出循环
}
catch (Exception ex)
{
Log.Error($"MQTT 重连失败: {ex.Message}");
// 重连失败后继续循环,但增加延迟避免频繁重试
await Task.Delay(TimeSpan.FromSeconds(retryInterval * 2));
}
}
|
de2bd2f9
“wangming”
项目初始化
|
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
});
// 收到消息事件:处理设备回传的消息
_mqttClient.UseApplicationMessageReceivedHandler(e =>
{
try
{
// 获取 topic 和 payload 内容
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());
Log.Information($"收到 MQTT 消息,Topic: {topic}, Payload: {payload}");
// 反序列化 JSON 内容为 MqttContent 实体
var content = JsonSerializer.Deserialize<MqttContent>(payload);
if (content != null)
{
// 从 topic 中提取设备 ID,例如 device/abc123/response
var topicParts = topic.Split('/');
var deviceId = topicParts.Length >= 2 ? topicParts[1] : "unknown";
Log.Information($"{topicParts[0]}: {deviceId})");
UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
{
Id = YitIdHelper.NextId().ToString(),
MessageType = "response",
Topic = topic,
Payload = payload,
DeviceId = deviceId,
Processed = 1,
Status = "success",
CreatedAt = DateTime.Now,
};
if (!payload.Contains("\"status\":\"received\""))
{
//判断两秒内,Payload一样的话,就不处理了
var twoSecondsAgo = DateTime.Now.AddSeconds(-2);
var lastLog = _db.Queryable<UavMqttMessageLogEntity>().Where(p => p.DeviceId == deviceId && p.Payload == payload && p.CreatedAt >= twoSecondsAgo).Any();
if (lastLog == false)
{
|
62329717
“wangming”
修复了一些BUG
|
143
|
Log.Information($"[订阅消息] 处理新消息:设备 {deviceId},内容:{payload}");
|
de2bd2f9
“wangming”
项目初始化
|
144
145
146
147
148
|
var orderService = _serviceProvider.GetRequiredService<IUavOrderService>();
orderService.HandleSubscribeMessage(payload);
}
else
{
|
62329717
“wangming”
修复了一些BUG
|
149
|
Log.Information($"[订阅消息] 接收到重复的消息,已忽略:设备 {deviceId},内容:{payload}");
|
de2bd2f9
“wangming”
项目初始化
|
150
151
152
153
|
}
//只保存正常消息
_db.Insertable(logEntity).ExecuteCommand();
}
|
62329717
“wangming”
修复了一些BUG
|
154
155
156
157
158
159
|
else
{
Log.Debug($"[订阅消息] 接收到确认消息,不处理:设备 {deviceId},内容:{payload}");
// 保存确认消息到日志
_db.Insertable(logEntity).ExecuteCommand();
}
|
de2bd2f9
“wangming”
项目初始化
|
160
161
162
163
164
165
166
167
|
}
}
catch (Exception ex)
{
Log.Error($"消息处理异常: {ex.Message}");
}
});
|
a0ec3cdb
“wangming”
优化了MQTT连接
|
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
|
// 初始化健康检查定时器
_healthCheckTimer = new Timer(HealthCheckCallback, null, _healthCheckInterval, _healthCheckInterval);
Log.Information("MQTT健康检查定时器已启动,每5分钟检查一次");
}
/// <summary>
/// 健康检查定时器回调方法
/// </summary>
private async void HealthCheckCallback(object state)
{
try
{
Log.Debug("执行定时MQTT健康检查...");
await PerformHealthCheckAndRepairAsync();
}
catch (Exception ex)
{
Log.Error($"定时健康检查执行失败: {ex.Message}");
}
|
de2bd2f9
“wangming”
项目初始化
|
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
|
}
/// <summary>
/// 向指定 Topic 发布 MQTT 消息
/// </summary>
/// <param name="topic">主题路径,例如 device/{id}/command</param>
/// <param name="payload">发送的 JSON 字符串内容</param>
/// <returns>是否成功发送</returns>
public async Task<bool> PublishAsync(string topic, string payload)
{
try
{
// 如果未连接则先连接
if (!_mqttClient.IsConnected)
{
|
62329717
“wangming”
修复了一些BUG
|
202
|
Log.Warning($"MQTT 未连接,尝试重新连接...");
|
de2bd2f9
“wangming”
项目初始化
|
203
|
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
|
62329717
“wangming”
修复了一些BUG
|
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
// 等待连接稳定
int retryCount = 0;
while (!_mqttClient.IsConnected && retryCount < 3)
{
await Task.Delay(1000);
retryCount++;
}
if (!_mqttClient.IsConnected)
{
Log.Error("MQTT 连接失败,无法发送消息");
return false;
}
|
de2bd2f9
“wangming”
项目初始化
|
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
|
}
// 构建 MQTT 消息
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithAtLeastOnceQoS() // QoS 1:至少送达一次
.WithRetainFlag(false) // 不保留历史消息
.Build();
// 异步发送
await _mqttClient.PublishAsync(message);
Log.Information($"MQTT 发布成功 -> Topic: {topic}, Payload: {payload}");
var topicParts = topic.Split('/');
UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
{
Id = YitIdHelper.NextId().ToString(),
MessageType = "send",
Topic = topic,
Payload = payload,
DeviceId = topicParts[1],
Processed = 1,
Status = "success",
CreatedAt = DateTime.Now,
};
_db.Insertable(logEntity).ExecuteCommand();
return true;
}
catch (Exception ex)
{
Log.Error($"MQTT 发送失败:{ex.Message}");
|
62329717
“wangming”
修复了一些BUG
|
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
// 记录失败的消息到数据库
try
{
var topicParts = topic.Split('/');
UavMqttMessageLogEntity logEntity = new UavMqttMessageLogEntity
{
Id = YitIdHelper.NextId().ToString(),
MessageType = "send",
Topic = topic,
Payload = payload,
DeviceId = topicParts.Length > 1 ? topicParts[1] : "unknown",
Processed = 0,
Status = "failed",
CreatedAt = DateTime.Now,
};
_db.Insertable(logEntity).ExecuteCommand();
}
catch (Exception logEx)
{
Log.Error($"记录失败消息到数据库时出错:{logEx.Message}");
}
|
de2bd2f9
“wangming”
项目初始化
|
272
273
274
275
276
277
278
279
280
281
282
283
284
|
return false;
}
}
/// <summary>
/// 手动启动 MQTT 客户端(推荐在程序启动时调用)
/// </summary>
public async Task StartAsync()
{
if (!_mqttClient.IsConnected)
{
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
}
|
a0ec3cdb
“wangming”
优化了MQTT连接
|
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
// 确保订阅状态正常
await EnsureSubscriptionAsync();
}
/// <summary>
/// 确保订阅状态正常,如果订阅丢失则重新订阅
/// </summary>
private async Task EnsureSubscriptionAsync()
{
try
{
if (_mqttClient.IsConnected)
{
// 检查订阅状态,如果订阅丢失则重新订阅
Log.Information("检查MQTT订阅状态...");
await _mqttClient.SubscribeAsync("device/+/response");
Log.Information("MQTT订阅状态正常: device/+/response");
}
}
catch (Exception ex)
{
Log.Warning($"检查订阅状态时出错: {ex.Message}");
}
|
de2bd2f9
“wangming”
项目初始化
|
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
|
}
/// <summary>
/// 查询指定clientId是否在线(通过EMQX管理API,带缓存)
/// </summary>
/// <param name="clientId">要查询的客户端ID</param>
/// <returns>在线返回true,不在线返回false</returns>
public async Task<dynamic> IsClientOnlineAsync(string clientId)
{
if (string.IsNullOrEmpty(clientId))
{
Log.Warning("设备ID为空,无法查询在线状态");
return false;
}
// 检查缓存 - 增加缓存时间到60秒,减少API调用
lock (_cacheLock)
{
if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
{
var (isOnline, lastCheck) = cacheEntry;
var cacheAge = DateTime.Now - lastCheck;
// 如果缓存时间小于60秒,直接返回缓存结果
if (cacheAge.TotalSeconds < 60)
{
Log.Debug($"使用缓存结果:设备 {clientId} 在线状态 = {isOnline}");
return isOnline;
}
}
}
// 使用静态HttpClient以提高性能
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromSeconds(3); // 减少超时时间到3秒
// 使用你的EMQX管理账号密码
var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
// 减少重试次数,提高响应速度
int maxRetries = 1;
for (int retry = 0; retry <= maxRetries; retry++)
{
try
{
// 查询指定客户端
var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
var response = await client.GetAsync(clientUrl);
bool isOnline = false;
if (response.StatusCode == HttpStatusCode.OK)
{
Log.Debug($"设备 {clientId} 在线状态查询成功");
isOnline = true;
}
else if (response.StatusCode == HttpStatusCode.NotFound)
{
Log.Debug($"设备 {clientId} 不在线");
isOnline = false;
}
else
{
Log.Warning($"查询设备 {clientId} 在线状态失败,状态码: {response.StatusCode},重试次数: {retry + 1}");
if (retry == maxRetries)
{
Log.Error($"查询设备 {clientId} 在线状态最终失败,状态码: {response.StatusCode}");
isOnline = false;
}
else
{
await Task.Delay(500); // 减少等待时间到500ms
continue;
}
}
// 更新缓存
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (isOnline, DateTime.Now);
}
return isOnline;
}
catch (TaskCanceledException ex)
{
Log.Warning($"查询设备 {clientId} 在线状态超时,重试次数: {retry + 1}");
if (retry == maxRetries)
{
Log.Error($"查询设备 {clientId} 在线状态最终超时");
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return false;
}
await Task.Delay(500); // 减少等待时间到500ms
}
catch (Exception ex)
{
Log.Error($"查询设备 {clientId} 在线状态异常: {ex.Message},重试次数: {retry + 1}");
if (retry == maxRetries)
{
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return false;
}
await Task.Delay(500); // 减少等待时间到500ms
}
}
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return false;
}
}
/// <summary>
/// 批量查询设备在线状态(提高效率)
/// </summary>
/// <param name="clientIds">要查询的客户端ID列表</param>
/// <returns>在线状态字典</returns>
public async Task<Dictionary<string, bool>> BatchCheckOnlineStatusAsync(List<string> clientIds)
{
if (clientIds == null || clientIds.Count == 0)
{
return new Dictionary<string, bool>();
}
var result = new Dictionary<string, bool>();
var needQueryIds = new List<string>();
// 先检查缓存
lock (_cacheLock)
{
foreach (var clientId in clientIds)
{
if (_onlineStatusCache.TryGetValue(clientId, out var cacheEntry))
{
var (isOnline, lastCheck) = cacheEntry;
var cacheAge = DateTime.Now - lastCheck;
// 如果缓存时间小于60秒,使用缓存结果
if (cacheAge.TotalSeconds < 60)
{
result[clientId] = isOnline;
continue;
}
}
needQueryIds.Add(clientId);
}
}
// 如果没有需要查询的设备,直接返回缓存结果
if (needQueryIds.Count == 0)
{
return result;
}
// 批量查询在线状态
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromSeconds(5);
var byteArray = Encoding.ASCII.GetBytes("285f888395fa6906:9BAdZeir9C3IVweof9Ate8HfQpn2u6GrH8Q9ArAF9Bb26tnI");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
// 并行查询多个设备
var tasks = needQueryIds.Select(async clientId =>
{
try
{
var clientUrl = $"http://mqtt.cqjiangzhichao.cn/api/v5/clients/{clientId}";
var response = await client.GetAsync(clientUrl);
bool isOnline = response.StatusCode == HttpStatusCode.OK;
// 更新缓存
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (isOnline, DateTime.Now);
}
return new { ClientId = clientId, IsOnline = isOnline };
}
catch (Exception ex)
{
Log.Warning($"批量查询设备 {clientId} 在线状态失败: {ex.Message}");
// 更新缓存为false
lock (_cacheLock)
{
_onlineStatusCache[clientId] = (false, DateTime.Now);
}
return new { ClientId = clientId, IsOnline = false };
}
});
var queryResults = await Task.WhenAll(tasks);
// 合并结果
foreach (var queryResult in queryResults)
{
result[queryResult.ClientId] = queryResult.IsOnline;
}
}
return result;
}
/// <summary>
/// 清理过期的缓存数据(建议定期调用)
/// </summary>
public void CleanupExpiredCache()
{
lock (_cacheLock)
{
var now = DateTime.Now;
var expiredKeys = new List<string>();
foreach (var kvp in _onlineStatusCache)
{
var (isOnline, lastCheck) = kvp.Value;
var cacheAge = now - lastCheck;
// 如果缓存时间超过5分钟,标记为过期
if (cacheAge.TotalMinutes > 5)
{
expiredKeys.Add(kvp.Key);
}
}
// 删除过期的缓存
foreach (var key in expiredKeys)
{
_onlineStatusCache.Remove(key);
}
if (expiredKeys.Count > 0)
{
Log.Information($"清理了 {expiredKeys.Count} 个过期的在线状态缓存");
}
}
}
#region 私有回调逻辑
|
62329717
“wangming”
修复了一些BUG
|
561
|
#endregion
|
de2bd2f9
“wangming”
项目初始化
|
562
|
|
62329717
“wangming”
修复了一些BUG
|
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
|
/// <summary>
/// 检查MQTT连接健康状态
/// </summary>
/// <returns>连接状态信息</returns>
public async Task<dynamic> GetConnectionHealthAsync()
{
try
{
if (_mqttClient.IsConnected)
{
try
{
await _mqttClient.PingAsync(CancellationToken.None);
Log.Debug("MQTT Ping 成功");
return new
{
IsConnected = true,
ConnectionTime = DateTime.Now,
LastPing = DateTime.Now,
ClientId = _mqttOptions.ClientId,
BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
};
}
catch (Exception pingEx)
{
Log.Warning($"MQTT Ping 失败: {pingEx.Message}");
return new
{
IsConnected = false,
ConnectionTime = (DateTime?)null,
LastPing = DateTime.Now,
ClientId = _mqttOptions.ClientId,
BrokerAddress = "mqtt.cqjiangzhichao.cn:1883",
PingError = pingEx.Message
};
}
}
else
{
return new
{
IsConnected = false,
ConnectionTime = (DateTime?)null,
LastPing = DateTime.Now,
ClientId = _mqttOptions.ClientId,
BrokerAddress = "mqtt.cqjiangzhichao.cn:1883"
};
}
}
catch (Exception ex)
{
Log.Error($"获取连接健康状态失败: {ex.Message}");
return new
{
IsConnected = false,
Error = ex.Message,
Timestamp = DateTime.Now
};
}
}
|
de2bd2f9
“wangming”
项目初始化
|
623
|
|
62329717
“wangming”
修复了一些BUG
|
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
|
/// <summary>
/// 强制重新连接MQTT
/// </summary>
/// <returns>重连是否成功</returns>
public async Task<bool> ForceReconnectAsync()
{
try
{
if (_mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
Log.Information("MQTT 主动断开连接");
}
await Task.Delay(2000); // 等待2秒后重连
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
Log.Information("MQTT 强制重连成功");
|
a0ec3cdb
“wangming”
优化了MQTT连接
|
642
643
644
|
// 重连后重新订阅
await EnsureSubscriptionAsync();
|
62329717
“wangming”
修复了一些BUG
|
645
646
647
648
649
650
651
652
|
return true;
}
catch (Exception ex)
{
Log.Error($"MQTT 强制重连失败: {ex.Message}");
return false;
}
}
|
a0ec3cdb
“wangming”
优化了MQTT连接
|
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
|
/// <summary>
/// 定期健康检查和自动修复(建议每5分钟调用一次)
/// </summary>
/// <returns>修复是否成功</returns>
public async Task<bool> PerformHealthCheckAndRepairAsync()
{
try
{
Log.Information("开始MQTT健康检查...");
// 检查连接状态
if (!_mqttClient.IsConnected)
{
Log.Warning("MQTT连接已断开,尝试重新连接...");
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
await EnsureSubscriptionAsync();
return true;
}
// 检查连接质量(发送Ping)
try
{
await _mqttClient.PingAsync(CancellationToken.None);
Log.Debug("MQTT Ping 成功,连接质量良好");
}
catch (Exception pingEx)
{
Log.Warning($"MQTT Ping 失败,连接可能有问题: {pingEx.Message}");
// Ping失败,尝试重新连接
Log.Information("尝试重新建立连接...");
await _mqttClient.DisconnectAsync();
await Task.Delay(1000);
await _mqttClient.ConnectAsync(_mqttOptions, CancellationToken.None);
await EnsureSubscriptionAsync();
return true;
}
// 检查订阅状态
await EnsureSubscriptionAsync();
Log.Information("MQTT健康检查完成,状态正常");
return true;
}
catch (Exception ex)
{
Log.Error($"MQTT健康检查失败: {ex.Message}");
return false;
}
}
/// <summary>
/// 获取当前订阅的主题列表
/// </summary>
/// <returns>订阅的主题列表</returns>
public async Task<List<string>> GetSubscribedTopicsAsync()
{
try
{
// 注意:MQTTnet可能不直接提供获取订阅列表的方法
// 这里返回我们已知的订阅主题
return new List<string> { "device/+/response" };
}
catch (Exception ex)
{
Log.Error($"获取订阅主题列表失败: {ex.Message}");
return new List<string>();
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
try
{
_healthCheckTimer?.Dispose();
Log.Information("MQTT健康检查定时器已释放");
}
catch (Exception ex)
{
Log.Error($"释放MQTT资源时出错: {ex.Message}");
}
}
|
de2bd2f9
“wangming”
项目初始化
|
739
|
}
|