// loki/integration/loki_micro_services_delete_...
//go:build integration
package integration
import (
"context"
"sort"
"strconv"
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/integration/client"
"github.com/grafana/loki/v3/integration/cluster"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage"
)
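// pushRequest couples a stream's label set with the log entries to push into it.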
type pushRequest struct {
stream map[string]string
entries []logproto.Entry
}
func TestMicroServicesDeleteRequest(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()
// initially, run only compactor, index-gateway and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-compactor.compaction-interval=1s",
"-compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-only",
"-compactor.delete-max-interval=0",
"-limits.per-user-override-period=1s",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only ingester and query-scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
"-ingester.wal-enabled=false",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
)
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())
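// Create API clients for each component, all scoped to the same tenant and pinned to the same reference time.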
tenantID := randStringRunes()
now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
cliCompactor := client.New(tenantID, "", tCompactor.HTTPURL())
cliCompactor.Now = now
var pushRequests []pushRequest
var expectedStreams []client.StreamValues
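// Build one stream per deletion scenario, each with two 48h-old entries and two recent (1m-old) entries.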
for _, deletionType := range []string{"filter", "filter_no_match", "nothing", "partially_by_time", "whole"} {
pushRequests = append(pushRequests, pushRequest{
stream: map[string]string{
"job": "fake",
"deletion_type": deletionType,
},
entries: []logproto.Entry{
{
Timestamp: now.Add(-48 * time.Hour),
Line: "lineA",
},
{
Timestamp: now.Add(-48 * time.Hour),
Line: "lineB",
},
{
Timestamp: now.Add(-time.Minute),
Line: "lineC",
},
{
Timestamp: now.Add(-time.Minute),
Line: "lineD",
},
},
})
}
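// Add a stream whose entries carry structured metadata so deletion via a structured-metadata filter can be exercised.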
pushRequests = append(pushRequests, pushRequest{
stream: map[string]string{
"job": "fake",
"deletion_type": "with_structured_metadata",
},
entries: []logproto.Entry{
{
Timestamp: now.Add(-48 * time.Hour),
Line: "AlineA",
StructuredMetadata: push.LabelsAdapter{
{
Name: "line",
Value: "A",
},
},
},
{
Timestamp: now.Add(-48 * time.Hour),
Line: "AlineB",
StructuredMetadata: push.LabelsAdapter{
{
Name: "line",
Value: "B",
},
},
},
{
Timestamp: now.Add(-time.Minute),
Line: "AlineC",
StructuredMetadata: push.LabelsAdapter{
{
Name: "line",
Value: "C",
},
},
},
{
Timestamp: now.Add(-time.Minute),
Line: "AlineD",
StructuredMetadata: push.LabelsAdapter{
{
Name: "line",
Value: "D",
},
},
},
},
})
for _, pr := range pushRequests {
expectedStreams = append(expectedStreams, pushRequestToClientStreamValues(t, pr)...)
}
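// One delete request per scenario (except "nothing", which is left untouched): a line filter, a filter that matches nothing,
// a time-bounded delete, a whole-stream delete, and a structured-metadata filter.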
expectedDeleteRequests := []client.DeleteRequest{
{
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter"} |= "lineB"`,
Status: "received",
},
{
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter_no_match"} |= "foo"`,
Status: "received",
},
{
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Add(-10 * time.Minute).Unix(),
Query: `{deletion_type="partially_by_time"}`,
Status: "received",
},
{
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="whole"}`,
Status: "received",
},
{
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="with_structured_metadata"} | line="A"`,
Status: "received",
},
}
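// validateQueryResponse asserts a successful streams response and sorts streams and their values deterministically
// before comparing them with the expected set.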
validateQueryResponse := func(expectedStreams []client.StreamValues, resp *client.Response) {
t.Helper()
assert.Equal(t, "success", resp.Status)
assert.Equal(t, "streams", resp.Data.ResultType)
require.Len(t, resp.Data.Stream, len(expectedStreams))
sort.Slice(resp.Data.Stream, func(i, j int) bool {
return labels.FromMap(resp.Data.Stream[i].Stream).String() < labels.FromMap(resp.Data.Stream[j].Stream).String()
})
for _, stream := range resp.Data.Stream {
sort.Slice(stream.Values, func(i, j int) bool {
return stream.Values[i][1] < stream.Values[j][1]
})
}
require.Equal(t, expectedStreams, resp.Data.Stream)
}
t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
for _, pr := range pushRequests {
for _, entry := range pr.entries {
require.NoError(t, cliDistributor.PushLogLine(
entry.Line,
entry.Timestamp,
logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata).Map(),
pr.stream,
))
}
}
})
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
// given default value of query_ingesters_within is 3h, older samples won't be present in the response
var es []client.StreamValues
for _, stream := range expectedStreams {
s := client.StreamValues{
Stream: stream.Stream,
Values: nil,
}
for _, sv := range stream.Values {
tsNs, err := strconv.ParseInt(sv[0], 10, 64)
require.NoError(t, err)
if !time.Unix(0, tsNs).Before(now.Add(-3 * time.Hour)) {
s.Values = append(s.Values, sv)
}
}
if len(s.Values) > 0 {
es = append(es, s)
}
}
validateQueryResponse(es, resp)
})
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
// restart ingester which should flush the chunks
require.NoError(t, tIngester.Restart())
// ensure that ingester has 0 chunks in memory
cliIngester = client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
metrics, err := cliIngester.Metrics()
require.NoError(t, err)
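// 6 streams were pushed, so 6 chunks should have been flushed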
checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 6)
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())
})
// Query lines
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(expectedStreams, resp)
})
t.Run("add-delete-requests", func(t *testing.T) {
for _, deleteRequest := range expectedDeleteRequests {
params := client.DeleteRequestParams{
Start: strconv.FormatInt(deleteRequest.StartTime, 10),
End: strconv.FormatInt(deleteRequest.EndTime, 10),
Query: deleteRequest.Query,
}
require.NoError(t, cliCompactor.AddDeleteRequest(params))
}
})
t.Run("read-delete-request", func(t *testing.T) {
deleteRequests, err := cliCompactor.GetDeleteRequests()
require.NoError(t, err)
require.ElementsMatch(t, client.DeleteRequests(expectedDeleteRequests), deleteRequests)
})
// Query lines
t.Run("verify query time filtering", func(t *testing.T) {
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())
// update expectedStreams as per the issued delete requests:
// the line-filter request removes "lineB" from the "filter" stream
expectedStreams[0].Values = append(expectedStreams[0].Values[:1], expectedStreams[0].Values[2:]...)
// the time-bounded request removes the two 48h-old entries from the "partially_by_time" stream
expectedStreams[3].Values = expectedStreams[3].Values[2:]
// the "whole" stream and the structured-metadata stream carrying line="A" are removed entirely
expectedStreams = append(expectedStreams[:4], expectedStreams[6:]...)
// query and verify that we get the resp which matches expectedStreams
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(expectedStreams, resp)
})
// Wait until delete request is finished
t.Run("wait-until-delete-request-processed", func(t *testing.T) {
tenantLimits := tCompactor.GetTenantLimits(tenantID)
tenantLimits.DeletionMode = "filter-and-delete"
require.NoError(t, tCompactor.SetTenantLimits(tenantID, tenantLimits))
// all the delete requests should have been processed
for i := range expectedDeleteRequests {
expectedDeleteRequests[i].Status = "processed"
}
require.Eventually(t, func() bool {
deleteRequests, err := cliCompactor.GetDeleteRequests()
require.NoError(t, err)
outer:
for i := range deleteRequests {
for j := range expectedDeleteRequests {
if deleteRequests[i] == expectedDeleteRequests[j] {
continue outer
}
}
return false
}
return true
}, 20*time.Second, 1*time.Second)
// Check metrics
metrics, err := cliCompactor.Metrics()
require.NoError(t, err)
checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, float64(len(expectedDeleteRequests)))
// ideally this metric should be equal to 2, since one line matches the line filter and one matches the structured-metadata filter,
// but the same chunks are indexed in 3 tables
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 6)
})
// Query lines
t.Run("query-without-query-time-filtering", func(t *testing.T) {
// disable deletion for tenant to stop query time filtering of data requested for deletion
tenantLimits := tQuerier.GetTenantLimits(tenantID)
tenantLimits.DeletionMode = "disabled"
require.NoError(t, tQuerier.SetTenantLimits(tenantID, tenantLimits))
// restart querier to make it sync the index
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())
// ensure the deletion-mode limit is updated
require.Equal(t, "disabled", tQuerier.GetTenantLimits(tenantID).DeletionMode)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(expectedStreams, resp)
})
}
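// checkUserLabelAndMetricValue asserts that the given metric carries a single "user" label matching tenantID and has the expected value.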
func checkUserLabelAndMetricValue(t *testing.T, metricName, metrics, tenantID string, expectedValue float64) {
t.Helper()
val, labels, err := extractMetric(metricName, metrics)
require.NoError(t, err)
require.NotNil(t, labels)
require.Len(t, labels, 1)
require.Contains(t, labels, "user")
require.Equal(t, labels["user"], tenantID)
require.Equal(t, expectedValue, val)
}
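// checkMetricValue asserts that the given metric reported in the metrics text has the expected value.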
func checkMetricValue(t *testing.T, metricName, metrics string, expectedValue float64) {
t.Helper()
require.Equal(t, expectedValue, getMetricValue(t, metricName, metrics))
}
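// getMetricValue extracts the value of the given metric from the metrics text.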
func getMetricValue(t *testing.T, metricName, metrics string) float64 {
t.Helper()
val, _, err := extractMetric(metricName, metrics)
require.NoError(t, err)
return val
}
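// pushRequestToClientStreamValues converts a pushRequest into the stream values the query API is expected to return,
// folding each entry's structured metadata into its stream labels.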
func pushRequestToClientStreamValues(t *testing.T, p pushRequest) []client.StreamValues {
logsByStream := map[string][]client.Entry{}
for _, entry := range p.entries {
lb := labels.NewBuilder(labels.FromMap(p.stream))
for _, l := range entry.StructuredMetadata {
lb.Set(l.Name, l.Value)
}
stream := lb.Labels().String()
logsByStream[stream] = append(logsByStream[stream], []string{
strconv.FormatInt(entry.Timestamp.UnixNano(), 10),
entry.Line,
})
}
var svs []client.StreamValues
for stream, values := range logsByStream {
parsedLabels, err := syntax.ParseLabels(stream)
require.NoError(t, err)
svs = append(svs, client.StreamValues{
Stream: parsedLabels.Map(),
Values: values,
})
}
sort.Slice(svs, func(i, j int) bool {
return labels.FromMap(svs[i].Stream).String() < labels.FromMap(svs[j].Stream).String()
})
return svs
}