# blob: 950ed8f699465b7c11aa76110ba203c96a5d9f6d [file] [log] [blame]
# Copyright 2019 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
from io import BytesIO
from os import strerror
from struct import Struct
import socket
from socket import (
AF_NETLINK, AF_UNSPEC,
MSG_PEEK, MSG_TRUNC,
NETLINK_ROUTE,
SOCK_DGRAM,
inet_pton,
)
# Attribute types (rta_type) passed to create_rtattr().
IFA_LOCAL = 2
IFLA_IFNAME = 3

# Interface flag used by set_link_up() as both the flag value and change mask.
IFF_UP = 0x1

# Netlink message types (nlmsg_type).
NLMSG_ERROR = 2
RTM_NEWLINK = 16
RTM_GETLINK = 18
RTM_NEWADDR = 20

# Netlink message flags (nlmsg_flags); OR-ed together by the request builders.
NLM_F_REQUEST = 0x001
NLM_F_ACK = 0x004
NLM_F_EXCL = 0x200
NLM_F_CREATE = 0x400

# Binary layouts of the structures exchanged with the kernel.
# nlmsghdr: (nlmsg_len, nlmsg_type, nlmsg_flags, nlmsg_seq, nlmsg_pid)
nlmsghdr = Struct('=IHHII')
# nlmsgerr: only the leading error code is decoded (negative errno, 0 = ACK).
nlmsgerr = Struct('i')
# rtattr: (rta_len, rta_type), followed by the attribute payload.
rtattr = Struct('HH')
# ifinfomsg: (family, type, index, flags, change); native alignment inserts
# the pad byte after the family field.
ifinfomsg = Struct('BHiII')
# ifaddrmsg: (family, prefixlen, flags, scope, index)
ifaddrmsg = Struct('BBBBi')
def create_nlmsg(nlmsg_type, nlmsg_flags, nlmsg_seq, nlmsg_pid, data):
    """Return *data* prefixed with a packed netlink message header.

    The header's length field counts the header itself plus ``len(data)``.
    *data* must be a bytes object; it is appended unchanged.
    """
    header = nlmsghdr.pack(
        nlmsghdr.size + len(data),
        nlmsg_type,
        nlmsg_flags,
        nlmsg_seq,
        nlmsg_pid,
    )
    return header + data
def create_rtattr(rta_type, data):
    """Return *data* wrapped as a routing attribute (header + payload).

    ``rta_len`` in the header covers the attribute header plus the unpadded
    payload. The returned buffer is then zero-padded to a 4-byte boundary, so
    further attributes can be appended directly after this one and still
    start at an aligned offset.

    *rta_type* is the attribute type (e.g. IFLA_IFNAME); *data* is the
    payload as a bytes object.
    """
    rta_len = rtattr.size + len(data)
    # Round the buffer (not rta_len) up to the next multiple of 4; the
    # padding bytes are not counted in the attribute's own length field.
    padding = b'\0' * (-rta_len % 4)
    return rtattr.pack(rta_len, rta_type) + data + padding
def parse_message(msg):
    """Decode the beginning of a netlink reply.

    Returns the unpacked ifinfomsg tuple when the message type is
    RTM_NEWLINK, and None for every other message type, including an
    NLMSG_ERROR whose error code is 0.

    Raises OSError (errno, strerror) when an NLMSG_ERROR message carries a
    non-zero error code.
    """
    msg_type = nlmsghdr.unpack_from(msg)[1]
    payload_offset = nlmsghdr.size

    if msg_type == RTM_NEWLINK:
        # kernel responds to RTM_GETLINK with RTM_NEWLINK.
        # We only care about the ifindex for get_link_ifindex.
        return ifinfomsg.unpack_from(msg, payload_offset)

    if msg_type == NLMSG_ERROR:
        (code,) = nlmsgerr.unpack_from(msg, payload_offset)
        # The code is sent negated; flip the sign to get the errno.
        errno_value = -code
        if errno_value != 0:
            raise OSError(errno_value, strerror(errno_value))

    return None
class RtNetlink:
    """Minimal synchronous rtnetlink client.

    Opens an AF_NETLINK / NETLINK_ROUTE datagram socket and offers helpers
    to look up an interface index, bring a link up and add an address.
    Usable as a context manager; the socket is closed on exit.
    """

    def __init__(self):
        """Create and bind the netlink socket, closing it if bind fails."""
        self.sock = socket.socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE)
        # Used both as the bind address and as the sendto() destination.
        self.addr = (0, 0)
        try:
            self.sock.bind(self.addr)
        except socket.error:
            # Don't leak the descriptor when bind fails.
            self.sock.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.sock.close()

    def send_message(self, msg):
        """Send *msg* and return parse_message() applied to one reply.

        Raises OSError if the reply is an error message with a non-zero code.
        """
        self.sock.sendto(msg, self.addr)
        # Messages are variable length, but 128 is enough for the ones we
        # care about.
        resp = self.sock.recv(128)
        return parse_message(resp)

    def get_link_ifindex(self, ifname):
        """Return the interface index of the link named *ifname*.

        *ifname* may be bytes or str; a str is encoded before being sent.
        Raises OSError if the kernel reports an error or the reply is not
        an RTM_NEWLINK message.
        """
        if isinstance(ifname, str):
            # The attribute payload must be bytes to be concatenated.
            ifname = ifname.encode()
        body = ifinfomsg.pack(AF_UNSPEC, 0, 0, 0, 0)
        body += create_rtattr(IFLA_IFNAME, ifname)
        flags = NLM_F_REQUEST
        msg = create_nlmsg(RTM_GETLINK, flags, 1, 0, body)
        resp = self.send_message(msg)
        if resp is None:
            # parse_message() returns None for anything but RTM_NEWLINK;
            # fail explicitly instead of indexing None.
            raise OSError('unexpected netlink reply to RTM_GETLINK')
        # Third ifinfomsg field is the interface index.
        return resp[2]

    def set_link_up(self, ifindex):
        """Set the IFF_UP flag on the link with index *ifindex*.

        Raises OSError if the kernel acknowledges with an error.
        """
        # IFF_UP is given as both the flag value and the change mask.
        body = ifinfomsg.pack(AF_UNSPEC, 0, ifindex, IFF_UP, IFF_UP)
        flags = NLM_F_REQUEST | NLM_F_ACK
        msg = create_nlmsg(RTM_NEWLINK, flags, 1, 0, body)
        self.send_message(msg)

    def add_address(self, ifindex, family, address, prefixlen):
        """Add *address*/*prefixlen* to the link with index *ifindex*.

        *family* is the socket address family understood by inet_pton()
        and *address* its textual form. Raises OSError if the kernel
        acknowledges with an error.
        """
        body = ifaddrmsg.pack(family, prefixlen, 0, 0, ifindex)
        addr = inet_pton(family, address)
        body += create_rtattr(IFA_LOCAL, addr)
        flags = NLM_F_REQUEST | NLM_F_ACK | NLM_F_EXCL | NLM_F_CREATE
        msg = create_nlmsg(RTM_NEWADDR, flags, 1, 0, body)
        self.send_message(msg)