// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
| |
| #include "missive/client/missive_client.h" |
| |
| #include <memory> |
| #include <string> |
| #include <utility> |
| |
| #include <base/bind.h> |
| #include <base/callback.h> |
| #include <base/callback_helpers.h> |
| #include <base/logging.h> |
| #include <base/memory/weak_ptr.h> |
| #include <base/sequence_checker.h> |
| #include <base/task/sequenced_task_runner.h> |
| #include <dbus/bus.h> |
| #include <dbus/message.h> |
| #include <dbus/object_proxy.h> |
| |
| #include "missive/client/missive_dbus_constants.h" |
| #include "missive/proto/interface.pb.h" |
| #include "missive/proto/record.pb.h" |
| #include "missive/proto/record_constants.pb.h" |
| #include "missive/storage/missive_storage_module.h" |
| #include "missive/storage/missive_storage_module_delegate_impl.h" |
| #include "missive/util/disconnectable_client.h" |
| #include "missive/util/status.h" |
| #include "missive/util/statusor.h" |
| |
| namespace reporting { |
| namespace { |
| |
| MissiveClient* g_instance = nullptr; |
| |
| class MissiveClientImpl : public MissiveClient { |
| public: |
| MissiveClientImpl() : client_(origin_task_runner()) {} |
| MissiveClientImpl(const MissiveClientImpl& other) = delete; |
| MissiveClientImpl& operator=(const MissiveClientImpl& other) = delete; |
| ~MissiveClientImpl() override = default; |
| |
| void Init(dbus::Bus* const bus) { |
| DCHECK(bus); |
| origin_task_runner_ = bus->GetOriginTaskRunner(); |
| |
| DCHECK(!missive_service_proxy_); |
| missive_service_proxy_ = |
| bus->GetObjectProxy(missive::kMissiveServiceName, |
| dbus::ObjectPath(missive::kMissiveServicePath)); |
| missive_service_proxy_->SetNameOwnerChangedCallback(base::BindRepeating( |
| &MissiveClientImpl::OwnerChanged, weak_ptr_factory_.GetWeakPtr())); |
| missive_service_proxy_->WaitForServiceToBeAvailable(base::BindOnce( |
| &MissiveClientImpl::ServerAvailable, weak_ptr_factory_.GetWeakPtr())); |
| } |
| |
| void EnqueueRecord( |
| const Priority priority, |
| Record record, |
| base::OnceCallback<void(Status)> completion_callback) override { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(origin_checker_); |
| auto delegate = std::make_unique<EnqueueRecordDelegate>( |
| priority, std::move(record), this, std::move(completion_callback)); |
| client_.MaybeMakeCall(std::move(delegate)); |
| } |
| |
| void Flush(const Priority priority, |
| base::OnceCallback<void(Status)> completion_callback) override { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(origin_checker_); |
| auto delegate = std::make_unique<FlushDelegate>( |
| priority, this, std::move(completion_callback)); |
| client_.MaybeMakeCall(std::move(delegate)); |
| } |
| |
| void UpdateEncryptionKey( |
| const SignedEncryptionInfo& encryption_info) override { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(origin_checker_); |
| auto delegate = |
| std::make_unique<UpdateEncryptionKeyDelegate>(encryption_info, this); |
| client_.MaybeMakeCall(std::move(delegate)); |
| } |
| |
| void ReportSuccess(const SequenceInformation& sequence_information, |
| bool force_confirm) override { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(origin_checker_); |
| auto delegate = std::make_unique<ReportSuccessDelegate>( |
| sequence_information, force_confirm, this); |
| client_.MaybeMakeCall(std::move(delegate)); |
| } |
| |
| MissiveClient::TestInterface* GetTestInterface() override { return nullptr; } |
| |
| base::WeakPtr<MissiveClient> GetWeakPtr() override { |
| return weak_ptr_factory_.GetWeakPtr(); |
| } |
| |
| private: |
| // Class implements DisconnectableClient::Delegate specifically for dBus |
| // calls. Logic that handles dBus connect/disconnect cases remains with the |
| // base class. |
| class DBusDelegate : public DisconnectableClient::Delegate { |
| public: |
| DBusDelegate(const DBusDelegate& other) = delete; |
| DBusDelegate& operator=(const DBusDelegate& other) = delete; |
| ~DBusDelegate() override = default; |
| |
| // Writes request into dBus message writer. |
| virtual bool WriteRequest(dbus::MessageWriter* writer) = 0; |
| |
| // Parses response, retrieves status information from it and returns it. |
| // Optional - returns OK if absent. |
| virtual Status ParseResponse(dbus::MessageReader* reader) { |
| return Status::StatusOK(); |
| } |
| |
| protected: |
| DBusDelegate(const char* dbus_method, |
| MissiveClientImpl* owner, |
| base::OnceCallback<void(Status)> completion_callback) |
| : dbus_method_(dbus_method), |
| owner_(owner), |
| completion_callback_(std::move(completion_callback)) {} |
| |
| private: |
| // Implementation of DisconnectableClient::Delegate |
| void DoCall(base::OnceClosure cb) final { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(owner_->origin_checker_); |
| base::ScopedClosureRunner autorun(std::move(cb)); |
| dbus::MethodCall method_call(missive::kMissiveServiceInterface, |
| dbus_method_); |
| dbus::MessageWriter writer(&method_call); |
| if (!WriteRequest(&writer)) { |
| Status status(error::UNKNOWN, |
| "MessageWriter was unable to append the request."); |
| LOG(ERROR) << status; |
| std::move(completion_callback_).Run(status); |
| return; |
| } |
| |
| // Make a dBus call. |
| owner_->missive_service_proxy_->CallMethod( |
| &method_call, dbus::ObjectProxy::TIMEOUT_USE_DEFAULT, |
| base::BindOnce( |
| [](base::ScopedClosureRunner autorun, |
| base::WeakPtr<DBusDelegate> self, dbus::Response* response) { |
| if (!self) { |
| return; // Delegate already deleted. |
| } |
| DCHECK_CALLED_ON_VALID_SEQUENCE(self->owner_->origin_checker_); |
| if (!response) { |
| self->Respond( |
| Status(error::UNAVAILABLE, "Returned no response")); |
| return; |
| } |
| self->response_ = response; |
| }, |
| std::move(autorun), weak_ptr_factory_.GetWeakPtr())); |
| } |
| |
| // Process dBus response, if status is OK, or error otherwise. |
| void Respond(Status status) final { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(owner_->origin_checker_); |
| if (!completion_callback_) { |
| return; |
| } |
| if (status.ok()) { |
| dbus::MessageReader reader(response_); |
| status = ParseResponse(&reader); |
| } |
| std::move(completion_callback_).Run(status); |
| } |
| |
| const char* const dbus_method_; |
| dbus::Response* response_; |
| MissiveClientImpl* const owner_; |
| base::OnceCallback<void(Status)> completion_callback_; |
| |
| // Weak pointer factory - must be last member of the class. |
| base::WeakPtrFactory<DBusDelegate> weak_ptr_factory_{this}; |
| }; |
| |
| class EnqueueRecordDelegate : public DBusDelegate { |
| public: |
| EnqueueRecordDelegate(Priority priority, |
| Record record, |
| MissiveClientImpl* owner, |
| base::OnceCallback<void(Status)> completion_callback) |
| : DBusDelegate( |
| missive::kEnqueueRecord, owner, std::move(completion_callback)) { |
| *request_.mutable_record() = std::move(record); |
| request_.set_priority(priority); |
| } |
| |
| bool WriteRequest(dbus::MessageWriter* writer) override { |
| return writer->AppendProtoAsArrayOfBytes(request_); |
| } |
| |
| Status ParseResponse(dbus::MessageReader* reader) override { |
| EnqueueRecordResponse response_body; |
| if (!reader->PopArrayOfBytesAsProto(&response_body)) { |
| return Status(error::INTERNAL, "Response was not parsable."); |
| } |
| Status status; |
| status.RestoreFrom(response_body.status()); |
| return status; |
| } |
| |
| private: |
| EnqueueRecordRequest request_; |
| }; |
| |
| class FlushDelegate : public DBusDelegate { |
| public: |
| FlushDelegate(Priority priority, |
| MissiveClientImpl* owner, |
| base::OnceCallback<void(Status)> completion_callback) |
| : DBusDelegate( |
| missive::kFlushPriority, owner, std::move(completion_callback)) { |
| request_.set_priority(priority); |
| } |
| |
| bool WriteRequest(dbus::MessageWriter* writer) override { |
| return writer->AppendProtoAsArrayOfBytes(request_); |
| } |
| |
| Status ParseResponse(dbus::MessageReader* reader) override { |
| FlushPriorityResponse response_body; |
| if (!reader->PopArrayOfBytesAsProto(&response_body)) { |
| return Status(error::INTERNAL, "Response was not parsable."); |
| } |
| Status status; |
| status.RestoreFrom(response_body.status()); |
| return status; |
| } |
| |
| private: |
| FlushPriorityRequest request_; |
| }; |
| |
| class UpdateEncryptionKeyDelegate : public DBusDelegate { |
| public: |
| UpdateEncryptionKeyDelegate(const SignedEncryptionInfo& encryption_info, |
| MissiveClientImpl* owner) |
| : DBusDelegate( |
| missive::kUpdateEncryptionKey, owner, base::DoNothing()) { |
| *request_.mutable_signed_encryption_info() = encryption_info; |
| } |
| |
| bool WriteRequest(dbus::MessageWriter* writer) override { |
| return writer->AppendProtoAsArrayOfBytes(request_); |
| } |
| |
| private: |
| UpdateEncryptionKeyRequest request_; |
| }; |
| |
| class ReportSuccessDelegate : public DBusDelegate { |
| public: |
| ReportSuccessDelegate(const SequenceInformation& sequence_information, |
| bool force_confirm, |
| MissiveClientImpl* owner) |
| : DBusDelegate( |
| missive::kConfirmRecordUpload, owner, base::DoNothing()) { |
| *request_.mutable_sequence_information() = sequence_information; |
| request_.set_force_confirm(force_confirm); |
| } |
| |
| bool WriteRequest(dbus::MessageWriter* writer) override { |
| return writer->AppendProtoAsArrayOfBytes(request_); |
| } |
| |
| private: |
| ConfirmRecordUploadRequest request_; |
| }; |
| |
| void OwnerChanged(const std::string& old_owner, |
| const std::string& new_owner) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(origin_checker_); |
| client_.SetAvailability(/*is_available=*/!new_owner.empty()); |
| } |
| |
| void ServerAvailable(bool service_is_available) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(origin_checker_); |
| client_.SetAvailability(/*is_available=*/service_is_available); |
| } |
| |
| scoped_refptr<dbus::ObjectProxy> missive_service_proxy_; |
| |
| DisconnectableClient client_; |
| |
| // Weak pointer factory - must be last member of the class. |
| base::WeakPtrFactory<MissiveClientImpl> weak_ptr_factory_{this}; |
| }; |
| |
| } // namespace |
| |
| MissiveClient::MissiveClient() { |
| DCHECK(!g_instance); |
| g_instance = this; |
| } |
| |
| MissiveClient::~MissiveClient() { |
| DCHECK_EQ(this, g_instance); |
| g_instance = nullptr; |
| } |
| |
| scoped_refptr<base::SequencedTaskRunner> MissiveClient::origin_task_runner() |
| const { |
| return origin_task_runner_; |
| } |
| |
| // static |
| void MissiveClient::Initialize(dbus::Bus* bus) { |
| DCHECK(bus); |
| (new MissiveClientImpl())->Init(bus); |
| } |
| |
| // static |
| void MissiveClient::Shutdown() { |
| DCHECK(g_instance); |
| delete g_instance; |
| } |
| |
| // static |
| MissiveClient* MissiveClient::Get() { |
| return g_instance; |
| } |
| |
| } // namespace reporting |