From 0076bbdb4255c4eda7c70cabd8a0ce079e9686b3 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 5 Jul 2024 13:08:30 +0200 Subject: [PATCH] chore: Refactor storage interface for rf1 (#13415) --- pkg/ingester-rf1/flush.go | 50 ++---- pkg/ingester-rf1/ingester.go | 142 +++++------------- pkg/ingester-rf1/instance.go | 15 +- pkg/ingester-rf1/objstore/storage.go | 132 ++++++++++++++++ pkg/ingester-rf1/stream.go | 38 +++-- pkg/ingester/flush_test.go | 5 - pkg/ingester/ingester_test.go | 5 - pkg/loki/modules.go | 2 +- pkg/querier/querier_mock_test.go | 4 - .../chunk/client/alibaba/oss_object_client.go | 4 +- .../client/aws/dynamodb_storage_client.go | 5 - .../chunk/client/aws/s3_storage_client.go | 14 +- .../chunk/client/azure/blob_storage_client.go | 3 +- .../client/baidubce/bos_storage_client.go | 2 +- .../chunk/client/cassandra/storage_client.go | 6 +- pkg/storage/chunk/client/client.go | 2 - .../chunk/client/congestion/controller.go | 10 +- .../client/congestion/controller_test.go | 2 +- .../client/gcp/bigtable_object_client.go | 5 - .../chunk/client/gcp/gcs_object_client.go | 2 +- .../chunk/client/grpc/storage_client.go | 5 - .../client/ibmcloud/cos_object_client.go | 13 +- .../chunk/client/local/fs_object_client.go | 2 +- pkg/storage/chunk/client/metrics.go | 5 - pkg/storage/chunk/client/object_client.go | 18 +-- .../client/openstack/swift_object_client.go | 2 +- .../chunk/client/prefixed_object_client.go | 2 +- .../testutils/inmemory_storage_client.go | 2 +- pkg/storage/chunk/client/util/reader.go | 17 +++ pkg/storage/store.go | 5 - pkg/storage/stores/composite_store.go | 17 --- pkg/storage/stores/composite_store_test.go | 6 - pkg/storage/stores/series_store_write.go | 5 - pkg/storage/stores/series_store_write_test.go | 9 +- pkg/storage/util_test.go | 5 +- pkg/storage/wal/segment.go | 57 +++---- pkg/storage/wal/segment_test.go | 8 +- 37 files changed, 279 insertions(+), 347 deletions(-) create mode 100644 pkg/ingester-rf1/objstore/storage.go create mode 100644 pkg/storage/chunk/client/util/reader.go diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go index d46619575e..37b24f6f1a 100644 --- a/pkg/ingester-rf1/flush.go +++ b/pkg/ingester-rf1/flush.go @@ -1,6 +1,7 @@ package ingesterrf1 import ( + "crypto/rand" "fmt" "net/http" "time" @@ -9,14 +10,14 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/ring" - "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/dskit/runutil" + "github.com/oklog/ulid" "github.com/prometheus/common/model" "golang.org/x/net/context" - "github.com/grafana/loki/v3/pkg/chunkenc" - "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) const ( @@ -140,7 +141,12 @@ func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error { // If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed // segments to have another opportunity to be flushed. func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error { - if err := i.store.PutWal(ctx, ch); err != nil { + reader := ch.Reader() + defer runutil.CloseWithLogOnErr(util_log.Logger, reader, "flushSegment") + + newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader) + + if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+newUlid.String()), reader); err != nil { i.metrics.chunksFlushFailures.Inc() return fmt.Errorf("store put chunk: %w", err) } @@ -148,39 +154,3 @@ func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) erro // TODO: report some flush metrics return nil } - -// reportFlushedChunkStatistics calculate overall statistics of flushed chunks without compromising the flush process. -func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) { - byt, err := ch.Encoded() - if err != nil { - level.Error(i.logger).Log("msg", "failed to encode flushed wire chunk", "err", err) - return - } - - i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1) - - compressedSize := float64(len(byt)) - uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data) - - if ok && compressedSize > 0 { - i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize) - } - - utilization := ch.Data.Utilization() - i.metrics.chunkUtilization.Observe(utilization) - numEntries := desc.chunk.Size() - i.metrics.chunkEntries.Observe(float64(numEntries)) - i.metrics.chunkSize.Observe(compressedSize) - sizePerTenant.Add(compressedSize) - countPerTenant.Inc() - - boundsFrom, boundsTo := desc.chunk.Bounds() - i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds()) - i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours()) - - i.metrics.flushedChunksBytesStats.Record(compressedSize) - i.metrics.flushedChunksLinesStats.Record(float64(numEntries)) - i.metrics.flushedChunksUtilizationStats.Record(utilization) - i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds()) - i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Seconds()) -} diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go index fef8418945..d87159952e 100644 --- a/pkg/ingester-rf1/ingester.go +++ b/pkg/ingester-rf1/ingester.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "io" "math/rand" "net/http" "os" @@ -16,9 +17,10 @@ import ( "github.com/opentracing/opentracing-go" "github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool" + "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/loghttp/push" - lokilog "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/storage" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/storage/wal" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -35,20 +37,13 @@ import ( "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/health/grpc_health_v1" - server_util "github.com/grafana/loki/v3/pkg/util/server" - "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/runtime" - "github.com/grafana/loki/v3/pkg/storage" - "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/config" - "github.com/grafana/loki/v3/pkg/storage/stores" - indexstore "github.com/grafana/loki/v3/pkg/storage/stores/index" "github.com/grafana/loki/v3/pkg/util" ) @@ -121,7 +116,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester-rf1.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.") f.DurationVar(&cfg.FlushOpTimeout, "ingester-rf1.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.") f.DurationVar(&cfg.RetainPeriod, "ingester-rf1.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.") - //f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.") + // f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.") f.IntVar(&cfg.BlockSize, "ingester-rf1.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.") f.IntVar(&cfg.TargetChunkSize, "ingester-rf1.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB f.StringVar(&cfg.ChunkEncoding, "ingester-rf1.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", chunkenc.SupportedEncoding())) @@ -159,13 +154,10 @@ type Wrapper interface { Wrap(wrapped Interface) Interface } -// Store is the store interface we need on the ingester. -type Store interface { - stores.ChunkWriter - stores.ChunkFetcher - storage.SelectStore - storage.SchemaConfigProvider - indexstore.StatsReader +// Storage is the store interface we need on the ingester. +type Storage interface { + PutObject(ctx context.Context, objectKey string, object io.Reader) error + Stop() } // Interface is an interface for the Ingester @@ -174,8 +166,6 @@ type Interface interface { http.Handler logproto.PusherServer - //logproto.QuerierServer - //logproto.StreamDataServer CheckReady(ctx context.Context) error FlushHandler(w http.ResponseWriter, _ *http.Request) @@ -218,7 +208,7 @@ type Ingester struct { lifecycler *ring.Lifecycler lifecyclerWatcher *services.FailureWatcher - store Store + store Storage periodicConfigs []config.PeriodConfig loopDone sync.WaitGroup @@ -240,14 +230,10 @@ type Ingester struct { terminateOnShutdown bool // Only used by WAL & flusher to coordinate backpressure during replay. - //replayController *replayController + // replayController *replayController metrics *ingesterMetrics - chunkFilter chunk.RequestChunkFilterer - extractorWrapper lokilog.SampleExtractorWrapper - pipelineWrapper lokilog.PipelineWrapper - streamRateCalculator *StreamRateCalculator writeLogManager *writefailures.Manager @@ -256,11 +242,25 @@ type Ingester struct { // recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance. readRing ring.ReadRing - //recalculateOwnedStreams *recalculateOwnedStreams + // recalculateOwnedStreams *recalculateOwnedStreams } // New makes a new Ingester. -func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, + periodConfigs []config.PeriodConfig, + storageConfig storage.Config, + clientMetrics storage.ClientMetrics, + limits Limits, configs *runtime.TenantConfigs, + registerer prometheus.Registerer, + writeFailuresCfg writefailures.Cfg, + metricsNamespace string, + logger log.Logger, + customStreamsTracker push.UsageTracker, readRing ring.ReadRing, +) (*Ingester, error) { + storage, err := objstore.New(periodConfigs, storageConfig, clientMetrics) + if err != nil { + return nil, err + } if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } @@ -279,13 +279,13 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con clientConfig: clientConfig, tenantConfigs: configs, instances: map[string]*instance{}, - store: store, - periodicConfigs: store.GetSchemaConfigs(), + store: storage, + periodicConfigs: periodConfigs, loopQuit: make(chan struct{}), flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), tailersQuit: make(chan struct{}), metrics: metrics, - //flushOnShutdownSwitch: &OnceSwitch{}, + // flushOnShutdownSwitch: &OnceSwitch{}, terminateOnShutdown: false, streamRateCalculator: NewStreamRateCalculator(), writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"), @@ -298,7 +298,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con segmentWriter: segmentWriter, }, } - //i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) // TODO: change flush on shutdown i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester-rf1", "ingester-rf1-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)) @@ -334,18 +333,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con return i, nil } -func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { - i.chunkFilter = chunkFilter -} - -func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) { - i.extractorWrapper = wrapper -} - -func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) { - i.pipelineWrapper = wrapper -} - // setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled // when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod` func (i *Ingester) setupAutoForget() { @@ -490,7 +477,7 @@ func (i *Ingester) running(ctx context.Context) error { func (i *Ingester) stopping(_ error) error { i.stopIncomingRequests() var errs util.MultiError - //errs.Add(i.wal.Stop()) + // errs.Add(i.wal.Stop()) //if i.flushOnShutdownSwitch.Get() { // i.lifecycler.SetFlushOnShutdown(true) @@ -502,7 +489,7 @@ func (i *Ingester) stopping(_ error) error { } i.flushQueuesDone.Wait() - //i.streamRateCalculator.Stop() + // i.streamRateCalculator.Stop() // In case the flag to terminate on shutdown is set or this instance is marked to release its resources, // we need to mark the ingester service as "failed", so Loki will shut down entirely. @@ -511,6 +498,7 @@ func (i *Ingester) stopping(_ error) error { i.removeShutdownMarkerFile() return modules.ErrStopProcess } + i.store.Stop() return errs.Err() } @@ -581,7 +569,7 @@ func (i *Ingester) loop() { func (i *Ingester) doFlushTick() { i.flushCtx.lock.Lock() - //i.logger.Log("msg", "starting periodic flush") + // i.logger.Log("msg", "starting periodic flush") // Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used. currentFlushCtx := i.flushCtx @@ -708,7 +696,7 @@ func createShutdownMarker(p string) error { return err } - dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777) + dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777) if err != nil { return err } @@ -725,7 +713,7 @@ func removeShutdownMarker(p string) error { return err } - dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777) + dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777) if err != nil { return err } @@ -811,7 +799,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro // Fetch a flush context and try to acquire the RLock // The only time the Write Lock is held is when this context is no longer usable and a new one is being created. // In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available. - //The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop. + // The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop. currentFlushCtx := i.flushCtx for !currentFlushCtx.lock.TryRLock() { select { @@ -863,7 +851,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / inst, ok = i.instances[instanceID] if !ok { var err error - inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker) + inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker) if err != nil { return nil, err } @@ -894,62 +882,6 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration { return maxLookBack } -// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb. -func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { - gcr, err := i.getChunkIDs(ctx, req) - err = server_util.ClientGrpcStatusAndError(err) - return gcr, err -} - -// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb. -func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { - orgID, err := tenant.TenantID(ctx) - if err != nil { - return nil, err - } - - // Set profiling tags - defer pprof.SetGoroutineLabels(ctx) - ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID)) - pprof.SetGoroutineLabels(ctx) - - asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() - if asyncStoreMaxLookBack == 0 { - return &logproto.GetChunkIDsResponse{}, nil - } - - reqStart := req.Start - reqStart = adjustQueryStartTime(asyncStoreMaxLookBack, reqStart, time.Now()) - - // parse the request - start, end := util.RoundToMilliseconds(reqStart, req.End) - matchers, err := syntax.ParseMatchers(req.Matchers, true) - if err != nil { - return nil, err - } - - // get chunk references - chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil) - if err != nil { - return nil, err - } - - // todo (Callum) ingester should maybe store the whole schema config? - s := config.SchemaConfig{ - Configs: i.periodicConfigs, - } - - // build the response - resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}} - for _, chunks := range chunksGroups { - for _, chk := range chunks { - resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef)) - } - } - - return &resp, nil -} - // Watch implements grpc_health_v1.HealthCheck. func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error { return nil diff --git a/pkg/ingester-rf1/instance.go b/pkg/ingester-rf1/instance.go index e5c5454971..16a6758f4e 100644 --- a/pkg/ingester-rf1/instance.go +++ b/pkg/ingester-rf1/instance.go @@ -20,10 +20,8 @@ import ( "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/runtime" - "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/util/constants" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -69,7 +67,7 @@ type instance struct { streamsCreatedTotal prometheus.Counter streamsRemovedTotal prometheus.Counter - //tailers map[uint32]*tailer + // tailers map[uint32]*tailer tailerMtx sync.RWMutex limiter *Limiter @@ -80,9 +78,6 @@ type instance struct { metrics *ingesterMetrics - chunkFilter chunk.RequestChunkFilterer - pipelineWrapper log.PipelineWrapper - extractorWrapper log.SampleExtractorWrapper streamRateCalculator *StreamRateCalculator writeFailures *writefailures.Manager @@ -123,9 +118,6 @@ func newInstance( limiter *Limiter, configs *runtime.TenantConfigs, metrics *ingesterMetrics, - chunkFilter chunk.RequestChunkFilterer, - pipelineWrapper log.PipelineWrapper, - extractorWrapper log.SampleExtractorWrapper, streamRateCalculator *StreamRateCalculator, writeFailures *writefailures.Manager, customStreamsTracker push.UsageTracker, @@ -154,9 +146,6 @@ func newInstance( ownedStreamsSvc: ownedStreamsSvc, configs: configs, metrics: metrics, - chunkFilter: chunkFilter, - pipelineWrapper: pipelineWrapper, - extractorWrapper: extractorWrapper, streamRateCalculator: streamRateCalculator, @@ -286,7 +275,7 @@ func (i *instance) onStreamCreated(s *stream) { memoryStreams.WithLabelValues(i.instanceID).Inc() memoryStreamsLabelsBytes.Add(float64(len(s.labels.String()))) i.streamsCreatedTotal.Inc() - //i.addTailersToNewStream(s) + // i.addTailersToNewStream(s) streamsCountStats.Add(1) i.ownedStreamsSvc.incOwnedStreamCount() if i.configs.LogStreamCreation(i.instanceID) { diff --git a/pkg/ingester-rf1/objstore/storage.go b/pkg/ingester-rf1/objstore/storage.go new file mode 100644 index 0000000000..8ec7cf6970 --- /dev/null +++ b/pkg/ingester-rf1/objstore/storage.go @@ -0,0 +1,132 @@ +package objstore + +import ( + "context" + "fmt" + "io" + "sort" + + "github.com/prometheus/common/model" + + "github.com/grafana/loki/v3/pkg/storage" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/config" +) + +type Multi struct { + stores []*storeEntry + storageConfig storage.Config +} + +type storeEntry struct { + start model.Time + cfg config.PeriodConfig + objectClient client.ObjectClient +} + +var _ client.ObjectClient = (*Multi)(nil) + +func New( + periodicConfigs []config.PeriodConfig, + storageConfig storage.Config, + clientMetrics storage.ClientMetrics, +) (*Multi, error) { + store := &Multi{ + storageConfig: storageConfig, + } + // sort by From time + sort.Slice(periodicConfigs, func(i, j int) bool { + return periodicConfigs[i].From.Time.Before(periodicConfigs[i].From.Time) + }) + for _, periodicConfig := range periodicConfigs { + objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics) + if err != nil { + return nil, fmt.Errorf("creating object client for period %s: %w ", periodicConfig.From, err) + } + store.stores = append(store.stores, &storeEntry{ + start: periodicConfig.From.Time, + cfg: periodicConfig, + objectClient: objectClient, + }) + } + return store, nil +} + +func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) { + // find the schema with the lowest start _after_ tm + j := sort.Search(len(m.stores), func(j int) bool { + return m.stores[j].start > ts + }) + + // reduce it by 1 because we want a schema with start <= tm + j-- + + if 0 <= j && j < len(m.stores) { + return m.stores[j].objectClient, nil + } + + // should in theory never happen + return nil, fmt.Errorf("no store found for timestamp %s", ts) +} + +func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return false, err + } + return s.ObjectExists(ctx, objectKey) +} + +func (m *Multi) PutObject(ctx context.Context, objectKey string, object io.Reader) error { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return err + } + return s.PutObject(ctx, objectKey, object) +} + +func (m *Multi) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return nil, 0, err + } + return s.GetObject(ctx, objectKey) +} + +func (m *Multi) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return nil, nil, err + } + return s.List(ctx, prefix, delimiter) +} + +func (m *Multi) DeleteObject(ctx context.Context, objectKey string) error { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return err + } + return s.DeleteObject(ctx, objectKey) +} + +func (m *Multi) IsObjectNotFoundErr(err error) bool { + s, _ := m.GetStoreFor(model.Now()) + if s == nil { + return false + } + return s.IsObjectNotFoundErr(err) +} + +func (m *Multi) IsRetryableErr(err error) bool { + s, _ := m.GetStoreFor(model.Now()) + if s == nil { + return false + } + return s.IsRetryableErr(err) +} + +func (m *Multi) Stop() { + for _, s := range m.stores { + s.objectClient.Stop() + } +} diff --git a/pkg/ingester-rf1/stream.go b/pkg/ingester-rf1/stream.go index 932c6244bb..8bd7bdd0e3 100644 --- a/pkg/ingester-rf1/stream.go +++ b/pkg/ingester-rf1/stream.go @@ -52,8 +52,8 @@ type stream struct { metrics *ingesterMetrics - //tailers map[uint32]*tailer - //tailerMtx sync.RWMutex + // tailers map[uint32]*tailer + // tailerMtx sync.RWMutex // entryCt is a counter which is incremented on each accepted entry. // This allows us to discard WAL entries during replays which were @@ -63,7 +63,7 @@ type stream struct { entryCt int64 unorderedWrites bool - //streamRateCalculator *StreamRateCalculator + // streamRateCalculator *StreamRateCalculator writeFailures *writefailures.Manager @@ -95,11 +95,11 @@ func newStream( fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, - //streamRateCalculator *StreamRateCalculator, + // streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics, writeFailures *writefailures.Manager, ) *stream { - //hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName) + // hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName) return &stream{ limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second), cfg: cfg, @@ -107,11 +107,11 @@ func newStream( labels: labels, labelsString: labels.String(), labelHash: labels.Hash(), - //labelHashNoShard: hashNoShard, - //tailers: map[uint32]*tailer{}, + // labelHashNoShard: hashNoShard, + // tailers: map[uint32]*tailer{}, metrics: metrics, tenant: tenant, - //streamRateCalculator: streamRateCalculator, + // streamRateCalculator: streamRateCalculator, unorderedWrites: unorderedWrites, writeFailures: writeFailures, @@ -137,13 +137,12 @@ func (s *stream) Push( usageTracker push.UsageTracker, flushCtx *flushCtx, ) (int, error) { - toStore, invalid := s.validateEntries(ctx, entries, rateLimitWholeStream, usageTracker) if rateLimitWholeStream && hasRateLimitErr(invalid) { return 0, errorForFailedEntries(s, invalid, len(entries)) } - bytesAdded, _ := s.storeEntries(ctx, toStore, usageTracker, flushCtx) + bytesAdded := s.storeEntries(ctx, toStore, usageTracker, flushCtx) return bytesAdded, errorForFailedEntries(s, invalid, len(entries)) } @@ -196,7 +195,7 @@ func hasRateLimitErr(errs []entryWithError) bool { return ok } -func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) (int, []*logproto.Entry) { +func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) int { if sp := opentracing.SpanFromContext(ctx); sp != nil { sp.LogKV("event", "stream started to store entries", "labels", s.labelsString) defer sp.LogKV("event", "stream finished to store entries") @@ -204,7 +203,6 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa var bytesAdded, outOfOrderSamples, outOfOrderBytes int - storedEntries := make([]*logproto.Entry, 0, len(entries)) for i := 0; i < len(entries); i++ { s.entryCt++ s.lastLine.ts = entries[i].Timestamp @@ -214,15 +212,13 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa } bytesAdded += len(entries[i].Line) - storedEntries = append(storedEntries, &entries[i]) } - flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, storedEntries) + flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, entries) s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker) - return bytesAdded, storedEntries + return bytesAdded } -func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]logproto.Entry, []entryWithError) { - +func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) { var ( outOfOrderSamples, outOfOrderBytes int rateLimitedSamples, rateLimitedBytes int @@ -231,7 +227,7 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, limit = s.limiter.lim.Limit() lastLine = s.lastLine highestTs = s.highestTs - toStore = make([]logproto.Entry, 0, len(entries)) + toStore = make([]*logproto.Entry, 0, len(entries)) ) for i := range entries { @@ -277,7 +273,7 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, highestTs = entries[i].Timestamp } - toStore = append(toStore, entries[i]) + toStore = append(toStore, &entries[i]) } // Each successful call to 'AllowN' advances the limiter. With all-or-nothing @@ -289,12 +285,12 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitedSamples = len(toStore) failedEntriesWithError = make([]entryWithError, 0, len(toStore)) for i := 0; i < len(toStore); i++ { - failedEntriesWithError = append(failedEntriesWithError, entryWithError{&toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}}) + failedEntriesWithError = append(failedEntriesWithError, entryWithError{toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}}) rateLimitedBytes += len(toStore[i].Line) } } - //s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes) + // s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes) s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes, usageTracker) return toStore, failedEntriesWithError } diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index a33b9d8ba4..69462a3d35 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -37,7 +37,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" - walsegment "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/validation" ) @@ -433,10 +432,6 @@ func defaultIngesterTestConfig(t testing.TB) Config { return cfg } -func (s *testStore) PutWal(_ context.Context, _ *walsegment.SegmentWriter) error { - return nil -} - func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index d9f924352d..871a3082e0 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -48,7 +48,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume" "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/validation" ) @@ -436,10 +435,6 @@ type mockStore struct { chunks map[string][]chunk.Chunk } -func (s *mockStore) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return nil -} - func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 9734d29a7e..f7fb901e0c 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -644,7 +644,7 @@ func (t *Loki) initIngesterRF1() (_ services.Service, err error) { level.Warn(util_log.Logger).Log("msg", "The config setting shutdown marker path is not set. The /ingester/prepare_shutdown endpoint won't work") } - t.IngesterRF1, err = ingester_rf1.New(t.Cfg.IngesterRF1, t.Cfg.IngesterRF1Client, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging, t.Cfg.MetricsNamespace, logger, t.UsageTracker, t.ring) + t.IngesterRF1, err = ingester_rf1.New(t.Cfg.IngesterRF1, t.Cfg.IngesterRF1Client, t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging, t.Cfg.MetricsNamespace, logger, t.UsageTracker, t.ring) if err != nil { fmt.Println("Error initializing ingester rf1", err) return diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 60d26fee28..20c3b9f1b7 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/grafana/loki/v3/pkg/logql/log" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/loghttp" @@ -340,9 +339,6 @@ func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through return args.Get(0).([][]chunk.Chunk), args.Get(0).([]*fetcher.Fetcher), args.Error(2) } -func (s *storeMock) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return errors.New("storeMock.PutWal() has not been mocked") -} func (s *storeMock) Put(_ context.Context, _ []chunk.Chunk) error { return errors.New("storeMock.Put() has not been mocked") } diff --git a/pkg/storage/chunk/client/alibaba/oss_object_client.go b/pkg/storage/chunk/client/alibaba/oss_object_client.go index 3e7674467a..d8446f6db9 100644 --- a/pkg/storage/chunk/client/alibaba/oss_object_client.go +++ b/pkg/storage/chunk/client/alibaba/oss_object_client.go @@ -106,18 +106,16 @@ func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.R return nil, 0, err } return resp.Response.Body, int64(size), err - } // PutObject puts the specified bytes into the configured OSS bucket at the provided key -func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return instrument.CollectedRequest(ctx, "OSS.PutObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { if err := s.defaultBucket.PutObject(objectKey, object); err != nil { return errors.Wrap(err, "failed to put oss object") } return nil }) - } // List implements chunk.ObjectClient. diff --git a/pkg/storage/chunk/client/aws/dynamodb_storage_client.go b/pkg/storage/chunk/client/aws/dynamodb_storage_client.go index b70c4269ed..87fd24e127 100644 --- a/pkg/storage/chunk/client/aws/dynamodb_storage_client.go +++ b/pkg/storage/chunk/client/aws/dynamodb_storage_client.go @@ -33,7 +33,6 @@ import ( client_util "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/series/index" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/math" @@ -119,10 +118,6 @@ type dynamoDBStorageClient struct { metrics *dynamoDBMetrics } -func (a dynamoDBStorageClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return errors.New("not implemented") -} - // NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient. func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg config.SchemaConfig, reg prometheus.Registerer) (index.Client, error) { return newDynamoDBStorageClient(cfg, schemaCfg, reg) diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index bae0fce22d..c2a50dd16f 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -30,6 +30,7 @@ import ( bucket_s3 "github.com/grafana/loki/v3/pkg/storage/bucket/s3" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" + clientutil "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" storageawscommon "github.com/grafana/loki/v3/pkg/storage/common/aws" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" @@ -309,7 +310,6 @@ func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bo _, err := a.S3.HeadObject(headObjectInput) return err }) - if err != nil { return false, err } @@ -381,10 +381,14 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re } // PutObject into the store -func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return loki_instrument.TimeRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + readSeeker, err := clientutil.ReadSeeker(object) + if err != nil { + return err + } putObjectInput := &s3.PutObjectInput{ - Body: object, + Body: readSeeker, Bucket: aws.String(a.bucketFromKey(objectKey)), Key: aws.String(objectKey), StorageClass: aws.String(a.cfg.StorageClass), @@ -396,7 +400,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object putObjectInput.SSEKMSEncryptionContext = a.sseConfig.KMSEncryptionContext } - _, err := a.S3.PutObjectWithContext(ctx, putObjectInput) + _, err = a.S3.PutObjectWithContext(ctx, putObjectInput) return err }) } @@ -405,7 +409,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { var storageObjects []client.StorageObject var commonPrefixes []client.StorageCommonPrefix - var commonPrefixesSet = make(map[string]bool) + commonPrefixesSet := make(map[string]bool) for i := range a.bucketNames { err := loki_instrument.TimeRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { diff --git a/pkg/storage/chunk/client/azure/blob_storage_client.go b/pkg/storage/chunk/client/azure/blob_storage_client.go index 7c5f5bb496..f1e93b993d 100644 --- a/pkg/storage/chunk/client/azure/blob_storage_client.go +++ b/pkg/storage/chunk/client/azure/blob_storage_client.go @@ -229,7 +229,6 @@ func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, _, err = blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) return err }) - if err != nil { return false, err } @@ -278,7 +277,7 @@ func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.Re return downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: b.cfg.MaxRetries}), downloadResponse.ContentLength(), nil } -func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return loki_instrument.TimeRequest(ctx, "azure.PutObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error { blockBlobURL, err := b.getBlobURL(objectKey, false) if err != nil { diff --git a/pkg/storage/chunk/client/baidubce/bos_storage_client.go b/pkg/storage/chunk/client/baidubce/bos_storage_client.go index 30a9e97f49..b9abd8c90d 100644 --- a/pkg/storage/chunk/client/baidubce/bos_storage_client.go +++ b/pkg/storage/chunk/client/baidubce/bos_storage_client.go @@ -79,7 +79,7 @@ func NewBOSObjectStorage(cfg *BOSStorageConfig) (*BOSObjectStorage, error) { }, nil } -func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return instrument.CollectedRequest(ctx, "BOS.PutObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { body, err := bce.NewBodyFromSizedReader(object, -1) if err != nil { diff --git a/pkg/storage/chunk/client/cassandra/storage_client.go b/pkg/storage/chunk/client/cassandra/storage_client.go index 732491de2d..70551591f6 100644 --- a/pkg/storage/chunk/client/cassandra/storage_client.go +++ b/pkg/storage/chunk/client/cassandra/storage_client.go @@ -23,7 +23,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/series/index" - "github.com/grafana/loki/v3/pkg/storage/wal" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -544,6 +543,7 @@ func NewObjectClient(cfg Config, schemaCfg config.SchemaConfig, registerer prome } return client, nil } + func (s *ObjectClient) reconnectWriteSession() error { s.writeMtx.Lock() defer s.writeMtx.Unlock() @@ -568,10 +568,6 @@ func (s *ObjectClient) reconnectReadSession() error { return nil } -func (s *ObjectClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return errors.New("not implemented") -} - // PutChunks implements chunk.ObjectClient. func (s *ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { err := s.putChunks(ctx, chunks) diff --git a/pkg/storage/chunk/client/client.go b/pkg/storage/chunk/client/client.go index 800086c661..36b65d40b6 100644 --- a/pkg/storage/chunk/client/client.go +++ b/pkg/storage/chunk/client/client.go @@ -6,7 +6,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/stores/series/index" - "github.com/grafana/loki/v3/pkg/storage/wal" ) var ( @@ -19,7 +18,6 @@ var ( // Client is for storing and retrieving chunks. type Client interface { Stop() - PutWal(ctx context.Context, writer *wal.SegmentWriter) error PutChunks(ctx context.Context, chunks []chunk.Chunk) error GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) DeleteChunk(ctx context.Context, userID, chunkID string) error diff --git a/pkg/storage/chunk/client/congestion/controller.go b/pkg/storage/chunk/client/congestion/controller.go index e7f29fab47..6a60a2ce7a 100644 --- a/pkg/storage/chunk/client/congestion/controller.go +++ b/pkg/storage/chunk/client/congestion/controller.go @@ -83,7 +83,7 @@ func (a *AIMDController) withLogger(logger log.Logger) Controller { return a } -func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return a.inner.PutObject(ctx, objectKey, object) } @@ -208,11 +208,12 @@ func NewNoopController(Config) *NoopController { return &NoopController{} } -func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil } -func (n *NoopController) PutObject(context.Context, string, io.ReadSeeker) error { return nil } +func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil } +func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil } func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) { return nil, 0, nil } + func (n *NoopController) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { return nil, nil, nil } @@ -226,14 +227,17 @@ func (n *NoopController) withLogger(logger log.Logger) Controller { n.logger = logger return n } + func (n *NoopController) withRetrier(r Retrier) Controller { n.retrier = r return n } + func (n *NoopController) withHedger(h Hedger) Controller { n.hedger = h return n } + func (n *NoopController) withMetrics(m *Metrics) Controller { n.metrics = m return n diff --git a/pkg/storage/chunk/client/congestion/controller_test.go b/pkg/storage/chunk/client/congestion/controller_test.go index 49edfe563a..23fa8a4196 100644 --- a/pkg/storage/chunk/client/congestion/controller_test.go +++ b/pkg/storage/chunk/client/congestion/controller_test.go @@ -247,7 +247,7 @@ type mockObjectClient struct { nonRetryableErrs bool } -func (m *mockObjectClient) PutObject(context.Context, string, io.ReadSeeker) error { +func (m *mockObjectClient) PutObject(context.Context, string, io.Reader) error { panic("not implemented") } diff --git a/pkg/storage/chunk/client/gcp/bigtable_object_client.go b/pkg/storage/chunk/client/gcp/bigtable_object_client.go index 992e4bff92..d878bc19bc 100644 --- a/pkg/storage/chunk/client/gcp/bigtable_object_client.go +++ b/pkg/storage/chunk/client/gcp/bigtable_object_client.go @@ -12,7 +12,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/config" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util/math" ) @@ -84,10 +83,6 @@ func (s *bigtableObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chu return nil } -func (s *bigtableObjectClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return errors.New("not implemented") -} - func (s *bigtableObjectClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks") defer sp.Finish() diff --git a/pkg/storage/chunk/client/gcp/gcs_object_client.go b/pkg/storage/chunk/client/gcp/gcs_object_client.go index 2f724e159a..57d26b334e 100644 --- a/pkg/storage/chunk/client/gcp/gcs_object_client.go +++ b/pkg/storage/chunk/client/gcp/gcs_object_client.go @@ -161,7 +161,7 @@ func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc i } // PutObject puts the specified bytes into the configured GCS bucket at the provided key -func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { writer := s.defaultBucket.Object(objectKey).NewWriter(ctx) // Default GCSChunkSize is 8M and for each call, 8M is allocated xD // By setting it to 0, we just upload the object in a single a request diff --git a/pkg/storage/chunk/client/grpc/storage_client.go b/pkg/storage/chunk/client/grpc/storage_client.go index 8c1284ba1d..42ee00507e 100644 --- a/pkg/storage/chunk/client/grpc/storage_client.go +++ b/pkg/storage/chunk/client/grpc/storage_client.go @@ -9,7 +9,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/config" - "github.com/grafana/loki/v3/pkg/storage/wal" ) type StorageClient struct { @@ -67,10 +66,6 @@ func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err return nil } -func (s *StorageClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return errors.New("not implemented") -} - func (s *StorageClient) DeleteChunk(ctx context.Context, _, chunkID string) error { chunkInfo := &ChunkID{ChunkID: chunkID} _, err := s.client.DeleteChunks(ctx, chunkInfo) diff --git a/pkg/storage/chunk/client/ibmcloud/cos_object_client.go b/pkg/storage/chunk/client/ibmcloud/cos_object_client.go index c9d534ae41..6126df1b23 100644 --- a/pkg/storage/chunk/client/ibmcloud/cos_object_client.go +++ b/pkg/storage/chunk/client/ibmcloud/cos_object_client.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/log" ) @@ -327,7 +328,6 @@ func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (b }) return requestErr }) - if err != nil { return false, err } @@ -337,7 +337,6 @@ func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (b // GetObject returns a reader and the size for the specified object key from the configured S3 bucket. func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { - var resp *cos.GetObjectOutput // Map the key into a bucket @@ -370,15 +369,19 @@ func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R } // PutObject into the store -func (c *COSObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (c *COSObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return instrument.CollectedRequest(ctx, "COS.PutObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + readSeeker, err := util.ReadSeeker(object) + if err != nil { + return err + } putObjectInput := &cos.PutObjectInput{ - Body: object, + Body: readSeeker, Bucket: ibm.String(c.bucketFromKey(objectKey)), Key: ibm.String(objectKey), } - _, err := c.cos.PutObjectWithContext(ctx, putObjectInput) + _, err = c.cos.PutObjectWithContext(ctx, putObjectInput) return err }) } diff --git a/pkg/storage/chunk/client/local/fs_object_client.go b/pkg/storage/chunk/client/local/fs_object_client.go index 41e911cb28..bde92c8302 100644 --- a/pkg/storage/chunk/client/local/fs_object_client.go +++ b/pkg/storage/chunk/client/local/fs_object_client.go @@ -89,7 +89,7 @@ func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.Read } // PutObject into the store -func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error { +func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error { fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)) err := util.EnsureDirectory(filepath.Dir(fullPath)) if err != nil { diff --git a/pkg/storage/chunk/client/metrics.go b/pkg/storage/chunk/client/metrics.go index 5e1ba5b418..76ca20a1ba 100644 --- a/pkg/storage/chunk/client/metrics.go +++ b/pkg/storage/chunk/client/metrics.go @@ -7,7 +7,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/loki/v3/pkg/storage/chunk" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util/constants" ) @@ -61,10 +60,6 @@ func (c MetricsChunkClient) Stop() { c.Client.Stop() } -func (c MetricsChunkClient) PutWal(ctx context.Context, writer *wal.SegmentWriter) error { - return c.Client.PutWal(ctx, writer) -} - func (c MetricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { if err := c.Client.PutChunks(ctx, chunks); err != nil { return err diff --git a/pkg/storage/chunk/client/object_client.go b/pkg/storage/chunk/client/object_client.go index 7632647bd9..460c9566f6 100644 --- a/pkg/storage/chunk/client/object_client.go +++ b/pkg/storage/chunk/client/object_client.go @@ -3,26 +3,23 @@ package client import ( "bytes" "context" - "crypto/rand" "encoding/base64" "io" "strings" "time" - "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" "github.com/grafana/loki/v3/pkg/storage/config" - "github.com/grafana/loki/v3/pkg/storage/wal" ) // ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...) type ObjectClient interface { ObjectExists(ctx context.Context, objectKey string) (bool, error) - PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error + PutObject(ctx context.Context, objectKey string, object io.Reader) error // NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak. GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) @@ -108,19 +105,6 @@ func (o *client) Stop() { o.store.Stop() } -func (o *client) PutWal(ctx context.Context, segment *wal.SegmentWriter) error { - reader, err := segment.ToReader() - if err != nil { - return err - } - defer func(reader io.ReadSeekCloser) { - _ = reader.Close() - }(reader) - - newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader) - return o.store.PutObject(ctx, "loki-v2/wal/anon/"+newUlid.String(), reader) -} - // PutChunks stores the provided chunks in the configured backend. If multiple errors are // returned, the last one sequentially will be propagated up. func (o *client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { diff --git a/pkg/storage/chunk/client/openstack/swift_object_client.go b/pkg/storage/chunk/client/openstack/swift_object_client.go index 96b836b0a9..50ac21c12a 100644 --- a/pkg/storage/chunk/client/openstack/swift_object_client.go +++ b/pkg/storage/chunk/client/openstack/swift_object_client.go @@ -145,7 +145,7 @@ func (s *SwiftObjectClient) GetObject(_ context.Context, objectKey string) (io.R } // PutObject puts the specified bytes into the configured Swift container at the provided key -func (s *SwiftObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error { +func (s *SwiftObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error { _, err := s.conn.ObjectPut(s.cfg.ContainerName, objectKey, object, false, "", "", nil) return err } diff --git a/pkg/storage/chunk/client/prefixed_object_client.go b/pkg/storage/chunk/client/prefixed_object_client.go index aa792b21b9..1f887a64e2 100644 --- a/pkg/storage/chunk/client/prefixed_object_client.go +++ b/pkg/storage/chunk/client/prefixed_object_client.go @@ -15,7 +15,7 @@ func NewPrefixedObjectClient(downstreamClient ObjectClient, prefix string) Objec return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix} } -func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { +func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object) } diff --git a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go index 5f2a95da76..2056703522 100644 --- a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go +++ b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go @@ -426,7 +426,7 @@ func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (i } // PutObject implements client.ObjectClient. -func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error { +func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error { buf, err := io.ReadAll(object) if err != nil { return err diff --git a/pkg/storage/chunk/client/util/reader.go b/pkg/storage/chunk/client/util/reader.go new file mode 100644 index 0000000000..2459b1e9ea --- /dev/null +++ b/pkg/storage/chunk/client/util/reader.go @@ -0,0 +1,17 @@ +package util + +import ( + "bytes" + "io" +) + +func ReadSeeker(r io.Reader) (io.ReadSeeker, error) { + if rs, ok := r.(io.ReadSeeker); ok { + return rs, nil + } + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + return bytes.NewReader(data), nil +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 2f5d88941f..db4a0a498e 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -7,7 +7,6 @@ import ( "time" "github.com/grafana/loki/v3/pkg/storage/types" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util/httpreq" lokilog "github.com/grafana/loki/v3/pkg/logql/log" @@ -609,7 +608,3 @@ func (f failingChunkWriter) Put(_ context.Context, _ []chunk.Chunk) error { func (f failingChunkWriter) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error { return errWritingChunkUnsupported } - -func (f failingChunkWriter) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return errWritingChunkUnsupported -} diff --git a/pkg/storage/stores/composite_store.go b/pkg/storage/stores/composite_store.go index 484d8574f3..182a686c88 100644 --- a/pkg/storage/stores/composite_store.go +++ b/pkg/storage/stores/composite_store.go @@ -15,16 +15,10 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" tsdb_index "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util" ) -type WalSegmentWriter interface { - PutWal(ctx context.Context, writer *wal.SegmentWriter) error -} - type ChunkWriter interface { - WalSegmentWriter Put(ctx context.Context, chunks []chunk.Chunk) error PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error } @@ -51,7 +45,6 @@ type Store interface { ChunkWriter ChunkFetcher ChunkFetcherProvider - WalSegmentWriter Stop() } @@ -95,12 +88,6 @@ func (c *CompositeStore) Stores() []Store { return stores } -func (c CompositeStore) PutWal(ctx context.Context, writer *wal.SegmentWriter) error { - // TODO: Understand how to use the forStores method to correctly pick a store for this - err := c.stores[0].PutWal(ctx, writer) - return err -} - func (c CompositeStore) Put(ctx context.Context, chunks []chunk.Chunk) error { for _, chunk := range chunks { err := c.forStores(ctx, chunk.From, chunk.Through, func(innerCtx context.Context, from, through model.Time, store Store) error { @@ -210,10 +197,8 @@ func (c CompositeStore) Stats(ctx context.Context, userID string, from, through xs = append(xs, x) return err }) - if err != nil { return nil, err - } res := stats.MergeStats(xs...) return &res, err @@ -226,7 +211,6 @@ func (c CompositeStore) Volume(ctx context.Context, userID string, from, through volumes = append(volumes, volume) return err }) - if err != nil { return nil, err } @@ -254,7 +238,6 @@ func (c CompositeStore) GetShards( groups = append(groups, shards) return nil }) - if err != nil { return nil, err } diff --git a/pkg/storage/stores/composite_store_test.go b/pkg/storage/stores/composite_store_test.go index 064e19ca8b..90062add15 100644 --- a/pkg/storage/stores/composite_store_test.go +++ b/pkg/storage/stores/composite_store_test.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/dskit/test" "github.com/prometheus/common/model" @@ -24,10 +23,6 @@ import ( type mockStore int -func (m mockStore) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return nil -} - func (m mockStore) Put(_ context.Context, _ []chunk.Chunk) error { return nil } @@ -357,7 +352,6 @@ func TestVolume(t *testing.T) { require.Error(t, err, "something bad") require.Nil(t, volumes) }) - } func TestFilterForTimeRange(t *testing.T) { diff --git a/pkg/storage/stores/series_store_write.go b/pkg/storage/stores/series_store_write.go index 2b134472eb..a36ae4510b 100644 --- a/pkg/storage/stores/series_store_write.go +++ b/pkg/storage/stores/series_store_write.go @@ -13,7 +13,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/fetcher" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/index" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/spanlogger" ) @@ -66,10 +65,6 @@ func (c *Writer) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil } -func (c *Writer) PutWal(ctx context.Context, segment *wal.SegmentWriter) error { - return c.fetcher.Client().PutWal(ctx, segment) -} - // PutOne implements Store func (c *Writer) PutOne(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.PutOne") diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index 882fbaa009..f58ec1a730 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -13,7 +13,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/fetcher" "github.com/grafana/loki/v3/pkg/storage/config" - "github.com/grafana/loki/v3/pkg/storage/wal" ) type mockCache struct { @@ -56,10 +55,6 @@ type mockChunksClient struct { called int } -func (m *mockChunksClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error { - return nil -} - func (m *mockChunksClient) PutChunks(_ context.Context, _ []chunk.Chunk) error { m.called++ return nil @@ -67,15 +62,19 @@ func (m *mockChunksClient) PutChunks(_ context.Context, _ []chunk.Chunk) error { func (m *mockChunksClient) Stop() { } + func (m *mockChunksClient) GetChunks(_ context.Context, _ []chunk.Chunk) ([]chunk.Chunk, error) { panic("GetChunks not implemented") } + func (m *mockChunksClient) DeleteChunk(_ context.Context, _, _ string) error { panic("DeleteChunk not implemented") } + func (m *mockChunksClient) IsChunkNotFoundErr(_ error) bool { panic("IsChunkNotFoundErr not implemented") } + func (m *mockChunksClient) IsRetryableErr(_ error) bool { panic("IsRetryableErr not implemented") } diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index b359132408..5ef02e74b1 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -26,7 +26,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores" index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" - "github.com/grafana/loki/v3/pkg/storage/wal" loki_util "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -186,8 +185,7 @@ func newMockChunkStore(chunkFormat byte, headfmt chunkenc.HeadBlockFmt, streams return &mockChunkStore{schemas: config.SchemaConfig{}, chunks: chunks, client: &mockChunkStoreClient{chunks: chunks, scfg: config.SchemaConfig{}}} } -func (m *mockChunkStore) PutWal(_ context.Context, _ *wal.SegmentWriter) error { return nil } -func (m *mockChunkStore) Put(_ context.Context, _ []chunk.Chunk) error { return nil } +func (m *mockChunkStore) Put(_ context.Context, _ []chunk.Chunk) error { return nil } func (m *mockChunkStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error { return nil } @@ -294,7 +292,6 @@ func (m mockChunkStoreClient) Stop() { panic("implement me") } -func (m mockChunkStoreClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error { return nil } func (m mockChunkStoreClient) PutChunks(_ context.Context, _ []chunk.Chunk) error { return nil } diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go index fa8e42cc94..e9fb2d51ea 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/wal/chunks" "github.com/grafana/loki/v3/pkg/storage/wal/index" "github.com/grafana/loki/v3/pkg/util/encoding" - "github.com/grafana/loki/v3/pkg/util/pool" ) // LOKW is the magic number for the Loki WAL format. @@ -35,10 +34,7 @@ var ( } }, } - - // 512kb - 20 mb - encodedWalSegmentBufferPool = pool.NewBuffer(512*1024, 20*1024*1024, 2) - tenantLabel = "__loki_tenant__" + tenantLabel = "__loki_tenant__" ) func init() { @@ -276,45 +272,30 @@ func (b *SegmentWriter) Reset() { b.inputSize.Store(0) } -func (b *SegmentWriter) ToReader() (io.ReadSeekCloser, error) { - // snappy compression rate is ~5x , but we can not predict it, so we need to allocate bigger buffer to avoid allocations - buffer := encodedWalSegmentBufferPool.Get(int(b.inputSize.Load() / 3)) - _, err := b.WriteTo(buffer) - if err != nil { - return nil, fmt.Errorf("failed to write segment to create a reader: %w", err) - } - return NewEncodedSegmentReader(buffer), nil -} - -var ( - _ io.ReadSeekCloser = &EncodedSegmentReader{} -) - type EncodedSegmentReader struct { - delegate io.ReadSeeker - encodedContent *bytes.Buffer + *io.PipeReader + *io.PipeWriter } -func NewEncodedSegmentReader(encodedContent *bytes.Buffer) *EncodedSegmentReader { - return &EncodedSegmentReader{ - encodedContent: encodedContent, - delegate: bytes.NewReader(encodedContent.Bytes()), +func (e *EncodedSegmentReader) Close() error { + err := e.PipeWriter.Close() + if err != nil { + return err } + err = e.PipeReader.Close() + if err != nil { + return err + } + return nil } -func (e *EncodedSegmentReader) Read(p []byte) (n int, err error) { - return e.delegate.Read(p) -} - -func (e *EncodedSegmentReader) Seek(offset int64, whence int) (int64, error) { - return e.delegate.Seek(offset, whence) -} - -func (e *EncodedSegmentReader) Close() error { - encodedWalSegmentBufferPool.Put(e.encodedContent) - e.encodedContent = nil - e.delegate = nil - return nil +func (b *SegmentWriter) Reader() io.ReadCloser { + pr, pw := io.Pipe() + go func() { + _, err := b.WriteTo(pw) + pw.CloseWithError(err) + }() + return &EncodedSegmentReader{PipeReader: pr, PipeWriter: pw} } // InputSize returns the total size of the input data written to the writer. diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go index 0e14028bd0..beddd3d09f 100644 --- a/pkg/storage/wal/segment_test.go +++ b/pkg/storage/wal/segment_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "sort" "sync" "testing" @@ -510,16 +511,15 @@ func BenchmarkWrites(b *testing.B) { } }) - bytesBuf := make([]byte, inputSize) + bytesBuf := make([]byte, encodedLength) b.Run("Reader", func(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { var err error - reader, err := writer.ToReader() - require.NoError(b, err) + reader := writer.Reader() - n, err := reader.Read(bytesBuf) + n, err := io.ReadFull(reader, bytesBuf) require.NoError(b, err) require.EqualValues(b, encodedLength, n) require.NoError(b, reader.Close())