Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/querier/querier.go

356 lines
10 KiB

package querier
import (
"context"
"flag"
"net/http"
"time"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage"
)
const (
// How long the Tailer should wait - once there are no entries to read from ingesters -
// before checking if a new entry is available (to avoid spinning the CPU in a continuous
// check loop)
tailerWaitEntryThrottle = time.Second / 2
)
var readinessProbeSuccess = []byte("Ready")
// Config for a querier.
type Config struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
TailMaxDuration time.Duration `yaml:"tail_max_duration"`
Engine logql.EngineOpts `yaml:"engine,omitempty"`
}
// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served")
f.DurationVar(&cfg.QueryTimeout, "querier.query_timeout", 1*time.Minute, "Timeout when querying backends (ingesters or storage) during the execution of a query request")
}
// Querier handlers queries.
type Querier struct {
cfg Config
ring ring.ReadRing
pool *cortex_client.Pool
store storage.Store
engine *logql.Engine
}
// New makes a new Querier.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store) (*Querier, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
}
return newQuerier(cfg, clientCfg, factory, ring, store)
}
// newQuerier creates a new Querier and allows to pass a custom ingester client factory
// used for testing purposes
func newQuerier(cfg Config, clientCfg client.Config, clientFactory cortex_client.Factory, ring ring.ReadRing, store storage.Store) (*Querier, error) {
return &Querier{
cfg: cfg,
ring: ring,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
store: store,
engine: logql.NewEngine(cfg.Engine),
}, nil
}
type responseFromIngesters struct {
addr string
response interface{}
}
// ReadinessHandler is used to indicate to k8s when the querier is ready.
// Returns 200 when the querier is ready, 500 otherwise.
func (q *Querier) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
_, err := q.ring.GetAll()
if err != nil {
http.Error(w, "Not ready: "+err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
if _, err := w.Write(readinessProbeSuccess); err != nil {
level.Error(util.Logger).Log("msg", "error writing success message", "error", err)
}
}
// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
replicationSet, err := q.ring.GetAll()
if err != nil {
return nil, err
}
return q.forGivenIngesters(replicationSet, f)
}
// forGivenIngesters runs f, in parallel, for given ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
resps, errs := make(chan responseFromIngesters), make(chan error)
for _, ingester := range replicationSet.Ingesters {
go func(ingester ring.IngesterDesc) {
client, err := q.pool.GetClientFor(ingester.Addr)
if err != nil {
errs <- err
return
}
resp, err := f(client.(logproto.QuerierClient))
if err != nil {
errs <- err
} else {
resps <- responseFromIngesters{ingester.Addr, resp}
}
}(ingester)
}
var lastErr error
result, numErrs := []responseFromIngesters{}, 0
for range replicationSet.Ingesters {
select {
case resp := <-resps:
result = append(result, resp)
case lastErr = <-errs:
numErrs++
}
}
if numErrs > replicationSet.MaxErrors {
return nil, lastErr
}
return result, nil
}
// Select Implements logql.Querier which select logs via matchers and regex filters.
func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.EntryIterator, error) {
ingesterIterators, err := q.queryIngesters(ctx, params)
if err != nil {
return nil, err
}
chunkStoreIterators, err := q.store.LazyQuery(ctx, params)
if err != nil {
return nil, err
}
iterators := append(ingesterIterators, chunkStoreIterators)
return iter.NewHeapIterator(iterators, params.Direction), nil
}
func (q *Querier) queryIngesters(ctx context.Context, params logql.SelectParams) ([]iter.EntryIterator, error) {
clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Query(ctx, params.QueryRequest)
})
if err != nil {
return nil, err
}
iterators := make([]iter.EntryIterator, len(clients))
for i := range clients {
iterators[i] = iter.NewQueryClientIterator(clients[i].response.(logproto.Querier_QueryClient), params.Direction)
}
return iterators, nil
}
// Label does the heavy lifting for a Label query.
func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
resps, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Label(ctx, req)
})
if err != nil {
return nil, err
}
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
var storeValues []string
if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, from, through, "logs", req.Name)
if err != nil {
return nil, err
}
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, from, through, "logs")
if err != nil {
return nil, err
}
}
results := make([][]string, 0, len(resps))
for _, resp := range resps {
results = append(results, resp.response.(*logproto.LabelResponse).Values)
}
results = append(results, storeValues)
return &logproto.LabelResponse{
Values: mergeLists(results...),
}, nil
}
// Check implements the grpc healthcheck
func (*Querier) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}
func mergeLists(ss ...[]string) []string {
switch len(ss) {
case 0:
return nil
case 1:
return ss[0]
case 2:
return mergePair(ss[0], ss[1])
default:
n := len(ss) / 2
return mergePair(mergeLists(ss[:n]...), mergeLists(ss[n:]...))
}
}
func mergePair(s1, s2 []string) []string {
i, j := 0, 0
result := make([]string, 0, len(s1)+len(s2))
for i < len(s1) && j < len(s2) {
if s1[i] < s2[j] {
result = append(result, s1[i])
i++
} else if s1[i] > s2[j] {
result = append(result, s2[j])
j++
} else {
result = append(result, s1[i])
i++
j++
}
}
for ; i < len(s1); i++ {
result = append(result, s1[i])
}
for ; j < len(s2); j++ {
result = append(result, s2[j])
}
return result
}
// Tail keeps getting matching logs from all ingesters for given query
func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
// Enforce the query timeout except when tailing, otherwise the tailing
// will be terminated once the query timeout is reached
tailCtx := ctx
queryCtx, cancelQuery := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancelQuery()
clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Tail(tailCtx, req)
})
if err != nil {
return nil, err
}
tailClients := make(map[string]logproto.Querier_TailClient)
for i := range clients {
tailClients[clients[i].addr] = clients[i].response.(logproto.Querier_TailClient)
}
histReq := logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: req.Query,
Start: req.Start,
End: time.Now(),
Limit: req.Limit,
Direction: logproto.BACKWARD,
},
}
histIterators, err := q.Select(queryCtx, histReq)
if err != nil {
return nil, err
}
reversedIterator, err := iter.NewEntryIteratorForward(histIterators, req.Limit, true)
if err != nil {
return nil, err
}
return newTailer(
time.Duration(req.DelayFor)*time.Second,
tailClients,
reversedIterator,
func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr)
},
q.cfg.TailMaxDuration,
tailerWaitEntryThrottle,
), nil
}
// passed to tailer for (re)connecting to new or disconnected ingesters
func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
// Build a map to easily check if an ingester address is already connected
connected := make(map[string]bool)
for _, addr := range connectedIngestersAddr {
connected[addr] = true
}
// Get the current replication set from the ring
replicationSet, err := q.ring.GetAll()
if err != nil {
return nil, err
}
// Look for disconnected ingesters or new one we should (re)connect to
reconnectIngesters := []ring.IngesterDesc{}
for _, ingester := range replicationSet.Ingesters {
if _, ok := connected[ingester.Addr]; ok {
continue
}
// Skip ingesters which are leaving or joining the cluster
if ingester.State != ring.ACTIVE {
continue
}
reconnectIngesters = append(reconnectIngesters, ingester)
}
if len(reconnectIngesters) == 0 {
return nil, nil
}
// Instance a tail client for each ingester to re(connect)
reconnectClients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: reconnectIngesters}, func(client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
return nil, err
}
reconnectClientsMap := make(map[string]logproto.Querier_TailClient)
for _, client := range reconnectClients {
reconnectClientsMap[client.addr] = client.response.(logproto.Querier_TailClient)
}
return reconnectClientsMap, nil
}