An Elixir wrapper around the Erlang emqtt library.
Why this package?
- supports MQTT v3.0, v3.1.1, and v5.0
- supports clean_session/clean_start
- simplifies usage to just defining opts and implementing a message handler
The package can be installed by adding exmqtt to your list of dependencies in
mix.exs:
def deps do
[
{:exmqtt, github: "ryanwinchester/exmqtt", branch: "master"}
]
endSometimes you might have dependency issues because of the emqtt dependencies
using git links and tags (see: emqx/emqtt#100),
so you might need to do:
def deps do
[
{:exmqtt, github: "ryanwinchester/exmqtt", branch: "master"},
{:gun, "~> 1.3.0", override: true},
{:cowlib, "~> 2.6.0", override: true}
]
endNote: This is not available in hex, and there are no plans to do so unless
emqtt starts consistently and reliably publishing to hex (they do publish to
hex but not consistently and reliably).
By default emqtt compiles Quic library, if it is not necessary is
possible set the env variable BUILD_WITHOUT_QUIC to 1:
BUILD_WITHOUT_QUIC=1 iex -S mix
You can use the GenServer or the Supervisor like so:
ExMQTT.start_link(opts)or
ExMQTT.Supervisor.start_link(opts)You probably just want to add either to your application's supervision tree.
ExMQTT.publish(message, topic, qos)
ExMQTT.subscribe(topic, qos)
ExMQTT.unsubscribe(topic)
ExMQTT.publish_sync(message, topic, qos)
ExMQTT.subscribe_sync(topic, qos)
ExMQTT.unsubscribe_sync(topic){:name, atom}
{:owner, pid}
{:message_handler, module}
{:puback_handler, module}
{:publish_handler, module}
{:disconnect_handler, module}
{:host, binary}
{:hosts, [{binary, :inet.port_number()}]}
{:port, :inet.port_number()}
{:tcp_opts, [:gen_tcp.option()]}
{:ssl, boolean}
{:ssl_opts, [:ssl.ssl_option()]}
{:ws_path, binary}
{:connect_timeout, pos_integer}
{:bridge_mode, boolean}
{:client_id, iodata}
{:clean_start, boolean}
{:username, iodata}
{:password, iodata}
{:protocol_version, :"3.1" | :"3.1.1" | :"5"}
{:keepalive, non_neg_integer}
{:max_inflight, pos_integer}
{:retry_interval, timeout}
{:will_topic, iodata}
{:will_payload, iodata}
{:will_retain, boolean}
{:will_qos, pos_integer}
{:will_props, %{atom => term}}
{:auto_ack, boolean}
{:ack_timeout, pos_integer}
{:force_ping, boolean}
{:properties, %{atom => term}}
{:subscriptions, [{topic :: binary, qos :: non_neg_integer}]}
{:start_when, {mfa, retry_in :: non_neg_integer}}Note:
- The
optsare mostly the same as:emqtt.option(), but they are different, so use the type defs in this library opts.ssl_optsare erlang's:ssl.option()opts.handler_functionstype is defined hereopts.start_whenis for controller the GenServer'shandle_continue/2callback, so you can add an init condition. This is handy for example if you need to wait for the network to be ready before you try to connect to the MQTT broker. The value is a tuple{start_when, retry_in}wherestart_whenis a{module, function, arguments}(MFA) tuple for a function that resolves to abooleanwhich determines when we actually finishinit, andretry_inis the time to sleep (in ms) before we try again.- To work with common CA, it is useful to use
certifi
[
host: "127.0.0.1",
port: 8883,
protocol_version: :"5",
ssl: true,
client_id: "client-02",
username: "user-01",
password: "mysecretprivates",
clean_start: false,
ssl_opts: [
cacertfile: '/etc/mqtt/certs/all-ca.crt',
keyfile: '/etc/mqtt/certs/client.key',
certfile: '/etc/mqtt/certs/client.crt'
],
start_when: {{MyProject.Network, :connected?, []}, 2000},
message_handler: {MyApp.MQTTMessageHandler, []},
subscriptions: [
{"foo/#", 1},
{"baz/+", 0}
]
]the same example with certifi:
[
host: "127.0.0.1",
port: 8883,
protocol_version: :"5",
ssl: true,
client_id: "client-02",
username: "user-01",
password: "mysecretprivates",
clean_start: false,
ssl_opts: [
cacerts: :certifi.cacerts(),
keyfile: '/etc/mqtt/certs/client.key',
certfile: '/etc/mqtt/certs/client.crt'
],
start_when: {{MyProject.Network, :connected?, []}, 2000},
message_handler: {MyApp.MQTTMessageHandler, []},
subscriptions: [
{"foo/#", 1},
{"baz/+", 0}
]
]defmodule MyApp.MQTTMessageHandler do
@behaviour ExMQTT.MessageHandler
@impl true
def handle_message(["foo", "bar"], message, _extra) do
# Matches on "foo/bar"
end
def handle_message(["foo", "bar" | _rest], message, _extra) do
# Matches on "foo/bar/#"
end
def handle_message(["baz", buzz], message, _extra) do
# Matches on "baz/+"
end
def handle_message(topic, message, _extra) do
# Catch-all
end
end