我有一个示例 MQTTNet 服务器,我想将其部署为 docker 容器。下面是作为 Net Core Web App (net core 2.2) 一部分的服务器代码,我将在命令行中手动构建并部署到 Docker:
我正在使用 net core 2.2,mqttnet 3.0.8。
“docker build -t mqttwebservice 。” “docker run -d -p 32780:80 -p 1883:1883 --name myapp2 mqttwebservice”
部署成功。但是,我无法连接到 MQTT 服务器 (MqttCommunicationTimedOutException)。
1)什么会阻止mqtt客户端与服务器通信?
2) 如何知道服务器是否正确启动?
DockerFile
FROM mcr.microsoft.com/dotnet/core/aspnet:3.1-buster-slim AS base
WORKDIR /app
EXPOSE 80
EXPOSE 1883
FROM mcr.microsoft.com/dotnet/core/sdk:3.1-buster AS build
WORKDIR /src
COPY ["MQTTWebService.csproj", "MQTTWebService/"]
RUN dotnet restore "MQTTWebService/MQTTWebService.csproj"
COPY . .
WORKDIR "/src/MQTTWebService"
COPY . .
RUN dotnet build "MQTTWebService.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "MQTTWebService.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "MQTTWebService.dll"]
namespace MQTTWebService
{
using System;
using System.Net;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Server;
public class MQTTService : BackgroundService
{
private static ILogger<MQTTService> mqttServiceLogger;
private MQTTConfiguration mqttConfiguration;
private IMqttServerOptions mqttServerOptions;
private IMqttServer mqttServer;
/// <summary>
/// TODO: Implement client connection validator here
/// </summary>
private Action<MqttConnectionValidatorContext> MQTTConnectionValidator = (c) =>
{
LogMessage(mqttServiceLogger, c);
};
/// <summary>
/// Constructor
/// </summary>
/// <param name="logger"></param>
public MQTTService(ILogger<MQTTService> logger)
{
mqttServiceLogger = logger;
this.mqttConfiguration = new MQTTConfiguration
{
BrokerHostName = "127.0.0.1",
BrokerPort = 1883,
MqttSslProtocol = SslProtocols.None,
UseSSL = false,
};
this.BuildServerOptions();
this.CreateMqttServer();
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
await this.StartMqttServerAsync();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
mqttServiceLogger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
await Task.Delay(1000, stoppingToken);
}
}
private void BuildServerOptions()
{
try
{
IPAddress ipAddress = IPAddress.Parse(this.mqttConfiguration.BrokerHostName);
MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
if (this.mqttConfiguration.UseSSL)
{
//// TODO: Implement insert certification
optionsBuilder.WithClientCertificate()
.WithEncryptionSslProtocol(this.mqttConfiguration.MqttSslProtocol);
}
optionsBuilder.WithDefaultEndpointBoundIPAddress(ipAddress)
.WithDefaultEndpointPort(this.mqttConfiguration.BrokerPort)
.WithConnectionValidator(MQTTConnectionValidator)
.WithSubscriptionInterceptor(c =>
{
c.AcceptSubscription = true;
LogMessage(mqttServiceLogger, c, true);
})
.WithApplicationMessageInterceptor(c =>
{
c.AcceptPublish = true;
LogMessage(mqttServiceLogger, c);
});
this.mqttServerOptions = optionsBuilder.Build();
}
catch (Exception ex)
{
mqttServiceLogger.LogError(ex.Message);
throw;
}
}
private void CreateMqttServer()
{
try
{
this.mqttServer = new MqttFactory().CreateMqttServer();
//// Add handlers for server
this.mqttServer.UseClientConnectedHandler(ClientConnectedHandler);
this.mqttServer.UseClientDisconnectedHandler(ClientDisconnectedHandler);
this.mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(args =>
{
try
{
string clientID = args.ClientId;
TopicFilter topicFilter = args.TopicFilter;
string topicString = ConvertTopicFilterToString(topicFilter);
mqttServiceLogger.LogInformation($"[{DateTime.Now}] Client '{clientID}' subscribed to {topicString}.");
}
catch (Exception ex)
{
mqttServiceLogger.LogError(ex.Message);
}
});
this.mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args =>
{
try
{
string clientID = args.ClientId;
string topicFilter = args.TopicFilter;
mqttServiceLogger.LogInformation($"[{DateTime.Now}] Client '{clientID}' un-subscribed to {topicFilter}.");
}
catch (Exception ex)
{
mqttServiceLogger.LogError(ex.Message);
}
});
}
catch (Exception ex)
{
mqttServiceLogger.LogError(ex.Message);
throw;
}
}
private async Task StartMqttServerAsync()
{
try
{
if (this.mqttServerOptions == null)
{
throw new ArgumentNullException(nameof(this.mqttServerOptions));
}
await this.mqttServer.StartAsync(this.mqttServerOptions);
}
catch (Exception ex)
{
mqttServiceLogger.LogError(ex.Message);
throw;
}
}
public static void ClientConnectedHandler(MqttServerClientConnectedEventArgs args)
{
try
{
string clientID = args.ClientId;
}
catch (Exception ex)
{
mqttServiceLogger.LogError(ex.Message);
}
}
public static void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs args)
{
try
{
string clientID = args.ClientId;
MqttClientDisconnectType mqttClientDisconnectType = args.DisconnectType;
}
catch (Exception ex)
{
mqttServiceLogger.LogError(ex.Message);
}
}
private static string ConvertTopicFilterToString(TopicFilter topicFilter)
{
string output = string.Empty;
if (topicFilter != null)
{
output = $"{topicFilter.Topic} - {topicFilter.QualityOfServiceLevel.ToString()}";
}
return output;
}
/// <summary>
/// Logs the message from the MQTT subscription interceptor context.
/// </summary>
/// <param name="context">The MQTT subscription interceptor context.</param>
/// <param name="successful">A <see cref="bool"/> value indicating whether the subscription was successful or not.</param>
private static void LogMessage(ILogger<MQTTService> logger, MqttSubscriptionInterceptorContext context, bool successful)
{
if (context == null)
{
return;
}
logger.LogInformation(successful ? $"New subscription: ClientId = {context.ClientId}, TopicFilter = {context.TopicFilter}" : $"Subscription failed for clientId = {context.ClientId}, TopicFilter = {context.TopicFilter}");
}
/// <summary>
/// Logs the message from the MQTT message interceptor context.
/// </summary>
/// <param name="context">The MQTT message interceptor context.</param>
private static void LogMessage(ILogger<MQTTService> logger, MqttApplicationMessageInterceptorContext context)
{
if (context == null)
{
return;
}
var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);
logger.LogInformation(
$"Message: ClientId = {context.ClientId}, Topic = {context.ApplicationMessage?.Topic},"
+ $" Payload = {payload}, QoS = {context.ApplicationMessage?.QualityOfServiceLevel},"
+ $" Retain-Flag = {context.ApplicationMessage?.Retain}");
}
/// <summary>
/// Logs the message from the MQTT connection validation context.
/// </summary>
/// <param name="context">The MQTT connection validation context.</param>
private static void LogMessage(ILogger<MQTTService> logger, MqttConnectionValidatorContext context)
{
if (context == null)
{
return;
}
logger.LogInformation(
$"New connection: ClientId = {context.ClientId}, Endpoint = {context.Endpoint},"
+ $" Username = {context.Username}, CleanSession = {context.CleanSession}");
}
}
internal class MQTTConfiguration
{
public string BrokerHostName { get; set; }
public bool UseSSL { get; set; }
public SslProtocols MqttSslProtocol { get; set; }
public int BrokerPort { get; set; } = 1883;
}
}