| // Copyright 2023 The ChromiumOS Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "secagentd/batch_sender.h" |
| |
| #include <string> |
| |
| #include "base/functional/bind.h" |
| #include "base/functional/callback_forward.h" |
| #include "base/memory/scoped_refptr.h" |
| #include "base/strings/stringprintf.h" |
| #include "base/test/bind.h" |
| #include "base/test/task_environment.h" |
| #include "gmock/gmock.h" |
| #include "gtest/gtest.h" |
| #include "secagentd/proto/security_xdr_events.pb.h" |
| #include "secagentd/test/mock_message_sender.h" |
| |
| namespace secagentd::testing { |
| |
| namespace pb = cros_xdr::reporting; |
| using ::testing::_; |
| using ::testing::Eq; |
| using ::testing::Invoke; |
| using ::testing::Mock; |
| using ::testing::NiceMock; |
| using ::testing::StrictMock; |
| using ::testing::WithArg; |
| using ::testing::WithArgs; |
| |
| class BatchSenderTestFixture : public ::testing::Test { |
| protected: |
| // KeyType type. |
| using KT = std::string; |
| // XdrMessage type. |
| using XM = pb::XdrProcessEvent; |
| // AtomicVariantMessage type. |
| using AVM = pb::ProcessEventAtomicVariant; |
| using BatchSenderType = BatchSender<KT, XM, AVM>; |
| |
| static constexpr auto kDestination = |
| reporting::Destination::CROS_SECURITY_PROCESS; |
| static constexpr uint32_t kBatchInterval = 10; |
| |
| static std::string GetProcessEventKey( |
| const pb::ProcessEventAtomicVariant& process_event) { |
| switch (process_event.variant_type_case()) { |
| case AVM::kProcessExec: |
| return process_event.process_exec().spawn_process().process_uuid(); |
| case AVM::kProcessTerminate: |
| return process_event.process_terminate().process().process_uuid(); |
| case AVM::VARIANT_TYPE_NOT_SET: |
| CHECK(false); |
| return ""; |
| } |
| } |
| |
| BatchSenderTestFixture() |
| : task_environment_(base::test::TaskEnvironment::TimeSource::MOCK_TIME), |
| message_sender_(base::MakeRefCounted<StrictMock<MockMessageSender>>()) { |
| } |
| |
| void SetUp() override { |
| batch_sender_ = std::make_unique<BatchSenderType>( |
| base::BindRepeating(&GetProcessEventKey), message_sender_, kDestination, |
| kBatchInterval); |
| batch_sender_->Start(); |
| expected_process_exec_1_.mutable_process_exec() |
| ->mutable_spawn_process() |
| ->set_process_uuid("uuid1"); |
| expected_process_exec_2_.mutable_process_exec() |
| ->mutable_spawn_process() |
| ->set_process_uuid("uuid2"); |
| expected_process_term_1_.mutable_process_terminate() |
| ->mutable_process() |
| ->set_process_uuid("uuid1"); |
| } |
| |
| base::test::TaskEnvironment task_environment_; |
| scoped_refptr<StrictMock<MockMessageSender>> message_sender_; |
| std::unique_ptr<BatchSenderType> batch_sender_; |
| AVM expected_process_exec_1_; |
| AVM expected_process_exec_2_; |
| AVM expected_process_term_1_; |
| }; |
| |
| TEST_F(BatchSenderTestFixture, TestSimpleBatchingPeriodicFlush) { |
| std::unique_ptr<google::protobuf::MessageLite> actual_sent_message; |
| pb::CommonEventDataFields* actual_mutable_common = nullptr; |
| EXPECT_CALL(*message_sender_, |
| SendMessage(Eq(BatchSenderTestFixture::kDestination), _, _, _)) |
| .WillRepeatedly( |
| [&actual_sent_message, &actual_mutable_common]( |
| auto d, pb::CommonEventDataFields* c, |
| std::unique_ptr<google::protobuf::MessageLite> m, |
| std::optional<reporting::ReportQueue::EnqueueCallback> cb) { |
| // SaveArgByMove unfortunately doesn't exist. |
| actual_sent_message = std::move(m); |
| actual_mutable_common = c; |
| return absl::OkStatus(); |
| }); |
| |
| auto process_event_1 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| process_event_1->CopyFrom(expected_process_exec_1_); |
| batch_sender_->Enqueue(std::move(process_event_1)); |
| |
| auto process_event_2 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| process_event_2->CopyFrom(expected_process_exec_2_); |
| batch_sender_->Enqueue(std::move(process_event_2)); |
| |
| task_environment_.AdvanceClock(base::Seconds(kBatchInterval)); |
| task_environment_.RunUntilIdle(); |
| |
| BatchSenderTestFixture::XM* actual_process_event = |
| google::protobuf::internal::DownCast<pb::XdrProcessEvent*>( |
| actual_sent_message.get()); |
| EXPECT_EQ(actual_process_event->mutable_common(), actual_mutable_common); |
| ASSERT_EQ(2, actual_process_event->batched_events_size()); |
| EXPECT_TRUE(actual_process_event->batched_events(0) |
| .common() |
| .has_create_timestamp_us()); |
| EXPECT_EQ( |
| expected_process_exec_1_.process_exec().spawn_process().process_uuid(), |
| actual_process_event->batched_events(0) |
| .process_exec() |
| .spawn_process() |
| .process_uuid()); |
| EXPECT_TRUE(actual_process_event->batched_events(1) |
| .common() |
| .has_create_timestamp_us()); |
| EXPECT_EQ( |
| expected_process_exec_2_.process_exec().spawn_process().process_uuid(), |
| actual_process_event->batched_events(1) |
| .process_exec() |
| .spawn_process() |
| .process_uuid()); |
| |
| auto process_event_3 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| process_event_3->CopyFrom(expected_process_term_1_); |
| batch_sender_->Enqueue(std::move(process_event_3)); |
| |
| task_environment_.AdvanceClock(base::Seconds(kBatchInterval)); |
| task_environment_.RunUntilIdle(); |
| |
| actual_process_event = |
| google::protobuf::internal::DownCast<pb::XdrProcessEvent*>( |
| actual_sent_message.get()); |
| ASSERT_EQ(1, actual_process_event->batched_events_size()); |
| EXPECT_EQ( |
| expected_process_term_1_.process_exec().spawn_process().process_uuid(), |
| actual_process_event->batched_events(0) |
| .process_exec() |
| .spawn_process() |
| .process_uuid()); |
| } |
| |
| TEST_F(BatchSenderTestFixture, TestBatchingSizeLimit) { |
| std::vector<std::unique_ptr<google::protobuf::MessageLite>> |
| actual_sent_messages; |
| std::vector<pb::CommonEventDataFields*> actual_mutable_commons; |
| EXPECT_CALL(*message_sender_, |
| SendMessage(Eq(BatchSenderTestFixture::kDestination), _, _, _)) |
| .WillRepeatedly( |
| [&actual_sent_messages, &actual_mutable_commons]( |
| auto d, pb::CommonEventDataFields* c, |
| std::unique_ptr<google::protobuf::MessageLite> m, |
| std::optional<reporting::ReportQueue::EnqueueCallback> cb) { |
| // SaveArgByMove unfortunately doesn't exist. |
| actual_sent_messages.emplace_back(std::move(m)); |
| actual_mutable_commons.push_back(c); |
| return absl::OkStatus(); |
| }); |
| |
| size_t est_batch_size = 0; |
| int sent_events = 0; |
| // Enqueue more than enough for the batches to be split. |
| while (est_batch_size < BatchSenderType::kMaxMessageSizeBytes * 2) { |
| auto process_event = std::make_unique<BatchSenderTestFixture::AVM>(); |
| process_event->CopyFrom(expected_process_exec_1_); |
| process_event->mutable_process_exec() |
| ->mutable_spawn_process() |
| ->set_process_uuid(base::StringPrintf("%s_%d", |
| process_event->process_exec() |
| .spawn_process() |
| .process_uuid() |
| .c_str(), |
| sent_events++)); |
| est_batch_size += process_event->ByteSizeLong(); |
| batch_sender_->Enqueue(std::move(process_event)); |
| } |
| |
| task_environment_.AdvanceClock(base::Seconds(kBatchInterval)); |
| task_environment_.RunUntilIdle(); |
| |
| // Our math here is not perfect so tolerate a minor deviation. What we |
| // actually care about is that the batches were split at least once and that |
| // there weren't hundreds of batches created due to some internal glitch. |
| EXPECT_LE(2, actual_sent_messages.size()); |
| EXPECT_GE(5, actual_sent_messages.size()); |
| // Verify that all the sent messages disjointly account for all of the |
| // enqueued events. |
| std::set<std::string> sent_ids; |
| for (const auto& message : actual_sent_messages) { |
| EXPECT_GE(BatchSenderType::kMaxMessageSizeBytes, message->ByteSizeLong()); |
| auto actual_process_event = |
| google::protobuf::internal::DownCast<pb::XdrProcessEvent*>( |
| message.get()); |
| for (int i = 0; i < actual_process_event->batched_events_size(); ++i) { |
| auto id = GetProcessEventKey(actual_process_event->batched_events(i)); |
| CHECK_EQ(0, sent_ids.count(id)) << "Found dupe id " << id; |
| sent_ids.insert(id); |
| } |
| } |
| EXPECT_EQ(sent_events, sent_ids.size()); |
| } |
| |
| TEST_F(BatchSenderTestFixture, TestVisit) { |
| auto process_event_1 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| process_event_1->CopyFrom(expected_process_exec_1_); |
| batch_sender_->Enqueue(std::move(process_event_1)); |
| |
| auto process_event_2 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| process_event_2->CopyFrom(expected_process_exec_2_); |
| batch_sender_->Enqueue(std::move(process_event_2)); |
| |
| auto process_event_3 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| process_event_3->CopyFrom(expected_process_term_1_); |
| batch_sender_->Enqueue(std::move(process_event_3)); |
| |
| ASSERT_EQ( |
| expected_process_exec_1_.process_exec().spawn_process().process_uuid(), |
| expected_process_term_1_.process_terminate().process().process_uuid()); |
| const auto& key = |
| expected_process_term_1_.process_terminate().process().process_uuid(); |
| bool cb1_run = false; |
| auto cb1 = base::BindLambdaForTesting([key, &cb1_run](AVM* process_event) { |
| EXPECT_TRUE(process_event->has_process_terminate()); |
| EXPECT_EQ(key, process_event->process_terminate().process().process_uuid()); |
| cb1_run = true; |
| return true; |
| }); |
| // Ask specifically for a terminate event and verify that Visit ignores the |
| // exec event with the same key. |
| EXPECT_TRUE( |
| batch_sender_->Visit(AVM::kProcessTerminate, key, std::move(cb1))); |
| EXPECT_TRUE(cb1_run); |
| |
| bool cb2_run = false; |
| auto cb2 = base::BindLambdaForTesting([&cb2_run](AVM* process_event) { |
| cb2_run = true; |
| return true; |
| }); |
| EXPECT_FALSE(batch_sender_->Visit(AVM::kProcessTerminate, |
| "Key does not exist", std::move(cb2))); |
| EXPECT_FALSE(cb2_run); |
| } |
| |
| TEST_F(BatchSenderTestFixture, TestVisitMostRecent) { |
| auto proc_exec_1 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| auto proc_exec_1_ptr = proc_exec_1.get(); |
| proc_exec_1->CopyFrom(expected_process_exec_1_); |
| proc_exec_1->mutable_process_exec()->set_terminate_timestamp_us(1); |
| batch_sender_->Enqueue(std::move(proc_exec_1)); |
| |
| // Create a second process exec with the same UUID and different terminate |
| // timestamp to verify that the second one is updated. |
| // Note this should never happen in practice. |
| auto proc_exec_2 = std::make_unique<BatchSenderTestFixture::AVM>(); |
| auto proc_exec_2_ptr = proc_exec_2.get(); |
| proc_exec_2->CopyFrom(expected_process_exec_1_); |
| proc_exec_2->mutable_process_exec()->set_terminate_timestamp_us(5); |
| batch_sender_->Enqueue(std::move(proc_exec_2)); |
| |
| // Update the terminate timestamp to verify most recent event updated. |
| auto update_count_cb = |
| base::BindLambdaForTesting([](BatchSenderTestFixture::AVM* exec_event) { |
| exec_event->mutable_process_exec()->set_terminate_timestamp_us( |
| exec_event->process_exec().terminate_timestamp_us() + 1); |
| return true; |
| }); |
| EXPECT_TRUE(batch_sender_->Visit( |
| BatchSenderTestFixture::AVM::kProcessExec, |
| proc_exec_1_ptr->process_exec().spawn_process().process_uuid(), |
| std::move(update_count_cb))); |
| EXPECT_EQ(1, proc_exec_1_ptr->process_exec().terminate_timestamp_us()); |
| EXPECT_EQ(6, proc_exec_2_ptr->process_exec().terminate_timestamp_us()); |
| } |
| |
| } // namespace secagentd::testing |