| /* |
| Copyright 2014-2021 Docker Inc. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package spdystream |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "net" |
| "net/http" |
| "sync" |
| "time" |
| |
| "github.com/moby/spdystream/spdy" |
| ) |
| |
// ErrUnreadPartialData is returned by ReadData when a previous Read left
// part of a data frame unconsumed.
var ErrUnreadPartialData = errors.New("unread partial data")
| |
| type Stream struct { |
| streamId spdy.StreamId |
| parent *Stream |
| conn *Connection |
| startChan chan error |
| |
| dataLock sync.RWMutex |
| dataChan chan []byte |
| unread []byte |
| |
| priority uint8 |
| headers http.Header |
| headerChan chan http.Header |
| finishLock sync.Mutex |
| finished bool |
| replyCond *sync.Cond |
| replied bool |
| closeLock sync.Mutex |
| closeChan chan bool |
| } |
| |
| // WriteData writes data to stream, sending a dataframe per call |
| func (s *Stream) WriteData(data []byte, fin bool) error { |
| s.waitWriteReply() |
| var flags spdy.DataFlags |
| |
| if fin { |
| flags = spdy.DataFlagFin |
| s.finishLock.Lock() |
| if s.finished { |
| s.finishLock.Unlock() |
| return ErrWriteClosedStream |
| } |
| s.finished = true |
| s.finishLock.Unlock() |
| } |
| |
| dataFrame := &spdy.DataFrame{ |
| StreamId: s.streamId, |
| Flags: flags, |
| Data: data, |
| } |
| |
| debugMessage("(%p) (%d) Writing data frame", s, s.streamId) |
| return s.conn.framer.WriteFrame(dataFrame) |
| } |
| |
| // Write writes bytes to a stream, calling write data for each call. |
| func (s *Stream) Write(data []byte) (n int, err error) { |
| err = s.WriteData(data, false) |
| if err == nil { |
| n = len(data) |
| } |
| return |
| } |
| |
| // Read reads bytes from a stream, a single read will never get more |
| // than what is sent on a single data frame, but a multiple calls to |
| // read may get data from the same data frame. |
| func (s *Stream) Read(p []byte) (n int, err error) { |
| if s.unread == nil { |
| select { |
| case <-s.closeChan: |
| return 0, io.EOF |
| case read, ok := <-s.dataChan: |
| if !ok { |
| return 0, io.EOF |
| } |
| s.unread = read |
| } |
| } |
| n = copy(p, s.unread) |
| if n < len(s.unread) { |
| s.unread = s.unread[n:] |
| } else { |
| s.unread = nil |
| } |
| return |
| } |
| |
| // ReadData reads an entire data frame and returns the byte array |
| // from the data frame. If there is unread data from the result |
| // of a Read call, this function will return an ErrUnreadPartialData. |
| func (s *Stream) ReadData() ([]byte, error) { |
| debugMessage("(%p) Reading data from %d", s, s.streamId) |
| if s.unread != nil { |
| return nil, ErrUnreadPartialData |
| } |
| select { |
| case <-s.closeChan: |
| return nil, io.EOF |
| case read, ok := <-s.dataChan: |
| if !ok { |
| return nil, io.EOF |
| } |
| return read, nil |
| } |
| } |
| |
| func (s *Stream) waitWriteReply() { |
| if s.replyCond != nil { |
| s.replyCond.L.Lock() |
| for !s.replied { |
| s.replyCond.Wait() |
| } |
| s.replyCond.L.Unlock() |
| } |
| } |
| |
| // Wait waits for the stream to receive a reply. |
| func (s *Stream) Wait() error { |
| return s.WaitTimeout(time.Duration(0)) |
| } |
| |
| // WaitTimeout waits for the stream to receive a reply or for timeout. |
| // When the timeout is reached, ErrTimeout will be returned. |
| func (s *Stream) WaitTimeout(timeout time.Duration) error { |
| var timeoutChan <-chan time.Time |
| if timeout > time.Duration(0) { |
| timeoutChan = time.After(timeout) |
| } |
| |
| select { |
| case err := <-s.startChan: |
| if err != nil { |
| return err |
| } |
| break |
| case <-timeoutChan: |
| return ErrTimeout |
| } |
| return nil |
| } |
| |
| // Close closes the stream by sending an empty data frame with the |
| // finish flag set, indicating this side is finished with the stream. |
| func (s *Stream) Close() error { |
| select { |
| case <-s.closeChan: |
| // Stream is now fully closed |
| s.conn.removeStream(s) |
| default: |
| break |
| } |
| return s.WriteData([]byte{}, true) |
| } |
| |
| // Reset sends a reset frame, putting the stream into the fully closed state. |
| func (s *Stream) Reset() error { |
| s.conn.removeStream(s) |
| return s.resetStream() |
| } |
| |
| func (s *Stream) resetStream() error { |
| // Always call closeRemoteChannels, even if s.finished is already true. |
| // This makes it so that stream.Close() followed by stream.Reset() allows |
| // stream.Read() to unblock. |
| s.closeRemoteChannels() |
| |
| s.finishLock.Lock() |
| if s.finished { |
| s.finishLock.Unlock() |
| return nil |
| } |
| s.finished = true |
| s.finishLock.Unlock() |
| |
| resetFrame := &spdy.RstStreamFrame{ |
| StreamId: s.streamId, |
| Status: spdy.Cancel, |
| } |
| return s.conn.framer.WriteFrame(resetFrame) |
| } |
| |
| // CreateSubStream creates a stream using the current as the parent |
| func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) { |
| return s.conn.CreateStream(headers, s, fin) |
| } |
| |
| // SetPriority sets the stream priority, does not affect the |
| // remote priority of this stream after Open has been called. |
| // Valid values are 0 through 7, 0 being the highest priority |
| // and 7 the lowest. |
| func (s *Stream) SetPriority(priority uint8) { |
| s.priority = priority |
| } |
| |
| // SendHeader sends a header frame across the stream |
| func (s *Stream) SendHeader(headers http.Header, fin bool) error { |
| return s.conn.sendHeaders(headers, s, fin) |
| } |
| |
| // SendReply sends a reply on a stream, only valid to be called once |
| // when handling a new stream |
| func (s *Stream) SendReply(headers http.Header, fin bool) error { |
| if s.replyCond == nil { |
| return errors.New("cannot reply on initiated stream") |
| } |
| s.replyCond.L.Lock() |
| defer s.replyCond.L.Unlock() |
| if s.replied { |
| return nil |
| } |
| |
| err := s.conn.sendReply(headers, s, fin) |
| if err != nil { |
| return err |
| } |
| |
| s.replied = true |
| s.replyCond.Broadcast() |
| return nil |
| } |
| |
| // Refuse sends a reset frame with the status refuse, only |
| // valid to be called once when handling a new stream. This |
| // may be used to indicate that a stream is not allowed |
| // when http status codes are not being used. |
| func (s *Stream) Refuse() error { |
| if s.replied { |
| return nil |
| } |
| s.replied = true |
| return s.conn.sendReset(spdy.RefusedStream, s) |
| } |
| |
| // Cancel sends a reset frame with the status canceled. This |
| // can be used at any time by the creator of the Stream to |
| // indicate the stream is no longer needed. |
| func (s *Stream) Cancel() error { |
| return s.conn.sendReset(spdy.Cancel, s) |
| } |
| |
| // ReceiveHeader receives a header sent on the other side |
| // of the stream. This function will block until a header |
| // is received or stream is closed. |
| func (s *Stream) ReceiveHeader() (http.Header, error) { |
| select { |
| case <-s.closeChan: |
| break |
| case header, ok := <-s.headerChan: |
| if !ok { |
| return nil, fmt.Errorf("header chan closed") |
| } |
| return header, nil |
| } |
| return nil, fmt.Errorf("stream closed") |
| } |
| |
| // Parent returns the parent stream |
| func (s *Stream) Parent() *Stream { |
| return s.parent |
| } |
| |
| // Headers returns the headers used to create the stream |
| func (s *Stream) Headers() http.Header { |
| return s.headers |
| } |
| |
| // String returns the string version of stream using the |
| // streamId to uniquely identify the stream |
| func (s *Stream) String() string { |
| return fmt.Sprintf("stream:%d", s.streamId) |
| } |
| |
| // Identifier returns a 32 bit identifier for the stream |
| func (s *Stream) Identifier() uint32 { |
| return uint32(s.streamId) |
| } |
| |
| // IsFinished returns whether the stream has finished |
| // sending data |
| func (s *Stream) IsFinished() bool { |
| return s.finished |
| } |
| |
| // Implement net.Conn interface |
| |
| func (s *Stream) LocalAddr() net.Addr { |
| return s.conn.conn.LocalAddr() |
| } |
| |
| func (s *Stream) RemoteAddr() net.Addr { |
| return s.conn.conn.RemoteAddr() |
| } |
| |
| // TODO set per stream values instead of connection-wide |
| |
| func (s *Stream) SetDeadline(t time.Time) error { |
| return s.conn.conn.SetDeadline(t) |
| } |
| |
| func (s *Stream) SetReadDeadline(t time.Time) error { |
| return s.conn.conn.SetReadDeadline(t) |
| } |
| |
| func (s *Stream) SetWriteDeadline(t time.Time) error { |
| return s.conn.conn.SetWriteDeadline(t) |
| } |
| |
| func (s *Stream) closeRemoteChannels() { |
| s.closeLock.Lock() |
| defer s.closeLock.Unlock() |
| select { |
| case <-s.closeChan: |
| default: |
| close(s.closeChan) |
| } |
| } |