fix: resolve all trivy vulnerabilities (2023-06-06)
- bump go version to 1.17 - resolved: CVE-2022-21698, CVE-2022-27664, CVE-2022-41723, CVE-2022-41717, CVE-2022-29526, CVE-2022-32149, CVE-2022-28948.
This commit is contained in:
372
vendor/golang.org/x/net/http2/transport.go
generated
vendored
372
vendor/golang.org/x/net/http2/transport.go
generated
vendored
@@ -16,7 +16,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"io/fs"
|
||||
"log"
|
||||
"math"
|
||||
mathrand "math/rand"
|
||||
@@ -47,10 +47,6 @@ const (
|
||||
// we buffer per stream.
|
||||
transportDefaultStreamFlow = 4 << 20
|
||||
|
||||
// transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
|
||||
// a stream-level WINDOW_UPDATE for at a time.
|
||||
transportDefaultStreamMinRefresh = 4 << 10
|
||||
|
||||
defaultUserAgent = "Go-http-client/2.0"
|
||||
|
||||
// initialMaxConcurrentStreams is a connections maxConcurrentStreams until
|
||||
@@ -68,13 +64,23 @@ const (
|
||||
// A Transport internally caches connections to servers. It is safe
|
||||
// for concurrent use by multiple goroutines.
|
||||
type Transport struct {
|
||||
// DialTLS specifies an optional dial function for creating
|
||||
// TLS connections for requests.
|
||||
// DialTLSContext specifies an optional dial function with context for
|
||||
// creating TLS connections for requests.
|
||||
//
|
||||
// If DialTLS is nil, tls.Dial is used.
|
||||
// If DialTLSContext and DialTLS is nil, tls.Dial is used.
|
||||
//
|
||||
// If the returned net.Conn has a ConnectionState method like tls.Conn,
|
||||
// it will be used to set http.Response.TLS.
|
||||
DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
|
||||
|
||||
// DialTLS specifies an optional dial function for creating
|
||||
// TLS connections for requests.
|
||||
//
|
||||
// If DialTLSContext and DialTLS is nil, tls.Dial is used.
|
||||
//
|
||||
// Deprecated: Use DialTLSContext instead, which allows the transport
|
||||
// to cancel dials as soon as they are no longer needed.
|
||||
// If both are set, DialTLSContext takes priority.
|
||||
DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
|
||||
|
||||
// TLSClientConfig specifies the TLS configuration to use with
|
||||
@@ -108,6 +114,28 @@ type Transport struct {
|
||||
// to mean no limit.
|
||||
MaxHeaderListSize uint32
|
||||
|
||||
// MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
|
||||
// initial settings frame. It is the size in bytes of the largest frame
|
||||
// payload that the sender is willing to receive. If 0, no setting is
|
||||
// sent, and the value is provided by the peer, which should be 16384
|
||||
// according to the spec:
|
||||
// https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
|
||||
// Values are bounded in the range 16k to 16M.
|
||||
MaxReadFrameSize uint32
|
||||
|
||||
// MaxDecoderHeaderTableSize optionally specifies the http2
|
||||
// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
|
||||
// informs the remote endpoint of the maximum size of the header compression
|
||||
// table used to decode header blocks, in octets. If zero, the default value
|
||||
// of 4096 is used.
|
||||
MaxDecoderHeaderTableSize uint32
|
||||
|
||||
// MaxEncoderHeaderTableSize optionally specifies an upper limit for the
|
||||
// header compression table used for encoding request headers. Received
|
||||
// SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
|
||||
// the default value of 4096 is used.
|
||||
MaxEncoderHeaderTableSize uint32
|
||||
|
||||
// StrictMaxConcurrentStreams controls whether the server's
|
||||
// SETTINGS_MAX_CONCURRENT_STREAMS should be respected
|
||||
// globally. If false, new TCP connections are created to the
|
||||
@@ -161,6 +189,19 @@ func (t *Transport) maxHeaderListSize() uint32 {
|
||||
return t.MaxHeaderListSize
|
||||
}
|
||||
|
||||
func (t *Transport) maxFrameReadSize() uint32 {
|
||||
if t.MaxReadFrameSize == 0 {
|
||||
return 0 // use the default provided by the peer
|
||||
}
|
||||
if t.MaxReadFrameSize < minMaxFrameSize {
|
||||
return minMaxFrameSize
|
||||
}
|
||||
if t.MaxReadFrameSize > maxFrameSize {
|
||||
return maxFrameSize
|
||||
}
|
||||
return t.MaxReadFrameSize
|
||||
}
|
||||
|
||||
func (t *Transport) disableCompression() bool {
|
||||
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
|
||||
}
|
||||
@@ -249,7 +290,8 @@ func (t *Transport) initConnPool() {
|
||||
// HTTP/2 server.
|
||||
type ClientConn struct {
|
||||
t *Transport
|
||||
tconn net.Conn // usually *tls.Conn, except specialized impls
|
||||
tconn net.Conn // usually *tls.Conn, except specialized impls
|
||||
tconnClosed bool
|
||||
tlsState *tls.ConnectionState // nil only for specialized impls
|
||||
reused uint32 // whether conn is being reused; atomic
|
||||
singleUse bool // whether being used for a single http.Request
|
||||
@@ -264,8 +306,8 @@ type ClientConn struct {
|
||||
|
||||
mu sync.Mutex // guards following
|
||||
cond *sync.Cond // hold mu; broadcast on flow/closed changes
|
||||
flow flow // our conn-level flow control quota (cs.flow is per stream)
|
||||
inflow flow // peer's conn-level flow control
|
||||
flow outflow // our conn-level flow control quota (cs.outflow is per stream)
|
||||
inflow inflow // peer's conn-level flow control
|
||||
doNotReuse bool // whether conn is marked to not be reused for any future requests
|
||||
closing bool
|
||||
closed bool
|
||||
@@ -282,10 +324,11 @@ type ClientConn struct {
|
||||
lastActive time.Time
|
||||
lastIdle time.Time // time last idle
|
||||
// Settings from peer: (also guarded by wmu)
|
||||
maxFrameSize uint32
|
||||
maxConcurrentStreams uint32
|
||||
peerMaxHeaderListSize uint64
|
||||
initialWindowSize uint32
|
||||
maxFrameSize uint32
|
||||
maxConcurrentStreams uint32
|
||||
peerMaxHeaderListSize uint64
|
||||
peerMaxHeaderTableSize uint32
|
||||
initialWindowSize uint32
|
||||
|
||||
// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
|
||||
// Write to reqHeaderMu to lock it, read from it to unlock.
|
||||
@@ -329,14 +372,14 @@ type clientStream struct {
|
||||
respHeaderRecv chan struct{} // closed when headers are received
|
||||
res *http.Response // set if respHeaderRecv is closed
|
||||
|
||||
flow flow // guarded by cc.mu
|
||||
inflow flow // guarded by cc.mu
|
||||
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
|
||||
readErr error // sticky read error; owned by transportResponseBody.Read
|
||||
flow outflow // guarded by cc.mu
|
||||
inflow inflow // guarded by cc.mu
|
||||
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
|
||||
readErr error // sticky read error; owned by transportResponseBody.Read
|
||||
|
||||
reqBody io.ReadCloser
|
||||
reqBodyContentLength int64 // -1 means unknown
|
||||
reqBodyClosed bool // body has been closed; guarded by cc.mu
|
||||
reqBodyContentLength int64 // -1 means unknown
|
||||
reqBodyClosed chan struct{} // guarded by cc.mu; non-nil on Close, closed when done
|
||||
|
||||
// owned by writeRequest:
|
||||
sentEndStream bool // sent an END_STREAM flag to the peer
|
||||
@@ -376,9 +419,8 @@ func (cs *clientStream) abortStreamLocked(err error) {
|
||||
cs.abortErr = err
|
||||
close(cs.abort)
|
||||
})
|
||||
if cs.reqBody != nil && !cs.reqBodyClosed {
|
||||
cs.reqBody.Close()
|
||||
cs.reqBodyClosed = true
|
||||
if cs.reqBody != nil {
|
||||
cs.closeReqBodyLocked()
|
||||
}
|
||||
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
|
||||
if cs.cc.cond != nil {
|
||||
@@ -391,13 +433,24 @@ func (cs *clientStream) abortRequestBodyWrite() {
|
||||
cc := cs.cc
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
if cs.reqBody != nil && !cs.reqBodyClosed {
|
||||
cs.reqBody.Close()
|
||||
cs.reqBodyClosed = true
|
||||
if cs.reqBody != nil && cs.reqBodyClosed == nil {
|
||||
cs.closeReqBodyLocked()
|
||||
cc.cond.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *clientStream) closeReqBodyLocked() {
|
||||
if cs.reqBodyClosed != nil {
|
||||
return
|
||||
}
|
||||
cs.reqBodyClosed = make(chan struct{})
|
||||
reqBodyClosed := cs.reqBodyClosed
|
||||
go func() {
|
||||
cs.reqBody.Close()
|
||||
close(reqBodyClosed)
|
||||
}()
|
||||
}
|
||||
|
||||
type stickyErrWriter struct {
|
||||
conn net.Conn
|
||||
timeout time.Duration
|
||||
@@ -481,6 +534,15 @@ func authorityAddr(scheme string, authority string) (addr string) {
|
||||
return net.JoinHostPort(host, port)
|
||||
}
|
||||
|
||||
var retryBackoffHook func(time.Duration) *time.Timer
|
||||
|
||||
func backoffNewTimer(d time.Duration) *time.Timer {
|
||||
if retryBackoffHook != nil {
|
||||
return retryBackoffHook(d)
|
||||
}
|
||||
return time.NewTimer(d)
|
||||
}
|
||||
|
||||
// RoundTripOpt is like RoundTrip, but takes options.
|
||||
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
|
||||
if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
|
||||
@@ -501,14 +563,19 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
|
||||
if req, err = shouldRetryRequest(req, err); err == nil {
|
||||
// After the first retry, do exponential backoff with 10% jitter.
|
||||
if retry == 0 {
|
||||
t.vlogf("RoundTrip retrying after failure: %v", err)
|
||||
continue
|
||||
}
|
||||
backoff := float64(uint(1) << (uint(retry) - 1))
|
||||
backoff += backoff * (0.1 * mathrand.Float64())
|
||||
d := time.Second * time.Duration(backoff)
|
||||
timer := backoffNewTimer(d)
|
||||
select {
|
||||
case <-time.After(time.Second * time.Duration(backoff)):
|
||||
case <-timer.C:
|
||||
t.vlogf("RoundTrip retrying after failure: %v", err)
|
||||
continue
|
||||
case <-req.Context().Done():
|
||||
timer.Stop()
|
||||
err = req.Context().Err()
|
||||
}
|
||||
}
|
||||
@@ -591,7 +658,7 @@ func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse b
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tconn, err := t.dialTLS(ctx)("tcp", addr, t.newTLSConfig(host))
|
||||
tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -612,24 +679,25 @@ func (t *Transport) newTLSConfig(host string) *tls.Config {
|
||||
return cfg
|
||||
}
|
||||
|
||||
func (t *Transport) dialTLS(ctx context.Context) func(string, string, *tls.Config) (net.Conn, error) {
|
||||
if t.DialTLS != nil {
|
||||
return t.DialTLS
|
||||
func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
|
||||
if t.DialTLSContext != nil {
|
||||
return t.DialTLSContext(ctx, network, addr, tlsCfg)
|
||||
} else if t.DialTLS != nil {
|
||||
return t.DialTLS(network, addr, tlsCfg)
|
||||
}
|
||||
return func(network, addr string, cfg *tls.Config) (net.Conn, error) {
|
||||
tlsCn, err := t.dialTLSWithContext(ctx, network, addr, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
state := tlsCn.ConnectionState()
|
||||
if p := state.NegotiatedProtocol; p != NextProtoTLS {
|
||||
return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
|
||||
}
|
||||
if !state.NegotiatedProtocolIsMutual {
|
||||
return nil, errors.New("http2: could not negotiate protocol mutually")
|
||||
}
|
||||
return tlsCn, nil
|
||||
|
||||
tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
state := tlsCn.ConnectionState()
|
||||
if p := state.NegotiatedProtocol; p != NextProtoTLS {
|
||||
return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
|
||||
}
|
||||
if !state.NegotiatedProtocolIsMutual {
|
||||
return nil, errors.New("http2: could not negotiate protocol mutually")
|
||||
}
|
||||
return tlsCn, nil
|
||||
}
|
||||
|
||||
// disableKeepAlives reports whether connections should be closed as
|
||||
@@ -645,6 +713,20 @@ func (t *Transport) expectContinueTimeout() time.Duration {
|
||||
return t.t1.ExpectContinueTimeout
|
||||
}
|
||||
|
||||
func (t *Transport) maxDecoderHeaderTableSize() uint32 {
|
||||
if v := t.MaxDecoderHeaderTableSize; v > 0 {
|
||||
return v
|
||||
}
|
||||
return initialHeaderTableSize
|
||||
}
|
||||
|
||||
func (t *Transport) maxEncoderHeaderTableSize() uint32 {
|
||||
if v := t.MaxEncoderHeaderTableSize; v > 0 {
|
||||
return v
|
||||
}
|
||||
return initialHeaderTableSize
|
||||
}
|
||||
|
||||
func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
|
||||
return t.newClientConn(c, t.disableKeepAlives())
|
||||
}
|
||||
@@ -685,15 +767,19 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
||||
})
|
||||
cc.br = bufio.NewReader(c)
|
||||
cc.fr = NewFramer(cc.bw, cc.br)
|
||||
if t.maxFrameReadSize() != 0 {
|
||||
cc.fr.SetMaxReadFrameSize(t.maxFrameReadSize())
|
||||
}
|
||||
if t.CountError != nil {
|
||||
cc.fr.countError = t.CountError
|
||||
}
|
||||
cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
|
||||
maxHeaderTableSize := t.maxDecoderHeaderTableSize()
|
||||
cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
|
||||
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
|
||||
|
||||
// TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on
|
||||
// henc in response to SETTINGS frames?
|
||||
cc.henc = hpack.NewEncoder(&cc.hbuf)
|
||||
cc.henc.SetMaxDynamicTableSizeLimit(t.maxEncoderHeaderTableSize())
|
||||
cc.peerMaxHeaderTableSize = initialHeaderTableSize
|
||||
|
||||
if t.AllowHTTP {
|
||||
cc.nextStreamID = 3
|
||||
@@ -708,14 +794,20 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
||||
{ID: SettingEnablePush, Val: 0},
|
||||
{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
|
||||
}
|
||||
if max := t.maxFrameReadSize(); max != 0 {
|
||||
initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: max})
|
||||
}
|
||||
if max := t.maxHeaderListSize(); max != 0 {
|
||||
initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
|
||||
}
|
||||
if maxHeaderTableSize != initialHeaderTableSize {
|
||||
initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
|
||||
}
|
||||
|
||||
cc.bw.Write(clientPreface)
|
||||
cc.fr.WriteSettings(initialSettings...)
|
||||
cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
|
||||
cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
|
||||
cc.inflow.init(transportDefaultConnFlow + initialWindowSize)
|
||||
cc.bw.Flush()
|
||||
if cc.werr != nil {
|
||||
cc.Close()
|
||||
@@ -732,11 +824,13 @@ func (cc *ClientConn) healthCheck() {
|
||||
// trigger the healthCheck again if there is no frame received.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
|
||||
defer cancel()
|
||||
cc.vlogf("http2: Transport sending health check")
|
||||
err := cc.Ping(ctx)
|
||||
if err != nil {
|
||||
cc.vlogf("http2: Transport health check failure: %v", err)
|
||||
cc.closeForLostPing()
|
||||
cc.t.connPool().MarkDead(cc)
|
||||
return
|
||||
} else {
|
||||
cc.vlogf("http2: Transport health check success")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -907,6 +1001,24 @@ func (cc *ClientConn) onIdleTimeout() {
|
||||
cc.closeIfIdle()
|
||||
}
|
||||
|
||||
func (cc *ClientConn) closeConn() {
|
||||
t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
|
||||
defer t.Stop()
|
||||
cc.tconn.Close()
|
||||
}
|
||||
|
||||
// A tls.Conn.Close can hang for a long time if the peer is unresponsive.
|
||||
// Try to shut it down more aggressively.
|
||||
func (cc *ClientConn) forceCloseConn() {
|
||||
tc, ok := cc.tconn.(*tls.Conn)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if nc := tlsUnderlyingConn(tc); nc != nil {
|
||||
nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *ClientConn) closeIfIdle() {
|
||||
cc.mu.Lock()
|
||||
if len(cc.streams) > 0 || cc.streamsReserved > 0 {
|
||||
@@ -921,7 +1033,7 @@ func (cc *ClientConn) closeIfIdle() {
|
||||
if VerboseLogs {
|
||||
cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
|
||||
}
|
||||
cc.tconn.Close()
|
||||
cc.closeConn()
|
||||
}
|
||||
|
||||
func (cc *ClientConn) isDoNotReuseAndIdle() bool {
|
||||
@@ -938,7 +1050,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
// Wait for all in-flight streams to complete or connection to close
|
||||
done := make(chan error, 1)
|
||||
done := make(chan struct{})
|
||||
cancelled := false // guarded by cc.mu
|
||||
go func() {
|
||||
cc.mu.Lock()
|
||||
@@ -946,7 +1058,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
|
||||
for {
|
||||
if len(cc.streams) == 0 || cc.closed {
|
||||
cc.closed = true
|
||||
done <- cc.tconn.Close()
|
||||
close(done)
|
||||
break
|
||||
}
|
||||
if cancelled {
|
||||
@@ -957,8 +1069,9 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
|
||||
}()
|
||||
shutdownEnterWaitStateHook()
|
||||
select {
|
||||
case err := <-done:
|
||||
return err
|
||||
case <-done:
|
||||
cc.closeConn()
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
cc.mu.Lock()
|
||||
// Free the goroutine above
|
||||
@@ -995,15 +1108,15 @@ func (cc *ClientConn) sendGoAway() error {
|
||||
|
||||
// closes the client connection immediately. In-flight requests are interrupted.
|
||||
// err is sent to streams.
|
||||
func (cc *ClientConn) closeForError(err error) error {
|
||||
func (cc *ClientConn) closeForError(err error) {
|
||||
cc.mu.Lock()
|
||||
cc.closed = true
|
||||
for _, cs := range cc.streams {
|
||||
cs.abortStreamLocked(err)
|
||||
}
|
||||
defer cc.cond.Broadcast()
|
||||
defer cc.mu.Unlock()
|
||||
return cc.tconn.Close()
|
||||
cc.cond.Broadcast()
|
||||
cc.mu.Unlock()
|
||||
cc.closeConn()
|
||||
}
|
||||
|
||||
// Close closes the client connection immediately.
|
||||
@@ -1011,16 +1124,17 @@ func (cc *ClientConn) closeForError(err error) error {
|
||||
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
|
||||
func (cc *ClientConn) Close() error {
|
||||
err := errors.New("http2: client connection force closed via ClientConn.Close")
|
||||
return cc.closeForError(err)
|
||||
cc.closeForError(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
// closes the client connection immediately. In-flight requests are interrupted.
|
||||
func (cc *ClientConn) closeForLostPing() error {
|
||||
func (cc *ClientConn) closeForLostPing() {
|
||||
err := errors.New("http2: client connection lost")
|
||||
if f := cc.t.CountError; f != nil {
|
||||
f("conn_close_lost_ping")
|
||||
}
|
||||
return cc.closeForError(err)
|
||||
cc.closeForError(err)
|
||||
}
|
||||
|
||||
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
|
||||
@@ -1030,7 +1144,7 @@ var errRequestCanceled = errors.New("net/http: request canceled")
|
||||
func commaSeparatedTrailers(req *http.Request) (string, error) {
|
||||
keys := make([]string, 0, len(req.Trailer))
|
||||
for k := range req.Trailer {
|
||||
k = http.CanonicalHeaderKey(k)
|
||||
k = canonicalHeader(k)
|
||||
switch k {
|
||||
case "Transfer-Encoding", "Trailer", "Content-Length":
|
||||
return "", fmt.Errorf("invalid Trailer key %q", k)
|
||||
@@ -1398,11 +1512,19 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
|
||||
// and in multiple cases: server replies <=299 and >299
|
||||
// while still writing request body
|
||||
cc.mu.Lock()
|
||||
mustCloseBody := false
|
||||
if cs.reqBody != nil && cs.reqBodyClosed == nil {
|
||||
mustCloseBody = true
|
||||
cs.reqBodyClosed = make(chan struct{})
|
||||
}
|
||||
bodyClosed := cs.reqBodyClosed
|
||||
cs.reqBodyClosed = true
|
||||
cc.mu.Unlock()
|
||||
if !bodyClosed && cs.reqBody != nil {
|
||||
if mustCloseBody {
|
||||
cs.reqBody.Close()
|
||||
close(bodyClosed)
|
||||
}
|
||||
if bodyClosed != nil {
|
||||
<-bodyClosed
|
||||
}
|
||||
|
||||
if err != nil && cs.sentEndStream {
|
||||
@@ -1447,7 +1569,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
|
||||
close(cs.donec)
|
||||
}
|
||||
|
||||
// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams.
|
||||
// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
|
||||
// Must hold cc.mu.
|
||||
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
|
||||
for {
|
||||
@@ -1559,7 +1681,7 @@ func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
|
||||
|
||||
var sawEOF bool
|
||||
for !sawEOF {
|
||||
n, err := body.Read(buf[:len(buf)])
|
||||
n, err := body.Read(buf)
|
||||
if hasContentLen {
|
||||
remainLen -= int64(n)
|
||||
if remainLen == 0 && err == nil {
|
||||
@@ -1582,7 +1704,7 @@ func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
|
||||
}
|
||||
if err != nil {
|
||||
cc.mu.Lock()
|
||||
bodyClosed := cs.reqBodyClosed
|
||||
bodyClosed := cs.reqBodyClosed != nil
|
||||
cc.mu.Unlock()
|
||||
switch {
|
||||
case bodyClosed:
|
||||
@@ -1677,7 +1799,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
|
||||
if cc.closed {
|
||||
return 0, errClientConnClosed
|
||||
}
|
||||
if cs.reqBodyClosed {
|
||||
if cs.reqBodyClosed != nil {
|
||||
return 0, errStopReqBodyWrite
|
||||
}
|
||||
select {
|
||||
@@ -1748,7 +1870,8 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
|
||||
}
|
||||
for _, v := range vv {
|
||||
if !httpguts.ValidHeaderFieldValue(v) {
|
||||
return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k)
|
||||
// Don't include the value in the error, because it may be sensitive.
|
||||
return nil, fmt.Errorf("invalid HTTP header value for header %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1861,7 +1984,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
|
||||
|
||||
// Header list size is ok. Write the headers.
|
||||
enumerateHeaders(func(name, value string) {
|
||||
name, ascii := asciiToLower(name)
|
||||
name, ascii := lowerHeader(name)
|
||||
if !ascii {
|
||||
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
|
||||
// field names have to be ASCII characters (just as in HTTP/1.x).
|
||||
@@ -1914,7 +2037,7 @@ func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
|
||||
}
|
||||
|
||||
for k, vv := range trailer {
|
||||
lowKey, ascii := asciiToLower(k)
|
||||
lowKey, ascii := lowerHeader(k)
|
||||
if !ascii {
|
||||
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
|
||||
// field names have to be ASCII characters (just as in HTTP/1.x).
|
||||
@@ -1946,8 +2069,7 @@ type resAndError struct {
|
||||
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
|
||||
cs.flow.add(int32(cc.initialWindowSize))
|
||||
cs.flow.setConnFlow(&cc.flow)
|
||||
cs.inflow.add(transportDefaultStreamFlow)
|
||||
cs.inflow.setConnFlow(&cc.inflow)
|
||||
cs.inflow.init(transportDefaultStreamFlow)
|
||||
cs.ID = cc.nextStreamID
|
||||
cc.nextStreamID += 2
|
||||
cc.streams[cs.ID] = cs
|
||||
@@ -1972,13 +2094,13 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
|
||||
// wake up RoundTrip if there is a pending request.
|
||||
cc.cond.Broadcast()
|
||||
|
||||
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives()
|
||||
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
|
||||
if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
|
||||
if VerboseLogs {
|
||||
cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
|
||||
}
|
||||
cc.closed = true
|
||||
defer cc.tconn.Close()
|
||||
defer cc.closeConn()
|
||||
}
|
||||
|
||||
cc.mu.Unlock()
|
||||
@@ -2025,8 +2147,8 @@ func isEOFOrNetReadError(err error) bool {
|
||||
|
||||
func (rl *clientConnReadLoop) cleanup() {
|
||||
cc := rl.cc
|
||||
defer cc.tconn.Close()
|
||||
defer cc.t.connPool().MarkDead(cc)
|
||||
cc.t.connPool().MarkDead(cc)
|
||||
defer cc.closeConn()
|
||||
defer close(cc.readerDone)
|
||||
|
||||
if cc.idleTimer != nil {
|
||||
@@ -2048,6 +2170,7 @@ func (rl *clientConnReadLoop) cleanup() {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
cc.closed = true
|
||||
|
||||
for _, cs := range cc.streams {
|
||||
select {
|
||||
case <-cs.peerClosed:
|
||||
@@ -2246,7 +2369,7 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
|
||||
Status: status + " " + http.StatusText(statusCode),
|
||||
}
|
||||
for _, hf := range regularFields {
|
||||
key := http.CanonicalHeaderKey(hf.Name)
|
||||
key := canonicalHeader(hf.Name)
|
||||
if key == "Trailer" {
|
||||
t := res.Trailer
|
||||
if t == nil {
|
||||
@@ -2254,7 +2377,7 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
|
||||
res.Trailer = t
|
||||
}
|
||||
foreachHeaderElement(hf.Value, func(v string) {
|
||||
t[http.CanonicalHeaderKey(v)] = nil
|
||||
t[canonicalHeader(v)] = nil
|
||||
})
|
||||
} else {
|
||||
vv := header[key]
|
||||
@@ -2359,7 +2482,7 @@ func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFr
|
||||
|
||||
trailer := make(http.Header)
|
||||
for _, hf := range f.RegularFields() {
|
||||
key := http.CanonicalHeaderKey(hf.Name)
|
||||
key := canonicalHeader(hf.Name)
|
||||
trailer[key] = append(trailer[key], hf.Value)
|
||||
}
|
||||
cs.trailer = trailer
|
||||
@@ -2405,21 +2528,10 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
cc.mu.Lock()
|
||||
var connAdd, streamAdd int32
|
||||
// Check the conn-level first, before the stream-level.
|
||||
if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
|
||||
connAdd = transportDefaultConnFlow - v
|
||||
cc.inflow.add(connAdd)
|
||||
}
|
||||
connAdd := cc.inflow.add(n)
|
||||
var streamAdd int32
|
||||
if err == nil { // No need to refresh if the stream is over or failed.
|
||||
// Consider any buffered body data (read from the conn but not
|
||||
// consumed by the client) when computing flow control for this
|
||||
// stream.
|
||||
v := int(cs.inflow.available()) + cs.bufPipe.Len()
|
||||
if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
|
||||
streamAdd = int32(transportDefaultStreamFlow - v)
|
||||
cs.inflow.add(streamAdd)
|
||||
}
|
||||
streamAdd = cs.inflow.add(n)
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
|
||||
@@ -2447,17 +2559,15 @@ func (b transportResponseBody) Close() error {
|
||||
if unread > 0 {
|
||||
cc.mu.Lock()
|
||||
// Return connection-level flow control.
|
||||
if unread > 0 {
|
||||
cc.inflow.add(int32(unread))
|
||||
}
|
||||
connAdd := cc.inflow.add(unread)
|
||||
cc.mu.Unlock()
|
||||
|
||||
// TODO(dneil): Acquiring this mutex can block indefinitely.
|
||||
// Move flow control return to a goroutine?
|
||||
cc.wmu.Lock()
|
||||
// Return connection-level flow control.
|
||||
if unread > 0 {
|
||||
cc.fr.WriteWindowUpdate(0, uint32(unread))
|
||||
if connAdd > 0 {
|
||||
cc.fr.WriteWindowUpdate(0, uint32(connAdd))
|
||||
}
|
||||
cc.bw.Flush()
|
||||
cc.wmu.Unlock()
|
||||
@@ -2500,13 +2610,18 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
||||
// But at least return their flow control:
|
||||
if f.Length > 0 {
|
||||
cc.mu.Lock()
|
||||
cc.inflow.add(int32(f.Length))
|
||||
ok := cc.inflow.take(f.Length)
|
||||
connAdd := cc.inflow.add(int(f.Length))
|
||||
cc.mu.Unlock()
|
||||
|
||||
cc.wmu.Lock()
|
||||
cc.fr.WriteWindowUpdate(0, uint32(f.Length))
|
||||
cc.bw.Flush()
|
||||
cc.wmu.Unlock()
|
||||
if !ok {
|
||||
return ConnectionError(ErrCodeFlowControl)
|
||||
}
|
||||
if connAdd > 0 {
|
||||
cc.wmu.Lock()
|
||||
cc.fr.WriteWindowUpdate(0, uint32(connAdd))
|
||||
cc.bw.Flush()
|
||||
cc.wmu.Unlock()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -2537,9 +2652,7 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
||||
}
|
||||
// Check connection-level flow control.
|
||||
cc.mu.Lock()
|
||||
if cs.inflow.available() >= int32(f.Length) {
|
||||
cs.inflow.take(int32(f.Length))
|
||||
} else {
|
||||
if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
|
||||
cc.mu.Unlock()
|
||||
return ConnectionError(ErrCodeFlowControl)
|
||||
}
|
||||
@@ -2561,19 +2674,20 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
||||
}
|
||||
}
|
||||
|
||||
if refund > 0 {
|
||||
cc.inflow.add(int32(refund))
|
||||
if !didReset {
|
||||
cs.inflow.add(int32(refund))
|
||||
}
|
||||
sendConn := cc.inflow.add(refund)
|
||||
var sendStream int32
|
||||
if !didReset {
|
||||
sendStream = cs.inflow.add(refund)
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
|
||||
if refund > 0 {
|
||||
if sendConn > 0 || sendStream > 0 {
|
||||
cc.wmu.Lock()
|
||||
cc.fr.WriteWindowUpdate(0, uint32(refund))
|
||||
if !didReset {
|
||||
cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
|
||||
if sendConn > 0 {
|
||||
cc.fr.WriteWindowUpdate(0, uint32(sendConn))
|
||||
}
|
||||
if sendStream > 0 {
|
||||
cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
|
||||
}
|
||||
cc.bw.Flush()
|
||||
cc.wmu.Unlock()
|
||||
@@ -2641,7 +2755,6 @@ func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
|
||||
if fn := cc.t.CountError; fn != nil {
|
||||
fn("recv_goaway_" + f.ErrCode.stringToken())
|
||||
}
|
||||
|
||||
}
|
||||
cc.setGoAway(f)
|
||||
return nil
|
||||
@@ -2706,8 +2819,10 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
|
||||
cc.cond.Broadcast()
|
||||
|
||||
cc.initialWindowSize = s.Val
|
||||
case SettingHeaderTableSize:
|
||||
cc.henc.SetMaxDynamicTableSize(s.Val)
|
||||
cc.peerMaxHeaderTableSize = s.Val
|
||||
default:
|
||||
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
|
||||
cc.vlogf("Unhandled Setting: %v", s)
|
||||
}
|
||||
return nil
|
||||
@@ -2881,7 +2996,12 @@ func (t *Transport) logf(format string, args ...interface{}) {
|
||||
log.Printf(format, args...)
|
||||
}
|
||||
|
||||
var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
|
||||
var noBody io.ReadCloser = noBodyReader{}
|
||||
|
||||
type noBodyReader struct{}
|
||||
|
||||
func (noBodyReader) Close() error { return nil }
|
||||
func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }
|
||||
|
||||
type missingBody struct{}
|
||||
|
||||
@@ -2926,7 +3046,11 @@ func (gz *gzipReader) Read(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func (gz *gzipReader) Close() error {
|
||||
return gz.body.Close()
|
||||
if err := gz.body.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
gz.zerr = fs.ErrClosed
|
||||
return nil
|
||||
}
|
||||
|
||||
type errorReader struct{ err error }
|
||||
@@ -2990,7 +3114,7 @@ func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
|
||||
cc.mu.Lock()
|
||||
ci.WasIdle = len(cc.streams) == 0 && reused
|
||||
if ci.WasIdle && !cc.lastActive.IsZero() {
|
||||
ci.IdleTime = time.Now().Sub(cc.lastActive)
|
||||
ci.IdleTime = time.Since(cc.lastActive)
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user