blob: fdbcf773b0d4eb0bede2869f8593197d2732ee62 [file] [log] [blame]
// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <string>
#include <string_view>
#include <utility>
#include <base/at_exit.h>
#include <base/logging.h>
#include <base/functional/bind.h>
#include <base/functional/callback_forward.h>
#include <base/functional/callback_helpers.h>
#include <base/run_loop.h>
#include <base/strings/strcat.h>
#include <base/task/single_thread_task_executor.h>
#include <base/task/sequenced_task_runner.h>
#include <base/task/thread_pool.h>
#include <base/task/thread_pool/thread_pool_instance.h>
#include <base/task/task_traits.h>
#include <base/threading/sequence_bound.h>
#include <brillo/dbus/dbus_connection.h>
#include <brillo/flag_helper.h>
#include <brillo/syslog_logging.h>
#include <dbus/bus.h>
#include "missive/client/missive_client.h"
#include "missive/proto/record_constants.pb.h"
#include "missive/util/status.h"
#include "missive/util/status_macros.h"
#include "missive/util/statusor.h"
// The tool for manual Enqueue and Flush operations.
// Built as part of missive when running tests.
// Detailed instruction - see in README.md
namespace reporting {
namespace {
class DBusSender {
public:
DBusSender() {
dbus::Bus::Options options;
bus_ = dbus_connection_.Connect();
CHECK(bus_);
MissiveClient::Initialize(bus_.get());
}
DBusSender(const DBusSender&) = delete;
DBusSender& operator=(const DBusSender&) = delete;
~DBusSender() { MissiveClient::Shutdown(); }
void EnqueueEvent(std::string_view priority_string,
std::string_view destination_string,
std::string_view event_string,
base::OnceCallback<void(Status)> cb) {
ASSIGN_OR_ONCE_CALLBACK_AND_RETURN(Priority priority, cb,
DecodePriority(priority_string));
ASSIGN_OR_ONCE_CALLBACK_AND_RETURN(Destination destination, cb,
DecodeDestination(destination_string));
Record record;
record.set_destination(destination);
record.mutable_data()->assign(event_string);
(new RunAndRetryIfNeeded(
base::BindRepeating(&MissiveClient::EnqueueRecord,
base::Unretained(MissiveClient::Get()), priority,
std::move(record)),
std::move(cb)))
->Run();
}
void Flush(std::string_view priority_string,
base::OnceCallback<void(Status)> cb) {
ASSIGN_OR_ONCE_CALLBACK_AND_RETURN(Priority priority, cb,
DecodePriority(priority_string));
(new RunAndRetryIfNeeded(
base::BindRepeating(&MissiveClient::Flush,
base::Unretained(MissiveClient::Get()), priority),
std::move(cb)))
->Run();
}
private:
// Helper class runs `action` and repeats it if the returned `status` is
// UNAVAILABLE. If the `status` is OK or any other error, exits.
class RunAndRetryIfNeeded {
public:
RunAndRetryIfNeeded(
base::RepeatingCallback<void(base::OnceCallback<void(Status)>)> action,
base::OnceCallback<void(Status)> cb)
: action_(action), cb_(std::move(cb)) {}
void Run() {
action_.Run(base::BindOnce(&RunAndRetryIfNeeded::CalledBack,
base::Unretained(this)));
}
private:
void CalledBack(Status status) {
if (status.ok() || status.error_code() != error::UNAVAILABLE) {
std::move(cb_).Run(status);
delete this;
return;
}
// Back off and retry.
base::SequencedTaskRunner::GetCurrentDefault()->PostDelayedTask(
FROM_HERE,
base::BindOnce(&RunAndRetryIfNeeded::Run, base::Unretained(this)),
/*delay=*/base::Seconds(1));
}
const base::RepeatingCallback<void(base::OnceCallback<void(Status)>)>
action_;
base::OnceCallback<void(Status)> cb_;
};
StatusOr<Priority> DecodePriority(std::string_view priority_string) {
Priority priority;
if (!Priority_Parse(std::string(priority_string), &priority) ||
priority == Priority::UNDEFINED_PRIORITY) {
return Status(
error::INVALID_ARGUMENT,
base::StrCat({"Wrong priority: ", std::string(priority_string)}));
}
return priority;
}
StatusOr<Destination> DecodeDestination(std::string_view destination_string) {
Destination destination;
if (!Destination_Parse(std::string(destination_string), &destination) ||
destination == Destination::UNDEFINED_DESTINATION) {
return Status(error::INVALID_ARGUMENT,
base::StrCat({"Wrong destination: ",
std::string(destination_string)}));
}
return destination;
}
brillo::DBusConnection dbus_connection_;
scoped_refptr<::dbus::Bus> bus_;
};
} // namespace
} // namespace reporting
int main(int argc, char* argv[]) {
base::AtExitManager exit_manager;
DEFINE_string(priority, "UNDEFINED", "Priority of the queue");
DEFINE_string(destination, "UNDEFINED", "Destination of the event");
DEFINE_string(enqueue, "", "Event to be enqueued");
DEFINE_bool(flush, false, "Flag to flush the event");
brillo::FlagHelper::Init(argc, argv, R"(
missive_testing_tool:
To enqueue event:
--priority=... priority queue to enqueue to
--destination=... destination of the enqueued event
--enqueue="..." requesting to enqueue event
(record data passed as a string)
To flush queue:
--priority=... priority queue to flush
--flush requesting to flush the queue
)");
// Always log to syslog and log to stderr if we are connected to a tty.
brillo::InitLog(brillo::kLogToSyslog | brillo::kLogToStderrIfTty);
// Override the log items set by brillo::InitLog.
logging::SetLogItems(/*enable_process_id=*/true, /*enable_thread_id=*/true,
/*enable_timestamp=*/true, /*enable_tickcount=*/true);
base::ThreadPoolInstance::CreateAndStartWithDefaultParams(
"missive_test_thread_pool");
base::SingleThreadTaskExecutor task_executor(base::MessagePumpType::IO);
base::RunLoop run_loop;
base::SequenceBound<::reporting::DBusSender> sender(
base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock()}));
auto cb = base::BindOnce(
[](base::ScopedClosureRunner done, ::reporting::Status status) {
LOG_IF(WARNING, status.ok()) << "Success!";
LOG_IF(ERROR, !status.ok()) << "Failed: " << status;
},
base::ScopedClosureRunner(run_loop.QuitClosure()));
if (FLAGS_flush) {
LOG(INFO) << "Flush priority=" << FLAGS_priority;
sender.AsyncCall(&::reporting::DBusSender::Flush)
.WithArgs(FLAGS_priority, std::move(cb));
} else if (!FLAGS_enqueue.empty()) {
LOG(INFO) << "Enqueue priority=" << FLAGS_priority << " "
<< " destination=" << FLAGS_destination
<< " size_of_data=" << FLAGS_enqueue.size();
sender.AsyncCall(&::reporting::DBusSender::EnqueueEvent)
.WithArgs(FLAGS_priority, FLAGS_destination, FLAGS_enqueue,
std::move(cb));
} else {
std::move(cb).Run(::reporting::Status(
::reporting::error::FAILED_PRECONDITION, "No request specified."));
}
run_loop.Run();
exit(0);
}