Networking layer speedup and refactoring in --net-only mode.

The networking layer is still a toy however now there is not a blocking
sleep when we need to wait to process for new clients, but a basic form
of multiplexing is used to signal when at least one client needs some
care.

The code was also partially refactored, but more work is needed to make
it more straightforward.
This commit is contained in:
antirez 2013-12-15 21:15:24 +01:00
parent 7d03c180a0
commit 0219f927dd

View File

@ -42,6 +42,7 @@
#include <ctype.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include "rtl-sdr.h"
#include "anet.h"
@ -160,10 +161,6 @@ struct {
int debug; /* Debugging mode. */
int net; /* Enable networking. */
int net_only; /* Enable just networking. */
int net_output_sbs_port; /* SBS output TCP port. */
int net_output_raw_port; /* Raw output TCP port. */
int net_input_raw_port; /* Raw input TCP port. */
int net_http_port; /* HTTP port. */
int interactive; /* Interactive mode */
int interactive_rows; /* Interactive mode: max number of rows. */
int interactive_ttl; /* Interactive mode: TTL before deletion. */
@ -270,10 +267,6 @@ void modesInitConfig(void) {
Modes.raw = 0;
Modes.net = 0;
Modes.net_only = 0;
Modes.net_output_sbs_port = MODES_NET_OUTPUT_SBS_PORT;
Modes.net_output_raw_port = MODES_NET_OUTPUT_RAW_PORT;
Modes.net_input_raw_port = MODES_NET_INPUT_RAW_PORT;
Modes.net_http_port = MODES_NET_HTTP_PORT;
Modes.onlyaddr = 0;
Modes.debug = 0;
Modes.interactive = 0;
@ -1889,37 +1882,44 @@ void snipMode(int level) {
* user space buffering.
* 2) We don't register any kind of event handler, from time to time a
* function gets called and we accept new connections. All the rest is
* handled via non-blocking I/O and manually pullign clients to see if
* handled via non-blocking I/O and manually pulling clients to see if
* they have something new to share with us when reading is needed.
*/
/* Networking "stack" initialization. */
#define MODES_NET_SERVICE_RAWO 0
#define MODES_NET_SERVICE_RAWI 1
#define MODES_NET_SERVICE_HTTP 2
#define MODES_NET_SERVICE_SBS 3
#define MODES_NET_SERVICES_NUM 4
struct {
char *descr;
int *socket;
int port;
} modesNetServices[MODES_NET_SERVICES_NUM] = {
{"Raw TCP output", &Modes.ros, MODES_NET_OUTPUT_RAW_PORT},
{"Raw TCP input", &Modes.ris, MODES_NET_INPUT_RAW_PORT},
{"HTTP server", &Modes.https, MODES_NET_HTTP_PORT},
{"Basestation TCP output", &Modes.sbsos, MODES_NET_OUTPUT_SBS_PORT}
};
/* Networking "stack" initialization. */
void modesInitNet(void) {
struct {
char *descr;
int *socket;
int port;
} services[MODES_NET_SERVICES_NUM] = {
{"Raw TCP output", &Modes.ros, Modes.net_output_raw_port},
{"Raw TCP input", &Modes.ris, Modes.net_input_raw_port},
{"HTTP server", &Modes.https, Modes.net_http_port},
{"Basestation TCP output", &Modes.sbsos, Modes.net_output_sbs_port}
};
int j;
memset(Modes.clients,0,sizeof(Modes.clients));
Modes.maxfd = -1;
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
int s = anetTcpServer(Modes.aneterr, services[j].port, NULL);
int s = anetTcpServer(Modes.aneterr, modesNetServices[j].port, NULL);
if (s == -1) {
fprintf(stderr, "Error opening the listening port %d (%s): %s\n",
services[j].port, services[j].descr, strerror(errno));
modesNetServices[j].port,
modesNetServices[j].descr,
strerror(errno));
exit(1);
}
anetNonBlock(Modes.aneterr, s);
*services[j].socket = s;
*modesNetServices[j].socket = s;
}
signal(SIGPIPE, SIG_IGN);
@ -1932,15 +1932,10 @@ void modesAcceptClients(void) {
int fd, port;
unsigned int j;
struct client *c;
int services[4];
services[0] = Modes.ros;
services[1] = Modes.ris;
services[2] = Modes.https;
services[3] = Modes.sbsos;
for (j = 0; j < sizeof(services)/sizeof(int); j++) {
fd = anetTcpAccept(Modes.aneterr, services[j], NULL, &port);
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
fd = anetTcpAccept(Modes.aneterr, *modesNetServices[j].socket,
NULL, &port);
if (fd == -1) continue;
if (fd >= MODES_NET_MAX_FD) {
@ -1950,14 +1945,15 @@ void modesAcceptClients(void) {
anetNonBlock(Modes.aneterr, fd);
c = malloc(sizeof(*c));
c->service = services[j];
c->service = *modesNetServices[j].socket;
c->fd = fd;
c->buflen = 0;
Modes.clients[fd] = c;
anetSetSendBuffer(Modes.aneterr,fd,MODES_NET_SNDBUF_SIZE);
if (Modes.maxfd < fd) Modes.maxfd = fd;
if (services[j] == Modes.sbsos) Modes.stat_sbs_connections++;
if (*modesNetServices[j].socket == Modes.sbsos)
Modes.stat_sbs_connections++;
j--; /* Try again with the same listening port. */
@ -2362,6 +2358,39 @@ void modesReadFromClients(void) {
}
}
/* This function is used when "net only" mode is enabled to know when there
* is at least a new client to serve. Note that the dump1090 networking model
* is extremely trivial and a function takes care of handling all the clients
* that have something to serve, without a proper event library, so the
* function here returns as long as there is a single client ready, or
* when the specified timeout in milliesconds elapsed, without specifying to
* the caller what client requires to be served. */
void modesWaitReadableClients(int timeout_ms) {
struct timeval tv;
fd_set fds;
int j, maxfd = Modes.maxfd;
FD_ZERO(&fds);
/* Set client FDs */
for (j = 0; j <= Modes.maxfd; j++) {
if (Modes.clients[j]) FD_SET(j,&fds);
}
/* Set listening sockets to accept new clients ASAP. */
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
int s = *modesNetServices[j].socket;
FD_SET(s,&fds);
if (s > maxfd) maxfd = s;
}
tv.tv_sec = timeout_ms/1000;
tv.tv_usec = (timeout_ms%1000)*1000;
/* We don't care why select returned here, timeout, error, or
* FDs ready are all conditions for which we just return. */
select(maxfd+1,&fds,NULL,NULL,&tv);
}
/* ============================ Terminal handling ========================== */
/* Handle resizing terminal. */
@ -2471,13 +2500,13 @@ int main(int argc, char **argv) {
Modes.net = 1;
Modes.net_only = 1;
} else if (!strcmp(argv[j],"--net-ro-port") && more) {
Modes.net_output_raw_port = atoi(argv[++j]);
modesNetServices[MODES_NET_SERVICE_RAWO].port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--net-ri-port") && more) {
Modes.net_input_raw_port = atoi(argv[++j]);
modesNetServices[MODES_NET_SERVICE_RAWI].port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--net-http-port") && more) {
Modes.net_http_port = atoi(argv[++j]);
modesNetServices[MODES_NET_SERVICE_HTTP].port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--net-sbs-port") && more) {
Modes.net_output_sbs_port = atoi(argv[++j]);
modesNetServices[MODES_NET_SERVICE_SBS].port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--onlyaddr")) {
Modes.onlyaddr = 1;
} else if (!strcmp(argv[j],"--metric")) {
@ -2548,7 +2577,7 @@ int main(int argc, char **argv) {
* clients without reading data from the RTL device. */
while (Modes.net_only) {
backgroundTasks();
usleep(100000);
modesWaitReadableClients(100);
}
/* Create the thread that will read the data from the device. */