Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/engine/executor/limit.go

58 lines
1.4 KiB

package executor
chore(engine): Use shard information from query frontend in the new query engine (#17792) **What this PR does / why we need it**: This PR introduces the sharding to the new query engine to be able to test the engine using the exsiting Loki architecture with query frontend, query scheduler, and queriers. **Note, this is only an interim solution used for validating and testing.** By design, the first phase of the query engine implementation does only local execution of queries without sharding or time-based splitting. However, for testing the results of this first phase, we can utilise dual-ingestion (writing both chunks/tsdb and data objects in a way that sharding and splitting is performed in the query frontend using the existing middlewares. On the queriers themselves, the sub-queries are parsed and planned using the new logical and physical planner. If the query can successfully be planned, it will be executed by the new engine, otherwise it falls back to the old engine. **Shard size considerations** While performing time-splitting in the frontend works for data objects as well, sharding by information from TSDB is not directly mappable to data objects. The default target shard size in TSDB is 600MB (decompressed), whereas target size of data objects is 1GB compressed or roughly 10-15GB uncompressed. However, individual logs sections of a data object have a target size of 128MB, which is roughly 0.9-1.2GB. That is 1.5-2x larger than the TSDB target shard size. So when using the sharding calculation from TSDB, it would over-shard for data object sections, which is likely acceptable for testing and good enough for proving that local execution with the new engine works. **How does sharding with data objects in this PR work?** The query frontend passes down the calculated shards as part of the query parameters of the serialised sub-request. The logical planner on the querier stores the Shard annotation on the `MAKETABLE` alongside the stream selector. This is then used by the physical planner to filter out only the relevant sections of the resolved data objects from the metastore lookup. During exeuction, only readers for the relevant sections are initialised when performing the `DataObjScan`. Signed-off-by: Christian Haudum <christian.haudum@gmail.com> Co-authored-by: Ashwanth <iamashwanth@gmail.com>
4 days ago
import (
"github.com/apache/arrow-go/v18/arrow"
)
func NewLimitPipeline(input Pipeline, skip, fetch uint32) *GenericPipeline {
// We gradually reduce offsetRemaining and limitRemaining as we process more records, as the
// offsetRemaining and limitRemaining may cross record boundaries.
var (
offsetRemaining = int64(skip)
limitRemaining = int64(fetch)
)
return newGenericPipeline(Local, func(inputs []Pipeline) state {
var length int64
var start, end int64
var batch arrow.Record
// We skip yielding zero-length batches while offsetRemainig > 0
for length == 0 {
// Stop once we reached the limit
if limitRemaining <= 0 {
return Exhausted
}
chore(engine): Use shard information from query frontend in the new query engine (#17792) **What this PR does / why we need it**: This PR introduces the sharding to the new query engine to be able to test the engine using the exsiting Loki architecture with query frontend, query scheduler, and queriers. **Note, this is only an interim solution used for validating and testing.** By design, the first phase of the query engine implementation does only local execution of queries without sharding or time-based splitting. However, for testing the results of this first phase, we can utilise dual-ingestion (writing both chunks/tsdb and data objects in a way that sharding and splitting is performed in the query frontend using the existing middlewares. On the queriers themselves, the sub-queries are parsed and planned using the new logical and physical planner. If the query can successfully be planned, it will be executed by the new engine, otherwise it falls back to the old engine. **Shard size considerations** While performing time-splitting in the frontend works for data objects as well, sharding by information from TSDB is not directly mappable to data objects. The default target shard size in TSDB is 600MB (decompressed), whereas target size of data objects is 1GB compressed or roughly 10-15GB uncompressed. However, individual logs sections of a data object have a target size of 128MB, which is roughly 0.9-1.2GB. That is 1.5-2x larger than the TSDB target shard size. So when using the sharding calculation from TSDB, it would over-shard for data object sections, which is likely acceptable for testing and good enough for proving that local execution with the new engine works. **How does sharding with data objects in this PR work?** The query frontend passes down the calculated shards as part of the query parameters of the serialised sub-request. The logical planner on the querier stores the Shard annotation on the `MAKETABLE` alongside the stream selector. This is then used by the physical planner to filter out only the relevant sections of the resolved data objects from the metastore lookup. During exeuction, only readers for the relevant sections are initialised when performing the `DataObjScan`. Signed-off-by: Christian Haudum <christian.haudum@gmail.com> Co-authored-by: Ashwanth <iamashwanth@gmail.com>
4 days ago
// Pull the next item from input
input := inputs[0]
err := input.Read()
if err != nil {
return failureState(err)
}
batch, _ = input.Value()
// We want to slice batch so it only contains the rows we're looking for
// accounting for both the limit and offset.
// We constrain the start and end to be within the bounds of the record.
start = min(offsetRemaining, batch.NumRows())
end = min(start+limitRemaining, batch.NumRows())
length = end - start
offsetRemaining -= start
limitRemaining -= length
}
if length <= 0 && offsetRemaining <= 0 {
return Exhausted
}
chore(engine): Use shard information from query frontend in the new query engine (#17792) **What this PR does / why we need it**: This PR introduces the sharding to the new query engine to be able to test the engine using the exsiting Loki architecture with query frontend, query scheduler, and queriers. **Note, this is only an interim solution used for validating and testing.** By design, the first phase of the query engine implementation does only local execution of queries without sharding or time-based splitting. However, for testing the results of this first phase, we can utilise dual-ingestion (writing both chunks/tsdb and data objects in a way that sharding and splitting is performed in the query frontend using the existing middlewares. On the queriers themselves, the sub-queries are parsed and planned using the new logical and physical planner. If the query can successfully be planned, it will be executed by the new engine, otherwise it falls back to the old engine. **Shard size considerations** While performing time-splitting in the frontend works for data objects as well, sharding by information from TSDB is not directly mappable to data objects. The default target shard size in TSDB is 600MB (decompressed), whereas target size of data objects is 1GB compressed or roughly 10-15GB uncompressed. However, individual logs sections of a data object have a target size of 128MB, which is roughly 0.9-1.2GB. That is 1.5-2x larger than the TSDB target shard size. So when using the sharding calculation from TSDB, it would over-shard for data object sections, which is likely acceptable for testing and good enough for proving that local execution with the new engine works. **How does sharding with data objects in this PR work?** The query frontend passes down the calculated shards as part of the query parameters of the serialised sub-request. The logical planner on the querier stores the Shard annotation on the `MAKETABLE` alongside the stream selector. This is then used by the physical planner to filter out only the relevant sections of the resolved data objects from the metastore lookup. During exeuction, only readers for the relevant sections are initialised when performing the `DataObjScan`. Signed-off-by: Christian Haudum <christian.haudum@gmail.com> Co-authored-by: Ashwanth <iamashwanth@gmail.com>
4 days ago
if batch.NumRows() == 0 {
return successState(batch)
}
rec := batch.NewSlice(start, end)
return successState(rec)
}, input)
}