Query label values and names are now fetched from the store. (#521)

* Query label values and names are now fetched from the store.

A time range is now required by the /api/prom/label with a sane default (6 hours from now).

* fix http querystring and update doc

* update vendor

* rebased
pull/728/head
Cyril Tovena 6 years ago committed by GitHub
parent d3ab48669e
commit 86bc3c0518
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      Gopkg.lock
  2. 17
      docs/api.md
  3. 159
      pkg/logproto/logproto.pb.go
  4. 2
      pkg/logproto/logproto.proto
  5. 17
      pkg/querier/http.go
  6. 16
      pkg/querier/querier.go
  7. 40
      vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go
  8. 54
      vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go
  9. 15
      vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go
  10. 57
      vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go

61
Gopkg.lock generated

@ -222,7 +222,7 @@
[[projects]]
branch = "master"
digest = "1:2f0846dd85df3365a80c32ff994eb1fcee5eec2c51a812ceec182398f3ef85f4"
digest = "1:5a07b5363e4c2aa127a3afd1e8e323d3a288ba1d90d37793d2e14843f5b5b82e"
name = "github.com/cortexproject/cortex"
packages = [
"pkg/chunk",
@ -248,7 +248,7 @@
"pkg/util/validation",
]
pruneopts = "UT"
revision = "e1ab5495e8a846891e3b6b8e757e63201b886bec"
revision = "ef492f6bbafb185bbe61ae7a6955b7a4af5f3d9a"
[[projects]]
digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec"
@ -560,39 +560,6 @@
revision = "66b9c49e59c6c48f0ffce28c2d8b8a5678502c6d"
version = "v1.4.0"
[[projects]]
branch = "master"
digest = "1:1f4181cfeacebef71babf22e99d727c1667e1f620982787c7035653d6e887dbb"
name = "github.com/grafana/loki"
packages = [
"pkg/chunkenc",
"pkg/distributor",
"pkg/helpers",
"pkg/ingester",
"pkg/ingester/client",
"pkg/iter",
"pkg/logentry/metric",
"pkg/logentry/stages",
"pkg/logproto",
"pkg/logql",
"pkg/loki",
"pkg/promtail",
"pkg/promtail/api",
"pkg/promtail/client",
"pkg/promtail/client/fake",
"pkg/promtail/config",
"pkg/promtail/positions",
"pkg/promtail/scrape",
"pkg/promtail/server",
"pkg/promtail/server/ui",
"pkg/promtail/targets",
"pkg/querier",
"pkg/util",
"pkg/util/flagext",
]
pruneopts = "UT"
revision = "4c7138231f77997909564616efc5d0cdbcb1ead8"
[[projects]]
digest = "1:1168584a5881d371e96cb0e66ef6db71d7cef0856cc7f311490bc856627f8328"
name = "github.com/grpc-ecosystem/go-grpc-middleware"
@ -1619,30 +1586,6 @@
"github.com/golang/snappy",
"github.com/gorilla/mux",
"github.com/gorilla/websocket",
"github.com/grafana/loki/pkg/chunkenc",
"github.com/grafana/loki/pkg/distributor",
"github.com/grafana/loki/pkg/helpers",
"github.com/grafana/loki/pkg/ingester",
"github.com/grafana/loki/pkg/ingester/client",
"github.com/grafana/loki/pkg/iter",
"github.com/grafana/loki/pkg/logentry/metric",
"github.com/grafana/loki/pkg/logentry/stages",
"github.com/grafana/loki/pkg/logproto",
"github.com/grafana/loki/pkg/logql",
"github.com/grafana/loki/pkg/loki",
"github.com/grafana/loki/pkg/promtail",
"github.com/grafana/loki/pkg/promtail/api",
"github.com/grafana/loki/pkg/promtail/client",
"github.com/grafana/loki/pkg/promtail/client/fake",
"github.com/grafana/loki/pkg/promtail/config",
"github.com/grafana/loki/pkg/promtail/positions",
"github.com/grafana/loki/pkg/promtail/scrape",
"github.com/grafana/loki/pkg/promtail/server",
"github.com/grafana/loki/pkg/promtail/server/ui",
"github.com/grafana/loki/pkg/promtail/targets",
"github.com/grafana/loki/pkg/querier",
"github.com/grafana/loki/pkg/util",
"github.com/grafana/loki/pkg/util/flagext",
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc",
"github.com/hpcloud/tail",
"github.com/jmespath/go-jmespath",

@ -39,7 +39,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
Responses looks like this:
```
```json
{
"streams": [
{
@ -59,11 +59,14 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
- `GET /api/prom/label`
For retrieving the names of the labels one can query on.
For doing label name queries, accepts the following parameters in the query-string:
- `start`: the start time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is always 6 hour ago.
- `end`: the end time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is current time.
Responses looks like this:
```
```json
{
"values": [
"instance",
@ -74,11 +77,15 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
```
- `GET /api/prom/label/<name>/values`
For retrieving the label values one can query on.
For doing label values queries, accepts the following parameters in the query-string:
- `start`: the start time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is always 6 hour ago.
- `end`: the end time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is current time.
Responses looks like this:
```
```json
{
"values": [
"default",

@ -6,17 +6,18 @@ package logproto
import (
context "context"
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
_ "github.com/gogo/protobuf/types"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
grpc "google.golang.org/grpc"
io "io"
math "math"
reflect "reflect"
strconv "strconv"
strings "strings"
time "time"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
_ "github.com/gogo/protobuf/types"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -257,8 +258,10 @@ func (m *QueryResponse) GetStreams() []*Stream {
}
type LabelRequest struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Values bool `protobuf:"varint,2,opt,name=values,proto3" json:"values,omitempty"`
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Values bool `protobuf:"varint,2,opt,name=values,proto3" json:"values,omitempty"`
Start *time.Time `protobuf:"bytes,3,opt,name=start,proto3,stdtime" json:"start,omitempty"`
End *time.Time `protobuf:"bytes,4,opt,name=end,proto3,stdtime" json:"end,omitempty"`
}
func (m *LabelRequest) Reset() { *m = LabelRequest{} }
@ -307,6 +310,20 @@ func (m *LabelRequest) GetValues() bool {
return false
}
func (m *LabelRequest) GetStart() *time.Time {
if m != nil {
return m.Start
}
return nil
}
func (m *LabelRequest) GetEnd() *time.Time {
if m != nil {
return m.End
}
return nil
}
type LabelResponse struct {
Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"`
}
@ -838,6 +855,20 @@ func (this *LabelRequest) Equal(that interface{}) bool {
if this.Values != that1.Values {
return false
}
if that1.Start == nil {
if this.Start != nil {
return false
}
} else if !this.Start.Equal(*that1.Start) {
return false
}
if that1.End == nil {
if this.End != nil {
return false
}
} else if !this.End.Equal(*that1.End) {
return false
}
return true
}
func (this *LabelResponse) Equal(that interface{}) bool {
@ -1072,10 +1103,12 @@ func (this *LabelRequest) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s := make([]string, 0, 8)
s = append(s, "&logproto.LabelRequest{")
s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
s = append(s, "Values: "+fmt.Sprintf("%#v", this.Values)+",\n")
s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n")
s = append(s, "End: "+fmt.Sprintf("%#v", this.End)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -1587,6 +1620,26 @@ func (m *LabelRequest) MarshalTo(dAtA []byte) (int, error) {
}
i++
}
if m.Start != nil {
dAtA[i] = 0x1a
i++
i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.Start)))
n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.Start, dAtA[i:])
if err3 != nil {
return 0, err3
}
i += n3
}
if m.End != nil {
dAtA[i] = 0x22
i++
i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.End)))
n4, err4 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.End, dAtA[i:])
if err4 != nil {
return 0, err4
}
i += n4
}
return i, nil
}
@ -1677,11 +1730,11 @@ func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)))
n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err3 != nil {
return 0, err3
n5, err5 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err5 != nil {
return 0, err5
}
i += n3
i += n5
if len(m.Line) > 0 {
dAtA[i] = 0x12
i++
@ -1894,6 +1947,14 @@ func (m *LabelRequest) Size() (n int) {
if m.Values {
n += 2
}
if m.Start != nil {
l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.Start)
n += 1 + l + sovLogproto(uint64(l))
}
if m.End != nil {
l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.End)
n += 1 + l + sovLogproto(uint64(l))
}
return n
}
@ -2076,6 +2137,8 @@ func (this *LabelRequest) String() string {
s := strings.Join([]string{`&LabelRequest{`,
`Name:` + fmt.Sprintf("%v", this.Name) + `,`,
`Values:` + fmt.Sprintf("%v", this.Values) + `,`,
`Start:` + strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1) + `,`,
`End:` + strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1) + `,`,
`}`,
}, "")
return s
@ -2694,6 +2757,78 @@ func (m *LabelRequest) Unmarshal(dAtA []byte) error {
}
}
m.Values = bool(v != 0)
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Start == nil {
m.Start = new(time.Time)
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.Start, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field End", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.End == nil {
m.End = new(time.Time)
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.End, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])

@ -43,6 +43,8 @@ message QueryResponse {
message LabelRequest {
string name = 1;
bool values = 2; // True to fetch label values, false for fetch labels names.
google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
}
message LabelResponse {

@ -143,10 +143,27 @@ func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
// LabelHandler is a http.HandlerFunc for handling label queries.
func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) {
name, ok := mux.Vars(r)["name"]
params := r.URL.Query()
now := time.Now()
req := &logproto.LabelRequest{
Values: ok,
Name: name,
}
end, err := unixNanoTimeParam(params, "end", now)
if err != nil {
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
req.End = &end
start, err := unixNanoTimeParam(params, "start", end.Add(-6*time.Hour))
if err != nil {
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
req.Start = &start
resp, err := q.Label(r.Context(), req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)

@ -10,6 +10,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
token_util "github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"
@ -148,10 +149,25 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
return nil, err
}
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
var storeValues []string
if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, from, through, "logs", req.Name)
if err != nil {
return nil, err
}
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, from, through, "logs")
if err != nil {
return nil, err
}
}
results := make([][]string, 0, len(resps))
for _, resp := range resps {
results = append(results, resp.response.(*logproto.LabelResponse).Values)
}
results = append(results, storeValues)
return &logproto.LabelResponse{
Values: mergeLists(results...),

@ -64,11 +64,15 @@ type StoreConfig struct {
// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"`
// Not visible in yaml because the setting shouldn't be common between ingesters and queriers
chunkCacheStubs bool // don't write the full chunk to cache, just a stub entry
}
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.ChunkCacheConfig.RegisterFlagsWithPrefix("", "Cache config for chunks. ", f)
f.BoolVar(&cfg.chunkCacheStubs, "store.chunk-cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.")
cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. ", f)
f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
@ -92,7 +96,7 @@ type store struct {
}
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, chunks)
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}
@ -228,12 +232,44 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode
}
result = append(result, string(labelValue))
}
sort.Strings(result)
result = uniqueStrings(result)
return result, nil
}
// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *store) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.LabelNamesForMetricName")
defer log.Span.Finish()
level.Debug(log).Log("from", from, "through", through, "metricName", metricName)
shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
chunks, err := c.lookupChunksByMetricName(ctx, from, through, nil, metricName)
if err != nil {
return nil, err
}
level.Debug(log).Log("msg", "Chunks in index", "chunks", len(chunks))
// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerprint(filtered)
level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks))
// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}
return labelNamesFromChunks(allChunks), nil
}
func (c *store) validateQueryTimeRange(ctx context.Context, from *model.Time, through *model.Time) (bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange")
defer log.Span.Finish()

@ -2,6 +2,7 @@ package chunk
import (
"context"
"sort"
"sync"
"github.com/go-kit/kit/log/level"
@ -36,6 +37,39 @@ func keysFromChunks(chunks []Chunk) []string {
return keys
}
func labelNamesFromChunks(chunks []Chunk) []string {
keys := map[string]struct{}{}
var result []string
for _, c := range chunks {
for _, l := range c.Metric {
if l.Name != model.MetricNameLabel {
if _, ok := keys[string(l.Name)]; !ok {
keys[string(l.Name)] = struct{}{}
result = append(result, string(l.Name))
}
}
}
}
sort.Strings(result)
return result
}
func filterChunksByUniqueFingerprint(chunks []Chunk) ([]Chunk, []string) {
filtered := make([]Chunk, 0, len(chunks))
keys := make([]string, 0, len(chunks))
uniqueFp := map[model.Fingerprint]struct{}{}
for _, chunk := range chunks {
if _, ok := uniqueFp[chunk.Fingerprint]; ok {
continue
}
filtered = append(filtered, chunk)
keys = append(keys, chunk.ExternalKey())
uniqueFp[chunk.Fingerprint] = struct{}{}
}
return filtered, keys
}
func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk {
filteredChunks := make([]Chunk, 0, len(chunks))
outer:
@ -54,8 +88,9 @@ outer:
// and writing back any misses to the cache. Also responsible for decoding
// chunks from the cache, in parallel.
type Fetcher struct {
storage ObjectClient
cache cache.Cache
storage ObjectClient
cache cache.Cache
cacheStubs bool
wait sync.WaitGroup
decodeRequests chan decodeRequest
@ -72,7 +107,7 @@ type decodeResponse struct {
}
// NewChunkFetcher makes a new ChunkFetcher.
func NewChunkFetcher(cfg cache.Config, storage ObjectClient) (*Fetcher, error) {
func NewChunkFetcher(cfg cache.Config, cacheStubs bool, storage ObjectClient) (*Fetcher, error) {
cache, err := cache.New(cfg)
if err != nil {
return nil, err
@ -81,6 +116,7 @@ func NewChunkFetcher(cfg cache.Config, storage ObjectClient) (*Fetcher, error) {
c := &Fetcher{
storage: storage,
cache: cache,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
}
@ -149,10 +185,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {
keys := make([]string, 0, len(chunks))
bufs := make([][]byte, 0, len(chunks))
for i := range chunks {
encoded, err := chunks[i].Encoded()
// TODO don't fail, just log and conitnue?
if err != nil {
return err
var encoded []byte
var err error
if !c.cacheStubs {
encoded, err = chunks[i].Encoded()
// TODO don't fail, just log and conitnue?
if err != nil {
return err
}
}
keys = append(keys, chunks[i].ExternalKey())

@ -19,6 +19,7 @@ type Store interface {
// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)
GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error)
Stop()
}
@ -106,6 +107,20 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, thro
return result, err
}
// LabelNamesForMetricName retrieves all label names for a metric name.
func (c compositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
var result []string
err := c.forStores(from, through, func(from, through model.Time, store Store) error {
labelNames, err := store.LabelNamesForMetricName(ctx, from, through, metricName)
if err != nil {
return err
}
result = append(result, labelNames...)
return nil
})
return result, err
}
func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
chunkIDs := [][]Chunk{}
fetchers := []*Fetcher{}

@ -68,7 +68,7 @@ type seriesStore struct {
}
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, chunks)
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}
@ -191,6 +191,61 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time
return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil
}
// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.LabelNamesForMetricName")
defer log.Span.Finish()
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
level.Debug(log).Log("metric", metricName)
// Fetch the series IDs from the index
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil)
if err != nil {
return nil, err
}
level.Debug(log).Log("series-ids", len(seriesIDs))
// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs)
if err != nil {
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
return nil, err
}
level.Debug(log).Log("chunk-ids", len(chunkIDs))
chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
if err != nil {
level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err)
return nil, err
}
// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerprint(filtered)
level.Debug(log).Log("Chunks post filtering", len(chunks))
chunksPerQuery.Observe(float64(len(filtered)))
// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}
return labelNamesFromChunks(allChunks), nil
}
func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()

Loading…
Cancel
Save