mirror of https://github.com/grafana/loki
feat: improve dataobj-inspect command (#18618)
parent
7c9003eae5
commit
238e0b7818
@ -0,0 +1,126 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"os" |
||||
|
||||
"github.com/alecthomas/kingpin/v2" |
||||
"github.com/fatih/color" |
||||
"github.com/prometheus/prometheus/model/labels" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams" |
||||
) |
||||
|
||||
// dumpCommand implements the "dump" subcommand, which prints the contents of
// the data object files named on the command line.
type dumpCommand struct {
	// files points at the list of file paths that kingpin fills in for the
	// "file" argument.
	files *[]string
}
||||
|
||||
func (cmd *dumpCommand) run(c *kingpin.ParseContext) error { |
||||
for _, f := range *cmd.files { |
||||
cmd.dumpFile(f) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (cmd *dumpCommand) dumpFile(name string) { |
||||
f, err := os.Open(name) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open file: %w", err)) |
||||
} |
||||
defer func() { _ = f.Close() }() |
||||
fi, err := f.Stat() |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read fileinfo: %w", err)) |
||||
} |
||||
dataObj, err := dataobj.FromReaderAt(f, fi.Size()) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read dataobj: %w", err)) |
||||
} |
||||
for offset, sec := range dataObj.Sections() { |
||||
switch { |
||||
case streams.CheckSection(sec): |
||||
cmd.dumpStreamsSection(context.TODO(), offset, sec) |
||||
case logs.CheckSection(sec): |
||||
cmd.dumpLogsSection(context.TODO(), offset, sec) |
||||
default: |
||||
fmt.Printf("unknown section: %s\n", sec.Type) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (cmd *dumpCommand) dumpStreamsSection(ctx context.Context, offset int, sec *dataobj.Section) { |
||||
streamsSec, err := streams.Open(ctx, sec) |
||||
if err != nil { |
||||
exitWithError(err) |
||||
} |
||||
bold := color.New(color.Bold) |
||||
bold.Println("Streams section:") |
||||
bold.Printf("\toffset: %d\n", offset) |
||||
|
||||
tmp := make([]streams.Stream, 512) |
||||
r := streams.NewRowReader(streamsSec) |
||||
for { |
||||
n, err := r.Read(ctx, tmp) |
||||
if err != nil && !errors.Is(err, io.EOF) { |
||||
exitWithError(err) |
||||
} |
||||
if n == 0 && errors.Is(err, io.EOF) { |
||||
return |
||||
} |
||||
for _, s := range tmp[:n] { |
||||
bold.Printf("\t\tid: %d, labels:\n", s.ID) |
||||
s.Labels.Range(func(l labels.Label) { |
||||
fmt.Printf("\t\t\t%s=%s\n", l.Name, l.Value) |
||||
}) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (cmd *dumpCommand) dumpLogsSection(ctx context.Context, offset int, sec *dataobj.Section) { |
||||
logsSec, err := logs.Open(ctx, sec) |
||||
if err != nil { |
||||
exitWithError(err) |
||||
} |
||||
bold := color.New(color.Bold) |
||||
bold.Println("Logs section:") |
||||
bold.Printf("\toffset: %d\n", offset) |
||||
tmp := make([]logs.Record, 512) |
||||
r := logs.NewRowReader(logsSec) |
||||
for { |
||||
n, err := r.Read(ctx, tmp) |
||||
if err != nil && !errors.Is(err, io.EOF) { |
||||
exitWithError(err) |
||||
} |
||||
if n == 0 && errors.Is(err, io.EOF) { |
||||
return |
||||
} |
||||
for _, r := range tmp[0:n] { |
||||
bold.Printf("\t\tid: %d, timestamp: %s, metadata:\n", r.StreamID, r.Timestamp) |
||||
r.Metadata.Range(func(l labels.Label) { |
||||
fmt.Printf("\t\t\t%s=%s\n", l.Name, l.Value) |
||||
}) |
||||
if len(r.Line) > 0 { |
||||
bold.Printf("\t\t> ") |
||||
for pos, char := range string(r.Line) { |
||||
fmt.Printf("%c", char) |
||||
if pos > 0 && pos%100 == 0 { |
||||
bold.Printf("\n\t\t> ") |
||||
} |
||||
} |
||||
fmt.Println("") |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func addDumpCommand(app *kingpin.Application) { |
||||
cmd := &dumpCommand{} |
||||
dump := app.Command("dump", "Dump the contents of the data object.").Action(cmd.run) |
||||
cmd.files = dump.Arg("file", "The file to dump.").ExistingFiles() |
||||
} |
||||
@ -0,0 +1,92 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"os" |
||||
"slices" |
||||
|
||||
"github.com/alecthomas/kingpin/v2" |
||||
"github.com/fatih/color" |
||||
"github.com/prometheus/prometheus/model/labels" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams" |
||||
) |
||||
|
||||
// listStreamsCommand implements the "list-streams" subcommand, which lists
// the streams found in the data object files named on the command line.
type listStreamsCommand struct {
	// files points at the list of file paths that kingpin fills in for the
	// "file" argument.
	files *[]string
}
||||
|
||||
func (cmd *listStreamsCommand) run(c *kingpin.ParseContext) error { |
||||
for _, f := range *cmd.files { |
||||
cmd.listStreamsInFile(f) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (cmd *listStreamsCommand) listStreamsInFile(name string) { |
||||
f, err := os.Open(name) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open file: %w", err)) |
||||
} |
||||
defer func() { _ = f.Close() }() |
||||
fi, err := f.Stat() |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read fileinfo: %w", err)) |
||||
} |
||||
dataObj, err := dataobj.FromReaderAt(f, fi.Size()) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read dataobj: %w", err)) |
||||
} |
||||
cmd.listStreams(context.TODO(), dataObj) |
||||
} |
||||
|
||||
func (cmd *listStreamsCommand) listStreams(ctx context.Context, dataObj *dataobj.Object) { |
||||
var ( |
||||
result = make(map[int64]streams.Stream) |
||||
tmp = make([]streams.Stream, 512) |
||||
) |
||||
for _, sec := range dataObj.Sections() { |
||||
if streams.CheckSection(sec) { |
||||
streamsSec, err := streams.Open(ctx, sec) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open streams section: %w", err)) |
||||
} |
||||
r := streams.NewRowReader(streamsSec) |
||||
for { |
||||
n, err := r.Read(ctx, tmp) |
||||
if err != nil && !errors.Is(err, io.EOF) { |
||||
exitWithError(err) |
||||
} |
||||
if n == 0 && errors.Is(err, io.EOF) { |
||||
break |
||||
} |
||||
for _, s := range tmp[:n] { |
||||
result[s.ID] = s |
||||
} |
||||
} |
||||
} |
||||
} |
||||
sorted := make([]int64, 0, len(result)) |
||||
for id := range result { |
||||
sorted = append(sorted, id) |
||||
} |
||||
slices.Sort(sorted) |
||||
bold := color.New(color.Bold) |
||||
for _, id := range sorted { |
||||
bold.Printf("id: %d, labels:\n", id) |
||||
result[id].Labels.Range(func(l labels.Label) { |
||||
fmt.Printf("\t%s=%s\n", l.Name, l.Value) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func addListStreamsCommand(app *kingpin.Application) { |
||||
cmd := &listStreamsCommand{} |
||||
dump := app.Command("list-streams", "Lists all streams in the data object.").Action(cmd.run) |
||||
cmd.files = dump.Arg("file", "The file to list.").ExistingFiles() |
||||
} |
||||
@ -1,34 +1,22 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"flag" |
||||
"log" |
||||
"fmt" |
||||
"os" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/tools" |
||||
"github.com/alecthomas/kingpin/v2" |
||||
) |
||||
|
||||
func main() { |
||||
flag.Parse() |
||||
|
||||
for _, f := range flag.Args() { |
||||
printFile(f) |
||||
} |
||||
// exitWithError writes err to stderr on its own line and terminates the
// process with exit status 1. It never returns.
func exitWithError(err error) {
	// Fprintln terminates the message with a newline so the shell prompt does
	// not end up on the same line, and it formats a nil error as "<nil>"
	// instead of panicking on err.Error().
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}
||||
|
||||
func printFile(filename string) { |
||||
f, err := os.Open(filename) |
||||
if err != nil { |
||||
log.Printf("%s: %v", filename, err) |
||||
return |
||||
} |
||||
defer func() { _ = f.Close() }() |
||||
|
||||
fi, err := f.Stat() |
||||
if err != nil { |
||||
log.Printf("%s: %v", filename, err) |
||||
return |
||||
} |
||||
|
||||
tools.Inspect(f, fi.Size()) |
||||
func main() { |
||||
app := kingpin.New("dataobj-inspect", "A command-line tool to inspect data objects.") |
||||
addDumpCommand(app) |
||||
addStatsCommand(app) |
||||
addListStreamsCommand(app) |
||||
addPrintStreamsCommand(app) |
||||
kingpin.MustParse(app.Parse(os.Args[1:])) |
||||
} |
||||
|
||||
@ -0,0 +1,86 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"os" |
||||
|
||||
"github.com/alecthomas/kingpin/v2" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs" |
||||
) |
||||
|
||||
// printStreamsCommand implements the "print-streams" subcommand, which
// prints the log lines stored in the data object files named on the command
// line, optionally restricted to selected stream IDs.
type printStreamsCommand struct {
	// files points at the list of file paths that kingpin fills in for the
	// "file" argument.
	files *[]string
	// streamIDs points at the values of the "stream-id" flag. When the list
	// is empty, lines of all streams are printed.
	streamIDs *[]int64
}
||||
|
||||
func (cmd *printStreamsCommand) run(c *kingpin.ParseContext) error { |
||||
for _, f := range *cmd.files { |
||||
cmd.printStreamsInFile(f) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (cmd *printStreamsCommand) printStreamsInFile(name string) { |
||||
f, err := os.Open(name) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open file: %w", err)) |
||||
} |
||||
defer func() { _ = f.Close() }() |
||||
fi, err := f.Stat() |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read fileinfo: %w", err)) |
||||
} |
||||
dataObj, err := dataobj.FromReaderAt(f, fi.Size()) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read dataobj: %w", err)) |
||||
} |
||||
cmd.printStreams(context.TODO(), dataObj) |
||||
} |
||||
|
||||
func (cmd *printStreamsCommand) printStreams(ctx context.Context, dataObj *dataobj.Object) { |
||||
var ( |
||||
tmp = make([]logs.Record, 512) |
||||
printAll = false |
||||
printStreamIDs = make(map[int64]struct{}) |
||||
) |
||||
printAll = len(*cmd.streamIDs) == 0 |
||||
for _, id := range *cmd.streamIDs { |
||||
printStreamIDs[id] = struct{}{} |
||||
} |
||||
for _, sec := range dataObj.Sections() { |
||||
if logs.CheckSection(sec) { |
||||
logsSec, err := logs.Open(ctx, sec) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open logs section: %w", err)) |
||||
} |
||||
r := logs.NewRowReader(logsSec) |
||||
for { |
||||
n, err := r.Read(ctx, tmp) |
||||
if err != nil && !errors.Is(err, io.EOF) { |
||||
exitWithError(err) |
||||
} |
||||
if n == 0 && errors.Is(err, io.EOF) { |
||||
break |
||||
} |
||||
for _, r := range tmp[:n] { |
||||
if _, ok := printStreamIDs[r.StreamID]; ok || printAll { |
||||
fmt.Printf("%s\n", string(r.Line)) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func addPrintStreamsCommand(app *kingpin.Application) { |
||||
cmd := &printStreamsCommand{} |
||||
dump := app.Command("print-streams", "Prints the streams in the data object.").Action(cmd.run) |
||||
cmd.streamIDs = dump.Flag("stream-id", "Print lines for these stream IDs.").Int64List() |
||||
cmd.files = dump.Arg("file", "The file to list.").ExistingFiles() |
||||
} |
||||
@ -0,0 +1,107 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"os" |
||||
|
||||
"github.com/alecthomas/kingpin/v2" |
||||
"github.com/dustin/go-humanize" |
||||
"github.com/fatih/color" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams" |
||||
) |
||||
|
||||
// statsCommand implements the "stats" subcommand, which prints statistics
// for each data object file named on the command line.
type statsCommand struct {
	// files points at the list of file paths that kingpin fills in for the
	// "file" argument.
	files *[]string
}
||||
|
||||
func (cmd *statsCommand) run(c *kingpin.ParseContext) error { |
||||
for _, f := range *cmd.files { |
||||
cmd.printStats(f) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// summarizeFile prints stats for the data object file.
|
||||
func (cmd *statsCommand) printStats(name string) { |
||||
f, err := os.Open(name) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open file: %w", err)) |
||||
} |
||||
defer func() { _ = f.Close() }() |
||||
fi, err := f.Stat() |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read fileinfo: %w", err)) |
||||
} |
||||
dataObj, err := dataobj.FromReaderAt(f, fi.Size()) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read dataobj: %w", err)) |
||||
} |
||||
for offset, sec := range dataObj.Sections() { |
||||
switch { |
||||
case streams.CheckSection(sec): |
||||
cmd.printStreamsSectionStats(context.TODO(), offset, sec) |
||||
case logs.CheckSection(sec): |
||||
cmd.printLogsSectionStats(context.TODO(), offset, sec) |
||||
default: |
||||
exitWithError(errors.New("unknown section")) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (cmd *statsCommand) printStreamsSectionStats(ctx context.Context, offset int, sec *dataobj.Section) { |
||||
streamsSec, err := streams.Open(ctx, sec) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open streams section: %w", err)) |
||||
} |
||||
stats, err := streams.ReadStats(ctx, streamsSec) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read section stats: %w", err)) |
||||
} |
||||
bold := color.New(color.Bold) |
||||
bold.Println("Streams section:") |
||||
bold.Printf( |
||||
"\toffset: %d, columns: %d, compressed size: %v; uncompressed size %v\n", |
||||
offset, |
||||
len(stats.Columns), |
||||
humanize.Bytes(stats.CompressedSize), |
||||
humanize.Bytes(stats.UncompressedSize), |
||||
) |
||||
for _, col := range stats.Columns { |
||||
fmt.Printf("\t\tname: %s, type: %v, %d populated rows, %v compressed (%v), %v uncompressed\n", col.Name, col.Type[12:], col.ValuesCount, humanize.Bytes(col.CompressedSize), col.Compression[17:], humanize.Bytes(col.UncompressedSize)) |
||||
} |
||||
} |
||||
|
||||
func (cmd *statsCommand) printLogsSectionStats(ctx context.Context, offset int, sec *dataobj.Section) { |
||||
logsSec, err := logs.Open(ctx, sec) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to open logs section: %w", err)) |
||||
} |
||||
stats, err := logs.ReadStats(ctx, logsSec) |
||||
if err != nil { |
||||
exitWithError(fmt.Errorf("failed to read section stats: %w", err)) |
||||
} |
||||
bold := color.New(color.Bold) |
||||
bold.Println("Logs section:") |
||||
bold.Printf( |
||||
"\toffset: %d, columns: %d, compressed size: %v; uncompressed size %v\n", |
||||
offset, |
||||
len(stats.Columns), |
||||
humanize.Bytes(stats.CompressedSize), |
||||
humanize.Bytes(stats.UncompressedSize), |
||||
) |
||||
for _, col := range stats.Columns { |
||||
fmt.Printf("\t\tname: %s, type: %v, %d populated rows, %v compressed (%v), %v uncompressed\n", col.Name, col.Type[12:], col.ValuesCount, humanize.Bytes(col.CompressedSize), col.Compression[17:], humanize.Bytes(col.UncompressedSize)) |
||||
} |
||||
} |
||||
|
||||
func addStatsCommand(app *kingpin.Application) { |
||||
cmd := &statsCommand{} |
||||
summary := app.Command("stats", "Print stats for the data object.").Action(cmd.run) |
||||
cmd.files = summary.Arg("file", "The file to print.").ExistingFiles() |
||||
} |
||||
Loading…
Reference in new issue