|
|
|
|
@ -3,11 +3,13 @@ package querier |
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"flag" |
|
|
|
|
"net/http" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" |
|
|
|
|
"github.com/cortexproject/cortex/pkg/ring" |
|
|
|
|
"github.com/cortexproject/cortex/pkg/util" |
|
|
|
|
"github.com/go-kit/kit/log/level" |
|
|
|
|
"github.com/prometheus/common/model" |
|
|
|
|
"google.golang.org/grpc/health/grpc_health_v1" |
|
|
|
|
|
|
|
|
|
@ -18,6 +20,8 @@ import ( |
|
|
|
|
"github.com/grafana/loki/pkg/storage" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var readinessProbeSuccess = []byte("Ready") |
|
|
|
|
|
|
|
|
|
// Config for a querier.
|
|
|
|
|
type Config struct { |
|
|
|
|
TailMaxDuration time.Duration `yaml:"tail_max_duration"` |
|
|
|
|
@ -63,6 +67,21 @@ type responseFromIngesters struct { |
|
|
|
|
response interface{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ReadinessHandler is used to indicate to k8s when the querier is ready.
|
|
|
|
|
// Returns 200 when the querier is ready, 500 otherwise.
|
|
|
|
|
func (q *Querier) ReadinessHandler(w http.ResponseWriter, r *http.Request) { |
|
|
|
|
_, err := q.ring.GetAll() |
|
|
|
|
if err != nil { |
|
|
|
|
http.Error(w, "Not ready: "+err.Error(), http.StatusInternalServerError) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
w.WriteHeader(http.StatusOK) |
|
|
|
|
if _, err := w.Write(readinessProbeSuccess); err != nil { |
|
|
|
|
level.Error(util.Logger).Log("msg", "error writing success message", "error", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// forAllIngesters runs f, in parallel, for all ingesters
|
|
|
|
|
// TODO taken from Cortex, see if we can refactor out an usable interface.
|
|
|
|
|
func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) { |
|
|
|
|
|