0

我有一个示例 MQTTNet 服务器,我想将其部署为 docker 容器。下面是作为 Net Core Web App (net core 2.2) 一部分的服务器代码,我将在命令行中手动构建并部署到 Docker:

我正在使用 net core 2.2,mqttnet 3.0.8。

“docker build -t mqttwebservice 。” “docker run -d -p 32780:80 -p 1883:1883 --name myapp2 mqttwebservice”

部署成功。但是,我无法连接到 MQTT 服务器 (MqttCommunicationTimedOutException)。

1)什么会阻止mqtt客户端与服务器通信?

2) 如何知道服务器是否正确启动?

DockerFile

FROM mcr.microsoft.com/dotnet/core/aspnet:3.1-buster-slim AS base
WORKDIR /app
EXPOSE 80
EXPOSE 1883

FROM mcr.microsoft.com/dotnet/core/sdk:3.1-buster AS build
WORKDIR /src
COPY ["MQTTWebService.csproj", "MQTTWebService/"]
RUN dotnet restore "MQTTWebService/MQTTWebService.csproj"
COPY . .
WORKDIR "/src/MQTTWebService"
COPY . .
RUN dotnet build "MQTTWebService.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "MQTTWebService.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "MQTTWebService.dll"]
namespace MQTTWebService
{
    using System;
    using System.Net;
    using System.Security.Authentication;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using MQTTnet;
    using MQTTnet.Server;

    public class MQTTService : BackgroundService
    {
        private static ILogger<MQTTService> mqttServiceLogger;

        private MQTTConfiguration mqttConfiguration;

        private IMqttServerOptions mqttServerOptions;

        private IMqttServer mqttServer;


        /// <summary>
        /// TODO: Implement client connection validator here
        /// </summary>
        private Action<MqttConnectionValidatorContext> MQTTConnectionValidator = (c) =>
        {
            LogMessage(mqttServiceLogger, c);
        };



        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="logger"></param>
        public MQTTService(ILogger<MQTTService> logger)
        {
            mqttServiceLogger = logger;

            this.mqttConfiguration = new MQTTConfiguration
            {
                BrokerHostName = "127.0.0.1",
                BrokerPort = 1883,
                MqttSslProtocol = SslProtocols.None,
                UseSSL = false,
            };
            this.BuildServerOptions();
            this.CreateMqttServer();
        }


        public override async Task StartAsync(CancellationToken cancellationToken)
        {
            await this.StartMqttServerAsync();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                mqttServiceLogger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
                await Task.Delay(1000, stoppingToken);
            }
        }


        private void BuildServerOptions()
        {
            try
            {
                IPAddress ipAddress = IPAddress.Parse(this.mqttConfiguration.BrokerHostName);
                MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();

                if (this.mqttConfiguration.UseSSL)
                {
                    //// TODO: Implement insert certification
                    optionsBuilder.WithClientCertificate()
                        .WithEncryptionSslProtocol(this.mqttConfiguration.MqttSslProtocol);
                }

                optionsBuilder.WithDefaultEndpointBoundIPAddress(ipAddress)
                    .WithDefaultEndpointPort(this.mqttConfiguration.BrokerPort)
                    .WithConnectionValidator(MQTTConnectionValidator)
                    .WithSubscriptionInterceptor(c =>
                    {
                        c.AcceptSubscription = true;
                        LogMessage(mqttServiceLogger, c, true);
                    })
                    .WithApplicationMessageInterceptor(c =>
                    {
                        c.AcceptPublish = true;
                        LogMessage(mqttServiceLogger, c);
                    });

                this.mqttServerOptions = optionsBuilder.Build();
            }
            catch (Exception ex)
            {
                mqttServiceLogger.LogError(ex.Message);
                throw;
            }
        }

        private void CreateMqttServer()
        {
            try
            {
                this.mqttServer = new MqttFactory().CreateMqttServer();

                //// Add handlers for server
                this.mqttServer.UseClientConnectedHandler(ClientConnectedHandler);
                this.mqttServer.UseClientDisconnectedHandler(ClientDisconnectedHandler);

                this.mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(args =>
                {
                    try
                    {
                        string clientID = args.ClientId;
                        TopicFilter topicFilter = args.TopicFilter;

                        string topicString = ConvertTopicFilterToString(topicFilter);
                        mqttServiceLogger.LogInformation($"[{DateTime.Now}] Client '{clientID}' subscribed to {topicString}.");
                    }
                    catch (Exception ex)
                    {
                        mqttServiceLogger.LogError(ex.Message);
                    }
                });

                this.mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args =>
                {
                    try
                    {
                        string clientID = args.ClientId;
                        string topicFilter = args.TopicFilter;

                        mqttServiceLogger.LogInformation($"[{DateTime.Now}] Client '{clientID}' un-subscribed to {topicFilter}.");
                    }
                    catch (Exception ex)
                    {
                        mqttServiceLogger.LogError(ex.Message);
                    }
                });
            }
            catch (Exception ex)
            {
                mqttServiceLogger.LogError(ex.Message);
                throw;
            }
        }

        private async Task StartMqttServerAsync()
        {
            try
            {
                if (this.mqttServerOptions == null)
                {
                    throw new ArgumentNullException(nameof(this.mqttServerOptions));
                }

                await this.mqttServer.StartAsync(this.mqttServerOptions);
            }
            catch (Exception ex)
            {
                mqttServiceLogger.LogError(ex.Message);
                throw;
            }
        }



        public static void ClientConnectedHandler(MqttServerClientConnectedEventArgs args)
        {
            try
            {
                string clientID = args.ClientId;
            }
            catch (Exception ex)
            {
                mqttServiceLogger.LogError(ex.Message);
            }
        }

        public static void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs args)
        {
            try
            {
                string clientID = args.ClientId;
                MqttClientDisconnectType mqttClientDisconnectType = args.DisconnectType;
            }
            catch (Exception ex)
            {
                mqttServiceLogger.LogError(ex.Message);
            }
        }

        private static string ConvertTopicFilterToString(TopicFilter topicFilter)
        {
            string output = string.Empty;

            if (topicFilter != null)
            {
                output = $"{topicFilter.Topic} - {topicFilter.QualityOfServiceLevel.ToString()}";
            }

            return output;
        }

        /// <summary> 
        /// Logs the message from the MQTT subscription interceptor context. 
        /// </summary> 
        /// <param name="context">The MQTT subscription interceptor context.</param> 
        /// <param name="successful">A <see cref="bool"/> value indicating whether the subscription was successful or not.</param> 
        private static void LogMessage(ILogger<MQTTService> logger, MqttSubscriptionInterceptorContext context, bool successful)
        {
            if (context == null)
            {
                return;
            }

            logger.LogInformation(successful ? $"New subscription: ClientId = {context.ClientId}, TopicFilter = {context.TopicFilter}" : $"Subscription failed for clientId = {context.ClientId}, TopicFilter = {context.TopicFilter}");
        }

        /// <summary>
        /// Logs the message from the MQTT message interceptor context.
        /// </summary>
        /// <param name="context">The MQTT message interceptor context.</param>
        private static void LogMessage(ILogger<MQTTService> logger, MqttApplicationMessageInterceptorContext context)
        {
            if (context == null)
            {
                return;
            }

            var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);

            logger.LogInformation(
                $"Message: ClientId = {context.ClientId}, Topic = {context.ApplicationMessage?.Topic},"
                + $" Payload = {payload}, QoS = {context.ApplicationMessage?.QualityOfServiceLevel},"
                + $" Retain-Flag = {context.ApplicationMessage?.Retain}");
        }

        /// <summary> 
        /// Logs the message from the MQTT connection validation context. 
        /// </summary> 
        /// <param name="context">The MQTT connection validation context.</param> 
        private static void LogMessage(ILogger<MQTTService> logger, MqttConnectionValidatorContext context)
        {
            if (context == null)
            {
                return;
            }

            logger.LogInformation(
                $"New connection: ClientId = {context.ClientId}, Endpoint = {context.Endpoint},"
                + $" Username = {context.Username}, CleanSession = {context.CleanSession}");
        }
    }


    internal class MQTTConfiguration
    {
        public string BrokerHostName { get; set; }
        public bool UseSSL { get; set; }
        public SslProtocols MqttSslProtocol { get; set; }
        public int BrokerPort { get; set; } = 1883;
    }
}
4

1 回答 1

0

您需要运行“docker ps”来查找容器的 ID 及其状态。如果容器运行状态您尝试运行“docker logs idContainer”我希望它可以帮助你

于 2020-03-03T04:22:33.453 回答