blob: 1ce6e13e30e2e3a1e716d3610650ecd12c810ee8 [file] [log] [blame]
// Copyright 2018 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "brillo/grpc/async_grpc_server.h"
#include <base/check.h>
#include <base/check_op.h>
#include <base/logging.h>
#include "brillo/grpc/async_grpc_constants.h"
namespace brillo {
namespace internal {
AsyncGrpcServerBase::AsyncGrpcServerBase(
scoped_refptr<base::SequencedTaskRunner> task_runner,
const std::vector<std::string>& server_uris)
: task_runner_(task_runner), server_uris_(server_uris) {
DCHECK(!server_uris_.empty());
}
AsyncGrpcServerBase::~AsyncGrpcServerBase() = default;
bool AsyncGrpcServerBase::Start() {
CHECK_EQ(State::kNotStarted, state_);
state_ = State::kStarted;
grpc::ServerBuilder builder;
builder.SetMaxSendMessageSize(kMaxGrpcMessageSize);
builder.SetMaxReceiveMessageSize(kMaxGrpcMessageSize);
for (const auto& uri : server_uris_) {
builder.AddListeningPort(uri, grpc::InsecureServerCredentials());
}
builder.RegisterService(service());
completion_queue_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
if (!server_)
return false;
dispatcher_ = std::make_unique<GrpcCompletionQueueDispatcher>(
completion_queue_.get(), task_runner_);
dispatcher_->Start();
for (const auto& rpc_state_factory : rpc_state_factories_)
ExpectNextRpc(rpc_state_factory);
rpc_state_factories_.clear();
return true;
}
void AsyncGrpcServerBase::ShutDown(const base::Closure& on_shutdown) {
CHECK_NE(state_, State::kShutDown);
// Don't do anything if Start() was not called or it failed.
if (state_ != State::kStarted || !server_) {
state_ = State::kShutDown;
on_shutdown.Run();
return;
}
state_ = State::kShutDown;
// Shut down the server immediately, not waiting until the application
// provides responses to pending requests.
server_->Shutdown(gpr_now(GPR_CLOCK_MONOTONIC));
rpcs_awaiting_handler_reply_.clear();
dispatcher_->Shutdown(on_shutdown);
}
void AsyncGrpcServerBase::AddRpcStateFactory(
const RpcStateFactory& rpc_state_factory) {
CHECK_EQ(State::kNotStarted, state_);
rpc_state_factories_.push_back(rpc_state_factory);
}
void AsyncGrpcServerBase::ExpectNextRpc(
const RpcStateFactory& rpc_state_factory) {
std::unique_ptr<RpcStateBase> rpc_state = rpc_state_factory.Run();
RpcStateBase* rpc_state_unowned = rpc_state.get();
dispatcher_->RegisterTag(
rpc_state_unowned->tag(),
base::Bind(&AsyncGrpcServerBase::OnIncomingRpc, base::Unretained(this),
rpc_state_factory, base::Passed(&rpc_state)));
// Note: It is OK to use |rpc_state_unowned| because |dispatcher_| guarantees
// that it will remain alive until the associated tag becomes available
// through the grpc |CompletionQueue|. This can only happen after it has been
// requested.
rpc_state_unowned->RequestRpc(completion_queue_.get());
}
void AsyncGrpcServerBase::OnIncomingRpc(
const RpcStateFactory& rpc_state_factory,
std::unique_ptr<RpcStateBase> rpc_state,
bool ok) {
if (state_ == State::kStarted)
ExpectNextRpc(rpc_state_factory);
// Exit on shutdown. In theory, incoming RPCs with |ok|==false can only happen
// after gRPC shutdown has been triggered, which only happens after |state_|
// has been switched to State::kShutdown, so it would be enough to check
// |state_|. Checking both doesn't hurt and spells out the different
// conditions explicitly.
// The CompletionQueue gives back all pending tags in case of shutdown, so
// that the server has the chance to free memory. Freeing memory happens
// implicitly by letting the |rpc_state| go out of scope here.
if (!ok || state_ == State::kShutDown)
return;
RpcStateBase* rpc_state_unowned = rpc_state.get();
CHECK(rpcs_awaiting_handler_reply_.find(rpc_state_unowned->tag()) ==
rpcs_awaiting_handler_reply_.end());
rpcs_awaiting_handler_reply_.insert(
std::make_pair(rpc_state_unowned->tag(), std::move(rpc_state)));
// Note: It os OK to use |rpc_state_unowned| because it is now owned by
// |rpcs_awaiting_handler_reply_|.
// It is OK to use Unretained here, because |RpcStateBase| guarantees to
// only call the callback if it hasn't been destroyed.
rpc_state_unowned->CallHandler(base::Bind(&AsyncGrpcServerBase::OnHandlerDone,
base::Unretained(this),
rpc_state_unowned->tag()));
}
void AsyncGrpcServerBase::OnHandlerDone(const void* tag) {
auto iter = rpcs_awaiting_handler_reply_.find(tag);
CHECK(iter != rpcs_awaiting_handler_reply_.end());
std::unique_ptr<RpcStateBase> rpc_state = std::move(iter->second);
rpcs_awaiting_handler_reply_.erase(iter);
RpcStateBase* rpc_state_unowned = rpc_state.get();
dispatcher_->RegisterTag(
rpc_state_unowned->tag(),
base::Bind(&AsyncGrpcServerBase::OnResponseSent, base::Unretained(this),
base::Passed(&rpc_state)));
// Note: It is OK to use |rpc_state_unowned| because |dispatcher_| guarantees
// that it will remain alive until the associated tag becomes available
// through the grpc |CompletionQueue|. This can only happen after it has been
// added through |SendReply|.
if (rpc_state_unowned->HasResponse()) {
rpc_state_unowned->SendResponse();
return;
}
rpc_state_unowned->Cancel();
}
void AsyncGrpcServerBase::OnResponseSent(
std::unique_ptr<RpcStateBase> rpc_state, bool ok) {}
} // namespace internal
} // namespace brillo