| // Copyright 2021 The Chromium OS Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "missive/storage/storage_queue.h" |
| |
| #include <cstdint> |
| #include <initializer_list> |
| #include <utility> |
| #include <vector> |
| |
| #include <base/containers/flat_map.h> |
| #include <base/files/file_path.h> |
| #include <base/files/file_util.h> |
| #include <base/files/scoped_temp_dir.h> |
| #include <base/optional.h> |
| #include <base/strings/strcat.h> |
| #include <base/strings/string_number_conversions.h> |
| #include <base/synchronization/waitable_event.h> |
| #include <base/task/thread_pool.h> |
| #include <base/test/task_environment.h> |
| #include <crypto/sha2.h> |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "missive/encryption/scoped_encryption_feature.h" |
| #include "missive/encryption/test_encryption_module.h" |
| #include "missive/proto/record.pb.h" |
| #include "missive/storage/resources/resource_interface.h" |
| #include "missive/storage/storage_configuration.h" |
| #include "missive/util/status.h" |
| #include "missive/util/statusor.h" |
| #include "missive/util/test_support_callbacks.h" |
| |
| using ::testing::_; |
| using ::testing::Between; |
| using ::testing::Eq; |
| using ::testing::Invoke; |
| using ::testing::NotNull; |
| using ::testing::Return; |
| using ::testing::Sequence; |
| using ::testing::StrEq; |
| using ::testing::WithArg; |
| |
| namespace reporting { |
| namespace { |
| |
| constexpr size_t kTotalQueueStarts = 4; |
| constexpr size_t kTotalWritesPerStart = 16; |
| constexpr char kDataPrefix[] = "Rec"; |
| |
| class TestUploadClient : public UploaderInterface { |
| public: |
| // Mapping of <generation id, sequencing id> to matching record digest. |
| // Whenever a record is uploaded and includes last record digest, this map |
| // should have that digest already recorded. Only the first record in a |
| // generation is uploaded without last record digest. |
| using LastRecordDigestMap = base::flat_map< |
| std::pair<int64_t /*generation id */, int64_t /*sequencing id*/>, |
| base::Optional<std::string /*digest*/>>; |
| |
| explicit TestUploadClient(LastRecordDigestMap* last_record_digest_map) |
| : last_record_digest_map_(last_record_digest_map) {} |
| |
| void ProcessRecord(EncryptedRecord encrypted_record, |
| base::OnceCallback<void(bool)> processed_cb) override { |
| WrappedRecord wrapped_record; |
| ASSERT_TRUE(wrapped_record.ParseFromString( |
| encrypted_record.encrypted_wrapped_record())); |
| // Verify generation match. |
| const auto& sequencing_information = |
| encrypted_record.sequencing_information(); |
| if (!generation_id_.has_value()) { |
| generation_id_ = sequencing_information.generation_id(); |
| } else { |
| ASSERT_THAT(generation_id_.value(), |
| Eq(sequencing_information.generation_id())); |
| } |
| |
| // Verify digest and its match. |
| // Last record digest is not verified yet, since duplicate records are |
| // accepted in this test. |
| { |
| std::string serialized_record; |
| wrapped_record.record().SerializeToString(&serialized_record); |
| const auto record_digest = crypto::SHA256HashString(serialized_record); |
| DCHECK_EQ(record_digest.size(), crypto::kSHA256Length); |
| ASSERT_THAT(record_digest, Eq(wrapped_record.record_digest())); |
| // Store record digest for the next record in sequence to verify. |
| last_record_digest_map_->emplace( |
| std::make_pair(sequencing_information.sequencing_id(), |
| sequencing_information.generation_id()), |
| record_digest); |
| // If last record digest is present, match it and validate. |
| if (wrapped_record.has_last_record_digest()) { |
| auto it = last_record_digest_map_->find( |
| std::make_pair(sequencing_information.sequencing_id() - 1, |
| sequencing_information.generation_id())); |
| if (it != last_record_digest_map_->end() && it->second.has_value()) { |
| ASSERT_THAT(it->second.value(), |
| Eq(wrapped_record.last_record_digest())) |
| << "seq_id=" << sequencing_information.sequencing_id(); |
| } |
| } |
| } |
| |
| std::move(processed_cb).Run(true); |
| } |
| |
| void ProcessGap(SequencingInformation sequencing_information, |
| uint64_t count, |
| base::OnceCallback<void(bool)> processed_cb) override { |
| ASSERT_TRUE(false) << "There should be no gaps"; |
| } |
| |
| void Completed(Status status) override { ASSERT_OK(status); } |
| |
| private: |
| base::Optional<int64_t> generation_id_; |
| LastRecordDigestMap* const last_record_digest_map_; |
| |
| Sequence test_upload_sequence_; |
| }; |
| |
| class StorageQueueStressTest : public ::testing::TestWithParam<size_t> { |
| public: |
| void SetUp() override { |
| ASSERT_TRUE(location_.CreateUniqueTempDir()); |
| options_.set_directory(base::FilePath(location_.GetPath())) |
| .set_single_file_size(GetParam()); |
| } |
| |
| void TearDown() override { |
| ResetTestStorageQueue(); |
| // Make sure all memory is deallocated. |
| ASSERT_THAT(GetMemoryResource()->GetUsed(), Eq(0u)); |
| // Make sure all disk is not reserved (files remain, but Storage is not |
| // responsible for them anymore). |
| ASSERT_THAT(GetDiskResource()->GetUsed(), Eq(0u)); |
| } |
| |
| void CreateTestStorageQueueOrDie(const QueueOptions& options) { |
| ASSERT_FALSE(storage_queue_) << "StorageQueue already assigned"; |
| test_encryption_module_ = |
| base::MakeRefCounted<test::TestEncryptionModule>(); |
| test::TestEvent<Status> key_update_event; |
| test_encryption_module_->UpdateAsymmetricKey("DUMMY KEY", 0, |
| key_update_event.cb()); |
| ASSERT_OK(key_update_event.result()); |
| test::TestEvent<StatusOr<scoped_refptr<StorageQueue>>> |
| storage_queue_create_event; |
| StorageQueue::Create( |
| options, |
| base::BindRepeating(&StorageQueueStressTest::AsyncStartTestUploader, |
| base::Unretained(this)), |
| test_encryption_module_, storage_queue_create_event.cb()); |
| StatusOr<scoped_refptr<StorageQueue>> storage_queue_result = |
| storage_queue_create_event.result(); |
| ASSERT_OK(storage_queue_result) << "Failed to create StorageQueue, error=" |
| << storage_queue_result.status(); |
| storage_queue_ = std::move(storage_queue_result.ValueOrDie()); |
| } |
| |
| void ResetTestStorageQueue() { |
| // Let everything ongoing to finish. |
| task_environment_.RunUntilIdle(); |
| storage_queue_.reset(); |
| // StorageQueue is destructed on a thread, |
| // so we need to wait for it to destruct. |
| task_environment_.RunUntilIdle(); |
| } |
| |
| QueueOptions BuildStorageQueueOptionsImmediate() const { |
| return QueueOptions(options_).set_subdirectory("D1").set_file_prefix( |
| "F0001"); |
| } |
| |
| QueueOptions BuildStorageQueueOptionsPeriodic( |
| base::TimeDelta upload_period = base::TimeDelta::FromSeconds(1)) const { |
| return BuildStorageQueueOptionsImmediate().set_upload_period(upload_period); |
| } |
| |
| QueueOptions BuildStorageQueueOptionsOnlyManual() const { |
| return BuildStorageQueueOptionsPeriodic(base::TimeDelta::Max()); |
| } |
| |
| void AsyncStartTestUploader( |
| UploaderInterface::UploaderInterfaceResultCb start_uploader_cb) { |
| std::move(start_uploader_cb) |
| .Run(std::make_unique<TestUploadClient>(&last_record_digest_map_)); |
| } |
| |
| void WriteStringAsync(base::StringPiece data, |
| base::OnceCallback<void(Status)> cb) { |
| EXPECT_TRUE(storage_queue_) << "StorageQueue not created yet"; |
| Record record; |
| record.mutable_data()->assign(data.data(), data.size()); |
| record.set_destination(UPLOAD_EVENTS); |
| record.set_dm_token("DM TOKEN"); |
| storage_queue_->Write(std::move(record), std::move(cb)); |
| } |
| |
| base::test::TaskEnvironment task_environment_{ |
| base::test::TaskEnvironment::TimeSource::MOCK_TIME}; |
| |
| test::ScopedEncryptionFeature encryption_feature_{/*enable=*/true}; |
| base::ScopedTempDir location_; |
| StorageOptions options_; |
| scoped_refptr<test::TestEncryptionModule> test_encryption_module_; |
| scoped_refptr<StorageQueue> storage_queue_; |
| |
| // Test-wide global mapping of <generation id, sequencing id> to record |
| // digest. Serves all TestUploadClients created by test fixture. |
| TestUploadClient::LastRecordDigestMap last_record_digest_map_; |
| }; |
| |
| TEST_P(StorageQueueStressTest, |
| WriteIntoNewStorageQueueReopenWriteMoreAndUpload) { |
| for (size_t iStart = 0; iStart < kTotalQueueStarts; ++iStart) { |
| test::TestCallbackWaiter write_waiter; |
| base::RepeatingCallback<void(Status)> cb = base::BindRepeating( |
| [](test::TestCallbackWaiter* waiter, Status status) { |
| EXPECT_OK(status); |
| waiter->Signal(); |
| }, |
| &write_waiter); |
| |
| SCOPED_TRACE(base::StrCat({"Create ", base::NumberToString(iStart)})); |
| CreateTestStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual()); |
| |
| // Write into the queue at random order (simultaneously). |
| SCOPED_TRACE(base::StrCat({"Write ", base::NumberToString(iStart)})); |
| const std::string rec_prefix = |
| base::StrCat({kDataPrefix, base::NumberToString(iStart), "_"}); |
| for (size_t iRec = 0; iRec < kTotalWritesPerStart; ++iRec) { |
| write_waiter.Attach(); |
| base::ThreadPool::PostTask( |
| FROM_HERE, {base::TaskPriority::BEST_EFFORT}, |
| base::BindOnce( |
| [](base::StringPiece rec_prefix, size_t iRec, |
| StorageQueueStressTest* test, |
| base::RepeatingCallback<void(Status)> cb) { |
| test->WriteStringAsync( |
| base::StrCat({rec_prefix, base::NumberToString(iRec)}), cb); |
| }, |
| rec_prefix, iRec, this, cb)); |
| } |
| write_waiter.Wait(); |
| |
| SCOPED_TRACE(base::StrCat({"Upload ", base::NumberToString(iStart)})); |
| storage_queue_->Flush(); |
| |
| SCOPED_TRACE(base::StrCat({"Reset ", base::NumberToString(iStart)})); |
| ResetTestStorageQueue(); |
| EXPECT_THAT(last_record_digest_map_.size(), |
| Eq((iStart + 1) * kTotalWritesPerStart)); |
| |
| SCOPED_TRACE(base::StrCat({"Done ", base::NumberToString(iStart)})); |
| } |
| } |
| |
| INSTANTIATE_TEST_SUITE_P( |
| VaryingFileSize, |
| StorageQueueStressTest, |
| testing::Values(1 * 1024LL, 2 * 1024LL, 3 * 1024LL, 4 * 1024LL)); |
| |
| } // namespace |
| } // namespace reporting |