iioservice: Support Moving Averages of Samples

This commit adds support of moving averages for clients that require
lower frequencies than the IIO device frequency.

BUG=chromium:1006141
TEST=builds and unit tests.

Change-Id: I628364c1acf15b2586be489f8b67183936e94ecc
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform2/+/2319438
Commit-Queue: Cheng-Hao Yang <chenghaoyang@chromium.org>
Tested-by: Cheng-Hao Yang <chenghaoyang@chromium.org>
Reviewed-by: Gwendal Grignou <gwendal@chromium.org>
Auto-Submit: Cheng-Hao Yang <chenghaoyang@chromium.org>
diff --git a/iioservice/daemon/samples_handler.cc b/iioservice/daemon/samples_handler.cc
index 025e4bc..7824138 100644
--- a/iioservice/daemon/samples_handler.cc
+++ b/iioservice/daemon/samples_handler.cc
@@ -26,6 +26,12 @@
 
 namespace iioservice {
 
+namespace {
+
+constexpr char kNoBatchChannels[][10] = {"timestamp", "count"};
+
+}
+
 // static
 void SamplesHandler::SamplesHandlerDeleter(SamplesHandler* handler) {
   if (handler == nullptr)
@@ -235,6 +241,16 @@
       on_sample_updated_callback_(std::move(on_sample_updated_callback)),
       on_error_occurred_callback_(std::move(on_error_occurred_callback)) {
   DCHECK_GE(dev_max_frequency_, dev_min_frequency_);
+
+  auto channels = iio_device_->GetAllChannels();
+  for (size_t i = 0; i < channels.size(); ++i) {
+    for (size_t j = 0; j < base::size(kNoBatchChannels); ++j) {
+      if (strcmp(channels[i]->GetId(), kNoBatchChannels[j]) == 0) {
+        no_batch_chn_indices.emplace(i);
+        break;
+      }
+    }
+  }
 }
 
 SamplesHandler::SamplesHandler(
@@ -256,6 +272,16 @@
       on_sample_updated_callback_(std::move(on_sample_updated_callback)),
       on_error_occurred_callback_(std::move(on_error_occurred_callback)) {
   DCHECK_GE(dev_max_frequency_, dev_min_frequency_);
+
+  auto channels = iio_device_->GetAllChannels();
+  for (size_t i = 0; i < channels.size(); ++i) {
+    for (size_t j = 0; j < base::size(kNoBatchChannels); ++j) {
+      if (strcmp(channels[i]->GetId(), kNoBatchChannels[j]) == 0) {
+        no_batch_chn_indices.emplace(i);
+        break;
+      }
+    }
+  }
 }
 
 void SamplesHandler::SetSampleWatcherOnThread() {
@@ -542,8 +568,11 @@
       client_data->enabled_chn_indices.emplace(chn_index);
     }
   } else {
-    for (int32_t chn_index : iio_chn_indices)
+    for (int32_t chn_index : iio_chn_indices) {
       client_data->enabled_chn_indices.erase(chn_index);
+      // remove cached chn's moving average
+      clients_map_[client_data].chns.erase(chn_index);
+    }
   }
 
   ipc_task_runner_->PostTask(
@@ -649,10 +678,33 @@
     int step =
         std::max(1, static_cast<int>(dev_frequency_ / client.first->frequency));
 
+    // Update moving averages for channels
+    for (int32_t chn_index : client.first->enabled_chn_indices) {
+      if (no_batch_chn_indices.find(chn_index) != no_batch_chn_indices.end())
+        continue;
+
+      if (sample->find(chn_index) == sample->end()) {
+        LOG(ERROR) << "Missing chn index: " << chn_index << " in sample";
+        continue;
+      }
+
+      int size = samples_cnt_ - client.second.sample_index + 1;
+      if (client.second.chns.find(chn_index) == client.second.chns.end() &&
+          size != 1) {
+        // A new enabled channel: fill up previous sample points with the
+        // current value
+        client.second.chns[chn_index] =
+            sample.value()[chn_index] * (size * (size - 1) / 2);
+      }
+
+      client.second.chns[chn_index] += sample.value()[chn_index] * size;
+    }
+
     if (client.second.sample_index + step - 1 <= samples_cnt_) {
       // Send a sample to the client
       int64_t size = samples_cnt_ - client.second.sample_index + 1;
       DCHECK_GE(size, 1);
+      int64_t denom = ((size + 1) * size / 2);
 
       libmems::IioDevice::IioSample client_sample;
       for (int32_t chn_index : client.first->enabled_chn_indices) {
@@ -661,10 +713,24 @@
           continue;
         }
 
-        client_sample[chn_index] = sample.value()[chn_index];
+        if (no_batch_chn_indices.find(chn_index) !=
+            no_batch_chn_indices.end()) {
+          // Use the current value directly
+          client_sample[chn_index] = sample.value()[chn_index];
+          continue;
+        }
+
+        if (client.second.chns.find(chn_index) == client.second.chns.end()) {
+          LOG(ERROR) << "Missed chn index: " << chn_index
+                     << " in moving averages";
+          continue;
+        }
+
+        client_sample[chn_index] = client.second.chns[chn_index] / denom;
       }
 
       client.second.sample_index = samples_cnt_ + 1;
+      client.second.chns.clear();
 
       ipc_task_runner_->PostTask(
           FROM_HERE,
diff --git a/iioservice/daemon/samples_handler.h b/iioservice/daemon/samples_handler.h
index 02b3649..79e9152 100644
--- a/iioservice/daemon/samples_handler.h
+++ b/iioservice/daemon/samples_handler.h
@@ -90,6 +90,8 @@
   struct SampleData {
     // The starting index of the next sample.
     uint64_t sample_index = 0;
+    // Moving averages of channels except for channels that have no batch mode
+    std::map<int32_t, int64_t> chns;
   };
 
   static const uint32_t kNumReadFailedLogsBeforeGivingUp = 100;
@@ -186,6 +188,8 @@
                                cros::mojom::ObserverErrorType)>
       on_error_occurred_callback_;
 
+  std::set<int32_t> no_batch_chn_indices;
+
   std::unique_ptr<base::FileDescriptorWatcher::Controller> watcher_;
 
  private:
diff --git a/iioservice/daemon/test_fakes.cc b/iioservice/daemon/test_fakes.cc
index 80bcfe8..d85d290 100644
--- a/iioservice/daemon/test_fakes.cc
+++ b/iioservice/daemon/test_fakes.cc
@@ -22,6 +22,21 @@
 
 namespace fakes {
 
+namespace {
+
+int64_t CalcMovingAverage(const std::vector<int64_t>& values) {
+  int64_t size = values.size();
+  int64_t value_total = 0, sum = 0;
+  for (int64_t i = size - 1; i >= 0; --i) {
+    sum += values[i];
+    value_total += sum;
+  }
+
+  return value_total / ((size + 1) * size / 2);
+}
+
+}  // namespace
+
 // static
 FakeSamplesHandler::ScopedFakeSamplesHandler FakeSamplesHandler::CreateWithFifo(
     scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner,
@@ -143,9 +158,29 @@
 
       CHECK(it != sample.end());
 
-      CHECK_EQ(it->second,
-               libmems::fakes::kFakeAccelSamples[sample_index_ + step - 1]
-                                                [chnIndex]);
+      // Check timestamp channel
+      if (strncmp(libmems::fakes::kFakeAccelChns[chnIndex],
+                  libmems::kTimestampAttr,
+                  strlen(libmems::kTimestampAttr)) == 0) {
+        CHECK_EQ(it->second,
+                 libmems::fakes::kFakeAccelSamples[sample_index_ + step - 1]
+                                                  [chnIndex]);
+        continue;
+      }
+
+      // Check other channels
+      std::vector<int64_t> values;
+      for (int index = 0; index < step; ++index) {
+        if (chnIndex == 1 && sample_index_ + index < pause_index_) {
+          values.push_back(
+              libmems::fakes::kFakeAccelSamples[pause_index_][chnIndex]);
+        } else {
+          values.push_back(libmems::fakes::kFakeAccelSamples[sample_index_ +
+                                                             index][chnIndex]);
+        }
+      }
+
+      CHECK_EQ(it->second, CalcMovingAverage(values));
     }
   } else {
     auto channels = device_->GetAllChannels();