blob: 6a96ffcb191a84b9eb05ef3589b0fd8a61a2b5e8 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "federated/storage_manager_impl.h"
#include <cstddef>
#include <memory>
#include <base/files/file_path.h>
#include <base/files/file_util.h>
#include <base/logging.h>
#include <base/no_destructor.h>
#include <base/strings/stringprintf.h>
#include "federated/session_manager_proxy.h"
#include "federated/utils.h"
namespace federated {
void StorageManagerImpl::InitializeSessionManagerProxy(dbus::Bus* bus) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!session_manager_proxy_)
<< "session_manager_proxy is already initialized!";
DCHECK(bus);
session_manager_proxy_ = std::make_unique<SessionManagerProxy>(
std::make_unique<org::chromium::SessionManagerInterfaceProxy>(bus));
session_manager_proxy_->AddObserver(this);
// If session already started, connect to database.
if (session_manager_proxy_->RetrieveSessionState() == kSessionStartedState) {
ConnectToDatabaseIfNecessary();
}
}
bool StorageManagerImpl::OnExampleReceived(
const std::string& client_name, const std::string& serialized_example) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!example_database_ || !example_database_->IsOpen()) {
VLOG(1) << "No database connection.";
return false;
}
ExampleRecord example_record;
example_record.client_name = client_name;
example_record.serialized_example = serialized_example;
example_record.timestamp = base::Time::Now();
return example_database_->InsertExample(example_record);
}
bool StorageManagerImpl::PrepareStreamingForClient(
const std::string& client_name) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!example_database_ || !example_database_->IsOpen()) {
LOG(ERROR) << "No database connection.";
return false;
}
last_seen_example_id_ = 0;
streaming_client_name_ = client_name;
return example_database_->PrepareStreamingForClient(
client_name, kMaxStreamingExampleCount);
}
bool StorageManagerImpl::GetNextExample(std::string* example,
bool* end_of_iterator) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!example_database_ || !example_database_->IsOpen()) {
VLOG(1) << "No database connection.";
return false;
}
*end_of_iterator = false;
auto maybe_example_record = example_database_->GetNextStreamedRecord();
if (maybe_example_record == base::nullopt) {
*end_of_iterator = true;
} else {
last_seen_example_id_ = maybe_example_record.value().id;
*example = maybe_example_record.value().serialized_example;
}
return true;
}
bool StorageManagerImpl::CloseStreaming(bool clean_examples) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!example_database_ || !example_database_->IsOpen()) {
VLOG(1) << "No database connection!";
return true;
}
example_database_->CloseStreaming();
return !clean_examples ||
example_database_->DeleteExamplesWithSmallerIdForClient(
streaming_client_name_, last_seen_example_id_);
}
void StorageManagerImpl::OnSessionStarted() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
ConnectToDatabaseIfNecessary();
}
void StorageManagerImpl::OnSessionStopped() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
example_database_.reset();
}
void StorageManagerImpl::ConnectToDatabaseIfNecessary() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::string new_sanitized_username =
session_manager_proxy_->GetSanitizedUsername();
if (new_sanitized_username.empty()) {
VLOG(1) << "Sanitized_username is empty, disconnect the database.";
example_database_.reset();
return;
}
if (example_database_ && example_database_->IsOpen() &&
new_sanitized_username == sanitized_username_) {
VLOG(1) << "Database for user " << sanitized_username_
<< " is already connected, nothing changed.";
return;
}
sanitized_username_ = new_sanitized_username;
auto db_path = GetDatabasePath(sanitized_username_);
// TODO(alanlxl): temp clients for test.
example_database_.reset(
new ExampleDatabase(db_path, {"analytics_test_population"}));
if (!example_database_->Init()) {
LOG(ERROR) << "Failed to connect to database for user "
<< sanitized_username_;
example_database_.reset();
} else if (!example_database_->CheckIntegrity()) {
LOG(ERROR) << "Failed to verify the database integrity for user "
<< sanitized_username_ << ", delete the existing db file.";
if (!base::DeleteFile(db_path)) {
LOG(ERROR) << "Failed to delete corrupted db file " << db_path.value();
}
example_database_.reset();
}
}
StorageManager* StorageManager::GetInstance() {
static base::NoDestructor<StorageManagerImpl> storage_manager;
return storage_manager.get();
}
} // namespace federated