In diesem Artikel hatte ich bereits ausführlich über das mqtt-Protokoll geschwärmt.

Einen kleinen Mangel (den man allerdings nicht wirklich "Mangel" nennen kann, da es auf Grund des Protokolles halt nun mal so ist) will ich im Folgenden beheben:
Per mqtt abgesetzte Nachrichten werden nicht in einer Datenbank á la mysql gespeichert. Als Fallbeispiel: Ich kann mir zwar den Infokanal eines Temperatursensors abonnieren und die aktuelle Temperatur sehen; wie die Temperatur allerdings vor einem Tag war, kann ich nicht mehr abrufen. Geschweige denn, ein Verlaufsdiagramm erstellen.

Also schauen wir, dass wir Werte bestimmter Topics in eine Datenbank (bei mir gewohntsbedingt immer noch bevorzugt mysql) bekommen.

Witzigerweise habe ich just, als ich diesen Artikel schreibe, und nachdem ich mir zuvor lange darüber Gedanken gemacht habe, wie ich die Speicherung bewerkstelligen will, hier auf github ein fertiges Skript gefunden. Mist.

Als Fall-Beispiel habe ich mir den latitude-Ersatz via Android/-iOS-Programm OwnTracks ausgesucht.

Datenbanken

Einmal eine Datenbank für eingehende Orts-Angaben und einmal eine Datenbank für eingehende Checkins (wenn Owntracks einen Ort á la "Daheim" erkennt).

Ich habe zu Debug-Zwecken noch ein Feld "payload" hinzugefügt, in dem der eigentliche payload, dessen Werte ich in lat/lang/acc/zeit speichere, gespeichert wird.

mqtt.latitude
SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
SET time_zone = "+00:00";
CREATE TABLE IF NOT EXISTS `latitude` (
  `nutzer` VARCHAR(15) NOT NULL,
  `lat` DECIMAL(10,7) NOT NULL,
  `lang` DECIMAL(10,7) NOT NULL,
  `acc` DECIMAL(5,1) NOT NULL,
  `payload` VARCHAR(255) DEFAULT NULL,
  `zeit` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

ALTER TABLE `latitude`
 ADD UNIQUE KEY `nutzer` (`nutzer`,`zeit`);

mqtt.checkin
SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
SET time_zone = "+00:00";

CREATE TABLE IF NOT EXISTS `checkin` (
  `nutzer` VARCHAR(15) NOT NULL,
  `ort` VARCHAR(35) DEFAULT NULL,
  `lat` DECIMAL(10,7) NOT NULL,
  `lang` DECIMAL(10,7) NOT NULL,
  `acc` DECIMAL(5,1) NOT NULL,
  `payload` VARCHAR(255) DEFAULT NULL,
  `zeit` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

ALTER TABLE `checkin`
 ADD UNIQUE KEY `nutzer` (`nutzer`,`zeit`);
 

Das Verarbeitungs-Skript

(DBI und JSON-Module müssen ggf. erst noch installiert werden)

mqtt2sql.pl
#!/usr/bin/perl

use DBI;
use strict;
use JSON;

$SIG{'INT'} = 'INT_handler';

# mySQL-Konfiguration
my $db          = DBI->connect("DBI:mysql:database=mqtt;host=127.0.0.1;port=3306", "apfelz_DB_User", "apfelz_DB_Passwort", {RaiseError => 1, AutoCommit =>1, mysql_auto_reconnect=>1});
my $insertLat   = $db->prepare_cached("INSERT IGNORE INTO latitude (nutzer, lat, lang, acc, zeit, payload) VALUES (?, ?, ?, ?, ?, ?)");
my $insertCheck = $db->prepare_cached("INSERT IGNORE INTO checkin (nutzer, ort, lat, lang, acc, zeit, payload) VALUES (?, ?, ?, ?, ?, ?, ?)");

# Stetig eingehende Meldungen des Topics /owntracks/# auslesen und um Folgenden verarbeiten
open(MQTT, "/usr/local/opt/mosquitto/bin/mosquitto_sub -u apfelz_MQTT_User -P apfelz_MQTT_Passwort -t '/owntracks/#' -v|") or die("Kann Topic nicht abonnieren.");

while (<MQTT>) {
        my ($topic, $payload) = split(/ /, $_, 2);
        if ($topic =~ m/^owntracks\/([a-zA-Z0-9]+)$/) {
                # owntracks payload
                my $data = decode_json($payload);
                my $user = $1;
                my ($s,$i,$h,$d,$m,$y,$wday,$yday,$isdst) = localtime($data->{tst});
                my $sqlTime = sprintf("%04s-%02s-%02s %02s:%02s:%02s.000", $y+1900, $m+1, $d, $h, $i, $s);
                if ($data->{_type} eq "location") {
                        # location data
                        $insertLat->execute($1, $data->{lat}, $data->{lon}, $data->{acc}, $sqlTime, $payload);
                } elsif ($data->{_type} eq "waypoint") {
                        # checkin/waypoint data
                        $insertCheck->execute($1, $data->{desc}, $data->{lat}, $data->{lon}, $data->{rad}, $sqlTime, $payload);
                        }
                }
        }

close(MQTT);

$insertLat->finish();
$insertCheck->finish();
$db->disconnect();

sub INT_handler {
        $insertLat->finish();
        $insertCheck->finish();
        $db->disconnect();
        print "Abbruch.\n";
        }
sub EXIT {
        $insertLat->finish();
        $insertCheck->finish();
        $db->disconnect();
        }

Als launchd-Job beim Systemstart starten lassen.

/Libraray/LaunchDaemons/net.apfelz.mqtt2sql.plist
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
        <key>KeepAlive</key>
        <true/>
        <key>Label</key>
        <string>net.apfelz.mqtt2sql</string>
        <key>ProgramArguments</key>
        <array>
                <string>/Library/apfelzScripts/mqtt2db.pl</string>
        </array>
        <key>RunAtLoad</key>
        <true/>
        <key>StandardErrorPath</key>
        <string>/Library/Logs/apfelzScripts/mqtt2sql.log</string>
        <key>StandardOutPath</key>
        <string>/Library/Logs/apfelzScripts/mqtt2sql.log</string>
</dict>
</plist>

launchctl load /Libraray/LaunchDaemons/net.apfelz.mqtt2sql.plist