| From b7b50775a4a76e005aa497bdd749581818d9b89d Mon Sep 17 00:00:00 2001 |
| From: DavieV <davidvalleau@gmail.com> |
| Date: Thu, 23 Jan 2020 13:52:42 -0800 |
| Subject: [PATCH] Adding support for 'keep-alive' protocol |
| |
| This change updates ippusbxd to support the use of 'keep-alive' messages |
| on a dedicated socket. |
| --- |
| src/dnssd.c | 6 +- |
| src/ippusbxd.c | 238 ++++++++++++++++++++++++++++++++++++++----------- |
| src/ippusbxd.h | 10 +++ |
| src/logging.h | 13 +-- |
| src/options.c | 32 ++++++- |
| src/options.h | 21 +++++ |
| src/tcp.c | 4 +- |
| src/uds.c | 125 ++++++++++++++++++++++++-- |
| src/uds.h | 14 ++- |
| src/usb.c | 54 +++++------ |
| src/usb.h | 4 + |
| 11 files changed, 419 insertions(+), 102 deletions(-) |
| |
| diff --git a/src/dnssd.c b/src/dnssd.c |
| index 33e2149..a3fac02 100644 |
| --- a/src/dnssd.c |
| +++ b/src/dnssd.c |
| @@ -41,7 +41,7 @@ dnssd_callback(AvahiEntryGroup *g, /* I - Service */ |
| case AVAHI_ENTRY_GROUP_FAILURE : |
| ERR("Entry group failure: %s\n", |
| avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g)))); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| break; |
| case AVAHI_ENTRY_GROUP_UNCOMMITED: |
| case AVAHI_ENTRY_GROUP_REGISTERING: |
| @@ -137,12 +137,12 @@ dnssd_client_cb(AvahiClient *c, /* I - Client */ |
| AVAHI_CLIENT_NO_FAIL, |
| dnssd_client_cb, NULL, &error)) == NULL) { |
| ERR("Error: Unable to initialize DNS-SD client."); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| } |
| } else { |
| ERR("Avahi server connection failure: %s", |
| avahi_strerror(avahi_client_errno(c))); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| } |
| break; |
| |
| diff --git a/src/ippusbxd.c b/src/ippusbxd.c |
| index a270ee5..724c731 100644 |
| --- a/src/ippusbxd.c |
| +++ b/src/ippusbxd.c |
| @@ -46,10 +46,12 @@ static struct timeval start_time; |
| static void sigterm_handler(int sig) |
| { |
| /* Flag that we should stop and return... */ |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| NOTE("Caught signal %d, shutting down ...", sig); |
| } |
| |
| +// NOTE: It is assumed that |thread_register_mutex| is locked before this |
| +// function is called. |
| static void list_service_threads( |
| uint32_t num_service_threads, |
| struct service_thread_param **service_threads) |
| @@ -82,6 +84,8 @@ static void list_service_threads( |
| NOTE("%s", buf); |
| } |
| |
| +// NOTE: It is assumed that |thread_register_mutex| is locked before this |
| +// function is called. |
| static int register_service_thread( |
| uint32_t *num_service_threads, |
| struct service_thread_param ***service_threads, |
| @@ -101,6 +105,8 @@ static int register_service_thread( |
| return 0; |
| } |
| |
| +// NOTE: It is assumed that |thread_register_mutex| is locked before this |
| +// function is called. |
| static int unregister_service_thread( |
| uint32_t *num_service_threads, |
| struct service_thread_param ***service_threads, uint32_t thread_num) |
| @@ -154,22 +160,28 @@ cleanup_handler(void *arg_void) |
| |
| static void check_timeout(void) |
| { |
| - if (num_service_threads == 0 && !g_options.measuring_timeout) { |
| + uint32_t num_threads = |
| + get_num_service_threads(&num_service_threads, &thread_register_mutex); |
| + if (num_threads == 0 && !get_measuring_timeout(&g_options)) { |
| if (gettimeofday(&start_time, NULL) < 0) { |
| ERR("Failed to get time"); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| return; |
| } |
| - g_options.measuring_timeout = 1; |
| - } else if (num_service_threads > 0) { |
| - g_options.measuring_timeout = 0; |
| + set_measuring_timeout(1, &g_options); |
| + } else if (num_threads > 0) { |
| + set_measuring_timeout(0, &g_options); |
| } |
| |
| - if (g_options.measuring_timeout) { |
| + // Lock |keep_alive_mutex| in order to prevent an 'ack' message from being |
| + // sent before determining whether or not to shutdown. |
| + pthread_mutex_lock(&g_options.keep_alive_mutex); |
| + if (get_measuring_timeout(&g_options)) { |
| struct timeval current_time; |
| if (gettimeofday(¤t_time, NULL)) { |
| ERR("Failed to get time"); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| + pthread_mutex_unlock(&g_options.keep_alive_mutex); |
| return; |
| } |
| |
| @@ -178,9 +190,10 @@ static void check_timeout(void) |
| NOTE("Elapsed time: %lld seconds", (long long)seconds); |
| if (seconds > 10) { |
| NOTE("Timeout has been reached - shutting down"); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| } |
| } |
| + pthread_mutex_unlock(&g_options.keep_alive_mutex); |
| } |
| |
| static void read_transfer_callback(struct libusb_transfer *transfer) |
| @@ -215,7 +228,7 @@ static void read_transfer_callback(struct libusb_transfer *transfer) |
| break; |
| case LIBUSB_TRANSFER_ERROR: |
| ERR("Thread #%u: There was an error completing the transfer", thread_num); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| break; |
| case LIBUSB_TRANSFER_TIMED_OUT: |
| ERR("Thread #%u: The transfer timed out before it could be completed: " |
| @@ -227,21 +240,21 @@ static void read_transfer_callback(struct libusb_transfer *transfer) |
| break; |
| case LIBUSB_TRANSFER_STALL: |
| ERR("Thread #%u: The transfer has stalled", thread_num); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| break; |
| case LIBUSB_TRANSFER_NO_DEVICE: |
| ERR("Thread #%u: The printer was disconnected during the transfer", |
| thread_num); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| break; |
| case LIBUSB_TRANSFER_OVERFLOW: |
| ERR("Thread #%u: The printer sent more data than was requested", |
| thread_num); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| break; |
| default: |
| ERR("Thread #%u: Something unexpected happened", thread_num); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| } |
| |
| /* Free the packet used for the transfer. */ |
| @@ -258,6 +271,15 @@ static void read_transfer_callback(struct libusb_transfer *transfer) |
| libusb_free_transfer(transfer); |
| } |
| |
| +uint32_t get_num_service_threads(const uint32_t *num_threads, |
| + pthread_mutex_t *thread_mutex) { |
| + uint32_t ret; |
| + pthread_mutex_lock(thread_mutex); |
| + ret = *num_threads; |
| + pthread_mutex_unlock(thread_mutex); |
| + return ret; |
| +} |
| + |
| void *service_connection(void *params_void) |
| { |
| struct service_thread_param *params = |
| @@ -326,8 +348,8 @@ cleanup: |
| } |
| |
| NOTE("Thread #%u: closing, %s", thread_num, |
| - g_options.terminate ? "shutdown requested" |
| - : "communication thread terminated"); |
| + get_terminate(&g_options) ? "shutdown requested" |
| + : "communication thread terminated"); |
| if (g_options.unix_socket_mode) |
| uds_conn_close(params->uds); |
| else |
| @@ -346,9 +368,9 @@ void service_socket_connection(struct service_thread_param *params) |
| |
| struct http_packet_t *pkt = NULL; |
| |
| - while (is_socket_open(params) && !g_options.terminate) { |
| + while (is_socket_open(params) && !get_terminate(&g_options)) { |
| if (g_options.unix_socket_mode) { |
| - int poll_result = uds_poll_connection(params->uds->fd); |
| + int poll_result = uds_poll_connection(params->uds->fd, 1000); |
| if (poll_result < 0) { |
| ERR("Thread #%u: Failed to poll the uds socket"); |
| params->uds->is_closed = 1; |
| @@ -406,18 +428,20 @@ void *service_printer_connection(void *params_void) |
| |
| struct libusb_transfer *read_transfer = NULL; |
| |
| - while (is_socket_open(params) && !g_options.terminate) { |
| + while (is_socket_open(params) && !get_terminate(&g_options)) { |
| /* If there is already a read from the printer underway, block until it has |
| completed. */ |
| - pthread_mutex_lock(&read_inflight_mutex); |
| - while (is_socket_open(params) && read_inflight) |
| + while (is_socket_open(params) && |
| + get_read_inflight(&read_inflight, &read_inflight_mutex)) { |
| + pthread_mutex_lock(&read_inflight_mutex); |
| pthread_cond_wait(params->cond, &read_inflight_mutex); |
| - pthread_mutex_unlock(&read_inflight_mutex); |
| + pthread_mutex_unlock(&read_inflight_mutex); |
| + } |
| |
| /* After waking up due to a completed transfer, verify that the socket is |
| still open and that the termination flag has not been set before |
| attempting to start another transfer. */ |
| - if (!is_socket_open(params) || g_options.terminate) |
| + if (!is_socket_open(params) || get_terminate(&g_options)) |
| break; |
| |
| /* If we received an empty response from the printer then wait for |backoff| |
| @@ -484,15 +508,16 @@ void *service_printer_connection(void *params_void) |
| /* Wait until the cancellation has completed. */ |
| NOTE("Thread #%u: Waiting until the transfer has been cancelled", |
| thread_num); |
| - pthread_mutex_lock(&read_inflight_mutex); |
| - while (read_inflight) |
| + while (get_read_inflight(&read_inflight, &read_inflight_mutex)) { |
| + pthread_mutex_lock(&read_inflight_mutex); |
| pthread_cond_wait(params->cond, &read_inflight_mutex); |
| - pthread_mutex_unlock(&read_inflight_mutex); |
| + pthread_mutex_unlock(&read_inflight_mutex); |
| + } |
| } else if (cancel_status == LIBUSB_ERROR_NOT_FOUND) { |
| NOTE("Thread #%u: The transfer has already completed", thread_num); |
| } else { |
| NOTE("Thread #%u: Failed to cancel transfer"); |
| - g_options.terminate = 1; |
| + set_terminate(1, &g_options); |
| } |
| } |
| |
| @@ -549,15 +574,15 @@ int setup_socket_connection(struct service_thread_param *param) |
| { |
| if (g_options.unix_socket_mode) { |
| int poll_status = 0; |
| - while (!g_options.terminate && poll_status == 0) { |
| + while (!get_terminate(&g_options) && poll_status == 0) { |
| check_timeout(); |
| poll_status = uds_connect(g_options.uds_socket, param->uds); |
| } |
| - if (g_options.terminate || poll_status < 0) |
| + if (get_terminate(&g_options) || poll_status < 0) |
| return -1; |
| } else { |
| param->tcp = tcp_conn_select(g_options.tcp_socket, g_options.tcp6_socket); |
| - if (g_options.terminate || param->tcp == NULL) |
| + if (get_terminate(&g_options) || param->tcp == NULL) |
| return -1; |
| } |
| |
| @@ -601,6 +626,73 @@ int setup_communication_thread(void *(*routine)(void *), |
| return 0; |
| } |
| |
| +int setup_keep_alive_thread(void *(*routine)(void *), |
| + pthread_t *keep_alive_handle) { |
| + NOTE("Setting up keep-alive thread"); |
| + int status = pthread_create(keep_alive_handle, NULL, routine, NULL); |
| + if (status) { |
| + ERR("Failed to create keep-alive thread, error %d", status); |
| + return -1; |
| + } |
| + return 0; |
| +} |
| + |
| +void *run_keep_alive(void* param) { |
| + if (param != NULL) { |
| + ERR("run_keep_alive: Received unexpected parameter in run_keep_alive()"); |
| + set_terminate(1, &g_options); |
| + pthread_exit(NULL); |
| + } |
| + |
| + // Detach this thread so that the main thread does not need to join this |
| + // thread after termination for clean-up. |
| + pthread_detach(pthread_self()); |
| + |
| + // Allow immediate cancelling of this thread. |
| + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); |
| + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); |
| + |
| + while (!get_terminate(&g_options)) { |
| + int poll_status = poll_keep_alive(g_options.keep_alive_socket); |
| + if (poll_status < 0) { |
| + break; |
| + } else if (poll_status > 0) { |
| + // accept connection. |
| + int conn_fd = accept_keep_alive(g_options.keep_alive_socket); |
| + if (conn_fd < 0) { |
| + break; |
| + } |
| + // read "keep-alive" message. |
| + if (recv_keep_alive(conn_fd) < 0) { |
| + close(conn_fd); |
| + break; |
| + } |
| + |
| + // Lock |keep_alive_mutex| in order to ensure that a timeout doesn't occur |
| + // while we are in the process of sending an 'ack' response. |
| + pthread_mutex_lock(&g_options.keep_alive_mutex); |
| + if (!get_terminate(&g_options)) { |
| + NOTE("Successfully received 'keep-alive'. Resetting countdown"); |
| + set_measuring_timeout(0, &g_options); |
| + // send "ack" message. |
| + if (send_ack(conn_fd) < 0) { |
| + close(conn_fd); |
| + pthread_mutex_unlock(&g_options.keep_alive_mutex); |
| + break; |
| + } |
| + } |
| + pthread_mutex_unlock(&g_options.keep_alive_mutex); |
| + |
| + NOTE("Shutting down connection on keep-alive socket"); |
| + shutdown(conn_fd, SHUT_RDWR); |
| + close(conn_fd); |
| + } |
| + } |
| + |
| + set_terminate(1, &g_options); |
| + pthread_exit(NULL); |
| +} |
| + |
| struct libusb_callback_data *setup_libusb_callback_data( |
| struct http_packet_t *pkt, int *read_inflight, int *empty_response, |
| struct service_thread_param *thread_param, |
| @@ -662,7 +754,8 @@ static void start_daemon() |
| struct usb_sock_t *usb_sock; |
| |
| /* Termination flag */ |
| - g_options.terminate = 0; |
| + pthread_mutex_init(&g_options.terminate_mutex, NULL); |
| + set_terminate(0, &g_options); |
| |
| usb_sock = usb_open(); |
| if (usb_sock == NULL) goto cleanup_usb; |
| @@ -697,6 +790,13 @@ static void start_daemon() |
| g_options.tcp6_socket ? "" : "not "); |
| } |
| |
| + if (g_options.use_keep_alive && g_options.keep_alive_path) { |
| + g_options.keep_alive_socket = uds_open(g_options.keep_alive_path); |
| + if (g_options.keep_alive_socket == NULL) { |
| + goto cleanup_connections; |
| + } |
| + } |
| + |
| /* Lose connection to caller */ |
| uint16_t pid; |
| if (!g_options.nofork_mode && (pid = fork()) > 0) { |
| @@ -739,10 +839,26 @@ static void start_daemon() |
| goto cleanup_connections; |
| } |
| |
| + // Setup dedicated thread for keep-alive socket. |
| + pthread_mutex_init(&g_options.measuring_timeout_mutex, NULL); |
| + pthread_mutex_init(&g_options.keep_alive_mutex, NULL); |
| + pthread_t keep_alive_handle = 0; |
| + int keep_alive_status; |
| + if (g_options.use_keep_alive) { |
| + keep_alive_status = |
| + setup_keep_alive_thread(&run_keep_alive, &keep_alive_handle); |
| + if (keep_alive_status < 0) { |
| + ERR("Failed to launch keep-alive thread"); |
| + goto cleanup_connections; |
| + } |
| + NOTE("Launched keep-alive socket at %d", keep_alive_handle); |
| + } |
| + |
| + pthread_mutex_init(&thread_register_mutex, NULL); |
| + |
| /* Main loop */ |
| uint32_t i = 1; |
| - pthread_mutex_init(&thread_register_mutex, NULL); |
| - while (!g_options.terminate) { |
| + while (!get_terminate(&g_options)) { |
| struct service_thread_param *args = calloc(1, sizeof(*args)); |
| if (args == NULL) { |
| ERR("Preparing thread #%u: Failed to alloc space for thread args", i); |
| @@ -782,6 +898,16 @@ static void start_daemon() |
| } |
| |
| cleanup_connections: |
| + if (g_options.use_keep_alive && keep_alive_status == 0) { |
| + NOTE("Shutting down keep-alive observer thread"); |
| + pthread_cancel(keep_alive_handle); |
| + pthread_join(keep_alive_handle, NULL); |
| + } |
| + |
| + // Close keep-alive socket. |
| + if (g_options.keep_alive_socket != NULL) |
| + uds_close(g_options.keep_alive_socket); |
| + |
| /* Stop DNS-SD advertising of the printer */ |
| if (g_options.dnssd_data != NULL) |
| dnssd_shutdown(); |
| @@ -789,18 +915,23 @@ static void start_daemon() |
| /* Cancel communication threads which did not terminate by themselves when |
| stopping ippusbxd, so that no USB communication with the printer can |
| happen after the final reset */ |
| - while (num_service_threads) { |
| + uint32_t cur; |
| + while ((cur = get_num_service_threads(&num_service_threads, |
| + &thread_register_mutex))) { |
| NOTE("Thread #%u did not terminate, canceling it now ...", |
| service_threads[0]->thread_num); |
| - i = num_service_threads; |
| pthread_cancel(service_threads[0]->thread_handle); |
| - while (i == num_service_threads) |
| + while (cur == get_num_service_threads(&num_service_threads, |
| + &thread_register_mutex)) { |
| usleep(1000000); |
| + } |
| } |
| |
| /* Wait for USB unplug event observer thread to terminate */ |
| - NOTE("Shutting down usb observer thread"); |
| - pthread_join(g_options.usb_event_thread_handle, NULL); |
| + if (usb_sock != NULL && usb_sock->registered_callback) { |
| + NOTE("Shutting down usb observer thread"); |
| + pthread_join(g_options.usb_event_thread_handle, NULL); |
| + } |
| |
| /* TCP clean-up */ |
| if (g_options.tcp_socket!= NULL) |
| @@ -808,14 +939,19 @@ static void start_daemon() |
| if (g_options.tcp6_socket!= NULL) |
| tcp_close(g_options.tcp6_socket); |
| |
| - /* UDS clean-up */ |
| - if (g_options.uds_socket != NULL) |
| - uds_close(g_options.uds_socket); |
| - |
| cleanup_usb: |
| /* USB clean-up and final reset of the printer */ |
| if (usb_sock != NULL) |
| usb_close(usb_sock); |
| + |
| + // NOTE: This must be performed last as ippusb_manager waits until the closure |
| + // of this socket before attempting to launch another instance of |
| + // ippusbxd. |
| + // |
| + // Close UDS socket. |
| + if (g_options.uds_socket != NULL) |
| + uds_close(g_options.uds_socket); |
| + |
| return; |
| } |
| |
| @@ -850,6 +986,7 @@ int main(int argc, char *argv[]) |
| {"only-port", required_argument, 0, 'p' }, |
| {"interface", required_argument, 0, 'i' }, |
| {"uds-path", required_argument, 0, 'U' }, |
| + {"keep-alive", required_argument, 0, 'K' }, |
| {"logging", no_argument, 0, 'l' }, |
| {"debug", no_argument, 0, 'd' }, |
| {"verbose", no_argument, 0, 'q' }, |
| @@ -858,18 +995,13 @@ int main(int argc, char *argv[]) |
| {"help", no_argument, 0, 'h' }, |
| {NULL, 0, 0, 0 } |
| }; |
| + |
| + memset(&g_options, 0, sizeof(g_options)); |
| g_options.log_destination = LOGGING_STDERR; |
| - g_options.only_desired_port = 0; |
| g_options.desired_port = 60000; |
| g_options.interface = "lo"; |
| - g_options.serial_num = NULL; |
| - g_options.vendor_id = 0; |
| - g_options.product_id = 0; |
| - g_options.bus = 0; |
| - g_options.device = 0; |
| - g_options.measuring_timeout = 0; |
| - |
| - while ((c = getopt_long(argc, argv, "qnhdp:P:i:s:lv:m:B", |
| + |
| + while ((c = getopt_long(argc, argv, "qnhdp:P:i:s:lv:m:BU:K:", |
| long_options, &option_index)) != -1) { |
| switch (c) { |
| case '?': |
| @@ -949,6 +1081,10 @@ int main(int argc, char *argv[]) |
| g_options.unix_socket_mode = 1; |
| g_options.unix_socket_path = strdup(optarg); |
| break; |
| + case 'K': |
| + g_options.keep_alive_path = strdup(optarg); |
| + g_options.use_keep_alive = 1; |
| + NOTE("Received keep alive socket at %s", g_options.keep_alive_path); |
| } |
| } |
| |
| diff --git a/src/ippusbxd.h b/src/ippusbxd.h |
| index 69461fe..f7854dd 100644 |
| --- a/src/ippusbxd.h |
| +++ b/src/ippusbxd.h |
| @@ -48,6 +48,11 @@ const int maximum_backoff = 1000; |
| |
| /* Function prototypes */ |
| |
| +/* Gets the value of |num_service_threads|. Uses |thread_mutex| in order to |
| + prevent concurrent access to |num_service_threads|. */ |
| +uint32_t get_num_service_threads(const uint32_t *num_threads, |
| + pthread_mutex_t *thread_mutex); |
| + |
| /* Handles connection requests and |
| is run in a separate thread. It detaches itself from the main thread and sets |
| up a USB connection with the printer. This function spawns a partner thread |
| @@ -90,6 +95,11 @@ int setup_usb_connection(struct usb_sock_t *usb_sock, |
| int setup_communication_thread(void *(*routine)(void *), |
| struct service_thread_param *param); |
| |
| +/* Attempts to register the keep-alive thread to run the given |routine|. |
| + Returns 0 if successful, and a non-zero value otherwise. */ |
| +int setup_keep_alive_thread(void *(*routine)(void *), |
| + pthread_t *keep_alive_handle); |
| + |
| /* Creates a new libusb_callback_data struct with the given paramaters. */ |
| struct libusb_callback_data *setup_libusb_callback_data( |
| struct http_packet_t *pkt, int *read_inflight, int *empty_response, |
| diff --git a/src/logging.h b/src/logging.h |
| index d6a2db5..dff89a9 100644 |
| --- a/src/logging.h |
| +++ b/src/logging.h |
| @@ -50,12 +50,13 @@ enum log_level { |
| #define CONF_1(msg) BASE_LOG(LOGGING_CONFORMANCE, "<%d>Standard Conformance Failure: " msg "\n", TID()) |
| #define CONF_2(msg, ...) BASE_LOG(LOGGING_CONFORMANCE, "<%d>Standard Conformance Failure: " msg "\n", TID(), __VA_ARGS__) |
| |
| -#define ERR_AND_EXIT(...) \ |
| - do { \ |
| - ERR(__VA_ARGS__); \ |
| - if (g_options.dnssd_data != NULL) dnssd_shutdown(g_options.dnssd_data); \ |
| - if (g_options.uds_socket != NULL) uds_close(g_options.uds_socket); \ |
| - exit(-1); \ |
| +#define ERR_AND_EXIT(...) \ |
| + do { \ |
| + ERR(__VA_ARGS__); \ |
| + if (g_options.dnssd_data != NULL) dnssd_shutdown(g_options.dnssd_data); \ |
| + if (g_options.keep_alive != NULL) uds_close(g_options.keep_alive_socket); \ |
| + if (g_options.uds_socket != NULL) uds_close(g_options.uds_socket); \ |
| + exit(-1); \ |
| } while (0) |
| |
| void BASE_LOG(enum log_level, const char *, ...); |
| diff --git a/src/options.c b/src/options.c |
| index 464645d..cc3424d 100644 |
| --- a/src/options.c |
| +++ b/src/options.c |
| @@ -14,5 +14,35 @@ |
| |
| #include "options.h" |
| |
| -struct options g_options; |
| +#include <pthread.h> |
| +#include <string.h> |
| + |
| +void set_measuring_timeout(int val, struct options* option) { |
| + pthread_mutex_lock(&option->measuring_timeout_mutex); |
| + option->measuring_timeout = val; |
| + pthread_mutex_unlock(&option->measuring_timeout_mutex); |
| +} |
| + |
| +int get_measuring_timeout(struct options* option) { |
| + int ret; |
| + pthread_mutex_lock(&option->measuring_timeout_mutex); |
| + ret = option->measuring_timeout; |
| + pthread_mutex_unlock(&option->measuring_timeout_mutex); |
| + return ret; |
| +} |
| |
| +void set_terminate(int val, struct options* option) { |
| + pthread_mutex_lock(&option->terminate_mutex); |
| + option->terminate = val; |
| + pthread_mutex_unlock(&option->terminate_mutex); |
| +} |
| + |
| +int get_terminate(struct options* option) { |
| + int ret; |
| + pthread_mutex_lock(&option->terminate_mutex); |
| + ret = option->terminate; |
| + pthread_mutex_unlock(&option->terminate_mutex); |
| + return ret; |
| +} |
| + |
| +struct options g_options; |
| diff --git a/src/options.h b/src/options.h |
| index dc93bcf..552e10c 100644 |
| --- a/src/options.h |
| +++ b/src/options.h |
| @@ -30,6 +30,7 @@ struct options { |
| uint16_t real_port; |
| char *interface; |
| char *unix_socket_path; |
| + char *keep_alive_path; |
| enum log_target log_destination; |
| |
| /* Behavior */ |
| @@ -38,6 +39,7 @@ struct options { |
| int nofork_mode; |
| int nobroadcast; |
| int unix_socket_mode; |
| + int use_keep_alive; |
| |
| /* Printer identity */ |
| unsigned char *serial_num; |
| @@ -59,6 +61,25 @@ struct options { |
| int scanner_present; |
| |
| struct uds_sock_t *uds_socket; |
| + struct uds_sock_t *keep_alive_socket; |
| + |
| + pthread_mutex_t terminate_mutex; |
| + |
| + // Locks access to the |measuring_timeout| flag. |
| + pthread_mutex_t measuring_timeout_mutex; |
| + |
| + // Used to lock critical sections of the 'keep-alive' process. |
| + pthread_mutex_t keep_alive_mutex; |
| }; |
| |
| +// Accessor functions for the |measuring_timeout| flag which use a mutex to |
| +// prevent concurrent access. |
| +void set_measuring_timeout(int val, struct options *option); |
| +int get_measuring_timeout(struct options *option); |
| + |
| +// Accessor functions for the |terminate| flag which use a mutex to prevent |
| +// concurrent access. |
| +void set_terminate(int val, struct options *option); |
| +int get_terminate(struct options *option); |
| + |
| extern struct options g_options; |
| diff --git a/src/tcp.c b/src/tcp.c |
| index 92a128c..5ed7931 100644 |
| --- a/src/tcp.c |
| +++ b/src/tcp.c |
| @@ -266,7 +266,7 @@ int tcp_packet_send(struct tcp_conn_t *conn, struct http_packet_t *pkt) |
| size_t remaining = pkt->filled_size; |
| size_t total = 0; |
| |
| - while (remaining > 0 && !g_options.terminate) { |
| + while (remaining > 0 && !get_terminate(&g_options)) { |
| ssize_t sent = send(conn->sd, pkt->buffer + total, remaining, MSG_NOSIGNAL); |
| |
| if (sent < 0) { |
| @@ -319,7 +319,7 @@ struct tcp_conn_t *tcp_conn_select(struct tcp_sock_t *sock, |
| } |
| nfds += 1; |
| retval = select(nfds, &rfds, NULL, NULL, NULL); |
| - if (g_options.terminate) |
| + if (get_terminate(&g_options)) |
| goto error; |
| if (retval < 1) { |
| ERR("Failed to open tcp connection"); |
| diff --git a/src/uds.c b/src/uds.c |
| index de349db..6b4498e 100644 |
| --- a/src/uds.c |
| +++ b/src/uds.c |
| @@ -18,6 +18,11 @@ |
| |
| struct uds_sock_t *uds_open(const char *path) |
| { |
| + if (path == NULL) { |
| + ERR("UDS: Given socket path is null"); |
| + goto error; |
| + } |
| + |
| struct uds_sock_t *sock = calloc(1, sizeof(*sock)); |
| if (sock == NULL) { |
| ERR("UDS: Allocating memory for socket failed"); |
| @@ -91,9 +96,9 @@ int uds_connect(struct uds_sock_t *sock, struct uds_conn_t *conn) |
| return -1; |
| } |
| |
| - int poll_result = uds_poll_connection(sock->fd); |
| + int poll_result = uds_poll_connection(sock->fd, 1000); |
| |
| - if (g_options.terminate) |
| + if (get_terminate(&g_options)) |
| return -1; |
| |
| if (poll_result < 0) { |
| @@ -165,7 +170,7 @@ int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt) |
| size_t remaining = pkt->filled_size; |
| size_t total = 0; |
| |
| - while (remaining > 0 && !g_options.terminate) { |
| + while (remaining > 0 && !get_terminate(&g_options)) { |
| ssize_t sent = send(conn->fd, pkt->buffer + total, remaining, MSG_NOSIGNAL); |
| |
| if (sent < 0) { |
| @@ -190,11 +195,119 @@ int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt) |
| return 0; |
| } |
| |
| -// Polls the UDS connection in |conn| to see if there is any data to be read. |
| -int uds_poll_connection(int fd) { |
| +// Polls the UDS connection at |fd| for a maximum of |timeout| milliseconds to |
| +// see if there is any data to be read. |
| +int uds_poll_connection(int fd, int timeout) { |
| struct pollfd poll_fd; |
| poll_fd.fd = fd; |
| poll_fd.events = POLLIN; |
| - static int timeout = 1000; |
| return poll(&poll_fd, 1, timeout); |
| } |
| + |
| +int poll_keep_alive(struct uds_sock_t *sock) { |
| + if (sock == NULL) { |
| + ERR("UDS: No valid keep-alive socket provided"); |
| + return -1; |
| + } |
| + |
| + int poll_result = uds_poll_connection(sock->fd, 100); |
| + |
| + if (get_terminate(&g_options)) |
| + return -1; |
| + |
| + if (poll_result < 0) { |
| + ERR("Something went wrong when polling the keep-alive socket"); |
| + return -1; |
| + } |
| + |
| + // There is no data to be read on the socket. |
| + if (poll_result == 0) { |
| + return 0; |
| + } |
| + |
| + NOTE("Found a pending message on the keep-alive socket"); |
| + return poll_result; |
| +} |
| + |
| +int accept_keep_alive(struct uds_sock_t *sock) { |
| + if (get_terminate(&g_options)) { |
| + ERR("Termination has been requested"); |
| + return -1; |
| + } |
| + |
| + int conn_fd = accept(sock->fd, NULL, NULL); |
| + if (conn_fd < 0) { |
| + ERR("Failed to accept connection on keep-alive socket"); |
| + } else { |
| + NOTE("Accepted the connection on the keep-alive socket"); |
| + } |
| + return conn_fd; |
| +} |
| + |
| +int recv_keep_alive(int conn_fd) { |
| + if (get_terminate(&g_options)) { |
| + ERR("Termination has been requested"); |
| + return -1; |
| + } |
| + |
| + uint8_t length = 0; |
| + if (recv(conn_fd, &length, 1, 0) == -1) { |
| + ERR("Failed to read length of message"); |
| + return -1; |
| + } |
| + // We only ever expect to receive the 'keep-alive' message on the |
| + // socket (which includes the trailing '\0' character). |
| + if (length != 11) { |
| + ERR("Received unexpected message length %d", length); |
| + return -1; |
| + } |
| + |
| + char buf[11] = {0}; |
| + size_t bytes_received = 0; |
| + while (bytes_received < length) { |
| + ssize_t bytes_read = |
| + recv(conn_fd, buf + bytes_received, length - bytes_received, 0); |
| + if (bytes_read < 0) { |
| + ERR("Failed to read from keep-alive socket"); |
| + return -1; |
| + } |
| + bytes_received += (size_t)bytes_read; |
| + } |
| + |
| + if (strncmp(buf, "keep-alive", 11) != 0) { |
| + ERR("Received unexpected message '%s' on keep-alive socket", buf); |
| + return -1; |
| + } |
| + |
| + NOTE("Received 'keep-alive' message on keep-alive socket"); |
| + return 0; |
| +} |
| + |
| +int send_ack(int conn_fd) { |
| + if (get_terminate(&g_options)) { |
| + ERR("Termination has been requested"); |
| + return -1; |
| + } |
| + |
| + uint8_t length = 4; |
| + if (send(conn_fd, &length, 1, 0) == -1) { |
| + ERR("Failed to send length prefix byte"); |
| + return -1; |
| + } |
| + |
| + const char *ack = "ack"; |
| + size_t bytes_sent = 0; |
| + size_t bytes_remaining = 4; |
| + while (bytes_remaining > 0) { |
| + ssize_t sent = send(conn_fd, ack + bytes_sent, bytes_remaining, 0); |
| + if (sent < 0) { |
| + ERR("Failed to send 'ack' on keep-alive socket"); |
| + return -1; |
| + } |
| + bytes_sent += (size_t)sent; |
| + bytes_remaining -= (size_t)sent; |
| + } |
| + |
| + NOTE("Sent 'ack' response on keep-alive socket"); |
| + return 0; |
| +} |
| diff --git a/src/uds.h b/src/uds.h |
| index 67c13f9..1a39b08 100644 |
| --- a/src/uds.h |
| +++ b/src/uds.h |
| @@ -43,9 +43,17 @@ struct http_packet_t *uds_packet_get(struct uds_conn_t *conn); |
| |
| int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt); |
| |
| -// Polls the given file descriptor |fd| to see if there is any data to be read. |
| -// Return values: |
| +// Polls the given file descriptor |fd| up to |timeout| milliseconds to see if |
| +// there is any data to be read. Return values: |
| // 1 - there is data to be read. |
| // 0 - there is not any data to be read. |
| // -1 - there was an error. |
| -int uds_poll_connection(int fd); |
| +int uds_poll_connection(int fd, int timeout); |
| + |
| +int poll_keep_alive(struct uds_sock_t *sock); |
| + |
| +int accept_keep_alive(struct uds_sock_t *sock); |
| + |
| +int recv_keep_alive(int conn_fd); |
| + |
| +int send_ack(int conn_fd); |
| diff --git a/src/usb.c b/src/usb.c |
| index 21c84d4..aa05960 100644 |
| --- a/src/usb.c |
| +++ b/src/usb.c |
| @@ -244,7 +244,7 @@ static int try_claim_usb_interface(struct usb_sock_t *usb, |
| default: |
| break; |
| } |
| - } while (status != 0 && !g_options.terminate); |
| + } while (status != 0 && !get_terminate(&g_options)); |
| |
| return 0; |
| } |
| @@ -530,9 +530,6 @@ void usb_close(struct usb_sock_t *usb) |
| sem_destroy(&usb->interfaces[i].lock); |
| } |
| |
| - NOTE("Resetting printer ..."); |
| - libusb_reset_device(usb->printer); |
| - NOTE("Reset completed."); |
| NOTE("Closing device handle..."); |
| libusb_close(usb->printer); |
| NOTE("Closed device handle."); |
| @@ -600,6 +597,8 @@ static int LIBUSB_CALL usb_exit_on_unplug(libusb_context *context, |
| tcp_close(g_options.tcp6_socket); |
| |
| /* UDS clean-up */ |
| + if (g_options.keep_alive_socket != NULL) |
| + uds_close(g_options.keep_alive_socket); |
| if (g_options.uds_socket != NULL) |
| uds_close(g_options.uds_socket); |
| |
| @@ -615,7 +614,7 @@ static void *usb_pump_events(void *user_data) |
| |
| NOTE("USB unplug event observer thread starting"); |
| |
| - while (!g_options.terminate) { |
| + while (!get_terminate(&g_options)) { |
| /* NOTE: This is a blocking call so |
| no need for sleep() */ |
| struct timeval tv; |
| @@ -629,32 +628,27 @@ static void *usb_pump_events(void *user_data) |
| return NULL; |
| } |
| |
| -void usb_register_callback(struct usb_sock_t *usb) |
| -{ |
| - IGNORE(usb); |
| - |
| - int status = |
| - libusb_hotplug_register_callback(NULL, |
| - LIBUSB_HOTPLUG_EVENT_DEVICE_LEFT, |
| - /* Note: libusb's enum has no default value |
| - a bug has been filled with libusb. |
| - Please switch the below line to 0 |
| - once the issue has been fixed in |
| - deployed versions of libusb |
| - https://github.com/libusb/libusb/issues/35 */ |
| - /* 0, */ |
| - LIBUSB_HOTPLUG_ENUMERATE, |
| - g_options.vendor_id, |
| - g_options.product_id, |
| - LIBUSB_HOTPLUG_MATCH_ANY, |
| - &usb_exit_on_unplug, |
| - NULL, |
| - NULL); |
| +void usb_register_callback(struct usb_sock_t *usb) { |
| + int status = libusb_hotplug_register_callback( |
| + NULL, LIBUSB_HOTPLUG_EVENT_DEVICE_LEFT, |
| + /* Note: libusb's enum has no default value |
| + a bug has been filled with libusb. |
| + Please switch the below line to 0 |
| + once the issue has been fixed in |
| + deployed versions of libusb |
| + https://github.com/libusb/libusb/issues/35 */ |
| + /* 0, */ |
| + LIBUSB_HOTPLUG_ENUMERATE, g_options.vendor_id, g_options.product_id, |
| + LIBUSB_HOTPLUG_MATCH_ANY, &usb_exit_on_unplug, NULL, NULL); |
| if (status == LIBUSB_SUCCESS) { |
| - pthread_create(&(g_options.usb_event_thread_handle), NULL, &usb_pump_events, NULL); |
| + pthread_create(&(g_options.usb_event_thread_handle), NULL, &usb_pump_events, |
| + NULL); |
| + usb->registered_callback = 1; |
| NOTE("Registered unplug callback"); |
| - } else |
| + } else { |
| ERR("Failed to register unplug callback"); |
| + usb->registered_callback = 0; |
| + } |
| } |
| |
| struct usb_conn_t *usb_conn_acquire(struct usb_sock_t *usb) |
| @@ -664,7 +658,7 @@ struct usb_conn_t *usb_conn_acquire(struct usb_sock_t *usb) |
| if (usb->num_avail <= 0) { |
| NOTE("All USB interfaces busy, waiting ..."); |
| for (i = 0; i < 30 && usb->num_avail <= 0; i ++) { |
| - if (g_options.terminate) |
| + if (get_terminate(&g_options)) |
| return NULL; |
| usleep(100000); |
| } |
| @@ -737,7 +731,7 @@ int usb_conn_packet_send(struct usb_conn_t *conn, struct http_packet_t *pkt) |
| int num_timeouts = 0; |
| size_t sent = 0; |
| size_t pending = pkt->filled_size; |
| - while (pending > 0 && !g_options.terminate) { |
| + while (pending > 0 && !get_terminate(&g_options)) { |
| int to_send = (int)pending; |
| |
| NOTE("P %p: USB: want to send %d bytes", pkt, to_send); |
| diff --git a/src/usb.h b/src/usb.h |
| index 3aa939c..5b9ae68 100644 |
| --- a/src/usb.h |
| +++ b/src/usb.h |
| @@ -37,6 +37,10 @@ struct usb_sock_t { |
| char *device_id; |
| int max_packet_size; |
| |
| + // Represents whether the unplug callback was successfully registered for this |
| + // USB device. |
| + int registered_callback; |
| + |
| uint32_t num_interfaces; |
| struct usb_interface *interfaces; |
| |
| -- |
| 2.25.0.265.gbab2e86ba0-goog |
| |