Aquisição de dados via MQTT e armazenamento em banco de dados relacional com Python

A não ser pra geração de alertas, dados brutos não são muito úteis se não são armazenados em algum lugar, pra que possamos por exemplo ver uma série histórica de algum dado sensor.

Acontece que usando o MQTT, não armazenamos automaticamente os dados que captamos. O MQTT é um protocolo de publicação e recebimento de dados, e armazenamento é um conceito separado com o qual o protocolo não se preocupa, além do armazenamento temporário de mensagens para garantia de entrega, dependendo das configurações de QoL.

Se você quer armazenar os dados que são publicados num canal MQTT, então, é preciso criar um cliente que se inscreva nos tópicos relevantes e registre os dados a um banco relacional, por exemplo. É isso o que vamos fazer hoje!


A visão macro

O diagrama acima mostra o fluxo de informação no nosso sistema, desde o produtor dos dados (os dispositivos de Internet das Coisas) até o nosso usuário final. Pra esse experimento, vamos mostrar os dados numa página web; assim, democratizamos a entrega dos dados, que não vai estar presa a nenhuma plataforma ou sistema operacional específico.

A grande questão, e foco desse post, é a persistência dos dados produzidos. Se cortássemos a parte do cliente MQTT de armazenamento no diagrama acima e mandássemos os dados do broker direto pra um consumidor, não teríamos acesso aos dados passados. No máximo dos máximos guardaríamos os dados por algum tempo na página web ou no processo do servidor, mas aí perderíamos todos os dados assim que houvesse um refresh na página ou um reboot do servidor.

Os nossos dados então vão seguir o seguinte caminho:

  • Dispositivos IoT publicam informação em certos tópicos pro nosso broker MQTT
  • Nosso cliente de armazenamento, conectado ao broker e inscrito nos tópicos relevantes, recebe essa informação e só a guarda num banco de dados
  • Um processo consumidor de dados lê desse mesmo banco e redistribui as informações pro usuário final

Por sorte, tudo isso é bastante simples de fazer!


A implementação

Vamos simplificar algumas coisas pra poder rodar tudo numa máquina só. Ao invés de sensores externos, vamos ter um programinha em Python simulando um sensor de temperatura através de números aleatórios. A boa notícia é que como ele vai simplesmente publicar MQTT, trocar a simulação por um dispositivo real involve só fazer o dispositivo real publicar as mensagens no mesmo tópico.

Pra nossa receita vamos precisar dos seguintes ingredientes:

  • Python
  • Um cliente MQTT, pra receber e publicar mensagens: pip install paho-mqtt
  • Um ORM, pra interagirmos mais elegantemente com nosso banco de dados: pip install SQLAlchemy
  • Um servidor web, pra disponibilizarmos nossos dados: pip install flask

A versão final do nosso programa está disponível nesse repositório do meu GitHub!

O ORM é um Object Relational Mapper. Ele nos permite trocar isso:

'SELECT valor_temperatura.id, valor_temperatura.temperatura, valor_temperatura.criacao FROM valor_temperatura ORDER BY valor_temperatura.criacao DESC LIMIT 100'

Por isso:

sessao.query(ValorTemperatura).order_by(ValorTemperatura.criacao.desc()).limit(100).all()

Além das queries, ele também nos permite modelar com Python as tabelas que definimos, e transforma tipos quando necessário de forma transparente:

from sqlalchemy import Column, Float, DateTime, Integer ...
class ValorTemperatura(Base):
    __tablename__ = 'valor_temperatura'

    id = Column(Integer, primary_key=True)
    temperatura = Column(Float)
    criacao = Column(DateTime)

Fácil, né? Outra vantagem enorme de usar um ORM é que ele é agnóstico à implementação do banco de dados; quer dizer, podemos começar usando SQLite, como estamos fazendo, pela facilidade de ele já vir instalado com Python e não precisar de um processo separado, mas se quisermos escalar as operações, podemos trocar pra uma mega-instância de PostgreSQL na RDS da Amazon trocando só uma linha.

No nosso projeto exemplo, definimos a modelagem do banco em modelagem_banco.py e usamos esses modelos e a função de criar sessões do SQLAlchemy nos outros arquivos.

Simulação de sensor

Nossa simulação de um microcontrolador/sensor que transmite dados de temperatura é bem simples; começando com uma temperatura de 25 °C, variamos ela aleatoriamente com um valor entre -1 e 1 grau a cada 5 segundos, só pra fazer de conta que temos dados sendo gerados.

Como dito anteriormente, tudo o que a simulação aqui faz é publicar num tópico MQTT; pra trocar pra um sensor de verdade, basta fazer esse sensor publicar no mesmo tópico.

Acho que vale dizer aqui que a parte de simulação do sensor (e numa aplicação real, o código do sensor também) não sabe de nada do banco de dados! Essa é uma parte que está mais abaixo no fluxo de informações. Nossos dispositivos só têm que se importar com a geração e transmissão do dado.

Armazenamento de dados

No arquivo cliente_armazenamento.py é que acontece a mágica de verdade. Bom, talvez não mágica; na verdade é tudo bem simples. A única coisa que distoa um pouco da utilização comum do MQTT é o passo de armazenamento que fazemos ao receber uma mensagem:

def on_message(client, userdata, msg):
    # Se o tópico for o nosso tópico de temperatura...
    if msg.topic == 'valores/temperatura':

        valor_temperatura = float(msg.payload)
        print(f'Recebido valor de temperatura {valor_temperatura}')

        # Criamos um objeto ValorTemperatura
        novo_valor = ValorTemperatura(
            temperatura=valor_temperatura,
            criacao=datetime.now())

        # Criamos uma sessão de conexão com o banco de dados, inserimos o novo objeto,
        # fazemos o commit e fechamos a sessão
        sessao = criar_sessao()
        sessao.add(novo_valor)
        sessao.commit()
        sessao.close()
    else:
        # Nesse caso, o tópico não é o de temperatura; só printamos a mensagem
        print(f'[{msg.topic}] {msg.payload}')

A inserção dos dados no banco de dados criado no arquivo acima se dá ao criar um objeto ValorTemperatura (lá da modelagem do banco) com os valores que recebemos e o tempo atual, adicioná-lo na sessão de conexão com o banco, e dar um commit e close pra confirmar a mudança e fechar a sessão.

Esses dados agora podem ser consumidos por qualquer aplicação ou programa que interaja com o banco de dados que você escolheu (nesse caso, um arquivo SQLite local).

Consumo dos dados

Pra mostrar o consumo desses dados depois de armazenados, temos uma pequena aplicação em Flask, que abre um servidor local e disponibiliza numa página da Web um gráfico com as últimas informações coletadas:

Pra isso usamos dois arquivos no repositório: consumir_dados_armazenados.py, que contém as rotas Flask (funções que são chamadas quando acessamos uma dada URL e retornam dados ou HTML) e templates/template.html, que tem nosso HTML que vai gerar o visual e gráfico no navegador.

Vamos dar uma olhada na nossa rota principal:


@app.route('/')
def mostrar_dados():
    sessao = criar_sessao()
    # Aqui pegamos os dados que queremos mostrar na tela.
    # O que estamos fazendo é pedir os últimos 100 objetos
    # ValorTemperatura do nosso banco de dados.
    dados = sessao.query(ValorTemperatura).order_by(ValorTemperatura.criacao.desc()).limit(100).all()
    sessao.close()

    dados = list(reversed(dados))

    # Dividimos os dados de temperatura e criação, pra facilitar um pouco no lado do JavaScript
    dados_temperatura = []
    for dado in dados:
        dados_temperatura.append(dado.temperatura)

    dados_criacao = []
    for dado in dados:
        # Aqui temos um objeto datetime, mas o Python não transforma isso em JSON automaticamente,
        # porque não é um formato padrão JSON. Ao invés disso, vamos transformar o valor de data de
        # criação em um timestamp.
        dados_criacao.append(dado.criacao.timestamp())

    # Retornamos o HTML já com os dados que queremos usar pro gráfico renderizados.
    return render_template('template.html',
                           dados_temperatura=json.dumps(dados_temperatura),
                           dados_criacao=json.dumps(dados_criacao))

O código é praticamente auto-explicativo. Fazemos uma query pra buscar até 100 dos ValorTemperatura mais recentes, dividimos os timestamps e dados de temperatura, e renderizamos isso no template.html.

A renderização em HTML nada mais é do que a substituição de trechos dos nossos templates. No HTML, onde tivermos {{ dados_temperatura }}, o renderizador trocará isso pelos dados que passamos (uma string json com a lista de dados de temperatura), e o mesmo acontece com {{ dados_criacao. Quer dizer, isso

    let dados_temperatura = {{ dados_temperatura }};
    let dados_criacao = {{ dados_criacao }};

Vira isso:

let dados_temperatura = [31.2120257628043, 31.403419759646354, 32.19124708941314, ...
let dados_criacao = [1556414297.785298, 1556414302.786275, 1556414307.786743, ...

Em seguida, usamos a ótima biblioteca Chart.js pra montar um gráfico de linha que aparece na tela.

Pro último passo de polimento, vamos fazer com que o gráfico se atualize automaticamente de tempos em tempos. Como estamos gerando dados a cada 5 segundos, então faz sentido também que atualizamos o gráfico a cada 5 segundos.

Na parte do front-end, com JavaScript, isso se dá chamando junto com a criação do gráfico a função setInterval, que agenda uma função pra ser chamada periodicamente:

setInterval(atualizar_dados, 5000);

Então, criamos a função em si que vai ser chamada:

    function atualizar_dados() {
        fetch('/update')
            .then((resposta) => resposta.json())
            .then((dados) => {

                // Atualizamos o texto que mostra "Temperatura atual: X °C" no topo da página
                document.getElementById('temperatura_atual').innerText = dados['temperatura'].toFixed(2);

                // Inserimos o novo ponto no nosso gráfico
                grafico.data.labels.push(new Date(dados['criacao'] * 1000));
                grafico.data.datasets[0].data.push(dados['temperatura']);

                // Se houver mais de 100 pontos de dados no gráfico, removemos o menos recente, pra
                // limitar o número de pontos na tela.
                if (grafico.data.labels.length > 100) {
                    grafico.data.labels.shift();
                    grafico.data.datasets[0].data.shift();
                }

                // Chamamos a função que atualiza o visual do gráfico
                grafico.update();
            })
    }

Atualizamos o texto na página, e com o Chart.js é tranquilo atualizar: só dar um push (equivalente ao append do Python, concatenar em array/lista) dos dados de label (eixo X, instante de criação do dado; note que novamente transformamos o timestamp de milissegundos pra segundos) e data (eixo Y, valor de temperatura).

Também evitamos que os pontos fiquem muito juntos quando damos um shift (eliminar o primeiro elemento) se o número de pontos no gráfico for maior que 100. Assim, limitamos o intervalo mostrado.

Os observadores terão notado que isso tudo decorre a partir de uma chamada fetch à rota /update, que ainda não implementamos no nosso back-end! Por sorte, isso é bem, bem simples:


@app.route('/update')
def update():
    sessao = criar_sessao()
    # Pegamos o ValorTemperatura mais recente pelo campo criacao
    ultima_atualizacao = sessao.query(ValorTemperatura).order_by(ValorTemperatura.criacao.desc()).first()
    sessao.close()

    dados_ultima_atualizacao = {'temperatura': ultima_atualizacao.temperatura,
                                'criacao': ultima_atualizacao.criacao.timestamp()}

    # Retornamos os dados em formato JSON
    return jsonify(dados_ultima_atualizacao)

Basta uma query parecida com a anterior, mas que só retorna um resultado, o mais recente. Retornamos um JSON com o formato esperado pela função, e voilà!

Lembrando - o código completo, com algumas partes que não foram mencionadas, está nesse repositório aqui. Por hoje é só!

Caveats

Pequenos detalhes de implementação que merecem menção e sugestões de melhorias futuras:

  • No programa de armazenamento dos dados, criamos uma nova sessão a cada mensagem recebida. Isso funciona bem e é simples, mas pode ser um pouco problemático pra um número muito grande de mensagens ao mesmo tempo. A alternativa seria, então, criar uma sessão só e reutilizá-la entre mensagens. Isso introduz, porém, a complexidade adicional de ter certeza de que a sessão foi fechada depois que o programa é encerrado.
  • O gráfico sempre puxa dados e os insere no dataset, mesmo que os dados sejam antigos; uma implementação um pouco mais inteligente detectaria isso (através do próprio valor do timestamp que chega) e não permitiria que isso acontecesse.
  • O gráfico só mostra 100 pontos com espaçamento de 5 segundos cada; assim, temos uma visão de no máximo os últimos 500 segundos, ~8 minutos. Um aprimoramento interesante é permitir a visualização de dados passados e/ou prover a opção de ver uma janela de tempo maior.
Show Comments

Get the latest posts delivered right to your inbox.