LogQL Analyzer (#6171)

* created logql-debugger page and API endpoint that returns information for each stage of the query

* added docker-compose setup for local testing

* refactored analyzer and added unit tests

* removed docs page to release it in a separate PR when back-end API is published

* changed the structure of the results object in the response to return the original log line, needed for the case when only a stream selector is sent in the query

* merged the latest main and resolved conflicts

* added endpoint for readiness probe and configured CI

* fixed drone.yml discrepancy

* fixed method signature

* fixed path to Dockerfile

* added clean step
Vladyslav Diachenko committed via GitHub
parent 9e84648f3e · commit 615941234a
11 changed files (lines changed in parentheses):
 1. .drone/drone.jsonnet (26)
 2. .drone/drone.yml (54)
 3. Makefile (7)
 4. cmd/logql-analyzer/Dockerfile (13)
 5. cmd/logql-analyzer/docker-compose.yaml (16)
 6. cmd/logql-analyzer/main.go (52)
 7. pkg/logql/log/labels.go (6)
 8. pkg/logql/log/pipeline.go (24)
 9. pkg/logqlanalyzer/analyzer.go (164)
10. pkg/logqlanalyzer/analyzer_test.go (84)
11. pkg/logqlanalyzer/http.go (94)

@@ -311,6 +311,31 @@ local lokioperator(arch) = pipeline('lokioperator-' + arch) + arch_image(arch) {
  depends_on: ['check'],
};

local logql_analyzer() = pipeline('logql-analyzer') + arch_image('amd64') {
  steps+: [
    // dry run for everything that is not tag or main
    docker('amd64', 'logql-analyzer') {
      depends_on: ['image-tag'],
      when: onPRs,
      settings+: {
        dry_run: true,
        repo: 'grafana/logql-analyzer',
      },
    },
  ] + [
    // publish for tag or main
    docker('amd64', 'logql-analyzer') {
      depends_on: ['image-tag'],
      when: onTagOrMain,
      settings+: {
        repo: 'grafana/logql-analyzer',
      },
    },
  ],
  depends_on: ['check'],
};

local multiarch_image(arch) = pipeline('docker-' + arch) + arch_image(arch) {
  steps+: [
    // dry run for everything that is not tag or main
@@ -577,6 +602,7 @@ local manifest_ecr(apps, archs) = pipeline('manifest-ecr') {
    ],
  },
  promtail_win(),
  logql_analyzer(),
  pipeline('release') {
    trigger+: {
      event: ['pull_request', 'tag'],

@@ -1179,6 +1179,58 @@ trigger:
  - refs/tags/v*
  - refs/pull/*/head
---
depends_on:
- check
kind: pipeline
name: logql-analyzer
platform:
  arch: amd64
  os: linux
steps:
- commands:
  - apk add --no-cache bash git
  - git fetch origin --tags
  - echo $(./tools/image-tag)-amd64 > .tags
  image: alpine
  name: image-tag
- depends_on:
  - image-tag
  image: plugins/docker
  name: build-logql-analyzer-image
  settings:
    dockerfile: cmd/logql-analyzer/Dockerfile
    dry_run: true
    password:
      from_secret: docker_password
    repo: grafana/logql-analyzer
    username:
      from_secret: docker_username
  when:
    event:
    - pull_request
- depends_on:
  - image-tag
  image: plugins/docker
  name: publish-logql-analyzer-image
  settings:
    dockerfile: cmd/logql-analyzer/Dockerfile
    dry_run: false
    password:
      from_secret: docker_password
    repo: grafana/logql-analyzer
    username:
      from_secret: docker_username
  when:
    event:
    - push
    - tag
trigger:
  ref:
  - refs/heads/main
  - refs/heads/k???
  - refs/tags/v*
  - refs/pull/*/head
---
image_pull_secrets:
- dockerconfigjson
kind: pipeline
@@ -1476,6 +1528,6 @@ kind: secret
name: gpg_private_key
---
kind: signature
-hmac: f97eb90ab5198649edd375255530188042f0b35172ce589e51362f5a5fef20ca
+hmac: a9b8921396d009dc5ff6a4bb9057525fca82a4ec04e545de7056a3318bd04ce6
...

@@ -290,6 +290,7 @@ clean:
	rm -rf clients/cmd/fluent-bit/out_grafana_loki.h
	rm -rf clients/cmd/fluent-bit/out_grafana_loki.so
	rm -rf cmd/migrate/migrate
	rm -rf cmd/logql-analyzer/logql-analyzer
	go clean ./...

#########
@@ -544,6 +545,12 @@ loki-querytee-push: loki-querytee-image-cross
migrate-image:
	$(SUDO) docker build -t $(IMAGE_PREFIX)/loki-migrate:$(IMAGE_TAG) -f cmd/migrate/Dockerfile .

# LogQL Analyzer
logql-analyzer-image:
	$(SUDO) docker build -t $(IMAGE_PREFIX)/logql-analyzer:$(IMAGE_TAG) -f cmd/logql-analyzer/Dockerfile .

logql-analyzer-push: logql-analyzer-image
	$(call push-image,logql-analyzer)

# build-image (only amd64)
build-image: OCI_PLATFORMS=

@@ -0,0 +1,13 @@
FROM golang:1.17.9 as build

COPY . /src/loki
WORKDIR /src/loki
RUN make clean && CGO_ENABLED=0 go build ./cmd/logql-analyzer/

FROM alpine:3.15.4

RUN apk add --no-cache ca-certificates

COPY --from=build /src/loki/logql-analyzer /usr/bin/logql-analyzer

ENTRYPOINT [ "/usr/bin/logql-analyzer" ]

@@ -0,0 +1,16 @@
version: "3.3"

services:
  backend:
    build:
      context: ../../
      dockerfile: ./cmd/logql-analyzer/Dockerfile
    entrypoint: [ "/usr/bin/logql-analyzer", "--server.http-listen-port=3001" ]
    ports:
      - "3001:3001"
  docs:
    image: grafana/docs-base:latest
    volumes:
      - ../../docs/sources:/hugo/content/docs/loki/latest
    ports:
      - "3002:3002"

@@ -0,0 +1,52 @@
package main

import (
	"flag"
	"net/http"

	"github.com/go-kit/log/level"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/weaveworks/common/server"

	"github.com/grafana/loki/pkg/logqlanalyzer"
	util_log "github.com/grafana/loki/pkg/util/log"
)

func main() {
	cfg := getConfig()
	util_log.InitLogger(&server.Config{
		LogLevel: cfg.LogLevel,
	}, prometheus.DefaultRegisterer)
	s, err := createServer(cfg)
	if err != nil {
		level.Error(util_log.Logger).Log("msg", "error while creating the server", "err", err)
	}

	err = s.Run()
	defer s.Shutdown()
	if err != nil {
		level.Error(util_log.Logger).Log("msg", "error while running the server", "err", err)
	}
}

func getConfig() server.Config {
	cfg := server.Config{}
	cfg.RegisterFlags(flag.CommandLine)
	flag.Parse()
	return cfg
}

func createServer(cfg server.Config) (*server.Server, error) {
	s, err := server.New(cfg)
	if err != nil {
		return nil, err
	}
	s.HTTP.Use(mux.CORSMethodMiddleware(s.HTTP))
	s.HTTP.Use(logqlanalyzer.CorsMiddleware())
	s.HTTP.Handle("/api/logql-analyze", &logqlanalyzer.LogQLAnalyzeHandler{}).Methods(http.MethodPost, http.MethodOptions)
	s.HTTP.HandleFunc("/ready", func(w http.ResponseWriter, _ *http.Request) {
		http.Error(w, "ready", http.StatusOK)
	}).Methods(http.MethodGet)
	return s, err
}

@@ -229,7 +229,7 @@ func (b *LabelsBuilder) Set(n, v string) *LabelsBuilder {
// Labels returns the labels from the builder. If no modifications
// were made, the original labels are returned.
func (b *LabelsBuilder) labels() labels.Labels {
-	b.buf = b.unsortedLabels(b.buf)
+	b.buf = b.UnsortedLabels(b.buf)
	sort.Sort(b.buf)
	return b.buf
}
@@ -244,7 +244,7 @@ func (b *LabelsBuilder) appendErrors(buf labels.Labels) labels.Labels {
	return buf
}

-func (b *LabelsBuilder) unsortedLabels(buf labels.Labels) labels.Labels {
+func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels) labels.Labels {
	if len(b.del) == 0 && len(b.add) == 0 {
		if buf == nil {
			buf = make(labels.Labels, 0, len(b.base)+1)
@@ -287,7 +287,7 @@ func (b *LabelsBuilder) Map() map[string]string {
		}
		return b.baseMap
	}
-	b.buf = b.unsortedLabels(b.buf)
+	b.buf = b.UnsortedLabels(b.buf)
	// todo should we also cache maps since limited by the result ?
	// Maps also don't create a copy of the labels.
	res := make(map[string]string, len(b.buf))
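
The rename above exports unsortedLabels so the analyzer can snapshot label state between stages without paying for sorting. A sketch of the intended usage, assuming this commit's pkg/logql/log API:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/logql/log"
)

func main() {
	// Sketch, assuming this commit's pkg/logql/log API.
	lbls := labels.Labels{{Name: "job", Value: "analyze"}}
	builder := log.NewBaseLabelsBuilder().ForLabels(lbls, lbls.Hash())
	builder.Set("level", "error")

	// Passing nil makes UnsortedLabels allocate a fresh slice, so the
	// snapshot survives later builder mutations; no sorting is performed.
	snapshot := builder.UnsortedLabels(nil)
	fmt.Println(snapshot)
}
```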

@@ -100,12 +100,27 @@ func (fn StageFunc) RequiredLabelNames() []string {
// pipeline is a combination of multiple stages.
// It can also be reduced into a single stage for convenience.
type pipeline struct {
	AnalyzablePipeline
	stages          []Stage
	baseBuilder     *BaseLabelsBuilder
	streamPipelines map[uint64]StreamPipeline
}

func (p *pipeline) Stages() []Stage {
	return p.stages
}

func (p *pipeline) LabelsBuilder() *BaseLabelsBuilder {
	return p.baseBuilder
}

type AnalyzablePipeline interface {
	Pipeline
	Stages() []Stage
	LabelsBuilder() *BaseLabelsBuilder
}

// NewPipeline creates a new pipeline for a given set of stages.
func NewPipeline(stages []Stage) Pipeline {
	if len(stages) == 0 {
@@ -123,16 +138,17 @@ type streamPipeline struct {
	builder *LabelsBuilder
}

func NewStreamPipeline(stages []Stage, labelsBuilder *LabelsBuilder) StreamPipeline {
	return &streamPipeline{stages, labelsBuilder}
}

func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline {
	hash := p.baseBuilder.Hash(labels)
	if res, ok := p.streamPipelines[hash]; ok {
		return res
	}
-	res := &streamPipeline{
-		stages:  p.stages,
-		builder: p.baseBuilder.ForLabels(labels, hash),
-	}
+	res := NewStreamPipeline(p.stages, p.baseBuilder.ForLabels(labels, hash))
	p.streamPipelines[hash] = res
	return res
}
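
AnalyzablePipeline lets callers peek inside a pipeline without widening the Pipeline interface itself; consumers type-assert, as NewPipelineAnalyzer does in analyzer.go below. A minimal sketch, noting that the noop pipeline returned for zero stages does not implement the interface:

```go
// Sketch, assuming stages []log.Stage is already built (e.g. by expr.Pipeline()).
p := log.NewPipeline(stages)
if ap, ok := p.(log.AnalyzablePipeline); ok {
	fmt.Println("stage count:", len(ap.Stages()))
	_ = ap.LabelsBuilder() // BaseLabelsBuilder shared by every stream of this pipeline
}
```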

@@ -0,0 +1,164 @@
package logqlanalyzer

import (
	"fmt"
	"time"
	"unsafe"

	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql/parser"

	"github.com/grafana/loki/pkg/logql/log"
	"github.com/grafana/loki/pkg/logql/syntax"
)

type logQLAnalyzer struct {
}

func (a logQLAnalyzer) analyze(query string, logs []string) (*Result, error) {
	expr, err := syntax.ParseLogSelector(query, true)
	if err != nil {
		return nil, errors.Wrap(err, "invalid query")
	}
	streamSelector, stages, err := a.extractExpressionParts(expr)
	if err != nil {
		return nil, errors.Wrap(err, "can not extract parts of expression")
	}
	pipeline, err := expr.Pipeline()
	if err != nil {
		return nil, errors.Wrap(err, "can not create pipeline")
	}
	streamLabels, err := parser.ParseMetric(streamSelector)
	if err != nil {
		return nil, errors.Wrap(err, "can not parse labels from stream selector")
	}
	analyzer := NewPipelineAnalyzer(pipeline, streamLabels)
	response := &Result{StreamSelector: streamSelector, Stages: stages, Results: make([]LineResult, 0, len(logs))}
	for _, line := range logs {
		analysisRecords := analyzer.AnalyzeLine(line)
		response.Results = append(response.Results, mapAllToLineResult(line, analysisRecords))
	}
	return response, nil
}

func (a logQLAnalyzer) extractExpressionParts(expr syntax.LogSelectorExpr) (string, []string, error) {
	switch expr := expr.(type) {
	case *syntax.PipelineExpr:
		stages := make([]string, 0, len(expr.MultiStages)+1)
		streamSelector := expr.Left.String()
		for _, stage := range expr.MultiStages {
			stages = append(stages, stage.String())
		}
		return streamSelector, stages, nil
	case *syntax.MatchersExpr:
		return expr.String(), []string{}, nil
	default:
		return "", nil, fmt.Errorf("unsupported type of expression")
	}
}

func mapAllToLineResult(originLine string, analysisRecords []StageAnalysisRecord) LineResult {
	stageRecords := make([]StageRecord, 0, len(analysisRecords))
	for _, record := range analysisRecords {
		if !record.Processed {
			break
		}
		stageRecords = append(stageRecords, StageRecord{
			LineBefore:   record.LineBefore,
			LabelsBefore: mapAllToLabelsResponse(record.LabelsBefore),
			LineAfter:    record.LineAfter,
			LabelsAfter:  mapAllToLabelsResponse(record.LabelsAfter),
			FilteredOut:  record.FilteredOut,
		})
	}
	return LineResult{originLine, stageRecords}
}

func mapAllToLabelsResponse(labels labels.Labels) []Label {
	result := make([]Label, 0, len(labels))
	for _, label := range labels {
		result = append(result, Label{Name: label.Name, Value: label.Value})
	}
	return result
}

type PipelineAnalyzer interface {
	AnalyzeLine(line string) []StageAnalysisRecord
}

type noopPipelineAnalyzer struct {
}

func (n noopPipelineAnalyzer) AnalyzeLine(_ string) []StageAnalysisRecord {
	return []StageAnalysisRecord{}
}

type streamPipelineAnalyzer struct {
	origin       log.AnalyzablePipeline
	stagesCount  int
	streamLabels labels.Labels
}

func NewPipelineAnalyzer(origin log.Pipeline, streamLabels labels.Labels) PipelineAnalyzer {
	if o, ok := origin.(log.AnalyzablePipeline); ok {
		stagesCount := len(o.Stages())
		return &streamPipelineAnalyzer{o, stagesCount, streamLabels}
	}
	return &noopPipelineAnalyzer{}
}

func (p streamPipelineAnalyzer) AnalyzeLine(line string) []StageAnalysisRecord {
	stages := p.origin.Stages()
	stageRecorders := make([]log.Stage, 0, len(stages))
	records := make([]StageAnalysisRecord, len(stages))
	for i, stage := range stages {
		stageRecorders = append(stageRecorders, StageAnalysisRecorder{
			origin:     stage,
			records:    records,
			stageIndex: i,
		})
	}
	stream := log.NewStreamPipeline(stageRecorders, p.origin.LabelsBuilder().ForLabels(p.streamLabels, p.streamLabels.Hash()))
	_, _, _ = stream.ProcessString(time.Now().UnixMilli(), line)
	return records
}

type StageAnalysisRecorder struct {
	log.Stage
	origin     log.Stage
	stageIndex int
	records    []StageAnalysisRecord
}

func (s StageAnalysisRecorder) Process(ts int64, line []byte, lbs *log.LabelsBuilder) ([]byte, bool) {
	lineBefore := unsafeGetString(line)
	labelsBefore := lbs.UnsortedLabels(nil)

	lineResult, ok := s.origin.Process(ts, line, lbs)

	s.records[s.stageIndex] = StageAnalysisRecord{
		Processed:    true,
		LabelsBefore: labelsBefore,
		LineBefore:   lineBefore,
		LabelsAfter:  lbs.UnsortedLabels(nil),
		LineAfter:    unsafeGetString(lineResult),
		FilteredOut:  !ok,
	}
	return lineResult, ok
}

func (s StageAnalysisRecorder) RequiredLabelNames() []string {
	return s.origin.RequiredLabelNames()
}

type StageAnalysisRecord struct {
	Processed    bool
	LineBefore   string
	LabelsBefore labels.Labels
	LineAfter    string
	LabelsAfter  labels.Labels
	FilteredOut  bool
}

func unsafeGetString(buf []byte) string {
	return *((*string)(unsafe.Pointer(&buf)))
}
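
One subtlety worth noting: unsafeGetString aliases the byte slice rather than copying it, so the recorded "before" string stays correct only because stages do not mutate the line buffer in place. A self-contained illustration of the aliasing:

```go
package main

import (
	"fmt"
	"unsafe"
)

// Same zero-copy conversion as in analyzer.go above.
func unsafeGetString(buf []byte) string {
	return *((*string)(unsafe.Pointer(&buf)))
}

func main() {
	buf := []byte("lvl=error msg=a")
	s := unsafeGetString(buf)
	buf[0] = 'L'   // mutating the buffer is visible through the string
	fmt.Println(s) // prints "Lvl=error msg=a"
}
```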

@@ -0,0 +1,84 @@
package logqlanalyzer

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func Test_logQLAnalyzer_analyze_stages(t *testing.T) {
	tests := map[string]struct {
		query                  string
		expectedStreamSelector string
		expectedStages         []string
	}{
		"expected 2 stages and streamSelector to be detected": {
			query:                  "{job=\"analyze\"} | json |= \"info\"",
			expectedStreamSelector: "{job=\"analyze\"}",
			expectedStages: []string{
				"| json",
				"|= \"info\"",
			},
		},
		"expected 2 stages and streamSelector to be detected even if query contains 4 stages": {
			query:                  "{job=\"analyze\"} | pattern \"<_> <level> <msg>\" |= \"info\" |~ \"some_expr\"",
			expectedStreamSelector: "{job=\"analyze\"}",
			expectedStages: []string{
				"| pattern \"<_> <level> <msg>\"",
				"|= \"info\" |~ \"some_expr\"",
			},
		},
	}
	for name, data := range tests {
		t.Run(name, func(t *testing.T) {
			result, err := logQLAnalyzer{}.analyze(data.query, []string{})

			require.NoError(t, err)
			require.Equal(t, data.expectedStreamSelector, result.StreamSelector)
			require.Equal(t, data.expectedStages, result.Stages)
		})
	}
}

func Test_logQLAnalyzer_analyze_expected_1_stage_record_for_each_log_line(t *testing.T) {
	line0 := "lvl=error msg=a"
	line1 := "lvl=info msg=b"

	result, err := logQLAnalyzer{}.analyze("{job=\"analyze\"} | logfmt", []string{line0, line1})

	require.NoError(t, err)
	require.Equal(t, 2, len(result.Results))
	require.Equal(t, 1, len(result.Results[0].StageRecords))
	require.Equal(t, 1, len(result.Results[1].StageRecords))
}

func Test_logQLAnalyzer_analyze_expected_all_stage_records_to_be_correct(t *testing.T) {
	line := "lvl=error msg=a"
	reformattedLine := "level=error message=A"

	result, err := logQLAnalyzer{}.analyze("{job=\"analyze\"} | logfmt | line_format \"level={{.lvl}} message={{.msg | ToUpper}}\" |= \"info\"", []string{line})

	require.NoError(t, err)
	require.Equal(t, 1, len(result.Results))
	require.Equal(t, 3, len(result.Results[0].StageRecords), "expected records for three stages")

	streamLabels := []Label{{"job", "analyze"}}
	parsedLabels := append(streamLabels, []Label{{"lvl", "error"}, {"msg", "a"}}...)
	require.Equal(t, StageRecord{
		LineBefore:   line,
		LabelsBefore: streamLabels,
		LineAfter:    line,
		LabelsAfter:  parsedLabels,
		FilteredOut:  false,
	}, result.Results[0].StageRecords[0])
	require.Equal(t, StageRecord{
		LineBefore:   line,
		LabelsBefore: parsedLabels,
		LineAfter:    reformattedLine,
		LabelsAfter:  parsedLabels,
		FilteredOut:  false,
	}, result.Results[0].StageRecords[1], "line is expected to be reformatted on this stage")
	require.Equal(t, StageRecord{
		LineBefore:   reformattedLine,
		LabelsBefore: parsedLabels,
		LineAfter:    reformattedLine,
		LabelsAfter:  parsedLabels,
		FilteredOut:  true,
	}, result.Results[0].StageRecords[2], "line is expected to be filtered out on this stage")
}

@@ -0,0 +1,94 @@
package logqlanalyzer

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"net/http"

	"github.com/go-kit/log/level"
	"github.com/gorilla/mux"

	util_log "github.com/grafana/loki/pkg/util/log"
)

func CorsMiddleware() mux.MiddlewareFunc {
	return func(h http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Header().Set("Access-Control-Allow-Origin", "*")
			w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
			if r.Method == http.MethodOptions {
				w.WriteHeader(200)
				return
			}
			h.ServeHTTP(w, r)
		})
	}
}

type LogQLAnalyzeHandler struct {
	analyzer logQLAnalyzer
}

func (s *LogQLAnalyzeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	payload, err := ioutil.ReadAll(req.Body)
	if err != nil {
		writeError(req.Context(), w, err, http.StatusBadRequest, "unable to read request body")
		return
	}
	requestBody := &Request{}
	err = json.Unmarshal(payload, requestBody)
	if err != nil {
		writeError(req.Context(), w, err, http.StatusBadRequest, "unable to unmarshal request body")
		return
	}
	result, err := s.analyzer.analyze(requestBody.Query, requestBody.Logs)
	if err != nil {
		writeError(req.Context(), w, err, http.StatusBadRequest, "unable to analyze query")
		return
	}
	responseBody, err := json.Marshal(result)
	if err != nil {
		writeError(req.Context(), w, err, http.StatusInternalServerError, "can not marshal the response")
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	if n, err := w.Write(responseBody); err != nil {
		level.Error(util_log.WithContext(req.Context(), util_log.Logger)).Log("msg", "error writing response", "bytesWritten", n, "err", err)
	}
}

func writeError(ctx context.Context, w http.ResponseWriter, err error, statusCode int, msg string) {
	level.Error(util_log.WithContext(ctx, util_log.Logger)).Log("msg", msg, "err", err)
	http.Error(w, err.Error(), statusCode)
}

type Request struct {
	Query string   `json:"query"`
	Logs  []string `json:"logs"`
}

type Result struct {
	StreamSelector string       `json:"stream_selector"`
	Stages         []string     `json:"stages"`
	Results        []LineResult `json:"results"`
}

type LineResult struct {
	OriginLine   string        `json:"origin_line"`
	StageRecords []StageRecord `json:"stage_records"`
}

type StageRecord struct {
	LineBefore   string  `json:"line_before"`
	LabelsBefore []Label `json:"labels_before"`
	LineAfter    string  `json:"line_after"`
	LabelsAfter  []Label `json:"labels_after"`
	FilteredOut  bool    `json:"filtered_out"`
}

type Label struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}
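
To exercise the endpoint end to end, a hypothetical client (not part of this PR) can POST a Request and read back the Result; this assumes the server from main.go is listening on localhost:3001, as in the docker-compose setup:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Mirrors the Request schema above; defined locally so the example is standalone.
type analyzeRequest struct {
	Query string   `json:"query"`
	Logs  []string `json:"logs"`
}

func main() {
	body, _ := json.Marshal(analyzeRequest{
		Query: `{job="analyze"} | logfmt |= "error"`,
		Logs:  []string{"lvl=error msg=a", "lvl=info msg=b"},
	})
	resp, err := http.Post("http://localhost:3001/api/logql-analyze", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Decode only the top-level fields; stage records follow the same schema
	// as the Result/StageRecord types above.
	var result struct {
		StreamSelector string   `json:"stream_selector"`
		Stages         []string `json:"stages"`
	}
	_ = json.NewDecoder(resp.Body).Decode(&result)
	fmt.Printf("selector=%s stages=%v\n", result.StreamSelector, result.Stages)
}
```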