blob: 63255c2770fee51e72d8dda9b5eee254435b2319 [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "missive/compression/compression_module.h"
#include <optional>
#include <string>
#include <utility>
#include <base/bind.h>
#include <base/callback.h>
#include <base/feature_list.h>
#include <base/logging.h>
#include <base/memory/ref_counted.h>
#include <base/strings/string_piece.h>
#include <base/task/thread_pool.h>
#include <snappy.h>
#include "missive/proto/record.pb.h"
#include "missive/resources/resource_interface.h"
namespace reporting {
const base::Feature kCompressReportingPipeline{
CompressionModule::kCompressReportingFeature,
base::FEATURE_ENABLED_BY_DEFAULT};
// static
const char CompressionModule::kCompressReportingFeature[] =
"CompressReportingPipeline";
// static
scoped_refptr<CompressionModule> CompressionModule::Create(
uint64_t compression_threshold,
CompressionInformation::CompressionAlgorithm compression_type) {
return base::WrapRefCounted(
new CompressionModule(compression_threshold, compression_type));
}
void CompressionModule::CompressRecord(
std::string record,
base::OnceCallback<void(std::string, std::optional<CompressionInformation>)>
cb) const {
if (!is_enabled()) {
// Compression disabled, don't compress and don't return compression
// information.
std::move(cb).Run(std::move(record), std::nullopt);
return;
}
// Compress if record is larger than the compression threshold and compression
// enabled
switch (compression_type_) {
case CompressionInformation::COMPRESSION_NONE: {
// Don't compress, simply return serialized record
CompressionInformation compression_information;
compression_information.set_compression_algorithm(
CompressionInformation::COMPRESSION_NONE);
std::move(cb).Run(std::move(record), std::move(compression_information));
break;
}
case CompressionInformation::COMPRESSION_SNAPPY: {
if (record.length() < compression_threshold_) {
// Record size is smaller than threshold, don't compress.
CompressionInformation compression_information;
compression_information.set_compression_algorithm(
CompressionInformation::COMPRESSION_NONE);
std::move(cb).Run(std::move(record),
std::move(compression_information));
return;
}
// Before doing compression, we must make sure there is enough memory - we
// are going to temporarily double the record.
ScopedReservation scoped_reservation(record.size(), GetMemoryResource());
if (!scoped_reservation.reserved()) {
CompressionInformation compression_information;
compression_information.set_compression_algorithm(
CompressionInformation::COMPRESSION_NONE);
std::move(cb).Run(std::move(record),
std::move(compression_information));
return;
}
// Perform compression.
CompressionModule::CompressRecordSnappy(std::move(record), std::move(cb));
break;
}
}
}
// static
bool CompressionModule::is_enabled() {
return base::FeatureList::IsEnabled(kCompressReportingPipeline);
}
CompressionModule::CompressionModule(
uint64_t compression_threshold,
CompressionInformation::CompressionAlgorithm compression_type)
: compression_type_(compression_type),
compression_threshold_(compression_threshold) {}
CompressionModule::~CompressionModule() = default;
void CompressionModule::CompressRecordSnappy(
std::string record,
base::OnceCallback<void(std::string, std::optional<CompressionInformation>)>
cb) const {
// Compression is enabled and crosses the threshold,
std::string output;
snappy::Compress(record.data(), record.size(), &output);
// Return compressed string
CompressionInformation compression_information;
compression_information.set_compression_algorithm(
CompressionInformation::COMPRESSION_SNAPPY);
std::move(cb).Run(output, compression_information);
}
} // namespace reporting