diff options
Diffstat (limited to 'vendor/go.opencensus.io/stats/view/worker.go')
| -rw-r--r-- | vendor/go.opencensus.io/stats/view/worker.go | 424 |
1 files changed, 0 insertions, 424 deletions
diff --git a/vendor/go.opencensus.io/stats/view/worker.go b/vendor/go.opencensus.io/stats/view/worker.go deleted file mode 100644 index 6a79cd8..0000000 --- a/vendor/go.opencensus.io/stats/view/worker.go +++ /dev/null @@ -1,424 +0,0 @@ -// Copyright 2017, OpenCensus Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package view - -import ( - "fmt" - "sync" - "time" - - "go.opencensus.io/resource" - - "go.opencensus.io/metric/metricdata" - "go.opencensus.io/metric/metricproducer" - "go.opencensus.io/stats" - "go.opencensus.io/stats/internal" - "go.opencensus.io/tag" -) - -func init() { - defaultWorker = NewMeter().(*worker) - go defaultWorker.start() - internal.DefaultRecorder = record - internal.MeasurementRecorder = recordMeasurement -} - -type measureRef struct { - measure string - views map[*viewInternal]struct{} -} - -type worker struct { - measures map[string]*measureRef - views map[string]*viewInternal - viewStartTimes map[*viewInternal]time.Time - - timer *time.Ticker - c chan command - quit, done chan bool - mu sync.RWMutex - r *resource.Resource - - exportersMu sync.RWMutex - exporters map[Exporter]struct{} -} - -// Meter defines an interface which allows a single process to maintain -// multiple sets of metrics exports (intended for the advanced case where a -// single process wants to report metrics about multiple objects, such as -// multiple databases or HTTP services). -// -// Note that this is an advanced use case, and the static functions in this -// module should cover the common use cases. -type Meter interface { - stats.Recorder - // Find returns a registered view associated with this name. - // If no registered view is found, nil is returned. - Find(name string) *View - // Register begins collecting data for the given views. - // Once a view is registered, it reports data to the registered exporters. - Register(views ...*View) error - // Unregister the given views. Data will not longer be exported for these views - // after Unregister returns. - // It is not necessary to unregister from views you expect to collect for the - // duration of your program execution. - Unregister(views ...*View) - // SetReportingPeriod sets the interval between reporting aggregated views in - // the program. If duration is less than or equal to zero, it enables the - // default behavior. - // - // Note: each exporter makes different promises about what the lowest supported - // duration is. For example, the Stackdriver exporter recommends a value no - // lower than 1 minute. Consult each exporter per your needs. - SetReportingPeriod(time.Duration) - - // RegisterExporter registers an exporter. - // Collected data will be reported via all the - // registered exporters. Once you no longer - // want data to be exported, invoke UnregisterExporter - // with the previously registered exporter. - // - // Binaries can register exporters, libraries shouldn't register exporters. - RegisterExporter(Exporter) - // UnregisterExporter unregisters an exporter. - UnregisterExporter(Exporter) - // SetResource may be used to set the Resource associated with this registry. - // This is intended to be used in cases where a single process exports metrics - // for multiple Resources, typically in a multi-tenant situation. - SetResource(*resource.Resource) - - // Start causes the Meter to start processing Record calls and aggregating - // statistics as well as exporting data. - Start() - // Stop causes the Meter to stop processing calls and terminate data export. - Stop() - - // RetrieveData gets a snapshot of the data collected for the the view registered - // with the given name. It is intended for testing only. - RetrieveData(viewName string) ([]*Row, error) -} - -var _ Meter = (*worker)(nil) - -var defaultWorker *worker - -var defaultReportingDuration = 10 * time.Second - -// Find returns a registered view associated with this name. -// If no registered view is found, nil is returned. -func Find(name string) (v *View) { - return defaultWorker.Find(name) -} - -// Find returns a registered view associated with this name. -// If no registered view is found, nil is returned. -func (w *worker) Find(name string) (v *View) { - req := &getViewByNameReq{ - name: name, - c: make(chan *getViewByNameResp), - } - w.c <- req - resp := <-req.c - return resp.v -} - -// Register begins collecting data for the given views. -// Once a view is registered, it reports data to the registered exporters. -func Register(views ...*View) error { - return defaultWorker.Register(views...) -} - -// Register begins collecting data for the given views. -// Once a view is registered, it reports data to the registered exporters. -func (w *worker) Register(views ...*View) error { - req := ®isterViewReq{ - views: views, - err: make(chan error), - } - w.c <- req - return <-req.err -} - -// Unregister the given views. Data will not longer be exported for these views -// after Unregister returns. -// It is not necessary to unregister from views you expect to collect for the -// duration of your program execution. -func Unregister(views ...*View) { - defaultWorker.Unregister(views...) -} - -// Unregister the given views. Data will not longer be exported for these views -// after Unregister returns. -// It is not necessary to unregister from views you expect to collect for the -// duration of your program execution. -func (w *worker) Unregister(views ...*View) { - names := make([]string, len(views)) - for i := range views { - names[i] = views[i].Name - } - req := &unregisterFromViewReq{ - views: names, - done: make(chan struct{}), - } - w.c <- req - <-req.done -} - -// RetrieveData gets a snapshot of the data collected for the the view registered -// with the given name. It is intended for testing only. -func RetrieveData(viewName string) ([]*Row, error) { - return defaultWorker.RetrieveData(viewName) -} - -// RetrieveData gets a snapshot of the data collected for the the view registered -// with the given name. It is intended for testing only. -func (w *worker) RetrieveData(viewName string) ([]*Row, error) { - req := &retrieveDataReq{ - now: time.Now(), - v: viewName, - c: make(chan *retrieveDataResp), - } - w.c <- req - resp := <-req.c - return resp.rows, resp.err -} - -func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { - defaultWorker.Record(tags, ms, attachments) -} - -func recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) { - defaultWorker.recordMeasurement(tags, ms, attachments) -} - -// Record records a set of measurements ms associated with the given tags and attachments. -func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { - w.recordMeasurement(tags, ms.([]stats.Measurement), attachments) -} - -// recordMeasurement records a set of measurements ms associated with the given tags and attachments. -// This is the same as Record but without an interface{} type to avoid allocations -func (w *worker) recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) { - req := &recordReq{ - tm: tags, - ms: ms, - attachments: attachments, - t: time.Now(), - } - w.c <- req -} - -// SetReportingPeriod sets the interval between reporting aggregated views in -// the program. If duration is less than or equal to zero, it enables the -// default behavior. -// -// Note: each exporter makes different promises about what the lowest supported -// duration is. For example, the Stackdriver exporter recommends a value no -// lower than 1 minute. Consult each exporter per your needs. -func SetReportingPeriod(d time.Duration) { - defaultWorker.SetReportingPeriod(d) -} - -// Stop stops the default worker. -func Stop() { - defaultWorker.Stop() -} - -// SetReportingPeriod sets the interval between reporting aggregated views in -// the program. If duration is less than or equal to zero, it enables the -// default behavior. -// -// Note: each exporter makes different promises about what the lowest supported -// duration is. For example, the Stackdriver exporter recommends a value no -// lower than 1 minute. Consult each exporter per your needs. -func (w *worker) SetReportingPeriod(d time.Duration) { - // TODO(acetechnologist): ensure that the duration d is more than a certain - // value. e.g. 1s - req := &setReportingPeriodReq{ - d: d, - c: make(chan bool), - } - w.c <- req - <-req.c // don't return until the timer is set to the new duration. -} - -// NewMeter constructs a Meter instance. You should only need to use this if -// you need to separate out Measurement recordings and View aggregations within -// a single process. -func NewMeter() Meter { - return &worker{ - measures: make(map[string]*measureRef), - views: make(map[string]*viewInternal), - viewStartTimes: make(map[*viewInternal]time.Time), - timer: time.NewTicker(defaultReportingDuration), - c: make(chan command, 1024), - quit: make(chan bool), - done: make(chan bool), - - exporters: make(map[Exporter]struct{}), - } -} - -// SetResource associates all data collected by this Meter with the specified -// resource. This resource is reported when using metricexport.ReadAndExport; -// it is not provided when used with ExportView/RegisterExporter, because that -// interface does not provide a means for reporting the Resource. -func (w *worker) SetResource(r *resource.Resource) { - w.r = r -} - -func (w *worker) Start() { - go w.start() -} - -func (w *worker) start() { - prodMgr := metricproducer.GlobalManager() - prodMgr.AddProducer(w) - - for { - select { - case cmd := <-w.c: - cmd.handleCommand(w) - case <-w.timer.C: - w.reportUsage() - case <-w.quit: - w.timer.Stop() - close(w.c) - close(w.done) - return - } - } -} - -func (w *worker) Stop() { - prodMgr := metricproducer.GlobalManager() - prodMgr.DeleteProducer(w) - select { - case <-w.quit: - default: - close(w.quit) - } - <-w.done -} - -func (w *worker) getMeasureRef(name string) *measureRef { - if mr, ok := w.measures[name]; ok { - return mr - } - mr := &measureRef{ - measure: name, - views: make(map[*viewInternal]struct{}), - } - w.measures[name] = mr - return mr -} - -func (w *worker) tryRegisterView(v *View) (*viewInternal, error) { - w.mu.Lock() - defer w.mu.Unlock() - vi, err := newViewInternal(v) - if err != nil { - return nil, err - } - if x, ok := w.views[vi.view.Name]; ok { - if !x.view.same(vi.view) { - return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name) - } - - // the view is already registered so there is nothing to do and the - // command is considered successful. - return x, nil - } - w.views[vi.view.Name] = vi - w.viewStartTimes[vi] = time.Now() - ref := w.getMeasureRef(vi.view.Measure.Name()) - ref.views[vi] = struct{}{} - return vi, nil -} - -func (w *worker) unregisterView(v *viewInternal) { - w.mu.Lock() - defer w.mu.Unlock() - delete(w.views, v.view.Name) - delete(w.viewStartTimes, v) - if measure := w.measures[v.view.Measure.Name()]; measure != nil { - delete(measure.views, v) - } -} - -func (w *worker) reportView(v *viewInternal) { - if !v.isSubscribed() { - return - } - rows := v.collectedRows() - viewData := &Data{ - View: v.view, - Start: w.viewStartTimes[v], - End: time.Now(), - Rows: rows, - } - w.exportersMu.Lock() - defer w.exportersMu.Unlock() - for e := range w.exporters { - e.ExportView(viewData) - } -} - -func (w *worker) reportUsage() { - w.mu.Lock() - defer w.mu.Unlock() - for _, v := range w.views { - w.reportView(v) - } -} - -func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric { - if !v.isSubscribed() { - return nil - } - - return viewToMetric(v, w.r, now) -} - -// Read reads all view data and returns them as metrics. -// It is typically invoked by metric reader to export stats in metric format. -func (w *worker) Read() []*metricdata.Metric { - w.mu.Lock() - defer w.mu.Unlock() - now := time.Now() - metrics := make([]*metricdata.Metric, 0, len(w.views)) - for _, v := range w.views { - metric := w.toMetric(v, now) - if metric != nil { - metrics = append(metrics, metric) - } - } - return metrics -} - -func (w *worker) RegisterExporter(e Exporter) { - w.exportersMu.Lock() - defer w.exportersMu.Unlock() - - w.exporters[e] = struct{}{} -} - -func (w *worker) UnregisterExporter(e Exporter) { - w.exportersMu.Lock() - defer w.exportersMu.Unlock() - - delete(w.exporters, e) -} |
