| /* |
| Copyright The containerd Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package io |
| |
| import ( |
| "bufio" |
| "bytes" |
| "fmt" |
| "io" |
| "time" |
| |
| runtime "k8s.io/cri-api/pkg/apis/runtime/v1" |
| |
| cioutil "github.com/containerd/containerd/v2/pkg/ioutil" |
| "github.com/containerd/log" |
| ) |
| |
| const ( |
| // delimiter used in CRI logging format. |
| delimiter = ' ' |
| // eof is end-of-line. |
| eol = '\n' |
| // timestampFormat is the timestamp format used in CRI logging format. |
| timestampFormat = time.RFC3339Nano |
| // defaultBufSize is the default size of the read buffer in bytes. |
| defaultBufSize = 4096 |
| ) |
| |
| // NewDiscardLogger creates logger which discards all the input. |
| func NewDiscardLogger() io.WriteCloser { |
| return cioutil.NewNopWriteCloser(io.Discard) |
| } |
| |
| // NewCRILogger returns a write closer which redirect container log into |
| // log file, and decorate the log line into CRI defined format. It also |
| // returns a channel which indicates whether the logger is stopped. |
| // maxLen is the max length limit of a line. A line longer than the |
| // limit will be cut into multiple lines. |
| func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.WriteCloser, <-chan struct{}) { |
| log.L.Debugf("Start writing stream %q to log file %q", stream, path) |
| prc, pwc := io.Pipe() |
| stop := make(chan struct{}) |
| go func() { |
| redirectLogs(path, prc, w, stream, maxLen) |
| close(stop) |
| }() |
| return pwc, stop |
| } |
| |
| // bufio.ReadLine in golang eats both read errors and tailing newlines |
| // (See https://golang.org/pkg/bufio/#Reader.ReadLine). When reading |
| // to io.EOF, it is impossible for the caller to figure out whether |
| // there is a newline at the end, for example: |
| // 1) When reading "CONTENT\n", it returns "CONTENT" without error; |
| // 2) When reading "CONTENT", it also returns "CONTENT" without error. |
| // |
| // To differentiate these 2 cases, we need to write a readLine function |
| // ourselves to not ignore the error. |
| // |
| // The code is similar with https://golang.org/src/bufio/bufio.go?s=9537:9604#L359. |
| // The only difference is that it returns all errors from `ReadSlice`. |
| // |
| // readLine returns err != nil if and only if line does not end with a new line. |
| func readLine(b *bufio.Reader) (line []byte, isPrefix bool, err error) { |
| line, err = b.ReadSlice('\n') |
| if err == bufio.ErrBufferFull { |
| // Handle the case where "\r\n" straddles the buffer. |
| if len(line) > 0 && line[len(line)-1] == '\r' { |
| // Unread the last '\r' |
| if err := b.UnreadByte(); err != nil { |
| panic(fmt.Sprintf("invalid unread %v", err)) |
| } |
| line = line[:len(line)-1] |
| } |
| return line, true, nil |
| } |
| |
| if len(line) == 0 { |
| if err != nil { |
| line = nil |
| } |
| return |
| } |
| |
| if line[len(line)-1] == '\n' { |
| // "ReadSlice returns err != nil if and only if line does not end in delim" |
| // (See https://golang.org/pkg/bufio/#Reader.ReadSlice). |
| if err != nil { |
| panic(fmt.Sprintf("full read with unexpected error %v", err)) |
| } |
| drop := 1 |
| if len(line) > 1 && line[len(line)-2] == '\r' { |
| drop = 2 |
| } |
| line = line[:len(line)-drop] |
| } |
| return |
| } |
| |
| func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) { |
| defer rc.Close() |
| var ( |
| stream = []byte(s) |
| delimiter = []byte{delimiter} |
| partial = []byte(runtime.LogTagPartial) |
| full = []byte(runtime.LogTagFull) |
| buf [][]byte |
| length int |
| bufSize = defaultBufSize |
| |
| timeBuffer = make([]byte, len(timestampFormat)) |
| lineBuffer = bytes.Buffer{} |
| ) |
| // Make sure bufSize <= maxLen |
| if maxLen > 0 && maxLen < bufSize { |
| bufSize = maxLen |
| } |
| r := bufio.NewReaderSize(rc, bufSize) |
| writeLineBuffer := func(tag []byte, lineBytes [][]byte) { |
| timeBuffer = time.Now().AppendFormat(timeBuffer[:0], timestampFormat) |
| headers := [][]byte{timeBuffer, stream, tag} |
| |
| lineBuffer.Reset() |
| for _, h := range headers { |
| lineBuffer.Write(h) |
| lineBuffer.Write(delimiter) |
| } |
| for _, l := range lineBytes { |
| lineBuffer.Write(l) |
| } |
| lineBuffer.WriteByte(eol) |
| if n, err := lineBuffer.WriteTo(w); err == nil { |
| outputEntries.Inc() |
| outputBytes.Inc(float64(n)) |
| } else { |
| log.L.WithError(err).Errorf("Fail to write %q log to log file %q", s, path) |
| // Continue on write error to drain the container output. |
| } |
| } |
| for { |
| var stop bool |
| newLine, isPrefix, err := readLine(r) |
| // NOTE(random-liu): readLine can return actual content even if there is an error. |
| if len(newLine) > 0 { |
| inputEntries.Inc() |
| inputBytes.Inc(float64(len(newLine))) |
| // Buffer returned by ReadLine will change after |
| // next read, copy it. |
| l := make([]byte, len(newLine)) |
| copy(l, newLine) |
| buf = append(buf, l) |
| length += len(l) |
| } |
| if err != nil { |
| if err == io.EOF { |
| log.L.Tracef("Getting EOF from stream %q while redirecting to log file %q", s, path) |
| } else { |
| log.L.WithError(err).Errorf("An error occurred when redirecting stream %q to log file %q", s, path) |
| } |
| if length == 0 { |
| // No content left to write, break. |
| break |
| } |
| // Stop after writing the content left in buffer. |
| stop = true |
| } |
| if maxLen > 0 && length > maxLen { |
| exceedLen := length - maxLen |
| last := buf[len(buf)-1] |
| if exceedLen > len(last) { |
| // exceedLen must <= len(last), or else the buffer |
| // should have be written in the previous iteration. |
| panic("exceed length should <= last buffer size") |
| } |
| buf[len(buf)-1] = last[:len(last)-exceedLen] |
| writeLineBuffer(partial, buf) |
| splitEntries.Inc() |
| buf = [][]byte{last[len(last)-exceedLen:]} |
| length = exceedLen |
| } |
| if isPrefix { |
| continue |
| } |
| if stop { |
| // readLine only returns error when the message doesn't |
| // end with a newline, in that case it should be treated |
| // as a partial line. |
| writeLineBuffer(partial, buf) |
| } else { |
| writeLineBuffer(full, buf) |
| } |
| buf = nil |
| length = 0 |
| if stop { |
| break |
| } |
| } |
| log.L.Debugf("Finish redirecting stream %q to log file %q", s, path) |
| } |