| #!/usr/bin/env python3 |
| # Copyright 2022 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Recover IP network when shill is not running.""" |
| |
| import re |
| import select |
| import subprocess |
| import sys |
| import time |
| |
| |
| # Helper to run a command while redirecting its outputs to ours. |
def run(*args):
    """Echo a command line, execute it, and return the completed process.

    The child inherits this script's stdout and stderr, so its output is
    interleaved with our own messages.

    Args:
        *args: The program name followed by its arguments, each a string.

    Returns:
        The subprocess.CompletedProcess of the command. The exit status is
        not checked here; callers inspect ``returncode`` when they care.
    """
    # Flush before spawning the child: it writes directly to the inherited
    # file descriptor, so without the flush a block-buffered stdout (pipe or
    # file) would show the child's output before the echoed command line.
    print("> " + " ".join(args), flush=True)
    # Do not use check=True here. We only need to check the error code in a
    # few cases in this script.
    return subprocess.run(
        args,
        stdout=sys.stdout,
        stderr=sys.stderr,
        check=False,
    )
| |
| |
def enable_interface(ifname):
    """Set the link state of the interface named *ifname* to up.

    The exit status of the ``ip`` command is not inspected.
    """
    command = ("ip", "link", "set", ifname, "up")
    run(*command)
| |
| |
| # For simplicity, ignore the prefix length here. Install a direct route to the |
| # gateway. |
def install_ip_and_route(ifname, ip, gateway):
    """Assign *ip* to *ifname* and send all traffic through *gateway*.

    Three ``ip`` commands are issued in order: add the address, add a direct
    on-link route to the gateway, then add the default route via the gateway.
    The prefix length of the network is deliberately not used. None of the
    exit statuses are checked.
    """
    commands = (
        ("ip", "addr", "add", ip, "dev", ifname),
        ("ip", "route", "add", gateway, "dev", ifname),
        ("ip", "route", "add", "default", "via", gateway),
    )
    for command in commands:
        run(*command)
| |
| |
| # Start dhcpcd for DHCP negotiation. Note that we modified dhcpcd package to |
| # stop it from installing addresses and routes by itself, so here we will parse |
| # its output to get the acquired DHCP lease and install routes manually. We only |
| # use dhcpcd for the initial DHCP negotiation and will not keep dhcpcd running |
| # after getting the lease. It means that we don't fully respect the state |
| # machine of a DHCP client. The goal is only to reestablish minimal |
| # connectivity so that we can re-flash an image or re-deploy the package. |
_DHCPCD_LEASE_RE = re.compile(
    r"leased (\d+\.\d+\.\d+\.\d+) for \d+ seconds"
)
_DHCPCD_ROUTE_RE = re.compile(
    r"adding default route via (\d+\.\d+\.\d+\.\d+)"
)


def _dhcpcd_lines(proc, timeout):
    """Yield stripped output lines of *proc* until EOF, exit or timeout.

    Prints the reason (timeout or unexpected exit) before stopping.
    """
    # Use the monotonic clock so that a wall-clock adjustment cannot shorten
    # or extend the deadline.
    deadline = time.monotonic() + timeout
    poller = select.poll()
    poller.register(proc.stdout, select.POLLIN)
    pending = b""
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            print(
                "Timeout to get DHCP lease."
                " Might be a temporary issue, try it again?"
            )
            return
        # Wake up at least once per second to notice a dead process whose
        # pipe is still held open by somebody else.
        if not poller.poll(int(min(remaining, 1.0) * 1000)):
            if proc.poll() is not None:
                print("dhcpcd died unexpectedly")
                return
            continue
        # The pipe is unbuffered, so this is a single read of whatever is
        # available. Nothing stays hidden in a Python-side buffer where
        # poll() could not see it.
        chunk = proc.stdout.read(4096)
        *complete, pending = (pending + chunk).split(b"\n")
        if not chunk and pending:
            # EOF: hand out the final, unterminated line as well.
            complete.append(pending)
            pending = b""
        for raw in complete:
            yield raw.decode("utf-8", errors="replace").strip()
        if not chunk:
            # EOF means dhcpcd closed its output, whatever its exit status.
            print("dhcpcd died unexpectedly")
            return


def run_dhcpcd(ifname, timeout=10):
    """Obtain a DHCP lease with dhcpcd and install the address and routes.

    dhcpcd is only used for the initial negotiation: its output is parsed for
    the leased address and the default gateway, those are installed through
    install_ip_and_route(), and dhcpcd is killed afterwards in every case.

    Args:
        ifname: Name of the interface to run DHCP on.
        timeout: Maximum number of seconds to wait for the lease.

    Returns:
        True if an address and gateway were obtained and installed, False if
        dhcpcd could not be started, exited early, or the timeout expired.
    """
    print("Running dhcpcd")
    try:
        proc = subprocess.Popen(
            ["dhcpcd7", "-B", "-4", ifname],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=0,
        )
    except OSError as err:
        # Report instead of raising so that the caller can still carry on
        # with the remaining recovery steps.
        print(f"Failed to start dhcpcd: {err}")
        return False

    client_ip = None
    gateway = None
    configured = False
    with proc:
        try:
            for line in _dhcpcd_lines(proc, timeout):
                # dhcpcd will output the same line twice. Skip one of them.
                if not line or line.startswith("dhcpcd"):
                    continue
                print(line)
                lease = _DHCPCD_LEASE_RE.search(line)
                if lease:
                    client_ip = lease[1]
                route = _DHCPCD_ROUTE_RE.search(line)
                if route:
                    gateway = route[1]
                if client_ip and gateway:
                    install_ip_and_route(ifname, client_ip, gateway)
                    print("IPv4 configured")
                    configured = True
                    break
        finally:
            # Kill dhcpcd even when an exception is propagating; otherwise
            # leaving the "with" block would wait on a process that runs in
            # the foreground indefinitely.
            print("Stopping dhcpcd")
            proc.kill()
    return configured
| |
| |
def enable_slaac(ifname):
    """Set the per-interface IPv6 sysctls used for SLAAC on *ifname*.

    The settings are written one at a time with ``sysctl -w`` in the order
    listed below. The exit statuses are not checked.
    """
    print("Enable SLAAC")
    settings = (
        # In shill, we instruct the kernel to install IPv6 routes in the
        # per-interface table. We need to undo that change here.
        ("accept_ra_rt_table", "0"),
        ("use_tempaddr", "2"),
        ("disable_ipv6", "0"),
        ("accept_dad", "1"),
        ("accept_ra", "2"),
    )
    for key, value in settings:
        run("sysctl", "-w", f"net.ipv6.conf.{ifname}.{key}={value}")
| |
| |
def print_help():
    """Print the command-line usage summary to stdout, one line at a time."""
    usage_lines = (
        "recover-network <interface_name> [-4] [-6]",
        "Options:",
        " -4: start DHCP negotiation",
        " -6: enable SLAAC",
    )
    for text in usage_lines:
        print(text)
| |
| |
def main(argv=None):
    """Parse the command line and restore connectivity on one interface.

    The first argument is the interface name (or the word ``help``). ``-4``
    selects DHCP, ``-6`` selects SLAAC; when neither is given both are done.

    Args:
        argv: Full argument vector including the program name. Defaults to
            sys.argv.

    Returns:
        The process exit status: 0 after running the recovery steps or after
        an explicit ``help`` request, 1 when the interface argument is
        missing or ``ip link show`` fails for it. The outcome of the DHCP
        and SLAAC steps themselves does not affect the status.
    """
    if argv is None:
        argv = sys.argv

    if len(argv) < 2:
        print_help()
        return 1
    if argv[1] == "help":
        print_help()
        return 0

    ipv4 = "-4" in argv
    ipv6 = "-6" in argv
    if not (ipv4 or ipv6):
        # No address family requested: recover both.
        ipv4 = True
        ipv6 = True

    ifname = argv[1]
    if run("ip", "link", "show", ifname).returncode != 0:
        print(f"Interface {ifname} not found")
        print_help()
        return 1

    enable_interface("lo")
    enable_interface(ifname)
    if ipv4:
        run_dhcpcd(ifname)
    if ipv6:
        enable_slaac(ifname)
    return 0


if __name__ == "__main__":
    # Propagate the status so that callers and shell scripts can tell a
    # usage error or unknown interface from a completed run.
    sys.exit(main())