blob: da85cfdaaf51607b62c7599a4ad1cc5c64668d1f [file] [log] [blame]
// Copyright 2018 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef LIBBRILLO_BRILLO_GRPC_RPC_STATE_H_
#define LIBBRILLO_BRILLO_GRPC_RPC_STATE_H_
#include <memory>
#include <utility>
#include <base/bind.h>
#include <base/callback.h>
#include <base/check.h>
#include <base/macros.h>
#include <base/memory/weak_ptr.h>
#include <brillo/brillo_export.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/async_unary_call.h>
namespace base {
class SequencedTaskRunner;
}
namespace brillo {
namespace internal {
// Base class which holds the state of an expected or incoming RPC. Provides an
// interface for driving the RPC which is agnostic of RequestType /
// ResponseType, acting as a proxy towards gRPC and the application's RPC
// handler.
// The templatized |RpcState<RequestType, ResponseType>| class implements this
// interface.
// Lifetime and ownership (same as the templatized RpcState):
// (*) Created when expecting an incoming RPC of this type, i.e. when:
// - the AsyncGrpcServer starts
// - an RPC of this type comes in. A new RpcState(Base) is then created
// to expect the next RPC of this type.
// (*) Owned by the AsyncGrpcServer/GrpcCompletionQueueDispatcher when
// waiting for gRPC-side events.
// (*) Owned by the AsyncGrpcServer when waiting for the handler to provide
// a reply.
// (*) Destroyed when gRPC is done sending the reply for the incoming rpc, or
// when the AsyncGrpcServer is shutting down.
//
// This class has to be exported as RpcState is a templated class
class BRILLO_EXPORT RpcStateBase {
public:
RpcStateBase();
RpcStateBase(const RpcStateBase&) = delete;
RpcStateBase& operator=(const RpcStateBase&) = delete;
virtual ~RpcStateBase();
// Returns the tag uniquely identifying this |RpcStateBase|. Whenever
// this |RpcStateBase| interacts with gRPC, it uses this tag.
void* tag() { return this; }
// Request an incoming RPC of this type in gRPC.
// Pass |server_completion_queue| as the CompletionQueue which should be
// notified both for when an incoming RPC starts and for subsequent RPC
// events (e.g. response has been sent).
virtual void RequestRpc(
grpc::ServerCompletionQueue* server_completion_queue) = 0;
// Call the handler which should provide a reply. Invoke |on_handler_done|
// when the handler has provided a reply (this could be a response, or a
// cancellation). Guarantees that |on_handler_done| will only be called if
// this object has not been destroyed.
virtual void CallHandler(const base::Closure& on_handler_done) = 0;
// Returns true if the handler has provided a response. If this returns false,
// it means that the RPC should be cancelled.
virtual bool HasResponse() = 0;
// Forward the reply to gRPC. This is called by the AsyncGrpcServer(Base)
// after the handler has provided a reply, and the server has prepared to
// receive the |tag()| again.
virtual void SendResponse() = 0;
// Cancel this RPC in gRPC.
virtual void Cancel() = 0;
protected:
grpc::ServerContext ctx_;
};
// Templatized version of |RpcStateBase|. Implements the actual RequestType /
// ResponseType specific calls to gRPC and the handler callback.
// Lifetime and ownership: See |RpcStateBase|.
template <typename RequestType, typename ResponseType>
class RpcState final : public RpcStateBase {
public:
// Call to grpc to request the next RPC of this type.
using RequestRpcCallback =
base::Callback<void(grpc::ServerContext*,
RequestType*,
grpc::ServerAsyncResponseWriter<ResponseType>*,
grpc::CompletionQueue*,
grpc::ServerCompletionQueue*,
void*)>;
// Called by the handler to send |status| and |response|.
using HandlerDoneCallback = base::Callback<void(
grpc::Status status, std::unique_ptr<ResponseType> response)>;
// The handler callback - will be invoked to compute a response for a request.
using HandlerCallback =
base::Callback<void(std::unique_ptr<RequestType> request,
const HandlerDoneCallback& send_response_callback)>;
RpcState(RequestRpcCallback request_rpc_closure,
const HandlerCallback& handler_callback)
: request_rpc_closure_(request_rpc_closure),
handler_callback_(handler_callback),
responder_(&ctx_) {}
RpcState(const RpcState&) = delete;
RpcState& operator=(const RpcState&) = delete;
~RpcState() = default;
void RequestRpc(
grpc::ServerCompletionQueue* server_completion_queue) override {
request_rpc_closure_.Run(&ctx_, request_.get(), &responder_,
server_completion_queue, server_completion_queue,
tag());
}
void CallHandler(const base::Closure& on_handler_done) override {
// Ensure that this is not called twice.
CHECK(request_);
handler_callback_.Run(
std::move(request_),
base::Bind(&RpcState::OnHandlerDone, weak_ptr_factory_.GetWeakPtr(),
on_handler_done));
}
bool HasResponse() override { return response_.get() != nullptr; }
void SendResponse() override {
CHECK(response_);
responder_.Finish(*response_, status_, tag());
}
void Cancel() override {
grpc::Status error_status(grpc::StatusCode::UNKNOWN,
"Cancelled by application");
responder_.FinishWithError(error_status, tag());
}
private:
// This will be invoked by the handler when it provides us a |status| and a
// |response|.
void OnHandlerDone(const base::Closure& on_handler_done,
grpc::Status status,
std::unique_ptr<ResponseType> response) {
// Ideally, we would CHECK that |OnHandlerDone| is only called once, but
// that would require introducing a dedicated boolean flag, which seems to
// be overkill.
CHECK(!response_);
response_ = std::move(response);
status_ = std::move(status);
on_handler_done.Run();
}
RequestRpcCallback request_rpc_closure_;
HandlerCallback handler_callback_;
std::unique_ptr<RequestType> request_ = std::make_unique<RequestType>();
// The response generated by the handler. If this is not set after the handler
// has finished processing this RPC, i.e. when |OnHandlerDone()| is called, it
// means that the RPC has been cancelled.
std::unique_ptr<ResponseType> response_;
grpc::Status status_;
grpc::ServerAsyncResponseWriter<ResponseType> responder_;
base::WeakPtrFactory<RpcState> weak_ptr_factory_{this};
};
} // namespace internal
} // namespace brillo
#endif // LIBBRILLO_BRILLO_GRPC_RPC_STATE_H_