blob: 8f0974ae54b2aec14463ac815a678f8ace6011ae [file] [log] [blame] [edit]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tester feedback request multiplexer."""
from multiprocessing import reduction
import Queue
import collections
import multiprocessing
import os
import sys
import common
from autotest_lib.client.common_lib.feedback import tester_feedback_client
import input_handlers
import request
import sequenced_request
# Record placed on the multiplexer's request queue.
#   obj: the feedback request to execute, or None for a "null" request that
#        only ends an atomic sequence.
#   reduced_reply_pipe: the reduced (picklable) form of the pipe end on which
#        the result is sent back, or None for a null request.
#   query_num: number of the query that issued the request.
#   atomic: whether further requests of the same query should be processed
#        without interruption.
ReqTuple = collections.namedtuple(
    'ReqTuple', 'obj reduced_reply_pipe query_num atomic')
class FeedbackRequestMultiplexer(object):
    """A feedback request multiplexer.

    Requests submitted via process_request() are put on a multiprocessing
    queue and executed one at a time by a handler process that is launched by
    start() and terminated by stop(). When several requests are pending, the
    user is prompted to choose which one to process next, unless an atomic
    sequence is in progress, in which case only requests carrying the same
    query number are processed until the sequence ends.
    """

    class RequestProcessingTerminated(Exception):
        """Used internally to signal processor termination."""


    def __init__(self):
        """Initializes an idle multiplexer; call start() to begin handling."""
        self._request_queue = multiprocessing.Queue()
        # Requests pulled off the queue but not yet processed.
        self._pending = []
        self._request_handling_process = None
        self._running = False
        # Query number of the atomic sequence in progress, or None.
        self._atomic_seq = None


    def _dequeue_request(self, block=False):
        """Moves a single request from the queue to the pending list.

        @param block: Whether to wait for a request if the queue is empty.

        @return: True if a request was dequeued, False if the queue was empty.

        @raise RequestProcessingTerminated: The termination marker (None) was
                pulled off the queue.
        """
        try:
            req_tuple = self._request_queue.get(block=block)
        except Queue.Empty:
            return False

        if req_tuple is None:
            raise self.RequestProcessingTerminated
        self._pending.append(req_tuple)
        return True


    def _atomic_seq_cont(self):
        """Returns index of next pending request in atomic sequence, if any."""
        for req_idx, req_tuple in enumerate(self._pending):
            if req_tuple.query_num == self._atomic_seq:
                return req_idx
        return None


    def _discard_null_requests(self):
        """Drops pending null requests; call only outside an atomic sequence.

        A null request (obj is None) exists solely to end an atomic sequence.
        When no sequence is in progress there is nothing for it to end, and
        leaving it pending would make the selection step call get_title() on
        None.
        """
        self._pending[:] = [req_tuple for req_tuple in self._pending
                            if req_tuple.obj is not None]


    def _wait_for_request(self):
        """Blocks until there is a request that may be processed.

        @return: Index of the pending request continuing the current atomic
                sequence, or None if no sequence is in progress and the
                request is yet to be chosen.

        @raise RequestProcessingTerminated: Termination was signaled while
                waiting.
        """
        while True:
            if self._atomic_seq is None:
                self._discard_null_requests()
                if self._pending:
                    return None
            else:
                req_idx = self._atomic_seq_cont()
                if req_idx is not None:
                    return req_idx
            self._dequeue_request(block=True)


    def _prompt_for_request(self):
        """Prompts the user and returns the index of the request to process.

        @raise RequestProcessingTerminated: Termination was signaled while
                pulling queued requests.
        """
        raw_input('Pending feedback requests, hit Enter to '
                  'process... ')

        # Pull all remaining queued requests.
        while self._dequeue_request():
            pass
        # Null requests that arrived meanwhile cannot be offered for
        # selection (they have no title); at least one real request remains
        # pending because the caller only prompts when one is available.
        self._discard_null_requests()

        # Select the request to process.
        if len(self._pending) == 1:
            print('Processing: %s' %
                  self._pending[0].obj.get_title())
            return 0

        choose_req = sequenced_request.SequencedFeedbackRequest(
                None, None, None)
        choose_req.append_question(
                'List of pending feedback requests:',
                input_handlers.MultipleChoiceInputHandler(
                        [req_tuple.obj.get_title()
                         for req_tuple in self._pending],
                        default=1),
                prompt='Choose a request to process')
        req_idx, _ = choose_req.execute()
        return req_idx


    def _process_pending(self, req_idx):
        """Pops and handles the selected request, then updates atomicity.

        @param req_idx: Index of the request in the pending list.
        """
        req_tuple = self._pending.pop(req_idx)
        if req_tuple.obj is not None:
            try:
                ret = req_tuple.obj.execute()
            except request.FeedbackRequestError as e:
                ret = (tester_feedback_client.QUERY_RET_ERROR, str(e))
            # Rebuild the reply pipe from its reduced form, send the result
            # and make sure the pipe is closed even if sending fails.
            reply_pipe = req_tuple.reduced_reply_pipe[0](
                    *req_tuple.reduced_reply_pipe[1])
            try:
                reply_pipe.send(ret)
            finally:
                reply_pipe.close()

        # Set the atomic sequence if so instructed.
        self._atomic_seq = (req_tuple.query_num if req_tuple.atomic
                            else None)


    def _handle_requests(self, stdin):
        """Processes feedback requests until termination is signaled.

        This method is run in a separate process and needs to override stdin in
        order for raw_input() to work.

        @param stdin: File object to install as sys.stdin.
        """
        sys.stdin = stdin
        try:
            while True:
                # Wait for a (suitable) request to become available.
                req_idx = self._wait_for_request()
                # If no request was pre-selected, prompt the user to choose
                # one.
                if req_idx is None:
                    req_idx = self._prompt_for_request()
                self._process_pending(req_idx)
        except self.RequestProcessingTerminated:
            # Normal shutdown path: stop() enqueued the termination marker.
            pass


    def start(self):
        """Starts the request multiplexer."""
        if self._running:
            return

        dup_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
        try:
            self._request_handling_process = multiprocessing.Process(
                    target=self._handle_requests, args=(dup_stdin,))
            self._request_handling_process.start()
        finally:
            # The handler process has its own copy of the descriptor once it
            # is started; do not keep ours open in this process.
            dup_stdin.close()
        self._running = True


    def stop(self):
        """Stops the request multiplexer."""
        if not self._running:
            return

        # Tell the request handler to quit.
        self._request_queue.put(None)
        self._request_handling_process.join()
        self._running = False


    def process_request(self, request, query_num, atomic):
        """Processes a feedback request and returns its result.

        This call is used by queries for submitting individual requests. It is
        a blocking call that should be called from a separate execution thread.

        Note that the 'request' argument shadows the imported 'request' module
        inside this method.

        @param request: The feedback request to process.
        @param query_num: The unique query number.
        @param atomic: Whether subsequent request(s) are expected and should be
                       processed without interruption.

        @return: The value sent back by the request handler over the reply
                pipe.
        """
        reply_pipe_send, reply_pipe_recv = multiprocessing.Pipe()
        try:
            reduced_reply_pipe_send = reduction.reduce_connection(
                    reply_pipe_send)
            self._request_queue.put(ReqTuple(request, reduced_reply_pipe_send,
                                             query_num, atomic))
            return reply_pipe_recv.recv()
        finally:
            # Release both local pipe ends deterministically.
            reply_pipe_send.close()
            reply_pipe_recv.close()


    def end_atomic_seq(self, query_num):
        """Ends the current atomic sequence.

        This enqueues a null request with the given query_num and atomicity set
        to False, causing the multiplexer to terminate the atomic sequence.
        A null request that is seen while no atomic sequence is in progress is
        discarded by the handler.

        @param query_num: The unique query number.
        """
        self._request_queue.put(ReqTuple(None, None, query_num, False))