#!/usr/bin/env python3
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Recover IP network when shill is not running."""
import os
import re
import select
import subprocess
import sys
import time
def run(*args):
    """Echo a command, run it, and return its result.

    The child process writes directly to this script's stdout and stderr, so
    its output is shown to the user rather than captured.

    Args:
        *args: The program name followed by its arguments, all strings.

    Returns:
        The subprocess.CompletedProcess of the finished command. A non-zero
        exit status does not raise; callers inspect `returncode` themselves.
    """
    # Flush the echoed command line before the child starts writing to the
    # same file descriptor. Without this, when stdout is a file or a pipe the
    # echo sits in Python's buffer and shows up after the command's output.
    print("> " + " ".join(args), flush=True)
    # Do not use check=True here. We only need to check the error code in a few
    # cases in this script.
    return subprocess.run(
        args,
        stdout=sys.stdout,
        stderr=sys.stderr,
        check=False,
    )
def enable_interface(ifname):
    """Bring the network interface `ifname` up using `ip link set`."""
    command = ("ip", "link", "set", ifname, "up")
    run(*command)
def install_ip_and_route(ifname, ip, gateway):
    """Assign `ip` to `ifname` and make `gateway` the default route.

    For simplicity the prefix length is ignored. Instead, a direct route to
    the gateway through `ifname` is installed before the default route that
    goes via that gateway.

    Args:
        ifname: Name of the network interface to configure.
        ip: IPv4 address to add to the interface.
        gateway: IPv4 address of the default gateway.
    """
    commands = (
        ("ip", "addr", "add", ip, "dev", ifname),
        ("ip", "route", "add", gateway, "dev", ifname),
        ("ip", "route", "add", "default", "via", gateway),
    )
    # Order matters: the default route needs the gateway to be reachable.
    for command in commands:
        run(*command)
def run_dhcpcd(ifname):
    """Acquire a DHCP lease on `ifname` with dhcpcd and install it.

    Note that we modified the dhcpcd package to stop it from installing
    addresses and routes by itself, so its output is parsed here to get the
    acquired DHCP lease, and the address and routes are installed manually.
    dhcpcd is only used for the initial DHCP negotiation and is not kept
    running after the lease is obtained, which means the state machine of a
    DHCP client is not fully respected. This is only meant to re-establish
    minimal connectivity so that an image can be re-flashed or a package
    re-deployed.

    Gives up when dhcpcd exits, or when no complete lease (address and
    gateway) has been seen within 10 seconds.

    Args:
        ifname: Name of the network interface to configure.
    """
    lease_re = re.compile(r".*leased (\d+\.\d+\.\d+\.\d+) for \d+ seconds")
    gateway_re = re.compile(
        r".*adding default route via (\d+\.\d+\.\d+\.\d+)"
    )

    print("Running dhcpcd")
    with subprocess.Popen(
        ["dhcpcd7", "-B", "-4", ifname],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    ) as proc:
        client_ip = ""
        gateway = ""
        # Monotonic clock: the 10 second budget must not be affected by
        # wall-clock adjustments.
        deadline = time.monotonic() + 10
        # Read the pipe through its raw file descriptor. Mixing select.poll()
        # with buffered readline() can stall: lines already pulled into
        # Python's buffer are invisible to poll(), so a lease line and a
        # gateway line arriving together could be left unread until timeout.
        out_fd = proc.stdout.fileno()
        poll_obj = select.poll()
        poll_obj.register(out_fd, select.POLLIN)
        pending = b""  # Bytes of a line that has not been terminated yet.
        while True:
            if time.monotonic() > deadline:
                print(
                    "Timeout to get DHCP lease."
                    " Might be a temporary issue, try it again?"
                )
                break
            if not poll_obj.poll(1000):  # 1 second
                # poll() returns None while the process is still running; any
                # other value, including exit status 0, means it has exited.
                if proc.poll() is not None:
                    print("dhcpcd died unexpectedly")
                    break
                continue
            chunk = os.read(out_fd, 4096)
            if not chunk:
                # End of file: the write end of the pipe has been closed.
                print("dhcpcd died unexpectedly")
                break
            pending += chunk
            *complete_lines, pending = pending.split(b"\n")
            for raw_line in complete_lines:
                line = raw_line.decode("utf-8", errors="replace")
                # dhcpcd will output the same line twice. Skip one of them.
                if line.startswith("dhcpcd"):
                    continue
                print(line.strip())
                match_ip = lease_re.match(line)
                if match_ip:
                    client_ip = match_ip[1]
                match_gateway = gateway_re.match(line)
                if match_gateway:
                    gateway = match_gateway[1]
                if client_ip and gateway:
                    break
            if client_ip and gateway:
                install_ip_and_route(ifname, client_ip, gateway)
                print("IPv4 configured")
                break
        print("Stopping dhcpcd")
        proc.kill()
def enable_slaac(ifname):
    """Configure the kernel sysctls that enable SLAAC on `ifname`.

    Args:
        ifname: Name of the network interface to configure.
    """
    print("Enable SLAAC")
    settings = (
        # In shill, we instruct the kernel to install IPv6 routes in the
        # per-interface table. We need to undo that change here.
        f"net.ipv6.conf.{ifname}.accept_ra_rt_table=0",
        f"net.ipv6.conf.{ifname}.use_tempaddr=2",
        f"net.ipv6.conf.{ifname}.disable_ipv6=0",
        f"net.ipv6.conf.{ifname}.accept_dad=1",
        f"net.ipv6.conf.{ifname}.accept_ra=2",
    )
    for setting in settings:
        run("sysctl", "-w", setting)
def print_help():
    """Print the command-line usage summary to stdout."""
    usage_lines = (
        "recover-network <interface_name> [-4] [-6]",
        "Options:",
        " -4: start DHCP negotiation",
        " -6: enable SLAAC",
    )
    for usage_line in usage_lines:
        print(usage_line)
def main():
    """Parse the command line and recover networking on one interface.

    The first argument is the interface name. `-4` requests DHCP negotiation
    and `-6` requests SLAAC; when neither flag is given, both are done. Prints
    the usage text and returns when no argument is given, when the first
    argument is `help`, or when the interface cannot be shown by `ip link`.
    """
    argv = sys.argv
    want_ipv4 = "-4" in argv
    want_ipv6 = "-6" in argv
    # No explicit choice means "do both".
    if not (want_ipv4 or want_ipv6):
        want_ipv4 = want_ipv6 = True

    if len(argv) < 2 or argv[1] == "help":
        print_help()
        return

    ifname = argv[1]
    # `ip link show` fails when the interface does not exist.
    if run("ip", "link", "show", ifname).returncode != 0:
        print(f"Interface {ifname} not found")
        print_help()
        return

    for name in ("lo", ifname):
        enable_interface(name)

    if want_ipv4:
        run_dhcpcd(ifname)
    if want_ipv6:
        enable_slaac(ifname)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()