| //go:build linux |
| // +build linux |
| |
| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package nftables |
| |
| // |
| // NOTE: this needs to be tested in e2e since it uses nftables for everything. |
| // |
| |
| import ( |
| "context" |
| "crypto/sha256" |
| "encoding/base32" |
| "fmt" |
| "net" |
| "reflect" |
| "strconv" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| v1 "k8s.io/api/core/v1" |
| discovery "k8s.io/api/discovery/v1" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/client-go/tools/events" |
| utilsysctl "k8s.io/component-helpers/node/util/sysctl" |
| "k8s.io/klog/v2" |
| "k8s.io/kubernetes/pkg/proxy" |
| "k8s.io/kubernetes/pkg/proxy/conntrack" |
| "k8s.io/kubernetes/pkg/proxy/healthcheck" |
| "k8s.io/kubernetes/pkg/proxy/metaproxier" |
| "k8s.io/kubernetes/pkg/proxy/metrics" |
| proxyutil "k8s.io/kubernetes/pkg/proxy/util" |
| proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" |
| "k8s.io/kubernetes/pkg/util/async" |
| utilexec "k8s.io/utils/exec" |
| netutils "k8s.io/utils/net" |
| "k8s.io/utils/ptr" |
| "sigs.k8s.io/knftables" |
| ) |
| |
| const ( |
| // Our nftables table. All of our chains/sets/maps are created inside this table, |
| // so they don't need any "kube-" or "kube-proxy-" prefix of their own. |
| kubeProxyTable = "kube-proxy" |
| |
| // base chains |
| filterPreroutingChain = "filter-prerouting" |
| filterInputChain = "filter-input" |
| filterForwardChain = "filter-forward" |
| filterOutputChain = "filter-output" |
| filterOutputPostDNATChain = "filter-output-post-dnat" |
| natPreroutingChain = "nat-prerouting" |
| natOutputChain = "nat-output" |
| natPostroutingChain = "nat-postrouting" |
| |
| // service dispatch |
| servicesChain = "services" |
| serviceIPsMap = "service-ips" |
| serviceNodePortsMap = "service-nodeports" |
| |
| // set of IPs that accept NodePort traffic |
| nodePortIPsSet = "nodeport-ips" |
| |
| // set of active ClusterIPs. |
| clusterIPsSet = "cluster-ips" |
| |
| // handling for services with no endpoints |
| serviceEndpointsCheckChain = "service-endpoints-check" |
| nodePortEndpointsCheckChain = "nodeport-endpoints-check" |
| noEndpointServicesMap = "no-endpoint-services" |
| noEndpointNodePortsMap = "no-endpoint-nodeports" |
| rejectChain = "reject-chain" |
| |
| // handling traffic to unallocated ClusterIPs and undefined ports of ClusterIPs |
| clusterIPsCheckChain = "cluster-ips-check" |
| |
| // LoadBalancerSourceRanges handling |
| firewallIPsMap = "firewall-ips" |
| firewallCheckChain = "firewall-check" |
| |
| // masquerading |
| markMasqChain = "mark-for-masquerade" |
| masqueradingChain = "masquerading" |
| ) |
| |
| // NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies. |
| func NewDualStackProxier( |
| sysctl utilsysctl.Interface, |
| syncPeriod time.Duration, |
| minSyncPeriod time.Duration, |
| masqueradeAll bool, |
| masqueradeBit int, |
| localDetectors [2]proxyutiliptables.LocalTrafficDetector, |
| hostname string, |
| nodeIPs map[v1.IPFamily]net.IP, |
| recorder events.EventRecorder, |
| healthzServer *healthcheck.ProxierHealthServer, |
| nodePortAddresses []string, |
| initOnly bool, |
| ) (proxy.Provider, error) { |
| // Create an ipv4 instance of the single-stack proxier |
| ipv4Proxier, err := NewProxier(v1.IPv4Protocol, sysctl, |
| syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname, |
| nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly) |
| if err != nil { |
| return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err) |
| } |
| |
| ipv6Proxier, err := NewProxier(v1.IPv6Protocol, sysctl, |
| syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname, |
| nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly) |
| if err != nil { |
| return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err) |
| } |
| if initOnly { |
| return nil, nil |
| } |
| return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil |
| } |
| |
| // Proxier is an nftables-based proxy |
| type Proxier struct { |
| // ipFamily defines the IP family which this proxier is tracking. |
| ipFamily v1.IPFamily |
| |
| // endpointsChanges and serviceChanges contain all changes to endpoints and |
| // services that happened since nftables was synced. For a single object, |
| // changes are accumulated, i.e. previous is the state from before all of them, |
| // and current is the state after applying all of them. |
| endpointsChanges *proxy.EndpointsChangeTracker |
| serviceChanges *proxy.ServiceChangeTracker |
| |
| mu sync.Mutex // protects the following fields |
| svcPortMap proxy.ServicePortMap |
| endpointsMap proxy.EndpointsMap |
| nodeLabels map[string]string |
| // endpointSlicesSynced and servicesSynced are set to true when the |
| // corresponding objects are synced after startup. This is used to avoid |
| // updating nftables with partial data after a kube-proxy restart. |
| endpointSlicesSynced bool |
| servicesSynced bool |
| initialized int32 |
| syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules |
| syncPeriod time.Duration |
| flushed bool |
| |
| // These are effectively const and do not need the mutex to be held. |
| nftables knftables.Interface |
| masqueradeAll bool |
| masqueradeMark string |
| conntrack conntrack.Interface |
| localDetector proxyutiliptables.LocalTrafficDetector |
| hostname string |
| nodeIP net.IP |
| recorder events.EventRecorder |
| |
| serviceHealthServer healthcheck.ServiceHealthServer |
| healthzServer *healthcheck.ProxierHealthServer |
| |
| // nodePortAddresses selects the interfaces where nodePort works. |
| nodePortAddresses *proxyutil.NodePortAddresses |
| // networkInterfacer defines an interface for several net library functions. |
| // Inject for test purpose. |
| networkInterfacer proxyutil.NetworkInterfacer |
| |
| // staleChains contains information about chains to be deleted later |
| staleChains map[string]time.Time |
| |
| // serviceCIDRs is a comma-separated list of the ServiceCIDRs belonging to the |
| // IPFamily this proxier is operating on; it can be consumed directly by knftables. |
| serviceCIDRs string |
| } |
| |
| // Proxier implements proxy.Provider |
| var _ proxy.Provider = &Proxier{} |
| |
| // NewProxier returns a new nftables Proxier. Once a proxier is created, it will keep |
| // nftables up to date in the background and will not terminate if a particular nftables |
| // call fails. |
| func NewProxier(ipFamily v1.IPFamily, |
| sysctl utilsysctl.Interface, |
| syncPeriod time.Duration, |
| minSyncPeriod time.Duration, |
| masqueradeAll bool, |
| masqueradeBit int, |
| localDetector proxyutiliptables.LocalTrafficDetector, |
| hostname string, |
| nodeIP net.IP, |
| recorder events.EventRecorder, |
| healthzServer *healthcheck.ProxierHealthServer, |
| nodePortAddressStrings []string, |
| initOnly bool, |
| ) (*Proxier, error) { |
| nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings, nodeIP) |
| |
| if initOnly { |
| klog.InfoS("System initialized and --init-only specified") |
| return nil, nil |
| } |
| |
| // Generate the masquerade mark to use for SNAT rules. |
| masqueradeValue := 1 << uint(masqueradeBit) |
| masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue) |
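| // (For example, with the default masquerade bit of 14, this corresponds to mark bit 0x4000.) |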
| klog.V(2).InfoS("Using nftables mark for masquerade", "ipFamily", ipFamily, "mark", masqueradeMark) |
| |
| serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) |
| |
| var nftablesFamily knftables.Family |
| if ipFamily == v1.IPv4Protocol { |
| nftablesFamily = knftables.IPv4Family |
| } else { |
| nftablesFamily = knftables.IPv6Family |
| } |
| nft, err := knftables.New(nftablesFamily, kubeProxyTable) |
| if err != nil { |
| return nil, err |
| } |
| |
| proxier := &Proxier{ |
| ipFamily: ipFamily, |
| svcPortMap: make(proxy.ServicePortMap), |
| serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), |
| endpointsMap: make(proxy.EndpointsMap), |
| endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), |
| syncPeriod: syncPeriod, |
| nftables: nft, |
| masqueradeAll: masqueradeAll, |
| masqueradeMark: masqueradeMark, |
| conntrack: conntrack.NewExec(utilexec.New()), |
| localDetector: localDetector, |
| hostname: hostname, |
| nodeIP: nodeIP, |
| recorder: recorder, |
| serviceHealthServer: serviceHealthServer, |
| healthzServer: healthzServer, |
| nodePortAddresses: nodePortAddresses, |
| networkInterfacer: proxyutil.RealNetwork{}, |
| staleChains: make(map[string]time.Time), |
| } |
| |
| burstSyncs := 2 |
| klog.V(2).InfoS("NFTables sync params", "ipFamily", ipFamily, "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) |
| proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) |
| |
| return proxier, nil |
| } |
| |
| // internal struct for storing service information |
| type servicePortInfo struct { |
| *proxy.BaseServicePortInfo |
| // The following fields are computed and stored for performance reasons. |
| nameString string |
| clusterPolicyChainName string |
| localPolicyChainName string |
| externalChainName string |
| firewallChainName string |
| } |
| |
| // returns a new proxy.ServicePort which abstracts a serviceInfo |
| func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { |
| svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo} |
| |
| // Store the following for performance reasons. |
| svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} |
| svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} |
| svcPort.nameString = svcPortName.String() |
| |
| chainNameBase := servicePortChainNameBase(&svcPortName, strings.ToLower(string(svcPort.Protocol()))) |
| svcPort.clusterPolicyChainName = servicePortPolicyClusterChainNamePrefix + chainNameBase |
| svcPort.localPolicyChainName = servicePortPolicyLocalChainNamePrefix + chainNameBase |
| svcPort.externalChainName = serviceExternalChainNamePrefix + chainNameBase |
| svcPort.firewallChainName = servicePortFirewallChainNamePrefix + chainNameBase |
| |
| return svcPort |
| } |
| |
| // internal struct for endpoints information |
| type endpointInfo struct { |
| *proxy.BaseEndpointInfo |
| |
| chainName string |
| affinitySetName string |
| } |
| |
| // returns a new proxy.Endpoint which abstracts an endpointInfo |
| func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint { |
| chainNameBase := servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()) |
| return &endpointInfo{ |
| BaseEndpointInfo: baseInfo, |
| chainName: servicePortEndpointChainNamePrefix + chainNameBase, |
| affinitySetName: servicePortEndpointAffinityNamePrefix + chainNameBase, |
| } |
| } |
| |
| // nftablesBaseChains lists our "base chains"; those that are directly connected to the |
| // netfilter hooks (e.g., "postrouting", "input", etc.), as opposed to "regular" chains, |
| // which are only run when a rule jumps to them. See |
| // https://wiki.nftables.org/wiki-nftables/index.php/Configuring_chains. |
| // |
| // These are set up from setupNFTables() and then not directly referenced by |
| // syncProxyRules(). |
| // |
| // All of our base chains have names that are just "${type}-${hook}". e.g., "nat-prerouting". |
| type nftablesBaseChain struct { |
| name string |
| chainType knftables.BaseChainType |
| hook knftables.BaseChainHook |
| priority knftables.BaseChainPriority |
| } |
| |
| var nftablesBaseChains = []nftablesBaseChain{ |
| // We want our filtering rules to operate on pre-DNAT dest IPs, so our filter |
| // chains have to run before DNAT. |
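| // (A priority such as knftables.DNATPriority + "-10" sorts the chain just before |
| // DNAT, while "+10" sorts it just after.) |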
| {filterPreroutingChain, knftables.FilterType, knftables.PreroutingHook, knftables.DNATPriority + "-10"}, |
| {filterInputChain, knftables.FilterType, knftables.InputHook, knftables.DNATPriority + "-10"}, |
| {filterForwardChain, knftables.FilterType, knftables.ForwardHook, knftables.DNATPriority + "-10"}, |
| {filterOutputChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "-10"}, |
| {filterOutputPostDNATChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "+10"}, |
| {natPreroutingChain, knftables.NATType, knftables.PreroutingHook, knftables.DNATPriority}, |
| {natOutputChain, knftables.NATType, knftables.OutputHook, knftables.DNATPriority}, |
| {natPostroutingChain, knftables.NATType, knftables.PostroutingHook, knftables.SNATPriority}, |
| } |
| |
| // nftablesJumpChains lists our top-level "regular chains" that are jumped to directly |
| // from one of the base chains. These are set up from setupNFTables(), and some of them |
| // are also referenced in syncProxyRules(). |
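| // Each entry becomes a rule like "ct state new jump firewall-check" added to the |
| // source chain by setupNFTables(). |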
| type nftablesJumpChain struct { |
| dstChain string |
| srcChain string |
| extraArgs string |
| } |
| |
| var nftablesJumpChains = []nftablesJumpChain{ |
| // We can't jump to the endpoints-check chains from filter-prerouting the way |
| // we do for firewallCheckChain, because with kernels before 5.9 the reject |
| // action is only valid in chains using the input, forward or output hooks. |
| {nodePortEndpointsCheckChain, filterInputChain, "ct state new"}, |
| {serviceEndpointsCheckChain, filterInputChain, "ct state new"}, |
| {serviceEndpointsCheckChain, filterForwardChain, "ct state new"}, |
| {serviceEndpointsCheckChain, filterOutputChain, "ct state new"}, |
| |
| {firewallCheckChain, filterPreroutingChain, "ct state new"}, |
| {firewallCheckChain, filterOutputChain, "ct state new"}, |
| |
| {servicesChain, natOutputChain, ""}, |
| {servicesChain, natPreroutingChain, ""}, |
| {masqueradingChain, natPostroutingChain, ""}, |
| |
| {clusterIPsCheckChain, filterForwardChain, "ct state new"}, |
| {clusterIPsCheckChain, filterOutputPostDNATChain, "ct state new"}, |
| } |
| |
| // ensureChain adds commands to tx to ensure that chain exists and doesn't contain |
| // anything from before this transaction (using createdChains to ensure that we don't |
| // Flush a chain more than once and lose *new* rules as well). |
| func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string]) { |
| if createdChains.Has(chain) { |
| return |
| } |
| tx.Add(&knftables.Chain{ |
| Name: chain, |
| }) |
| tx.Flush(&knftables.Chain{ |
| Name: chain, |
| }) |
| createdChains.Insert(chain) |
| } |
| |
| func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) { |
| ipX := "ip" |
| ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value |
| noLocalhost := "ip daddr != 127.0.0.0/8" |
| if proxier.ipFamily == v1.IPv6Protocol { |
| ipX = "ip6" |
| ipvX_addr = "ipv6_addr" |
| noLocalhost = "ip6 daddr != ::1" |
| } |
| |
| tx.Add(&knftables.Table{ |
| Comment: ptr.To("rules for kube-proxy"), |
| }) |
| |
| // Do an extra "add+delete" once to ensure all previous base chains in the table |
| // will be recreated. Otherwise, altering properties (e.g. priority) of these |
| // chains would fail the transaction. |
| if !proxier.flushed { |
| for _, bc := range nftablesBaseChains { |
| chain := &knftables.Chain{ |
| Name: bc.name, |
| } |
| tx.Add(chain) |
| tx.Delete(chain) |
| } |
| proxier.flushed = true |
| } |
| |
| // Create and flush base chains |
| for _, bc := range nftablesBaseChains { |
| chain := &knftables.Chain{ |
| Name: bc.name, |
| Type: ptr.To(bc.chainType), |
| Hook: ptr.To(bc.hook), |
| Priority: ptr.To(bc.priority), |
| } |
| tx.Add(chain) |
| tx.Flush(chain) |
| } |
| |
| // Create and flush ordinary chains and add rules jumping to them |
| createdChains := sets.New[string]() |
| for _, c := range nftablesJumpChains { |
| ensureChain(c.dstChain, tx, createdChains) |
| tx.Add(&knftables.Rule{ |
| Chain: c.srcChain, |
| Rule: knftables.Concat( |
| c.extraArgs, |
| "jump", c.dstChain, |
| ), |
| }) |
| } |
| |
| // Ensure all of our other "top-level" chains exist |
| for _, chain := range []string{servicesChain, clusterIPsCheckChain, masqueradingChain, markMasqChain} { |
| ensureChain(chain, tx, createdChains) |
| } |
| |
| // Add the rules in the mark-for-masquerade and masquerading chains |
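| // (mark-for-masquerade just sets the masquerade bit in the packet mark; the |
| // masquerading chain returns early if that bit is not set, otherwise clears it, |
| // so the packet won't be re-masqueraded if it traverses these rules again, and |
| // then masquerades.) |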
| tx.Add(&knftables.Rule{ |
| Chain: markMasqChain, |
| Rule: knftables.Concat( |
| "mark", "set", "mark", "or", proxier.masqueradeMark, |
| ), |
| }) |
| |
| tx.Add(&knftables.Rule{ |
| Chain: masqueradingChain, |
| Rule: knftables.Concat( |
| "mark", "and", proxier.masqueradeMark, "==", "0", |
| "return", |
| ), |
| }) |
| tx.Add(&knftables.Rule{ |
| Chain: masqueradingChain, |
| Rule: knftables.Concat( |
| "mark", "set", "mark", "xor", proxier.masqueradeMark, |
| ), |
| }) |
| tx.Add(&knftables.Rule{ |
| Chain: masqueradingChain, |
| Rule: "masquerade fully-random", |
| }) |
| |
| // add cluster-ips set. |
| tx.Add(&knftables.Set{ |
| Name: clusterIPsSet, |
| Type: ipvX_addr, |
| Comment: ptr.To("Active ClusterIPs"), |
| }) |
| |
| // reject traffic to invalid ports of ClusterIPs. |
| tx.Add(&knftables.Rule{ |
| Chain: clusterIPsCheckChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", "@", clusterIPsSet, "reject", |
| ), |
| Comment: ptr.To("Reject traffic to invalid ports of ClusterIPs"), |
| }) |
| |
| // drop traffic to unallocated ClusterIPs. |
| if len(proxier.serviceCIDRs) > 0 { |
| tx.Add(&knftables.Rule{ |
| Chain: clusterIPsCheckChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", "{", proxier.serviceCIDRs, "}", |
| "drop", |
| ), |
| Comment: ptr.To("Drop traffic to unallocated ClusterIPs"), |
| }) |
| } |
| |
| // Fill in nodeport-ips set if needed (or delete it if not). (We do "add+delete" |
| // rather than just "delete" when we want to ensure the set doesn't exist, because |
| // doing just "delete" would return an error if the set didn't exist.) |
| tx.Add(&knftables.Set{ |
| Name: nodePortIPsSet, |
| Type: ipvX_addr, |
| Comment: ptr.To("IPs that accept NodePort traffic"), |
| }) |
| if proxier.nodePortAddresses.MatchAll() { |
| tx.Delete(&knftables.Set{ |
| Name: nodePortIPsSet, |
| }) |
| } else { |
| tx.Flush(&knftables.Set{ |
| Name: nodePortIPsSet, |
| }) |
| nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer) |
| if err != nil { |
| klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses) |
| } |
| for _, ip := range nodeIPs { |
| if ip.IsLoopback() { |
| klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported", "address", ip.String()) |
| continue |
| } |
| tx.Add(&knftables.Element{ |
| Set: nodePortIPsSet, |
| Key: []string{ |
| ip.String(), |
| }, |
| }) |
| } |
| } |
| |
| // Set up "no endpoints" drop/reject handling |
| tx.Add(&knftables.Map{ |
| Name: noEndpointServicesMap, |
| Type: ipvX_addr + " . inet_proto . inet_service : verdict", |
| Comment: ptr.To("vmap to drop or reject packets to services with no endpoints"), |
| }) |
| tx.Add(&knftables.Map{ |
| Name: noEndpointNodePortsMap, |
| Type: "inet_proto . inet_service : verdict", |
| Comment: ptr.To("vmap to drop or reject packets to service nodeports with no endpoints"), |
| }) |
| |
| tx.Add(&knftables.Chain{ |
| Name: rejectChain, |
| Comment: ptr.To("helper for @no-endpoint-services / @no-endpoint-nodeports"), |
| }) |
| tx.Flush(&knftables.Chain{ |
| Name: rejectChain, |
| }) |
| tx.Add(&knftables.Rule{ |
| Chain: rejectChain, |
| Rule: "reject", |
| }) |
| |
| tx.Add(&knftables.Rule{ |
| Chain: serviceEndpointsCheckChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", ".", "meta l4proto", ".", "th dport", |
| "vmap", "@", noEndpointServicesMap, |
| ), |
| }) |
| |
| if proxier.nodePortAddresses.MatchAll() { |
| tx.Add(&knftables.Rule{ |
| Chain: nodePortEndpointsCheckChain, |
| Rule: knftables.Concat( |
| noLocalhost, |
| "meta l4proto . th dport", |
| "vmap", "@", noEndpointNodePortsMap, |
| ), |
| }) |
| } else { |
| tx.Add(&knftables.Rule{ |
| Chain: nodePortEndpointsCheckChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", "@", nodePortIPsSet, |
| "meta l4proto . th dport", |
| "vmap", "@", noEndpointNodePortsMap, |
| ), |
| }) |
| } |
| |
| // Set up LoadBalancerSourceRanges firewalling |
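| // (Elements are added per-LoadBalancer-IP by syncProxyRules and look roughly like |
| // "5.6.7.8 . tcp . 80 : goto firewall-ULMVA6XW-ns1/svc1/tcp/p80".) |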
| tx.Add(&knftables.Map{ |
| Name: firewallIPsMap, |
| Type: ipvX_addr + " . inet_proto . inet_service : verdict", |
| Comment: ptr.To("destinations that are subject to LoadBalancerSourceRanges"), |
| }) |
| |
| ensureChain(firewallCheckChain, tx, createdChains) |
| tx.Add(&knftables.Rule{ |
| Chain: firewallCheckChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", ".", "meta l4proto", ".", "th dport", |
| "vmap", "@", firewallIPsMap, |
| ), |
| }) |
| |
| // Set up service dispatch |
| tx.Add(&knftables.Map{ |
| Name: serviceIPsMap, |
| Type: ipvX_addr + " . inet_proto . inet_service : verdict", |
| Comment: ptr.To("ClusterIP, ExternalIP and LoadBalancer IP traffic"), |
| }) |
| tx.Add(&knftables.Map{ |
| Name: serviceNodePortsMap, |
| Type: "inet_proto . inet_service : verdict", |
| Comment: ptr.To("NodePort traffic"), |
| }) |
| tx.Add(&knftables.Rule{ |
| Chain: servicesChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", ".", "meta l4proto", ".", "th dport", |
| "vmap", "@", serviceIPsMap, |
| ), |
| }) |
| if proxier.nodePortAddresses.MatchAll() { |
| tx.Add(&knftables.Rule{ |
| Chain: servicesChain, |
| Rule: knftables.Concat( |
| "fib daddr type local", |
| noLocalhost, |
| "meta l4proto . th dport", |
| "vmap", "@", serviceNodePortsMap, |
| ), |
| }) |
| } else { |
| tx.Add(&knftables.Rule{ |
| Chain: servicesChain, |
| Rule: knftables.Concat( |
| ipX, "daddr @nodeport-ips", |
| "meta l4proto . th dport", |
| "vmap", "@", serviceNodePortsMap, |
| ), |
| }) |
| } |
| } |
| |
| // CleanupLeftovers removes all nftables rules and chains created by the Proxier. |
| // It returns true if an error was encountered. Errors are logged. |
| func CleanupLeftovers() bool { |
| var encounteredError bool |
| |
| for _, family := range []knftables.Family{knftables.IPv4Family, knftables.IPv6Family} { |
| nft, err := knftables.New(family, kubeProxyTable) |
| if err == nil { |
| tx := nft.NewTransaction() |
| tx.Delete(&knftables.Table{}) |
| err = nft.Run(context.TODO(), tx) |
| } |
| if err != nil && !knftables.IsNotFound(err) { |
| klog.ErrorS(err, "Error cleaning up nftables rules") |
| encounteredError = true |
| } |
| } |
| |
| return encounteredError |
| } |
| |
| // Sync is called to synchronize the proxier state to nftables as soon as possible. |
| func (proxier *Proxier) Sync() { |
| if proxier.healthzServer != nil { |
| proxier.healthzServer.QueuedUpdate(proxier.ipFamily) |
| } |
| metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() |
| proxier.syncRunner.Run() |
| } |
| |
| // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. |
| func (proxier *Proxier) SyncLoop() { |
| // Update healthz timestamp at beginning in case Sync() never succeeds. |
| if proxier.healthzServer != nil { |
| proxier.healthzServer.Updated(proxier.ipFamily) |
| } |
| |
| // synthesize "last change queued" time as the informers are syncing. |
| metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime() |
| proxier.syncRunner.Loop(wait.NeverStop) |
| } |
| |
| func (proxier *Proxier) setInitialized(value bool) { |
| var initialized int32 |
| if value { |
| initialized = 1 |
| } |
| atomic.StoreInt32(&proxier.initialized, initialized) |
| } |
| |
| func (proxier *Proxier) isInitialized() bool { |
| return atomic.LoadInt32(&proxier.initialized) > 0 |
| } |
| |
| // OnServiceAdd is called whenever creation of a new service object |
| // is observed. |
| func (proxier *Proxier) OnServiceAdd(service *v1.Service) { |
| proxier.OnServiceUpdate(nil, service) |
| } |
| |
| // OnServiceUpdate is called whenever modification of an existing |
| // service object is observed. |
| func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { |
| if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { |
| proxier.Sync() |
| } |
| } |
| |
| // OnServiceDelete is called whenever deletion of an existing service |
| // object is observed. |
| func (proxier *Proxier) OnServiceDelete(service *v1.Service) { |
| proxier.OnServiceUpdate(service, nil) |
| } |
| |
| // OnServiceSynced is called once all the initial event handlers were |
| // called and the state is fully propagated to local cache. |
| func (proxier *Proxier) OnServiceSynced() { |
| proxier.mu.Lock() |
| proxier.servicesSynced = true |
| proxier.setInitialized(proxier.endpointSlicesSynced) |
| proxier.mu.Unlock() |
| |
| // Sync unconditionally - this is called once per lifetime. |
| proxier.syncProxyRules() |
| } |
| |
| // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object |
| // is observed. |
| func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) { |
| if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { |
| proxier.Sync() |
| } |
| } |
| |
| // OnEndpointSliceUpdate is called whenever modification of an existing endpoint |
| // slice object is observed. |
| func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) { |
| if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { |
| proxier.Sync() |
| } |
| } |
| |
| // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice |
| // object is observed. |
| func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) { |
| if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() { |
| proxier.Sync() |
| } |
| } |
| |
| // OnEndpointSlicesSynced is called once all the initial event handlers were |
| // called and the state is fully propagated to local cache. |
| func (proxier *Proxier) OnEndpointSlicesSynced() { |
| proxier.mu.Lock() |
| proxier.endpointSlicesSynced = true |
| proxier.setInitialized(proxier.servicesSynced) |
| proxier.mu.Unlock() |
| |
| // Sync unconditionally - this is called once per lifetime. |
| proxier.syncProxyRules() |
| } |
| |
| // OnNodeAdd is called whenever creation of a new node object |
| // is observed. |
| func (proxier *Proxier) OnNodeAdd(node *v1.Node) { |
| if node.Name != proxier.hostname { |
| klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", |
| "eventNode", node.Name, "currentNode", proxier.hostname) |
| return |
| } |
| |
| if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { |
| return |
| } |
| |
| proxier.mu.Lock() |
| proxier.nodeLabels = map[string]string{} |
| for k, v := range node.Labels { |
| proxier.nodeLabels[k] = v |
| } |
| proxier.mu.Unlock() |
| klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels) |
| |
| proxier.Sync() |
| } |
| |
| // OnNodeUpdate is called whenever modification of an existing |
| // node object is observed. |
| func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { |
| if node.Name != proxier.hostname { |
| klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", |
| "eventNode", node.Name, "currentNode", proxier.hostname) |
| return |
| } |
| |
| if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { |
| return |
| } |
| |
| proxier.mu.Lock() |
| proxier.nodeLabels = map[string]string{} |
| for k, v := range node.Labels { |
| proxier.nodeLabels[k] = v |
| } |
| proxier.mu.Unlock() |
| klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels) |
| |
| proxier.Sync() |
| } |
| |
| // OnNodeDelete is called whenever deletion of an existing node |
| // object is observed. |
| func (proxier *Proxier) OnNodeDelete(node *v1.Node) { |
| if node.Name != proxier.hostname { |
| klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", |
| "eventNode", node.Name, "currentNode", proxier.hostname) |
| return |
| } |
| |
| proxier.mu.Lock() |
| proxier.nodeLabels = nil |
| proxier.mu.Unlock() |
| |
| proxier.Sync() |
| } |
| |
| // OnNodeSynced is called once all the initial event handlers were |
| // called and the state is fully propagated to local cache. |
| func (proxier *Proxier) OnNodeSynced() { |
| } |
| |
| // OnServiceCIDRsChanged is called whenever a change is observed |
| // in any of the ServiceCIDRs, and provides the complete list of ServiceCIDRs. |
| func (proxier *Proxier) OnServiceCIDRsChanged(cidrs []string) { |
| proxier.mu.Lock() |
| defer proxier.mu.Unlock() |
| |
| cidrsForProxier := make([]string, 0) |
| for _, cidr := range cidrs { |
| isIPv4CIDR := netutils.IsIPv4CIDRString(cidr) |
| if proxier.ipFamily == v1.IPv4Protocol && isIPv4CIDR { |
| cidrsForProxier = append(cidrsForProxier, cidr) |
| } |
| |
| if proxier.ipFamily == v1.IPv6Protocol && !isIPv4CIDR { |
| cidrsForProxier = append(cidrsForProxier, cidr) |
| } |
| } |
| proxier.serviceCIDRs = strings.Join(cidrsForProxier, ",") |
| } |
| |
| const ( |
| // Maximum length for one of our chain name prefixes, including the trailing |
| // hyphen. |
| chainNamePrefixLengthMax = 16 |
| |
| // Maximum length of the string returned from servicePortChainNameBase or |
| // servicePortEndpointChainNameBase. |
| chainNameBaseLengthMax = knftables.NameLengthMax - chainNamePrefixLengthMax |
| ) |
| |
| const ( |
| servicePortPolicyClusterChainNamePrefix = "service-" |
| servicePortPolicyLocalChainNamePrefix = "local-" |
| serviceExternalChainNamePrefix = "external-" |
| servicePortEndpointChainNamePrefix = "endpoint-" |
| servicePortEndpointAffinityNamePrefix = "affinity-" |
| servicePortFirewallChainNamePrefix = "firewall-" |
| ) |
| |
| // hashAndTruncate prefixes name with a hash of itself and then truncates to |
| // chainNameBaseLengthMax. The hash ensures that (a) the name is still unique if we have |
| // to truncate the end, and (b) it's visually distinguishable from other chains that would |
| // otherwise have nearly identical names (e.g., different endpoint chains for a given |
| // service that differ in only a single digit). |
| func hashAndTruncate(name string) string { |
| hash := sha256.Sum256([]byte(name)) |
| encoded := base32.StdEncoding.EncodeToString(hash[:]) |
| name = encoded[:8] + "-" + name |
| if len(name) > chainNameBaseLengthMax { |
| name = name[:chainNameBaseLengthMax-3] + "..." |
| } |
| return name |
| } |
| |
| // servicePortChainNameBase returns the base name for a chain for the given ServicePort. |
| // This is something like "HASH-namespace/serviceName/protocol/portName", e.g., |
| // "ULMVA6XW-ns1/svc1/tcp/p80". |
| func servicePortChainNameBase(servicePortName *proxy.ServicePortName, protocol string) string { |
| // nftables chains can contain the characters [A-Za-z0-9_./-] (but must start with |
| // a letter, underscore, or dot). |
| // |
| // Namespace, Service, and Port names can contain [a-z0-9-] (with some additional |
| // restrictions that aren't relevant here). |
| // |
| // Protocol is /(tcp|udp|sctp)/. |
| // |
| // Thus, we can safely use all Namespace names, Service names, protocol values, |
| // and Port names directly in nftables chain names (though note that this assumes |
| // that the chain name won't *start* with any of those strings, since that might |
| // be illegal). We use "/" to separate the parts of the name, which is one of the |
| // two characters allowed in a chain name that isn't allowed in our input strings. |
| |
| name := fmt.Sprintf("%s/%s/%s/%s", |
| servicePortName.NamespacedName.Namespace, |
| servicePortName.NamespacedName.Name, |
| protocol, |
| servicePortName.Port, |
| ) |
| |
| // The namespace, service, and port name can each be up to 63 characters, protocol |
| // can be up to 4, plus 8 for the hash and 4 additional punctuation characters. |
| // That's a total of 205, which is less than chainNameBaseLengthMax (240). So this |
| // will never actually return a truncated name. |
| return hashAndTruncate(name) |
| } |
| |
| // servicePortEndpointChainNameBase returns the base name for chains for the given |
| // endpoint. This is something like |
| // "HASH-namespace/serviceName/protocol/portName__endpointIP/endpointport", e.g., |
| // "5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80". |
| func servicePortEndpointChainNameBase(servicePortName *proxy.ServicePortName, protocol, endpoint string) string { |
| // As above in servicePortChainNameBase: Namespace, Service, Port, Protocol, and |
| // EndpointPort are all safe to copy into the chain name directly. But if |
| // EndpointIP is IPv6 then it will contain colons, which aren't allowed in a chain |
| // name. IPv6 IPs are also quite long, but we can't safely truncate them (e.g. to |
| // only the final segment) because (especially for manually-created external |
| // endpoints), we can't know for sure that any part of them is redundant. |
| |
| endpointIP, endpointPort, _ := net.SplitHostPort(endpoint) |
| if strings.Contains(endpointIP, ":") { |
| endpointIP = strings.ReplaceAll(endpointIP, ":", ".") |
| } |
| |
| // As above, we use "/" to separate parts of the name, and "__" to separate the |
| // "service" part from the "endpoint" part. |
| name := fmt.Sprintf("%s/%s/%s/%s__%s/%s", |
| servicePortName.NamespacedName.Namespace, |
| servicePortName.NamespacedName.Name, |
| protocol, |
| servicePortName.Port, |
| endpointIP, |
| endpointPort, |
| ) |
| |
| // The part of name before the "__" can be up to 205 characters (as with |
| // servicePortChainNameBase above). An IPv6 address can be up to 39 characters, and |
| // a port can be up to 5 digits, plus 3 punctuation characters gives a max total |
| // length of 252, well over chainNameBaseLengthMax (240), so truncation is |
| // theoretically possible (though incredibly unlikely). |
| return hashAndTruncate(name) |
| } |
| |
| func isServiceChainName(chainString string) bool { |
| // The chains returned from servicePortChainNameBase and |
| // servicePortEndpointChainNameBase will always have at least one "/" in them. |
| // Since none of our "stock" chain names use slashes, we can distinguish them this |
| // way. |
| return strings.Contains(chainString, "/") |
| } |
| |
| func isAffinitySetName(set string) bool { |
| return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix) |
| } |
| |
| // This is where all of the nftables calls happen. |
| // This assumes proxier.mu is NOT held |
| func (proxier *Proxier) syncProxyRules() { |
| proxier.mu.Lock() |
| defer proxier.mu.Unlock() |
| |
| // don't sync rules till we've received services and endpoints |
| if !proxier.isInitialized() { |
| klog.V(2).InfoS("Not syncing nftables until Services and Endpoints have been received from master") |
| return |
| } |
| |
| // |
| // Below this point we will not return until we try to write the nftables rules. |
| // |
| |
| // Keep track of how long syncs take. |
| start := time.Now() |
| defer func() { |
| metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start)) |
| klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start)) |
| }() |
| |
| serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) |
| endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) |
| |
| klog.V(2).InfoS("Syncing nftables rules") |
| |
| success := false |
| defer func() { |
| if !success { |
| klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod) |
| proxier.syncRunner.RetryAfter(proxier.syncPeriod) |
| } |
| }() |
| |
| // If there are sufficiently-stale chains left over from previous transactions, |
| // try to delete them now. |
| if len(proxier.staleChains) > 0 { |
| oneSecondAgo := start.Add(-time.Second) |
| tx := proxier.nftables.NewTransaction() |
| deleted := 0 |
| for chain, modtime := range proxier.staleChains { |
| if modtime.Before(oneSecondAgo) { |
| tx.Delete(&knftables.Chain{ |
| Name: chain, |
| }) |
| delete(proxier.staleChains, chain) |
| deleted++ |
| } |
| } |
| if deleted > 0 { |
| klog.InfoS("Deleting stale nftables chains", "numChains", deleted) |
| err := proxier.nftables.Run(context.TODO(), tx) |
| if err != nil { |
| // We already deleted the entries from staleChains, but if |
| // the chains still exist, they'll just get added back |
| // (with a later timestamp) at the end of the sync. |
| klog.ErrorS(err, "Unable to delete stale chains; will retry later") |
| // FIXME: metric |
| } |
| } |
| } |
| |
| // Now start the actual syncing transaction |
| tx := proxier.nftables.NewTransaction() |
| proxier.setupNFTables(tx) |
| |
| // We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6 |
| ipX := "ip" |
| ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value |
| if proxier.ipFamily == v1.IPv6Protocol { |
| ipX = "ip6" |
| ipvX_addr = "ipv6_addr" |
| } |
| |
| // We currently fully-rebuild our sets and maps on each resync |
| tx.Flush(&knftables.Set{ |
| Name: clusterIPsSet, |
| }) |
| tx.Flush(&knftables.Map{ |
| Name: firewallIPsMap, |
| }) |
| tx.Flush(&knftables.Map{ |
| Name: noEndpointServicesMap, |
| }) |
| tx.Flush(&knftables.Map{ |
| Name: noEndpointNodePortsMap, |
| }) |
| tx.Flush(&knftables.Map{ |
| Name: serviceIPsMap, |
| }) |
| tx.Flush(&knftables.Map{ |
| Name: serviceNodePortsMap, |
| }) |
| |
| // Accumulate service/endpoint chains and affinity sets to keep. |
| activeChains := sets.New[string]() |
| activeAffinitySets := sets.New[string]() |
| |
| // Compute total number of endpoint chains across all services |
| // to get a sense of how big the cluster is. |
| totalEndpoints := 0 |
| for svcName := range proxier.svcPortMap { |
| totalEndpoints += len(proxier.endpointsMap[svcName]) |
| } |
| |
| // These two variables are used to publish the sync_proxy_rules_no_endpoints_total |
| // metric. |
| serviceNoLocalEndpointsTotalInternal := 0 |
| serviceNoLocalEndpointsTotalExternal := 0 |
| |
| // Build rules for each service-port. |
| for svcName, svc := range proxier.svcPortMap { |
| svcInfo, ok := svc.(*servicePortInfo) |
| if !ok { |
| klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName) |
| continue |
| } |
| protocol := strings.ToLower(string(svcInfo.Protocol())) |
| svcPortNameString := svcInfo.nameString |
| |
| // Figure out the endpoints for Cluster and Local traffic policy. |
| // allLocallyReachableEndpoints is the set of all endpoints that can be routed to |
| // from this node, given the service's traffic policies. hasEndpoints is true |
| // if the service has any usable endpoints on any node, not just this one. |
| allEndpoints := proxier.endpointsMap[svcName] |
| clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) |
| |
| // Note the endpoint chains that will be used |
| for _, ep := range allLocallyReachableEndpoints { |
| if epInfo, ok := ep.(*endpointInfo); ok { |
| ensureChain(epInfo.chainName, tx, activeChains) |
| } |
| } |
| |
| // clusterPolicyChain contains the endpoints used with "Cluster" traffic policy |
| clusterPolicyChain := svcInfo.clusterPolicyChainName |
| usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints() |
| if usesClusterPolicyChain { |
| ensureChain(clusterPolicyChain, tx, activeChains) |
| } |
| |
| // localPolicyChain contains the endpoints used with "Local" traffic policy |
| localPolicyChain := svcInfo.localPolicyChainName |
| usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints() |
| if usesLocalPolicyChain { |
| ensureChain(localPolicyChain, tx, activeChains) |
| } |
| |
| // internalPolicyChain is the chain containing the endpoints for |
| // "internal" (ClusterIP) traffic. internalTrafficChain is the chain that |
| // internal traffic is routed to (which is always the same as |
| // internalPolicyChain). hasInternalEndpoints is true if we should |
| // generate rules pointing to internalTrafficChain, or false if there are |
| // no available internal endpoints. |
| internalPolicyChain := clusterPolicyChain |
| hasInternalEndpoints := hasEndpoints |
| if svcInfo.InternalPolicyLocal() { |
| internalPolicyChain = localPolicyChain |
| if len(localEndpoints) == 0 { |
| hasInternalEndpoints = false |
| } |
| } |
| internalTrafficChain := internalPolicyChain |
| |
| // Similarly, externalPolicyChain is the chain containing the endpoints |
| // for "external" (NodePort, LoadBalancer, and ExternalIP) traffic. |
| // externalTrafficChain is the chain that external traffic is routed to |
| // (which is always the service's "EXT" chain). hasExternalEndpoints is |
| // true if there are endpoints that will be reached by external traffic. |
| // (But we may still have to generate externalTrafficChain even if there |
| // are no external endpoints, to ensure that the short-circuit rules for |
| // local traffic are set up.) |
| externalPolicyChain := clusterPolicyChain |
| hasExternalEndpoints := hasEndpoints |
| if svcInfo.ExternalPolicyLocal() { |
| externalPolicyChain = localPolicyChain |
| if len(localEndpoints) == 0 { |
| hasExternalEndpoints = false |
| } |
| } |
| externalTrafficChain := svcInfo.externalChainName // eventually jumps to externalPolicyChain |
| |
| // usesExternalTrafficChain is based on hasEndpoints, not hasExternalEndpoints, |
| // because we need the local-traffic-short-circuiting rules even when there |
| // are no externally-usable endpoints. |
| usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible() |
| if usesExternalTrafficChain { |
| ensureChain(externalTrafficChain, tx, activeChains) |
| } |
| |
| var internalTrafficFilterVerdict, externalTrafficFilterVerdict string |
| if !hasEndpoints { |
| // The service has no endpoints at all; hasInternalEndpoints and |
| // hasExternalEndpoints will also be false, and we will not |
| // generate any chains in the "nat" table for the service; only |
| // rules in the "filter" table rejecting incoming packets for |
| // the service's IPs. |
| internalTrafficFilterVerdict = fmt.Sprintf("goto %s", rejectChain) |
| externalTrafficFilterVerdict = fmt.Sprintf("goto %s", rejectChain) |
| } else { |
| if !hasInternalEndpoints { |
| // The internalTrafficPolicy is "Local" but there are no local |
| // endpoints. Traffic to the clusterIP will be dropped, but |
| // external traffic may still be accepted. |
| internalTrafficFilterVerdict = "drop" |
| serviceNoLocalEndpointsTotalInternal++ |
| } |
| if !hasExternalEndpoints { |
| // The externalTrafficPolicy is "Local" but there are no |
| // local endpoints. Traffic to "external" IPs from outside |
| // the cluster will be dropped, but traffic from inside |
| // the cluster may still be accepted. |
| externalTrafficFilterVerdict = "drop" |
| serviceNoLocalEndpointsTotalExternal++ |
| } |
| } |
| |
| // Capture the clusterIP. |
| tx.Add(&knftables.Element{ |
| Set: clusterIPsSet, |
| Key: []string{svcInfo.ClusterIP().String()}, |
| }) |
| if hasInternalEndpoints { |
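| // Adds a serviceIPsMap element, roughly |
| // "172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80". |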
| tx.Add(&knftables.Element{ |
| Map: serviceIPsMap, |
| Key: []string{ |
| svcInfo.ClusterIP().String(), |
| protocol, |
| strconv.Itoa(svcInfo.Port()), |
| }, |
| Value: []string{ |
| fmt.Sprintf("goto %s", internalTrafficChain), |
| }, |
| }) |
| } else { |
| // No endpoints. |
| tx.Add(&knftables.Element{ |
| Map: noEndpointServicesMap, |
| Key: []string{ |
| svcInfo.ClusterIP().String(), |
| protocol, |
| strconv.Itoa(svcInfo.Port()), |
| }, |
| Value: []string{ |
| internalTrafficFilterVerdict, |
| }, |
| Comment: &svcPortNameString, |
| }) |
| } |
| |
| // Capture externalIPs. |
| for _, externalIP := range svcInfo.ExternalIPs() { |
| if hasEndpoints { |
| // Send traffic bound for external IPs to the "external |
| // destinations" chain. |
| tx.Add(&knftables.Element{ |
| Map: serviceIPsMap, |
| Key: []string{ |
| externalIP.String(), |
| protocol, |
| strconv.Itoa(svcInfo.Port()), |
| }, |
| Value: []string{ |
| fmt.Sprintf("goto %s", externalTrafficChain), |
| }, |
| }) |
| } |
| if !hasExternalEndpoints { |
| // Either no endpoints at all (REJECT) or no endpoints for |
| // external traffic (DROP anything that didn't get |
| // short-circuited by the EXT chain.) |
| tx.Add(&knftables.Element{ |
| Map: noEndpointServicesMap, |
| Key: []string{ |
| externalIP.String(), |
| protocol, |
| strconv.Itoa(svcInfo.Port()), |
| }, |
| Value: []string{ |
| externalTrafficFilterVerdict, |
| }, |
| Comment: &svcPortNameString, |
| }) |
| } |
| } |
| |
| usesFWChain := len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0 |
| fwChain := svcInfo.firewallChainName |
| if usesFWChain { |
| ensureChain(fwChain, tx, activeChains) |
| var sources []string |
| allowFromNode := false |
| for _, cidr := range svcInfo.LoadBalancerSourceRanges() { |
| if len(sources) > 0 { |
| sources = append(sources, ",") |
| } |
| sources = append(sources, cidr.String()) |
| if cidr.Contains(proxier.nodeIP) { |
| allowFromNode = true |
| } |
| } |
| // For VIP-like LBs, the VIP is often added as a local |
| // address (via an IP route rule). In that case, a request |
| // from a node to the VIP will not hit the loadbalancer but |
| // will loop back with the source IP set to the VIP. We |
| // need the following rules to allow requests from this node. |
| if allowFromNode { |
| for _, lbip := range svcInfo.LoadBalancerVIPs() { |
| sources = append(sources, ",", lbip.String()) |
| } |
| } |
| tx.Add(&knftables.Rule{ |
| Chain: fwChain, |
| Rule: knftables.Concat( |
| ipX, "saddr", "!=", "{", sources, "}", |
| "drop", |
| ), |
| }) |
| } |
| |
| // Capture load-balancer ingress. |
| for _, lbip := range svcInfo.LoadBalancerVIPs() { |
| if hasEndpoints { |
| tx.Add(&knftables.Element{ |
| Map: serviceIPsMap, |
| Key: []string{ |
| lbip.String(), |
| protocol, |
| strconv.Itoa(svcInfo.Port()), |
| }, |
| Value: []string{ |
| fmt.Sprintf("goto %s", externalTrafficChain), |
| }, |
| }) |
| } |
| |
| if usesFWChain { |
| tx.Add(&knftables.Element{ |
| Map: firewallIPsMap, |
| Key: []string{ |
| lbip.String(), |
| protocol, |
| strconv.Itoa(svcInfo.Port()), |
| }, |
| Value: []string{ |
| fmt.Sprintf("goto %s", fwChain), |
| }, |
| Comment: &svcPortNameString, |
| }) |
| } |
| } |
| if !hasExternalEndpoints { |
| // Either no endpoints at all (REJECT) or no endpoints for |
| // external traffic (DROP anything that didn't get short-circuited |
| // by the EXT chain.) |
| for _, lbip := range svcInfo.LoadBalancerVIPs() { |
| tx.Add(&knftables.Element{ |
| Map: noEndpointServicesMap, |
| Key: []string{ |
| lbip.String(), |
| protocol, |
| strconv.Itoa(svcInfo.Port()), |
| }, |
| Value: []string{ |
| externalTrafficFilterVerdict, |
| }, |
| Comment: &svcPortNameString, |
| }) |
| } |
| } |
| |
| // Capture nodeports. |
| if svcInfo.NodePort() != 0 { |
| if hasEndpoints { |
| // Jump to the external destination chain. For better or for |
| // worse, nodeports are not subject to loadBalancerSourceRanges, |
| // and we can't change that. |
| tx.Add(&knftables.Element{ |
| Map: serviceNodePortsMap, |
| Key: []string{ |
| protocol, |
| strconv.Itoa(svcInfo.NodePort()), |
| }, |
| Value: []string{ |
| fmt.Sprintf("goto %s", externalTrafficChain), |
| }, |
| }) |
| } |
| if !hasExternalEndpoints { |
| // Either no endpoints at all (REJECT) or no endpoints for |
| // external traffic (DROP anything that didn't get |
| // short-circuited by the EXT chain.) |
| tx.Add(&knftables.Element{ |
| Map: noEndpointNodePortsMap, |
| Key: []string{ |
| protocol, |
| strconv.Itoa(svcInfo.NodePort()), |
| }, |
| Value: []string{ |
| externalTrafficFilterVerdict, |
| }, |
| Comment: &svcPortNameString, |
| }) |
| } |
| } |
| |
| // Set up internal traffic handling. |
| if hasInternalEndpoints { |
| if proxier.masqueradeAll { |
| tx.Add(&knftables.Rule{ |
| Chain: internalTrafficChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", svcInfo.ClusterIP(), |
| protocol, "dport", svcInfo.Port(), |
| "jump", markMasqChain, |
| ), |
| }) |
| } else if proxier.localDetector.IsImplemented() { |
| // This masquerades off-cluster traffic to a service VIP. The |
| // idea is that you can establish a static route for your |
| // Service range, routing to any node, and that node will |
| // bridge into the Service for you. Since that might bounce |
| // off-node, we masquerade here. |
| tx.Add(&knftables.Rule{ |
| Chain: internalTrafficChain, |
| Rule: knftables.Concat( |
| ipX, "daddr", svcInfo.ClusterIP(), |
| protocol, "dport", svcInfo.Port(), |
| proxier.localDetector.IfNotLocalNFT(), |
| "jump", markMasqChain, |
| ), |
| }) |
| } |
| } |
| |
| // Set up external traffic handling (if any "external" destinations are |
| // enabled). All captured traffic for all external destinations should |
| // jump to externalTrafficChain, which will handle some special cases and |
| // then jump to externalPolicyChain. |
| if usesExternalTrafficChain { |
| if !svcInfo.ExternalPolicyLocal() { |
| // If we are using non-local endpoints we need to masquerade, |
| // in case we cross nodes. |
| tx.Add(&knftables.Rule{ |
| Chain: externalTrafficChain, |
| Rule: knftables.Concat( |
| "jump", markMasqChain, |
| ), |
| }) |
| } else { |
| // If we are only using same-node endpoints, we can retain the |
| // source IP in most cases. |
| |
| if proxier.localDetector.IsImplemented() { |
| // Treat all locally-originated pod -> external destination |
| // traffic as a special-case. It is subject to neither |
| // form of traffic policy, which simulates going up-and-out |
| // to an external load-balancer and coming back in. |
| tx.Add(&knftables.Rule{ |
| Chain: externalTrafficChain, |
| Rule: knftables.Concat( |
| proxier.localDetector.IfLocalNFT(), |
| "goto", clusterPolicyChain, |
| ), |
| Comment: ptr.To("short-circuit pod traffic"), |
| }) |
| } |
| |
| // Locally originated traffic (not a pod, but the host node) |
| // still needs masquerade because the LBIP itself is a local |
| // address, so that will be the chosen source IP. |
| tx.Add(&knftables.Rule{ |
| Chain: externalTrafficChain, |
| Rule: knftables.Concat( |
| "fib", "saddr", "type", "local", |
| "jump", markMasqChain, |
| ), |
| Comment: ptr.To("masquerade local traffic"), |
| }) |
| |
| // Redirect all src-type=LOCAL -> external destination to the |
| // policy=cluster chain. This allows traffic originating |
| // from the host to be redirected to the service correctly. |
| tx.Add(&knftables.Rule{ |
| Chain: externalTrafficChain, |
| Rule: knftables.Concat( |
| "fib", "saddr", "type", "local", |
| "goto", clusterPolicyChain, |
| ), |
| Comment: ptr.To("short-circuit local traffic"), |
| }) |
| } |
| |
| // Anything else falls through to the appropriate policy chain. |
| if hasExternalEndpoints { |
| tx.Add(&knftables.Rule{ |
| Chain: externalTrafficChain, |
| Rule: knftables.Concat( |
| "goto", externalPolicyChain, |
| ), |
| }) |
| } |
| } |
| |
| if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { |
| // Generate the per-endpoint affinity sets |
| for _, ep := range allLocallyReachableEndpoints { |
| epInfo, ok := ep.(*endpointInfo) |
| if !ok { |
| klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep) |
| continue |
| } |
| |
| // Create a set to store current affinity mappings. As |
| // with the iptables backend, endpoint affinity is |
| // recorded for connections from a particular source IP |
| // (without regard to source port) to a particular |
| // ServicePort (without regard to which service IP was |
| // used to reach the service). This may be changed in the |
| // future. |
| tx.Add(&knftables.Set{ |
| Name: epInfo.affinitySetName, |
| Type: ipvX_addr, |
| Flags: []knftables.SetFlag{ |
| // The nft docs say "dynamic" is only |
| // needed for sets containing stateful |
| // objects (eg counters), but (at least on |
| // RHEL8) if we create the set without |
| // "dynamic", it later gets mutated to |
| // have it, and then the next attempt to |
| // tx.Add() it here fails because it looks |
| // like we're trying to change the flags. |
| knftables.DynamicFlag, |
| knftables.TimeoutFlag, |
| }, |
| Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second), |
| }) |
| activeAffinitySets.Insert(epInfo.affinitySetName) |
| } |
| } |
| |
| // If Cluster policy is in use, create the chain and create rules jumping |
| // from clusterPolicyChain to the clusterEndpoints |
| if usesClusterPolicyChain { |
| proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints) |
| } |
| |
| // If Local policy is in use, create rules jumping from localPolicyChain |
| // to the localEndpoints |
| if usesLocalPolicyChain { |
| proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints) |
| } |
| |
| // Generate the per-endpoint chains |
| for _, ep := range allLocallyReachableEndpoints { |
| epInfo, ok := ep.(*endpointInfo) |
| if !ok { |
| klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep) |
| continue |
| } |
| |
| endpointChain := epInfo.chainName |
| |
| // Handle traffic that loops back to the originator with SNAT. |
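| // (roughly: "ip saddr 10.180.0.1 jump mark-for-masquerade") |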
| tx.Add(&knftables.Rule{ |
| Chain: endpointChain, |
| Rule: knftables.Concat( |
| ipX, "saddr", epInfo.IP(), |
| "jump", markMasqChain, |
| ), |
| }) |
| |
| // Handle session affinity |
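| // (roughly: "update @affinity-5OJB2KTY-... { ip saddr }") |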
| if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { |
| tx.Add(&knftables.Rule{ |
| Chain: endpointChain, |
| Rule: knftables.Concat( |
| "update", "@", epInfo.affinitySetName, |
| "{", ipX, "saddr", "}", |
| ), |
| }) |
| } |
| |
| // DNAT to final destination. |
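| // (roughly: "meta l4proto tcp dnat to 10.180.0.1:80") |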
| tx.Add(&knftables.Rule{ |
| Chain: endpointChain, |
| Rule: knftables.Concat( |
| "meta l4proto", protocol, |
| "dnat to", epInfo.String(), |
| ), |
| }) |
| } |
| } |
| |
| // Figure out which chains are now stale. Unfortunately, we can't delete them |
| // right away, because with kernels before 6.2, if there is a map element pointing |
| // to a chain, and you delete that map element, the kernel doesn't notice until a |
| // short amount of time later that the chain is now unreferenced. So we flush them |
| // now, and record the time that they become stale in staleChains so they can be |
| // deleted later. |
| existingChains, err := proxier.nftables.List(context.TODO(), "chains") |
| if err == nil { |
| for _, chain := range existingChains { |
| if isServiceChainName(chain) { |
| if !activeChains.Has(chain) { |
| tx.Flush(&knftables.Chain{ |
| Name: chain, |
| }) |
| proxier.staleChains[chain] = start |
| } else { |
| delete(proxier.staleChains, chain) |
| } |
| } |
| } |
| } else if !knftables.IsNotFound(err) { |
| klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted") |
| } |
| |
| // OTOH, we can immediately delete any stale affinity sets |
| existingSets, err := proxier.nftables.List(context.TODO(), "sets") |
| if err == nil { |
| for _, set := range existingSets { |
| if isAffinitySetName(set) && !activeAffinitySets.Has(set) { |
| tx.Delete(&knftables.Set{ |
| Name: set, |
| }) |
| } |
| } |
| } else if !knftables.IsNotFound(err) { |
| klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted") |
| } |
| |
| // Sync rules. |
| klog.V(2).InfoS("Reloading service nftables data", |
| "numServices", len(proxier.svcPortMap), |
| "numEndpoints", totalEndpoints, |
| ) |
| |
| // FIXME |
| // klog.V(9).InfoS("Running nftables transaction", "transaction", tx.Bytes()) |
| |
| err = proxier.nftables.Run(context.TODO(), tx) |
| if err != nil { |
| klog.ErrorS(err, "nftables sync failed") |
| metrics.IptablesRestoreFailuresTotal.Inc() |
| return |
| } |
| success = true |
| |
| for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { |
| for _, lastChangeTriggerTime := range lastChangeTriggerTimes { |
| latency := metrics.SinceInSeconds(lastChangeTriggerTime) |
| metrics.NetworkProgrammingLatency.Observe(latency) |
| klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency) |
| } |
| } |
| |
| metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal)) |
| metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal)) |
| if proxier.healthzServer != nil { |
| proxier.healthzServer.Updated(proxier.ipFamily) |
| } |
| metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime() |
| |
| // Update service healthchecks. The endpoints list might include services that are |
| // not "OnlyLocal", but the services list will not, and the serviceHealthServer |
| // will just drop those endpoints. |
| if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil { |
| klog.ErrorS(err, "Error syncing healthcheck services") |
| } |
| if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil { |
| klog.ErrorS(err, "Error syncing healthcheck endpoints") |
| } |
| |
| // Finish housekeeping, clear stale conntrack entries for UDP Services |
| conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) |
| } |
| |
| func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) { |
| // First write session affinity rules, if applicable. |
| if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { |
| ipX := "ip" |
| if proxier.ipFamily == v1.IPv6Protocol { |
| ipX = "ip6" |
| } |
| |
| for _, ep := range endpoints { |
| epInfo, ok := ep.(*endpointInfo) |
| if !ok { |
| continue |
| } |
| |
| tx.Add(&knftables.Rule{ |
| Chain: svcChain, |
| Rule: knftables.Concat( |
| ipX, "saddr", "@", epInfo.affinitySetName, |
| "goto", epInfo.chainName, |
| ), |
| }) |
| } |
| } |
| |
| // Now write loadbalancing rule |
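| // (e.g., with two endpoints this renders roughly as |
| // "numgen random mod 2 vmap { 0 : goto endpoint-chainA , 1 : goto endpoint-chainB }") |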
| var elements []string |
| for i, ep := range endpoints { |
| epInfo, ok := ep.(*endpointInfo) |
| if !ok { |
| continue |
| } |
| |
| elements = append(elements, |
| strconv.Itoa(i), ":", "goto", epInfo.chainName, |
| ) |
| if i != len(endpoints)-1 { |
| elements = append(elements, ",") |
| } |
| } |
| tx.Add(&knftables.Rule{ |
| Chain: svcChain, |
| Rule: knftables.Concat( |
| "numgen random mod", len(endpoints), "vmap", |
| "{", elements, "}", |
| ), |
| }) |
| } |