blob: 1b11ca64a3d774de0d27c620942f9ac702ef84fd [file] [log] [blame]
// Copyright 2016 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "patchpanel/message_dispatcher.h"
#include <utility>
#include <vector>
#include <sys/socket.h>
#include <base/functional/bind.h>
#include <base/logging.h>
#include <google/protobuf/message_lite.h>
namespace patchpanel {
MessageDispatcherInternal::MessageDispatcherInternal(base::ScopedFD fd)
: fd_(std::move(fd)) {}
void MessageDispatcherInternal::RegisterFailureHandler(
base::RepeatingCallback<void()> handler) {
failure_handler_ = std::move(handler);
}
void MessageDispatcherInternal::RegisterMessageHandler(
base::RepeatingCallback<void()> handler) {
watcher_ =
base::FileDescriptorWatcher::WatchReadable(fd_.get(), std::move(handler));
}
bool MessageDispatcherInternal::GetMessage(
google::protobuf::MessageLite* proto) {
char buffer[1024];
ssize_t len = recvfrom(fd_.get(), buffer, sizeof(buffer), MSG_DONTWAIT,
nullptr, nullptr);
// Don't stop watchers on these errors.
if (len < 0 && (errno == EAGAIN || errno == EINTR)) {
return false;
}
// Handle errors (len < 0) and graceful shutdowns (len == 0). Explicit 0-byte
// messages are not supported and considered as graceful shutdown.
if (len <= 0) {
if (len == 0) {
LOG(ERROR) << "Read failed: stopping watcher: socket closed";
} else {
PLOG(ERROR) << "Read failed: stopping watcher";
}
watcher_.reset();
if (!failure_handler_.is_null()) {
failure_handler_.Run();
}
return false;
}
proto->Clear();
if (!proto->ParseFromArray(buffer, static_cast<int>(len))) {
LOG(ERROR) << "Error parsing protobuf " << proto->GetTypeName();
return false;
}
return true;
}
bool MessageDispatcherInternal::SendMessage(
const google::protobuf::MessageLite& proto) {
if (!proto.IsInitialized()) {
LOG(DFATAL) << "protobuf missing mandatory fields";
return false;
}
std::string str;
if (!proto.SerializeToString(&str)) {
LOG(ERROR) << "error serializing protobuf";
return false;
}
ssize_t len = write(fd_.get(), str.data(), str.size());
if (len < 0) {
PLOG(ERROR) << "write failed";
return false;
}
if (len != static_cast<ssize_t>(str.size())) {
LOG(ERROR) << "short write on protobuf";
return false;
}
return true;
}
} // namespace patchpanel