diagnostics: Introduce grpc_async_adapter::GrpcCompletionQueueDispatcher
Introduce the first building block for grpc_async_adapter in diagnostics:
The GrpcCompletionQueueDispatcher bridges a grpc CompletionQueue
(which is used to interact with asynchronous grpc operations) and a
chromium-style MessageLoop.
BUG=chromium:869377
CQ-DEPEND=CL:1174265
TEST=cros_workon_make --board=${BOARD} diagnostics --test
Change-Id: Ica8641060bf5d2f297665e4e6b70c5f160de46e1
Reviewed-on: https://chromium-review.googlesource.com/1174263
Commit-Ready: Pavol Marko <pmarko@chromium.org>
Tested-by: Pavol Marko <pmarko@chromium.org>
Reviewed-by: Maksim Ivanov <emaxx@chromium.org>
diff --git a/diagnostics/diagnostics.gyp b/diagnostics/diagnostics.gyp
index f2d14d9..41ae827 100644
--- a/diagnostics/diagnostics.gyp
+++ b/diagnostics/diagnostics.gyp
@@ -1,8 +1,34 @@
{
'targets': [
{
+ 'target_name': 'libgrpc_async_adapter',
+ 'type': 'static_library',
+ 'variables': {
+ 'exported_deps': [
+ 'grpc++',
+ 'libchrome-<(libbase_ver)',
+ ],
+ 'deps': [
+ '<@(exported_deps)',
+ ],
+ },
+ 'all_dependent_settings': {
+ 'variables': {
+ 'deps': [
+ '<@(exported_deps)',
+ ],
+ },
+ },
+ 'sources': [
+ 'grpc_async_adapter/grpc_completion_queue_dispatcher.cc',
+ ],
+ },
+ {
'target_name': 'diagnosticsd',
'type': 'executable',
+ 'dependencies': [
+ 'libgrpc_async_adapter',
+ ],
'includes': ['mojom_generator.gypi'],
'variables': {
'deps': [
@@ -19,6 +45,29 @@
['USE_test == 1', {
'targets': [
{
+ 'target_name': 'libgrpc_async_adapter_test',
+ 'type': 'executable',
+ 'includes': ['../common-mk/common_test.gypi'],
+ 'dependencies': [
+ '../common-mk/testrunner.gyp:testrunner',
+ 'libgrpc_async_adapter',
+ ],
+ 'sources': [
+ 'grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc',
+ ],
+ 'variables': {
+ 'deps': [
+ 'libchrome-<(libbase_ver)',
+ 'libchrome-test-<(libbase_ver)',
+ ],
+ },
+ 'link_settings': {
+ 'libraries': [
+ '-lgpr',
+ ],
+ },
+ },
+ {
'target_name': 'diagnosticsd_test',
'type': 'executable',
'includes': ['../common-mk/common_test.gypi'],
diff --git a/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.cc b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.cc
new file mode 100644
index 0000000..b4a76a9
--- /dev/null
+++ b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.cc
@@ -0,0 +1,161 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h"
+
+#include <utility>
+
+#include <base/bind.h>
+#include <base/location.h>
+#include <base/logging.h>
+#include <base/sequenced_task_runner.h>
+#include <grpc++/grpc++.h>
+
+namespace diagnostics {
+namespace internal {
+
+// Background ("monitoring") thread delegate used by
+// |GrpcCompletionQueueDispatcher|. It blocks on the |CompletionQueue|.
+class MonitoringThreadDelegate : public base::DelegateSimpleThread::Delegate {
+ public:
+ using OnTagAvailableCallback = base::Callback<void(const void* tag, bool ok)>;
+ using OnShutdownCallback = base::Closure;
+
+ // |GrpcCompletionQueueDispatcher| guarantees that the unowned pointers
+ // outlive this delegate. This delegate will post |on_tag_available_callback|
+ // on the |task_runner| when a tag is available on |completion_queue|, and it
+ // will post |on_shutdown_callback| on the |task_runner| when it is shutting
+ // down.
+ MonitoringThreadDelegate(grpc::CompletionQueue* completion_queue,
+ base::SequencedTaskRunner* task_runner,
+ OnTagAvailableCallback on_tag_available_callback,
+ OnShutdownCallback on_shutdown_callback)
+ : completion_queue_(completion_queue),
+ task_runner_(task_runner),
+ on_tag_available_callback_(on_tag_available_callback),
+ on_shutdown_callback_(on_shutdown_callback) {}
+
+ ~MonitoringThreadDelegate() override = default;
+
+ // base::DelegateSimpleThread::Delegate:
+ void Run() override {
+ // This runs on the background thread. It monitors |completion_queue_| and
+ // posts tasks to |task_runner_|.
+ while (true) {
+ void* tag;
+ bool ok;
+
+ if (completion_queue_->Next(&tag, &ok)) {
+ task_runner_->PostTask(FROM_HERE,
+ base::Bind(on_tag_available_callback_, tag, ok));
+ } else {
+ // Next() returned false, which means that this queue has shut down.
+ // Exit this 'event loop'.
+ break;
+ }
+ }
+
+ task_runner_->PostTask(FROM_HERE, on_shutdown_callback_);  // Posted last.
+ }
+
+ private:
+ // The |CompletionQueue| that this object is monitoring.
+ // Not owned.
+ grpc::CompletionQueue* const completion_queue_;
+ // The |SequencedTaskRunner| this object is posting tasks to. It is accessed
+ // from the monitoring thread.
+ // Not owned.
+ base::SequencedTaskRunner* const task_runner_;
+
+ OnTagAvailableCallback on_tag_available_callback_;  // Posted once per tag.
+ OnShutdownCallback on_shutdown_callback_;  // Posted once, when Run() exits.
+};
+
+} // namespace internal
+
+GrpcCompletionQueueDispatcher::GrpcCompletionQueueDispatcher(
+ grpc::CompletionQueue* completion_queue,
+ scoped_refptr<base::SequencedTaskRunner> task_runner)
+ : completion_queue_(completion_queue), task_runner_(task_runner) {
+ CHECK(task_runner_);  // A task runner to post to is mandatory.
+ CHECK(completion_queue_);  // So is the queue to monitor.
+}
+
+GrpcCompletionQueueDispatcher::~GrpcCompletionQueueDispatcher() {
+ CHECK(sequence_checker_.CalledOnValidSequencedThread());
+ // |OnShutdown| resets |monitoring_thread_| and clears the tag map; these
+ // CHECKs catch destruction while the thread or registered tags still exist.
+ CHECK(!monitoring_thread_);
+ CHECK(tag_to_callback_map_.empty());
+}
+
+void GrpcCompletionQueueDispatcher::Start() {
+ CHECK(sequence_checker_.CalledOnValidSequencedThread());
+ CHECK(!monitoring_thread_);  // No monitoring thread may exist yet.
+ // Create the delegate which will be run on the background thread.
+ // It is OK to pass unowned pointers and use |base::Unretained| because:
+ // - |GrpcCompletionQueueDispatcher| CHECK-fails if it's destroyed
+ // before |OnShutdown| has been called
+ // - |task_runner_| is a |SequencedTaskRunner|
+ // - |OnShutdown| is posted as the last thing before the background thread
+ // exits.
+ monitoring_thread_delegate_ =
+ std::make_unique<internal::MonitoringThreadDelegate>(
+ completion_queue_, task_runner_.get(),
+ base::Bind(&GrpcCompletionQueueDispatcher::OnTagAvailable,
+ base::Unretained(this)),
+ base::Bind(&GrpcCompletionQueueDispatcher::OnShutdown,
+ base::Unretained(this)));
+ monitoring_thread_ = std::make_unique<base::DelegateSimpleThread>(
+ monitoring_thread_delegate_.get(),
+ "GrpcCompletionQueueDispatcher" /* name_prefix */);
+ monitoring_thread_->Start();
+}
+
+void GrpcCompletionQueueDispatcher::Shutdown(
+ base::Closure on_shutdown_callback) {
+ CHECK(sequence_checker_.CalledOnValidSequencedThread());
+ CHECK(monitoring_thread_);  // A monitoring thread must exist.
+ CHECK(on_shutdown_callback_.is_null());  // No earlier |Shutdown| call.
+ CHECK(!on_shutdown_callback.is_null());
+
+ on_shutdown_callback_ = on_shutdown_callback;  // Posted by |OnShutdown|.
+ completion_queue_->Shutdown();
+}
+
+void GrpcCompletionQueueDispatcher::RegisterTag(const void* tag,
+ TagAvailableCallback callback) {
+ CHECK(sequence_checker_.CalledOnValidSequencedThread());
+ CHECK(tag_to_callback_map_.find(tag) == tag_to_callback_map_.end());
+ tag_to_callback_map_.insert(std::make_pair(tag, callback));  // New tag.
+}
+
+void GrpcCompletionQueueDispatcher::OnTagAvailable(const void* tag, bool ok) {
+ CHECK(sequence_checker_.CalledOnValidSequencedThread());
+ auto iter = tag_to_callback_map_.find(tag);
+ // Treat tags received from the |CompletionQueue| that we're not expecting as
+ // fatal errors.
+ CHECK(iter != tag_to_callback_map_.end());
+ TagAvailableCallback callback = iter->second;
+ tag_to_callback_map_.erase(iter);  // Lets |callback| re-register |tag|.
+ callback.Run(ok);
+}
+
+void GrpcCompletionQueueDispatcher::OnShutdown() {
+ CHECK(sequence_checker_.CalledOnValidSequencedThread());
+ tag_to_callback_map_.clear();  // Destroys callbacks that never ran.
+
+ monitoring_thread_->Join();  // The thread posted this task last.
+ monitoring_thread_.reset();
+ monitoring_thread_delegate_.reset();
+
+ if (!on_shutdown_callback_.is_null()) {
+ // Post the |on_shutdown_callback_| on the task runner instead of calling it
+ // directly, allowing the owner of this instance to delete it in the context
+ // of |on_shutdown_callback_|.
+ task_runner_->PostTask(FROM_HERE, on_shutdown_callback_);
+ }
+}
+
+} // namespace diagnostics
diff --git a/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h
new file mode 100644
index 0000000..d618226
--- /dev/null
+++ b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h
@@ -0,0 +1,119 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef DIAGNOSTICS_GRPC_ASYNC_ADAPTER_GRPC_COMPLETION_QUEUE_DISPATCHER_H_
+#define DIAGNOSTICS_GRPC_ASYNC_ADAPTER_GRPC_COMPLETION_QUEUE_DISPATCHER_H_
+
+#include <map>
+#include <memory>
+
+#include <base/callback.h>
+#include <base/macros.h>
+#include <base/memory/ref_counted.h>
+#include <base/sequence_checker.h>
+#include <base/threading/simple_thread.h>
+
+namespace base {
+class SequencedTaskRunner;
+}
+
+namespace grpc {
+class CompletionQueue;
+}
+
+namespace diagnostics {
+namespace internal {
+class MonitoringThreadDelegate;
+}
+
+// A GrpcCompletionQueueDispatcher monitors a grpc |CompletionQueue| for
+// signalled events and posts tasks to a |SequencedTaskRunner| as a result.
+// It uses a background thread to block on the |CompletionQueue|'s next event in
+// a loop because this is a blocking operation.
+// The |GrpcCompletionQueueDispatcher| may be constructed from anywhere, but its
+// public methods should only be called on the same task runner that was passed
+// to its constructor.
+// This class assumes that every tag that is received from the |CompletionQueue|
+// is expected, i.e. that |RegisterTag| has been called for every tag.
+class GrpcCompletionQueueDispatcher {
+ public:
+ // Callbacks of this type will be called on the task runner passed
+ // to the constructor when an expected event is available on the monitored
+ // |CompletionQueue|. |ok| has an operation-specific meaning, see grpc's
+ // |CompletionQueue::Next| documentation for details.
+ using TagAvailableCallback = base::Callback<void(bool ok)>;
+
+ // The constructed object will monitor |completion_queue| and post tasks to
+ // |task_runner|. Note that the |GrpcCompletionQueueDispatcher| only
+ // starts monitoring the |completion_queue| when |Start| is called.
+ // |completion_queue| should outlive this object.
+ GrpcCompletionQueueDispatcher(
+ grpc::CompletionQueue* completion_queue,
+ scoped_refptr<base::SequencedTaskRunner> task_runner);
+
+ // Note that the destructor CHECKs that this instance has been shut down
+ // properly using |Shutdown|.
+ ~GrpcCompletionQueueDispatcher();
+
+ // Starts the background thread and consequently starts monitoring the
+ // |CompletionQueue| passed to the constructor.
+ void Start();
+
+ // Triggers shutting down the |CompletionQueue| and this
+ // |GrpcCompletionQueueDispatcher|.
+ // |on_shutdown_callback| will be called when the |CompletionQueue| is fully
+ // drained and the background thread has shut down. Only then may this
+ // instance be destroyed.
+ void Shutdown(base::Closure on_shutdown_callback);
+
+ // Starts waiting for an event with |tag|. If |tag| has been or will be sent
+ // (through RPC operations or alarms) to the CompletionQueue, |callback| is
+ // guaranteed to be called exactly once on the task runner passed to the
+ // constructor. The reason is that the CompletionQueue itself guarantees that
+ // every tag sent to the completion queue will be delivered out of the
+ // completion queue. |GrpcCompletionQueueDispatcher| additionally guarantees
+ // that if |callback| is never called (because |tag| was not sent to the
+ // |CompletionQueue|), |callback| will be destroyed on shutdown on the
+ // |TaskRunner| passed to the constructor.
+ void RegisterTag(const void* tag, TagAvailableCallback callback);
+
+ // Returns the monitored |CompletionQueue|.
+ grpc::CompletionQueue* completion_queue() { return completion_queue_; }
+
+ private:
+ // This is called on the |task_runner_| when |tag| has been delivered out of
+ // the |completion_queue_|.
+ void OnTagAvailable(const void* tag, bool ok);
+
+ // This is called on the |task_runner_| when the background thread is shutting
+ // down because the |completion_queue_| has no more events.
+ void OnShutdown();
+
+ // The |CompletionQueue| that this object is monitoring.
+ // Not owned.
+ grpc::CompletionQueue* const completion_queue_;
+ // The |SequencedTaskRunner| this object is posting tasks to.
+ scoped_refptr<base::SequencedTaskRunner> task_runner_;
+
+ // The delegate which will be executed on the |monitoring_thread_|.
+ std::unique_ptr<internal::MonitoringThreadDelegate>
+ monitoring_thread_delegate_;
+ // The background thread monitoring the |completion_queue_| and posting tasks
+ // back on the task runner.
+ std::unique_ptr<base::DelegateSimpleThread> monitoring_thread_;
+
+ // Posted to |task_runner_| once the monitoring thread has been joined.
+ base::Closure on_shutdown_callback_;
+ // Maps tags to the callbacks that should be run on the |task_runner_| when
+ // the corresponding event fires.
+ std::map<const void*, TagAvailableCallback> tag_to_callback_map_;
+
+ base::SequenceChecker sequence_checker_;
+
+ DISALLOW_COPY_AND_ASSIGN(GrpcCompletionQueueDispatcher);
+};
+
+} // namespace diagnostics
+
+#endif // DIAGNOSTICS_GRPC_ASYNC_ADAPTER_GRPC_COMPLETION_QUEUE_DISPATCHER_H_
diff --git a/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc
new file mode 100644
index 0000000..be73e6f
--- /dev/null
+++ b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc
@@ -0,0 +1,261 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h"
+
+#include <list>
+#include <memory>
+
+#include <base/bind.h>
+#include <base/callback.h>
+#include <base/location.h>
+#include <base/macros.h>
+#include <base/message_loop/message_loop.h>
+#include <base/run_loop.h>
+#include <base/task_runner.h>
+#include <base/time/time.h>
+#include <gmock/gmock.h>
+#include <grpc++/alarm.h>
+#include <grpc++/grpc++.h>
+#include <gtest/gtest.h>
+
+namespace diagnostics {
+
+namespace {
+
+// Allows testing if a callback has been invoked, and the value of the
+// grpc-specific |ok| bool parameter.
+class TagAvailableCalledTester {
+ public:
+ TagAvailableCalledTester() = default;
+ ~TagAvailableCalledTester() = default;
+
+ GrpcCompletionQueueDispatcher::TagAvailableCallback
+ GetTagAvailableCallback() {
+ return base::Bind(&TagAvailableCalledTester::Callback,
+ base::Unretained(this));
+ }
+
+ // Bind this to a RegisterTag call of the
+ // |GrpcCompletionQueueDispatcher|. Will check that it is invoked at
+ // most once, remember the value of |ok|, and call the closures passed to
+ // |CallWhenInvoked|, if any.
+ void Callback(bool ok) {
+ CHECK(!has_been_called_);
+ has_been_called_ = true;
+ value_of_ok_ = ok;
+
+ std::list<base::Closure> callbacks_temp;
+ callbacks_temp.swap(call_when_invoked_);  // Leaves the member list empty.
+ for (auto& callback : callbacks_temp)
+ callback.Run();
+ }
+
+ // Register |call_when_invoked| to be called when |Callback| is called.
+ void CallWhenInvoked(base::Closure call_when_invoked) {
+ call_when_invoked_.push_back(call_when_invoked);
+ }
+
+ // Returns true if |Callback| has been called.
+ bool has_been_called() const { return has_been_called_; }
+
+ // Only call if |has_been_called()| is returning true. Returns the value of
+ // |ok| passed to |Callback|.
+ bool value_of_ok() const {
+ CHECK(has_been_called());
+ return value_of_ok_;
+ }
+
+ private:
+ bool has_been_called_ = false;
+ bool value_of_ok_ = false;
+ std::list<base::Closure> call_when_invoked_;
+
+ DISALLOW_COPY_AND_ASSIGN(TagAvailableCalledTester);
+};
+
+// Allows testing if an object (owned by a callback) has been destroyed. Also
+// checks that it is destroyed on the same message loop it was instantiated
+// on.
+class ObjectDestroyedTester {
+ public:
+ // Will set |*has_been_destroyed| to true when this instance is being
+ // destroyed.
+ explicit ObjectDestroyedTester(bool* has_been_destroyed)
+ : expected_message_loop_(base::MessageLoop::current()),
+ has_been_destroyed_(has_been_destroyed) {
+ *has_been_destroyed_ = false;  // Reset the flag on construction.
+ }
+
+ ~ObjectDestroyedTester() {
+ EXPECT_TRUE(
+ expected_message_loop_->task_runner()->RunsTasksOnCurrentThread());
+ *has_been_destroyed_ = true;
+ }
+
+ private:
+ base::MessageLoop* const expected_message_loop_;
+ bool* const has_been_destroyed_;
+
+ DISALLOW_COPY_AND_ASSIGN(ObjectDestroyedTester);
+};
+
+// Adapter that lets a callback passed to
+// |GrpcCompletionQueueDispatcher::RegisterTag| own an |ObjectDestroyedTester|;
+// it only forwards |ok| to |tag_available_called_tester|.
+void ObjectDestroyedTesterAdapter(
+ TagAvailableCalledTester* tag_available_called_tester,
+ std::unique_ptr<ObjectDestroyedTester> object_destroyed_tester,
+ bool ok) {
+ tag_available_called_tester->Callback(ok);
+}
+
+gpr_timespec GprTimespecWithDeltaFromNow(base::TimeDelta delta) {
+ return gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC),  // Base: current monotonic time.
+ gpr_time_from_millis(delta.InMilliseconds(), GPR_TIMESPAN));
+}
+
+} // namespace
+
+class GrpcCompletionQueueDispatcherTest : public ::testing::Test {
+ public:
+ GrpcCompletionQueueDispatcherTest()
+ : dispatcher_(&completion_queue_,
+ base::MessageLoop::current()->task_runner()) {
+ dispatcher_.Start();
+ }
+
+ ~GrpcCompletionQueueDispatcherTest() override = default;
+
+ protected:
+ base::MessageLoopForIO message_loop_;  // Declared before |dispatcher_|.
+ grpc::CompletionQueue completion_queue_;
+
+ // The dispatcher under test.
+ GrpcCompletionQueueDispatcher dispatcher_;
+
+ // Note: This can't be |const void*| because gRPC functions expect |void*|.
+ void* const kTag = reinterpret_cast<void*>(1);
+
+ void ShutdownDispatcher() {
+ base::RunLoop run_loop;
+ dispatcher_.Shutdown(run_loop.QuitClosure());
+ run_loop.Run();  // Quit by the dispatcher's shutdown callback.
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(GrpcCompletionQueueDispatcherTest);
+};
+
+// Start and shut down a dispatcher, with no tags posted to the underlying
+// CompletionQueue.
+TEST_F(GrpcCompletionQueueDispatcherTest, StartAndShutdownEmpty) {
+ ShutdownDispatcher();
+}
+
+// Register a tag that is not passed to the CompletionQueue. Check that the
+// callback is never called, but that it is properly destroyed. This also
+// demonstrates that instances passed to the callback using base::Passed are
+// properly destroyed in this case.
+TEST_F(GrpcCompletionQueueDispatcherTest, TagNeverAvailable) {
+ bool object_has_been_destroyed = false;
+ auto object_destroyed_tester =
+ std::make_unique<ObjectDestroyedTester>(&object_has_been_destroyed);
+
+ TagAvailableCalledTester tag_available_called_tester;
+ dispatcher_.RegisterTag(
+ nullptr,  // The tag; it is never sent to the CompletionQueue.
+ base::Bind(&ObjectDestroyedTesterAdapter, &tag_available_called_tester,
+ base::Passed(&object_destroyed_tester)));
+
+ ShutdownDispatcher();
+
+ EXPECT_FALSE(tag_available_called_tester.has_been_called());
+ EXPECT_TRUE(object_has_been_destroyed);
+}
+
+// Register a tag that becomes available with |ok=true|. Verify that the
+// registered callback is called with |ok=true|.
+TEST_F(GrpcCompletionQueueDispatcherTest,
+ CompletionQueueTagAvailableWithOkTrue) {
+ base::RunLoop run_loop;
+ TagAvailableCalledTester tag_available_called_tester;
+ tag_available_called_tester.CallWhenInvoked(run_loop.QuitClosure());
+
+ dispatcher_.RegisterTag(
+ kTag, tag_available_called_tester.GetTagAvailableCallback());
+
+ grpc::Alarm alarm(
+ &completion_queue_,
+ GprTimespecWithDeltaFromNow(base::TimeDelta::FromMilliseconds(1)), kTag);
+ run_loop.Run();  // Quit by the tester once the tag's callback has run.
+
+ EXPECT_TRUE(tag_available_called_tester.has_been_called());
+ EXPECT_TRUE(tag_available_called_tester.value_of_ok());
+
+ ShutdownDispatcher();
+}
+
+// Register a tag that becomes available with |ok=false|. Verify that the
+// registered callback is called with |ok=false|.
+TEST_F(GrpcCompletionQueueDispatcherTest,
+ CompletionQueueTagAvailableWithOkFalse) {
+ base::RunLoop run_loop;
+ TagAvailableCalledTester tag_available_called_tester;
+ tag_available_called_tester.CallWhenInvoked(run_loop.QuitClosure());
+
+ dispatcher_.RegisterTag(
+ kTag, tag_available_called_tester.GetTagAvailableCallback());
+
+ grpc::Alarm alarm(&completion_queue_,
+ GprTimespecWithDeltaFromNow(base::TimeDelta::FromHours(24)),
+ kTag);
+ alarm.Cancel();  // The test expects this to deliver |ok=false|.
+ run_loop.Run();
+
+ EXPECT_TRUE(tag_available_called_tester.has_been_called());
+ EXPECT_FALSE(tag_available_called_tester.value_of_ok());
+
+ ShutdownDispatcher();
+}
+
+// Re-register a tag from within that tag's own callback, then verify that the
+// tag is delivered to the newly registered callback.
+TEST_F(GrpcCompletionQueueDispatcherTest, ReregisterTag) {
+ base::RunLoop run_loop_1;
+ TagAvailableCalledTester tag_available_called_tester_1;
+ base::RunLoop run_loop_2;
+ TagAvailableCalledTester tag_available_called_tester_2;
+
+ dispatcher_.RegisterTag(
+ kTag, tag_available_called_tester_1.GetTagAvailableCallback());
+ auto reregister_tag_callback =
+ base::Bind(&GrpcCompletionQueueDispatcher::RegisterTag,
+ base::Unretained(&dispatcher_), kTag,
+ tag_available_called_tester_2.GetTagAvailableCallback());
+ tag_available_called_tester_1.CallWhenInvoked(reregister_tag_callback);
+ tag_available_called_tester_1.CallWhenInvoked(run_loop_1.QuitClosure());
+
+ tag_available_called_tester_2.CallWhenInvoked(run_loop_2.QuitClosure());
+
+ grpc::Alarm alarm_1(
+ &completion_queue_,
+ GprTimespecWithDeltaFromNow(base::TimeDelta::FromMilliseconds(1)), kTag);
+ run_loop_1.Run();
+
+ grpc::Alarm alarm_2(
+ &completion_queue_,
+ GprTimespecWithDeltaFromNow(base::TimeDelta::FromMilliseconds(1)), kTag);
+ run_loop_2.Run();
+
+ EXPECT_TRUE(tag_available_called_tester_1.has_been_called());
+ EXPECT_TRUE(tag_available_called_tester_1.value_of_ok());
+ EXPECT_TRUE(tag_available_called_tester_2.has_been_called());
+ EXPECT_TRUE(tag_available_called_tester_2.value_of_ok());
+
+ ShutdownDispatcher();
+}
+
+} // namespace diagnostics