Changed from using MQTT to using REST instead.

This commit is contained in:
Ola Andersson 2016-02-04 10:47:08 +00:00
parent c3c8cc82a4
commit 3acc909195
6 changed files with 183 additions and 212 deletions

View File

@ -1,5 +1,5 @@
CFLAGS?=-O2 -g -Wall -W $(shell pkg-config --cflags librtlsdr)
LDLIBS+=$(shell pkg-config --libs librtlsdr) -lpthread -lm -lpaho-mqtt3c
CFLAGS?=-O2 -g -Wall -W $(shell pkg-config --cflags librtlsdr json-c)
LDLIBS+=$(shell pkg-config --libs librtlsdr json-c ) -lpthread -lm -lcurl
CC?=gcc
PROGNAME=dump1090
@ -8,12 +8,12 @@ all: dump1090
%.o: %.c
$(CC) $(CFLAGS) -c $<
dump1090: dump1090.o anet.o mqtt.o
$(CC) -g -o dump1090 dump1090.o anet.o mqtt.o $(LDFLAGS) $(LDLIBS)
dump1090: dump1090.o anet.o rest.o
$(CC) -g -o dump1090 dump1090.o anet.o rest.o $(LDFLAGS) $(LDLIBS)
clean:
rm -f *.o dump1090
release:
$(CC) -o dump1090 dump1090.o anet.o mqtt.o $(LDFLAGS) $(LDLIBS)
$(CC) -o dump1090 dump1090.o anet.o rest.o $(LDFLAGS) $(LDLIBS)

View File

@ -45,7 +45,7 @@
#include <sys/select.h>
#include "rtl-sdr.h"
#include "anet.h"
#include "mqtt.h"
#include "rest.h"
#define MODES_DEFAULT_RATE 2000000
#define MODES_DEFAULT_FREQ 1090000000
@ -170,10 +170,8 @@ struct {
int metric; /* Use metric units. */
int quiet; /* Quiet mode, not printing any human readable messages */
/* Mqtt configuration */
char* mqtt_uri;
char* mqtt_username;
char* mqtt_password;
/* REST configuration */
char* rest_uri;
/* Interactive mode */
struct aircraft *aircrafts;
@ -1564,7 +1562,7 @@ void useModesMessage(struct modesMessage *mm) {
if (!Modes.raw && !Modes.onlyaddr) printf("\n");
}
/* Send data to connected clients. */
if (Modes.net || Modes.mqtt_uri) {
if (Modes.net || Modes.rest_uri) {
modesSendRawOutput(mm); /* Feed raw output clients. */
}
}
@ -2036,8 +2034,8 @@ void modesSendRawOutput(struct modesMessage *mm) {
}
*p++ = ';';
*p++ = '\n';
if (Modes.mqtt_uri) {
addRawMessageToMq(msg, p-msg); // TODO: Check if MQ really should be used, perhaps as a compile flag for both include and sending.
if (Modes.rest_uri) {
addRawMessageToRestQueue(msg, p-msg);
}
if (Modes.net) {
modesSendAllClients(Modes.ros, msg, p-msg);
@ -2576,12 +2574,8 @@ int main(int argc, char **argv) {
exit(0);
} else if (!strcmp(argv[j],"--quiet")) {
Modes.quiet = 1;
} else if (!strcmp(argv[j],"--mqtt-uri") && more) {
Modes.mqtt_uri = argv[++j];
} else if (!strcmp(argv[j],"--mqtt-username") && more) {
Modes.mqtt_username = argv[++j];
} else if (!strcmp(argv[j],"--mqtt-password") && more) {
Modes.mqtt_password = argv[++j];
} else if (!strcmp(argv[j],"--rest-uri") && more) {
Modes.rest_uri = argv[++j];
} else if (!strcmp(argv[j],"--help")) {
showHelp();
exit(0);
@ -2599,7 +2593,7 @@ int main(int argc, char **argv) {
/* Initialization */
modesInit();
initMqConnection(Modes.mqtt_uri, Modes.mqtt_username, Modes.mqtt_password);
initRestConnection(Modes.rest_uri);
if (Modes.net_only) {
fprintf(stderr,"Net-only mode, no RTL device or file open.\n");
} else if (Modes.filename == NULL) {
@ -2665,7 +2659,6 @@ int main(int argc, char **argv) {
}
rtlsdr_close(Modes.dev);
shutdownMqConnection();
return 0;
}

175
mqtt.c
View File

@ -1,175 +0,0 @@
/*
* Mqtt.c
*
* Created on: Apr 18, 2015
* Author: borax
*/
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include "mqtt.h"
#include "MQTTClient.h"
#define CLIENTID "Dump1090"
#define TOPIC "adsb/data/raw"
#define QOS 0
#define TIMEOUT 10000L
struct {
// Threading
pthread_t sender_thread;
pthread_mutex_t thread_lock;
// Internal message list
struct queue_message *first_message;
struct queue_message *last_message;
// MQTT connection options
char* uri;
char* username;
char* password;
MQTTClient client;
MQTTClient_deliveryToken token;
} Mqtt;
struct queue_message {
char *message;
int length;
struct queue_message *next;
};
struct queue_message *popFirstMessageInQueue();
void sendMessagesToMq();
void sendMessageToMq(struct queue_message *msg);
/* Initialize things */
int initMqConnection(char* uri, char* username, char* password) {
printf("MQ connection settings: %s %s %s\n", uri, username, password);
Mqtt.uri = uri;
Mqtt.username = username;
Mqtt.password = password;
Mqtt.first_message = NULL;
pthread_mutex_init(&Mqtt.thread_lock,NULL);
pthread_create(&Mqtt.sender_thread, NULL, sendMessagesToMq, NULL);
return 0;
}
void shutdownMqConnection() {
printf("Closing MQ connection\n");
}
/* Queue functionality */
void addRawMessageToMq(char *data, int length) {
if(!Mqtt.uri) {
return;
}
struct queue_message *curr = malloc(sizeof(struct queue_message));
struct timeval tv;
gettimeofday(&tv, NULL);
unsigned long long millisecondsSinceEpoch =
(unsigned long long)(tv.tv_sec) * 1000 +
(unsigned long long)(tv.tv_usec) / 1000;
data[length-1] = '\0';
char *buf = malloc(100);
sprintf(buf, "{ \"timeSinceEpochUTC\":%llu, \"message\":\"%s\" }", millisecondsSinceEpoch, data);
curr->message = buf;
curr->length = strlen(curr->message);
addMessageToQueue(curr);
}
void addMessageToQueue(struct queue_message *msg) {
pthread_mutex_lock(&Mqtt.thread_lock);
struct queue_message *tail = Mqtt.last_message;
if(tail) {
tail->next = msg;
} else {
Mqtt.first_message = msg;
}
Mqtt.last_message = msg;
pthread_mutex_unlock(&Mqtt.thread_lock);
}
struct queue_message *popFirstMessageInQueue() {
pthread_mutex_lock(&Mqtt.thread_lock);
struct queue_message *msg = Mqtt.first_message;
if(Mqtt.first_message == Mqtt.last_message) {
Mqtt.first_message = 0;
Mqtt.last_message = 0;
} else {
Mqtt.first_message = Mqtt.first_message->next;
}
pthread_mutex_unlock(&Mqtt.thread_lock);
return msg;
}
/* Mqtt */
void sendMessagesToMq() {
while(1) {
if(!Mqtt.client) {
initiateConnection();
}
if (Mqtt.first_message && Mqtt.client) {
struct queue_message *msg = popFirstMessageInQueue();
if(msg) {
sendMessageToMq(msg);
free(msg->message);
free(msg);
continue;
}
}
usleep(250);
}
}
void initiateConnection() {
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
MQTTClient_create(&Mqtt.client, Mqtt.uri, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if(Mqtt.username) {
conn_opts.username = Mqtt.username;
conn_opts.password = Mqtt.password;
}
if ((rc = MQTTClient_connect(Mqtt.client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
//printf("Failed to connect, return code %d\n", rc);
}
}
void destroyConnection() {
MQTTClient_disconnect(Mqtt.client, 10000);
MQTTClient_destroy(&Mqtt.client);
}
void sendMessageToMq(struct queue_message *msg) {
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = msg->message;
pubmsg.payloadlen = msg->length;
pubmsg.qos = QOS;
pubmsg.retained = 0;
if(MQTTClient_publishMessage(Mqtt.client, TOPIC, &pubmsg, &Mqtt.token) != MQTTCLIENT_SUCCESS) {
destroyConnection();
return;
}
if(MQTTClient_waitForCompletion(Mqtt.client, Mqtt.token, TIMEOUT) != MQTTCLIENT_SUCCESS) {
destroyConnection();
return;
}
}

16
mqtt.h
View File

@ -1,16 +0,0 @@
/*
* Mqtt.h
*
* Created on: Apr 18, 2015
* Author: borax
*/
#ifndef MQTT_H_
#define MQTT_H_
int initMqConnection(char* uri, char* username, char* password);
void shutdownMqConnection();
void addRawMessageToMq(char *data, int length);
#endif /* MQTT_H_ */

154
rest.c Normal file
View File

@ -0,0 +1,154 @@
/*
* rest.c
*
* Created on: Apr 18, 2015
* Author: borax
*/
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <json-c/json.h>
#include <curl/curl.h>
#include "rest.h"
#define TIMEOUT 10000L
struct {
// Threading
pthread_t sender_thread;
pthread_mutex_t thread_lock;
// Internal message list
struct queue_message *first_message;
struct queue_message *last_message;
char* uri;
CURL *curl;
} Rest;
struct queue_message {
json_object *message;
struct queue_message *next;
};
struct queue_message *popFirstMessageInQueue();
void sendMessagesToRestInterface();
void sendMessage(json_object *json);
/* Initialize things */
int initRestConnection(char* uri) {
printf("Rest connection settings: %s\n", uri);
Rest.uri = uri;
Rest.first_message = NULL;
pthread_mutex_init(&Rest.thread_lock,NULL);
pthread_create(&Rest.sender_thread, NULL, sendMessagesToRestInterface, NULL);
Rest.curl = curl_easy_init();
if(Rest.curl) {
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json");
headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(Rest.curl, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(Rest.curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(Rest.curl, CURLOPT_URL, uri);
// curl_easy_setopt(Rest.curl, CURLOPT_VERBOSE, 1L);
}
return 0;
}
/* Queue functionality */
void addRawMessageToRestQueue(char *data, int length) {
if(!Rest.uri) {
return;
}
struct queue_message *curr = malloc(sizeof(struct queue_message));
struct timeval tv;
gettimeofday(&tv, NULL);
unsigned long long millisecondsSinceEpoch =
(unsigned long long)(tv.tv_sec) * 1000 +
(unsigned long long)(tv.tv_usec) / 1000;
data[length-1] = '\0';
json_object *json = json_object_new_object();
json_object *adsb_data = json_object_new_string(data);
json_object *timestamp = json_object_new_int64(millisecondsSinceEpoch);
json_object_object_add(json, "message", adsb_data);
json_object_object_add(json, "timestamp", timestamp);
curr->message = json;
addMessageToQueue(curr);
}
void addMessageToQueue(struct queue_message *msg) {
pthread_mutex_lock(&Rest.thread_lock);
struct queue_message *tail = Rest.last_message;
if(tail) {
tail->next = msg;
} else {
Rest.first_message = msg;
}
Rest.last_message = msg;
pthread_mutex_unlock(&Rest.thread_lock);
}
struct queue_message *popFirstMessageInQueue() {
pthread_mutex_lock(&Rest.thread_lock);
struct queue_message *msg = Rest.first_message;
if(Rest.first_message == Rest.last_message) {
Rest.first_message = 0;
Rest.last_message = 0;
} else {
Rest.first_message = Rest.first_message->next;
}
pthread_mutex_unlock(&Rest.thread_lock);
return msg;
}
/* REST */
void sendMessagesToRestInterface() {
json_object *json = json_object_new_object();
int message_counter = 1;
int wait_counter = 0;
while(1) {
// Send batch in case we either have 100 messages in queue or we have waited 5s and have messages to send
if(message_counter > 100 || (message_counter > 1 && wait_counter > 100)) {
sendMessage(json);
message_counter = 1;
wait_counter = 0;
json_object_put(json);
json = json_object_new_object();
}
else if (Rest.first_message) {
struct queue_message *msg = popFirstMessageInQueue();
if(msg) {
char buf[9];
sprintf(buf, "entry-%d", message_counter);
json_object_object_add(json, buf, msg->message);
message_counter++;
free(msg);
continue;
}
} else {
// Sleep for 50ms
usleep(50000);
wait_counter++;
}
}
}
void sendMessage(json_object *json) {
curl_easy_setopt(Rest.curl, CURLOPT_POSTFIELDS, json_object_to_json_string(json));
curl_easy_perform(Rest.curl);
// printf("\nSending message %s\n", json_object_to_json_string(json));
}

15
rest.h Normal file
View File

@ -0,0 +1,15 @@
/*
* Mqtt.h
*
* Created on: Apr 18, 2015
* Author: borax
*/
#ifndef REST_H_
#define REST_H_
int initRestConnection(char* uri);
void addRawMessageToRestQueue(char *data, int length);
#endif /* REST_H_ */