blob: a380ecb2ce016fcac343ea20d691f5eacc60b661 [file] [log] [blame]
From 5661dea72ca23428034b583d0e0365de58e0393a Mon Sep 17 00:00:00 2001
From: DavieV <davidvalleau@gmail.com>
Date: Wed, 14 Feb 2018 14:27:46 -0800
Subject: [PATCH 3/3] Making the communication bi-directional with polling
Changing the implementation of the communication code to use separate threads
which are each responsible for reading from one end of the communication. The
current implementation has to take turns reading from one end at a time, and has
to rely on reading the contents of the HTTP headers in order to determine when
one end has finished communicating. This change allows for both ends to
communicate continuously without having to have any internal state about
which turn it is, and eliminates the need to parse the contents of the
messages.
Since parsing the contents of the messages is no longer necessary, the
majority of the code in the http libraries is being removed.
---
src/http.c | 573 +-------------------------------------
src/http.h | 44 +--
src/ippusbxd.c | 727 ++++++++++++++++++++++++++++++++-----------------
src/options.h | 1 -
src/tcp.c | 72 ++---
src/tcp.h | 3 +-
src/uds.c | 63 ++---
src/uds.h | 5 +-
src/usb.c | 171 ++----------
src/usb.h | 6 +-
10 files changed, 567 insertions(+), 1098 deletions(-)
diff --git a/src/http.c b/src/http.c
index a2c897b..bdfe46b 100644
--- a/src/http.c
+++ b/src/http.c
@@ -27,551 +27,28 @@
#define BUFFER_STEP (1 << 12)
-struct http_message_t *http_message_new()
+struct http_packet_t *packet_new()
{
- struct http_message_t *msg = calloc(1, sizeof(*msg));
- if (msg == NULL) {
- ERR("failed to alloc space for http message");
- return NULL;
- }
-
- msg->spare_capacity = 0;
- msg->spare_filled = 0;
- msg->spare_buffer = NULL;
-
- return msg;
-}
-
-void message_free(struct http_message_t *msg)
-{
- free(msg->spare_buffer);
- free(msg);
-}
-
-static void packet_check_completion(struct http_packet_t *pkt)
-{
- struct http_message_t *msg = pkt->parent_message;
- /* Msg full */
- if (msg->claimed_size && msg->received_size >= msg->claimed_size) {
- msg->is_completed = 1;
- NOTE("http: Message completed: Received size >= claimed size");
-
- /* Sanity check */
- if (msg->spare_filled > 0)
- ERR_AND_EXIT("Msg spare not empty upon completion");
- }
-
- /* Pkt full */
- if (pkt->expected_size && pkt->filled_size >= pkt->expected_size) {
- pkt->is_completed = 1;
- NOTE("http: Packet completed: Packet full");
- }
-
- /* Pkt over capacity */
- if (pkt->filled_size > pkt->buffer_capacity) {
- /* Santiy check */
- ERR_AND_EXIT("Overflowed packet buffer");
- }
-}
-
-static int doesMatch(const char *matcher, size_t matcher_len,
- const uint8_t *key, size_t key_len)
-{
- for (size_t i = 0; i < matcher_len; i++)
- if (i >= key_len || matcher[i] != key[i])
- return 0;
- return 1;
-}
-
-static int inspect_header_field(struct http_packet_t *pkt, size_t header_size,
- char *key, size_t key_size)
-{
- /* Find key */
- uint8_t *pos = memmem(pkt->buffer, header_size, key, key_size);
- if (pos == NULL)
- return -1;
-
- /* Find first digit */
- size_t number_pos = (size_t) (pos - pkt->buffer) + key_size;
- while (number_pos < pkt->filled_size && !isdigit(pkt->buffer[number_pos]))
- ++number_pos;
-
- /* Find next non-digit */
- size_t number_end = number_pos;
- while (number_end < pkt->filled_size && isdigit(pkt->buffer[number_end]))
- ++number_end;
-
- /* Failed to find next non-digit
- header field might be broken */
- if (number_end >= pkt->filled_size)
- return -1;
-
- /* Temporary stringification of buffer for atoi() */
- uint8_t original_char = pkt->buffer[number_end];
- pkt->buffer[number_end] = '\0';
- int val = atoi((const char *)(pkt->buffer + number_pos));
-
- /* Restore buffer */
- pkt->buffer[number_end] = original_char;
- return val;
-}
-
-static void packet_store_excess(struct http_packet_t *pkt)
-{
- struct http_message_t *msg = pkt->parent_message;
- if (msg->spare_buffer != NULL)
- ERR_AND_EXIT("Do not store excess to non-empty packet");
-
- if (pkt->expected_size >= pkt->filled_size)
- ERR_AND_EXIT("Do not call packet_store_excess() unless needed");
-
- size_t spare_size = pkt->filled_size - pkt->expected_size;
- size_t non_spare = pkt->expected_size;
- NOTE("HTTP: Storing %d bytes of excess", spare_size);
-
- /* Align to BUFFER_STEP */
- size_t needed_size = 0;
- needed_size += spare_size / BUFFER_STEP;
- needed_size += (spare_size % BUFFER_STEP) > 0 ? BUFFER_STEP : 0;
-
- if (msg->spare_buffer == NULL) {
- uint8_t *buffer = calloc(1, needed_size);
- if (buffer == NULL)
- ERR_AND_EXIT("Failed to alloc msg spare buffer");
- msg->spare_buffer = buffer;
- }
-
- memcpy(msg->spare_buffer, pkt->buffer + non_spare, spare_size);
- pkt->filled_size = non_spare;
-
- msg->spare_capacity = needed_size;
- msg->spare_filled = spare_size;
-}
-
-static void packet_take_spare(struct http_packet_t *pkt)
-{
- struct http_message_t *msg = pkt->parent_message;
- if (msg->spare_filled == 0)
- return;
-
- if (msg->spare_buffer == NULL)
- return;
-
- if (pkt->filled_size > 0)
- ERR_AND_EXIT("pkt should be empty when loading msg spare");
-
- /* Take message's buffer */
- size_t msg_size = msg->spare_capacity;
- size_t msg_filled = msg->spare_filled;
- uint8_t *msg_buffer = msg->spare_buffer;
-
- pkt->buffer_capacity = msg_size;
- pkt->filled_size = msg_filled;
- pkt->buffer = msg_buffer;
-
- msg->spare_capacity = 0;
- msg->spare_filled = 0;
- msg->spare_buffer = NULL;
-}
-
-static ssize_t packet_find_chunked_size(struct http_packet_t *pkt)
-{
- /* NOTE:
- chunks can have trailers which are
- tacked on http header fields.
- NOTE:
- chunks may also have extensions.
- No one uses or supports them. */
-
- /* Find end of size string */
- if (pkt->filled_size >= SSIZE_MAX)
- ERR_AND_EXIT("Buffer beyond sane size");
-
- ssize_t max = (ssize_t) pkt->filled_size;
- ssize_t size_end = -1;
- ssize_t miniheader_end = -1;
- ssize_t delimiter_start = -1;
- for (ssize_t i = 0; i < max; i++) {
-
- uint8_t *buf = pkt->buffer;
- if (size_end < 0) {
- /* No extension */
- if (i + 1 < max &&
- (buf[i] == '\r' && /* CR */
- buf[i + 1] == '\n')) { /* LF */
- size_end = i + 1;
- miniheader_end = size_end;
- delimiter_start = i;
- break;
- }
-
- /* No extension */
- if (buf[i] == '\n') { /* LF */
- size_end = i;
- miniheader_end = size_end;
- delimiter_start = i;
- break;
- }
-
- /* Has extensions */
- if (buf[i] == ';') {
- size_end = i;
- continue;
- }
- }
- if (miniheader_end < 0) {
- if (i + 1 < max &&
- (buf[i] == '\r' && /* CR */
- buf[i + 1] == '\n')) { /* LF */
- miniheader_end = i + 1;
- delimiter_start = i;
- break;
- }
-
- if (buf[i] == '\n') { /* LF */
- miniheader_end = i;
- delimiter_start = i;
- break;
- }
- }
- }
-
- if (miniheader_end < 0) {
- /* NOTE: knowing just the size field
- is not enough since the extensions
- are not included in the size */
- NOTE("failed to find chunk mini-header so far");
- return -1;
- }
-
- /* Temporary stringification for strtol() */
- uint8_t original_char = *(pkt->buffer + size_end);
- *(pkt->buffer + size_end) = '\0';
- size_t size = strtoul((char *)pkt->buffer, NULL, 16);
- NOTE("Chunk size raw: %s", pkt->buffer);
- *(pkt->buffer + size_end) = original_char;
- if (size > SSIZE_MAX)
- ERR_AND_EXIT("chunk size is insane");
-
- if (size > 0) {
- /* Regular chunk */
- ssize_t chunk_size = (ssize_t) size; /* Chunk body */
- chunk_size += miniheader_end + 1; /* Mini-header */
- chunk_size += 2; /* Trailing CRLF */
- NOTE("HTTP: Chunk size: %lu", chunk_size);
- return (ssize_t) chunk_size;
- }
-
- /* Terminator chunk
- May have trailers in body */
- ssize_t full_size = -1;
- for (ssize_t i = delimiter_start; i < max; i++) {
- uint8_t *buf = pkt->buffer;
- if (i + 3 < max &&
- (buf[i] == '\r' && /* CR */
- buf[i + 1] == '\n' && /* LF */
- buf[i + 2] == '\r' && /* CR */
- buf[i + 3] == '\n')) { /* LF */
- full_size = i + 4;
- break;
- }
-
- if (i + 1 < max &&
- buf[i] == '\n' && /* LF */
- buf[i + 1] == '\n') { /* LF */
- full_size = i + 2;
- break;
- }
- }
-
- if (full_size < 0) {
- NOTE("Chunk miniheader present but body incomplete");
- return -1;
- }
-
- NOTE("Found end chunked packet");
- pkt->parent_message->is_completed = 1;
- pkt->is_completed = 1;
- return full_size;
-}
-
-static ssize_t packet_get_header_size(struct http_packet_t *pkt)
-{
- if (pkt->header_size != 0)
- goto found;
-
- /*
- * RFC2616 recomends we match newline on \n despite full
- * complience requires the message to use only \r\n
- * http://www.w3.org/Protocols/rfc2616/rfc2616-sec19.html#sec19.3
- */
-
- /* Find header */
- for (size_t i = 0; i < pkt->filled_size && i < SSIZE_MAX; i++) {
- /* two \r\n pairs */
- if ((i + 3) < pkt->filled_size &&
- '\r' == pkt->buffer[i] &&
- '\n' == pkt->buffer[i + 1] &&
- '\r' == pkt->buffer[i + 2] &&
- '\n' == pkt->buffer[i + 3]) {
- pkt->header_size = i + 4;
- goto found;
- }
-
- /* two \n pairs */
- if ((i + 1) < pkt->filled_size &&
- '\n' == pkt->buffer[i] &&
- '\n' == pkt->buffer[i + 1]) {
- pkt->header_size = i + 2;
- goto found;
- }
- }
-
- return -1;
-
- found:
- return (ssize_t) pkt->header_size;
-}
-
-enum http_request_t packet_find_type(struct http_packet_t *pkt)
-{
- enum http_request_t type = HTTP_UNSET;
- size_t size = 0;
- /*
- * Valid methods for determining http request
- * size are defined by W3 in RFC2616 section 4.4
- * link: http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4
- */
-
- /*
- * This function attempts to find what method this
- * packet would use. This is only possible in specific case:
- * 1. if the request uses method 1 we can check the http
- * request type. We must be called on a packet which
- * has the full header.
- * 2. if the request uses method 2 we need the full header
- * but a simple network-byte-order-aware string search
- * works. This function does not work if called with
- * a chunked transport's sub-packet.
- * 3. if the request uses method 3 we again perform the
- * string search.
- *
- * All cases require the packat to contain the full header.
- */
-
- ssize_t header_size_raw = packet_get_header_size(pkt);
- if (header_size_raw < 0) {
- /* We don't have the header yet */
- goto do_ret;
- }
- size_t header_size = (size_t) header_size_raw;
-
- /* Try Transfer-Encoding Chunked */
- char xfer_encode_str[] = "Transfer-Encoding: chunked";
- size_t xfer_encode_str_size = sizeof(xfer_encode_str) - 1;
- uint8_t *xfer_encode_pos = memmem(pkt->buffer, header_size,
- xfer_encode_str,
- xfer_encode_str_size);
- if (xfer_encode_pos != NULL) {
- size = 0;
- type = HTTP_CHUNKED;
- goto do_ret;
- }
-
- /* Try Content-Length */
- char content_length_str[] = "Content-Length: ";
- ssize_t contlen_size =
- inspect_header_field(pkt, header_size,
- content_length_str, sizeof(content_length_str) - 1);
- if (contlen_size >= 0) {
- size = (size_t) contlen_size + header_size;
- type = HTTP_CONTENT_LENGTH;
- goto do_ret;
- }
-
- /* GET requests (start with GET) or answers from the server (start
- with HTTP) */
- if (doesMatch("GET", 3, pkt->buffer, pkt->filled_size) ||
- doesMatch("HTTP", 4, pkt->buffer, pkt->filled_size)) {
- size = header_size;
- type = HTTP_HEADER_ONLY;
- goto do_ret;
- }
-
- /* No size was detectable yet header was found */
- type = HTTP_UNKNOWN;
- size = 0;
-
- do_ret:
- pkt->parent_message->claimed_size = size;
- pkt->parent_message->type = type;
- return type;
-}
-
-size_t packet_pending_bytes(struct http_packet_t *pkt)
-{
- struct http_message_t *msg = pkt->parent_message;
-
- /* Check Cache */
- if (pkt->expected_size > 0)
- goto pending_known;
-
- if (HTTP_UNSET == msg->type) {
- msg->type = packet_find_type(pkt);
-
- if (HTTP_CHUNKED == msg->type) {
- /* Note: this was the packet with the
- header of our chunked message. */
-
- /* Save any non-header data we got */
- ssize_t header_size = packet_get_header_size(pkt);
-
- /* Sanity check */
- if (header_size < 0 ||
- (size_t)header_size > pkt->filled_size)
- ERR_AND_EXIT("HTTP: Could not find header twice");
-
- NOTE("HTTP: Chunked header size is %ld bytes",
- header_size);
- pkt->expected_size = (size_t) header_size;
- msg->claimed_size = 0;
- goto pending_known;
- }
- }
-
- if (HTTP_CHUNKED == msg->type) {
- if (pkt->filled_size == 0) {
- /* Grab chunk's mini-header */
- goto pending_known;
- }
-
- if (pkt->expected_size == 0) {
- /* Check chunk's mini-header */
- ssize_t size = packet_find_chunked_size(pkt);
- if (size <= 0) {
- ERR("=============================================");
- ERR("Malformed chunk-transport http header receivd");
- ERR("Missing chunk's mini-headers in first data");
- ERR("Have %d bytes", pkt->filled_size);
- printf("%.*s\n", (int)pkt->filled_size, pkt->buffer);
- ERR("Malformed chunk-transport http header receivd");
- ERR("=============================================");
- goto pending_known;
- }
-
- pkt->expected_size = (size_t) size;
- msg->claimed_size = 0;
- }
-
- goto pending_known;
- }
- if (HTTP_HEADER_ONLY == msg->type) {
- /* Note: we can only know it is header only
- when the buffer already contains the header.
- So this next call cannot fail. */
- pkt->expected_size = (size_t) packet_get_header_size(pkt);
- msg->claimed_size = pkt->expected_size;
- goto pending_known;
- }
- if (HTTP_CONTENT_LENGTH == msg->type) {
- /* Note: find_header() has
- filled msg's claimed_size */
- msg->claimed_size = msg->claimed_size;
- pkt->expected_size = msg->claimed_size;
- goto pending_known;
- }
-
- pending_known:
-
- /* Save excess data */
- if (pkt->expected_size && pkt->filled_size > pkt->expected_size)
- packet_store_excess(pkt);
-
- size_t expected = pkt->expected_size;
- if (expected == 0)
- expected = msg->claimed_size;
- if (expected == 0)
- expected = pkt->buffer_capacity;
-
- /* Sanity check */
- if (expected < pkt->filled_size)
- ERR_AND_EXIT("Expected cannot be larger than filled");
-
- size_t pending = expected - pkt->filled_size;
-
- /* Expand buffer as needed */
- while (pending + pkt->filled_size > pkt->buffer_capacity) {
- ssize_t size_added = packet_expand(pkt);
- if (size_added < 0) {
- WARN("packet at max allowed size");
- return 0;
- }
- if (size_added == 0) {
- ERR("Failed to expand packet");
- return 0;
- }
- }
-
- packet_check_completion(pkt);
-
- return pending;
-}
-
-void packet_mark_received(struct http_packet_t *pkt, size_t received)
-{
- struct http_message_t *msg = pkt->parent_message;
- msg->received_size += received;
-
- pkt->filled_size += received;
- if (received) {
- NOTE("HTTP: got %lu bytes so: pkt has %lu bytes, "
- "msg has %lu bytes",
- received, pkt->filled_size, msg->received_size);
- }
-
- packet_check_completion(pkt);
-
- if (pkt->filled_size > pkt->buffer_capacity)
- ERR_AND_EXIT("Overflowed packet's buffer");
-
- if (pkt->expected_size && pkt->filled_size > pkt->expected_size) {
- /* Store excess data */
- packet_store_excess(pkt);
- }
-}
-
-struct http_packet_t *packet_new(struct http_message_t *parent_msg)
-{
- struct http_packet_t *pkt = NULL;
- uint8_t *buf = NULL;
size_t const capacity = BUFFER_STEP;
- assert(parent_msg != NULL);
- pkt = calloc(1, sizeof(*pkt));
+ struct http_packet_t *pkt = calloc(1, sizeof(*pkt));
if (pkt == NULL) {
ERR("failed to alloc packet");
return NULL;
}
- pkt->parent_message = parent_msg;
- pkt->expected_size = 0;
-
- /* Claim any spare data from prior packets */
- packet_take_spare(pkt);
- if (pkt->buffer == NULL) {
- buf = calloc(capacity, sizeof(*buf));
- if (buf == NULL) {
- ERR("failed to alloc space for packet's buffer or space for packet");
- free(pkt);
- return NULL;
- }
-
- /* Assemble packet */
- pkt->buffer = buf;
- pkt->buffer_capacity = capacity;
- pkt->filled_size = 0;
+ uint8_t *buf = calloc(capacity, sizeof(*buf));
+ if (buf == NULL) {
+ ERR("failed to alloc space for packet's buffer or space for packet");
+ free(pkt);
+ return NULL;
}
+ /* Assemble packet */
+ pkt->buffer = buf;
+ pkt->buffer_capacity = capacity;
+ pkt->filled_size = 0;
+
return pkt;
}
@@ -580,29 +57,3 @@ void packet_free(struct http_packet_t *pkt)
free(pkt->buffer);
free(pkt);
}
-
-#define MAX_PACKET_SIZE (1 << 26) /* 64MiB */
-ssize_t packet_expand(struct http_packet_t *pkt)
-{
- size_t cur_size = pkt->buffer_capacity;
- size_t new_size = cur_size * 2;
- if (new_size > MAX_PACKET_SIZE) {
- WARN("HTTP: cannot expand packet beyond limit");
- return -1;
- }
- NOTE("HTTP: doubling packet buffer to %lu", new_size);
-
- uint8_t *new_buf = realloc(pkt->buffer, new_size);
- if (new_buf == NULL) {
- /* If realloc fails the original buffer is still valid */
- WARN("Failed to expand packet");
- return 0;
- }
- pkt->buffer = new_buf;
- pkt->buffer_capacity = new_size;
-
- size_t diff = new_size - cur_size;
- if (diff > SSIZE_MAX)
- ERR_AND_EXIT("Buffer expanded beyond sane limit");
- return (ssize_t) diff;
-}
diff --git a/src/http.h b/src/http.h
index 44744ec..664fae4 100644
--- a/src/http.h
+++ b/src/http.h
@@ -16,51 +16,11 @@
#include <stdint.h>
#include <sys/types.h>
-enum http_request_t {
- HTTP_UNSET,
- HTTP_UNKNOWN,
- HTTP_CHUNKED,
- HTTP_CONTENT_LENGTH,
- HTTP_HEADER_ONLY
-};
-
-struct http_message_t {
- enum http_request_t type;
-
- size_t spare_filled;
- size_t spare_capacity;
- uint8_t *spare_buffer;
-
- size_t unreceived_size;
- uint8_t is_completed;
-
- /* Detected from child packets */
- size_t claimed_size;
- size_t received_size;
-};
-
struct http_packet_t {
- /* Cache */
- size_t header_size;
-
size_t filled_size;
- size_t expected_size;
-
size_t buffer_capacity;
uint8_t *buffer;
-
- struct http_message_t *parent_message;
-
- uint8_t is_completed;
};
-struct http_message_t *http_message_new(void);
-void message_free(struct http_message_t *);
-
-enum http_request_t packet_find_type(struct http_packet_t *pkt);
-size_t packet_pending_bytes(struct http_packet_t *);
-void packet_mark_received(struct http_packet_t *, size_t);
-
-struct http_packet_t *packet_new(struct http_message_t *);
-void packet_free(struct http_packet_t *);
-ssize_t packet_expand(struct http_packet_t *);
+struct http_packet_t *packet_new();
+void packet_free(struct http_packet_t *pkt);
diff --git a/src/ippusbxd.c b/src/ippusbxd.c
index f5cbd63..5ef5d17 100644
--- a/src/ippusbxd.c
+++ b/src/ippusbxd.c
@@ -23,6 +23,9 @@
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
+#include <errno.h>
+
+#include <libusb.h>
#include "options.h"
#include "logging.h"
@@ -36,14 +39,53 @@ struct service_thread_param {
struct tcp_conn_t *tcp;
struct uds_conn_t *uds;
struct usb_sock_t *usb_sock;
+ struct usb_conn_t *usb_conn;
pthread_t thread_handle;
- int thread_num;
+ uint32_t thread_num;
+ pthread_cond_t *cond;
+};
+
+struct libusb_callback_data {
+ int *read_inflight;
+ uint32_t thread_num;
+ struct tcp_conn_t *tcp;
+ struct uds_conn_t *uds;
+ struct http_packet_t *pkt;
+ pthread_mutex_t *read_inflight_mutex;
+ pthread_cond_t *read_inflight_cond;
};
+/* Function prototypes */
+static void *service_connection(void *params_void);
+
+static void service_socket_connection(struct service_thread_param *params);
+
+static void *service_printer_connection(void *params_void);
+
+static int allocate_socket_connection(struct service_thread_param *param);
+
+static int setup_socket_connection(struct service_thread_param *param);
+
+static int setup_usb_connection(struct usb_sock_t *usb_sock,
+ struct service_thread_param *param);
+
+static int setup_communication_thread(void *(*routine)(void *),
+ struct service_thread_param *param);
+
+static int get_read_inflight(const int *read_inflight,
+ pthread_mutex_t *read_inflight_mutex);
+
+static struct libusb_callback_data *setup_libusb_callback_data(
+ struct http_packet_t *pkt, int *read_inflight,
+ struct service_thread_param *thread_param,
+ pthread_mutex_t *read_inflight_mutex);
+
+static int is_socket_open(const struct service_thread_param *param);
+
+/* Global variables */
static pthread_mutex_t thread_register_mutex;
static struct service_thread_param **service_threads = NULL;
static int num_service_threads = 0;
-
static struct timeval start_time;
static void sigterm_handler(int sig)
@@ -66,7 +108,7 @@ static void list_service_threads(int num_service_threads,
snprintf(p, sizeof(buf) - strlen(buf), "None");
} else {
for (i = 0; i < num_service_threads; i ++) {
- snprintf(p, sizeof(buf) - strlen(buf), "#%d, ",
+ snprintf(p, sizeof(buf) - strlen(buf), "#%u, ",
service_threads[i]->thread_num);
p = buf + strlen(buf);
}
@@ -81,12 +123,12 @@ static int register_service_thread(int *num_service_threads,
struct service_thread_param ***service_threads,
struct service_thread_param *new_thread)
{
- NOTE("Registering thread #%d", new_thread->thread_num);
+ NOTE("Registering thread #%u", new_thread->thread_num);
(*num_service_threads) ++;
*service_threads = realloc(*service_threads,
*num_service_threads * sizeof(void*));
if (*service_threads == NULL) {
- ERR("Registering thread #%d: Failed to alloc space for thread registration list",
+ ERR("Registering thread #%u: Failed to alloc space for thread registration list",
new_thread->thread_num);
return -1;
}
@@ -94,18 +136,18 @@ static int register_service_thread(int *num_service_threads,
return 0;
}
-static int unregister_service_thread(int *num_service_threads,
- struct service_thread_param ***service_threads,
- int thread_num)
+static int unregister_service_thread(
+ int *num_service_threads, struct service_thread_param ***service_threads,
+ uint32_t thread_num)
{
int i;
- NOTE("Unregistering thread #%d", thread_num);
+ NOTE("Unregistering thread #%u", thread_num);
for (i = 0; i < *num_service_threads; i ++)
if ((*service_threads)[i]->thread_num == thread_num)
break;
if (i >= *num_service_threads) {
- ERR("Unregistering thread #%d: Cannot unregister, not found", thread_num);
+ ERR("Unregistering thread #%u: Cannot unregister, not found", thread_num);
return -1;
}
(*num_service_threads) --;
@@ -116,7 +158,7 @@ static int unregister_service_thread(int *num_service_threads,
if (*num_service_threads == 0)
*service_threads = NULL;
else if (*service_threads == NULL) {
- ERR("Unregistering thread #%d: Failed to alloc space for thread registration list",
+ ERR("Unregistering thread #%u: Failed to alloc space for thread registration list",
thread_num);
return -1;
}
@@ -126,14 +168,81 @@ static int unregister_service_thread(int *num_service_threads,
static void
cleanup_handler(void *arg_void)
{
- int thread_num = *((int *)(arg_void));
- NOTE("Thread #%d: Called clean-up handler", thread_num);
+ uint32_t thread_num = *((int *)(arg_void));
+ NOTE("Thread #%u: Called clean-up handler", thread_num);
pthread_mutex_lock(&thread_register_mutex);
unregister_service_thread(&num_service_threads, &service_threads, thread_num);
list_service_threads(num_service_threads, service_threads);
pthread_mutex_unlock(&thread_register_mutex);
}
+static void read_transfer_callback(struct libusb_transfer *transfer)
+{
+ struct libusb_callback_data *user_data =
+ (struct libusb_callback_data *)transfer->user_data;
+
+ uint32_t thread_num = user_data->thread_num;
+ pthread_mutex_t *read_inflight_mutex = user_data->read_inflight_mutex;
+ pthread_cond_t *read_inflight_cond = user_data->read_inflight_cond;
+
+ switch (transfer->status) {
+ case LIBUSB_TRANSFER_COMPLETED:
+ NOTE("Thread #%u: Transfer has completed successfully", thread_num);
+ user_data->pkt->filled_size = transfer->actual_length;
+
+ NOTE("Thread #%u: Pkt from %s (buffer size: %zu)\n===\n%s===", thread_num,
+ "usb", user_data->pkt->filled_size,
+ hexdump(user_data->pkt->buffer, (int)user_data->pkt->filled_size));
+
+ if (g_options.unix_socket_mode)
+ uds_packet_send(user_data->uds, user_data->pkt);
+ else
+ tcp_packet_send(user_data->tcp, user_data->pkt);
+
+ break;
+ case LIBUSB_TRANSFER_ERROR:
+ ERR("Thread #%u: There was an error completing the transfer", thread_num);
+ g_options.terminate = 1;
+ break;
+ case LIBUSB_TRANSFER_TIMED_OUT:
+ ERR("Thread #%u: The transfer timed out before it could be completed: "
+ "Received %u bytes",
+ thread_num, transfer->actual_length);
+ break;
+ case LIBUSB_TRANSFER_CANCELLED:
+ NOTE("Thread #%u: The transfer was cancelled", thread_num);
+ break;
+ case LIBUSB_TRANSFER_STALL:
+ ERR("Thread #%u: The transfer has stalled", thread_num);
+ g_options.terminate = 1;
+ break;
+ case LIBUSB_TRANSFER_NO_DEVICE:
+ ERR("Thread #%u: The printer was disconnected during the transfer",
+ thread_num);
+ g_options.terminate = 1;
+ break;
+ case LIBUSB_TRANSFER_OVERFLOW:
+ ERR("Thread #%u: The printer sent more data than was requested",
+ thread_num);
+ g_options.terminate = 1;
+ break;
+ default:
+ ERR("Thread #%u: Something unexpected happened", thread_num);
+ g_options.terminate = 1;
+ }
+
+ /* Mark the transfer as completed. */
+ pthread_mutex_lock(read_inflight_mutex);
+ *user_data->read_inflight = 0;
+ pthread_cond_broadcast(read_inflight_cond);
+ pthread_mutex_unlock(read_inflight_mutex);
+
+ /* Cleanup the data used for the transfer */
+ packet_free(user_data->pkt);
+ free(user_data);
+ libusb_free_transfer(transfer);
+}
+
static void check_timeout(void)
{
if (num_service_threads == 0 && !g_options.measuring_timeout) {
@@ -165,213 +274,248 @@ static void check_timeout(void)
}
}
-static void *service_connection(void *arg_void)
+/* This function is responsible for handling connection requests and
+ is run in a separate thread. It detaches itself from the main thread and sets
+ up a USB connection with the printer. This function spawns a partner thread
+ which is responsible for reading from the printer, and then this function
+ calls into service_socket_connection() which is responsible for reading from
+ the socket which made the connection request. Once the socket has closed its
+ end of communiction, this function notifies its partner thread that the
+ connection has been closed and then joins on the partner thread before
+ shutting down. */
+static void *service_connection(void *params_void)
{
- struct service_thread_param *arg =
- (struct service_thread_param *)arg_void;
- int thread_num = arg->thread_num;
+ struct service_thread_param *params =
+ (struct service_thread_param *)params_void;
+ uint32_t thread_num = params->thread_num;
- NOTE("Thread #%d: Starting", thread_num);
+ NOTE("Thread #%u: Setting up both ends for communication", thread_num);
- /* Detach this thread so that the main thread does not need to join this thread
- after termination for clean-up */
+ /* Detach this thread so that the main thread does not need to join this
+ thread after termination for clean-up. */
pthread_detach(pthread_self());
- /* Register clean-up handler */
+ /* Register clean-up handler. */
pthread_cleanup_push(cleanup_handler, &thread_num);
- /* Allow immediate cancelling of this thread */
+ /* Allow immediate cancelling of this thread. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
- /* classify priority */
- struct usb_conn_t *usb = NULL;
- int usb_failed = 0;
- while (((g_options.unix_socket_mode && !arg->uds->is_closed) ||
- (!g_options.unix_socket_mode && !arg->tcp->is_closed)) &&
- usb_failed == 0 && !g_options.terminate) {
- struct http_message_t *server_msg = NULL;
- struct http_message_t *client_msg = NULL;
-
- /* Client's request */
- client_msg = http_message_new();
- if (client_msg == NULL) {
- ERR("Thread #%d: Failed to create client message", thread_num);
- break;
- }
- NOTE("Thread #%d: M %p: Client msg starting",
- thread_num, client_msg);
-
- while (!client_msg->is_completed && !g_options.terminate) {
- struct http_packet_t *pkt;
-
- pkt = g_options.unix_socket_mode ? uds_packet_get(arg->uds, client_msg)
- : tcp_packet_get(arg->tcp, client_msg);
-
- if (pkt == NULL) {
- if ((g_options.unix_socket_mode && arg->uds->is_closed) ||
- (!g_options.unix_socket_mode && arg->tcp->is_closed)) {
- NOTE("Thread #%d: M %p: Client closed connection", thread_num,
- client_msg);
- goto cleanup_subconn;
- }
- ERR("Thread #%d: M %p: Got null packet from %s", thread_num,
- client_msg, g_options.unix_socket_mode ? "uds" : "tcp");
- goto cleanup_subconn;
- }
+ /* Attempt to establish a connection with the printer. */
+ if (setup_usb_connection(params->usb_sock, params))
+ goto cleanup;
+
+ /* Condition variable used to broadcast updates to the printer thread. */
+ pthread_cond_t cond;
+ if (pthread_cond_init(&cond, NULL))
+ goto cleanup;
+ params->cond = &cond;
+
+ /* Copy the contents of |params| into |printer_params|. The only
+ differences between the two are the |thread_num| and |thread_handle|. */
+ struct service_thread_param *printer_params =
+ calloc(1, sizeof(*printer_params));
+ memcpy(printer_params, params, sizeof(*printer_params));
+ printer_params->thread_num += 1;
+
+ /* Attempt to start the printer's end of the communication. */
+ NOTE("Thread #%u: Attempting to register thread %u", thread_num,
+ thread_num + 1);
+ if (setup_communication_thread(&service_printer_connection, printer_params))
+ goto cleanup;
+
+ /* This function will run until the socket has been closed. When this function
+ returns it means that the communication has been completed. */
+ service_socket_connection(params);
+
+ /* Notify the printer's end that the socket has closed so that it does not
+ have to wait for any pending asynchronous transfers to complete. */
+ pthread_cond_broadcast(params->cond);
+
+ /* Wait for the printer thread to exit. */
+ NOTE("Thread #%u: Waiting for thread #%u to complete", thread_num,
+ printer_params->thread_num);
+ if (pthread_join(printer_params->thread_handle, NULL))
+ ERR("Thread #%u: Something went wrong trying to join the printer thread",
+ thread_num);
+
+cleanup:
+ if (params->usb_conn != NULL) {
+ NOTE("Thread #%u: interface #%u: releasing usb conn", thread_num,
+ params->usb_conn->interface_index);
+ usb_conn_release(params->usb_conn);
+ params->usb_conn = NULL;
+ }
- if (usb == NULL && arg->usb_sock != NULL) {
- usb = usb_conn_acquire(arg->usb_sock);
- if (usb == NULL) {
- ERR("Thread #%d: M %p: Failed to acquire usb interface",
- thread_num, client_msg);
- packet_free(pkt);
- usb_failed = 1;
- goto cleanup_subconn;
- }
- usb_failed = 0;
- NOTE("Thread #%d: M %p: Interface #%d: acquired usb conn",
- thread_num, client_msg,
- usb->interface_index);
- }
+ NOTE("Thread #%u: closing, %s", thread_num,
+ g_options.terminate ? "shutdown requested"
+ : "communication thread terminated");
+ if (g_options.unix_socket_mode)
+ uds_conn_close(params->uds);
+ else
+ tcp_conn_close(params->tcp);
+ free(params);
- if (g_options.terminate)
- goto cleanup_subconn;
-
- NOTE("Thread #%d: M %p P %p: Pkt from %s (buffer size: %d)\n===\n%s===",
- thread_num, client_msg, pkt,
- g_options.unix_socket_mode ? "uds" : "tcp",
- pkt->filled_size,
- hexdump(pkt->buffer, (int)pkt->filled_size));
- /* In no-printer mode we simply ignore passing the
- client message on to the printer */
- if (arg->usb_sock != NULL) {
- if (usb_conn_packet_send(usb, pkt) != 0) {
- ERR("Thread #%d: M %p P %p: Interface #%d: Unable to send client package via USB",
- thread_num,
- client_msg, pkt, usb->interface_index);
- packet_free(pkt);
- goto cleanup_subconn;
- }
- NOTE("Thread #%d: M %p P %p: Interface #%d: Client pkt done",
- thread_num,
- client_msg, pkt, usb->interface_index);
- }
- packet_free(pkt);
- }
- if (usb != NULL)
- NOTE("Thread #%d: M %p: Interface #%d: Client msg completed",
- thread_num, client_msg,
- usb->interface_index);
+ /* Execute clean-up handler. */
+ pthread_cleanup_pop(1);
+ pthread_exit(NULL);
+}
+
+/* Reads from the socket and writes data to the printer. */
+static void service_socket_connection(struct service_thread_param *params)
+{
+ uint32_t thread_num = params->thread_num;
+
+ NOTE("Thread #%u: Starting on socket end", thread_num);
+
+ struct http_packet_t *pkt = NULL;
+
+ while (is_socket_open(params) && !g_options.terminate) {
+ if (g_options.unix_socket_mode)
+ pkt = uds_packet_get(params->uds);
else
- NOTE("Thread #%d: M %p: Client msg completed",
- thread_num, client_msg);
- message_free(client_msg);
- client_msg = NULL;
+ pkt = tcp_packet_get(params->tcp);
+
+ if (pkt == NULL) {
+ if (!is_socket_open(params))
+ NOTE("Thread: #%u: Client closed connection", thread_num);
+ else
+ NOTE("Thread: #%u: There was an error reading from the socket",
+ thread_num);
+ return;
+ }
+
+ NOTE("Thread #%u: Pkt from %s (buffer size: %zu)\n===\n%s===", thread_num,
+ g_options.unix_socket_mode ? "uds" : "tcp", pkt->filled_size,
+ hexdump(pkt->buffer, (int)pkt->filled_size));
- if (g_options.terminate)
- goto cleanup_subconn;
+ /* Send pkt to printer. */
+ usb_conn_packet_send(params->usb_conn, pkt);
+ packet_free(pkt);
+ }
+}
+
+/* Returns the value of |read_inflight|. Uses a mutex since another thread which
+ is processing the asynchronous transfer may change the value once the
+ transfer is complete. */
+static int get_read_inflight(const int *read_inflight, pthread_mutex_t *mtx)
+{
+ pthread_mutex_lock(mtx);
+ int val = *read_inflight;
+ pthread_mutex_unlock(mtx);
+
+ return val;
+}
+
+/* Sets the value of |read_inflight| to |val|. Uses a mutex since another thread
+ which is processing the asynchronous transfer may change the value once the
+ transfer is complete. */
+static void set_read_inflight(int val, pthread_mutex_t *mtx, int *read_inflight)
+{
+ pthread_mutex_lock(mtx);
+ *read_inflight = val;
+ pthread_mutex_unlock(mtx);
+}
+
+/* Reads from the printer and writes to the socket. */
+static void *service_printer_connection(void *params_void)
+{
+ struct service_thread_param *params =
+ (struct service_thread_param *)params_void;
+ uint32_t thread_num = params->thread_num;
- /* Server's response */
- server_msg = http_message_new();
+ NOTE("Thread #%u: Starting on printer end", thread_num);
- if (server_msg == NULL) {
- ERR("Thread #%d: Failed to create server message",
- thread_num);
- goto cleanup_subconn;
+ /* Register clean-up handler. */
+ pthread_cleanup_push(cleanup_handler, &thread_num);
+
+ int read_inflight = 0;
+ pthread_mutex_t read_inflight_mutex;
+ if (pthread_mutex_init(&read_inflight_mutex, NULL))
+ goto cleanup;
+
+ struct libusb_transfer *read_transfer = NULL;
+
+ while (is_socket_open(params) && !g_options.terminate) {
+ /* If there is already a read from the printer underway, block until it has
+ completed. */
+ pthread_mutex_lock(&read_inflight_mutex);
+ while (is_socket_open(params) && read_inflight)
+ pthread_cond_wait(params->cond, &read_inflight_mutex);
+ pthread_mutex_unlock(&read_inflight_mutex);
+
+ /* After waking up due to a completed transfer, verify that the socket is
+ still open and that the termination flag has not been set before
+ attempting to start another transfer. */
+ if (!is_socket_open(params) || g_options.terminate)
+ break;
+
+ NOTE("Thread #%u: No read in flight, starting a new one", thread_num);
+ struct http_packet_t *pkt = packet_new();
+ if (pkt == NULL) {
+ ERR("Thread #%u: Failed to allocate packet", thread_num);
+ break;
+ }
+
+ struct libusb_callback_data *user_data = setup_libusb_callback_data(
+ pkt, &read_inflight, params, &read_inflight_mutex);
+
+ if (user_data == NULL) {
+ ERR("Thread #%u: Failed to allocate memory for libusb_callback_data",
+ thread_num);
+ break;
}
- if (usb != NULL)
- NOTE("Thread #%d: M %p: Interface #%d: Server msg starting",
- thread_num, server_msg,
- usb->interface_index);
- else
- NOTE("Thread #%d: M %p: Server msg starting",
- thread_num, server_msg);
- while (!server_msg->is_completed && !g_options.terminate) {
- struct http_packet_t *pkt;
- if (arg->usb_sock != NULL) {
- pkt = usb_conn_packet_get(usb, server_msg);
- if (pkt == NULL) {
- usb_failed = 1;
- goto cleanup_subconn;
- }
- } else {
- /* In no-printer mode we "invent" the answer
- of the printer, a simple HTML message as
- a pseudo web interface */
- pkt = packet_new(server_msg);
- snprintf((char*)(pkt->buffer),
- pkt->buffer_capacity - 1,
- "HTTP/1.1 200 OK\r\nContent-Type: text/html; name=ippusbxd.html; charset=UTF-8\r\n\r\n<html><h2>ippusbxd</h2><p>Debug/development mode without connection to IPP-over-USB printer</p></html>\r\n");
- pkt->filled_size = 183;
- /* End the TCP connection, so that a
- web browser does not wait for more data */
- server_msg->is_completed = 1;
- if (g_options.unix_socket_mode)
- arg->uds->is_closed = 1;
- else
- arg->tcp->is_closed = 1;
- }
- if (g_options.terminate)
- goto cleanup_subconn;
+ read_transfer = setup_async_read(
+ params->usb_conn, pkt, read_transfer_callback, (void *)user_data, 2000);
- NOTE("Thread #%d: M %p P %p: Pkt from usb (buffer size: %d)\n===\n%s===",
- thread_num, server_msg, pkt, pkt->filled_size,
- hexdump(pkt->buffer, (int)pkt->filled_size));
+ if (read_transfer == NULL) {
+ ERR("Thread #%u: Failed to allocate memory for libusb transfer",
+ thread_num);
+ break;
+ }
- if ((g_options.unix_socket_mode && uds_packet_send(arg->uds, pkt) != 0) ||
- (!g_options.unix_socket_mode && tcp_packet_send(arg->tcp, pkt)) != 0) {
- ERR("Thread #%d: M %p P %p: Unable to send client package via %s",
- thread_num, client_msg, pkt,
- g_options.unix_socket_mode ? "uds" : "tcp");
- packet_free(pkt);
- goto cleanup_subconn;
- }
- if (usb != NULL)
- NOTE("Thread #%d: M %p P %p: Interface #%d: Server pkt done",
- thread_num, server_msg, pkt,
- usb->interface_index);
- else
- NOTE("Thread #%d: M %p P %p: Server pkt done",
- thread_num, server_msg, pkt);
+ /* Mark that there is a new read in flight. A mutex should not be needed
+ here since the transfer callback won't be fired until after calling
+ libusb_submit_transfer() */
+ read_inflight = 1;
- packet_free(pkt);
+ if (libusb_submit_transfer(read_transfer)) {
+ ERR("Thread #%u: Failed to submit asynchronous USB transfer", thread_num);
+ set_read_inflight(0, &read_inflight_mutex, &read_inflight);
+ break;
}
- if (usb != NULL)
- NOTE("Thread #%d: M %p: Interface #%d: Server msg completed",
- thread_num, server_msg,
- usb->interface_index);
- else
- NOTE("Thread #%d: M %p: Server msg completed",
- thread_num, server_msg);
-
- cleanup_subconn:
- if (usb != NULL && ((g_options.unix_socket_mode && arg->uds->is_closed) ||
- (!g_options.unix_socket_mode && arg->tcp->is_closed) ||
- usb_failed == 1)) {
- NOTE("Thread #%d: M %p: Interface #%d: releasing usb conn", thread_num,
- server_msg, usb->interface_index);
- usb_conn_release(usb);
- usb = NULL;
+ }
+
+ /* If the socket used for communication has closed and there is still a
+ transfer from the printer in flight then we attempt to cancel it. */
+ if (get_read_inflight(&read_inflight, &read_inflight_mutex)) {
+ NOTE(
+ "Thread #%u: There was a read in flight when the connection was "
+ "closed, cancelling transfer", thread_num);
+ int cancel_status = libusb_cancel_transfer(read_transfer);
+ if (!cancel_status) {
+ /* Wait until the cancellation has completed. */
+ NOTE("Thread #%u: Waiting until the transfer has been cancelled",
+ thread_num);
+ pthread_mutex_lock(&read_inflight_mutex);
+ while (read_inflight)
+ pthread_cond_wait(params->cond, &read_inflight_mutex);
+ pthread_mutex_unlock(&read_inflight_mutex);
+ } else if (cancel_status == LIBUSB_ERROR_NOT_FOUND) {
+ NOTE("Thread #%u: The transfer has already completed", thread_num);
+ } else {
+ NOTE("Thread #%u: Failed to cancel transfer");
+ g_options.terminate = 1;
}
- if (client_msg != NULL)
- message_free(client_msg);
- if (server_msg != NULL)
- message_free(server_msg);
}
- NOTE("Thread #%d: Closing, %s", thread_num,
- g_options.terminate ? "shutdown requested" : "communication thread terminated");
- if (g_options.unix_socket_mode)
- uds_conn_close(arg->uds);
- else
- tcp_conn_close(arg->tcp);
- free(arg);
+ pthread_mutex_destroy(&read_inflight_mutex);
- /* Execute clean-up handler */
+cleanup:
+ /* Execute clean-up handler. */
pthread_cleanup_pop(1);
-
pthread_exit(NULL);
}
@@ -400,22 +544,130 @@ static uint16_t open_tcp_socket(void)
return desired_port;
}
+/* Attempts to allocate space for either a uds socket or a tcp socket depending
+ on the value of |g_options.unix_socket_mode|. If the allocation is successful
+ then a value of 0 is returned, otherwise a non-zero value is returned. */
+static int allocate_socket_connection(struct service_thread_param *param)
+{
+ if (g_options.unix_socket_mode)
+ param->uds = calloc(1, sizeof(*param->uds));
+ else
+ param->tcp = calloc(1, sizeof(*param->tcp));
+
+ if (param->uds == NULL && param->tcp == NULL) {
+ ERR("Preparing thread #%u: Failed to allocate space for cups connection",
+ param->thread_num);
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Attempts to setup a connection to either the uds or tcp socket depending on
+ the value of |g_options.unix_socket_mode|. Returns a 0 value on success and a
+ non-zero value if something went wrong attempting to establish the
+ connection. */
+static int setup_socket_connection(struct service_thread_param *param)
+{
+ if (g_options.unix_socket_mode) {
+ int poll_status = 0;
+ while (!g_options.terminate && poll_status == 0) {
+ check_timeout();
+ poll_status = uds_connect(g_options.uds_socket, param->uds);
+ }
+ if (g_options.terminate || poll_status < 0)
+ return -1;
+ } else {
+ param->tcp = tcp_conn_select(g_options.tcp_socket, g_options.tcp6_socket);
+ if (g_options.terminate || param->tcp == NULL)
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Attempt to create a new usb_conn_t and assign it to |param| by acquiring an
+ available usb interface. Returns 0 if the creating on the connection struct
+ was successful, and non-zero if there was an error attempting to acquire the
+ interface. */
+static int setup_usb_connection(struct usb_sock_t *usb_sock,
+ struct service_thread_param *param)
+{
+ param->usb_conn = usb_conn_acquire(usb_sock);
+ if (param->usb_conn == NULL) {
+ ERR("Thread #%u: Failed to acquire usb interface", param->thread_num);
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Attempt to register a new communication thread to execute the function
+ |routine| with the given |params|. If successful a 0 value is returned,
+ otherwise a non-zero value is returned. */
+static int setup_communication_thread(void *(*routine)(void *),
+ struct service_thread_param *param)
+{
+ pthread_mutex_lock(&thread_register_mutex);
+ register_service_thread(&num_service_threads, &service_threads, param);
+ list_service_threads(num_service_threads, service_threads);
+ pthread_mutex_unlock(&thread_register_mutex);
+
+ int status =
+ pthread_create(&param->thread_handle, NULL, routine, param);
+
+ if (status) {
+ ERR("Creating thread #%u: Failed to spawn thread, error %d",
+ param->thread_num, status);
+ pthread_mutex_lock(&thread_register_mutex);
+ unregister_service_thread(&num_service_threads, &service_threads,
+ param->thread_num);
+ list_service_threads(num_service_threads, service_threads);
+ pthread_mutex_unlock(&thread_register_mutex);
+ return -1;
+ }
+
+ return 0;
+}
+
+static struct libusb_callback_data *setup_libusb_callback_data(
+ struct http_packet_t *pkt, int *read_inflight,
+ struct service_thread_param *thread_param,
+ pthread_mutex_t *read_inflight_mutex) {
+ struct libusb_callback_data *data = calloc(1, sizeof(*data));
+ if (data == NULL)
+ return NULL;
+
+ data->pkt = pkt;
+ data->read_inflight = read_inflight;
+ data->thread_num = thread_param->thread_num;
+ data->read_inflight_mutex = read_inflight_mutex;
+ data->read_inflight_cond = thread_param->cond;
+
+ if (g_options.unix_socket_mode)
+ data->uds = thread_param->uds;
+ else
+ data->tcp = thread_param->tcp;
+
+ return data;
+}
+
+static int is_socket_open(const struct service_thread_param *param) {
+ if (g_options.unix_socket_mode)
+ return !param->uds->is_closed;
+ return !param->tcp->is_closed;
+}
+
static void start_daemon()
{
- /* Capture USB device if not in no-printer mode */
+ /* Capture USB device. */
struct usb_sock_t *usb_sock;
/* Termination flag */
g_options.terminate = 0;
- if (g_options.noprinter_mode == 0) {
- usb_sock = usb_open();
- if (usb_sock == NULL)
- goto cleanup_usb;
- } else {
- usb_sock = NULL;
- g_options.device_id = "MFG:Acme;MDL:LaserStar 2000;CMD:AppleRaster,PWGRaster;CLS:PRINTER;DES:Acme LaserStar 2000;SN:001;";
- }
+ usb_sock = usb_open();
+ if (usb_sock == NULL) goto cleanup_usb;
if (g_options.unix_socket_mode) {
g_options.uds_socket = uds_open(g_options.unix_socket_path);
@@ -490,61 +742,33 @@ static void start_daemon()
}
/* Main loop */
- int i = 0;
+ uint32_t i = 1;
pthread_mutex_init(&thread_register_mutex, NULL);
while (!g_options.terminate) {
- i ++;
struct service_thread_param *args = calloc(1, sizeof(*args));
if (args == NULL) {
- ERR("Preparing thread #%d: Failed to alloc space for thread args",
- i);
+ ERR("Preparing thread #%u: Failed to alloc space for thread args", i);
goto cleanup_thread;
}
- if (g_options.unix_socket_mode) {
- args->uds = calloc(1, sizeof(*args->uds));
- if (args->uds == NULL) {
- ERR("Preparing thread #%d: Failed to allocate space for uds socket", i);
- }
- } else {
- args->tcp = calloc(1, sizeof(*args->tcp));
- if (args->tcp == NULL) {
- ERR("Preparing thread #%d: Failed to allocate space for tcp socket", i);
- }
- }
-
args->thread_num = i;
args->usb_sock = usb_sock;
- if (g_options.unix_socket_mode) {
- int poll_status = 0;
- while (!g_options.terminate && poll_status == 0) {
- check_timeout();
- poll_status = uds_connect(g_options.uds_socket, args->uds);
- }
- if (g_options.terminate || poll_status < 0)
- goto cleanup_thread;
- } else {
- args->tcp = tcp_conn_select(g_options.tcp_socket, g_options.tcp6_socket);
- if (g_options.terminate || args->tcp == NULL)
- goto cleanup_thread;
- }
+ /* Allocate space for either a uds or tcp socket to be used for
+ communication. */
+ if (allocate_socket_connection(args))
+ goto cleanup_thread;
- pthread_mutex_lock(&thread_register_mutex);
- register_service_thread(&num_service_threads, &service_threads, args);
- list_service_threads(num_service_threads, service_threads);
- pthread_mutex_unlock(&thread_register_mutex);
- int status = pthread_create(&args->thread_handle, NULL,
- &service_connection, args);
- if (status) {
- ERR("Creating thread #%d: Failed to spawn thread, error %d",
- i, status);
- pthread_mutex_lock(&thread_register_mutex);
- unregister_service_thread(&num_service_threads, &service_threads, i);
- list_service_threads(num_service_threads, service_threads);
- pthread_mutex_unlock(&thread_register_mutex);
+ /* Attempt to establish a connection to the relevant CUPS socket. */
+ if (setup_socket_connection(args))
+ goto cleanup_thread;
+
+ /* Attempt to start up a new thread to handle the CUPS end of
+ communication. */
+ if (setup_communication_thread(&service_connection, args))
goto cleanup_thread;
- }
+
+ i += 2;
continue;
@@ -568,7 +792,7 @@ static void start_daemon()
stopping ippusbxd, so that no USB communication with the printer can
happen after the final reset */
while (num_service_threads) {
- NOTE("Thread #%d did not terminate, canceling it now ...",
+ NOTE("Thread #%u did not terminate, canceling it now ...",
service_threads[0]->thread_num);
i = num_service_threads;
pthread_cancel(service_threads[0]->thread_handle);
@@ -577,6 +801,7 @@ static void start_daemon()
}
/* Wait for USB unplug event observer thread to terminate */
+ NOTE("Shutting down usb observer thread");
pthread_join(g_options.usb_event_thread_handle, NULL);
/* TCP clean-up */
@@ -632,7 +857,6 @@ int main(int argc, char *argv[])
{"verbose", no_argument, 0, 'q' },
{"no-fork", no_argument, 0, 'n' },
{"no-broadcast", no_argument, 0, 'B' },
- {"no-printer", no_argument, 0, 'N' },
{"help", no_argument, 0, 'h' },
{NULL, 0, 0, 0 }
};
@@ -647,7 +871,7 @@ int main(int argc, char *argv[])
g_options.device = 0;
g_options.measuring_timeout = 0;
- while ((c = getopt_long(argc, argv, "qnhdp:P:i:s:lv:m:NB",
+ while ((c = getopt_long(argc, argv, "qnhdp:P:i:s:lv:m:B",
long_options, &option_index)) != -1) {
switch (c) {
case '?':
@@ -720,9 +944,6 @@ int main(int argc, char *argv[])
case 's':
g_options.serial_num = (unsigned char *)optarg;
break;
- case 'N':
- g_options.noprinter_mode = 1;
- break;
case 'B':
g_options.nobroadcast = 1;
break;
@@ -772,13 +993,11 @@ int main(int argc, char *argv[])
" -n No-fork mode\n"
" --no-broadcast\n"
" -B No-broadcast mode, do not DNS-SD-broadcast\n"
- " --no-printer\n"
- " -N No-printer mode, debug/developer mode which makes ippusbxd\n"
- " run without IPP-over-USB printer\n"
, argv[0], argv[0], argv[0]);
return 0;
}
start_daemon();
+ NOTE("ippusbxd completed successfully");
return 0;
}
diff --git a/src/options.h b/src/options.h
index cc7b93e..99cadd4 100644
--- a/src/options.h
+++ b/src/options.h
@@ -36,7 +36,6 @@ struct options {
int help_mode;
int verbose_mode;
int nofork_mode;
- int noprinter_mode;
int nobroadcast;
int unix_socket_mode;
diff --git a/src/tcp.c b/src/tcp.c
index 5e11884..2e08516 100644
--- a/src/tcp.c
+++ b/src/tcp.c
@@ -30,11 +30,11 @@
#include <unistd.h>
#include <errno.h>
-#include "options.h"
+#include "http.h"
#include "logging.h"
+#include "options.h"
#include "tcp.h"
-
struct tcp_sock_t *tcp_open(uint16_t port, char* interface)
{
struct tcp_sock_t *this = calloc(1, sizeof *this);
@@ -221,56 +221,37 @@ uint16_t tcp_port_number_get(struct tcp_sock_t *sock)
return 0;
}
-struct http_packet_t *tcp_packet_get(struct tcp_conn_t *tcp,
- struct http_message_t *msg)
+struct http_packet_t *tcp_packet_get(struct tcp_conn_t *tcp)
{
/* Alloc packet ==---------------------------------------------------== */
- struct http_packet_t *pkt = packet_new(msg);
+ struct http_packet_t *pkt = packet_new();
if (pkt == NULL) {
ERR("failed to create packet for incoming tcp message");
goto error;
}
- size_t want_size = packet_pending_bytes(pkt);
- if (want_size == 0) {
- NOTE("TCP: Got %lu from spare buffer", pkt->filled_size);
- return pkt;
- }
-
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0;
- setsockopt(tcp->sd, SOL_SOCKET, SO_RCVTIMEO,
- (char *)&tv, sizeof(struct timeval));
-
- while (want_size != 0 && !msg->is_completed && !g_options.terminate) {
- NOTE("TCP: Getting %d bytes", want_size);
- uint8_t *subbuffer = pkt->buffer + pkt->filled_size;
- ssize_t gotten_size = recv(tcp->sd, subbuffer, want_size, 0);
- if (gotten_size < 0) {
- int errno_saved = errno;
- ERR("recv failed with err %d:%s", errno_saved,
- strerror(errno_saved));
- tcp->is_closed = 1;
- goto error;
- }
- NOTE("TCP: Got %d bytes", gotten_size);
- if (gotten_size == 0) {
- tcp->is_closed = 1;
- if (pkt->filled_size == 0) {
- /* Client closed TCP conn */
- goto error;
- } else {
- break;
- }
- }
+ if (setsockopt(tcp->sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
+ ERR("TCP: Setting options for tcp connection socket failed");
+ goto error;
+ }
+
+ ssize_t gotten_size = recv(tcp->sd, pkt->buffer, pkt->buffer_capacity, 0);
+
+ if (gotten_size < 0) {
+ int errno_saved = errno;
+ ERR("recv failed with err %d:%s", errno_saved, strerror(errno_saved));
+ tcp->is_closed = 1;
+ goto error;
+ }
- packet_mark_received(pkt, (unsigned) gotten_size);
- want_size = packet_pending_bytes(pkt);
- NOTE("TCP: Want more %d bytes; Message %scompleted", want_size, msg->is_completed ? "" : "not ");
+ if (gotten_size == 0) {
+ tcp->is_closed = 1;
}
- NOTE("TCP: Received %lu bytes", pkt->filled_size);
+ pkt->filled_size = gotten_size;
return pkt;
error:
@@ -283,9 +264,10 @@ int tcp_packet_send(struct tcp_conn_t *conn, struct http_packet_t *pkt)
{
size_t remaining = pkt->filled_size;
size_t total = 0;
+
while (remaining > 0 && !g_options.terminate) {
- ssize_t sent = send(conn->sd, pkt->buffer + total,
- remaining, MSG_NOSIGNAL);
+ ssize_t sent = send(conn->sd, pkt->buffer + total, remaining, MSG_NOSIGNAL);
+
if (sent < 0) {
if (errno == EPIPE) {
conn->is_closed = 1;
@@ -295,13 +277,13 @@ int tcp_packet_send(struct tcp_conn_t *conn, struct http_packet_t *pkt)
return -1;
}
- size_t sent_ulong = (unsigned) sent;
- total += sent_ulong;
- if (sent_ulong >= remaining)
+ total += sent;
+ if (sent >= remaining)
remaining = 0;
else
- remaining -= sent_ulong;
+ remaining -= sent;
}
+
NOTE("TCP: sent %lu bytes", total);
return 0;
}
diff --git a/src/tcp.h b/src/tcp.h
index e58cb17..5ae6b4f 100644
--- a/src/tcp.h
+++ b/src/tcp.h
@@ -48,6 +48,5 @@ struct tcp_conn_t *tcp_conn_select(struct tcp_sock_t *sock,
struct tcp_sock_t *sock6);
void tcp_conn_close(struct tcp_conn_t *);
-struct http_packet_t *tcp_packet_get(struct tcp_conn_t *,
- struct http_message_t *);
+struct http_packet_t *tcp_packet_get(struct tcp_conn_t *);
int tcp_packet_send(struct tcp_conn_t *, struct http_packet_t *);
diff --git a/src/uds.c b/src/uds.c
index b146b48..be2dab0 100644
--- a/src/uds.c
+++ b/src/uds.c
@@ -127,60 +127,36 @@ void uds_conn_close(struct uds_conn_t *conn)
free(conn);
}
-struct http_packet_t *uds_packet_get(struct uds_conn_t *conn,
- struct http_message_t *msg)
-{
- struct http_packet_t *pkt = packet_new(msg);
+struct http_packet_t *uds_packet_get(struct uds_conn_t *conn) {
+ struct http_packet_t *pkt = packet_new();
if (pkt == NULL) {
ERR("UDS: Allocating memory for incoming uds message failed");
goto error;
}
- size_t want_size = packet_pending_bytes(pkt);
- if (want_size == 0) {
- NOTE("UDS: Got %zu from spare buffer", pkt->filled_size);
- return pkt;
- }
-
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0;
- if (setsockopt(conn->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
- ERR("UDS: Setting options for connection socket failed");
+ if (setsockopt(conn->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
+ ERR("UDS: Setting options for uds socket failed");
goto error;
}
- while (want_size != 0 && !msg->is_completed && !g_options.terminate) {
- NOTE("UDS: Getting %zu bytes", want_size);
- uint8_t *subbuffer = pkt->buffer + pkt->filled_size;
- ssize_t gotten_size = recv(conn->fd, subbuffer, want_size, 0);
+ ssize_t gotten_size = recv(conn->fd, pkt->buffer, pkt->buffer_capacity, 0);
- if (gotten_size < 0) {
- int errno_saved = errno;
- ERR("UDS: recv failed with err %d:%s", errno_saved,
- strerror(errno_saved));
- conn->is_closed = 1;
- goto error;
- }
-
- NOTE("UDS: Got %zd bytes", gotten_size);
- if (gotten_size == 0) {
- conn->is_closed = 1;
- if (pkt->filled_size == 0) {
- // Client closed connection.
- goto error;
- } else {
- break;
- }
- }
+ if (gotten_size < 0) {
+ int errno_saved = errno;
+ ERR("UDS: recv failed with err %d:%s", errno_saved, strerror(errno_saved));
+ conn->is_closed = 1;
+ goto error;
+ }
- packet_mark_received(pkt, (unsigned)gotten_size);
- want_size = packet_pending_bytes(pkt);
- NOTE("UDS: Want %zu more bytes; Message %scompleted", want_size,
- msg->is_completed ? "" : "not ");
+ if (gotten_size == 0) {
+ conn->is_closed = 1;
+ goto error;
}
- NOTE("UDS: Received %zu bytes", pkt->filled_size);
+ pkt->filled_size = gotten_size;
return pkt;
error:
@@ -217,3 +193,12 @@ int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt)
NOTE("UDS: sent %zu bytes", total);
return 0;
}
+
+int uds_poll_connection(struct uds_conn_t *conn) {
+ struct pollfd poll_fd;
+ poll_fd.fd = conn->fd;
+ poll_fd.events = POLLIN;
+ static int timeout = 1000;
+
+ return poll(&poll_fd, 1, timeout);
+}
diff --git a/src/uds.h b/src/uds.h
index 15be0c6..b79c3c8 100644
--- a/src/uds.h
+++ b/src/uds.h
@@ -39,7 +39,8 @@ int uds_connect(struct uds_sock_t *sock, struct uds_conn_t *conn);
void uds_conn_close(struct uds_conn_t *conn);
-struct http_packet_t *uds_packet_get(struct uds_conn_t *conn,
- struct http_message_t *msg);
+struct http_packet_t *uds_packet_get(struct uds_conn_t *conn);
int uds_packet_send(struct uds_conn_t *conn, struct http_packet_t *pkt);
+
+int uds_poll_connection(struct uds_conn_t *conn);
diff --git a/src/usb.c b/src/usb.c
index 8e3f43a..690bed8 100644
--- a/src/usb.c
+++ b/src/usb.c
@@ -173,10 +173,8 @@ static void try_detach_kernel_driver(struct usb_sock_t *usb,
/* Make kernel release interface */
if (libusb_kernel_driver_active(usb->printer, uf->libusb_interface_index) ==
1) {
- /* Only linux supports this
- other platforms will fail
- thus we ignore the error code
- it either works or it does not */
+ /* Only linux supports this other platforms will fail thus we ignore the
+ error code it either works or it does not */
libusb_detach_kernel_driver(usb->printer, uf->libusb_interface_index);
}
}
@@ -186,9 +184,8 @@ static int try_claim_usb_interface(struct usb_sock_t *usb,
/* Claim the whole interface */
int status = 0;
do {
- /* Spinlock-like
- Libusb does not offer a blocking call
- so we're left with a spinlock */
+ /* Spinlock-like Libusb does not offer a blocking call so we're left with a
+ spinlock. */
status = libusb_claim_interface(usb->printer, uf->libusb_interface_index);
if (status)
NOTE("Failed to claim interface %d, retrying",
@@ -412,16 +409,16 @@ struct usb_sock_t *usb_open()
goto error;
}
- /* Try to make the kernel release the usb interface */
+ /* Try to make the kernel release the usb interface. */
try_detach_kernel_driver(usb, uf);
- /* Try to claim the usb interface */
+ /* Try to claim the usb interface. */
if (try_claim_usb_interface(usb, uf)) {
ERR("Failed to claim usb interface #%d", uf->interface_number);
goto error;
}
- /* Select the IPP-USB alt setting of the interface */
+ /* Select the IPP-USB alt setting of the interface. */
if (libusb_set_interface_alt_setting(
usb->printer, uf->libusb_interface_index, uf->interface_alt)) {
ERR("Failed to set alt setting for interface #%d",
@@ -492,7 +489,10 @@ void usb_close(struct usb_sock_t *usb)
NOTE("Resetting printer ...");
libusb_reset_device(usb->printer);
NOTE("Reset completed.");
+ NOTE("Closing device handle...");
libusb_close(usb->printer);
+ NOTE("Closed device handle.");
+
if (usb != NULL) {
if (usb->context != NULL)
libusb_exit(usb->context);
@@ -677,7 +677,6 @@ struct usb_conn_t *usb_conn_acquire(struct usb_sock_t *usb)
ERR("Timed out waiting for a free USB interface");
return NULL;
}
- usleep(100000);
}
struct usb_conn_t *conn = calloc(1, sizeof(*conn));
@@ -791,148 +790,18 @@ int usb_conn_packet_send(struct usb_conn_t *conn, struct http_packet_t *pkt)
return 0;
}
-struct http_packet_t *usb_conn_packet_get(struct usb_conn_t *conn, struct http_message_t *msg)
+struct libusb_transfer *setup_async_read(struct usb_conn_t *conn,
+ struct http_packet_t *pkt,
+ libusb_transfer_cb_fn callback,
+ void *user_data, uint32_t timeout)
{
- if (msg->is_completed)
+ struct libusb_transfer *transfer = libusb_alloc_transfer(0);
+ if (transfer == NULL)
return NULL;
- struct http_packet_t *pkt = packet_new(msg);
- if (pkt == NULL) {
- ERR("failed to create packet for incoming usb message");
- goto cleanup;
- }
-
- /* File packet */
- const int timeout = 1000; /* 1 sec */
- size_t read_size_ulong = packet_pending_bytes(pkt);
- if (read_size_ulong == 0)
- return pkt;
-
- uint64_t times_staled = 0;
- while (read_size_ulong > 0 && !msg->is_completed && !g_options.terminate) {
- if (read_size_ulong >= INT_MAX)
- goto cleanup;
- int read_size = (int)read_size_ulong;
-
- /* Pad read_size to multiple of usb's max packet size */
- read_size += (512 - (read_size % 512)) % 512;
-
- /* Expand buffer if needed */
- if (pkt->buffer_capacity < pkt->filled_size + read_size_ulong)
- if (packet_expand(pkt) < 0) {
- ERR("Failed to ensure room for usb pkt");
- goto cleanup;
- }
-
- int gotten_size = 0;
- int status = libusb_bulk_transfer(conn->parent->printer,
- conn->interface->endpoint_in,
- pkt->buffer + pkt->filled_size,
- read_size,
- &gotten_size, timeout);
-
- if (status == LIBUSB_ERROR_NO_DEVICE) {
- ERR("Printer has been disconnected");
- goto cleanup;
- }
-
- if (status != 0 && status != LIBUSB_ERROR_TIMEOUT) {
- ERR("bulk xfer failed with error code %d", status);
- ERR("tried reading %d bytes", read_size);
- goto cleanup;
- } else if (status == LIBUSB_ERROR_TIMEOUT) {
- ERR("bulk xfer timed out, retrying ...");
- ERR("tried reading %d bytes, actually read %d bytes",
- read_size, gotten_size);
- }
-
- if (gotten_size < 0) {
- ERR("Negative read size unexpected");
- goto cleanup;
- }
-
- if (gotten_size > 0) {
- times_staled = 0;
- usb_conn_mark_moving(conn);
- } else {
-
- /* Performance Test ---------------
- How long we sleep here has a
- dramatic affect on how long it
- takes to load a page.
- Earlier versions waited a tenth
- of a second which resulted in
- minute long page loads.
- On my HP printer the most obvious
- bottleneck is the "Unified.js" file
- which weighs 517.87KB. My profiling
- looked at how shortening this sleep
- could improve this file's load times.
- The cycle count is from perf and
- covers an entire page load.
-
- Below are my results:
- 1 in 100 == 2447ms, 261M cycles
- 1 in 1,000 == 483ms, 500M cycles
- 5 in 10,000 == 433ms, 800M cycles
- 1 in 10,000 == 320ms, 3000M cycles */
- #define TIMEOUT_RATIO (10000 / 5)
- static uint64_t stale_timeout =
- CONN_STALE_THRESHHOLD * TIMEOUT_RATIO;
- static uint64_t crash_timeout =
- PRINTER_CRASH_TIMEOUT_ANSWER * TIMEOUT_RATIO;
- static uint64_t skip_timeout =
- 1000000000 / TIMEOUT_RATIO;
-
- struct timespec sleep_dur;
- sleep_dur.tv_sec = 0;
- sleep_dur.tv_nsec = skip_timeout;
- nanosleep(&sleep_dur, NULL);
-
- if (status == LIBUSB_ERROR_TIMEOUT)
- times_staled += TIMEOUT_RATIO * timeout / 1000;
- else
- times_staled++;
- if (times_staled % TIMEOUT_RATIO == 0 ||
- status == LIBUSB_ERROR_TIMEOUT) {
- NOTE("No bytes received for %d sec.",
- times_staled / TIMEOUT_RATIO);
- if (pkt->filled_size > 0)
- NOTE("Packet so far \n===\n%s===\n",
- hexdump(pkt->buffer,
- pkt->filled_size));
- }
-
- if (times_staled > stale_timeout) {
- usb_conn_mark_staled(conn);
-
- if (pkt->filled_size > 0 ||
- usb_all_conns_staled(conn->parent) ||
- times_staled > crash_timeout) {
- ERR("USB timed out, giving up waiting for more data");
- break;
- }
- }
- }
-
- if (gotten_size) {
- NOTE("USB: Getting %d bytes of %d",
- read_size, pkt->expected_size);
- NOTE("USB: Got %d bytes", gotten_size);
- }
- packet_mark_received(pkt, (size_t)gotten_size);
- read_size_ulong = packet_pending_bytes(pkt);
- }
- NOTE("USB: Received %d bytes of %d with type %d",
- pkt->filled_size, pkt->expected_size, msg->type);
-
- if (pkt->filled_size == 0)
- goto cleanup;
+ libusb_fill_bulk_transfer(transfer, conn->parent->printer,
+ conn->interface->endpoint_in, pkt->buffer,
+ pkt->buffer_capacity, callback, user_data, timeout);
- return pkt;
-
- cleanup:
- if (pkt != NULL)
- packet_free(pkt);
- return NULL;
+ return transfer;
}
diff --git a/src/usb.h b/src/usb.h
index e23e03c..3aa939c 100644
--- a/src/usb.h
+++ b/src/usb.h
@@ -67,4 +67,8 @@ struct usb_conn_t *usb_conn_acquire(struct usb_sock_t *);
void usb_conn_release(struct usb_conn_t *);
int usb_conn_packet_send(struct usb_conn_t *, struct http_packet_t *);
-struct http_packet_t *usb_conn_packet_get(struct usb_conn_t *, struct http_message_t *);
+
+struct libusb_transfer *setup_async_read(struct usb_conn_t *conn,
+ struct http_packet_t *pkt,
+ libusb_transfer_cb_fn callback,
+ void *user_data, uint32_t timeout);
--
2.17.0.rc1.321.gba9d0f2565-goog