loki/pkg/engine/executor/dataobjscan.go


package executor

import (
"cmp"
"context"
"errors"
"fmt"
"io"
"runtime"
"slices"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
"github.com/grafana/loki/v3/pkg/util/topk"
)

type dataobjScan struct {
ctx context.Context
opts dataobjScanOptions
initialized bool
readers []*logs.RowReader
streams map[int64]labels.Labels
state state
}

type dataobjScanOptions struct {
// TODO(rfratto): Limiting each DataObjScan to a single section is going to
// be critical for limiting memory overhead here; the section is intended to
// be the smallest unit of parallelization.
Object *dataobj.Object // Object to read from.
StreamIDs []int64 // Stream IDs to match from logs sections.
Sections []int // Logs sections to fetch.
Predicates []logs.RowPredicate // Predicates to apply to the logs.
Projections []physical.ColumnExpression // Columns to include. An empty slice means all columns.
Direction physical.SortOrder // Order of timestamps to return (ASC=Forward, DESC=Backward)
Limit uint32 // A limit on the number of rows to return (0=unlimited).
}

var _ Pipeline = (*dataobjScan)(nil)

// newDataobjScanPipeline creates a new Pipeline which emits a single
// [arrow.Record] composed of the requested logs sections in a data object.
// Rows in the returned record are ordered by timestamp in the direction
// specified by opts.Direction.
func newDataobjScanPipeline(ctx context.Context, opts dataobjScanOptions) *dataobjScan {
return &dataobjScan{ctx: ctx, opts: opts}
}
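
// Illustrative sketch only (not part of this file's behaviour; the real call
// sites live in the executor's plan evaluation): a caller holding a
// *dataobj.Object resolved from the metastore might construct the pipeline
// roughly as below. obj, streamIDs, and sections stand in for values produced
// by the physical planner and are assumptions here, not existing variables.
//
//	scan := newDataobjScanPipeline(ctx, dataobjScanOptions{
//		Object:    obj,        // data object to read
//		StreamIDs: streamIDs,  // stream IDs matched by the stream selector
//		Sections:  sections,   // logs section indexes assigned to this shard
//		Direction: physical.DESC,
//		Limit:     1000,
//	})
//	if err := scan.Read(); err == nil {
//		// the emitted arrow.Record is now held in the pipeline's state
//	}
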
// Read retrieves the next [arrow.Record] from the dataobj.
func (s *dataobjScan) Read() error {
if err := s.init(); err != nil {
return err
}
rec, err := s.read()
s.state = newState(rec, err)
if err != nil {
return fmt.Errorf("reading data object: %w", err)
}
return nil
}

func (s *dataobjScan) init() error {
if s.initialized {
return nil
}
if err := s.initStreams(); err != nil {
return fmt.Errorf("initializing streams: %w", err)
}
s.readers = nil
for idx, section := range s.opts.Object.Sections().Filter(logs.CheckSection) {
// Filter out sections that are not part of this shard
if !slices.Contains(s.opts.Sections, idx) {
continue
}
sec, err := logs.Open(s.ctx, section)
if err != nil {
return fmt.Errorf("opening logs section: %w", err)
}
// TODO(rfratto): There are a few problems with using logs.RowReader as it is:
//
// 1. RowReader doesn't support providing a subset of columns to read
// from, so we're applying projections after reading.
//
// 2. RowReader is intended to be pooled to reduce memory, but we're
// creating a new one every time here.
//
// For the sake of the initial implementation I'm ignoring these issues,
// but we'll absolutely need to solve this prior to production use.
lr := logs.NewRowReader(sec)
// The calls below can't fail because we're always using a brand new logs
// reader.
_ = lr.MatchStreams(slices.Values(s.opts.StreamIDs))
_ = lr.SetPredicates(s.opts.Predicates)
s.readers = append(s.readers, lr)
}
s.initialized = true
return nil
}

// initStreams retrieves all requested stream records from streams sections so
// that emitted [arrow.Record]s can include stream labels in results.
func (s *dataobjScan) initStreams() error {
var sr streams.RowReader
defer sr.Close()
streamsBuf := make([]streams.Stream, 512)
// Initialize entries in the map so we can do a presence test in the loop
// below.
s.streams = make(map[int64]labels.Labels, len(s.opts.StreamIDs))
for _, id := range s.opts.StreamIDs {
s.streams[id] = labels.EmptyLabels()
}
for _, section := range s.opts.Object.Sections().Filter(streams.CheckSection) {
sec, err := streams.Open(s.ctx, section)
if err != nil {
return fmt.Errorf("opening streams section: %w", err)
}
// TODO(rfratto): dataobj.StreamsPredicate is missing support for filtering
// on stream IDs when we already know them in advance; this can cause the
// Read here to take longer than it needs to since we're reading the
// entirety of every row.
sr.Reset(sec)
for {
n, err := sr.Read(s.ctx, streamsBuf)
if n == 0 && errors.Is(err, io.EOF) {
return nil
} else if err != nil && !errors.Is(err, io.EOF) {
return err
}
for i, stream := range streamsBuf[:n] {
if _, found := s.streams[stream.ID]; !found {
continue
}
s.streams[stream.ID] = stream.Labels
// Zero out the stream entry from the slice so the next call to sr.Read
// doesn't overwrite any memory we just moved to s.streams.
streamsBuf[i] = streams.Stream{}
}
}
}
// Check that all streams were populated.
var errs []error
for id, labels := range s.streams {
if labels.IsEmpty() {
errs = append(errs, fmt.Errorf("requested stream ID %d not found in any stream section", id))
}
}
return errors.Join(errs...)
}

// read reads the entire data object into memory and generates an arrow.Record
// from the data. It returns an error if reading any of the sections fails.
func (s *dataobjScan) read() (arrow.Record, error) {
// Since [physical.DataObjScan] requires that:
//
// * Records are ordered by timestamp, and
// * Records from the same dataobjScan do not overlap in time
//
// we *must* read the entire data object before creating a record, as the
// sections in the dataobj itself are not already sorted by timestamp (though
// we only need to keep up to Limit rows in memory).
var (
heapMut sync.Mutex
heap = topk.Heap[logs.Record]{
Limit: int(s.opts.Limit),
Less: s.getLessFunc(s.opts.Direction),
}
)
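
// Read the requested logs sections concurrently. Each goroutine pushes the
// rows it reads into the shared top-k heap, which keeps at most opts.Limit
// rows (all rows when Limit is 0) ordered by the less function above, so
// memory stays bounded while the object is scanned.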
g, ctx := errgroup.WithContext(s.ctx)
g.SetLimit(max(runtime.GOMAXPROCS(0)/2, 1))
var gotData atomic.Bool
for _, reader := range s.readers {
g.Go(func() error {
for {
buf := make([]logs.Record, 1024) // allocate a fresh buffer each iteration; do not re-use it, since rows pushed to the heap may still reference its memory
n, err := reader.Read(ctx, buf)
if n == 0 && errors.Is(err, io.EOF) {
return nil
} else if err != nil && !errors.Is(err, io.EOF) {
return err
}
gotData.Store(true)
heapMut.Lock()
for _, rec := range buf[:n] {
heap.Push(rec)
}
heapMut.Unlock()
}
})
}
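
// Wait for every section reader to finish. If none of them produced a row,
// surface EOF so downstream operators treat this scan as exhausted rather
// than receiving an empty record.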
if err := g.Wait(); err != nil {
return nil, err
} else if !gotData.Load() {
return nil, EOF
}
projections, err := s.effectiveProjections(&heap)
if err != nil {
return nil, fmt.Errorf("getting effective projections: %w", err)
}
schema, err := schemaFromColumns(projections)
if err != nil {
return nil, fmt.Errorf("creating schema: %w", err)
}
// TODO(rfratto): pass allocator to builder
rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
defer rb.Release()
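
// The heap pops its retained rows in the opposite of the requested output
// order, so reverse them before appending to the record builder.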
records := heap.PopAll()
slices.Reverse(records)
for _, record := range records {
for i := 0; i < schema.NumFields(); i++ {
field, builder := rb.Schema().Field(i), rb.Field(i)
s.appendToBuilder(builder, &field, &record)
}
}
return rb.NewRecord(), nil
}

// getLessFunc returns a "less" comparison function for records in the sort heap.
// direction determines the search order:
// DESC is a backward search starting at the end of the time range.
// ASC is a forward search starting at the beginning of the time range.
// If two records have the same timestamp, the compareStreams function is used to determine the sort order.
func (s *dataobjScan) getLessFunc(direction physical.SortOrder) func(a, b logs.Record) bool {
compareStreams := func(a, b logs.Record) bool {
aStream, ok := s.streams[a.StreamID]
if !ok {
return false
}
bStream, ok := s.streams[b.StreamID]
if !ok {
return true
}
return labels.Compare(aStream, bStream) < 0
}
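
// Note: compareStreams never reports a record whose stream labels are unknown
// as "less" than a record whose labels are known; unknown-vs-unknown
// comparisons report neither record as less.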
switch direction {
case physical.ASC:
return func(a, b logs.Record) bool {
if a.Timestamp.Equal(b.Timestamp) {
return compareStreams(a, b)
}
return a.Timestamp.After(b.Timestamp)
}
case physical.DESC:
		return func(a, b logs.Record) bool {
			if a.Timestamp.Equal(b.Timestamp) {
				// Timestamps are equal, so fall back to comparing streams to keep
				// the ordering deterministic.
				return compareStreams(a, b)
			}
			return a.Timestamp.Before(b.Timestamp)
		}
default:
panic("invalid direction")
}
}
// effectiveProjections returns the effective projections to return for a
// record. If s.opts.Projections is non-empty, then its column expressions are
// used for the projections.
//
// Otherwise, the set of all columns found in the heap are used, in order of:
//
// * All stream labels (sorted by name)
// * All metadata columns (sorted by name)
// * Log timestamp
// * Log message
//
// effectiveProjections does not mutate h.
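//
// For example, given a stream labelled app="api", env="prod" and a record
// carrying a trace_id metadata entry, the effective projections would be, in
// order: app, env (labels), trace_id (metadata), then the builtin timestamp
// and message columns. (The label and metadata names here are illustrative.)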
func (s *dataobjScan) effectiveProjections(h *topk.Heap[logs.Record]) ([]physical.ColumnExpression, error) {
if len(s.opts.Projections) > 0 {
return s.opts.Projections, nil
}
var (
columns []physical.ColumnExpression
foundStreams = map[int64]struct{}{}
found = map[physical.ColumnExpr]struct{}{}
)
addColumn := func(name string, ty types.ColumnType) {
expr := physical.ColumnExpr{
Ref: types.ColumnRef{Column: name, Type: ty},
}
if _, ok := found[expr]; !ok {
found[expr] = struct{}{}
columns = append(columns, &expr)
}
}
for rec := range h.Range() {
stream, ok := s.streams[rec.StreamID]
if !ok {
// If we hit this, there's a problem with either initStreams (we missed a
// requested stream) or the predicate application, where it returned a
// stream we didn't want.
return nil, fmt.Errorf("stream ID %d not found in stream cache", rec.StreamID)
}
if _, addedStream := foundStreams[rec.StreamID]; !addedStream {
stream.Range(func(label labels.Label) {
addColumn(label.Name, types.ColumnTypeLabel)
})
foundStreams[rec.StreamID] = struct{}{}
}
rec.Metadata.Range(func(label labels.Label) {
addColumn(label.Name, types.ColumnTypeMetadata)
})
}
// Sort existing columns by type (preferring labels) then name.
slices.SortFunc(columns, func(a, b physical.ColumnExpression) int {
aRef, bRef := a.(*physical.ColumnExpr).Ref, b.(*physical.ColumnExpr).Ref
if aRef.Type != bRef.Type {
if aRef.Type == types.ColumnTypeLabel {
return -1 // Labels first.
}
return 1
}
return cmp.Compare(aRef.Column, bRef.Column)
})
// Add fixed columns at the end.
addColumn(types.ColumnNameBuiltinTimestamp, types.ColumnTypeBuiltin)
addColumn(types.ColumnNameBuiltinMessage, types.ColumnTypeBuiltin)
return columns, nil
}
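// schemaFromColumns builds an [arrow.Schema] from the given column
// expressions. Fields are deduplicated by fingerprint and column-type
// metadata, and ambiguous columns are emitted as both a label field and a
// metadata field.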
func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, error) {
var (
fields = make([]arrow.Field, 0, len(columns))
fingerprints = make(map[string]struct{}, len(columns))
)
addField := func(field arrow.Field) {
fp := field.Fingerprint()
if field.HasMetadata() {
// We differentiate column type using metadata, but the metadata isn't
// included in the fingerprint, so we need to manually include it here.
fp += field.Metadata.String()
}
if _, exist := fingerprints[fp]; exist {
return
}
fields = append(fields, field)
fingerprints[fp] = struct{}{}
}
for _, column := range columns {
columnExpr, ok := column.(*physical.ColumnExpr)
if !ok {
return nil, fmt.Errorf("invalid column expression type %T", column)
}
switch columnExpr.Ref.Type {
case types.ColumnTypeLabel:
// TODO(rfratto): Switch to dictionary encoding for labels.
//
// Since labels are more likely to repeat than metadata, we could cut
// down on the memory overhead of a record by dictionary encoding the
// labels.
//
// However, the csv package we use for testing DataObjScan currently
// (2025-05-02) doesn't support dictionary encoding, and we would need
// to find a solution there.
//
// We skipped dictionary encoding for now to get the initial prototype
// working.
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: ty,
Nullable: true,
Metadata: md,
})
case types.ColumnTypeMetadata:
			// Metadata is *not* encoded using dictionary encoding since metadata
// has unconstrained cardinality. Using dictionary encoding would require
// tracking every encoded value in the record, which is likely to be too
// expensive.
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: ty,
Nullable: true,
Metadata: md,
})
case types.ColumnTypeBuiltin:
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: ty,
Nullable: true,
Metadata: md,
})
case types.ColumnTypeAmbiguous:
			// The best handling for an ambiguous column (in terms of the schema) is
			// to explode it out into multiple columns, one for each type. (Except for
// parsed, which can't be emitted from DataObjScan right now).
//
// TODO(rfratto): should ambiguity be passed down like this? It's odd for
// the returned schema to be different than the set of columns you asked
// for.
//
// As an alternative, ambiguity could be handled by the planner, where it
// performs the explosion and propagates the ambiguity down into the
// predicates.
//
// If we're ok with the schema changing from what was requested, then we
// could update this to resolve the ambiguity at [dataobjScan.effectiveProjections]
// so we don't always explode out to the full set of columns.
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String),
})
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String),
})
case types.ColumnTypeParsed, types.ColumnTypeGenerated:
return nil, fmt.Errorf("parsed column type not supported: %s", columnExpr.Ref.Type)
}
}
return arrow.NewSchema(fields, nil), nil
}
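// arrowTypeFromColumnRef returns the Arrow data type and column metadata for
// ref. The builtin timestamp column maps to a nanosecond timestamp; all other
// columns are represented as strings.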
func arrowTypeFromColumnRef(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
if ref.Type == types.ColumnTypeBuiltin {
switch ref.Column {
case types.ColumnNameBuiltinTimestamp:
return arrow.FixedWidthTypes.Timestamp_ns, datatype.ColumnMetadataBuiltinTimestamp
case types.ColumnNameBuiltinMessage:
return arrow.BinaryTypes.String, datatype.ColumnMetadataBuiltinMessage
default:
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
}
}
return arrow.BinaryTypes.String, datatype.ColumnMetadata(ref.Type, datatype.String)
}
// appendToBuilder appends the provided field from record into the given
// builder. The metadata of field is used to determine the category of column.
// appendToBuilder panics if the type of field does not match the datatype of
// builder.
func (s *dataobjScan) appendToBuilder(builder array.Builder, field *arrow.Field, record *logs.Record) {
columnType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
if !ok {
// This shouldn't happen; we control the metadata here on the fields.
panic(fmt.Sprintf("missing column type in field %s", field.Name))
}
switch columnType {
case types.ColumnTypeLabel.String():
stream, ok := s.streams[record.StreamID]
if !ok {
panic(fmt.Sprintf("stream ID %d not found in stream cache", record.StreamID))
}
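		// stream.Get returns an empty string when the label is absent, so empty
		// values are appended as nulls rather than empty strings.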
val := stream.Get(field.Name)
if val == "" {
builder.(*array.StringBuilder).AppendNull()
} else {
builder.(*array.StringBuilder).Append(val)
}
case types.ColumnTypeMetadata.String():
val := record.Metadata.Get(field.Name)
if val == "" {
builder.(*array.StringBuilder).AppendNull()
} else {
builder.(*array.StringBuilder).Append(val)
}
case types.ColumnTypeBuiltin.String():
if field.Name == types.ColumnNameBuiltinTimestamp {
ts, _ := arrow.TimestampFromTime(record.Timestamp, arrow.Nanosecond)
builder.(*array.TimestampBuilder).Append(ts)
} else if field.Name == types.ColumnNameBuiltinMessage {
// Use the inner BinaryBuilder to avoid converting record.Line to a
// string and back.
builder.(*array.StringBuilder).BinaryBuilder.Append(record.Line)
} else {
panic(fmt.Sprintf("unsupported builtin column %s", field.Name))
}
default:
// This shouldn't happen; we control the metadata here on the fields.
panic(fmt.Sprintf("unsupported column type %s", columnType))
}
}
// Value returns the current [arrow.Record] retrieved by the previous call to
// [dataobjScan.Read], or an error if the record cannot be read.
func (s *dataobjScan) Value() (arrow.Record, error) { return s.state.batch, s.state.err }
// Close closes s and releases all resources.
func (s *dataobjScan) Close() {
for _, reader := range s.readers {
_ = reader.Close()
}
}
// Inputs implements Pipeline and returns nil, since DataObjScan accepts no
// pipelines as input.
func (s *dataobjScan) Inputs() []Pipeline { return nil }
// Transport implements Pipeline and returns [Local].
func (s *dataobjScan) Transport() Transport { return Local }