diagnostics: Introduce grpc_async_adapter::GrpcCompletionQueueDispatcher

Introduce the first building block for grpc_async_adapter in diagnostics:
The GrpcCompletionQueueDispatcher bridges a grpc CompletionQueue
which is used to interact with asynchronous grpc operations and a
chromium-style MessageLoop.

BUG=chromium:869377
CQ-DEPEND=CL:1174265
TEST=cros_workon_make --board=${BOARD} diagnostics --test

Change-Id: Ica8641060bf5d2f297665e4e6b70c5f160de46e1
Reviewed-on: https://chromium-review.googlesource.com/1174263
Commit-Ready: Pavol Marko <pmarko@chromium.org>
Tested-by: Pavol Marko <pmarko@chromium.org>
Reviewed-by: Maksim Ivanov <emaxx@chromium.org>
diff --git a/diagnostics/diagnostics.gyp b/diagnostics/diagnostics.gyp
index f2d14d9..41ae827 100644
--- a/diagnostics/diagnostics.gyp
+++ b/diagnostics/diagnostics.gyp
@@ -1,8 +1,34 @@
 {
   'targets': [
     {
+      'target_name': 'libgrpc_async_adapter',
+      'type': 'static_library',
+      'variables': {
+        'exported_deps': [
+          'grpc++',
+          'libchrome-<(libbase_ver)',
+        ],
+        'deps': [
+          '<@(exported_deps)',
+        ],
+      },
+      'all_dependent_settings': {
+        'variables': {
+          'deps': [
+            '<@(exported_deps)',
+          ],
+        },
+      },
+      'sources': [
+        'grpc_async_adapter/grpc_completion_queue_dispatcher.cc',
+      ],
+    },
+    {
       'target_name': 'diagnosticsd',
       'type': 'executable',
+      'dependencies': [
+        'libgrpc_async_adapter',
+      ],
       'includes': ['mojom_generator.gypi'],
       'variables': {
         'deps': [
@@ -19,6 +45,29 @@
     ['USE_test == 1', {
       'targets': [
         {
+          'target_name': 'libgrpc_async_adapter_test',
+          'type': 'executable',
+          'includes': ['../common-mk/common_test.gypi'],
+          'dependencies': [
+            '../common-mk/testrunner.gyp:testrunner',
+            'libgrpc_async_adapter',
+          ],
+          'sources': [
+            'grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc',
+          ],
+          'variables': {
+            'deps': [
+              'libchrome-<(libbase_ver)',
+              'libchrome-test-<(libbase_ver)',
+            ],
+          },
+          'link_settings': {
+            'libraries': [
+              '-lgpr',
+            ],
+          },
+        },
+        {
           'target_name': 'diagnosticsd_test',
           'type': 'executable',
           'includes': ['../common-mk/common_test.gypi'],
diff --git a/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.cc b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.cc
new file mode 100644
index 0000000..b4a76a9
--- /dev/null
+++ b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.cc
@@ -0,0 +1,161 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h"
+
+#include <utility>
+
+#include <base/bind.h>
+#include <base/location.h>
+#include <base/logging.h>
+#include <base/sequenced_task_runner.h>
+#include <grpc++/grpc++.h>
+
+namespace diagnostics {
+namespace internal {
+
+// This is the background ("monitoring") thread delegate used by
+// |GrpcCompletionQueueDispatcher|.
+class MonitoringThreadDelegate : public base::DelegateSimpleThread::Delegate {
+ public:
+  using OnTagAvailableCallback = base::Callback<void(const void* tag, bool ok)>;
+  using OnShutdownCallback = base::Closure;
+
+  // |GrpcCompletionQueueDispatcher| guarantees that the unowned pointers
+  // outlive this delegate. This delegate will post |on_tag_available_callback|
+  // on the |task_runner| when a tag is available on |completion_queue|, and it
+  // will post |on_shutdown_callback| on the |task_runner| when it is shutting
+  // down.
+  MonitoringThreadDelegate(grpc::CompletionQueue* completion_queue,
+                           base::SequencedTaskRunner* task_runner,
+                           OnTagAvailableCallback on_tag_available_callback,
+                           OnShutdownCallback on_shutdown_callback)
+      : completion_queue_(completion_queue),
+        task_runner_(task_runner),
+        on_tag_available_callback_(on_tag_available_callback),
+        on_shutdown_callback_(on_shutdown_callback) {}
+
+  ~MonitoringThreadDelegate() override = default;
+
+  // base::DelegateSimpleThread::Delegate:
+  void Run() override {
+    // This runs on the background thread. It monitors |completion_queue_| and
+    // posts task to |task_runner_|.
+    while (true) {
+      void* tag;
+      bool ok;
+
+      if (completion_queue_->Next(&tag, &ok)) {
+        task_runner_->PostTask(FROM_HERE,
+                               base::Bind(on_tag_available_callback_, tag, ok));
+      } else {
+        // Next() returned false, which means that this queue has shut down.
+        // Exit this 'event loop'.
+        break;
+      }
+    }
+
+    task_runner_->PostTask(FROM_HERE, on_shutdown_callback_);
+  }
+
+ private:
+  // The |CompletionQueue| that this object is monitoring.
+  // Not owned.
+  grpc::CompletionQueue* const completion_queue_;
+  // The |SequencedTaskRunner| this object is posting tasks to. It is accessed
+  // from the monitoring thread.
+  // Not owned.
+  base::SequencedTaskRunner* const task_runner_;
+
+  OnTagAvailableCallback on_tag_available_callback_;
+  OnShutdownCallback on_shutdown_callback_;
+};
+
+}  // namespace internal
+
+GrpcCompletionQueueDispatcher::GrpcCompletionQueueDispatcher(
+    grpc::CompletionQueue* completion_queue,
+    scoped_refptr<base::SequencedTaskRunner> task_runner)
+    : completion_queue_(completion_queue), task_runner_(task_runner) {
+  CHECK(task_runner_);
+  CHECK(completion_queue_);
+}
+
+GrpcCompletionQueueDispatcher::~GrpcCompletionQueueDispatcher() {
+  CHECK(sequence_checker_.CalledOnValidSequencedThread());
+  // Ensure that this |GrpcCompletionQueueDispatcher| has been shut down
+  // properly.
+  CHECK(!monitoring_thread_);
+  CHECK(tag_to_callback_map_.empty());
+}
+
+void GrpcCompletionQueueDispatcher::Start() {
+  CHECK(sequence_checker_.CalledOnValidSequencedThread());
+  CHECK(!monitoring_thread_);
+  // Create the delegate which will be run on the background thread.
+  // It is OK to pass unowned pointers and use |base::Unretained|  because:
+  // - |GrpcCompletionQueueDispatcher| CHECK-fails if it's destroyed
+  //   before |OnShutdown| has been called
+  // - |task_runner_| is a |SequencedTaskRunner|
+  // - |OnShutdown| is posted as the last thing before the background thread
+  // exits.
+  monitoring_thread_delegate_ =
+      std::make_unique<internal::MonitoringThreadDelegate>(
+          completion_queue_, task_runner_.get(),
+          base::Bind(&GrpcCompletionQueueDispatcher::OnTagAvailable,
+                     base::Unretained(this)),
+          base::Bind(&GrpcCompletionQueueDispatcher::OnShutdown,
+                     base::Unretained(this)));
+  monitoring_thread_ = std::make_unique<base::DelegateSimpleThread>(
+      monitoring_thread_delegate_.get(),
+      "GrpcCompletionQueueDispatcher" /* name_prefix */);
+  monitoring_thread_->Start();
+}
+
+void GrpcCompletionQueueDispatcher::Shutdown(
+    base::Closure on_shutdown_callback) {
+  CHECK(sequence_checker_.CalledOnValidSequencedThread());
+  CHECK(monitoring_thread_);
+  CHECK(on_shutdown_callback_.is_null());
+  CHECK(!on_shutdown_callback.is_null());
+
+  on_shutdown_callback_ = on_shutdown_callback;
+  completion_queue_->Shutdown();
+}
+
+void GrpcCompletionQueueDispatcher::RegisterTag(const void* tag,
+                                                TagAvailableCallback callback) {
+  CHECK(sequence_checker_.CalledOnValidSequencedThread());
+  CHECK(tag_to_callback_map_.find(tag) == tag_to_callback_map_.end());
+  tag_to_callback_map_.insert(std::make_pair(tag, callback));
+}
+
+void GrpcCompletionQueueDispatcher::OnTagAvailable(const void* tag, bool ok) {
+  CHECK(sequence_checker_.CalledOnValidSequencedThread());
+  auto iter = tag_to_callback_map_.find(tag);
+  // Treat tags received from the |CompletionQueue| that we're not expecting as
+  // fatal errors.
+  CHECK(iter != tag_to_callback_map_.end());
+  TagAvailableCallback callback = iter->second;
+  tag_to_callback_map_.erase(iter);
+  callback.Run(ok);
+}
+
+void GrpcCompletionQueueDispatcher::OnShutdown() {
+  CHECK(sequence_checker_.CalledOnValidSequencedThread());
+  tag_to_callback_map_.clear();
+
+  monitoring_thread_->Join();
+  monitoring_thread_.reset();
+  monitoring_thread_delegate_.reset();
+
+  if (!on_shutdown_callback_.is_null()) {
+    // Post the |on_shutdown_callback_| on the task runner instead of calling it
+    // directly, allowing the owner of this instance to delete it in the context
+    // of |on_shutdown_callback_|.
+    task_runner_->PostTask(FROM_HERE, on_shutdown_callback_);
+  }
+}
+
+}  // namespace diagnostics
diff --git a/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h
new file mode 100644
index 0000000..d618226
--- /dev/null
+++ b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h
@@ -0,0 +1,119 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef DIAGNOSTICS_GRPC_ASYNC_ADAPTER_GRPC_COMPLETION_QUEUE_DISPATCHER_H_
+#define DIAGNOSTICS_GRPC_ASYNC_ADAPTER_GRPC_COMPLETION_QUEUE_DISPATCHER_H_
+
+#include <map>
+#include <memory>
+
+#include <base/callback.h>
+#include <base/macros.h>
+#include <base/memory/ref_counted.h>
+#include <base/sequence_checker.h>
+#include <base/threading/simple_thread.h>
+
+namespace base {
+class SequencedTaskRunner;
+}
+
+namespace grpc {
+class CompletionQueue;
+}
+
+namespace diagnostics {
+namespace internal {
+class MonitoringThreadDelegate;
+}
+
+// A GrpcCompletionQueueDispatcher monitors a grpc |CompletionQueue| for
+// signalled events and posts tasks to a |SequencedTaskRunner| as a result.
+// It uses a background thread to block on the |CompletionQueue|'s next event in
+// a loop because this is a blocking operation.
+// The |GrpcCompletionQueueDispatcher| may be constructed from anywhere, but its
+// public methods should only be called on the same task runner that was passed
+// to its constructor.
+// This class assumes that every tag that is received from the |CompletionQueue|
+// is expected, i.e. that |RegisterTag| has been called for every tag.
+class GrpcCompletionQueueDispatcher {
+ public:
+  // Callbacks of this type will be called on the task runner passed
+  // to the constructor when an expected event is available on the monitored
+  // |CompletionQueue|. |ok| has an operation-specific meaning, see grpc's
+  // |CompletionQueue::Next| documentation for details.
+  using TagAvailableCallback = base::Callback<void(bool ok)>;
+
+  // The constructed object will monitor |completion_queue| and post tasks to
+  // |task_runner|. Note that the |GrpcCompletionQueueDispatcher| only
+  // starts monitoring the |completion_queue| when |Start| is called.
+  // |completion_queue| should outlive this object.
+  GrpcCompletionQueueDispatcher(
+      grpc::CompletionQueue* completion_queue,
+      scoped_refptr<base::SequencedTaskRunner> task_runner);
+
+  // Note that the destructor CHECKs that this instance has been shut down
+  // properly using |Shutdown|.
+  ~GrpcCompletionQueueDispatcher();
+
+  // Starts the background thread and consequently starts monitoring the
+  // |CompletionQueue| passed to the constructor.
+  void Start();
+
+  // Triggers shutting down the |CompletionQueue| and this
+  // |GrpcCompletionQueueDispatcher|.
+  // |on_shutdown_callback| will be called when the |CompletionQueue| is fully
+  // drained and background thread has shut down. Only then may this instance be
+  // destroyed.
+  void Shutdown(base::Closure on_shutdown_callback);
+
+  // Starts waiting for an event with |tag|. If |tag| has been or will be sent
+  // (through RPC operations or alarms) to the CompletionQueue, |callback| is
+  // guaranteed to be called exactly once on the task runner  passed to the
+  // constructor. The reason is that the CompletionQueue itself guarantees that
+  // every tag sent to the completion queue will be delivered out of the
+  // completion queue. |GrpcCompletionQueueDispatcher| additionally guarantees
+  // that if |callback| is never called (because |tag| was not sent to the
+  // |CompletionQueue|), |callback| will be destroyed on shutdown on the
+  // |TaskRunner| passed to the constructor.
+  void RegisterTag(const void* tag, TagAvailableCallback callback);
+
+  // Returns the monitored |CompletionQueue|.
+  grpc::CompletionQueue* completion_queue() { return completion_queue_; }
+
+ private:
+  // This is called on the |task_runner_| when |tag| has been delivered out of
+  // the |completion_queue_|.
+  void OnTagAvailable(const void* tag, bool ok);
+
+  // This is called on the |task_runner_| when the background thread is shutting
+  // down because the |completion_queue_| has no more events.
+  void OnShutdown();
+
+  // The |CompletionQueue| that this object is monitoring.
+  // Not owned.
+  grpc::CompletionQueue* const completion_queue_;
+  // The |SequencedTaskRunner| this object is posting tasks to.
+  scoped_refptr<base::SequencedTaskRunner> task_runner_;
+
+  // The delegate which will be executed on the |monitoring_thread|.
+  std::unique_ptr<internal::MonitoringThreadDelegate>
+      monitoring_thread_delegate_;
+  // The background thread monitoring the |completion_queue_| and posting tasks
+  // back on the task runner.
+  std::unique_ptr<base::DelegateSimpleThread> monitoring_thread_;
+
+  // This callback will be invoked when the moniting thread is exiting.
+  base::Closure on_shutdown_callback_;
+  // Maps tags to the callbacks that should be run on the |task_runner_| when
+  // the corresponding event fires.
+  std::map<const void*, TagAvailableCallback> tag_to_callback_map_;
+
+  base::SequenceChecker sequence_checker_;
+
+  DISALLOW_COPY_AND_ASSIGN(GrpcCompletionQueueDispatcher);
+};
+
+}  // namespace diagnostics
+
+#endif  // DIAGNOSTICS_GRPC_ASYNC_ADAPTER_GRPC_COMPLETION_QUEUE_DISPATCHER_H_
diff --git a/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc
new file mode 100644
index 0000000..be73e6f
--- /dev/null
+++ b/diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher_test.cc
@@ -0,0 +1,261 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "diagnostics/grpc_async_adapter/grpc_completion_queue_dispatcher.h"
+
+#include <list>
+#include <memory>
+
+#include <base/bind.h>
+#include <base/callback.h>
+#include <base/location.h>
+#include <base/macros.h>
+#include <base/message_loop/message_loop.h>
+#include <base/run_loop.h>
+#include <base/task_runner.h>
+#include <base/time/time.h>
+#include <gmock/gmock.h>
+#include <grpc++/alarm.h>
+#include <grpc++/grpc++.h>
+#include <gtest/gtest.h>
+
+namespace diagnostics {
+
+namespace {
+
+// Allows testing if a callback has been invoked, and the value of the
+// grpc-specific |ok| bool parameter.
+class TagAvailableCalledTester {
+ public:
+  TagAvailableCalledTester() = default;
+  ~TagAvailableCalledTester() = default;
+
+  GrpcCompletionQueueDispatcher::TagAvailableCallback
+  GetTagAvailableCallback() {
+    return base::Bind(&TagAvailableCalledTester::Callback,
+                      base::Unretained(this));
+  }
+
+  // Bind this to a RegisterTag call of the
+  // |GrpcCompletionQueueDispatcher|. Will check that it is invoked at
+  // most once, remember the value of |ok|, and call the closure passed to
+  // |CallWhenInvoked|, if any.
+  void Callback(bool ok) {
+    CHECK(!has_been_called_);
+    has_been_called_ = true;
+    value_of_ok_ = ok;
+
+    std::list<base::Closure> callbacks_temp;
+    callbacks_temp.swap(call_when_invoked_);
+    for (auto& callback : callbacks_temp)
+      callback.Run();
+  }
+
+  // Register |call_when_invoked| to be called when |Callback| is called.
+  void CallWhenInvoked(base::Closure call_when_invoked) {
+    call_when_invoked_.push_back(call_when_invoked);
+  }
+
+  // Returns true if |Callback| has been called.
+  bool has_been_called() const { return has_been_called_; }
+
+  // Only call if |has_been_called()| is returning true. Returns the value of
+  // |ok| passed to |Callback|.
+  bool value_of_ok() const {
+    CHECK(has_been_called());
+    return value_of_ok_;
+  }
+
+ private:
+  bool has_been_called_ = false;
+  bool value_of_ok_ = false;
+  std::list<base::Closure> call_when_invoked_;
+
+  DISALLOW_COPY_AND_ASSIGN(TagAvailableCalledTester);
+};
+
+// Allows testing if an object (owned by callback) has been destroyed. Also
+// tests that this is destroyed on the same message loop it has been
+// instantiated on.
+class ObjectDestroyedTester {
+ public:
+  // Will set |*has_been_destroyed| to true when this instance is being
+  // destroyed.
+  explicit ObjectDestroyedTester(bool* has_been_destroyed)
+      : expected_message_loop_(base::MessageLoop::current()),
+        has_been_destroyed_(has_been_destroyed) {
+    *has_been_destroyed_ = false;
+  }
+
+  ~ObjectDestroyedTester() {
+    EXPECT_TRUE(
+        expected_message_loop_->task_runner()->RunsTasksOnCurrentThread());
+    *has_been_destroyed_ = true;
+  }
+
+ private:
+  base::MessageLoop* const expected_message_loop_;
+  bool* const has_been_destroyed_;
+
+  DISALLOW_COPY_AND_ASSIGN(ObjectDestroyedTester);
+};
+
+// An adapter to be able to give a Callback to
+// |GrpcCompletionQueueDispatcher::RegisterTag| which owns an
+// |ObjectDestroyedTester|.
+void ObjectDestroyedTesterAdapter(
+    TagAvailableCalledTester* tag_available_called_tester,
+    std::unique_ptr<ObjectDestroyedTester> object_destroyed_tester,
+    bool ok) {
+  tag_available_called_tester->Callback(ok);
+}
+
+gpr_timespec GprTimespecWithDeltaFromNow(base::TimeDelta delta) {
+  return gpr_time_add(
+      gpr_now(GPR_CLOCK_MONOTONIC),
+      gpr_time_from_millis(delta.InMilliseconds(), GPR_TIMESPAN));
+}
+
+}  // namespace
+
+class GrpcCompletionQueueDispatcherTest : public ::testing::Test {
+ public:
+  GrpcCompletionQueueDispatcherTest()
+      : dispatcher_(&completion_queue_,
+                    base::MessageLoop::current()->task_runner()) {
+    dispatcher_.Start();
+  }
+
+  ~GrpcCompletionQueueDispatcherTest() override = default;
+
+ protected:
+  base::MessageLoopForIO message_loop_;
+  grpc::CompletionQueue completion_queue_;
+
+  // The dispatcher under test.
+  GrpcCompletionQueueDispatcher dispatcher_;
+
+  // Note: This can't be |const void*| because gRPC functions expect |void*|.
+  void* const kTag = reinterpret_cast<void*>(1);
+
+  void ShutdownDispatcher() {
+    base::RunLoop run_loop;
+    dispatcher_.Shutdown(run_loop.QuitClosure());
+    run_loop.Run();
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(GrpcCompletionQueueDispatcherTest);
+};
+
+// Start and shutdown a dispatcher, with no tags posted to the underlying
+// CompletionQueue.
+TEST_F(GrpcCompletionQueueDispatcherTest, StartAndShutdownEmpty) {
+  ShutdownDispatcher();
+}
+
+// Register a tag that is not passed to the CompletionQueue. Check that the
+// callback is never called, but that it is properly destroyed. This also
+// demonstrates that instances passed to the callback using base::Passed are
+// properly destroyed in this case.
+TEST_F(GrpcCompletionQueueDispatcherTest, TagNeverAvailable) {
+  bool object_has_been_destroyed = false;
+  auto object_destroyed_tester =
+      std::make_unique<ObjectDestroyedTester>(&object_has_been_destroyed);
+
+  TagAvailableCalledTester tag_available_called_tester;
+  dispatcher_.RegisterTag(
+      nullptr,
+      base::Bind(&ObjectDestroyedTesterAdapter, &tag_available_called_tester,
+                 base::Passed(&object_destroyed_tester)));
+
+  ShutdownDispatcher();
+
+  EXPECT_FALSE(tag_available_called_tester.has_been_called());
+  EXPECT_TRUE(object_has_been_destroyed);
+}
+
+// Register a tag that becomes available with |ok=true|. Verify that the
+// registered callback is called with |ok=true|.
+TEST_F(GrpcCompletionQueueDispatcherTest,
+       CompletionQueueTagAvailableWithOkTrue) {
+  base::RunLoop run_loop;
+  TagAvailableCalledTester tag_available_called_tester;
+  tag_available_called_tester.CallWhenInvoked(run_loop.QuitClosure());
+
+  dispatcher_.RegisterTag(
+      kTag, tag_available_called_tester.GetTagAvailableCallback());
+
+  grpc::Alarm alarm(
+      &completion_queue_,
+      GprTimespecWithDeltaFromNow(base::TimeDelta::FromMilliseconds(1)), kTag);
+  run_loop.Run();
+
+  EXPECT_TRUE(tag_available_called_tester.has_been_called());
+  EXPECT_TRUE(tag_available_called_tester.value_of_ok());
+
+  ShutdownDispatcher();
+}
+
+// Register a tag that becomes available with |ok=false|. Verify that the
+// regitered callback is called with |ok=false|.
+TEST_F(GrpcCompletionQueueDispatcherTest,
+       CompletionQueueTagAvailableWithOkFalse) {
+  base::RunLoop run_loop;
+  TagAvailableCalledTester tag_available_called_tester;
+  tag_available_called_tester.CallWhenInvoked(run_loop.QuitClosure());
+
+  dispatcher_.RegisterTag(
+      kTag, tag_available_called_tester.GetTagAvailableCallback());
+
+  grpc::Alarm alarm(&completion_queue_,
+                    GprTimespecWithDeltaFromNow(base::TimeDelta::FromHours(24)),
+                    kTag);
+  alarm.Cancel();
+  run_loop.Run();
+
+  EXPECT_TRUE(tag_available_called_tester.has_been_called());
+  EXPECT_FALSE(tag_available_called_tester.value_of_ok());
+
+  ShutdownDispatcher();
+}
+
+// Re-register a tag that becomes available in the context of the tag's
+// callback.
+TEST_F(GrpcCompletionQueueDispatcherTest, ReregisterTag) {
+  base::RunLoop run_loop_1;
+  TagAvailableCalledTester tag_available_called_tester_1;
+  base::RunLoop run_loop_2;
+  TagAvailableCalledTester tag_available_called_tester_2;
+
+  dispatcher_.RegisterTag(
+      kTag, tag_available_called_tester_1.GetTagAvailableCallback());
+  auto reregister_tag_callback =
+      base::Bind(&GrpcCompletionQueueDispatcher::RegisterTag,
+                 base::Unretained(&dispatcher_), kTag,
+                 tag_available_called_tester_2.GetTagAvailableCallback());
+  tag_available_called_tester_1.CallWhenInvoked(reregister_tag_callback);
+  tag_available_called_tester_1.CallWhenInvoked(run_loop_1.QuitClosure());
+
+  tag_available_called_tester_2.CallWhenInvoked(run_loop_2.QuitClosure());
+
+  grpc::Alarm alarm_1(
+      &completion_queue_,
+      GprTimespecWithDeltaFromNow(base::TimeDelta::FromMilliseconds(1)), kTag);
+  run_loop_1.Run();
+
+  grpc::Alarm alarm_2(
+      &completion_queue_,
+      GprTimespecWithDeltaFromNow(base::TimeDelta::FromMilliseconds(1)), kTag);
+  run_loop_2.Run();
+
+  EXPECT_TRUE(tag_available_called_tester_1.has_been_called());
+  EXPECT_TRUE(tag_available_called_tester_1.value_of_ok());
+  EXPECT_TRUE(tag_available_called_tester_2.has_been_called());
+  EXPECT_TRUE(tag_available_called_tester_2.value_of_ok());
+
+  ShutdownDispatcher();
+}
+
+}  // namespace diagnostics