| /* |
| Copyright 2021 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package grpc |
| |
| import ( |
| "context" |
| "fmt" |
| "net" |
| "time" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/credentials/insecure" |
| grpchealth "google.golang.org/grpc/health/grpc_health_v1" |
| "google.golang.org/grpc/metadata" |
| "google.golang.org/grpc/status" |
| "k8s.io/component-base/version" |
| "k8s.io/klog/v2" |
| "k8s.io/kubernetes/pkg/probe" |
| ) |
| |
| // Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks. |
| type Prober interface { |
| Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) |
| } |
| |
// grpcProber is the stateless Prober implementation; it has no fields, so its
// zero value is ready to use.
type grpcProber struct{}
| |
| // New Prober for execute grpc probe |
| func New() Prober { |
| return grpcProber{} |
| } |
| |
| // Probe executes a grpc call to check the liveness/readiness/startup of container. |
| // Returns the Result status, command output, and errors if any. |
| // Any failure is considered as a probe failure to mimic grpc_health_probe tool behavior. |
| // err is always nil |
| func (p grpcProber) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) { |
| v := version.Get() |
| |
| opts := []grpc.DialOption{ |
| grpc.WithUserAgent(fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)), |
| grpc.WithBlock(), |
| grpc.WithTransportCredentials(insecure.NewCredentials()), //credentials are currently not supported |
| grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { |
| return probe.ProbeDialer().DialContext(ctx, "tcp", addr) |
| }), |
| } |
| |
| ctx, cancel := context.WithTimeout(context.Background(), timeout) |
| |
| defer cancel() |
| |
| addr := net.JoinHostPort(host, fmt.Sprintf("%d", port)) |
| conn, err := grpc.DialContext(ctx, addr, opts...) |
| |
| if err != nil { |
| if err == context.DeadlineExceeded { |
| klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout) |
| return probe.Failure, fmt.Sprintf("timeout: failed to connect service %q within %v: %+v", addr, timeout, err), nil |
| } else { |
| klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr) |
| return probe.Failure, fmt.Sprintf("error: failed to connect service at %q: %+v", addr, err), nil |
| } |
| } |
| |
| defer func() { |
| _ = conn.Close() |
| }() |
| |
| client := grpchealth.NewHealthClient(conn) |
| |
| resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{ |
| Service: service, |
| }) |
| |
| if err != nil { |
| stat, ok := status.FromError(err) |
| if ok { |
| switch stat.Code() { |
| case codes.Unimplemented: |
| klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service) |
| return probe.Failure, fmt.Sprintf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health): %s", stat.Message()), nil |
| case codes.DeadlineExceeded: |
| klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout) |
| return probe.Failure, fmt.Sprintf("timeout: health rpc did not complete within %v", timeout), nil |
| default: |
| klog.V(4).ErrorS(err, "rpc probe failed") |
| } |
| } else { |
| klog.V(4).ErrorS(err, "health rpc probe failed") |
| } |
| |
| return probe.Failure, fmt.Sprintf("error: health rpc probe failed: %+v", err), nil |
| } |
| |
| if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING { |
| return probe.Failure, fmt.Sprintf("service unhealthy (responded with %q)", resp.GetStatus().String()), nil |
| } |
| |
| return probe.Success, fmt.Sprintf("service healthy"), nil |
| } |