diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 84e04f1ee1..387715029f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -121,7 +121,7 @@ func NewStore(cfg Config, storeCfg config.ChunkStoreConfig, schemaCfg config.Sch if err != nil { return nil, errors.Wrap(err, "error loading schema config") } - stores := stores.NewCompositeStore(limits) + stores := stores.NewCompositeStore(limits, registerer) s := &store{ Store: stores, diff --git a/pkg/storage/stores/composite_store.go b/pkg/storage/stores/composite_store.go index 2f537022e4..f899a4e744 100644 --- a/pkg/storage/stores/composite_store.go +++ b/pkg/storage/stores/composite_store.go @@ -4,8 +4,10 @@ import ( "context" "sort" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/weaveworks/common/instrument" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/fetcher" @@ -38,13 +40,14 @@ type CompositeStore struct { } type compositeStore struct { - stores []compositeStoreEntry + stores []compositeStoreEntry + metrics *metrics } // NewCompositeStore creates a new Store which delegates to different stores depending // on time. -func NewCompositeStore(limits StoreLimits) *CompositeStore { - return &CompositeStore{compositeStore{}, limits} +func NewCompositeStore(limits StoreLimits, reg prometheus.Registerer) *CompositeStore { + return &CompositeStore{compositeStore{metrics: newMetrics(reg)}, limits} } func (c *CompositeStore) AddStore(start model.Time, fetcher *fetcher.Fetcher, index series.IndexStore, writer ChunkWriter, stop func()) { @@ -93,6 +96,19 @@ func (c compositeStore) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) } func (c compositeStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { + var lbls []labels.Labels + if err := instrument.CollectedRequest(ctx, "series", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + var err error + lbls, err = c.getSeries(ctx, userID, from, through, matchers...) + return err + }); err != nil { + return nil, err + } + + return lbls, nil +} + +func (c compositeStore) getSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { var results []labels.Labels found := map[uint64]struct{}{} err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { @@ -116,6 +132,19 @@ func (c compositeStore) GetSeries(ctx context.Context, userID string, from, thro // LabelValuesForMetricName retrieves all label values for a single label name and metric name. func (c compositeStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { + var values []string + if err := instrument.CollectedRequest(ctx, "label_values", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + var err error + values, err = c.labelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...) + return err + }); err != nil { + return nil, err + } + + return values, nil +} + +func (c compositeStore) labelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { var result util.UniqueStrings err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { labelValues, err := store.LabelValuesForMetricName(innerCtx, userID, from, through, metricName, labelName, matchers...) @@ -130,6 +159,19 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, userID str // LabelNamesForMetricName retrieves all label names for a metric name. func (c compositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { + var values []string + if err := instrument.CollectedRequest(ctx, "label_names", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + var err error + values, err = c.labelNamesForMetricName(ctx, userID, from, through, metricName) + return err + }); err != nil { + return nil, err + } + + return values, nil +} + +func (c compositeStore) labelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { var result util.UniqueStrings err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { labelNames, err := store.LabelNamesForMetricName(innerCtx, userID, from, through, metricName) @@ -143,6 +185,21 @@ func (c compositeStore) LabelNamesForMetricName(ctx context.Context, userID stri } func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { + var chunks [][]chunk.Chunk + var fetchers []*fetcher.Fetcher + + if err := instrument.CollectedRequest(ctx, "chunkrefs", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + var err error + chunks, fetchers, err = c.getChunkRefs(ctx, userID, from, through, matchers...) + return err + }); err != nil { + return nil, nil, err + } + + return chunks, fetchers, nil +} + +func (c compositeStore) getChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { chunkIDs := [][]chunk.Chunk{} fetchers := []*fetcher.Fetcher{} err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { @@ -164,6 +221,19 @@ func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, t } func (c compositeStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + var sts *stats.Stats + if err := instrument.CollectedRequest(ctx, "stats", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + var err error + sts, err = c.stats(ctx, userID, from, through, matchers...) + return err + }); err != nil { + return nil, err + } + + return sts, nil +} + +func (c compositeStore) stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { xs := make([]*stats.Stats, 0, len(c.stores)) err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { x, err := store.Stats(innerCtx, userID, from, through, matchers...) diff --git a/pkg/storage/stores/composite_store_test.go b/pkg/storage/stores/composite_store_test.go index a97fb078d0..474218458c 100644 --- a/pkg/storage/stores/composite_store_test.go +++ b/pkg/storage/stores/composite_store_test.go @@ -66,6 +66,7 @@ func TestCompositeStore(t *testing.T) { } } cs := compositeStore{ + metrics: newMetrics(nil), stores: []compositeStoreEntry{ {model.TimeFromUnix(0), mockStore(1)}, {model.TimeFromUnix(100), mockStore(2)}, @@ -79,11 +80,12 @@ func TestCompositeStore(t *testing.T) { want []result }{ // Test we have sensible results when there are no schema's defined - {compositeStore{}, 0, 1, []result{}}, + {compositeStore{metrics: newMetrics(nil)}, 0, 1, []result{}}, // Test we have sensible results when there is a single schema { compositeStore{ + metrics: newMetrics(nil), stores: []compositeStoreEntry{ {model.TimeFromUnix(0), mockStore(1)}, }, @@ -97,6 +99,7 @@ func TestCompositeStore(t *testing.T) { // Test we have sensible results for negative (ie pre 1970) times { compositeStore{ + metrics: newMetrics(nil), stores: []compositeStoreEntry{ {model.TimeFromUnix(0), mockStore(1)}, }, @@ -106,6 +109,7 @@ func TestCompositeStore(t *testing.T) { }, { compositeStore{ + metrics: newMetrics(nil), stores: []compositeStoreEntry{ {model.TimeFromUnix(0), mockStore(1)}, }, @@ -119,6 +123,7 @@ func TestCompositeStore(t *testing.T) { // Test we have sensible results when there is two schemas { compositeStore{ + metrics: newMetrics(nil), stores: []compositeStoreEntry{ {model.TimeFromUnix(0), mockStore(1)}, {model.TimeFromUnix(100), mockStore(2)}, @@ -199,6 +204,7 @@ func TestCompositeStoreLabels(t *testing.T) { t.Parallel() cs := compositeStore{ + metrics: newMetrics(nil), stores: []compositeStoreEntry{ {model.TimeFromUnix(0), mockStore(1)}, {model.TimeFromUnix(20), mockStoreLabel{mockStore(1), []string{"b", "c", "e"}}}, @@ -249,6 +255,7 @@ func (m mockStoreGetChunkFetcher) GetChunkFetcher(tm model.Time) *fetcher.Fetche func TestCompositeStore_GetChunkFetcher(t *testing.T) { cs := compositeStore{ + metrics: newMetrics(nil), stores: []compositeStoreEntry{ {model.TimeFromUnix(10), mockStoreGetChunkFetcher{mockStore(0), &fetcher.Fetcher{}}}, {model.TimeFromUnix(20), mockStoreGetChunkFetcher{mockStore(1), &fetcher.Fetcher{}}}, diff --git a/pkg/storage/stores/metrics.go b/pkg/storage/stores/metrics.go new file mode 100644 index 0000000000..5e0c13b15b --- /dev/null +++ b/pkg/storage/stores/metrics.go @@ -0,0 +1,21 @@ +package stores + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type metrics struct { + // ToDo(Sandeep): Refactor code to include index write requests + indexQueryLatency *prometheus.HistogramVec +} + +func newMetrics(reg prometheus.Registerer) *metrics { + return &metrics{ + indexQueryLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki", + Name: "index_request_duration_seconds", + Help: "Time (in seconds) spent in serving index query requests", + }, []string{"operation", "status_code"}), + } +}