How To Build an IoT Platform with Elixir Phoenix and CoAP: Part I

How To Build an IoT Platform with Elixir Phoenix and CoAP: Part I

Michał Podwórny

The number of devices going online grows every day. Connecting them all presents new challenges since IoT hardware is often limited in terms of available resources. Cheap, power-efficient microcontrollers with unreliable power sources have to talk to each other over wireless networks with low throughput and high packet loss rate. They usually use lightweight application layer protocols for communication. Some of the popular choices include messaging protocols like MQTT or XMPP, but today we’ll have a look at something quite different. Constrained Application Protocol described in RFC 7252 is specially tailored to the task.

CoAP

It’s based on the familiar client/server model. The idea is to represent accessible data as “resources”. It can be a temperature reading from a thermometer, the amount of battery life left in a device or anything else. A CoAP server makes resources available under human-readable URIs like /thermometers/5. By convention used for discoverability, all resources can be listed by hitting /.well-known/core address. Each resource can also specify a set of options which can be queried on - /.well-known/core?rt=light_switch would list all resources with the rt (resource type) parameter set to light_switch.

Resources can be interacted with using the GET, POST, PUT and DELETE requests. As you might expect, they work just like the HTTP counterparts. This similarity makes it possible to map the requests from one protocol to another and integrate CoAP systems with the Web, which we’ll try to do. Unlike HTTP, to keep things light, communication happens over UDP. Requests can be “confirmable” or “non-confirmable” depending on our needs. Large payloads are handled on the application layer to avoid IP fragmentation through a block-wise transfer.

The most interesting feature is the “observe” option. A client, while making a GET request, can pass a specific flag to start an observation. It is then added by the server to the list of observers of the given resource and continues listening for the server’s responses. It allows us to build systems passively accepting data whenever they come just like the messaging protocols do.

There are other interesting features to explore. Datagrams can be secured with DTLS. There’s also a possibility for multicast and grouping of resources. It is used in one of the biggest and most promising open source IoT frameworks IoTivity sponsored by Open Connectivity Foundation - one of the emerging IoT standards.

The Aim

Our end goal is to create a simple setup similar to the one described in this draft. We’ll have a bunch of nodes hooked up to a node directory. The nodes will register their resources in the directory. Our directory will expose an HTTP API and translate all HTTP requests to CoAP requests and route them to the correct node. We’ll also have a web application that will allow us to see and change the state of our resources through this API. The idea is that the only static IP address that anyone needs to know is the address of the resource directory - either for the node to register a new resource or for the web application to hit the HTTP API.

I’ve decided to use Elixir for the task. It’s a promising technology in the IoT world due to the Nerves project and being an awesome language in general. By the end of this article, we’ll have created an Elixir application that mocks a node and another one that implements the directory without the HTTP API part. In the following articles we’ll complete the directory, add the web UI to the setup and then we’ll try running the node directory on a Raspberry PI thanks to Nerves. Finally, we’ll get rid of the Elixir application mocking a node, implement it in C instead and put it on an actual hardware like the popular ESP8266.

Let’s get to it!

The first challenge is to find an Elixir-compatible CoAP library. The best thing I’ve come up with was the gen_coap Erlang implementation. It’s not complete and lacks documentation, but it’s more than enough for what we’re going to do here. Plus it will be a good lesson in using an Erlang library in Elixir code.

“Mix new” is a good place to start. We’re going to divide our project into 3 parts straight away. First will be the node app, second the directory app and third - common code between these two apps.

mix new coap_node
mix new coap_directory
mix new coap

To keep things short I won’t follow the whole process that I went through step-by-step, but rather I'll describe the end result. So let’s start with the common library. We’re just going to include it in coap_node and coap_directory as a dependency, so it doesn’t need to define an application. All it needs is the gen_coap dependency:

# coap/mix.exs
def application do
  []
end

defp deps do
  [
    {:gen_coap, git: "https://github.com/gotthardp/gen_coap.git"}
  ]
end

The first thing from this library that we’ll be using extensively is an Erlang record defined in gen_coap/include/coap.hrl. For compatibility with Erlang records, Elixir provides the Record module. Its defrecord/3 macro creates a set of functions to create and access data from a record. The extract/2 function allows us to extract a record definition from an Erlang file. Remember to require Record before using its macro.

# coap/lib/coap/records.ex
defmodule Coap.Records do
  require Record
  Record.defrecord :coap_content, Record.extract(
    :coap_content, from_lib: "gen_coap/include/coap.hrl"
  )
end

The main idea behind gen_coap is to create a handler for different request types and register it under a specified path. A handler should implement the :coap_resource behaviour. In the shared library, we want to define a wrapper for this behaviour that provides a default implementation of all the callbacks, plus a start/2 function to create a resource. All the callbacks correspond to specific requests made to the given path. The coap_discover/2 callback is called when the CoAP server is asked for a list of registered resources. It will make more sense once we have used it.

# coap/lib/coap/resource.ex
defmodule Coap.Resource do
  defmacro __using__(_) do
    quote do
      import Coap.Records

      @behaviour :coap_resource

      def start(path, params) do
        :coap_server_registry.add_handler(path, __MODULE__, params)
      end

      def coap_discover(prefix, args) do
        [{:absolute, prefix, args}]
      end

      def coap_get(_ch_id, _prefix, _name, _query) do
        coap_content()
      end

      def coap_post(_ch_id, _prefix, _name, _content) do
        {:ok, :content, coap_content()}
      end

      def coap_put(_ch_id, _prefix, _name, _content) do
        :ok
      end

      def coap_delete(_ch_id, _prefix, _name) do
        :ok
      end

      def coap_observe(ch_id, prefix, name, _ack) do
        {:ok, {:state, prefix, name}}
      end

      def coap_unobserve({_state, _prefix, _name}) do
        :ok
      end

      def handle_info(_message, state) do
        {:noreply, state}
      end

      def coap_ack(_ref, state) do
        {:ok, state}
      end

      defoverridable [start: 2, coap_discover: 2, coap_get: 4, coap_post: 4, coap_put: 4,
        coap_delete: 3, coap_observe: 4, coap_unobserve: 1, handle_info: 2, coap_ack: 2]
    end
  end
end

The last common piece of code will be a somewhat generic key-value storage based on :ets.

# coap/lib/coap/storage.ex
defmodule Coap.Storage do
  use GenServer

  @server_name Coap.Storage

  # GenServer interface

  def start_link(_args, _options) do
    GenServer.start_link(__MODULE__, [], name: @server_name)
  end

  def get(key) do
    GenServer.call(@server_name, {:get, key})
  end

  def set(key, change, overwrite \\ false) do
    GenServer.call(@server_name, {:set, key, change, overwrite})
  end

  def all do
    GenServer.call(@server_name, :all)
  end

  # GenServer handlers

  def init(_args) do
    ets = :ets.new(@server_name, [:set, :private])
    {:ok, ets}
  end

  def handle_call({:get, key}, _from, ets) do
    {:reply, do_get(ets, key), ets}
  end

  def handle_call({:set, key, change, overwrite}, _from, ets) do
    {:reply, do_set(ets, key, change, overwrite), ets}
  end

  def handle_call(:all, _from, ets) do
    {:reply, :ets.tab2list(ets), ets}
  end

  def handle_call(request, from, ets) do
    super(request, from, ets)
  end

  def handle_cast(request, ets) do
    super(request, ets)
  end

  # private

  defp do_get(ets, key) do
    case :ets.lookup(ets, key) do
      [{^key, value}] -> value
      _ -> :not_found
    end
  end

  defp do_set(ets, key, change, overwrite) do
    value = if(is_function(change), do: change.(do_get(ets, key)), else: change)
    insertion = if(overwrite, do: &:ets.insert/2, else: &:ets.insert_new/2)
    case insertion.(ets, {key, value}) do
      true -> value
      false -> false
    end
  end
end

CoAP Node

Armed with these tools we can start implementing the node. For now, we’ll add the gen_coap to the dependencies along with an absolute path to our library with shared code. If you look at gen_coap.app.src in the gen_coap’s source code, you’ll see that the application’s entry module is :coap_server. And it turns out that this module implements both the application behaviour and supervisor behaviour, but the application start/2 function doesn’t let you define the port it is supposed to run on. It will always fall back to the default CoAP port 5683. In order to specify which port we want the node to run on, we’ll have to put the server in our supervision tree. Also, I add a bit of code in mix.exs to read the port from an env variable and add a worker running the previously defined storage.

# coap_node/mix.exs
@default_port 5683

def application do
  [
    applications: [:logger],
    mod: {CoapNode, port}
  ]
end

defp deps do
  [
    {:gen_coap, git: "https://github.com/gotthardp/gen_coap.git"},
    {:coap, path: "../coap"}
  ]
end

defp port do
  if port = System.get_env("COAP_PORT") do
    case Integer.parse(port) do
      {port, ""} -> port
      _ -> @default_port
    end
  else
    @default_port
  end
end

# coap_node/lib/coap_node.ex
defmodule CoapNode do
  use Application

  def start(_type, port) do
    CoapNode.Supervisor.start_link(port)
  end
end

# coap_node/lib/coap_node/supervisor.ex
defmodule CoapNode.Supervisor do
  use Supervisor

  @name CoapNode.Supervisor

  def start_link(port) do
    Supervisor.start_link(__MODULE__, port, name: @name)
  end

  def init(port) do
    children = [
      supervisor(CoapNode.ServerSupervisor, [port]),
      worker(Coap.Storage, [[], []])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

# coap_node/lib/coap_node/server_supervisor.ex
defmodule CoapNode.ServerSupervisor do
  def start_link(port) do
    Supervisor.start_link(:coap_server, [port])
  end
end

Now we have a CoAP server running under the port chosen with a COAP_PORT env variable. But before we test it, let’s actually add a resource to it. We’ll be using the resource wrapper that we already implemented in our library with shared code. Our resource is going to represent a switch. It will have a binary state: true for “on” and false for “off”. The state is going to be stored in the key-value storage under the same key as the path of the resource. We’ll be able to check its state by sending a GET request and change it by sending “on”, “off” or “toggle” with a PUT request. In order to test the CoAP observe option, we’ll respond to a PUT request with a notification.

# coap_node/lib/coap_node/resources/switch.ex
defmodule CoapNode.Resources.Switch do
  use Coap.Resource
  alias Coap.Storage

  def start(path, params) do
      Storage.set(path_to_string(path), false)
      super(path, params)
    end
  end

  # gen_coap handlers

  def coap_get(_ch_id, prefix, _name, _query) do
    key = path_to_string(prefix)
    case Storage.get(key) do
      :not_found -> coap_content(payload: "not found")
      value -> coap_content(payload: key <> " " <> serialize_value(value))
    end
  end

  def coap_put(_ch_id, prefix, _name, content) do
    {:coap_content, _etag, _max_age, _format, payload} = content
    key = path_to_string(prefix)
    response = process_payload(key, payload)
    :coap_responder.notify(prefix, coap_content(payload: key <> " " <> serialize_value(response)))
    :ok
  end

  # private

  defp serialize_value(value) do
    cond do
      is_boolean(value) -> if(value, do: "on", else: "off")
      true -> value
    end
  end

  defp path_to_string(path) do
    Enum.join(path, "/")
  end

  defp process_payload(storage_key, payload) do
    case payload do
      "on" -> Storage.set(storage_key, true, true)
      "off" -> Storage.set(storage_key, false, true)
      "toggle" -> Storage.set(storage_key, fn(state) -> !state end, true)
      _ -> "not recognized"
    end
  end
end

Let’s test what we’ve done so far. The easiest way to do it seems to be the Copper plugin for Firefox. It allows the browser to handle the coap:// URI scheme and shows a nice interface for interacting with the CoAP server. But first, start the server and add a resource under switches/1.

~/coap_node> COAP_PORT=50000 iex -S mix
iex(1)> CoapNode.Resources.Switch.start(["switches", "1"], [])
:ok

Now open a Firefox tab and visit coap://localhost:50000 . Here you can GET ./well-known/core to see all the registered resources, GET switches/1 to see the state of our switch and PUT a new state of the switch by specifying the outgoing payload to “on”, “off” or “toggle”. Before you update the state though, open a new tab, visit the same URL and OBSERVE the resource to see that the notifications indeed work. Have fun!

CoAP Directory

As a reminder: the idea is that the outside world won’t know the IP address of every single node. It will only know the address of the directory and will want to access all the nodes through it. So we need a CoAP endpoint to let nodes register their resources and a way to make requests to them. In the next article, we’ll expose an HTTP API, translate all the HTTP requests to CoAP and route them to the correct resources.

This time, we’ll just add the gen_coap to the list of our applications since we don’t need to specify the CoAP server port. The resource registry will actually be implemented as a CoAP resource under the path /registry. Our main module will start the main supervisor and the registry. Resource registration will work through a POST request with payload specifying the desired resource path and port on which the resource’s server is listening. Registry will then try to add an entry to the storage with a key switches/1 and value {{127, 0, 0, 1}, 50000}. It will respond with “ok” or “path taken” in case there's already a resource registered under this path.

# coap_directory/mix.exs
def application do
  [
    applications: [:logger, :gen_coap],
    mod: {CoapDirectory, []}
  ]
end

defp deps do
  [
    {:gen_coap, git: "https://github.com/gotthardp/gen_coap.git"},
    {:coap, path: "../coap"}
  ]
end

# coap_directory/lib/coap_directory.ex
defmodule CoapDirectory do
  use Application

  def start(_type, _args) do
    CoapDirectory.ResourceRegistry.start
    CoapDirectory.Supervisor.start_link
  end
end

# coap_directory/lib/coap_directory/supervisor.ex
defmodule CoapDirectory.Supervisor do
  use Supervisor

  @name CoapDirectory.Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: @name)
  end

  def init([]) do
    children = [
      worker(Coap.Storage, [[], []])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

# coap_directory/lib/coap_directory/resource_registry.ex
defmodule CoapDirectory.ResourceRegistry do
  use Coap.Resource
  alias Coap.Storage

  def start do
    start(["registry"], [])
  end

  # gen_coap handlers

  def coap_post(ch_id, _prefix, _name, content) do
    {ip, _channel_port} = ch_id
    {:coap_content, _etag, _max_age, _format, payload} = content
    {path, port} = parse_payload(payload)

    response = case Storage.set(path, {ip, port}) do
      false -> "path taken"
      _ -> "ok"
    end
    {:ok, :content, coap_content(payload: response)}
  end

  # private

  defp parse_payload(payload) do
    [path, port] = String.split(payload, " ")
    port = String.to_integer(port)
    {path, port}
  end
end

It’s time to add a way to make requests to the registered servers or start an observation. Let’s start with a simple wrapper for the gen_coap :coap_client module with just get and put requests. It will return a Task.

# coap_directory/lib/coap_directory/client.ex
defmodule CoapDirectory.Client do
  import Coap.Records
  alias Coap.Storage

  def discover do
    Storage.all |> Enum.map(fn({path, _address}) -> path end)
  end

  def get(path) do
    request(path, :get, coap_content())
  end

  def put(path, payload) do
    request(path, :put, coap_content(payload: payload))
  end

  def request(path, method, content) do
    case Storage.get(path) do
      :not_found -> :not_found
      address -> async_request(address, path, method, content)
    end
  end

  defp async_request(address, path, method, content) do
    Task.async(fn -> :coap_client.request(method, coap_uri(address, path), content) end)
  end

  defp coap_uri(address, path) do
    {ip, port} = address
    'coap://' ++ :inet.ntoa(ip) ++ ':' ++ Integer.to_char_list(port) ++ '/' ++
    String.to_char_list(path)
  end
end

Now for something more interesting - an observer. We’ll use the gen_coap :coap_observer module which sends observed notifications as messages to the owner process. We also have to implement the handling of those messages. For now, we'll do it by relaying the payload of the notifications to the process with PID passed to the observer at the start of the observation. In order to be able to create many observations at once, they will be started using a supervisor with :simple_one_for_one strategy.

# coap_directory/lib/coap_directory/supervisor.ex
children = [
  worker(Coap.Storage, [[], []]),
  supervisor(CoapDirectory.ObserverSupervisor, [])
]

# coap_directory/lib/coap_directory/observer_supervisor.ex
defmodule CoapDirectory.ObserverSupervisor do
  use Supervisor

  @name CoapDirectory.ObserverSupervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: @name)
  end

  def start_observer(path, target_pid) do
    Supervisor.start_child(@name, [path, target_pid])
  end

  def init([]) do
    children = [
      worker(CoapDirectory.Observer, [], restart: :temporary)
    ]

    supervise(children, strategy: :simple_one_for_one)
  end
end

# coap_directory/lib/coap_directory/observer.ex
defmodule CoapDirectory.Observer do
  use GenServer
  alias Coap.Storage

  def start_link(path, target_pid) do
    GenServer.start_link(__MODULE__, [path, target_pid], [])
  end

  # GenServer handlers

  def init([path, target_pid]) do
    case Storage.get(path) do
      :not_found -> {:stop, :not_found}
      address ->
        :coap_observer.observe(coap_uri(address, path))
        {:ok, target_pid}
    end
  end

  def handle_info({:coap_notify, _pid, _n, _code, content}, target_pid) do
    {:coap_content, _etag, _max_age, _format, payload} = content
    send(target_pid, payload)
    {:noreply, target_pid}
  end

  defp coap_uri(address, path) do
    {ip, port} = address
    'coap://' ++ :inet.ntoa(ip) ++ ':' ++ Integer.to_char_list(port) ++ '/' ++
    String.to_char_list(path)
  end
end

Finally the last piece of the puzzle - we need to register the resource of CoapNode on its creation. We’ll hard-code the IP address of the registry endpoint for now.

# coap_node/mix.exs
def application do
  [
    applications: [:logger],
    mod: {CoapNode, port},
    env: [coap_port: port, registry_endpoint: 'coap://127.0.0.1:5683/registry']
  ]
end

# coap_node/lib/coap_node/resources/switch.ex
def start(path, params) do
  {:ok, port} = Application.fetch_env(:coap_server, :coap_port)
  {:ok, registry_endpoint} = Application.fetch_env(:coap_server, :registry_endpoint)
  {:ok, :content, {:coap_content, _etag, _max_age, _format, payload}} = :coap_client.request(
    :post, registry_endpoint,
    coap_content(payload: path_to_string(path) <> " " <> Integer.to_string(port))
  )
  case payload do
    "ok" ->
      Storage.set(path_to_string(path), false)
      super(path, params)
    "path taken" -> :path_taken
  end
end

And that’s it. Now we can see the whole setup in action. We pass self() to the observer creator and then flush() the notifications to see that the observation works.

~/coap_directory> iex -S mix
iex(1)>

~/coap_node> COAP_PORT=50000 iex -S mix
iex(1)> CoapNode.Resources.Switch.start(["switches", "1"], [])
:ok

~/coap_directory>
iex(1)> CoapDirectory.Client.get("switches/1") |> Task.await()
{:ok, :content, {:coap_content, :undefined, 60, :undefined, "switches/1 off"}}
iex(2)> CoapDirectory.Client.put("switches/1", "toggle") |> Task.await()
{:ok, :changed, {:coap_content, :undefined, 60, :undefined, ""}}
iex(3)> CoapDirectory.Client.get("switches/1") |> Task.await()
{:ok, :content, {:coap_content, :undefined, 60, :undefined, "switches/1 on"}}

iex(4)> CoapDirectory.ObserverSupervisor.start_observer("switches/1", self())
{:ok, #PID<0.288.0>}
iex(5)> CoapDirectory.Client.put("switches/1", "toggle") |> Task.await()
{:ok, :changed, {:coap_content, :undefined, 60, :undefined, ""}}
iex(6)> flush()
"switches/1 off"
:ok

What’s next

Next time we’ll try to implement an HTTP API for the CoapDirectory, translate HTTP requests to CoAP and route them to the correct CoapNodes. I’ve had fun creating this setup and even though it’s a bit clunky and the library I’ve used is not ready, this little project has taught me a lot about Elixir and Erlang. And on top of that I got to research a promising, new technology.

Repositories:

Part 2 is out. Check it out!

Looking for Elixir developers?

Our devs are so communicative and diligent you’ll feel they are your in-house team. Work with JavaScript experts who will push hard to understand your business and squeeze the most out of Elixir.

Talk to our team and confidently build your next big thing.

Michał Podwórny avatar
Michał Podwórny