Raspi – Using MQTT on Raspberry Pi with .NET and C#

This article shows how to connect to a MQTT broker using TLS and client certificate as well as publish and subscribe topics. The proposed solution runs on any .NET Core environment and is using :NET, C# and the MQTTnet library. The three applications, publisher, subscriber and MQTT broker can run on three or two or simply all together on one Raspberry Pi.

For an introduction to MQTT see e.g. MQTT Essentials from HiveMQ. Even though we are not using the HiveMQ broker here, this is a good introduction. To get information about how to setup the Mosquitto MQTT broker used for this example, see Raspi – Setting up Mosquitto (MQTT broker) on Raspberry Pi / Docker.

The complete example source code can be found here

Publisher application (iot-raspi-mqtt-pub-01)

The program polls the buttons (GPIO ports have to be configured in appsettings.json) and publishes changes (button press/release) to the MQTT topics inputs/button{buttonIndex}/isPressed and inputs/button{buttonIndex}/lastChangedAt.

The Main() method initializes the logger (Serilog), see also appsettings.json for logger configuration. Log.Here() is an extension method to Serilog.ILogger, it automatically extends the logger context with information about class name, method name, source file path and line number. This information may be written to log outputs (see configuration).

SendMessagesIfButtonHasChanged() checks if button state (PinValue.High, PinValue.Low) has changed. If so, publish two MQTT messages inputs/button{buttonIndex}/isPressed and inputs/button{buttonIndex}/lastChanged. The button inputs have PinValue.High state when released and PinValue.Low wenn pressed. The buttons configured in appsettings.json are referenced in this method with a zero based index.

using System;
using System.Device.Gpio;

using Iot.Raspi.Logger;

namespace Iot.Raspi.Mqtt
{
    class Program
    {
        // Set logger context
        private static Serilog.ILogger Log => Serilog.Log.ForContext<Program>();

        private static GpioController? gpio;
        private static List<PinValue> previousButtonStates = new List<PinValue>();
        private static MqttClient? mqttClient;

        static async Task Main(string[] args)
        {
            try 
            {
                IotLogger.Init("appsettings.json");
                Log.Here().Information("Application starting ...");

                // InitialiZe list with previous button states equal to 'high' (button not pressed).
                previousButtonStates = Enumerable.Repeat(PinValue.High, Config.Instance.Inputs.ButtonPins.Length).ToList();

                // Configure GPIO digital inputs
                gpio = new GpioController();
                foreach (var buttonPin in Config.Instance.Inputs.ButtonPins)
                {
                    gpio.OpenPin(buttonPin, PinMode.InputPullUp);    
                }

                // Configure MQTT client
                mqttClient = new MqttClient();
                await mqttClient.StartAsync();
                
                // Read digital inputs and publish changes
                while (true)
                {
                    for (var i=0; i<Config.Instance.Inputs.ButtonPins.Length; i++)
                    {
                        await SendMessageIfButtonHasChanged(i);
                    }

                    // Delay should be longer than max debouncing time of button
                    Thread.Sleep(20);
                }
            }
            catch (Exception ex)
            {
                Log.Here().Fatal("Failed: {exception}", ex.Message );
            }
            finally
            {
                Serilog.Log.CloseAndFlush();
            }
        }

        private static async Task SendMessagesIfButtonHasChanged(int buttonIndex)
        {
            if (gpio == null) { throw new Exception("Failed to initialize GPIO."); }
            if (mqttClient == null) { throw new Exception("Failed to initalize MQTT client."); }

            var buttonState = gpio.Read(Config.Instance.Inputs.ButtonPins[buttonIndex]);
            if (buttonState != previousButtonStates[buttonIndex])
            {
                previousButtonStates[buttonIndex] = buttonState;
                await mqttClient.PublishMessageAsync(
                    $"inputs/button{buttonIndex}/isPressed", 
                    (buttonState == PinValue.Low).ToString());
                await mqttClient.PublishMessageAsync(
                    $"inputs/button{buttonIndex}/lastChangedAt", 
                    DateTime.Now.ToString("o"));
            }
        }

    }
}

Subscriber application (iot-raspi-mqtt-sub-01)

The program is very similar to the previous one but instead of publishing it subscribes topic changes using the following patterns:

  • Topic pattern: “inputs/+/isPressed
    Callback: OnButtonStateChanged()
    This callback method updates the state of the corresponding LED, i.e. the LED is switched on when a corresponding button is pressed. and updates the corresponding LEDs. GPIO ports for LEDs have to be configured in appsettings.json.
  • Topic pattern: “inputs/#
    Callback OnButtonChanged()
    This callback method just writes the topics and their payloads (values) to the logger.
using System;
using System.Text;
using System.Text.RegularExpressions;
using System.Device.Gpio;
using MQTTnet;

using Iot.Raspi.Logger;

namespace Iot.Raspi.Mqtt
{
    class Program
    {
        // Set logger context
        private static Serilog.ILogger Log => Serilog.Log.ForContext<Program>();

        private static GpioController? gpio;
        private static MqttClient? mqttClient;

        static async Task Main(string[] args)
        {
            try 
            {
                IotLogger.Init("appsettings.json");
                Log.Here().Information("Application starting ...");

                // Configure GPIO digital inputs
                gpio = new GpioController();
                foreach (var ledPin in Config.Instance.Outputs.LedPins)
                {
                    gpio.OpenPin(ledPin, PinMode.Output);
                    gpio.Write(ledPin, PinValue.Low);
                }

                // Configure MQTT client
                mqttClient = new MqttClient();
                await mqttClient.AddSubscriptionAsync("inputs/+/isPressed", OnButtonStateChanged);
                await mqttClient.AddSubscriptionAsync("inputs/#", OnButtonChanged);
                await mqttClient.StartAsync();
                
                // Do nothing
                while (true)
                {
                    Thread.Sleep(1000);
                }
            }
            catch (Exception ex)
            {
                Log.Here().Fatal("Failed: {exception}", ex.Message );
            }
            finally
            {
                Serilog.Log.CloseAndFlush();
            }
        }

        private static void OnButtonStateChanged(MqttApplicationMessageReceivedEventArgs ev)
        {
            if (gpio == null) { throw new Exception("Failed to initialize GPIO."); }

            Log.Here().Information("Topic has changed: {topic} = '{payload}'.", 
                ev.ApplicationMessage.Topic, 
                Encoding.UTF8.GetString(ev.ApplicationMessage.Payload));

            var topic = ev.ApplicationMessage.Topic;
            var buttonIndex = GetButtonIndexFromTopic(topic);
            var ledPin = Config.Instance.Outputs.LedPins[buttonIndex];
            var payload = Encoding.UTF8.GetString(ev.ApplicationMessage.Payload);
            var ledValue = String.Compare(payload, "true", true) == 0 ? PinValue.High : PinValue.Low;
            gpio.Write(ledPin, ledValue);
        }

        private static void OnButtonChanged(MqttApplicationMessageReceivedEventArgs ev)
        {
            Log.Here().Information("Topic has changed: {topic} = '{payload}'.", 
                ev.ApplicationMessage.Topic, 
                Encoding.UTF8.GetString(ev.ApplicationMessage.Payload));
        }

        private static int GetButtonIndexFromTopic(string topic)
        {
            var index = -1;
            var pattern = "inputs\\/button(\\d*)\\/isPressed";
            var regex = new Regex(pattern, RegexOptions.IgnoreCase);
            var match = regex.Match(topic);
            if (match.Groups.Count == 2)
            {
                int.TryParse(match.Groups[1].Captures[0].Value, out index);
            }
            return index;
        }
    }
}

MqttClient class – a thin wrapper around MQTTnet

MqttClass is used by both applications, publisher and subscriber.

The main goal of this class is to remove most of the MQTTnet specific stuff from the applications:

  • Initialization with setup of connection to MQTT broker with TLS 1.3 and client certificate.
  • Subscriptions for topics: Allow to register individual callback functions for individual topics (including # and + wildcards).
  • Publish topics.

Initialization

The constructor creates a MQTTnet.Extensions.ManagedClient.IManagedMqttClient object and configures some callback functions:

public MqttClient(MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
{
	Log.Here().Verbose("Ctor ...");
	this.protocolVersion = protocolVersion;
	topicChangedEventHandlers = new Dictionary<string, TopicChangedEventHandler>();
	
	// Create IMagagedMqttClient, see also https://github.com/dotnet/MQTTnet/wiki/Client
	managedMqttClient = new MqttFactory().CreateManagedMqttClient();
	managedMqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(this.OnConnectingFailed);
	managedMqttClient.ApplicationMessageProcessedHandler = new ApplicationMessageProcessedHandlerDelegate(this.OnMessagePublished);
	managedMqttClient.UseApplicationMessageReceivedHandler(ev => OnMessageReceived(ev));
}

The StartAsync() method finally starts the MQTT client.

Important: Subscriptions should be added before calling this method to ensure not to lose any retained messages published already before this method is called (i.e. when using PublishMessageAsync() with parameter retain set to true, see below).

public async Task StartAsync()
{
	Log.Here().Verbose("Starting MQTT client ...");
	await managedMqttClient.StartAsync(GetManagedClientOptions());
	Log.Here().Information("MQTT client started.");
}

The configuration used for starting the MQTT client is prepared by the following two methods:

  • GetManagedClientOptions()
    Sets various configuration parameters of the ManagedMqttClient, such as the hostname of the broker and port number to use, client ID, username/password, MQTT protocol version, various timeouts values and whether to use TLS.
  • GetTlsParameters()
    If TLS is to be used, this method sets the TLS parameters, such as TLS version (1.3), server certificate validation handler and whether to use a client certificate including its CA certificate.

Most configuration settings are read from appsettings.json.

private ManagedMqttClientOptions GetManagedClientOptions()
{
	var configMqtt = Config.Instance.Mqtt;
	Log.Here().Debug("Preparing MQTT client options:");
	Log.Here().Debug("    server = {server}:{port}", configMqtt.BrokerHost, configMqtt.UseSecureConnection ? configMqtt.BrockerSecurePort : configMqtt.BrockerPort);            Log.Here().Debug("    clientId={clientId}");
	Log.Here().Debug("    use TLS = {useTls}", configMqtt.UseSecureConnection);
	Log.Here().Debug("    username = {username}", configMqtt.Username);
	Log.Here().Debug("    MQTT protocol version = {protocolVersion}", protocolVersion);
	Log.Here().Debug("    auto reconnect delay = {autoReconnectDelay} seconds", configMqtt.AutoReconnectDelay);
	Log.Here().Debug("    keep alive period = {keepAlivePeriod} seconds", configMqtt.KeepAlivePeriod);
	IMqttClientOptions clientOptions;
	if (configMqtt.UseSecureConnection)
	{
		var tlsParameters = GetTlsParameters();
		clientOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(configMqtt.BrokerHost, configMqtt.BrockerSecurePort)
			.WithClientId(configMqtt.ClientId)
			.WithCredentials(configMqtt.Username, configMqtt.Password)
			.WithTls(tlsParameters)
			.WithProtocolVersion(protocolVersion)
			.WithKeepAlivePeriod(TimeSpan.FromSeconds(configMqtt.KeepAlivePeriod))
			.Build();
	}
	else 
	{
		clientOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(configMqtt.BrokerHost, configMqtt.BrockerPort)
			.WithClientId(configMqtt.ClientId)
			.WithCredentials(configMqtt.Username, configMqtt.Password)
			.WithProtocolVersion(protocolVersion)
			.WithKeepAlivePeriod(TimeSpan.FromSeconds(configMqtt.KeepAlivePeriod))
			.Build();
	}
	var managedClientOptions = new ManagedMqttClientOptionsBuilder()
		.WithClientOptions(clientOptions)
		.WithAutoReconnectDelay(TimeSpan.FromSeconds(configMqtt.AutoReconnectDelay))
		.Build();
	return managedClientOptions;
}

private MqttClientOptionsBuilderTlsParameters GetTlsParameters()
{
	var configMqtt = Config.Instance.Mqtt;
	var configCertificates = Config.Instance.Certificates;
	var tlsParameters = new MqttClientOptionsBuilderTlsParameters() {
		AllowUntrustedCertificates = false,
		UseTls = true,
		SslProtocol = System.Security.Authentication.SslProtocols.Tls13,
		CertificateValidationHandler = certificateContext => { 
			return Certificates.ValidateServerCertificate(new X509Certificate2(certificateContext.Certificate)); 
		},
	};
	Log.Here().Debug("    use client certificate = {useClientCertificate}", configMqtt.UseClientCertificate);
	if (configMqtt.UseClientCertificate)
	{
		var caCertFilePath = Path.Combine(Directory.GetCurrentDirectory(), configCertificates.CaCertificateFilePath);
		var caCert = new X509Certificate(File.ReadAllBytes(caCertFilePath));
		var clientCertFilePath = Path.Combine(Directory.GetCurrentDirectory(), configCertificates.ClientCertificateFilePath);
		var clientCert = new X509Certificate2(File.ReadAllBytes(clientCertFilePath), configCertificates.ClientCertificatePassword);
		tlsParameters.Certificates = new List<X509Certificate>()
		{
			clientCert, caCert
		};
	}
	return tlsParameters;
}

Publishing messages

The method PublishMessageAsync() publishes a MQTT message:

  • topic – The MQTT topic to which the message belongs.
  • payload – The massage.
  • contentType – may be used to describe the type of the payload (used with MQTT protocol version 5, see also here).
  • retain – defines whether the broker shall retain the last message with this topic (see also here).
    Important: When using retained messages in a subscribing client, AddSubscriptionAsync() should be called before StartAsync() to ensure not to lose any retained messages published already before StartAsync() is called.
  • qosLevel – Desired Quality of Service level for transmission client -> MQTT broker (see also here).

A messages is published (sent to the broker) asynchronously. I.e. PublishMessage() writes the message into a output queue and returns more or less immediately. Once the message is later sent to the broker, the OnMessagePublished() callback method is called.

public async Task PublishMessageAsync(
	string topic, 
	string payload, 
	string contentType = "", 
	bool retain = false, 
	MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtLeastOnce)
{
	Log.Here().Verbose("Publishing {topic} = '{payload}' ...", topic, payload);
	var mqttMessage = new MqttApplicationMessageBuilder()
		.WithTopic(topic)
		.WithPayload(payload)
		.WithPayloadFormatIndicator(MQTTnet.Protocol.MqttPayloadFormatIndicator.CharacterData)
		.WithContentType(contentType)
		.WithRetainFlag(retain)
		.WithQualityOfServiceLevel(qosLevel)
		.Build();
	var res = await managedMqttClient.PublishAsync(mqttMessage);

	// OnMessagePublished() is called as soon as the message has been sent to the MQTT broker.
	// That's why we don't write to logger here.
}

Subscribing messages

The method AddSubscriptionAsync() is used to subscribe for MQTT messages:

  • topic – Topic to subscribe, may include wildcards # or +.
  • topicEventHandler – Handler/callback to be called each time a message with this topic is received, i.e. when topic’s payload has changed. Any existing subscription with exactly the same topic is replaced, i.e. the handler/callback is replaced.
  • qosLevel – Desired Quality of Service level for transmission MQTT broker -> client (see also here).

The following wildcard characters may be use in the topic parameter:

  • + – wildcard for a single level in ‘topic path’
  • # – wildcard for multiple levels in ‘topic path’
  • Examples:
    inputs/+/isPressed” –
    inputs/button1/+
    inputs/#
public async Task AddSubscriptionAsync(
	string topic, 
	TopicChangedEventHandler topicEventHandler,
	MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtLeastOnce)
{
	Log.Here().Verbose("Adding subsription for {topic} ...");
	if (topic == null) { throw new ArgumentNullException(nameof(topic)); }
	if (topicEventHandler == null) { throw new ArgumentNullException(nameof(topicEventHandler)); }

	if (topicChangedEventHandlers.ContainsKey(topic))
	{
		topicChangedEventHandlers.Remove(topic);
		await managedMqttClient.UnsubscribeAsync(topic);
	}
	topicChangedEventHandlers.Add(topic, topicEventHandler);
	await managedMqttClient.SubscribeAsync(topic, qosLevel);
	
	Log.Here().Information("Added subscription for {topic}.", topic);
}

Callbacks

The (private) callback methods setup in the constructor are the followings:

  • OnMessagePublished()
    Called as soon as a pulished message has been sent to the MQTT broker. This method just writes an information to the logger.
  • OnMessageReceived()
    Called every time a message is received for a subscribed topic. This method calls the handler/callback registered for this topic, see also AddSubscriptionAsync().
  • OnConnectingFailed()
    Called upon connection failure. This method just writes an information to the logger.
    Remark: The ManagedMqttClient automatically reconnects to the broker as soon as possible.
private void OnMessagePublished(ApplicationMessageProcessedEventArgs ev)
{
	var topic = ev.ApplicationMessage.ApplicationMessage.Topic;
	var binaryPayload = ev.ApplicationMessage.ApplicationMessage.Payload;
	var payload = Encoding.UTF8.GetString(binaryPayload, 0, binaryPayload.Length);
	var now = DateTime.Now.ToString("HH:mm:ss.fff");
	Log.Here().Information("Published {topic} = '{payload}'.", topic, payload);
}

private void OnMessageReceived(MqttApplicationMessageReceivedEventArgs ev)
{
	var topic = ev.ApplicationMessage.Topic;
	Log.Here().Debug("Message received for {topic} ...", topic);
	foreach (var handler in topicChangedEventHandlers)
	{
		if (IsMatch(topic, handler.Key))
		{
			// Log the match
			var binaryPayload = ev.ApplicationMessage.Payload;
			var payload = Encoding.UTF8.GetString(binaryPayload, 0, binaryPayload.Length);
			var method = handler.Value.Method;
			var handlerName = $"{method.DeclaringType?.Name}.{method.Name}()";
			Log.Here().Information("Received {topic} = '{payload}'. Calling handler {handler} ...", topic, payload, handlerName);

			// Call handler
			handler.Value(ev);
		}
	}
}

private void OnConnectingFailed(ManagedProcessFailedEventArgs ev)
{
	var msg = ev.Exception.Message;
	if (ev.Exception.InnerException != null) { msg += ": " + ev.Exception.InnerException.Message; }
	Log.Here().Error("Connection failed: {message}.", msg);
}

Certificate validation

Certificate validation is implemented in its own Certificates class by the static method ValidateServerCertificate():

public static bool ValidateServerCertificate(X509Certificate2 serverCertificate)
{
	var caCertificate = LoadCaCertificate();

	var ok = ValidateCertificateChain(serverCertificate, caCertificate);
	ok = ok && ValidateCertificateThumbprint(serverCertificate, Config.Instance.Certificates.ServerCertificateThumbprint);
	return ok;
}

private static bool ValidateCertificateChain(X509Certificate2 certificate, X509Certificate2? caCertificate = null)
{
	X509Chain chain = new X509Chain();
	chain.ChainPolicy.VerificationFlags = X509VerificationFlags.NoFlag;
	chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
	if (caCertificate != null)
	{
		chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
		chain.ChainPolicy.CustomTrustStore.Add(caCertificate);
	}
	var ok = chain.Build(certificate);
	if (!ok)
	{
		// Error messages
		Log.Here().Error("Failed to verify certificate '{subject}'.", certificate.Subject);
		foreach (var status in chain.ChainStatus)
		{
			Log.Here().Error(" - {reason}.", status.StatusInformation);
		}
	}
	return ok;
}

private static bool ValidateCertificateThumbprint(X509Certificate2 certificate, string expectedThumbprint)
{
	var thumbprint = certificate.GetCertHashString().ToLower();
	if (thumbprint != expectedThumbprint)
	{
		Console.WriteLine($"Verification of certificate thumbprint failed, certificate '{certificate.Subject}'.");
		return false;
	}    
	return true;
}

Configuration

Content of appsettings.json: for publisher application (iot-raspi-mqtt-pub-01):

{
    "mqtt": {
        "clientId": "iot-raspi-mqtt-pub-01",
        "useSecureConnection": true,
        "useClientCertificate": true,
        "brokerHost": "raspiServer",
        "brokerPort": 1883,
        "brokerSecurePort": 8883,
        "username": "mqtt-user",
        "password": "mQt-135$",
        "autoReconnectDelay": 5,
        "keepAlivePeriod": 15
    },
    "certificates": {
        "serverCertificateThumbprint": "1aa751728f4222d93beab79894b482b7ef3c4a71",
        "caCertificateFilePath": "schaeren_ca.crt",
        "clientCertificateFilePath": "device001_client.pfx",
        "clientCertificatePassword": "pX=3ku.$"
    },
    "inputs": {
        "buttonPins": [ 18, 23, 24 ]
    },
    "Serilog": {
        "MinimumLevel": {
            "Default": "Verbose",
            "Override": {
                "Iot.Raspi.Logger": "Error",
                "Iot.Raspi.Mqtt.MqttClient": "Debug"
            }
        },
        "WriteTo": [
            {
                "Name": "Console",
                "Args": { 
                    "formatter": {
                        "type": "Serilog.Templates.ExpressionTemplate, Serilog.Expressions",
                        "template": "{@t:HH:mm:ss.fff} {@l:u3} {ThreadId} [{ClassName}.{MemberName}():{LineNumber}] {@m}\n{@x}"
                    }
                }
            },
            {
                "Name": "File",
                "Args": { 
                    "path": "xxlog.txt",
                    "rollingInterval": "Day",
                    "retainedFileCountLimit": "90",
                    "formatter": {
                        "type": "Serilog.Templates.ExpressionTemplate, Serilog.Expressions",
                        "template": "{@t:HH:mm:ss.fff} {@l:u3} {ThreadId} [{SourceContext}.{MemberName}():{LineNumber}] {@m}\n{@x}"
                    }
                }
            },
            {
                "Name": "File",
                "Args": {
                  "path": "xxlog.json",
                  "rollingInterval": "Day",
                  "retainedFileCountLimit": "90",
                  "formatter": {
                    "type": "Serilog.Formatting.Json.JsonFormatter, Serilog"
                  }
                }
            },
            {
                "Name": "Seq",
                "Args": {
                  "serverUrl": "https://localhost:9001"
                }
            }
        ]
    }
}

Content of appsettings.json: for subscribe application (iot-raspi-mqtt-sub-01), only the differences are shown:

{
    "mqtt": {
        "clientId": "iot-raspi-mqtt-sub-01",
        ...
    },
    "certificates": {
        ...
        "clientCertificateFilePath": "device002_client.pfx",
        ...
    },
    "outputs": {
        "ledPins": [ 17, 27, 22 ]
    },
    ...
}

Remark: Both applications can also use the same client certificate.