blob: 30e376b38506e0bdf5e4dcda52abc7d53a3a9563 [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Monitor conntrack connections.
Note that this only tracks connections that were started after the script was
run. It doesn't catch connections that were already being tracked by conntrack.
Usage:
Build and install a test CrOS image, and ssh into the DUT
# For basic output
(dut) $ /usr/lib/flimflam/test/monitor-conntrack
# See print_help() for explanations:
(dut) $ /usr/lib/flimflam/test/monitor-conntrack -h
(dut) $ /usr/lib/flimflam/test/monitor-conntrack -p
(dut) $ /usr/lib/flimflam/test/monitor-conntrack -v
(dut) $ /usr/lib/flimflam/test/monitor-conntrack -v -p
(dut) $ /usr/lib/flimflam/test/monitor-conntrack --awk
"""
import datetime
import re
import select
import subprocess
import sys
import signal
# Runtime flags. They default to off and are switched on by main() according
# to the command-line arguments.
AWK_COMPAT = False
PRINT_LINES = False
VERBOSE = False

# Raw conntrack output lines, buffered as they are read and only processed
# when the script exits.
CONNTRACK_LINES = []

# {id: Connection} for every connection conntrack is currently tracking. An
# entry must be removed from here as soon as conntrack destroys the
# connection.
ACTIVE_CONNECTIONS = {}

# Connections that conntrack no longer tracks, ordered by the time at which
# they were destroyed.
DESTROYED_CONNECTIONS = []

# Must be in sync with src/platform2/patchpanel/routing_service.h
# The keys are the source enum values shifted left by 8 bits, i.e. placed where
# the 0x3F00 mask finds them inside a fwmark.
MARKS_TO_SOURCES = {
    source_enum << 8: source_name
    for source_enum, source_name in (
        (0x00, "UNKNOWN"),
        (0x01, "CHROME"),
        (0x02, "USER"),
        (0x03, "UPDATE_ENGINE"),
        (0x04, "SYSTEM"),
        (0x05, "HOST_VPN"),
        (0x20, "ARC"),
        (0x21, "CROSVM"),
        (0x22, "PLUGINVM"),
        (0x23, "TETHERING"),
        (0x24, "ARC_VPN"),
    )
}
def log(message):
    """Writes |message| to stdout, but only when |VERBOSE| is enabled."""
    if not VERBOSE:
        return
    print(message)
class Connection:
    """Class that encapsulates a network connection.

    Attributes:
        conn_id: String of the id that conntrack assigned this connection.
            Note that this is only guaranteed(?) to be unique among active
            connections. After a connection is closed, conntrack can choose
            to reuse the id.
        src_ip: String of the source IP address from the originator of the
            connection. Can be IPv4 or IPv6.
        src_port: String of the source port from the originator of the
            connection.
        nat_src_ip: String of the potentially NAT-ed source IP address of the
            originator of the connection. If the source IP is not NAT-ed,
            then this will be unset. Can be IPv4 or IPv6.
        nat_src_port: String of the potentially NAT-ed source port of the
            originator of the connection. If the source port is not NAT-ed,
            then this will be unset.
        dst_ip: String of the destination IP address from the originator of
            the connection. Can be IPv4 or IPv6.
        dst_port: String of the destination port from the originator of the
            connection.
        proto: String of the L4 protocol, e.g. UDP, TCP, ICMP, etc.
        start_timestamp: String of when the connection was first detected by
            conntrack (corresponds to the NEW event). In
            "YYYY/MM/DD HH:MM:SS" format.
        end_timestamp: String of when the connection was destroyed by
            conntrack (corresponds to the DESTROY event). In
            "YYYY/MM/DD HH:MM:SS" format. Empty until then.
        fwmark: String of the integer 'fwmark' traffic tag on a connection.
            Empty until set_fwmark() is called.
        source: String naming the source enum extracted from the |fwmark|.
            Empty until set_fwmark() is called.
    """

    def __init__(
        self,
        conn_id,
        src_ip,
        src_port,
        nat_src_ip,
        nat_src_port,
        dst_ip,
        dst_port,
        proto,
        start_timestamp,
    ):
        self.conn_id = conn_id
        self.src_ip = src_ip
        self.src_port = src_port
        self.nat_src_ip = nat_src_ip
        self.nat_src_port = nat_src_port
        self.dst_ip = dst_ip
        self.dst_port = dst_port
        self.proto = proto
        self.start_timestamp = start_timestamp
        # Filled in later, by the destroy and update events respectively.
        self.end_timestamp = ""
        self.fwmark = ""
        self.source = ""

    def __repr__(self):
        return (
            f"Connection(id: {self.conn_id}, src_ip: {self.src_ip}, "
            f"src_port: {self.src_port}, nat_src_ip: {self.nat_src_ip}, "
            f"nat_src_port: {self.nat_src_port}, dst_ip: {self.dst_ip}, "
            f"dst_port: {self.dst_port}, proto: {self.proto}, "
            f"start_timestamp: {self.start_timestamp}, "
            f"end_timestamp: {self.end_timestamp}, fwmark: {self.fwmark}, "
            f"source: {self.source})"
        )

    def set_fwmark(self, fwmark):
        """Stores |fwmark| and the traffic source name derived from it.

        Args:
            fwmark: A string of an int fwmark value, e.g. "123456789"
        """
        self.fwmark = fwmark
        self.source = self.parse_fwmark(fwmark)

    def parse_fwmark(self, fwmark):
        """Transforms the fwmark into the name of its traffic source.

        (copied from src/platform2/patchpanel/routing_service.h)
        A representation of how fwmark bits are split and used for tagging
        and routing traffic. The 32 bits of the fwmark are currently
        organized as:
         0                   1                   2                   3
         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |        routing table id       |VPN|source enum|   reserved  |*|
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

        routing table id (16bits): the routing table id of a physical device
                                   managed by shill or of a virtual private
                                   network.
        VPN (2bits): policy bits controlled by host application to force VPN
                     routing or bypass VPN routing.
        source enum(6bits): policy bits controlled by patchpanel for grouping
                            originated traffic by domain.
        reserved(7bits): no usage at the moment.
        legacy SNAT(1bit): legacy bit used for setting up SNAT for ARC,
                           Crostini, and PluginVMs with iptables MASQUERADE.

        Args:
            fwmark: A string of an int fwmark value, e.g. "123456789"

        Returns:
            The name that |MARKS_TO_SOURCES| associates with the 6 source
            enum bits (bits 18-23 in the diagram above, mask 0x3F00). If the
            bits hold a value missing from |MARKS_TO_SOURCES|, the masked
            value is returned in hex form (e.g. "0x600") instead.
        """
        source_bits = int(fwmark) & 0x3F00
        # Fall back to the raw bits rather than raising KeyError, so that a
        # source missing from the table cannot abort the processing.
        return MARKS_TO_SOURCES.get(source_bits, hex(source_bits))

    def awk(self):
        """Returns string output that would be easier for |awk| to parse.

        The result is one line of space-separated fields: id, src_ip,
        src_port, nat_src_ip, nat_src_port, dst_ip, dst_port, proto, start
        epoch, end epoch, fwmark and source. Fields that are not set are
        emitted as "None" so that the number of columns stays fixed.
        """
        # Fields that might not be set in a connection
        nat_src_ip = self.nat_src_ip or None
        nat_src_port = self.nat_src_port or None
        fwmark = self.fwmark or None
        source = self.source or None
        # An empty timestamp cannot be parsed, so it is reported as unset.
        start_epoch = (
            datetime_to_epoch(self.start_timestamp)
            if self.start_timestamp
            else None
        )
        end_epoch = (
            datetime_to_epoch(self.end_timestamp)
            if self.end_timestamp
            else None
        )
        return (
            f"{self.conn_id} "
            f"{self.src_ip} "
            f"{self.src_port} "
            f"{nat_src_ip} "
            f"{nat_src_port} "
            f"{self.dst_ip} "
            f"{self.dst_port} "
            f"{self.proto} "
            f"{start_epoch} "
            f"{end_epoch} "
            f"{fwmark} "
            f"{source}"
        )
def exit_handler(signum, frame):
    """Signal handler: processes the buffered lines, reports, then exits.

    Every line in |CONNTRACK_LINES| is run through process_xml_line(), after
    which the active and destroyed connections are printed, either in the
    awk-friendly format or in the human-readable one.
    """
    # Both arguments are mandated by the signal handler signature, but this
    # handler only serves to leave the otherwise endless read loop.
    del signum, frame

    for raw_line in CONNTRACK_LINES:
        process_xml_line(raw_line)

    if not AWK_COMPAT:
        print()
        print("Active connections:")
        for active in ACTIVE_CONNECTIONS.values():
            print(active)
        print()
        print("Closed connections:")
        for closed in DESTROYED_CONNECTIONS:
            print(closed)
    else:
        for active in ACTIVE_CONNECTIONS.values():
            print(active.awk())
        for closed in DESTROYED_CONNECTIONS:
            print(closed.awk())

    sys.exit()
def process_xml_line(line):
    """Dispatches one conntrack line, assumed to be in |xml| form.

    The event type and the connection id are pulled out of |line|, and the
    line is then handed to process_new(), process_update() or
    process_destroy(). Lines carrying any other event type (or none) are only
    logged.

    Example |xml| line:
    Raw:
      <flow type="update"><meta direction="original"><layer3 protonum="10"
      protoname="ipv6"><src>2401:fa00:480:ee08:d027:80aa:5a5a:139a</src>
      <dst>2404:6800:4004:81f::2004</dst></layer3> ... </flow>

    Indented (for readability purposes):
      <flow type="update">
        <meta direction="original">
          <layer3 protonum="10" protoname="ipv6">
            <src>2401:fa00:480:ee08:d027:80aa:5a5a:139a</src>
            <dst>2404:6800:4004:81f::2004</dst>
          </layer3>
          <layer4 protonum="6" protoname="tcp">
            <sport>39128</sport>
            <dport>443</dport>
          </layer4>
        </meta>
        <meta direction="reply">
          <layer3 protonum="10" protoname="ipv6">
            <src>2404:6800:4004:81f::2004</src>
            <dst>2401:fa00:480:ee08:d027:80aa:5a5a:139a</dst>
          </layer3>
          <layer4 protonum="6" protoname="tcp">
            <sport>443</sport>
            <dport>39128</dport>
          </layer4>
        </meta>
        <meta direction="independent">
          <state>FIN_WAIT</state>
          <timeout>120</timeout>
          <mark>67895552</mark>
          <id>2009223818</id>
          <assured/>
        </meta>
        <when>
          <hour>14</hour>
          <min>07</min>
          <sec>39</sec>
          <wday>3</wday>
          <day>28</day>
          <month>2</month>
          <year>2023</year>
        </when>
      </flow>
    """
    log("=============")
    log(line)

    # The event is the value of the first |type| attribute on the line.
    type_attr = re.search(r"type=\"([a-z]+)\"", line)
    if type_attr is None:
        event = ""
        log("No event found")
    else:
        event = type_attr.group(1)

    id_tag = re.search(r"\<id\>(\d+)\<\/id\>", line)
    if id_tag is None:
        conn_id = ""
        log("No id found")
    else:
        conn_id = id_tag.group(1)
        log(f"id: {conn_id}")

    handlers = {
        "new": process_new,
        "update": process_update,
        "destroy": process_destroy,
    }
    handler = handlers.get(event)
    if handler is not None:
        handler(line, conn_id)

    log("=============\n")
def process_new(line, conn_id):
    """Starts tracking the connection announced by a |new| conntrack line.

    The endpoints, the L4 protocol and the timestamp are extracted from
    |line| and stored as a Connection in |ACTIVE_CONNECTIONS| under
    |conn_id|. The line is ignored if |conn_id| is already being tracked.
    """
    if conn_id in ACTIVE_CONNECTIONS:
        log(f"[process_new]: Already saw id {conn_id} in an active connection")
        return

    # Each of "src", "dst", "sport" and "dport" shows up twice in a line:
    # first for the original outgoing flow, then for the reply direction.
    # The reply direction may reveal any NAT rules that are being applied.

    # Xml form:
    #   <src>2401:fa00:480:ee08:d027:80aa:5a5a:139a</src>
    src_hit = re.search(r"\<src\>([\d+\.\:a-zA-Z]+)\<\/src\>", line)
    if src_hit is None:
        src_ip = ""
        log("No src_ip found")
    else:
        src_ip = src_hit.group(1)
        log(f"src_ip: {src_ip}")

    # Xml form:
    #   <sport>39128</sport>
    sport_hit = re.search(r"\<sport\>(\d+)\<\/sport\>", line)
    if sport_hit is None:
        src_port = ""
        log("No sport found")
    else:
        src_port = sport_hit.group(1)
        log(f"sport: {src_port}")

    # Xml form:
    #   <dst>2404:6800:4004:81f::2004</dst>
    dst_ip = nat_src_ip = ""
    all_dst_ips = re.findall(r"\<dst\>([\d+\.\:a-zA-Z]+)\<\/dst\>", line)
    if not all_dst_ips:
        log("No dst_ip found")
    else:
        dst_ip = all_dst_ips[0]
        log(f"dst_ip: {dst_ip}")
        # A reply-direction |dst| that differs from the original source is
        # taken to be the NAT-ed source.
        if len(all_dst_ips) > 1 and src_ip != all_dst_ips[1]:
            nat_src_ip = all_dst_ips[1]
            log(f"nat_src_ip: {nat_src_ip}")

    # Xml form:
    #   <dport>443</dport>
    dst_port = nat_src_port = ""
    all_dst_ports = re.findall(r"\<dport\>(\d+)\<\/dport\>", line)
    if not all_dst_ports:
        log("No dport found")
    else:
        dst_port = all_dst_ports[0]
        log(f"dport: {dst_port}")
        # Same inference as for the address, applied to the port.
        if len(all_dst_ports) > 1 and src_port != all_dst_ports[1]:
            nat_src_port = all_dst_ports[1]
            log(f"nat_src_port: {nat_src_port}")

    # Xml form:
    #   <layer4 protonum="6" protoname="tcp">
    proto_hit = re.search(
        r"\<layer4 protonum=\"\d+\" protoname=\"([a-z]+)\"\>", line
    )
    if proto_hit is None:
        proto = ""
        log("No proto found")
    else:
        proto = proto_hit.group(1)
        log(f"proto: {proto}")

    tracked = Connection(
        conn_id=conn_id,
        src_ip=src_ip,
        src_port=src_port,
        nat_src_ip=nat_src_ip,
        nat_src_port=nat_src_port,
        dst_ip=dst_ip,
        dst_port=dst_port,
        proto=proto,
        start_timestamp=format_timestamp(line),
    )
    ACTIVE_CONNECTIONS[conn_id] = tracked
    log(f"[process_new]: Tracking new connection {tracked}")
def process_update(line, conn_id):
    """Applies a conntrack line with the |update| event to its connection.

    The fwmark of the tracked connection is refreshed from the |mark| entry,
    and its NAT-ed source address and port are refreshed from the
    reply-direction |dst| and |dport| entries. Lines whose |conn_id| is not
    in |ACTIVE_CONNECTIONS| are ignored.
    """
    if conn_id not in ACTIVE_CONNECTIONS:
        log(f"[process_update]: Didn't find id {conn_id} in active connections")
        return
    conn = ACTIVE_CONNECTIONS[conn_id]

    # Xml form:
    #   <mark>67895552</mark>
    mark_hit = re.search(r"\<mark\>(\d+)\<\/mark\>", line)
    if mark_hit is None:
        log("No fwmark found")
    else:
        fwmark = mark_hit.group(1)
        conn.set_fwmark(fwmark)
        log(f"fwmark: {fwmark}")

    # Update the |nat*| fields if they have changed. Only the second match,
    # which is the reply-direction entry, is of interest here.
    #
    # Xml form:
    #   <dst>2404:6800:4004:81f::2004</dst>
    reply_dst_ips = re.findall(r"\<dst\>([\d+\.\:a-zA-Z]+)\<\/dst\>", line)[1:2]
    if reply_dst_ips:
        reply_ip = reply_dst_ips[0]
        if reply_ip not in (conn.src_ip, conn.nat_src_ip):
            conn.nat_src_ip = reply_ip
            log(f"nat_src_ip updated to: {reply_ip}")

    # Xml form:
    #   <dport>443</dport>
    reply_dst_ports = re.findall(r"\<dport\>(\d+)\<\/dport\>", line)[1:2]
    if reply_dst_ports:
        reply_port = reply_dst_ports[0]
        if reply_port not in (conn.src_port, conn.nat_src_port):
            conn.nat_src_port = reply_port
            log(f"nat_src_port updated to: {reply_port}")

    log(f"[process_update]: Updated {conn}")
def process_destroy(line, conn_id):
    """Retires the connection named by a |destroy| conntrack line.

    The connection is removed from |ACTIVE_CONNECTIONS|, stamped with the
    timestamp found in |line| and appended to |DESTROYED_CONNECTIONS|. Lines
    whose |conn_id| is not being tracked are ignored.
    """
    if conn_id not in ACTIVE_CONNECTIONS:
        log(
            f"[process_destroy]: Didn't find id {conn_id} in active connections"
        )
        return

    finished = ACTIVE_CONNECTIONS.pop(conn_id)
    finished.end_timestamp = format_timestamp(line)
    DESTROYED_CONNECTIONS.append(finished)
    log(f"[process_destroy]: Updated {finished}")
def format_timestamp(line):
    """Parses the xml entries for a timestamp into YYYY/MM/DD HH:MM:SS.

    Args:
        line: A conntrack line in |xml| form.

    Returns:
        The timestamp string built from the digits found in |line| (they are
        copied as-is, without zero padding), or "" if any of the year, month,
        day, hour, min or sec entries is missing.
    """
    # Xml form:
    #   <hour>14</hour>
    #   <min>07</min>
    #   <sec>39</sec>
    #   <wday>3</wday>
    #   <day>28</day>
    #   <month>2</month>
    #   <year>2023</year>
    fields = {}
    for tag in ("year", "month", "day", "hour", "min", "sec"):
        # The leading "<" keeps the |day| pattern from matching <wday>.
        tag_match = re.search(rf"\<{tag}\>(\d+)\<\/{tag}\>", line)
        if tag_match is None:
            # Every entry is required; bail out instead of failing on a
            # partially present timestamp.
            log("No timestamp found")
            return ""
        fields[tag] = tag_match.group(1)

    timestamp = (
        f"{fields['year']}/{fields['month']}/{fields['day']} "
        f"{fields['hour']}:{fields['min']}:{fields['sec']}"
    )
    log(f"timestamp: {timestamp}")
    return timestamp
def datetime_to_epoch(datetime_str):
    """Converts a "YYYY/MM/DD HH:MM:SS" string into epoch seconds.

    The string is interpreted as local time, so the result depends on the
    time zone of the system.
    For example, with a local time zone of UTC+9:
      "2023/3/6 13:27:22" => "1678076842"

    Args:
        datetime_str: The timestamp, e.g. as built by format_timestamp().

    Returns:
        A string holding the integer number of seconds since the epoch.

    Raises:
        ValueError: If |datetime_str| is not in the expected format.
    """
    parsed = datetime.datetime.strptime(datetime_str, "%Y/%m/%d %H:%M:%S")
    # timestamp() is used instead of strftime("%s"), which is a
    # platform-specific format code that Python does not guarantee.
    return str(int(parsed.timestamp()))
# Register the signal exit handler: on SIGINT (Ctrl+C), exit_handler processes
# the buffered conntrack lines, prints the connections and exits the script.
signal.signal(signal.SIGINT, exit_handler)
def print_help():
    """Prints the command-line usage summary to stdout."""
    usage_lines = (
        "Usage:",
        " monitor-conntrack --awk|[options]",
        "",
        " --awk: output the processed lines in an awk-compatible format",
        " options:",
        " -h: print this help information",
        " -p: print out the conntrack lines to stdout as we receive them",
        " -v: show verbose logs during processing",
    )
    for usage_line in usage_lines:
        print(usage_line)
def main():
    """Parses the command-line flags and buffers conntrack events.

    Runs |conntrack -E| and appends every line it prints to
    |CONNTRACK_LINES| until conntrack's output ends. The buffered lines are
    only processed by the SIGINT handler, so the normal way to finish is to
    press Ctrl+C.

    Returns:
        1 if the help text was requested, 0 once conntrack's output ended.
    """
    global AWK_COMPAT, VERBOSE, PRINT_LINES  # pylint: disable=global-statement
    if len(sys.argv) > 1:
        # --awk excludes every other option.
        if "--awk" in sys.argv:
            AWK_COMPAT = True
        else:
            if "-h" in sys.argv:
                print_help()
                return 1
            if "-v" in sys.argv:
                VERBOSE = True
            if "-p" in sys.argv:
                PRINT_LINES = True

    # Show continuous output of events
    if not AWK_COMPAT:
        print("Monitoring conntrack...")
        print("Press Ctrl+C to stop")

    with subprocess.Popen(
        ["conntrack", "-E", "-o", "xml,timestamp,id"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        encoding="utf-8",
    ) as proc:
        # readline() only returns "" at EOF, i.e. once conntrack has exited
        # (with any exit status) and closed its end of the pipe. Reading up
        # to EOF also picks up whatever conntrack wrote right before dying.
        for line in iter(proc.stdout.readline, ""):
            if PRINT_LINES:
                print(line)
            CONNTRACK_LINES.append(line)
        print("conntrack died unexpectedly")

        if not AWK_COMPAT:
            print("Stopping conntrack")
        proc.kill()
    return 0
if __name__ == "__main__":
sys.exit(main())