diff options
Diffstat (limited to 'vendor/google.golang.org/appengine/internal/api.go')
| -rw-r--r-- | vendor/google.golang.org/appengine/internal/api.go | 678 | 
1 files changed, 678 insertions, 0 deletions
diff --git a/vendor/google.golang.org/appengine/internal/api.go b/vendor/google.golang.org/appengine/internal/api.go new file mode 100644 index 0000000..721053c --- /dev/null +++ b/vendor/google.golang.org/appengine/internal/api.go @@ -0,0 +1,678 @@ +// Copyright 2011 Google Inc. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +// +build !appengine + +package internal + +import ( +	"bytes" +	"errors" +	"fmt" +	"io/ioutil" +	"log" +	"net" +	"net/http" +	"net/url" +	"os" +	"runtime" +	"strconv" +	"strings" +	"sync" +	"sync/atomic" +	"time" + +	"github.com/golang/protobuf/proto" +	netcontext "golang.org/x/net/context" + +	basepb "google.golang.org/appengine/internal/base" +	logpb "google.golang.org/appengine/internal/log" +	remotepb "google.golang.org/appengine/internal/remote_api" +) + +const ( +	apiPath             = "/rpc_http" +	defaultTicketSuffix = "/default.20150612t184001.0" +) + +var ( +	// Incoming headers. +	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket") +	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo") +	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context") +	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") +	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP") +	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr") +	devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id") + +	// Outgoing headers. +	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint") +	apiEndpointHeaderValue = []string{"app-engine-apis"} +	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method") +	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"} +	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") +	apiContentType         = http.CanonicalHeaderKey("Content-Type") +	apiContentTypeValue    = []string{"application/octet-stream"} +	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") + +	apiHTTPClient = &http.Client{ +		Transport: &http.Transport{ +			Proxy:               http.ProxyFromEnvironment, +			Dial:                limitDial, +			MaxIdleConns:        1000, +			MaxIdleConnsPerHost: 10000, +			IdleConnTimeout:     90 * time.Second, +		}, +	} + +	defaultTicketOnce     sync.Once +	defaultTicket         string +	backgroundContextOnce sync.Once +	backgroundContext     netcontext.Context +) + +func apiURL() *url.URL { +	host, port := "appengine.googleapis.internal", "10001" +	if h := os.Getenv("API_HOST"); h != "" { +		host = h +	} +	if p := os.Getenv("API_PORT"); p != "" { +		port = p +	} +	return &url.URL{ +		Scheme: "http", +		Host:   host + ":" + port, +		Path:   apiPath, +	} +} + +func handleHTTP(w http.ResponseWriter, r *http.Request) { +	c := &context{ +		req:       r, +		outHeader: w.Header(), +		apiURL:    apiURL(), +	} +	r = r.WithContext(withContext(r.Context(), c)) +	c.req = r + +	stopFlushing := make(chan int) + +	// Patch up RemoteAddr so it looks reasonable. +	if addr := r.Header.Get(userIPHeader); addr != "" { +		r.RemoteAddr = addr +	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" { +		r.RemoteAddr = addr +	} else { +		// Should not normally reach here, but pick a sensible default anyway. +		r.RemoteAddr = "127.0.0.1" +	} +	// The address in the headers will most likely be of these forms: +	//	123.123.123.123 +	//	2001:db8::1 +	// net/http.Request.RemoteAddr is specified to be in "IP:port" form. +	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { +		// Assume the remote address is only a host; add a default port. +		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") +	} + +	// Start goroutine responsible for flushing app logs. +	// This is done after adding c to ctx.m (and stopped before removing it) +	// because flushing logs requires making an API call. +	go c.logFlusher(stopFlushing) + +	executeRequestSafely(c, r) +	c.outHeader = nil // make sure header changes aren't respected any more + +	stopFlushing <- 1 // any logging beyond this point will be dropped + +	// Flush any pending logs asynchronously. +	c.pendingLogs.Lock() +	flushes := c.pendingLogs.flushes +	if len(c.pendingLogs.lines) > 0 { +		flushes++ +	} +	c.pendingLogs.Unlock() +	flushed := make(chan struct{}) +	go func() { +		defer close(flushed) +		// Force a log flush, because with very short requests we +		// may not ever flush logs. +		c.flushLog(true) +	}() +	w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) + +	// Avoid nil Write call if c.Write is never called. +	if c.outCode != 0 { +		w.WriteHeader(c.outCode) +	} +	if c.outBody != nil { +		w.Write(c.outBody) +	} +	// Wait for the last flush to complete before returning, +	// otherwise the security ticket will not be valid. +	<-flushed +} + +func executeRequestSafely(c *context, r *http.Request) { +	defer func() { +		if x := recover(); x != nil { +			logf(c, 4, "%s", renderPanic(x)) // 4 == critical +			c.outCode = 500 +		} +	}() + +	http.DefaultServeMux.ServeHTTP(c, r) +} + +func renderPanic(x interface{}) string { +	buf := make([]byte, 16<<10) // 16 KB should be plenty +	buf = buf[:runtime.Stack(buf, false)] + +	// Remove the first few stack frames: +	//   this func +	//   the recover closure in the caller +	// That will root the stack trace at the site of the panic. +	const ( +		skipStart  = "internal.renderPanic" +		skipFrames = 2 +	) +	start := bytes.Index(buf, []byte(skipStart)) +	p := start +	for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ { +		p = bytes.IndexByte(buf[p+1:], '\n') + p + 1 +		if p < 0 { +			break +		} +	} +	if p >= 0 { +		// buf[start:p+1] is the block to remove. +		// Copy buf[p+1:] over buf[start:] and shrink buf. +		copy(buf[start:], buf[p+1:]) +		buf = buf[:len(buf)-(p+1-start)] +	} + +	// Add panic heading. +	head := fmt.Sprintf("panic: %v\n\n", x) +	if len(head) > len(buf) { +		// Extremely unlikely to happen. +		return head +	} +	copy(buf[len(head):], buf) +	copy(buf, head) + +	return string(buf) +} + +// context represents the context of an in-flight HTTP request. +// It implements the appengine.Context and http.ResponseWriter interfaces. +type context struct { +	req *http.Request + +	outCode   int +	outHeader http.Header +	outBody   []byte + +	pendingLogs struct { +		sync.Mutex +		lines   []*logpb.UserAppLogLine +		flushes int +	} + +	apiURL *url.URL +} + +var contextKey = "holds a *context" + +// jointContext joins two contexts in a superficial way. +// It takes values and timeouts from a base context, and only values from another context. +type jointContext struct { +	base       netcontext.Context +	valuesOnly netcontext.Context +} + +func (c jointContext) Deadline() (time.Time, bool) { +	return c.base.Deadline() +} + +func (c jointContext) Done() <-chan struct{} { +	return c.base.Done() +} + +func (c jointContext) Err() error { +	return c.base.Err() +} + +func (c jointContext) Value(key interface{}) interface{} { +	if val := c.base.Value(key); val != nil { +		return val +	} +	return c.valuesOnly.Value(key) +} + +// fromContext returns the App Engine context or nil if ctx is not +// derived from an App Engine context. +func fromContext(ctx netcontext.Context) *context { +	c, _ := ctx.Value(&contextKey).(*context) +	return c +} + +func withContext(parent netcontext.Context, c *context) netcontext.Context { +	ctx := netcontext.WithValue(parent, &contextKey, c) +	if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { +		ctx = withNamespace(ctx, ns) +	} +	return ctx +} + +func toContext(c *context) netcontext.Context { +	return withContext(netcontext.Background(), c) +} + +func IncomingHeaders(ctx netcontext.Context) http.Header { +	if c := fromContext(ctx); c != nil { +		return c.req.Header +	} +	return nil +} + +func ReqContext(req *http.Request) netcontext.Context { +	return req.Context() +} + +func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { +	return jointContext{ +		base:       parent, +		valuesOnly: req.Context(), +	} +} + +// DefaultTicket returns a ticket used for background context or dev_appserver. +func DefaultTicket() string { +	defaultTicketOnce.Do(func() { +		if IsDevAppServer() { +			defaultTicket = "testapp" + defaultTicketSuffix +			return +		} +		appID := partitionlessAppID() +		escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) +		majVersion := VersionID(nil) +		if i := strings.Index(majVersion, "."); i > 0 { +			majVersion = majVersion[:i] +		} +		defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) +	}) +	return defaultTicket +} + +func BackgroundContext() netcontext.Context { +	backgroundContextOnce.Do(func() { +		// Compute background security ticket. +		ticket := DefaultTicket() + +		c := &context{ +			req: &http.Request{ +				Header: http.Header{ +					ticketHeader: []string{ticket}, +				}, +			}, +			apiURL: apiURL(), +		} +		backgroundContext = toContext(c) + +		// TODO(dsymonds): Wire up the shutdown handler to do a final flush. +		go c.logFlusher(make(chan int)) +	}) + +	return backgroundContext +} + +// RegisterTestRequest registers the HTTP request req for testing, such that +// any API calls are sent to the provided URL. It returns a closure to delete +// the registration. +// It should only be used by aetest package. +func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { +	c := &context{ +		req:    req, +		apiURL: apiURL, +	} +	ctx := withContext(decorate(req.Context()), c) +	req = req.WithContext(ctx) +	c.req = req +	return req, func() {} +} + +var errTimeout = &CallError{ +	Detail:  "Deadline exceeded", +	Code:    int32(remotepb.RpcError_CANCELLED), +	Timeout: true, +} + +func (c *context) Header() http.Header { return c.outHeader } + +// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status +// codes do not permit a response body (nor response entity headers such as +// Content-Length, Content-Type, etc). +func bodyAllowedForStatus(status int) bool { +	switch { +	case status >= 100 && status <= 199: +		return false +	case status == 204: +		return false +	case status == 304: +		return false +	} +	return true +} + +func (c *context) Write(b []byte) (int, error) { +	if c.outCode == 0 { +		c.WriteHeader(http.StatusOK) +	} +	if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { +		return 0, http.ErrBodyNotAllowed +	} +	c.outBody = append(c.outBody, b...) +	return len(b), nil +} + +func (c *context) WriteHeader(code int) { +	if c.outCode != 0 { +		logf(c, 3, "WriteHeader called multiple times on request.") // error level +		return +	} +	c.outCode = code +} + +func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { +	hreq := &http.Request{ +		Method: "POST", +		URL:    c.apiURL, +		Header: http.Header{ +			apiEndpointHeader: apiEndpointHeaderValue, +			apiMethodHeader:   apiMethodHeaderValue, +			apiContentType:    apiContentTypeValue, +			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, +		}, +		Body:          ioutil.NopCloser(bytes.NewReader(body)), +		ContentLength: int64(len(body)), +		Host:          c.apiURL.Host, +	} +	if info := c.req.Header.Get(dapperHeader); info != "" { +		hreq.Header.Set(dapperHeader, info) +	} +	if info := c.req.Header.Get(traceHeader); info != "" { +		hreq.Header.Set(traceHeader, info) +	} + +	tr := apiHTTPClient.Transport.(*http.Transport) + +	var timedOut int32 // atomic; set to 1 if timed out +	t := time.AfterFunc(timeout, func() { +		atomic.StoreInt32(&timedOut, 1) +		tr.CancelRequest(hreq) +	}) +	defer t.Stop() +	defer func() { +		// Check if timeout was exceeded. +		if atomic.LoadInt32(&timedOut) != 0 { +			err = errTimeout +		} +	}() + +	hresp, err := apiHTTPClient.Do(hreq) +	if err != nil { +		return nil, &CallError{ +			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), +			Code:   int32(remotepb.RpcError_UNKNOWN), +		} +	} +	defer hresp.Body.Close() +	hrespBody, err := ioutil.ReadAll(hresp.Body) +	if hresp.StatusCode != 200 { +		return nil, &CallError{ +			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), +			Code:   int32(remotepb.RpcError_UNKNOWN), +		} +	} +	if err != nil { +		return nil, &CallError{ +			Detail: fmt.Sprintf("service bridge response bad: %v", err), +			Code:   int32(remotepb.RpcError_UNKNOWN), +		} +	} +	return hrespBody, nil +} + +func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { +	if ns := NamespaceFromContext(ctx); ns != "" { +		if fn, ok := NamespaceMods[service]; ok { +			fn(in, ns) +		} +	} + +	if f, ctx, ok := callOverrideFromContext(ctx); ok { +		return f(ctx, service, method, in, out) +	} + +	// Handle already-done contexts quickly. +	select { +	case <-ctx.Done(): +		return ctx.Err() +	default: +	} + +	c := fromContext(ctx) +	if c == nil { +		// Give a good error message rather than a panic lower down. +		return errNotAppEngineContext +	} + +	// Apply transaction modifications if we're in a transaction. +	if t := transactionFromContext(ctx); t != nil { +		if t.finished { +			return errors.New("transaction context has expired") +		} +		applyTransaction(in, &t.transaction) +	} + +	// Default RPC timeout is 60s. +	timeout := 60 * time.Second +	if deadline, ok := ctx.Deadline(); ok { +		timeout = deadline.Sub(time.Now()) +	} + +	data, err := proto.Marshal(in) +	if err != nil { +		return err +	} + +	ticket := c.req.Header.Get(ticketHeader) +	// Use a test ticket under test environment. +	if ticket == "" { +		if appid := ctx.Value(&appIDOverrideKey); appid != nil { +			ticket = appid.(string) + defaultTicketSuffix +		} +	} +	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. +	if ticket == "" { +		ticket = DefaultTicket() +	} +	if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" { +		ticket = dri +	} +	req := &remotepb.Request{ +		ServiceName: &service, +		Method:      &method, +		Request:     data, +		RequestId:   &ticket, +	} +	hreqBody, err := proto.Marshal(req) +	if err != nil { +		return err +	} + +	hrespBody, err := c.post(hreqBody, timeout) +	if err != nil { +		return err +	} + +	res := &remotepb.Response{} +	if err := proto.Unmarshal(hrespBody, res); err != nil { +		return err +	} +	if res.RpcError != nil { +		ce := &CallError{ +			Detail: res.RpcError.GetDetail(), +			Code:   *res.RpcError.Code, +		} +		switch remotepb.RpcError_ErrorCode(ce.Code) { +		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: +			ce.Timeout = true +		} +		return ce +	} +	if res.ApplicationError != nil { +		return &APIError{ +			Service: *req.ServiceName, +			Detail:  res.ApplicationError.GetDetail(), +			Code:    *res.ApplicationError.Code, +		} +	} +	if res.Exception != nil || res.JavaException != nil { +		// This shouldn't happen, but let's be defensive. +		return &CallError{ +			Detail: "service bridge returned exception", +			Code:   int32(remotepb.RpcError_UNKNOWN), +		} +	} +	return proto.Unmarshal(res.Response, out) +} + +func (c *context) Request() *http.Request { +	return c.req +} + +func (c *context) addLogLine(ll *logpb.UserAppLogLine) { +	// Truncate long log lines. +	// TODO(dsymonds): Check if this is still necessary. +	const lim = 8 << 10 +	if len(*ll.Message) > lim { +		suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) +		ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) +	} + +	c.pendingLogs.Lock() +	c.pendingLogs.lines = append(c.pendingLogs.lines, ll) +	c.pendingLogs.Unlock() +} + +var logLevelName = map[int64]string{ +	0: "DEBUG", +	1: "INFO", +	2: "WARNING", +	3: "ERROR", +	4: "CRITICAL", +} + +func logf(c *context, level int64, format string, args ...interface{}) { +	if c == nil { +		panic("not an App Engine context") +	} +	s := fmt.Sprintf(format, args...) +	s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. +	c.addLogLine(&logpb.UserAppLogLine{ +		TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), +		Level:         &level, +		Message:       &s, +	}) +	// Only duplicate log to stderr if not running on App Engine second generation +	if !IsSecondGen() { +		log.Print(logLevelName[level] + ": " + s) +	} +} + +// flushLog attempts to flush any pending logs to the appserver. +// It should not be called concurrently. +func (c *context) flushLog(force bool) (flushed bool) { +	c.pendingLogs.Lock() +	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. +	n, rem := 0, 30<<20 +	for ; n < len(c.pendingLogs.lines); n++ { +		ll := c.pendingLogs.lines[n] +		// Each log line will require about 3 bytes of overhead. +		nb := proto.Size(ll) + 3 +		if nb > rem { +			break +		} +		rem -= nb +	} +	lines := c.pendingLogs.lines[:n] +	c.pendingLogs.lines = c.pendingLogs.lines[n:] +	c.pendingLogs.Unlock() + +	if len(lines) == 0 && !force { +		// Nothing to flush. +		return false +	} + +	rescueLogs := false +	defer func() { +		if rescueLogs { +			c.pendingLogs.Lock() +			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) +			c.pendingLogs.Unlock() +		} +	}() + +	buf, err := proto.Marshal(&logpb.UserAppLogGroup{ +		LogLine: lines, +	}) +	if err != nil { +		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) +		rescueLogs = true +		return false +	} + +	req := &logpb.FlushRequest{ +		Logs: buf, +	} +	res := &basepb.VoidProto{} +	c.pendingLogs.Lock() +	c.pendingLogs.flushes++ +	c.pendingLogs.Unlock() +	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { +		log.Printf("internal.flushLog: Flush RPC: %v", err) +		rescueLogs = true +		return false +	} +	return true +} + +const ( +	// Log flushing parameters. +	flushInterval      = 1 * time.Second +	forceFlushInterval = 60 * time.Second +) + +func (c *context) logFlusher(stop <-chan int) { +	lastFlush := time.Now() +	tick := time.NewTicker(flushInterval) +	for { +		select { +		case <-stop: +			// Request finished. +			tick.Stop() +			return +		case <-tick.C: +			force := time.Now().Sub(lastFlush) > forceFlushInterval +			if c.flushLog(force) { +				lastFlush = time.Now() +			} +		} +	} +} + +func ContextForTesting(req *http.Request) netcontext.Context { +	return toContext(&context{req: req}) +}  | 
