summaryrefslogtreecommitdiff
path: root/vendor/golang.org/x/net/http2/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/golang.org/x/net/http2/transport.go')
-rw-r--r--vendor/golang.org/x/net/http2/transport.go99
1 files changed, 43 insertions, 56 deletions
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index 30f706e..f965579 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -47,10 +47,6 @@ const (
// we buffer per stream.
transportDefaultStreamFlow = 4 << 20
- // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
- // a stream-level WINDOW_UPDATE for at a time.
- transportDefaultStreamMinRefresh = 4 << 10
-
defaultUserAgent = "Go-http-client/2.0"
// initialMaxConcurrentStreams is a connections maxConcurrentStreams until
@@ -310,8 +306,8 @@ type ClientConn struct {
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
- flow flow // our conn-level flow control quota (cs.flow is per stream)
- inflow flow // peer's conn-level flow control
+ flow outflow // our conn-level flow control quota (cs.outflow is per stream)
+ inflow inflow // peer's conn-level flow control
doNotReuse bool // whether conn is marked to not be reused for any future requests
closing bool
closed bool
@@ -376,10 +372,10 @@ type clientStream struct {
respHeaderRecv chan struct{} // closed when headers are received
res *http.Response // set if respHeaderRecv is closed
- flow flow // guarded by cc.mu
- inflow flow // guarded by cc.mu
- bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
- readErr error // sticky read error; owned by transportResponseBody.Read
+ flow outflow // guarded by cc.mu
+ inflow inflow // guarded by cc.mu
+ bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
+ readErr error // sticky read error; owned by transportResponseBody.Read
reqBody io.ReadCloser
reqBodyContentLength int64 // -1 means unknown
@@ -564,10 +560,11 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
traceGotConn(req, cc, reused)
res, err := cc.RoundTrip(req)
if err != nil && retry <= 6 {
+ roundTripErr := err
if req, err = shouldRetryRequest(req, err); err == nil {
// After the first retry, do exponential backoff with 10% jitter.
if retry == 0 {
- t.vlogf("RoundTrip retrying after failure: %v", err)
+ t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
continue
}
backoff := float64(uint(1) << (uint(retry) - 1))
@@ -576,7 +573,7 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
timer := backoffNewTimer(d)
select {
case <-timer.C:
- t.vlogf("RoundTrip retrying after failure: %v", err)
+ t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
continue
case <-req.Context().Done():
timer.Stop()
@@ -811,7 +808,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
cc.bw.Write(clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
- cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
+ cc.inflow.init(transportDefaultConnFlow + initialWindowSize)
cc.bw.Flush()
if cc.werr != nil {
cc.Close()
@@ -1573,7 +1570,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
close(cs.donec)
}
-// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams.
+// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
// Must hold cc.mu.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
for {
@@ -2073,8 +2070,7 @@ type resAndError struct {
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
- cs.inflow.add(transportDefaultStreamFlow)
- cs.inflow.setConnFlow(&cc.inflow)
+ cs.inflow.init(transportDefaultStreamFlow)
cs.ID = cc.nextStreamID
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
@@ -2533,21 +2529,10 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
}
cc.mu.Lock()
- var connAdd, streamAdd int32
- // Check the conn-level first, before the stream-level.
- if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
- connAdd = transportDefaultConnFlow - v
- cc.inflow.add(connAdd)
- }
+ connAdd := cc.inflow.add(n)
+ var streamAdd int32
if err == nil { // No need to refresh if the stream is over or failed.
- // Consider any buffered body data (read from the conn but not
- // consumed by the client) when computing flow control for this
- // stream.
- v := int(cs.inflow.available()) + cs.bufPipe.Len()
- if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
- streamAdd = int32(transportDefaultStreamFlow - v)
- cs.inflow.add(streamAdd)
- }
+ streamAdd = cs.inflow.add(n)
}
cc.mu.Unlock()
@@ -2571,29 +2556,27 @@ func (b transportResponseBody) Close() error {
cs := b.cs
cc := cs.cc
+ cs.bufPipe.BreakWithError(errClosedResponseBody)
+ cs.abortStream(errClosedResponseBody)
+
unread := cs.bufPipe.Len()
if unread > 0 {
cc.mu.Lock()
// Return connection-level flow control.
- if unread > 0 {
- cc.inflow.add(int32(unread))
- }
+ connAdd := cc.inflow.add(unread)
cc.mu.Unlock()
// TODO(dneil): Acquiring this mutex can block indefinitely.
// Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
- if unread > 0 {
- cc.fr.WriteWindowUpdate(0, uint32(unread))
+ if connAdd > 0 {
+ cc.fr.WriteWindowUpdate(0, uint32(connAdd))
}
cc.bw.Flush()
cc.wmu.Unlock()
}
- cs.bufPipe.BreakWithError(errClosedResponseBody)
- cs.abortStream(errClosedResponseBody)
-
select {
case <-cs.donec:
case <-cs.ctx.Done():
@@ -2628,13 +2611,18 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
// But at least return their flow control:
if f.Length > 0 {
cc.mu.Lock()
- cc.inflow.add(int32(f.Length))
+ ok := cc.inflow.take(f.Length)
+ connAdd := cc.inflow.add(int(f.Length))
cc.mu.Unlock()
-
- cc.wmu.Lock()
- cc.fr.WriteWindowUpdate(0, uint32(f.Length))
- cc.bw.Flush()
- cc.wmu.Unlock()
+ if !ok {
+ return ConnectionError(ErrCodeFlowControl)
+ }
+ if connAdd > 0 {
+ cc.wmu.Lock()
+ cc.fr.WriteWindowUpdate(0, uint32(connAdd))
+ cc.bw.Flush()
+ cc.wmu.Unlock()
+ }
}
return nil
}
@@ -2665,9 +2653,7 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
}
// Check connection-level flow control.
cc.mu.Lock()
- if cs.inflow.available() >= int32(f.Length) {
- cs.inflow.take(int32(f.Length))
- } else {
+ if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
cc.mu.Unlock()
return ConnectionError(ErrCodeFlowControl)
}
@@ -2689,19 +2675,20 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
}
}
- if refund > 0 {
- cc.inflow.add(int32(refund))
- if !didReset {
- cs.inflow.add(int32(refund))
- }
+ sendConn := cc.inflow.add(refund)
+ var sendStream int32
+ if !didReset {
+ sendStream = cs.inflow.add(refund)
}
cc.mu.Unlock()
- if refund > 0 {
+ if sendConn > 0 || sendStream > 0 {
cc.wmu.Lock()
- cc.fr.WriteWindowUpdate(0, uint32(refund))
- if !didReset {
- cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
+ if sendConn > 0 {
+ cc.fr.WriteWindowUpdate(0, uint32(sendConn))
+ }
+ if sendStream > 0 {
+ cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
}
cc.bw.Flush()
cc.wmu.Unlock()