|
|
|
@ -4,8 +4,10 @@ import ( |
|
|
|
|
"context" |
|
|
|
|
"sort" |
|
|
|
|
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"github.com/prometheus/common/model" |
|
|
|
|
"github.com/prometheus/prometheus/model/labels" |
|
|
|
|
"github.com/weaveworks/common/instrument" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/pkg/storage/chunk" |
|
|
|
|
"github.com/grafana/loki/pkg/storage/chunk/fetcher" |
|
|
|
@ -38,13 +40,14 @@ type CompositeStore struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type compositeStore struct { |
|
|
|
|
stores []compositeStoreEntry |
|
|
|
|
stores []compositeStoreEntry |
|
|
|
|
metrics *metrics |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewCompositeStore creates a new Store which delegates to different stores depending
|
|
|
|
|
// on time.
|
|
|
|
|
func NewCompositeStore(limits StoreLimits) *CompositeStore { |
|
|
|
|
return &CompositeStore{compositeStore{}, limits} |
|
|
|
|
func NewCompositeStore(limits StoreLimits, reg prometheus.Registerer) *CompositeStore { |
|
|
|
|
return &CompositeStore{compositeStore{metrics: newMetrics(reg)}, limits} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *CompositeStore) AddStore(start model.Time, fetcher *fetcher.Fetcher, index series.IndexStore, writer ChunkWriter, stop func()) { |
|
|
|
@ -93,6 +96,19 @@ func (c compositeStore) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { |
|
|
|
|
var lbls []labels.Labels |
|
|
|
|
if err := instrument.CollectedRequest(ctx, "series", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
|
var err error |
|
|
|
|
lbls, err = c.getSeries(ctx, userID, from, through, matchers...) |
|
|
|
|
return err |
|
|
|
|
}); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return lbls, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) getSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { |
|
|
|
|
var results []labels.Labels |
|
|
|
|
found := map[uint64]struct{}{} |
|
|
|
|
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { |
|
|
|
@ -116,6 +132,19 @@ func (c compositeStore) GetSeries(ctx context.Context, userID string, from, thro |
|
|
|
|
|
|
|
|
|
// LabelValuesForMetricName retrieves all label values for a single label name and metric name.
|
|
|
|
|
func (c compositeStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { |
|
|
|
|
var values []string |
|
|
|
|
if err := instrument.CollectedRequest(ctx, "label_values", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
|
var err error |
|
|
|
|
values, err = c.labelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...) |
|
|
|
|
return err |
|
|
|
|
}); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return values, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) labelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { |
|
|
|
|
var result util.UniqueStrings |
|
|
|
|
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { |
|
|
|
|
labelValues, err := store.LabelValuesForMetricName(innerCtx, userID, from, through, metricName, labelName, matchers...) |
|
|
|
@ -130,6 +159,19 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, userID str |
|
|
|
|
|
|
|
|
|
// LabelNamesForMetricName retrieves all label names for a metric name.
|
|
|
|
|
func (c compositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { |
|
|
|
|
var values []string |
|
|
|
|
if err := instrument.CollectedRequest(ctx, "label_names", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
|
var err error |
|
|
|
|
values, err = c.labelNamesForMetricName(ctx, userID, from, through, metricName) |
|
|
|
|
return err |
|
|
|
|
}); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return values, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) labelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { |
|
|
|
|
var result util.UniqueStrings |
|
|
|
|
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { |
|
|
|
|
labelNames, err := store.LabelNamesForMetricName(innerCtx, userID, from, through, metricName) |
|
|
|
@ -143,6 +185,21 @@ func (c compositeStore) LabelNamesForMetricName(ctx context.Context, userID stri |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { |
|
|
|
|
var chunks [][]chunk.Chunk |
|
|
|
|
var fetchers []*fetcher.Fetcher |
|
|
|
|
|
|
|
|
|
if err := instrument.CollectedRequest(ctx, "chunkrefs", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
|
var err error |
|
|
|
|
chunks, fetchers, err = c.getChunkRefs(ctx, userID, from, through, matchers...) |
|
|
|
|
return err |
|
|
|
|
}); err != nil { |
|
|
|
|
return nil, nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return chunks, fetchers, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) getChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { |
|
|
|
|
chunkIDs := [][]chunk.Chunk{} |
|
|
|
|
fetchers := []*fetcher.Fetcher{} |
|
|
|
|
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { |
|
|
|
@ -164,6 +221,19 @@ func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, t |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { |
|
|
|
|
var sts *stats.Stats |
|
|
|
|
if err := instrument.CollectedRequest(ctx, "stats", instrument.NewHistogramCollector(c.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
|
var err error |
|
|
|
|
sts, err = c.stats(ctx, userID, from, through, matchers...) |
|
|
|
|
return err |
|
|
|
|
}); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return sts, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c compositeStore) stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { |
|
|
|
|
xs := make([]*stats.Stats, 0, len(c.stores)) |
|
|
|
|
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { |
|
|
|
|
x, err := store.Stats(innerCtx, userID, from, through, matchers...) |
|
|
|
|