chore(dataobj): reduce total allocations while encoding (#15846)

Branch: dataobj-logs-sort
Robert Fratto authored 12 months ago · committed by GitHub
parent bb5373db76
commit 2a60011b13
3 changed files:

    pkg/dataobj/internal/encoding/encoder.go (1 change)
    pkg/dataobj/internal/encoding/encoder_logs.go (27 changes)
    pkg/dataobj/internal/encoding/encoder_streams.go (26 changes)

pkg/dataobj/internal/encoding/encoder.go

@@ -162,6 +162,7 @@ func (enc *Encoder) append(data, metadata []byte) error {
 	enc.curSection.MetadataSize = uint32(len(metadata))
 
 	// bytes.Buffer.Write never fails.
+	enc.data.Grow(len(data) + len(metadata))
 	_, _ = enc.data.Write(data)
 	_, _ = enc.data.Write(metadata)

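The change in all three files starts with the same single line: growing the buffer to the exact combined size before writing. Without it, bytes.Buffer reallocates and copies its backing array each time a write exceeds capacity. A minimal standalone sketch of the pattern (names here are illustrative, not from the repository):

package encodingsketch

import "bytes"

// appendPreSized mirrors the Grow-before-Write pattern added in this
// commit: one Grow call reserves space for both slices up front, so
// the two Writes below never trigger a reallocation of their own.
func appendPreSized(buf *bytes.Buffer, data, metadata []byte) {
	buf.Grow(len(data) + len(metadata))
	_, _ = buf.Write(data)     // bytes.Buffer.Write never fails;
	_, _ = buf.Write(metadata) // it always returns a nil error.
}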
pkg/dataobj/internal/encoding/encoder_logs.go

@@ -153,6 +153,7 @@ func (enc *LogsEncoder) append(data, metadata []byte) error {
 	enc.curColumn.Info.MetadataSize = uint32(len(metadata))
 
 	// bytes.Buffer.Write never fails.
+	enc.data.Grow(len(data) + len(metadata))
 	_, _ = enc.data.Write(data)
 	_, _ = enc.data.Write(metadata)
@@ -169,8 +170,11 @@ type LogsColumnEncoder struct {
 	startOffset int  // Byte offset in the file where the column starts.
 	closed      bool // true if LogsColumnEncoder has been closed.
 
-	data  *bytes.Buffer // All page data.
-	pages []*logsmd.PageDesc
+	data          *bytes.Buffer // All page data.
+	pageHeaders   []*logsmd.PageDesc
+	memPages      []*dataset.MemPage // Pages to write.
+	totalPageSize int                // Size of bytes across all pages.
 }
 
 func newLogsColumnEncoder(parent *LogsEncoder, offset int) *LogsColumnEncoder {
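This struct change is the heart of the commit: instead of copying each page into data as it arrives, the encoder now keeps the pages themselves (memPages) plus a running byte count (totalPageSize), and defers all copying to Commit. A hedged sketch of the layout with simplified stand-in types (dataset.MemPage and logsmd.PageDesc reduced to the fields that matter here):

package encodingsketch

import "bytes"

// memPage stands in for dataset.MemPage: one encoded page held in memory.
type memPage struct{ Data []byte }

// pageDesc stands in for logsmd.PageDesc, reduced to offset and size.
type pageDesc struct{ DataOffset, DataSize uint32 }

// columnSketch mirrors the new LogsColumnEncoder layout: page headers
// and page payloads are tracked separately, and totalPageSize stands
// in for data.Len() now that payloads are no longer copied eagerly.
type columnSketch struct {
	startOffset   int           // byte offset of the column in the file
	data          *bytes.Buffer // filled only at Commit time
	pageHeaders   []*pageDesc
	memPages      []*memPage // pages to write
	totalPageSize int        // size in bytes across all pages
}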
@@ -195,7 +199,7 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
 	// It's possible the caller can pass an incorrect value for UncompressedSize
 	// and CompressedSize, but those fields are purely for stats so we don't
 	// check it.
-	enc.pages = append(enc.pages, &logsmd.PageDesc{
+	enc.pageHeaders = append(enc.pageHeaders, &logsmd.PageDesc{
 		Info: &datasetmd.PageInfo{
 			UncompressedSize: uint32(page.Info.UncompressedSize),
 			CompressedSize:   uint32(page.Info.CompressedSize),
@@ -204,14 +208,15 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
 			ValuesCount: uint32(page.Info.ValuesCount),
 			Encoding:    page.Info.Encoding,
-			DataOffset:  uint32(enc.startOffset + enc.data.Len()),
+			DataOffset:  uint32(enc.startOffset + enc.totalPageSize),
 			DataSize:    uint32(len(page.Data)),
 			Statistics:  page.Info.Stats,
 		},
 	})
 
-	_, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails.
+	enc.memPages = append(enc.memPages, page)
+	enc.totalPageSize += len(page.Data)
 
 	return nil
 }
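Note the subtle pairing in this hunk: once the eager Write is gone, enc.data.Len() would read zero for every page, so DataOffset has to be derived from the new totalPageSize counter instead. Continuing the stand-in types from the sketch above:

// appendPage records a header whose DataOffset comes from the running
// totalPageSize; the payload itself is only retained, not copied, so
// AppendPage no longer touches the data buffer at all.
func (enc *columnSketch) appendPage(p *memPage) {
	enc.pageHeaders = append(enc.pageHeaders, &pageDesc{
		DataOffset: uint32(enc.startOffset + enc.totalPageSize),
		DataSize:   uint32(len(p.Data)),
	})
	enc.memPages = append(enc.memPages, p)
	enc.totalPageSize += len(p.Data)
}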
@@ -220,7 +225,7 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
 func (enc *LogsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) }
 
 func (enc *LogsColumnEncoder) metadata() proto.Message {
-	return &logsmd.ColumnMetadata{Pages: enc.pages}
+	return &logsmd.ColumnMetadata{Pages: enc.pageHeaders}
 }
 
 // Commit closes the column, flushing all data to the parent element. After
@@ -233,11 +238,18 @@ func (enc *LogsColumnEncoder) Commit() error {
 	defer bytesBufferPool.Put(enc.data)
 
-	if len(enc.pages) == 0 {
+	if len(enc.pageHeaders) == 0 {
 		// No data was written; discard.
 		return enc.parent.append(nil, nil)
 	}
 
+	// Write all pages. To avoid costly reallocations, we grow our buffer to fit
+	// all data first.
+	enc.data.Grow(enc.totalPageSize)
+	for _, p := range enc.memPages {
+		_, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails.
+	}
+
 	metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer)
 	metadataBuffer.Reset()
 	defer bytesBufferPool.Put(metadataBuffer)
@@ -245,6 +257,7 @@ func (enc *LogsColumnEncoder) Commit() error {
 	if err := elementMetadataWrite(enc, metadataBuffer); err != nil {
 		return err
 	}
 
 	return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes())
 }

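Taken together, Commit now performs one buffer growth per column instead of one reallocation cascade per column's worth of pages. A hypothetical micro-benchmark (not from the repository; it would live in a _test.go file) that isolates the difference for 64 pages of 4 KiB:

package encodingsketch

import (
	"bytes"
	"testing"
)

// BenchmarkPerPageWrite models the old behavior: each page is written
// as it arrives, and bytes.Buffer reallocates on demand as it fills.
func BenchmarkPerPageWrite(b *testing.B) {
	page := make([]byte, 4096)
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		for j := 0; j < 64; j++ {
			_, _ = buf.Write(page)
		}
	}
}

// BenchmarkGrowOnce models the new behavior: the total size is known
// up front, so the buffer grows a single time before any write.
func BenchmarkGrowOnce(b *testing.B) {
	page := make([]byte, 4096)
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		buf.Grow(64 * 4096)
		for j := 0; j < 64; j++ {
			_, _ = buf.Write(page)
		}
	}
}

Run with go test -bench . -benchmem: the per-page variant should report noticeably more allocations per operation from repeated buffer doubling, while the grow-once variant allocates the backing array a single time.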
pkg/dataobj/internal/encoding/encoder_streams.go

@@ -153,6 +153,7 @@ func (enc *StreamsEncoder) append(data, metadata []byte) error {
 	enc.curColumn.Info.MetadataSize = uint32(len(metadata))
 
 	// bytes.Buffer.Write never fails.
+	enc.data.Grow(len(data) + len(metadata))
 	_, _ = enc.data.Write(data)
 	_, _ = enc.data.Write(metadata)
@@ -169,8 +170,11 @@ type StreamsColumnEncoder struct {
 	startOffset int  // Byte offset in the file where the column starts.
 	closed      bool // true if StreamsColumnEncoder has been closed.
 
-	data  *bytes.Buffer // All page data.
-	pages []*streamsmd.PageDesc
+	data          *bytes.Buffer // All page data.
+	pageHeaders   []*streamsmd.PageDesc
+	memPages      []*dataset.MemPage // Pages to write.
+	totalPageSize int                // Total size of all pages.
 }
 
 func newStreamsColumnEncoder(parent *StreamsEncoder, offset int) *StreamsColumnEncoder {
@@ -195,7 +199,7 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
 	// It's possible the caller can pass an incorrect value for UncompressedSize
 	// and CompressedSize, but those fields are purely for stats so we don't
 	// check it.
-	enc.pages = append(enc.pages, &streamsmd.PageDesc{
+	enc.pageHeaders = append(enc.pageHeaders, &streamsmd.PageDesc{
 		Info: &datasetmd.PageInfo{
 			UncompressedSize: uint32(page.Info.UncompressedSize),
 			CompressedSize:   uint32(page.Info.CompressedSize),
@@ -204,14 +208,15 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
 			ValuesCount: uint32(page.Info.ValuesCount),
 			Encoding:    page.Info.Encoding,
-			DataOffset:  uint32(enc.startOffset + enc.data.Len()),
+			DataOffset:  uint32(enc.startOffset + enc.totalPageSize),
 			DataSize:    uint32(len(page.Data)),
 			Statistics:  page.Info.Stats,
 		},
 	})
 
-	_, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails.
+	enc.memPages = append(enc.memPages, page)
+	enc.totalPageSize += len(page.Data)
 
 	return nil
 }
@@ -220,7 +225,7 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
 func (enc *StreamsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) }
 
 func (enc *StreamsColumnEncoder) metadata() proto.Message {
-	return &streamsmd.ColumnMetadata{Pages: enc.pages}
+	return &streamsmd.ColumnMetadata{Pages: enc.pageHeaders}
 }
 
 // Commit closes the column, flushing all data to the parent element. After
@@ -233,11 +238,18 @@ func (enc *StreamsColumnEncoder) Commit() error {
 	defer bytesBufferPool.Put(enc.data)
 
-	if len(enc.pages) == 0 {
+	if len(enc.pageHeaders) == 0 {
 		// No data was written; discard.
 		return enc.parent.append(nil, nil)
 	}
 
+	// Write all pages. To avoid costly reallocations, we grow our buffer to fit
+	// all data first.
+	enc.data.Grow(enc.totalPageSize)
+	for _, p := range enc.memPages {
+		_, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails.
+	}
+
 	metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer)
 	metadataBuffer.Reset()
 	defer bytesBufferPool.Put(metadataBuffer)

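One more allocation-saving device is visible in both Commit hunks, though unchanged by this diff: enc.data and metadataBuffer come from bytesBufferPool rather than being freshly allocated per column. A sketch of that pattern, assuming bytesBufferPool is an ordinary sync.Pool of *bytes.Buffer (its declaration is not shown in this diff):

package encodingsketch

import (
	"bytes"
	"sync"
)

// bufferPool models the bytesBufferPool implied by the Get/Put/Reset
// calls in both Commit implementations.
var bufferPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// withPooledBuffer checks a buffer out of the pool, resets it (pooled
// buffers keep their previous contents and capacity), and returns it
// when fn is done, so hot paths reuse backing arrays across calls.
func withPooledBuffer(fn func(*bytes.Buffer)) {
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufferPool.Put(buf)
	fn(buf)
}

The ordering caveat the real code has to respect: enc.parent.append must copy data.Bytes() before the deferred Put hands the buffer back to the pool, which holds here because the parent's append writes the slices into its own buffer.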