|
|
|
|
@ -110,12 +110,6 @@ func newCRC32() hash.Hash32 { |
|
|
|
|
return crc32.New(castagnoliTable) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type symbolCacheEntry struct { |
|
|
|
|
index uint32 |
|
|
|
|
lastValueIndex uint32 |
|
|
|
|
lastValue string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type PostingsEncoder func(*encoding.Encbuf, []uint32) error |
|
|
|
|
|
|
|
|
|
type PostingsDecoder func(encoding.Decbuf) (int, Postings, error) |
|
|
|
|
@ -146,7 +140,7 @@ type Writer struct { |
|
|
|
|
symbols *Symbols |
|
|
|
|
symbolFile *fileutil.MmapFile |
|
|
|
|
lastSymbol string |
|
|
|
|
symbolCache map[string]symbolCacheEntry |
|
|
|
|
symbolCache map[string]uint32 // From symbol to index in table.
|
|
|
|
|
|
|
|
|
|
labelIndexes []labelIndexHashEntry // Label index offsets.
|
|
|
|
|
labelNames map[string]uint64 // Label names, and their usage.
|
|
|
|
|
@ -246,7 +240,7 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode |
|
|
|
|
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, |
|
|
|
|
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, |
|
|
|
|
|
|
|
|
|
symbolCache: make(map[string]symbolCacheEntry, 1<<8), |
|
|
|
|
symbolCache: make(map[string]uint32, 1<<16), |
|
|
|
|
labelNames: make(map[string]uint64, 1<<8), |
|
|
|
|
crc32: newCRC32(), |
|
|
|
|
postingsEncoder: encoder, |
|
|
|
|
@ -478,29 +472,16 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... |
|
|
|
|
w.buf2.PutUvarint(lset.Len()) |
|
|
|
|
|
|
|
|
|
if err := lset.Validate(func(l labels.Label) error { |
|
|
|
|
var err error |
|
|
|
|
cacheEntry, ok := w.symbolCache[l.Name] |
|
|
|
|
nameIndex := cacheEntry.index |
|
|
|
|
nameIndex, ok := w.symbolCache[l.Name] |
|
|
|
|
if !ok { |
|
|
|
|
nameIndex, err = w.symbols.ReverseLookup(l.Name) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist, %w", l.Name, err) |
|
|
|
|
} |
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist", l.Name) |
|
|
|
|
} |
|
|
|
|
w.labelNames[l.Name]++ |
|
|
|
|
w.buf2.PutUvarint32(nameIndex) |
|
|
|
|
|
|
|
|
|
valueIndex := cacheEntry.lastValueIndex |
|
|
|
|
if !ok || cacheEntry.lastValue != l.Value { |
|
|
|
|
valueIndex, err = w.symbols.ReverseLookup(l.Value) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err) |
|
|
|
|
} |
|
|
|
|
w.symbolCache[l.Name] = symbolCacheEntry{ |
|
|
|
|
index: nameIndex, |
|
|
|
|
lastValueIndex: valueIndex, |
|
|
|
|
lastValue: l.Value, |
|
|
|
|
} |
|
|
|
|
valueIndex, ok := w.symbolCache[l.Value] |
|
|
|
|
if !ok { |
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist", l.Value) |
|
|
|
|
} |
|
|
|
|
w.buf2.PutUvarint32(valueIndex) |
|
|
|
|
return nil |
|
|
|
|
@ -559,6 +540,7 @@ func (w *Writer) AddSymbol(sym string) error { |
|
|
|
|
return fmt.Errorf("symbol %q out-of-order", sym) |
|
|
|
|
} |
|
|
|
|
w.lastSymbol = sym |
|
|
|
|
w.symbolCache[sym] = uint32(w.numSymbols) |
|
|
|
|
w.numSymbols++ |
|
|
|
|
w.buf1.Reset() |
|
|
|
|
w.buf1.PutUvarintStr(sym) |
|
|
|
|
@ -644,9 +626,9 @@ func (w *Writer) writeLabelIndices() error { |
|
|
|
|
values = values[:0] |
|
|
|
|
} |
|
|
|
|
current = name |
|
|
|
|
sid, err := w.symbols.ReverseLookup(value) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
sid, ok := w.symbolCache[value] |
|
|
|
|
if !ok { |
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist", string(value)) |
|
|
|
|
} |
|
|
|
|
values = append(values, sid) |
|
|
|
|
} |
|
|
|
|
@ -918,9 +900,9 @@ func (w *Writer) writePostingsToTmpFiles() error { |
|
|
|
|
|
|
|
|
|
nameSymbols := map[uint32]string{} |
|
|
|
|
for _, name := range batchNames { |
|
|
|
|
sid, err := w.symbols.ReverseLookup(name) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
sid, ok := w.symbolCache[name] |
|
|
|
|
if !ok { |
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist", name) |
|
|
|
|
} |
|
|
|
|
nameSymbols[sid] = name |
|
|
|
|
} |
|
|
|
|
@ -957,9 +939,9 @@ func (w *Writer) writePostingsToTmpFiles() error { |
|
|
|
|
|
|
|
|
|
for _, name := range batchNames { |
|
|
|
|
// Write out postings for this label name.
|
|
|
|
|
sid, err := w.symbols.ReverseLookup(name) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
sid, ok := w.symbolCache[name] |
|
|
|
|
if !ok { |
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist", name) |
|
|
|
|
} |
|
|
|
|
values := make([]uint32, 0, len(postings[sid])) |
|
|
|
|
for v := range postings[sid] { |
|
|
|
|
|