Ability to receive traffic from network.

This commit is contained in:
antirez 2013-01-12 11:46:32 +01:00
parent b5ff48e5f5
commit 0ce746dc5f
2 changed files with 192 additions and 21 deletions

33
README
View File

@ -101,8 +101,11 @@ Network server features
--- ---
By enabling the networking support with --net Dump1090 starts listening By enabling the networking support with --net Dump1090 starts listening
for clients connections on port 30002 (you can change the port using for clients connections on port 30002 and 30001 (you can change both the
the --net-ro-port option). ports if you want, see --help output).
Port 30002
---
Connected clients are served with data ASAP as they arrive from the device Connected clients are served with data ASAP as they arrive from the device
(or from file if --ifile is used) in the raw format similar to the following: (or from file if --ifile is used) in the raw format similar to the following:
@ -111,6 +114,32 @@ Connected clients are served with data ASAP as they arrive from the device
Every entry is separated by a simple newline (LF character, hex 0x0A). Every entry is separated by a simple newline (LF character, hex 0x0A).
Port 30001
---
Port 30001 is the raw input port, and can be used to feed Dump1090 with
data in the same format as specified above, with hex messages starting with
a '*' and ending with a ';' character.
So for instance if there is another remote Dump1090 instance collecting data
it is possible to sum the output to a local Dump1090 instance doing something
like this:
nc remote-dump1090.example.net 30002 | nc localhost 30001
It is important to note that what is received via port 30001 is also
broadcasted to clients listening to port 30002.
In general everything received from port 30001 is handled exactly like the
normal traffic from RTL devices or from file when --ifile is used.
It is possible to use Dump1090 just as an hub using --ifile with /dev/zero
as argument as in the following example:
./dump1090 --ifile /dev/zero --net --interactive
Then you can feed it from different data sources from the internet.
Antenna Antenna
--- ---

View File

@ -80,6 +80,8 @@
#define MODES_NET_MAX_FD 1024 #define MODES_NET_MAX_FD 1024
#define MODES_NET_OUTPUT_RAW_PORT 30002 #define MODES_NET_OUTPUT_RAW_PORT 30002
#define MODES_NET_INPUT_RAW_PORT 30001
#define MODES_CLIENT_BUF_SIZE 256
#define MODES_NOTUSED(V) ((void) V) #define MODES_NOTUSED(V) ((void) V)
@ -87,6 +89,8 @@
struct client { struct client {
int fd; /* File descriptor. */ int fd; /* File descriptor. */
int service; /* TCP port the client is connected to. */ int service; /* TCP port the client is connected to. */
char buf[MODES_CLIENT_BUF_SIZE]; /* Read buffer. */
int buflen; /* Amount of data on buffer. */
}; };
/* Structure used to describe an aircraft in iteractive mode. */ /* Structure used to describe an aircraft in iteractive mode. */
@ -137,6 +141,7 @@ struct {
struct client *clients[MODES_NET_MAX_FD]; /* Our clients. */ struct client *clients[MODES_NET_MAX_FD]; /* Our clients. */
int maxfd; /* Greatest fd currently active. */ int maxfd; /* Greatest fd currently active. */
int ros; /* Raw output listening socket. */ int ros; /* Raw output listening socket. */
int ris; /* Raw input listening socket. */
/* Configuration */ /* Configuration */
char *filename; /* Input form file, --ifile option. */ char *filename; /* Input form file, --ifile option. */
@ -146,6 +151,7 @@ struct {
int debug; /* Debugging mode. */ int debug; /* Debugging mode. */
int net; /* Enable networking. */ int net; /* Enable networking. */
int net_output_raw_port; /* Raw output TCP port. */ int net_output_raw_port; /* Raw output TCP port. */
int net_input_raw_port; /* Raw input TCP port. */
int interactive; /* Interactive mode */ int interactive; /* Interactive mode */
int interactive_rows; /* Interactive mode: max number of rows. */ int interactive_rows; /* Interactive mode: max number of rows. */
int interactive_ttl; /* Interactive mode: TTL before deletion. */ int interactive_ttl; /* Interactive mode: TTL before deletion. */
@ -212,6 +218,7 @@ struct modesMessage {
void interactiveShowData(void); void interactiveShowData(void);
void interactiveReceiveData(struct modesMessage *mm); void interactiveReceiveData(struct modesMessage *mm);
void modesSendRawOutput(struct modesMessage *mm); void modesSendRawOutput(struct modesMessage *mm);
void useModesMessage(struct modesMessage *mm);
/* ============================= Utility functions ========================== */ /* ============================= Utility functions ========================== */
@ -238,6 +245,7 @@ void modesInitConfig(void) {
Modes.raw = 0; Modes.raw = 0;
Modes.net = 0; Modes.net = 0;
Modes.net_output_raw_port = MODES_NET_OUTPUT_RAW_PORT; Modes.net_output_raw_port = MODES_NET_OUTPUT_RAW_PORT;
Modes.net_input_raw_port = MODES_NET_INPUT_RAW_PORT;
Modes.onlyaddr = 0; Modes.onlyaddr = 0;
Modes.debug = 0; Modes.debug = 0;
Modes.interactive = 0; Modes.interactive = 0;
@ -1227,18 +1235,7 @@ void detectModeS(uint16_t *m, uint32_t mlen) {
dumpRawMessage("Decoded with good CRC", msg, m, j); dumpRawMessage("Decoded with good CRC", msg, m, j);
/* Pass data to the next layer */ /* Pass data to the next layer */
if (!Modes.stats && (Modes.check_crc == 0 || mm.crcok)) { useModesMessage(&mm);
if (Modes.interactive) {
interactiveReceiveData(&mm);
} else {
displayModesMessage(&mm);
if (!Modes.raw && !Modes.onlyaddr) printf("\n");
}
/* Send data to connected clients. */
if (Modes.net) {
modesSendRawOutput(&mm); /* Feed raw output clients. */
}
}
/* Skip this message if we are sure it's fine. */ /* Skip this message if we are sure it's fine. */
if (mm.crcok) j += (MODES_PREAMBLE_US+(msglen*8))*2; if (mm.crcok) j += (MODES_PREAMBLE_US+(msglen*8))*2;
@ -1251,6 +1248,28 @@ void detectModeS(uint16_t *m, uint32_t mlen) {
} }
} }
/* When a new message is available, because it was decoded from the
* RTL device, file, or received in the TCP input port, or any other
* way we can receive a decoded message, we call this function in order
* to use the message.
*
* Basically this function passes a raw message to the upper layers for
* further processing and visualization. */
void useModesMessage(struct modesMessage *mm) {
if (!Modes.stats && (Modes.check_crc == 0 || mm->crcok)) {
if (Modes.interactive) {
interactiveReceiveData(mm);
} else {
displayModesMessage(mm);
if (!Modes.raw && !Modes.onlyaddr) printf("\n");
}
/* Send data to connected clients. */
if (Modes.net) {
modesSendRawOutput(mm); /* Feed raw output clients. */
}
}
}
/* ========================= Interactive mode =============================== */ /* ========================= Interactive mode =============================== */
/* Return a new aircraft structure for the interactive mode linked list /* Return a new aircraft structure for the interactive mode linked list
@ -1575,13 +1594,25 @@ void snipMode(int level) {
void modesInitNet(void) { void modesInitNet(void) {
memset(Modes.clients,0,sizeof(Modes.clients)); memset(Modes.clients,0,sizeof(Modes.clients));
Modes.maxfd = -1; Modes.maxfd = -1;
/* Raw output port */
Modes.ros = anetTcpServer(Modes.aneterr, Modes.net_output_raw_port, NULL); Modes.ros = anetTcpServer(Modes.aneterr, Modes.net_output_raw_port, NULL);
anetNonBlock(Modes.aneterr, Modes.ros);
if (Modes.ros == -1) { if (Modes.ros == -1) {
fprintf(stderr, "Error opening TCP port %d: %s\n", fprintf(stderr, "Error opening raw TCP output port %d: %s\n",
Modes.net_output_raw_port, Modes.aneterr); Modes.net_output_raw_port, Modes.aneterr);
exit(1); exit(1);
} }
/* Raw input port */
Modes.ris = anetTcpServer(Modes.aneterr, Modes.net_input_raw_port, NULL);
if (Modes.ris == -1) {
fprintf(stderr, "Error opening raw TCP input port %d: %s\n",
Modes.net_input_raw_port, Modes.aneterr);
exit(1);
}
anetNonBlock(Modes.aneterr, Modes.ros);
anetNonBlock(Modes.aneterr, Modes.ris);
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
} }
@ -1590,22 +1621,30 @@ void modesInitNet(void) {
* second. */ * second. */
void modesAcceptClients(void) { void modesAcceptClients(void) {
int fd, port; int fd, port;
unsigned int j;
struct client *c; struct client *c;
int services[2];
services[0] = Modes.ros;
services[1] = Modes.ris;
for (j = 0; j < sizeof(services)/sizeof(int); j++) {
fd = anetTcpAccept(Modes.aneterr, services[j], NULL, &port);
if (fd == -1) continue;
while(1) {
fd = anetTcpAccept(Modes.aneterr, Modes.ros, NULL, &port);
if (fd == -1) return;
if (fd >= MODES_NET_MAX_FD) { if (fd >= MODES_NET_MAX_FD) {
close(fd); close(fd);
return; /* Max number of clients reached. */ return; /* Max number of clients reached. */
} }
c = malloc(sizeof(*c)); c = malloc(sizeof(*c));
c->service = Modes.ros; c->service = services[j];
c->fd = fd; c->fd = fd;
c->buflen = 0;
Modes.clients[fd] = c; Modes.clients[fd] = c;
if (Modes.maxfd < fd) Modes.maxfd = fd; if (Modes.maxfd < fd) Modes.maxfd = fd;
j--; /* Try again with the same listening port. */
} }
} }
@ -1658,6 +1697,103 @@ void modesSendRawOutput(struct modesMessage *mm) {
modesSendAllClients(Modes.ros, msg, p-msg); modesSendAllClients(Modes.ros, msg, p-msg);
} }
/* Turn an hex digit into its 4 bit decimal value.
* Returns -1 if the digit is not in the 0-F range. */
int hexDigitVal(int c) {
c = tolower(c);
if (c >= '0' && c <= '9') return c-'0';
else if (c >= 'a' && c <= 'f') return c-'a'+10;
else return -1;
}
/* This function decodes a string representing a Mode S message in
* raw hex format like: *8D4B969699155600E87406F5B69F;
*
* The message is passed to the higher level layers, so it feeds
* the selected screen output, the network output and so forth.
*
* If the message looks invalid is silently discarded. */
void decodeHexMessage(char *hex) {
int l = strlen(hex), j;
unsigned char msg[MODES_LONG_MSG_BYTES];
struct modesMessage mm;
if (l < 2 || hex[0] != '*' || hex[l-1] != ';') return;
hex++; l-=2; /* Skip * and ; */
if (l > MODES_LONG_MSG_BYTES*2) return; /* Too long message... broken. */
for (j = 0; j < l; j += 2) {
int high = hexDigitVal(hex[j]);
int low = hexDigitVal(hex[j+1]);
if (high == -1 || low == -1) return;
msg[j/2] = (high<<4) | low;
}
decodeModesMessage(&mm,msg);
useModesMessage(&mm);
}
/* This function polls all the clients using read() in order to receive new
* messages from the net.
*
* Every full message received is decoded and passed to the higher layers. */
void modesReceiveRawInput(void) {
int j;
struct client *c;
for (j = 0; j <= Modes.maxfd; j++) {
c = Modes.clients[j];
if (c && c->service == Modes.ris) {
while(1) {
int left = sizeof(c->buf) - c->buflen;
int nread = read(j, c->buf+c->buflen, left);
int decoded = 0;
int oldpos = c->buflen;
int i;
if (nread < 0) {
if (nread == 0 || errno != EAGAIN) {
/* Error, or end of file. */
modesFreeClient(j);
}
break; /* Serve next client. */
}
c->buflen += nread;
/* If there is a complete message there must be a newline
* in the buffer. The iteration starts from 'oldpos' as
* we need to check only the chars we read in this interaction
* as we are sure there is no newline in the pre-existing
* buffer. */
for (i = oldpos; i < c->buflen; i++) {
if (c->buf[i] == '\n') {
c->buf[i] = '\0';
if (i && c->buf[i-1] == '\r') c->buf[i-1] = '\0';
decodeHexMessage(c->buf);
/* Move what's left at the start of the buffer. */
i++;
memmove(c->buf,c->buf+i,c->buflen-i);
c->buflen -= i;
/* Maybe there are more messages inside the buffer.
* Start looping from the start again. */
i = -1;
decoded = 1;
}
}
/* If our buffer is full discard it, this is some badly
* formatted shit. */
if (c->buflen == sizeof(c->buf)) {
c->buflen = 0;
/* If there is garbage, read more to discard it ASAP. */
continue;
}
/* If no message was decoded process the next client, otherwise
* read more data from the same client. */
if (!decoded) break;
}
}
}
}
/* ================================ Main ==================================== */ /* ================================ Main ==================================== */
void showHelp(void) { void showHelp(void) {
@ -1673,6 +1809,7 @@ void showHelp(void) {
"--raw Show only messages hex values.\n" "--raw Show only messages hex values.\n"
"--net Enable networking.\n" "--net Enable networking.\n"
"--net-ro-port <port> TCP listening port for raw output (default: 30002).\n" "--net-ro-port <port> TCP listening port for raw output (default: 30002).\n"
"--net-ri-port <port> TCP listening port for raw input (default: 30001).\n"
"--no-fix Disable single-bits error correction using CRC.\n" "--no-fix Disable single-bits error correction using CRC.\n"
"--no-crc-check Disable messages with broken CRC (discouraged).\n" "--no-crc-check Disable messages with broken CRC (discouraged).\n"
"--stats With --ifile print stats at exit. No other output.\n" "--stats With --ifile print stats at exit. No other output.\n"
@ -1714,6 +1851,8 @@ int main(int argc, char **argv) {
Modes.net = 1; Modes.net = 1;
} else if (!strcmp(argv[j],"--net-ro-port") && more) { } else if (!strcmp(argv[j],"--net-ro-port") && more) {
Modes.net_output_raw_port = atoi(argv[++j]); Modes.net_output_raw_port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--net-ri-port") && more) {
Modes.net_input_raw_port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--onlyaddr")) { } else if (!strcmp(argv[j],"--onlyaddr")) {
Modes.onlyaddr = 1; Modes.onlyaddr = 1;
} else if (!strcmp(argv[j],"--metric")) { } else if (!strcmp(argv[j],"--metric")) {
@ -1779,7 +1918,10 @@ int main(int argc, char **argv) {
* slow processors). */ * slow processors). */
pthread_mutex_unlock(&Modes.data_mutex); pthread_mutex_unlock(&Modes.data_mutex);
detectModeS(Modes.magnitude, Modes.data_len/2); detectModeS(Modes.magnitude, Modes.data_len/2);
if (Modes.net) modesAcceptClients(); if (Modes.net) {
modesAcceptClients();
modesReceiveRawInput();
}
/* Refresh screen when in interactive mode. */ /* Refresh screen when in interactive mode. */
if (Modes.interactive && if (Modes.interactive &&