blob: a36714a95e8d3d83a9f1dbdcec67226a29240a1f [file] [log] [blame]
From b7b50775a4a76e005aa497bdd749581818d9b89d Mon Sep 17 00:00:00 2001
From: DavieV <davidvalleau@gmail.com>
Date: Thu, 23 Jan 2020 13:52:42 -0800
Subject: [PATCH] Adding support for 'keep-alive' protocol
This change updates ippusbxd to support the use of 'keep-alive' messages
on a dedicated socket.
---
src/dnssd.c | 6 +-
src/ippusbxd.c | 238 ++++++++++++++++++++++++++++++++++++++-----------
src/ippusbxd.h | 10 +++
src/logging.h | 13 +--
src/options.c | 32 ++++++-
src/options.h | 21 +++++
src/tcp.c | 4 +-
src/uds.c | 125 ++++++++++++++++++++++++--
src/uds.h | 14 ++-
src/usb.c | 54 +++++------
src/usb.h | 4 +
11 files changed, 419 insertions(+), 102 deletions(-)
diff --git a/src/dnssd.c b/src/dnssd.c
index 33e2149..a3fac02 100644
--- a/src/dnssd.c
+++ b/src/dnssd.c
@@ -41,7 +41,7 @@ dnssd_callback(AvahiEntryGroup *g, /* I - Service */
case AVAHI_ENTRY_GROUP_FAILURE :
ERR("Entry group failure: %s\n",
avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
break;
case AVAHI_ENTRY_GROUP_UNCOMMITED:
case AVAHI_ENTRY_GROUP_REGISTERING:
@@ -137,12 +137,12 @@ dnssd_client_cb(AvahiClient *c, /* I - Client */
AVAHI_CLIENT_NO_FAIL,
dnssd_client_cb, NULL, &error)) == NULL) {
ERR("Error: Unable to initialize DNS-SD client.");
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
}
} else {
ERR("Avahi server connection failure: %s",
avahi_strerror(avahi_client_errno(c)));
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
}
break;
diff --git a/src/ippusbxd.c b/src/ippusbxd.c
index a270ee5..724c731 100644
--- a/src/ippusbxd.c
+++ b/src/ippusbxd.c
@@ -46,10 +46,12 @@ static struct timeval start_time;
static void sigterm_handler(int sig)
{
/* Flag that we should stop and return... */
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
NOTE("Caught signal %d, shutting down ...", sig);
}
+// NOTE: It is assumed that |thread_register_mutex| is locked before this
+// function is called.
static void list_service_threads(
uint32_t num_service_threads,
struct service_thread_param **service_threads)
@@ -82,6 +84,8 @@ static void list_service_threads(
NOTE("%s", buf);
}
+// NOTE: It is assumed that |thread_register_mutex| is locked before this
+// function is called.
static int register_service_thread(
uint32_t *num_service_threads,
struct service_thread_param ***service_threads,
@@ -101,6 +105,8 @@ static int register_service_thread(
return 0;
}
+// NOTE: It is assumed that |thread_register_mutex| is locked before this
+// function is called.
static int unregister_service_thread(
uint32_t *num_service_threads,
struct service_thread_param ***service_threads, uint32_t thread_num)
@@ -154,22 +160,28 @@ cleanup_handler(void *arg_void)
static void check_timeout(void)
{
- if (num_service_threads == 0 && !g_options.measuring_timeout) {
+ uint32_t num_threads =
+ get_num_service_threads(&num_service_threads, &thread_register_mutex);
+ if (num_threads == 0 && !get_measuring_timeout(&g_options)) {
if (gettimeofday(&start_time, NULL) < 0) {
ERR("Failed to get time");
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
return;
}
- g_options.measuring_timeout = 1;
- } else if (num_service_threads > 0) {
- g_options.measuring_timeout = 0;
+ set_measuring_timeout(1, &g_options);
+ } else if (num_threads > 0) {
+ set_measuring_timeout(0, &g_options);
}
- if (g_options.measuring_timeout) {
+ // Lock |keep_alive_mutex| in order to prevent an 'ack' message from being
+ // sent before determining whether or not to shutdown.
+ pthread_mutex_lock(&g_options.keep_alive_mutex);
+ if (get_measuring_timeout(&g_options)) {
struct timeval current_time;
if (gettimeofday(&current_time, NULL)) {
ERR("Failed to get time");
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
+ pthread_mutex_unlock(&g_options.keep_alive_mutex);
return;
}
@@ -178,9 +190,10 @@ static void check_timeout(void)
NOTE("Elapsed time: %lld seconds", (long long)seconds);
if (seconds > 10) {
NOTE("Timeout has been reached - shutting down");
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
}
}
+ pthread_mutex_unlock(&g_options.keep_alive_mutex);
}
static void read_transfer_callback(struct libusb_transfer *transfer)
@@ -215,7 +228,7 @@ static void read_transfer_callback(struct libusb_transfer *transfer)
break;
case LIBUSB_TRANSFER_ERROR:
ERR("Thread #%u: There was an error completing the transfer", thread_num);
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
break;
case LIBUSB_TRANSFER_TIMED_OUT:
ERR("Thread #%u: The transfer timed out before it could be completed: "
@@ -227,21 +240,21 @@ static void read_transfer_callback(struct libusb_transfer *transfer)
break;
case LIBUSB_TRANSFER_STALL:
ERR("Thread #%u: The transfer has stalled", thread_num);
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
break;
case LIBUSB_TRANSFER_NO_DEVICE:
ERR("Thread #%u: The printer was disconnected during the transfer",
thread_num);
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
break;
case LIBUSB_TRANSFER_OVERFLOW:
ERR("Thread #%u: The printer sent more data than was requested",
thread_num);
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
break;
default:
ERR("Thread #%u: Something unexpected happened", thread_num);
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
}
/* Free the packet used for the transfer. */
@@ -258,6 +271,15 @@ static void read_transfer_callback(struct libusb_transfer *transfer)
libusb_free_transfer(transfer);
}
+uint32_t get_num_service_threads(const uint32_t *num_threads,
+ pthread_mutex_t *thread_mutex) {
+ uint32_t ret;
+ pthread_mutex_lock(thread_mutex);
+ ret = *num_threads;
+ pthread_mutex_unlock(thread_mutex);
+ return ret;
+}
+
void *service_connection(void *params_void)
{
struct service_thread_param *params =
@@ -326,8 +348,8 @@ cleanup:
}
NOTE("Thread #%u: closing, %s", thread_num,
- g_options.terminate ? "shutdown requested"
- : "communication thread terminated");
+ get_terminate(&g_options) ? "shutdown requested"
+ : "communication thread terminated");
if (g_options.unix_socket_mode)
uds_conn_close(params->uds);
else
@@ -346,9 +368,9 @@ void service_socket_connection(struct service_thread_param *params)
struct http_packet_t *pkt = NULL;
- while (is_socket_open(params) && !g_options.terminate) {
+ while (is_socket_open(params) && !get_terminate(&g_options)) {
if (g_options.unix_socket_mode) {
- int poll_result = uds_poll_connection(params->uds->fd);
+ int poll_result = uds_poll_connection(params->uds->fd, 1000);
if (poll_result < 0) {
ERR("Thread #%u: Failed to poll the uds socket");
params->uds->is_closed = 1;
@@ -406,18 +428,20 @@ void *service_printer_connection(void *params_void)
struct libusb_transfer *read_transfer = NULL;
- while (is_socket_open(params) && !g_options.terminate) {
+ while (is_socket_open(params) && !get_terminate(&g_options)) {
/* If there is already a read from the printer underway, block until it has
completed. */
- pthread_mutex_lock(&read_inflight_mutex);
- while (is_socket_open(params) && read_inflight)
+ while (is_socket_open(params) &&
+ get_read_inflight(&read_inflight, &read_inflight_mutex)) {
+ pthread_mutex_lock(&read_inflight_mutex);
pthread_cond_wait(params->cond, &read_inflight_mutex);
- pthread_mutex_unlock(&read_inflight_mutex);
+ pthread_mutex_unlock(&read_inflight_mutex);
+ }
/* After waking up due to a completed transfer, verify that the socket is
still open and that the termination flag has not been set before
attempting to start another transfer. */
- if (!is_socket_open(params) || g_options.terminate)
+ if (!is_socket_open(params) || get_terminate(&g_options))
break;
/* If we received an empty response from the printer then wait for |backoff|
@@ -484,15 +508,16 @@ void *service_printer_connection(void *params_void)
/* Wait until the cancellation has completed. */
NOTE("Thread #%u: Waiting until the transfer has been cancelled",
thread_num);
- pthread_mutex_lock(&read_inflight_mutex);
- while (read_inflight)
+ while (get_read_inflight(&read_inflight, &read_inflight_mutex)) {
+ pthread_mutex_lock(&read_inflight_mutex);
pthread_cond_wait(params->cond, &read_inflight_mutex);
- pthread_mutex_unlock(&read_inflight_mutex);
+ pthread_mutex_unlock(&read_inflight_mutex);
+ }
} else if (cancel_status == LIBUSB_ERROR_NOT_FOUND) {
NOTE("Thread #%u: The transfer has already completed", thread_num);
} else {
NOTE("Thread #%u: Failed to cancel transfer");
- g_options.terminate = 1;
+ set_terminate(1, &g_options);
}
}
@@ -549,15 +574,15 @@ int setup_socket_connection(struct service_thread_param *param)
{
if (g_options.unix_socket_mode) {
int poll_status = 0;
- while (!g_options.terminate && poll_status == 0) {
+ while (!get_terminate(&g_options) && poll_status == 0) {
check_timeout();
poll_status = uds_connect(g_options.uds_socket, param->uds);
}
- if (g_options.terminate || poll_status < 0)
+ if (get_terminate(&g_options) || poll_status < 0)
return -1;
} else {
param->tcp = tcp_conn_select(g_options.tcp_socket, g_options.tcp6_socket);
- if (g_options.terminate || param->tcp == NULL)
+ if (get_terminate(&g_options) || param->tcp == NULL)
return -1;
}
@@ -601,6 +626,73 @@ int setup_communication_thread(void *(*routine)(void *),
return 0;
}
+int setup_keep_alive_thread(void *(*routine)(void *),
+ pthread_t *keep_alive_handle) {
+ NOTE("Setting up keep-alive thread");
+ int status = pthread_create(keep_alive_handle, NULL, routine, NULL);
+ if (status) {
+ ERR("Failed to create keep-alive thread, error %d", status);
+ return -1;
+ }
+ return 0;
+}
+
+void *run_keep_alive(void* param) {
+ if (param != NULL) {
+ ERR("run_keep_alive: Received unexpected parameter in run_keep_alive()");
+ set_terminate(1, &g_options);
+ pthread_exit(NULL);
+ }
+
+ // Detach this thread so that the main thread does not need to join this
+ // thread after termination for clean-up.
+ pthread_detach(pthread_self());
+
+ // Allow immediate cancelling of this thread.
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+
+ while (!get_terminate(&g_options)) {
+ int poll_status = poll_keep_alive(g_options.keep_alive_socket);
+ if (poll_status < 0) {
+ break;
+ } else if (poll_status > 0) {
+ // accept connection.
+ int conn_fd = accept_keep_alive(g_options.keep_alive_socket);
+ if (conn_fd < 0) {
+ break;
+ }
+ // read "keep-alive" message.
+ if (recv_keep_alive(conn_fd) < 0) {
+ close(conn_fd);
+ break;
+ }
+
+ // Lock |keep_alive_mutex| in order to ensure that a timeout doesn't occur
+ // while we are in the process of sending an 'ack' response.
+ pthread_mutex_lock(&g_options.keep_alive_mutex);
+ if (!get_terminate(&g_options)) {
+ NOTE("Successfully received 'keep-alive'. Resetting countdown");
+ set_measuring_timeout(0, &g_options);
+ // send "ack" message.
+ if (send_ack(conn_fd) < 0) {
+ close(conn_fd);
+ pthread_mutex_unlock(&g_options.keep_alive_mutex);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&g_options.keep_alive_mutex);
+
+ NOTE("Shutting down connection on keep-alive socket");
+ shutdown(conn_fd, SHUT_RDWR);
+ close(conn_fd);
+ }
+ }
+
+ set_terminate(1, &g_options);
+ pthread_exit(NULL);
+}
+
struct libusb_callback_data *setup_libusb_callback_data(
struct http_packet_t *pkt, int *read_inflight, int *empty_response,
struct service_thread_param *thread_param,
@@ -662,7 +754,8 @@ static void start_daemon()
struct usb_sock_t *usb_sock;
/* Termination flag */
- g_options.terminate = 0;
+ pthread_mutex_init(&g_options.terminate_mutex, NULL);
+ set_terminate(0, &g_options);
usb_sock = usb_open();
if (usb_sock == NULL) goto cleanup_usb;
@@ -697,6 +790,13 @@ static void start_daemon()
g_options.tcp6_socket ? "" : "not ");
}
+ if (g_options.use_keep_alive && g_options.keep_alive_path) {
+ g_options.keep_alive_socket = uds_open(g_options.keep_alive_path);
+ if (g_options.keep_alive_socket == NULL) {
+ goto cleanup_connections;
+ }
+ }
+
/* Lose connection to caller */
uint16_t pid;
if (!g_options.nofork_mode && (pid = fork()) > 0) {
@@ -739,10 +839,26 @@ static void start_daemon()
goto cleanup_connections;
}
+ // Setup dedicated thread for keep-alive socket.
+ pthread_mutex_init(&g_options.measuring_timeout_mutex, NULL);
+ pthread_mutex_init(&g_options.keep_alive_mutex, NULL);
+ pthread_t keep_alive_handle = 0;
+ int keep_alive_status;
+ if (g_options.use_keep_alive) {
+ keep_alive_status =
+ setup_keep_alive_thread(&run_keep_alive, &keep_alive_handle);
+ if (keep_alive_status < 0) {
+ ERR("Failed to launch keep-alive thread");
+ goto cleanup_connections;
+ }
+ NOTE("Launched keep-alive socket at %d", keep_alive_handle);
+ }
+
+ pthread_mutex_init(&thread_register_mutex, NULL);
+
/* Main loop */
uint32_t i = 1;
- pthread_mutex_init(&thread_register_mutex, NULL);
- while (!g_options.terminate) {
+ while (!get_terminate(&g_options)) {
struct service_thread_param *args = calloc(1, sizeof(*args));
if (args == NULL) {
ERR("Preparing thread #%u: Failed to alloc space for thread args", i);
@@ -782,6 +898,16 @@ static void start_daemon()
}
cleanup_connections:
+ if (g_options.use_keep_alive && keep_alive_status == 0) {
+ NOTE("Shutting down keep-alive observer thread");
+ pthread_cancel(keep_alive_handle);
+ pthread_join(keep_alive_handle, NULL);
+ }
+
+ // Close keep-alive socket.
+ if (g_options.keep_alive_socket != NULL)
+ uds_close(g_options.keep_alive_socket);
+
/* Stop DNS-SD advertising of the printer */
if (g_options.dnssd_data != NULL)
dnssd_shutdown();
@@ -789,18 +915,23 @@ static void start_daemon()
/* Cancel communication threads which did not terminate by themselves when
stopping ippusbxd, so that no USB communication with the printer can
happen after the final reset */
- while (num_service_threads) {
+ uint32_t cur;
+ while ((cur = get_num_service_threads(&num_service_threads,
+ &thread_register_mutex))) {
NOTE("Thread #%u did not terminate, canceling it now ...",
service_threads[0]->thread_num);
- i = num_service_threads;
pthread_cancel(service_threads[0]->thread_handle);
- while (i == num_service_threads)
+ while (cur == get_num_service_threads(&num_service_threads,
+ &thread_register_mutex)) {
usleep(1000000);
+ }
}
/* Wait for USB unplug event observer thread to terminate */
- NOTE("Shutting down usb observer thread");
- pthread_join(g_options.usb_event_thread_handle, NULL);
+ if (usb_sock != NULL && usb_sock->registered_callback) {
+ NOTE("Shutting down usb observer thread");
+ pthread_join(g_options.usb_event_thread_handle, NULL);
+ }
/* TCP clean-up */
if (g_options.tcp_socket!= NULL)
@@ -808,14 +939,19 @@ static void start_daemon()
if (g_options.tcp6_socket!= NULL)
tcp_close(g_options.tcp6_socket);
- /* UDS clean-up */
- if (g_options.uds_socket != NULL)
- uds_close(g_options.uds_socket);
-
cleanup_usb:
/* USB clean-up and final reset of the printer */
if (usb_sock != NULL)
usb_close(usb_sock);
+
+ // NOTE: This must be performed last as ippusb_manager waits until the closure
+ // of this socket before attempting to launch another instance of
+ // ippusbxd.
+ //
+ // Close UDS socket.
+ if (g_options.uds_socket != NULL)
+ uds_close(g_options.uds_socket);
+
return;
}
@@ -850,6 +986,7 @@ int main(int argc, char *argv[])
{"only-port", required_argument, 0, 'p' },
{"interface", required_argument, 0, 'i' },
{"uds-path", required_argument, 0, 'U' },
+ {"keep-alive", required_argument, 0, 'K' },
{"logging", no_argument, 0, 'l' },
{"debug", no_argument, 0, 'd' },
{"verbose", no_argument, 0, 'q' },
@@ -858,18 +995,13 @@ int main(int argc, char *argv[])
{"help", no_argument, 0, 'h' },
{NULL, 0, 0, 0 }
};
+
+ memset(&g_options, 0, sizeof(g_options));
g_options.log_destination = LOGGING_STDERR;
- g_options.only_desired_port = 0;
g_options.desired_port = 60000;
g_options.interface = "lo";
- g_options.serial_num = NULL;
- g_options.vendor_id = 0;
- g_options.product_id = 0;
- g_options.bus = 0;
- g_options.device = 0;
- g_options.measuring_timeout = 0;
-
- while ((c = getopt_long(argc, argv, "qnhdp:P:i:s:lv:m:B",
+
+ while ((c = getopt_long(argc, argv, "qnhdp:P:i:s:lv:m:BU:K:",
long_options, &option_index)) != -1) {
switch (c) {
case '?':
@@ -949,6 +1081,10 @@ int main(int argc, char *argv[])
g_options.unix_socket_mode = 1;
g_options.unix_socket_path = strdup(optarg);
break;
+ case 'K':
+ g_options.keep_alive_path = strdup(optarg);
+ g_options.use_keep_alive = 1;
+ NOTE("Received keep alive socket at %s", g_options.keep_alive_path);
}
}
diff --git a/src/ippusbxd.h b/src/ippusbxd.h
index 69461fe..f7854dd 100644
--- a/src/ippusbxd.h
+++ b/src/ippusbxd.h
@@ -48,6 +48,11 @@ const int maximum_backoff = 1000;
/* Function prototypes */
+/* Gets the value of |num_service_threads|. Uses |thread_mutex| in order to
+ prevent concurrent access to |num_service_threads|. */
+uint32_t get_num_service_threads(const uint32_t *num_threads,
+ pthread_mutex_t *thread_mutex);
+
/* Handles connection requests and
is run in a separate thread. It detaches itself from the main thread and sets
up a USB connection with the printer. This function spawns a partner thread
@@ -90,6 +95,11 @@ int setup_usb_connection(struct usb_sock_t *usb_sock,
int setup_communication_thread(void *(*routine)(void *),
struct service_thread_param *param);
+/* Attempts to register the keep-alive thread to run the given |routine|.
+ Returns 0 if successful, and a non-zero value otherwise. */
+int setup_keep_alive_thread(void *(*routine)(void *),
+ pthread_t *keep_alive_handle);
+
/* Creates a new libusb_callback_data struct with the given paramaters. */
struct libusb_callback_data *setup_libusb_callback_data(
struct http_packet_t *pkt, int *read_inflight, int *empty_response,
diff --git a/src/logging.h b/src/logging.h
index d6a2db5..dff89a9 100644
--- a/src/logging.h
+++ b/src/logging.h
@@ -50,12 +50,13 @@ enum log_level {
#define CONF_1(msg) BASE_LOG(LOGGING_CONFORMANCE, "<%d>Standard Conformance Failure: " msg "\n", TID())
#define CONF_2(msg, ...) BASE_LOG(LOGGING_CONFORMANCE, "<%d>Standard Conformance Failure: " msg "\n", TID(), __VA_ARGS__)
-#define ERR_AND_EXIT(...) \
- do { \
- ERR(__VA_ARGS__); \
- if (g_options.dnssd_data != NULL) dnssd_shutdown(g_options.dnssd_data); \
- if (g_options.uds_socket != NULL) uds_close(g_options.uds_socket); \
- exit(-1); \
+#define ERR_AND_EXIT(...) \
+ do { \
+ ERR(__VA_ARGS__); \
+ if (g_options.dnssd_data != NULL) dnssd_shutdown(g_options.dnssd_data); \
+ if (g_options.keep_alive != NULL) uds_close(g_options.keep_alive_socket); \
+ if (g_options.uds_socket != NULL) uds_close(g_options.uds_socket); \
+ exit(-1); \
} while (0)
void BASE_LOG(enum log_level, const char *, ...);
diff --git a/src/options.c b/src/options.c
index 464645d..cc3424d 100644
--- a/src/options.c
+++ b/src/options.c
@@ -14,5 +14,35 @@
#include "options.h"
-struct options g_options;
+#include <pthread.h>
+#include <string.h>
+
+void set_measuring_timeout(int val, struct options* option) {
+ pthread_mutex_lock(&option->measuring_timeout_mutex);
+ option->measuring_timeout = val;
+ pthread_mutex_unlock(&option->measuring_timeout_mutex);
+}
+
+int get_measuring_timeout(struct options* option) {
+ int ret;
+ pthread_mutex_lock(&option->measuring_timeout_mutex);
+ ret = option->measuring_timeout;
+ pthread_mutex_unlock(&option->measuring_timeout_mutex);
+ return ret;
+}
+void set_terminate(int val, struct options* option) {
+ pthread_mutex_lock(&option->terminate_mutex);
+ option->terminate = val;
+ pthread_mutex_unlock(&option->terminate_mutex);
+}
+
+int get_terminate(struct options* option) {
+ int ret;
+ pthread_mutex_lock(&option->terminate_mutex);
+ ret = option->terminate;
+ pthread_mutex_unlock(&option->terminate_mutex);
+ return ret;
+}
+
+struct options g_options;
diff --git a/src/options.h b/src/options.h
index dc93bcf..552e10c 100644
--- a/src/options.h
+++ b/src/options.h
@@ -30,6 +30,7 @@ struct options {
uint16_t real_port;
char *interface;
char *unix_socket_path;
+ char *keep_alive_path;
enum log_target log_destination;
/* Behavior */
@@ -38,6 +39,7 @@ struct options {
int nofork_mode;
int nobroadcast;
int unix_socket_mode;
+ int use_keep_alive;
/* Printer identity */
unsigned char *serial_num;
@@ -59,6 +61,25 @@ struct options {
int scanner_present;
struct uds_sock_t *uds_socket;
+ struct uds_sock_t *keep_alive_socket;
+
+ pthread_mutex_t terminate_mutex;
+
+ // Locks access to the |measuring_timeout| flag.
+ pthread_mutex_t measuring_timeout_mutex;
+
+ // Used to lock critical sections of the 'keep-alive' process.
+ pthread_mutex_t keep_alive_mutex;
};
+// Accessor functions for the |measuring_timeout| flag which use a mutex to
+// prevent concurrent access.
+void set_measuring_timeout(int val, struct options *option);
+int get_measuring_timeout(struct options *option);
+
+// Accessor functions for the |terminate| flag which use a mutex to prevent
+// concurrent access.
+void set_terminate(int val, struct options *option);
+int get_terminate(struct options *option);
+
extern struct options g_options;
diff --git a/src/tcp.c b/src/tcp.c
index 92a128c..5ed7931 100644
--- a/src/tcp.c
+++ b/src/tcp.c
@@ -266,7 +266,7 @@ int tcp_packet_send(struct tcp_conn_t *conn, struct http_packet_t *pkt)
size_t remaining = pkt->filled_size;
size_t total = 0;
- while (remaining > 0 && !g_options.terminate) {
+ while (remaining > 0 && !get_terminate(&g_options)) {
ssize_t sent = send(conn->sd, pkt->buffer + total, remaining, MSG_NOSIGNAL);
if (sent < 0) {
@@ -319,7 +319,7 @@ struct tcp_conn_t *tcp_conn_select(struct tcp_sock_t *sock,
}
nfds += 1;
retval = select(nfds, &rfds, NULL, NULL, NULL);
- if (g_options.terminate)
+ if (get_terminate(&g_options))
goto error;
if (retval < 1) {
ERR("Failed to open tcp connection");
diff --git a/src/uds.c b/src/uds.c
index de349db..6b4498e 100644
--- a/src/uds.c
+++ b/src/uds.c
@@ -18,6 +18,11 @@
struct uds_sock_t *uds_open(const char *path)
{
+ if (path == NULL) {
+ ERR("UDS: Given socket path is null");
+ goto error;
+ }
+
struct uds_sock_t *sock = calloc(1, sizeof(*sock));
if (sock == NULL) {
ERR("UDS: Allocating memory for socket failed");
@@ -91,9 +96,9 @@ int uds_connect(struct uds_sock_t *sock, struct uds_conn_t *conn)
return -1;
}
- int poll_result = uds_poll_connection(sock->fd);
+ int poll_result = uds_poll_connection(sock->fd, 1000);
- if (g_options.terminate)
+ if (get_terminate(&g_options))
return -1;
if (poll_result < 0) {
@@ -165,7 +170,7 @@ int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt)
size_t remaining = pkt->filled_size;
size_t total = 0;
- while (remaining > 0 && !g_options.terminate) {
+ while (remaining > 0 && !get_terminate(&g_options)) {
ssize_t sent = send(conn->fd, pkt->buffer + total, remaining, MSG_NOSIGNAL);
if (sent < 0) {
@@ -190,11 +195,119 @@ int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt)
return 0;
}
-// Polls the UDS connection in |conn| to see if there is any data to be read.
-int uds_poll_connection(int fd) {
+// Polls the UDS connection at |fd| for a maximum of |timeout| milliseconds to
+// see if there is any data to be read.
+int uds_poll_connection(int fd, int timeout) {
struct pollfd poll_fd;
poll_fd.fd = fd;
poll_fd.events = POLLIN;
- static int timeout = 1000;
return poll(&poll_fd, 1, timeout);
}
+
+int poll_keep_alive(struct uds_sock_t *sock) {
+ if (sock == NULL) {
+ ERR("UDS: No valid keep-alive socket provided");
+ return -1;
+ }
+
+ int poll_result = uds_poll_connection(sock->fd, 100);
+
+ if (get_terminate(&g_options))
+ return -1;
+
+ if (poll_result < 0) {
+ ERR("Something went wrong when polling the keep-alive socket");
+ return -1;
+ }
+
+ // There is no data to be read on the socket.
+ if (poll_result == 0) {
+ return 0;
+ }
+
+ NOTE("Found a pending message on the keep-alive socket");
+ return poll_result;
+}
+
+int accept_keep_alive(struct uds_sock_t *sock) {
+ if (get_terminate(&g_options)) {
+ ERR("Termination has been requested");
+ return -1;
+ }
+
+ int conn_fd = accept(sock->fd, NULL, NULL);
+ if (conn_fd < 0) {
+ ERR("Failed to accept connection on keep-alive socket");
+ } else {
+ NOTE("Accepted the connection on the keep-alive socket");
+ }
+ return conn_fd;
+}
+
+int recv_keep_alive(int conn_fd) {
+ if (get_terminate(&g_options)) {
+ ERR("Termination has been requested");
+ return -1;
+ }
+
+ uint8_t length = 0;
+ if (recv(conn_fd, &length, 1, 0) == -1) {
+ ERR("Failed to read length of message");
+ return -1;
+ }
+ // We only ever expect to receive the 'keep-alive' message on the
+ // socket (which includes the trailing '\0' character).
+ if (length != 11) {
+ ERR("Received unexpected message length %d", length);
+ return -1;
+ }
+
+ char buf[11] = {0};
+ size_t bytes_received = 0;
+ while (bytes_received < length) {
+ ssize_t bytes_read =
+ recv(conn_fd, buf + bytes_received, length - bytes_received, 0);
+ if (bytes_read < 0) {
+ ERR("Failed to read from keep-alive socket");
+ return -1;
+ }
+ bytes_received += (size_t)bytes_read;
+ }
+
+ if (strncmp(buf, "keep-alive", 11) != 0) {
+ ERR("Received unexpected message '%s' on keep-alive socket", buf);
+ return -1;
+ }
+
+ NOTE("Received 'keep-alive' message on keep-alive socket");
+ return 0;
+}
+
+int send_ack(int conn_fd) {
+ if (get_terminate(&g_options)) {
+ ERR("Termination has been requested");
+ return -1;
+ }
+
+ uint8_t length = 4;
+ if (send(conn_fd, &length, 1, 0) == -1) {
+ ERR("Failed to send length prefix byte");
+ return -1;
+ }
+
+ const char *ack = "ack";
+ size_t bytes_sent = 0;
+ size_t bytes_remaining = 4;
+ while (bytes_remaining > 0) {
+ ssize_t sent = send(conn_fd, ack + bytes_sent, bytes_remaining, 0);
+ if (sent < 0) {
+ ERR("Failed to send 'ack' on keep-alive socket");
+ return -1;
+ }
+ bytes_sent += (size_t)sent;
+ bytes_remaining -= (size_t)sent;
+ }
+
+ NOTE("Sent 'ack' response on keep-alive socket");
+ return 0;
+}
diff --git a/src/uds.h b/src/uds.h
index 67c13f9..1a39b08 100644
--- a/src/uds.h
+++ b/src/uds.h
@@ -43,9 +43,17 @@ struct http_packet_t *uds_packet_get(struct uds_conn_t *conn);
int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt);
-// Polls the given file descriptor |fd| to see if there is any data to be read.
-// Return values:
+// Polls the given file descriptor |fd| up to |timeout| milliseconds to see if
+// there is any data to be read. Return values:
// 1 - there is data to be read.
// 0 - there is not any data to be read.
// -1 - there was an error.
-int uds_poll_connection(int fd);
+int uds_poll_connection(int fd, int timeout);
+
+int poll_keep_alive(struct uds_sock_t *sock);
+
+int accept_keep_alive(struct uds_sock_t *sock);
+
+int recv_keep_alive(int conn_fd);
+
+int send_ack(int conn_fd);
diff --git a/src/usb.c b/src/usb.c
index 21c84d4..aa05960 100644
--- a/src/usb.c
+++ b/src/usb.c
@@ -244,7 +244,7 @@ static int try_claim_usb_interface(struct usb_sock_t *usb,
default:
break;
}
- } while (status != 0 && !g_options.terminate);
+ } while (status != 0 && !get_terminate(&g_options));
return 0;
}
@@ -530,9 +530,6 @@ void usb_close(struct usb_sock_t *usb)
sem_destroy(&usb->interfaces[i].lock);
}
- NOTE("Resetting printer ...");
- libusb_reset_device(usb->printer);
- NOTE("Reset completed.");
NOTE("Closing device handle...");
libusb_close(usb->printer);
NOTE("Closed device handle.");
@@ -600,6 +597,8 @@ static int LIBUSB_CALL usb_exit_on_unplug(libusb_context *context,
tcp_close(g_options.tcp6_socket);
/* UDS clean-up */
+ if (g_options.keep_alive_socket != NULL)
+ uds_close(g_options.keep_alive_socket);
if (g_options.uds_socket != NULL)
uds_close(g_options.uds_socket);
@@ -615,7 +614,7 @@ static void *usb_pump_events(void *user_data)
NOTE("USB unplug event observer thread starting");
- while (!g_options.terminate) {
+ while (!get_terminate(&g_options)) {
/* NOTE: This is a blocking call so
no need for sleep() */
struct timeval tv;
@@ -629,32 +628,27 @@ static void *usb_pump_events(void *user_data)
return NULL;
}
-void usb_register_callback(struct usb_sock_t *usb)
-{
- IGNORE(usb);
-
- int status =
- libusb_hotplug_register_callback(NULL,
- LIBUSB_HOTPLUG_EVENT_DEVICE_LEFT,
- /* Note: libusb's enum has no default value
- a bug has been filled with libusb.
- Please switch the below line to 0
- once the issue has been fixed in
- deployed versions of libusb
- https://github.com/libusb/libusb/issues/35 */
- /* 0, */
- LIBUSB_HOTPLUG_ENUMERATE,
- g_options.vendor_id,
- g_options.product_id,
- LIBUSB_HOTPLUG_MATCH_ANY,
- &usb_exit_on_unplug,
- NULL,
- NULL);
+void usb_register_callback(struct usb_sock_t *usb) {
+ int status = libusb_hotplug_register_callback(
+ NULL, LIBUSB_HOTPLUG_EVENT_DEVICE_LEFT,
+ /* Note: libusb's enum has no default value
+ a bug has been filled with libusb.
+ Please switch the below line to 0
+ once the issue has been fixed in
+ deployed versions of libusb
+ https://github.com/libusb/libusb/issues/35 */
+ /* 0, */
+ LIBUSB_HOTPLUG_ENUMERATE, g_options.vendor_id, g_options.product_id,
+ LIBUSB_HOTPLUG_MATCH_ANY, &usb_exit_on_unplug, NULL, NULL);
if (status == LIBUSB_SUCCESS) {
- pthread_create(&(g_options.usb_event_thread_handle), NULL, &usb_pump_events, NULL);
+ pthread_create(&(g_options.usb_event_thread_handle), NULL, &usb_pump_events,
+ NULL);
+ usb->registered_callback = 1;
NOTE("Registered unplug callback");
- } else
+ } else {
ERR("Failed to register unplug callback");
+ usb->registered_callback = 0;
+ }
}
struct usb_conn_t *usb_conn_acquire(struct usb_sock_t *usb)
@@ -664,7 +658,7 @@ struct usb_conn_t *usb_conn_acquire(struct usb_sock_t *usb)
if (usb->num_avail <= 0) {
NOTE("All USB interfaces busy, waiting ...");
for (i = 0; i < 30 && usb->num_avail <= 0; i ++) {
- if (g_options.terminate)
+ if (get_terminate(&g_options))
return NULL;
usleep(100000);
}
@@ -737,7 +731,7 @@ int usb_conn_packet_send(struct usb_conn_t *conn, struct http_packet_t *pkt)
int num_timeouts = 0;
size_t sent = 0;
size_t pending = pkt->filled_size;
- while (pending > 0 && !g_options.terminate) {
+ while (pending > 0 && !get_terminate(&g_options)) {
int to_send = (int)pending;
NOTE("P %p: USB: want to send %d bytes", pkt, to_send);
diff --git a/src/usb.h b/src/usb.h
index 3aa939c..5b9ae68 100644
--- a/src/usb.h
+++ b/src/usb.h
@@ -37,6 +37,10 @@ struct usb_sock_t {
char *device_id;
int max_packet_size;
+ // Represents whether the unplug callback was successfully registered for this
+ // USB device.
+ int registered_callback;
+
uint32_t num_interfaces;
struct usb_interface *interfaces;
--
2.25.0.265.gbab2e86ba0-goog