// Copyright 2018 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <base/bind.h>
#include <base/callback.h>
#include <base/macros.h>
#include <base/memory/ref_counted.h>
#include <base/sequenced_task_runner.h>
#include <grpcpp/grpcpp.h>
#include "diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h"
#include "diagnostics/grpc_async_adapter/rpc_state.h"
namespace diagnostics {
namespace internal {
// Base class for the asynchronous RPC Server, contains functionality that does
// not depend on the actual gRPC Service class.
// The server creates an object of type |RpcState| for each RPC that is
// expected. It uses factory functions bound in
// |AsyncGrpcServer::RegisterHandler| to create these |RpcState| objects. The
// AsyncGrpcServerBase then drives incoming RPCs by interacting with the
// |RpcStateBase| interface of the |RpcState| objects. This interface hides the
// RPC-specific details (such as RequestType, ResponseType) and acts as proxy
// towards gRPC and the application's RPC handler.
class AsyncGrpcServerBase {
// A factory function which creates an |RpcStateBase| for an expected
// RPC type.
using RpcStateFactory = base::Callback<std::unique_ptr<RpcStateBase>()>;
AsyncGrpcServerBase(scoped_refptr<base::SequencedTaskRunner> task_runner,
const std::vector<std::string>& server_uris);
virtual ~AsyncGrpcServerBase();
// Starts this server. When this returns failure, no further methods are
// allowed to be called, except Shutdown() - which is allowed but not required
// in this case.
// This function must not be called twice.
bool Start();
// Shuts down this server. This must be used before deleting this instance in
// case when the server successfully started - the instance must be destroyed
// only after |on_shutdown| has been called.
// If this server has not been successfully started, calling Shutdown() is
// optional but allowed (|on_shutdown_| will be called immediately in this
// case).
// This function must not be called twice.
void ShutDown(const base::Closure& on_shutdown);
// Returns the grpc::Service instance this server is exposing.
virtual grpc::Service* service() = 0;
// Adds |rpc_state_factory| which will be used to create a |RpcStateBase|
// instance for an RPC type.
void AddRpcStateFactory(const RpcStateFactory& rpc_state_factory);
enum class State { kNotStarted, kStarted, kShutDown };
// Expects the next RPC of the type described by |rpc_state_factory|.
// In detail, uses |rpc_state_factory| to create a |RpcStateBase| object
// for the expected RPC, registers the state's tag with |dispatcher_|, and
// requests the RPC in gRPC. After this function, ownership of the created
// |RpcStateBase| has been transferred to |dispatcher_| (wrapped in a bound
// Callback argument).
void ExpectNextRpc(const RpcStateFactory& rpc_state_factory);
// Called on an incoming RPC. |rpc_state| holds all state about that RPC.
// |rpc_state_factory| for the RPC type is passed in too so this function can
// start expecting the next RPC.
// |ok| is the gRPC |CompletionQueue| ok parameter.
// After this, ownership of the |RpcStateBase| has been transferred to
// |rpcs_awaiting_handler_reply_|.
void OnIncomingRpc(const RpcStateFactory& rpc_state_factory,
std::unique_ptr<RpcStateBase> rpc_state,
bool ok);
// Called when the handler has made a reply available for the RpcState
// identified by |tag|. This registers the |RpcStateBase|'s tag with
// |dispatcher_| again and actually sends the reply or cancellation.
// After this function, ownership of the |RpcStateBase| has
// been transferred to |dispatcher_| (wrapped in a bound Callback argument).
void OnHandlerDone(const void* tag);
// Called when the response for the RPC described by |rpc_state| has been
// sent. |ok| is the gRPC |CompletionQueue| ok parameter.
void OnResponseSent(std::unique_ptr<RpcStateBase> rpc_state, bool ok);
// State of this server.
State state_ = State::kNotStarted;
// The TaskRunner used for |dispatcher_|.
scoped_refptr<base::SequencedTaskRunner> task_runner_;
// The addresses this server listens on.
const std::vector<std::string> server_uris_;
// The gRPC |Server| instance.
std::unique_ptr<grpc::Server> server_;
// The |ServerCompletionQueue| associated with |server_|. This is the
// completion queue |dispatcher_| monitors.
std::unique_ptr<grpc::ServerCompletionQueue> completion_queue_;
// Monitors |completion_queue_| and for available tags and posts tasks to
// |task_runner_|.
std::unique_ptr<GrpcCompletionQueueDispatcher> dispatcher_;
// Factories that are used to create |RpcState| objects. One such object is
// needed per expected/incoming RPC. This is used to accumulate the factories
// registered through |AsyncGrpcServer::RegisterHandler| /
// |AddRpcStateFactory| before |Start| is called and will be cleared in
// |Start|.
std::vector<RpcStateFactory> rpc_state_factories_;
// Holds all |RpcState| objects which have been passed to the corresponding
// Handler but do not have a reply yet.
std::map<const void*, std::unique_ptr<RpcStateBase>>
} // namespace internal
// Templatized concrete class implementing an asynchronous gRPC server receiving
// RPCs defined by |AsyncService| on a task runner. Each RPC which should be
// handled must be registered using |RegisterHandler| before starting the server
// using |AsyncGrpcServerBase::Start|.
// Example usage:
// AsyncGrpcServer<Foo> server(base::ThreadTaskRunnerHandle::Get(),
// "unix:/path/to/socket");
// server.RegisterHandler(&FooService::AsyncService::RequestDoSomething,
// do_something_handler);
// server.RegisterHandler(&FooService::AsyncService::RequestDoOtherThing,
// do_other_thing_handler);
// server.Start();
// // ...
// server.Shutdown(on_shutdown_callback);
// // Important: Make sure |server| is not destroyed before
// // |on_shutdown_callback| is called.
// The handlers (e.g. |do_something_handler| in the example) have the following
// form:
// void DoSomethingHandler(std::unique_ptr<DoSomethingRequest> request,
// const base::Callback<void (std::unique_ptr<DoSomethingResponse)>
// send_response_callback>);
template <typename AsyncService>
class AsyncGrpcServer final : public internal::AsyncGrpcServerBase {
using RpcStateBase = internal::RpcStateBase;
template <typename RequestType, typename ResponseType>
using RpcState = internal::RpcState<RequestType, ResponseType>;
// Creates a server which exposes |service| on each URI in |server_uris|.
// It will post tasks for processing incoming RPCs on |task_runner|.
AsyncGrpcServer(scoped_refptr<base::SequencedTaskRunner> task_runner,
const std::vector<std::string>& server_uris)
: internal::AsyncGrpcServerBase(task_runner, server_uris),
service_(std::make_unique<AsyncService>()) {}
~AsyncGrpcServer() = default;
// A factory function which creates a |RpcState<RequestType, ResponseType>|.
template <typename RequestType, typename ResponseType>
static std::unique_ptr<RpcStateBase> RpcStateFactoryFunction(
const typename RpcState<RequestType, ResponseType>::RequestRpcCallback&
const typename RpcState<RequestType, ResponseType>::HandlerCallback&
handler_callback) {
return std::make_unique<RpcState<RequestType, ResponseType>>(
request_rpc_callback, handler_callback);
// A member function pointer which has the signature of functions used to
// request an async RPC on a GRPC AsyncService class.
// Note that the |AsyncService| class-level template argument is not used
// here, because the |request_rpc_function| could be defined on a base class.
template <typename AsyncServiceBase,
typename RequestType,
typename ResponseType>
using RequestRpcFunction =
void (AsyncServiceBase::*)(grpc::ServerContext*,
// Makes this server process RPCs of the type specified by
// |request_rpc_function|. When such an RPC is received, this server will call
// |handler_callback| on the task runner passed to the constructor.
// Note that the |AsyncService| class-level template argument is not used
// here, because the |request_rpc_function| could be defined on a base class.
// This should be called before the server is started using |Start()|.
template <typename AsyncServiceBase,
typename RequestType,
typename ResponseType>
void RegisterHandler(
RequestRpcFunction<AsyncServiceBase, RequestType, ResponseType>
const typename RpcState<RequestType, ResponseType>::HandlerCallback&
handler_callback) {
auto request_rpc_callback =
base::Bind(request_rpc_function, base::Unretained(service_.get()));
&AsyncGrpcServer::RpcStateFactoryFunction<RequestType, ResponseType>,
request_rpc_callback, handler_callback));
// AsyncGrpcServerBase:
grpc::Service* service() override { return service_.get(); }
std::unique_ptr<AsyncService> service_;
} // namespace diagnostics