fix(dataobj-inspect): Enhance dump and list-streams commands (#20757)

pull/20766/head
Bryan Boreham 4 weeks ago committed by GitHub
parent b5c7ea2dc2
commit cd3fd30a46
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 13
      cmd/dataobj-inspect/dump.go
  2. 2
      cmd/dataobj-inspect/go.mod
  3. 31
      cmd/dataobj-inspect/list_streams.go

@ -20,6 +20,8 @@ import (
type dumpCommand struct {
files *[]string
printLines *bool
tenant *string
streamID *int
}
func (cmd *dumpCommand) run(c *kingpin.ParseContext) error {
@ -44,6 +46,9 @@ func (cmd *dumpCommand) dumpFile(name string) {
exitWithErr(fmt.Errorf("failed to read dataobj: %w", err))
}
for offset, sec := range dataObj.Sections() {
if *cmd.tenant != "" && sec.Tenant != *cmd.tenant {
continue
}
switch {
case streams.CheckSection(sec):
cmd.dumpStreamsSection(context.TODO(), offset, sec)
@ -75,6 +80,9 @@ func (cmd *dumpCommand) dumpStreamsSection(ctx context.Context, offset int, sec
return
}
for _, s := range tmp[:n] {
if *cmd.streamID != 0 && *cmd.streamID != int(s.ID) {
continue
}
bold.Printf("\t\tid: %d, labels:\n", s.ID)
s.Labels.Range(func(l labels.Label) {
fmt.Printf("\t\t\t%s=%s\n", l.Name, l.Value)
@ -102,6 +110,9 @@ func (cmd *dumpCommand) dumpLogsSection(ctx context.Context, offset int, sec *da
return
}
for _, r := range tmp[0:n] {
if *cmd.streamID != 0 && *cmd.streamID != int(r.StreamID) {
continue
}
bold.Printf("\t\tid: %d, timestamp: %s, metadata:\n", r.StreamID, r.Timestamp)
r.Metadata.Range(func(l labels.Label) {
fmt.Printf("\t\t\t%s=%s\n", l.Name, l.Value)
@ -124,5 +135,7 @@ func addDumpCommand(app *kingpin.Application) {
cmd := &dumpCommand{}
dump := app.Command("dump", "Dump the contents of the data object.").Action(cmd.run)
cmd.printLines = dump.Flag("print-lines", "Prints the lines of each column.").Bool()
cmd.tenant = dump.Flag("tenant", "Which tenant to dump").String()
cmd.streamID = dump.Flag("stream", "Which stream ID to dump").Int()
cmd.files = dump.Arg("file", "The file to dump.").ExistingFiles()
}

@ -1,4 +1,4 @@
module github.com/grafana/loki/cmd/index
module github.com/grafana/loki/cmd/dataobj-inspect
go 1.25.5

@ -1,16 +1,18 @@
package main
import (
"cmp"
"context"
"errors"
"fmt"
"io"
"os"
"slices"
"strings"
"time"
"github.com/alecthomas/kingpin/v2"
"github.com/fatih/color"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
@ -18,7 +20,8 @@ import (
// listStreamsCommand lists the streams in the data object.
type listStreamsCommand struct {
files *[]string
files *[]string
tenant *string
}
func (cmd *listStreamsCommand) run(c *kingpin.ParseContext) error {
@ -46,11 +49,18 @@ func (cmd *listStreamsCommand) listStreamsInFile(name string) {
}
func (cmd *listStreamsCommand) listStreams(ctx context.Context, dataObj *dataobj.Object) {
type key struct {
tenant string
id int64
}
var (
result = make(map[int64]streams.Stream)
result = make(map[key]streams.Stream)
tmp = make([]streams.Stream, 512)
)
for _, sec := range dataObj.Sections() {
if *cmd.tenant != "" && sec.Tenant != *cmd.tenant {
continue
}
if streams.CheckSection(sec) {
streamsSec, err := streams.Open(ctx, sec)
if err != nil {
@ -66,22 +76,20 @@ func (cmd *listStreamsCommand) listStreams(ctx context.Context, dataObj *dataobj
break
}
for _, s := range tmp[:n] {
result[s.ID] = s
result[key{tenant: sec.Tenant, id: s.ID}] = s
}
}
}
}
sorted := make([]int64, 0, len(result))
sorted := make([]key, 0, len(result))
for id := range result {
sorted = append(sorted, id)
}
slices.Sort(sorted)
slices.SortFunc(sorted, func(a, b key) int { return cmp.Or(strings.Compare(a.tenant, b.tenant), int(a.id-b.id)) })
bold := color.New(color.Bold)
for _, id := range sorted {
bold.Printf("id: %d, labels:\n", id)
result[id].Labels.Range(func(l labels.Label) {
fmt.Printf("\t%s=%s\n", l.Name, l.Value)
})
for _, k := range sorted {
s := result[k]
bold.Printf("tenant: %s, id: %d, from: %s, to: %s, labels: %s\n", k.tenant, k.id, s.MinTimestamp.Format(time.DateTime), s.MaxTimestamp.Format(time.DateTime), s.Labels)
}
}
@ -89,4 +97,5 @@ func addListStreamsCommand(app *kingpin.Application) {
cmd := &listStreamsCommand{}
dump := app.Command("list-streams", "Lists all streams in the data object.").Action(cmd.run)
cmd.files = dump.Arg("file", "The file to list.").ExistingFiles()
cmd.tenant = dump.Flag("tenant", "Filter the list to a specific tenant").String()
}

Loading…
Cancel
Save