| # Copyright 2016 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Tester feedback request multiplexer.""" |
| |
| from multiprocessing import reduction |
| import Queue |
| import collections |
| import multiprocessing |
| import os |
| import sys |
| |
| import common |
| from autotest_lib.client.common_lib.feedback import tester_feedback_client |
| |
| import input_handlers |
| import request |
| import sequenced_request |
| |
| |
# A single entry on the multiplexer's request queue.
#   obj:                the feedback request to execute, or None for a null
#                       request (see end_atomic_seq()).
#   reduced_reply_pipe: the reduced (picklable) sending end of the pipe on
#                       which the result is returned, or None for a null
#                       request.
#   query_num:          number of the query that submitted the request.
#   atomic:             whether further requests of the same query should be
#                       processed without interruption.
ReqTuple = collections.namedtuple(
        'ReqTuple', 'obj reduced_reply_pipe query_num atomic')
| |
| |
class FeedbackRequestMultiplexer(object):
    """A feedback request multiplexer.

    Requests submitted through process_request() are placed on a shared queue
    and handled one at a time by a separate process launched by start(). While
    an atomic sequence is active, only requests carrying the same query number
    are handled; otherwise the user is prompted to choose among the pending
    requests.
    """

    class RequestProcessingTerminated(Exception):
        """Used internally to signal processor termination."""


    def __init__(self):
        # Carries ReqTuple objects to the handler; None signals termination.
        self._request_queue = multiprocessing.Queue()
        # Requests that were dequeued but have not been handled yet.
        self._pending = []
        self._request_handling_process = None
        self._running = False
        # Query number of the active atomic sequence, or None if there is none.
        self._atomic_seq = None


    def _dequeue_request(self, block=False):
        """Moves one request from the queue to the pending list.

        @param block: Whether to wait for a request if the queue is empty.

        @return: True if a request was dequeued, False if the queue was empty.

        @raise RequestProcessingTerminated: The termination signal (None) was
                dequeued.
        """
        try:
            req_tuple = self._request_queue.get(block=block)
        except Queue.Empty:
            return False

        if req_tuple is None:
            raise self.RequestProcessingTerminated
        self._pending.append(req_tuple)
        return True


    def _atomic_seq_cont(self):
        """Returns index of next pending request in atomic sequence, if any.

        @return: Index into the pending list of the first request whose query
                 number matches the active atomic sequence, None if there is
                 no such request.
        """
        for req_idx, req_tuple in enumerate(self._pending):
            if req_tuple.query_num == self._atomic_seq:
                return req_idx
        return None


    def _drop_null_requests(self):
        """Discards pending null requests (those without a request object).

        A null request only serves to end an atomic sequence. When no atomic
        sequence is active, handling it changes nothing, and it must not be
        presented to the user because it has no title to show.
        """
        self._pending = [req_tuple for req_tuple in self._pending
                         if req_tuple.obj is not None]


    def _handle_requests(self, stdin):
        """Processes feedback requests until termination is signaled.

        This method is run in a separate process and needs to override stdin in
        order for raw_input() to work.

        @param stdin: File object to install as sys.stdin.
        """
        sys.stdin = stdin
        try:
            while True:
                req_idx = None

                # Wait for a (suitable) request to become available.
                while True:
                    if self._atomic_seq is None:
                        # A stray null request (e.g. end_atomic_seq() called
                        # with no sequence active) would otherwise reach the
                        # prompt below and fail on obj.get_title().
                        self._drop_null_requests()
                        if self._pending:
                            break
                    else:
                        req_idx = self._atomic_seq_cont()
                        if req_idx is not None:
                            break
                    self._dequeue_request(block=True)

                # If no request was pre-selected, prompt the user to choose one.
                if req_idx is None:
                    raw_input('Pending feedback requests, hit Enter to '
                              'process... ')

                    # Pull all remaining queued requests.
                    while self._dequeue_request():
                        pass

                    # Null requests may have arrived while waiting for the
                    # user; the real requests seen before the prompt remain,
                    # so the pending list cannot become empty here.
                    self._drop_null_requests()

                    # Select the request to process.
                    if len(self._pending) == 1:
                        print('Processing: %s' %
                              self._pending[0].obj.get_title())
                        req_idx = 0
                    else:
                        choose_req = sequenced_request.SequencedFeedbackRequest(
                                None, None, None)
                        choose_req.append_question(
                                'List of pending feedback requests:',
                                input_handlers.MultipleChoiceInputHandler(
                                        [req_tuple.obj.get_title()
                                         for req_tuple in self._pending],
                                        default=1),
                                prompt='Choose a request to process')
                        req_idx, _ = choose_req.execute()

                # Pop and handle selected request, then close pipe.
                req_tuple = self._pending.pop(req_idx)
                if req_tuple.obj is not None:
                    try:
                        ret = req_tuple.obj.execute()
                    except request.FeedbackRequestError as e:
                        ret = (tester_feedback_client.QUERY_RET_ERROR, str(e))
                    # Rebuild the connection from its reduced form, which is a
                    # (rebuild function, arguments) pair.
                    reply_pipe = req_tuple.reduced_reply_pipe[0](
                            *req_tuple.reduced_reply_pipe[1])
                    reply_pipe.send(ret)
                    reply_pipe.close()

                # Set the atomic sequence if so instructed.
                self._atomic_seq = (req_tuple.query_num if req_tuple.atomic
                                    else None)

        except self.RequestProcessingTerminated:
            # Normal exit path: stop() queued the termination signal.
            pass


    def start(self):
        """Starts the request multiplexer.

        Does nothing if the multiplexer is already running.
        """
        if self._running:
            return

        # The handler process is given its own duplicate of stdin, which it
        # installs as sys.stdin (see _handle_requests()).
        dup_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
        self._request_handling_process = multiprocessing.Process(
                target=self._handle_requests, args=(dup_stdin,))
        self._request_handling_process.start()

        self._running = True


    def stop(self):
        """Stops the request multiplexer.

        Does nothing if the multiplexer is not running. Otherwise, blocks until
        the request handling process has exited.
        """
        if not self._running:
            return

        # Tell the request handler to quit.
        self._request_queue.put(None)
        self._request_handling_process.join()

        self._running = False


    def process_request(self, request, query_num, atomic):
        """Processes a feedback request and returns its result.

        This call is used by queries for submitting individual requests. It is
        a blocking call that should be called from a separate execution thread.

        @param request: The feedback request to process.
        @param query_num: The unique query number.
        @param atomic: Whether subsequent request(s) are expected and should be
                processed without interruption.

        @return: The value sent back by the request handler: the result of
                 executing the request, or a (QUERY_RET_ERROR, message) tuple
                 if execution raised a FeedbackRequestError.
        """
        reply_pipe_send, reply_pipe_recv = multiprocessing.Pipe()
        # A connection cannot be queued as is; send its reduced form, which
        # the handler uses for rebuilding the sending end on its side.
        reduced_reply_pipe_send = reduction.reduce_connection(reply_pipe_send)
        self._request_queue.put(ReqTuple(request, reduced_reply_pipe_send,
                                         query_num, atomic))
        try:
            return reply_pipe_recv.recv()
        finally:
            reply_pipe_recv.close()


    def end_atomic_seq(self, query_num):
        """Ends the current atomic sequence.

        This enqueues a null request with the given query_num and atomicity set
        to False, causing the multiplexer to terminate the atomic sequence. If
        no atomic sequence is active by the time the null request is
        considered, it is discarded.

        @param query_num: The unique query number.
        """
        self._request_queue.put(ReqTuple(None, None, query_num, False))