blob: 3f696cab1f0befd4b4cb3c030f58612e5fb2e4ce [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from subprocess import PIPE
import subprocess
import sys
# Must be in sync with src/platform2/patchpanel/routing_service.h
#
# Lookup table from the "<value>/<mask>" string that follows "--mark" in a
# dumped iptables rule to the name of the traffic source the rule counts.
# The insertion order of this table is also the order in which sources are
# listed for each interface when counters are printed.
MARK_TO_SOURCES = {
    "0x0/0x3f00": "UNKNOWN",
    "0x100/0x3f00": "CHROME",
    "0x200/0x3f00": "USER",
    "0x300/0x3f00": "UPDATE_ENGINE",
    "0x400/0x3f00": "SYSTEM",
    "0x500/0x3f00": "HOST_VPN",
    "0x2000/0x3f00": "ARC",
    "0x2100/0x3f00": "CROSVM",
    "0x2200/0x3f00": "PLUGINVM",
    "0x2300/0x3f00": "TETHERING",
    "0x2400/0x3f00": "ARC_VPN",
}
# Helper that runs |args| as a subprocess (no shell) and hands back its
# captured standard output, one item per output line.
#
# The subprocess is run to completion before this function returns; only the
# conversion of the lines is lazy (the result is a one-shot map iterator).
#
# NOTE: every item is str() applied to a bytes line, i.e. text of the form
# "b'...'" rather than the decoded line. print_counters() relies on this when
# it drops the last character of a line's final token, so do not switch to
# decoded text here without updating that caller.
def command(args):
    completed = subprocess.run(args, stdout=PIPE)
    raw_lines = completed.stdout.splitlines()
    return map(str, raw_lines)
# Zeroes the counters of the mangle table ("-Z") with iptables when |ipv4| is
# set and with ip6tables when |ipv6| is set, in that order.
# Doing so will confuse shill traffic accounting logic, so this should only be
# used for testing.
def reset_counters(ipv4, ipv6):
    selected_tools = []
    if ipv4:
        selected_tools.append("iptables")
    if ipv6:
        selected_tools.append("ip6tables")
    for tool in selected_tools:
        command([tool, "-t", "mangle", "-Z", "-w"])
# Formats a byte count with a unit suffix: B, K, M or G.
#
# |counter| is anything int() accepts (an int or a decimal string). The value
# is divided by 1024 for each unit step, up to three steps (G is the largest
# unit, so bigger values stay in G), truncated to a whole number, and returned
# as a string followed by the unit letter, e.g. "1536" -> "1K".
def easy_unit(counter):
    value = int(counter)
    suffix = "B"
    for larger_suffix in ("K", "M", "G"):
        if value < 1024:
            break
        # Floor division keeps the arithmetic in exact integers. True division
        # would go through a float and round counters above 2**53 before the
        # final truncation.
        value //= 1024
        suffix = larger_suffix
    return str(value) + suffix
# Print traffic counters grouped by network interface and source.
#
# |ipv4| / |ipv6| select whether the per-interface chains are dumped with
# iptables and/or ip6tables. When |easy_units| is set the printed values go
# through easy_unit(); otherwise the raw byte counts are printed.
# One line "<iface> <source> <bucket>=<value> ..." is printed for every
# (interface, source) pair whose counters do not sum to zero.
def print_counters(ipv4, ipv6, easy_units):
    # Interfaces are discovered from the POSTROUTING jump rules, which look
    # like "-A POSTROUTING -o wlan0 -c 13947 1698015 -j tx_wlan0": the fourth
    # token is the interface name. This lookup always goes through iptables,
    # whatever |ipv4| and |ipv6| say.
    jump_rules = command(["iptables", "-t", "mangle", "-v", "-S", "POSTROUTING", "-w"])
    ifaces = [rule.split()[3] for rule in jump_rules if "tx_" in rule]

    # counters[iface][source][bucket] holds a byte count as a string, where
    # bucket is one of {tx,rx}_{ipv4,ipv6}.
    counters = {}
    for iface in ifaces:
        counters[iface] = {source: {} for source in MARK_TO_SOURCES.values()}

    # (bucket suffix, binary) pairs for the requested address families.
    families = []
    if ipv4:
        families.append(("ipv4", "iptables"))
    if ipv6:
        families.append(("ipv6", "ip6tables"))

    # Dump the tx_<iface> and rx_<iface> chains of every interface and pick the
    # byte counter out of each source rule, which looks like:
    # "-A tx_eth0 -m mark --mark 0x100/0x3f00 -c 33456 8637724 -j RETURN"
    for iface in ifaces:
        per_source = counters[iface]
        for family, tool in families:
            for direction in ("tx", "rx"):
                bucket = direction + "_" + family
                chain = direction + "_" + iface
                for rule in command([tool, "-t", "mangle", "-v", "-S", chain, "-w"]):
                    atoms = rule.split()
                    if len(atoms) == 5:
                        # Five-token lines carry no mark and are booked under
                        # UNKNOWN (presumably the chain's catch-all rule, e.g.
                        # "-A tx_eth0 -c <pkts> <bytes>" - verify). command()
                        # yields "b'...'" text, so the last token of a line
                        # ends with a quote character that is dropped here.
                        per_source["UNKNOWN"][bucket] = atoms[4][:-1]
                    elif len(atoms) >= 9:
                        # Token 5 is the "<value>/<mask>" mark, token 8 the
                        # byte counter; rules with an unlisted mark are ignored.
                        source = MARK_TO_SOURCES.get(atoms[5])
                        if source:
                            per_source[source][bucket] = atoms[8]

    # Print the output, skipping sources with no traffic and zero buckets.
    for iface, per_source in counters.items():
        for source, buckets in per_source.items():
            if sum(int(value) for value in buckets.values()) == 0:
                continue
            fields = []
            for bucket, value in buckets.items():
                if value == "0":
                    continue
                shown = easy_unit(value) if easy_units else value
                fields.append(bucket + "=" + shown + " ")
            print(iface, source, "".join(fields))
# Usage text, one entry per output line; print_help() emits the entries in
# order.
HELP = [
    "show|print|list: print traffic counters grouped by network interface",
    "reset: reset traffic counters",
    "help: print this help information",
    "options:",
    (
        " -h: use unit suffixes B (byte), K (kilobyte), M (megabyte), "
        "G (gigabyte). By default counters are displayed as total bytes"
    ),
    " -4: print or reset IPv4 counters",
    " -6: print or reset IPv6 counters",
    " (by default both -4 and -6 are passed implicitly)",
]
# Writes the usage text in HELP to standard error, one entry per line.
def print_help():
    for help_line in HELP:
        sys.stderr.write(help_line + "\n")
# Command-line entry point. Returns the process exit status: 0 after a
# recognised action (including help), 1 when the first argument is not a known
# action.
#
# The action is read from the first argument only; the option flags (-h, -4,
# -6) are looked up anywhere in the argument list.
def main():
    argv = sys.argv
    easy_units = "-h" in argv
    # "-4" alone turns IPv6 off and "-6" alone turns IPv4 off; passing both
    # flags is the same as passing neither.
    ipv4 = "-6" not in argv
    ipv6 = "-4" not in argv
    if not (ipv4 or ipv6):
        ipv4 = ipv6 = True

    # No action at all is treated like an explicit "help".
    action = argv[1] if len(argv) >= 2 else "help"

    if action == "help":
        print_help()
        return 0
    if action in ("show", "print", "list"):
        print_counters(ipv4, ipv6, easy_units)
        return 0
    if action == "reset":
        reset_counters(ipv4, ipv6)
        return 0

    print("incorrect arguments", file=sys.stderr)
    print_help()
    return 1
# When run as a script, use main()'s return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)