loki/integration/loki_micro_services_test.go


package integration
import (
"context"
"strings"
"testing"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/querylimits"
)
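// TestMicroServicesIngestQuery spins up Loki as separate microservices
// (compactor, index-gateway, distributor, ingester, query-scheduler,
// query-frontend, querier), pushes a few log lines through the distributor,
// and verifies they can be queried back, along with label names, label
// values, and per-request query limits.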
func TestMicroServicesIngestQuery(t *testing.T) {
clu := cluster.New(nil)
defer func() {
assert.NoError(t, clu.Cleanup())
}()
// initially, run the compactor, index-gateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-boltdb.shipper.compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.required-query-response-format=protobuf",
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())
tenantID := randStringRunes()
now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
t.Run("label-names", func(t *testing.T) {
resp, err := cliQueryFrontend.LabelNames(context.Background())
require.NoError(t, err)
assert.ElementsMatch(t, []string{"job"}, resp)
})
t.Run("label-values", func(t *testing.T) {
resp, err := cliQueryFrontend.LabelValues(context.Background(), "job")
require.NoError(t, err)
assert.ElementsMatch(t, []string{"fake"}, resp)
})
t.Run("per-request-limits", func(t *testing.T) {
queryLimitsPolicy := client.InjectHeadersOption(map[string][]string{querylimits.HTTPHeaderQueryLimitsKey: {`{"maxQueryLength": "1m"}`}})
cliQueryFrontendLimited := client.New(tenantID, "", tQueryFrontend.HTTPURL(), queryLimitsPolicy)
cliQueryFrontendLimited.Now = now
_, err := cliQueryFrontendLimited.LabelNames(context.Background())
require.ErrorContains(t, err, "the query time range exceeds the limit (query length")
})
}
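// TestMicroServicesIngestQueryWithSchemaChange starts with a single TSDB
// schema period, ingests lines with old timestamps, then adds a second TSDB
// period with a different index prefix and verifies that queries return data
// from both periods, whether served from the ingesters or from storage.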
func TestMicroServicesIngestQueryWithSchemaChange(t *testing.T) {
// init the cluster with a single TSDB period that uses the index prefix index_tsdb_
clu := cluster.New(nil, cluster.SchemaWithTSDB)
defer func() {
assert.NoError(t, clu.Cleanup())
}()
// initially, run only compactor and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only ingester and query-scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
)
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())
tenantID := randStringRunes()
now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-72*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
})
t.Run("query-lookback-default", func(t *testing.T) {
// queries ingesters with the default lookback period (3h)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{}, lines)
})
t.Run("query-lookback-7d", func(t *testing.T) {
tQuerier.AddFlags("-querier.query-ingesters-within=168h")
require.NoError(t, tQuerier.Restart())
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
tQuerier.AddFlags("-querier.query-ingesters-within=3h")
require.NoError(t, tQuerier.Restart())
})
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
// restart querier and index shipper to sync the index
tQuerier.AddFlags("-querier.query-store-only=true")
require.NoError(t, tQuerier.Restart())
})
// Query lines
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
tQuerier.AddFlags("-querier.query-store-only=false")
require.NoError(t, tQuerier.Restart())
})
// Add a new TSDB period with a different index prefix (index_)
clu.ResetSchemaConfig()
cluster.SchemaWithTSDBAndTSDB(clu)
// restart to load the new schema
require.NoError(t, tIngester.Restart())
require.NoError(t, tQuerier.Restart())
t.Run("ingest-logs-new-period", func(t *testing.T) {
// ingest logs to the new period
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})
t.Run("query-both-periods-with-default-lookback", func(t *testing.T) {
// queries with the default lookback period (3h)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
// restart querier and index shipper to sync the index
tQuerier.AddFlags("-querier.query-store-only=true")
require.NoError(t, tQuerier.Restart())
})
// Query lines
t.Run("query both periods to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
}
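// TestMicroServicesIngestQueryOverMultipleBucketSingleProvider runs the same
// ingest-then-query flow against schemas with two periods (boltdb/boltdb,
// tsdb/tsdb, and boltdb/tsdb) to verify that index and chunks written across
// period boundaries remain queryable from a single storage provider.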
func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T) {
for name, opt := range map[string]func(c *cluster.Cluster){
"boltdb-index": cluster.SchemaWithBoltDBAndBoltDB,
"tsdb-index": cluster.SchemaWithTSDBAndTSDB,
"boltdb-and-tsdb": cluster.SchemaWithBoltDBAndTSDB,
} {
t.Run(name, func(t *testing.T) {
storage.ResetBoltDBIndexClientsWithShipper()
clu := cluster.New(nil, opt)
defer func() {
storage.ResetBoltDBIndexClientsWithShipper()
assert.NoError(t, clu.Cleanup())
}()
// initially, run only compactor and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only ingester and query-scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
)
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())
tenantID := randStringRunes()
now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndStructuredMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndStructuredMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))
// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLineWithStructuredMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithStructuredMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
})
t.Run("query-lookback-default", func(t *testing.T) {
// queries ingesters with the default lookback period (3h)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineC", "lineD"}, lines)
})
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
// restart querier and index shipper to sync the index
storage.ResetBoltDBIndexClientsWithShipper()
tQuerier.AddFlags("-querier.query-store-only=true")
require.NoError(t, tQuerier.Restart())
})
// Query lines
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
})
}
}
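// TestSchedulerRing runs the query-scheduler in ring mode and verifies via
// metrics that the expected number of query-frontend and querier workers
// connect to it before exercising a basic ingest-and-query round trip.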
func TestSchedulerRing(t *testing.T) {
clu := cluster.New(nil)
defer func() {
assert.NoError(t, clu.Cleanup())
}()
// initially, run the compactor, index-gateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-boltdb.shipper.compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-query-scheduler.use-scheduler-ring=true",
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-query-scheduler.use-scheduler-ring=true",
"-frontend.scheduler-worker-concurrency=5",
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-query-scheduler.use-scheduler-ring=true",
"-querier.max-concurrent=4",
)
)
require.NoError(t, clu.Run())
tenantID := randStringRunes()
now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
cliQueryScheduler := client.New(tenantID, "", tQueryScheduler.HTTPURL())
cliQueryScheduler.Now = now
t.Run("verify-scheduler-connections", func(t *testing.T) {
require.Eventually(t, func() bool {
// Check metrics to see if the query-scheduler is connected to the query-frontend
metrics, err := cliQueryScheduler.Metrics()
require.NoError(t, err)
return getMetricValue(t, "cortex_query_scheduler_connected_frontend_clients", metrics) == 5
}, 5*time.Second, 500*time.Millisecond)
require.Eventually(t, func() bool {
// Check metrics to see if the query-scheduler is connected to the queriers
metrics, err := cliQueryScheduler.Metrics()
require.NoError(t, err)
return getMetricValue(t, "cortex_query_scheduler_connected_querier_clients", metrics) == 4
}, 5*time.Second, 500*time.Millisecond)
})
t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
}
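// TestQueryTSDB_WithCachedPostings enables the TSDB postings cache on the
// index-gateway and asserts, via the embedded-cache metrics, that postings
// are added to and read from the cache as queries are served from storage.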
func TestQueryTSDB_WithCachedPostings(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDB)
defer func() {
assert.NoError(t, clu.Cleanup())
}()
var (
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
"-tsdb.enable-postings-cache=true",
"-store.index-cache-read.embedded-cache.enabled=true",
)
)
require.NoError(t, clu.Run())
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())
tenantID := randStringRunes()
now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now
// initial cache state.
igwMetrics, err := cliIndexGateway.Metrics()
require.NoError(t, err)
assertCacheState(t, igwMetrics, &expectedCacheState{
cacheName: "store.index-cache-read.embedded-cache",
gets: 0,
misses: 0,
added: 0,
})
t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-72*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
})
// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
// Query lines
t.Run("query to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
})
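// after serving the previous query from storage, the postings cache should
// have been populated and read from (one add, one miss, and several gets),
// as asserted below.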
igwMetrics, err = cliIndexGateway.Metrics()
require.NoError(t, err)
assertCacheState(t, igwMetrics, &expectedCacheState{
cacheName: "store.index-cache-read.embedded-cache",
gets: 50,
misses: 1,
added: 1,
})
// ingest logs with ts=now.
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
// the default query range length is 7 days, so the query also covers the lines ingested earlier.
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
// expect lines from both the ingesters' memory and the store.
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
}
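// getValueFromMF returns the counter value of the first metric in the family
// whose label pairs match lbs exactly, or 0 if no metric matches.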
func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
for _, m := range mf.Metric {
if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
continue
}
return m.Counter.GetValue()
}
return 0
}
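// assertCacheState parses the Prometheus text exposition returned by a
// component's metrics endpoint and asserts the embedded cache's added, gets,
// and misses counters for the cache named in e.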
func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) {
var parser expfmt.TextParser
mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
require.NoError(t, err)
lbs := []*dto.LabelPair{
{
Name: proto.String("cache"),
Value: proto.String(e.cacheName),
},
}
mf, found := mfs["querier_cache_added_new_total"]
require.True(t, found)
require.Equal(t, e.added, getValueFromMF(mf, lbs))
mf, found = mfs["querier_cache_gets_total"]
require.True(t, found)
require.Equal(t, e.gets, getValueFromMF(mf, lbs))
mf, found = mfs["querier_cache_misses_total"]
require.True(t, found)
require.Equal(t, e.misses, getValueFromMF(mf, lbs))
}
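// expectedCacheState captures the expected values of the embedded cache
// counters (gets, misses, added) for the cache identified by cacheName.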
type expectedCacheState struct {
cacheName string
gets float64
misses float64
added float64
}