logcli: Add parallel flags (#8518)

pull/8548/head
Angus Dippenaar 3 years ago committed by GitHub
parent 4e4359e67c
commit 42522d794a
Changed files:
1. CHANGELOG.md (1)
2. cmd/logcli/main.go (87)
3. docs/sources/tools/logcli.md (223)
4. pkg/logcli/output/default.go (7)
5. pkg/logcli/output/jsonl.go (8)
6. pkg/logcli/output/output.go (1)
7. pkg/logcli/output/raw.go (8)
8. pkg/logcli/query/part_file.go (108)
9. pkg/logcli/query/query.go (237)
10. pkg/logcli/query/query_test.go (227)

@@ -234,6 +234,7 @@ Check the history of the branch `release-2.7.x`.
#### Logcli
* [7325](https://github.com/grafana/loki/pull/7325) **dbirks**: Document setting up command completion
* [8518](https://github.com/grafana/loki/pull/8518) **SN9NV**: Add parallel flags
#### Fluent Bit

@@ -76,7 +76,74 @@ data points between the start and end query time. This output is used to
build graphs, similar to what is seen in the Grafana Explore graph view.
If you are querying metrics and just want the most recent data point
(like what is seen in the Grafana Explore table view), then you should use
the "instant-query" command instead.`)
the "instant-query" command instead.
Parallelization:
You can download an unlimited number of logs in parallel; there are a few
flags which control this behaviour:
--parallel-duration
--parallel-max-workers
--part-path-prefix
--overwrite-completed-parts
--merge-parts
--keep-parts
Refer to the help for each flag for details about what each of them does.
Example:
logcli query
--timezone=UTC
--from="2021-01-19T10:00:00Z"
--to="2021-01-19T20:00:00Z"
--output=jsonl
--parallel-duration="15m"
--parallel-max-workers="4"
--part-path-prefix="/tmp/my_query"
--merge-parts
'my-query'
This example will create a queue of jobs to execute, each 15 minutes in
duration. For the 10-hour total duration, that means forty 15-minute jobs.
The --limit flag is ignored.
It will start four workers, and they will each take a job to work on from the
queue until all the jobs have been completed.
Each job will save a "part" file to the location specified by the --part-path-prefix.
Different prefixes can be used to run multiple queries at the same time.
The timestamp of the start and end of the part is in the file name.
While the part is being downloaded, the filename will end in ".part.tmp"; when
it is complete, the file will be renamed to remove the ".tmp" extension.
By default, if a completed part file is found, that part will not be downloaded
again. This can be overridden with the --overwrite-completed-parts flag.
Part file example using the previous command, adding --keep-parts so they are
not deleted:
Since we don't have the --forward flag, the parts will be downloaded in reverse.
Two of the workers have finished their jobs (last two files), and have picked
up the next jobs in the queue.
Running ls, this is what we should expect to see.
$ ls -1 /tmp/my_query*
/tmp/my_query_20210119T183000_20210119T184500.part.tmp
/tmp/my_query_20210119T184500_20210119T190000.part.tmp
/tmp/my_query_20210119T190000_20210119T191500.part.tmp
/tmp/my_query_20210119T191500_20210119T193000.part.tmp
/tmp/my_query_20210119T193000_20210119T194500.part
/tmp/my_query_20210119T194500_20210119T200000.part
If you do not specify the --merge-parts flag, the part files will be
downloaded, logcli will exit, and you can process the files as you wish.
With the flag specified, the part files will be read in order, and the output
printed to the terminal. The lines are printed as soon as the next part is
complete; you don't have to wait for all the parts to download before getting
output. The --merge-parts flag will remove each part file once it has been
read. To keep them, use the --keep-parts flag, and the part files will not be
removed.`)
rangeQuery = newQuery(false, queryCmd)
tail = queryCmd.Flag("tail", "Tail the logs").Short('t').Default("false").Bool()
follow = queryCmd.Flag("follow", "Alias for --tail").Short('f').Default("false").Bool()
@@ -193,8 +260,12 @@ func main() {
if *tail || *follow {
rangeQuery.TailQuery(time.Duration(*delayFor)*time.Second, queryClient, out)
} else if rangeQuery.ParallelMaxWorkers == 1 {
rangeQuery.DoQuery(queryClient, out, *statistics)
} else {
// `--limit` doesn't make sense when using parallelism.
rangeQuery.Limit = 0
rangeQuery.DoQueryParallel(queryClient, out, *statistics)
}
case instantQueryCmd.FullCommand():
location, err := time.LoadLocation(*timezone)
@@ -359,6 +430,11 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
q.End = mustParse(to, defaultEnd)
}
q.Quiet = *quiet
if q.ParallelMaxWorkers < 1 {
return fmt.Errorf("parallel-max-workers must be greater than 0")
}
return nil
})
@@ -374,7 +450,12 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step)
cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval)
cmd.Flag("batch", "Query batch size to use until 'limit' is reached").Default("1000").IntVar(&q.BatchSize)
cmd.Flag("parallel-duration", "Split the range into jobs of this length to download the logs in parallel. This will result in the logs being out of order. Use --part-path-prefix to create a file per job to maintain ordering.").Default("1h").DurationVar(&q.ParallelDuration)
cmd.Flag("parallel-max-workers", "Max number of workers to start up for parallel jobs. A value of 1 will not create any parallel workers. When using parallel workers, limit is ignored.").Default("1").IntVar(&q.ParallelMaxWorkers)
cmd.Flag("part-path-prefix", "When set, each server response will be saved to a file with this prefix. Creates files in the format: 'prefix-utc_start-utc_end.part'. Intended to be used with the parallel-* flags so that you can combine the files to maintain ordering based on the filename. Default is to write to stdout.").StringVar(&q.PartPathPrefix)
cmd.Flag("overwrite-completed-parts", "Overwrites completed part files. This will download the range again, and replace the original completed part file. Default will skip a range if it's part file is already downloaded.").Default("false").BoolVar(&q.OverwriteCompleted)
cmd.Flag("merge-parts", "Reads the part files in order and writes the output to stdout. Original part files will be deleted with this option.").Default("false").BoolVar(&q.MergeParts)
cmd.Flag("keep-parts", "Overrides the default behaviour of --merge-parts which will delete the part files once all the files have been read. This option will keep the part files.").Default("false").BoolVar(&q.KeepParts)
}
cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward)

@@ -250,26 +250,21 @@ usage: logcli query [<flags>] <query>
Run a LogQL query.
The "query" command is useful for querying for logs. Logs can be returned in a
few output modes:
The "query" command is useful for querying for logs. Logs can be returned in a few output modes:
raw: log line
default: log timestamp + log labels + log line
jsonl: JSON response from Loki API of log line
The output of the log can be specified with the "-o" flag, for example, "-o raw" for the raw output format.
The "query" command will output extra information about the query and its
results, such as the API URL, set of common labels, and set of excluded labels.
This extra information can be suppressed with the --quiet flag.
The "query" command will output extra information about the query and its results, such as the API URL, set of common labels, and set of excluded labels. This extra information can be
suppressed with the --quiet flag.
By default we look over the last hour of data; use --since to modify or provide specific start and end times with --from and --to respectively.
Notice that when using --from and --to, ensure you use the RFC3339Nano time format, but without a timezone at the end. The local timezone will be added automatically, or you can specify one
with the --timezone flag.
Example:
@@ -282,101 +277,119 @@ Example:
The output is limited to 30 entries by default; use --limit to increase.
While "query" does support metrics queries, its output contains multiple data
points between the start and end query time. This output is used to build
graphs, similar to what is seen in the Grafana Explore graph view. If you are
querying metrics and just want the most recent data point (like what is seen in
the Grafana Explore table view), then you should use the "instant-query" command
instead.
While "query" does support metrics queries, its output contains multiple data points between the start and end query time. This output is used to build graphs, similar to what is seen
in the Grafana Explore graph view. If you are querying metrics and just want the most recent data point (like what is seen in the Grafana Explore table view), then you should use the
"instant-query" command instead.
Parallelization:
You can download an unlimited number of logs in parallel; there are a few flags which control this behaviour:
--parallel-duration
--parallel-max-workers
--part-path-prefix
--overwrite-completed-parts
--merge-parts
--keep-parts
Refer to the help for each flag for details about what each of them does.
Example:
logcli query
--timezone=UTC
--from="2021-01-19T10:00:00Z"
--to="2021-01-19T20:00:00Z"
--output=jsonl
--parallel-duration="15m"
--parallel-max-workers="4"
--part-path-prefix="/tmp/my_query"
--merge-parts
'my-query'
This example will create a queue of jobs to execute, each 15 minutes in duration. For the 10-hour total duration, that means forty 15-minute jobs.
The --limit flag is ignored.
It will start four workers, and they will each take a job to work on from the queue until all the jobs have been completed.
Each job will save a "part" file to the location specified by the --part-path-prefix. Different prefixes can be used to run multiple queries at the same time. The timestamp of the start and
end of the part is in the file name. While the part is being downloaded, the filename will end in ".part.tmp"; when it is complete, the file will be renamed to remove the ".tmp" extension.
By default, if a completed part file is found, that part will not be downloaded again. This can be overridden with the --overwrite-completed-parts flag.
Part file example using the previous command, adding --keep-parts so they are not deleted:
Since we don't have the --forward flag, the parts will be downloaded in reverse. Two of the workers have finished their jobs (last two files), and have picked up the next jobs in the queue.
Running ls, this is what we should expect to see.
$ ls -1 /tmp/my_query*
/tmp/my_query_20210119T183000_20210119T184500.part.tmp
/tmp/my_query_20210119T184500_20210119T190000.part.tmp
/tmp/my_query_20210119T190000_20210119T191500.part.tmp
/tmp/my_query_20210119T191500_20210119T193000.part.tmp
/tmp/my_query_20210119T193000_20210119T194500.part
/tmp/my_query_20210119T194500_20210119T200000.part
If you do not specify the --merge-parts flag, the part files will be downloaded, logcli will exit, and you can process the files as you wish. With the flag specified, the part files
will be read in order, and the output printed to the terminal. The lines are printed as soon as the next part is complete; you don't have to wait for all the parts to download before
getting output. The --merge-parts flag will remove each part file once it has been read. To keep them, use the --keep-parts flag, and the part files will not be removed.
Flags:
--help Show context-sensitive help (also try --help-long and --help-man).
--version Show application version.
-q, --quiet Suppress query metadata
--stats Show query statistics
-o, --output=default Specify output mode [default, raw, jsonl]. raw suppresses log labels and timestamp.
-z, --timezone=Local Specify the timezone to use when formatting output timestamps [Local, UTC]
--cpuprofile="" Specify the location for writing a CPU profile.
--memprofile="" Specify the location for writing a memory profile.
--stdin Take input logs from stdin
--addr="http://localhost:3100"
Server address. Can also be set using LOKI_ADDR env var.
--username="" Username for HTTP basic auth. Can also be set using LOKI_USERNAME env var.
--password="" Password for HTTP basic auth. Can also be set using LOKI_PASSWORD env var.
--ca-cert="" Path to the server Certificate Authority. Can also be set using LOKI_CA_CERT_PATH env var.
--tls-skip-verify Server certificate TLS skip verify. Can also be set using LOKI_TLS_SKIP_VERIFY env var.
--cert="" Path to the client certificate. Can also be set using LOKI_CLIENT_CERT_PATH env var.
--key="" Path to the client certificate key. Can also be set using LOKI_CLIENT_KEY_PATH env var.
--org-id="" adds X-Scope-OrgID to API requests for representing tenant ID. Useful for requesting tenant data when bypassing an auth gateway. Can also be set using
LOKI_ORG_ID env var.
--query-tags="" adds X-Query-Tags http header to API requests. This header value will be part of `metrics.go` statistics. Useful for tracking the query. Can also be set
using LOKI_QUERY_TAGS env var.
--bearer-token="" adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN env var.
--bearer-token-file="" adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN_FILE env var.
--retries=0 How many times to retry each query when getting an error response from Loki. Can also be set using LOKI_CLIENT_RETRIES env var.
--min-backoff=0 Minimum backoff time between retries. Can also be set using LOKI_CLIENT_MIN_BACKOFF env var.
--max-backoff=0 Maximum backoff time between retries. Can also be set using LOKI_CLIENT_MAX_BACKOFF env var.
--auth-header="Authorization"
The authorization header used. Can also be set using LOKI_AUTH_HEADER env var.
--proxy-url="" The http or https proxy to use when making requests. Can also be set using LOKI_HTTP_PROXY_URL env var.
--limit=30 Limit on number of entries to print. Setting it to 0 will fetch all entries.
--since=1h Lookback window.
--from=FROM Start looking for logs at this absolute time (inclusive)
--to=TO Stop looking for logs at this absolute time (exclusive)
--step=STEP Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.
--interval=INTERVAL Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**
--batch=1000 Query batch size to use until 'limit' is reached
--parallel-duration=1h Split the range into jobs of this length to download the logs in parallel. This will result in the logs being out of order. Use --part-path-prefix to create
a file per job to maintain ordering.
--parallel-max-workers=1 Max number of workers to start up for parallel jobs. A value of 1 will not create any parallel workers. When using parallel workers, limit is ignored.
--part-path-prefix=PART-PATH-PREFIX
When set, each server response will be saved to a file with this prefix. Creates files in the format: 'prefix_utc_start_utc_end.part'. Intended to be used
with the parallel-* flags so that you can combine the files to maintain ordering based on the filename. Default is to write to stdout.
--overwrite-completed-parts
Overwrites completed part files. This will download the range again, and replace the original completed part file. Default will skip a range if its part
file is already downloaded.
--merge-parts Reads the part files in order and writes the output to stdout. Original part files will be deleted with this option.
--keep-parts Overrides the default behaviour of --merge-parts which will delete the part files once all the files have been read. This option will keep the part files.
--forward Scan forwards through logs.
--no-labels Do not print any labels
--exclude-label=EXCLUDE-LABEL ...
Exclude labels given the provided key during output.
--include-label=INCLUDE-LABEL ...
Include labels given the provided key during output.
--labels-length=0 Set a fixed padding to labels
--store-config="" Execute the current query using a configured storage from a given Loki configuration file.
--remote-schema Execute the current query using a remote schema retrieved using the configured storage in the given Loki configuration file.
--colored-output Show output with colored labels
-t, --tail Tail the logs
-f, --follow Alias for --tail
--delay-for=0 Delay in tailing by number of seconds to accumulate logs for re-ordering
Args:
<query> eg '{foo="bar",baz=~".*blip"} |~ ".*error.*"'

@@ -32,7 +32,14 @@ func (o *DefaultOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, ma
} else {
fmt.Fprintf(o.w, "%s %s %s\n", color.BlueString(timestamp), color.RedString(padLabel(lbls, maxLabelsLen)), line)
}
}
// WithWriter returns a copy of the LogOutput with the writer set to the given writer
func (o DefaultOutput) WithWriter(w io.Writer) LogOutput {
return &DefaultOutput{
w: w,
options: o.options,
}
}
// add some padding after labels

@@ -35,3 +35,11 @@ func (o *JSONLOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxL
fmt.Fprintln(o.w, string(out))
}
// WithWriter returns a copy of the LogOutput with the writer set to the given writer
func (o JSONLOutput) WithWriter(w io.Writer) LogOutput {
return &JSONLOutput{
w: w,
options: o.options,
}
}

@@ -29,6 +29,7 @@ var colorList = []*color.Color{
// LogOutput is the interface any output mode must implement
type LogOutput interface {
FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string)
WithWriter(w io.Writer) LogOutput
}
// LogOutputOptions defines options supported by LogOutput

@@ -28,3 +28,11 @@ func (o *RawOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLab
}
fmt.Fprintln(o.w, line)
}
// WithWriter returns a copy of the LogOutput with the writer set to the given writer
func (o RawOutput) WithWriter(w io.Writer) LogOutput {
return &RawOutput{
w: w,
options: o.options,
}
}
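Each WithWriter implementation returns a copy bound to the new writer, so the original output keeps writing to its previous destination. A rough sketch of the pattern, assuming the usual imports and a hypothetical writeToFile helper (not part of this change); DoQuery below applies the same pattern with a PartFile:
// writeToFile is a hypothetical helper: it retargets an existing LogOutput
// to a file via WithWriter, leaving the original output untouched.
func writeToFile(out output.LogOutput, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	fileOut := out.WithWriter(f) // copy bound to f; `out` still writes to its old writer
	fileOut.FormatAndPrintln(time.Now(), loghttp.LabelSet{}, 0, "example line")
	return nil
}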

@@ -0,0 +1,108 @@
package query
import (
"errors"
"fmt"
"os"
"sync"
)
// PartFile partially complete file.
// Expected usage:
// 1. Create the temp file: CreateTemp
// 2. Write the data to the file
// 3. When you're done, call Finalize and the temp file will be closed and renamed
type PartFile struct {
finalName string
fd *os.File
lock sync.Mutex
}
// NewPartFile initializes a new partial file object, setting the filename
// which will be used when the file is closed with the Finalize function.
//
// This will not create the file, call CreateTemp to create the file.
func NewPartFile(filename string) *PartFile {
return &PartFile{
finalName: filename,
}
}
// Exists checks if the completed file exists.
func (f *PartFile) Exists() (bool, error) {
if _, err := os.Stat(f.finalName); err == nil {
// No error means the file exists, and we can stat it.
return true, nil
} else if errors.Is(err, os.ErrNotExist) {
// File does not exist.
return false, nil
} else {
// Unclear if file exists or not, we cannot stat it.
return false, fmt.Errorf("failed to check if part file exists: %s: %s", f.finalName, err)
}
}
// CreateTempFile creates the temp file to store the data before Finalize is called.
func (f *PartFile) CreateTempFile() error {
f.lock.Lock()
defer f.lock.Unlock()
tmpName := f.finalName + ".tmp"
fd, err := os.Create(tmpName)
if err != nil {
return fmt.Errorf("Failed to create part file: %s: %s", tmpName, err)
}
f.fd = fd
return nil
}
// Write to the temporary file.
func (f *PartFile) Write(b []byte) (int, error) {
return f.fd.Write(b)
}
// Close closes the temporary file.
// Double close is handled gracefully without error so that Close can be deferred for errors,
// and is also called when Finalize is called.
func (f *PartFile) Close() error {
f.lock.Lock()
defer f.lock.Unlock()
// Prevent double close
if f.fd == nil {
return nil
}
filename := f.fd.Name()
if err := f.fd.Sync(); err != nil {
return fmt.Errorf("failed to fsync part file: %s: %s", filename, err)
}
if err := f.fd.Close(); err != nil {
return fmt.Errorf("filed to close part file: %s: %s", filename, err)
}
f.fd = nil
return nil
}
// Finalize closes the temporary file, and renames it to the final file name.
func (f *PartFile) Finalize() error {
tmpFileName := f.fd.Name()
if err := f.Close(); err != nil {
return fmt.Errorf("failed to close part file: %s: %s", tmpFileName, err)
}
f.lock.Lock()
defer f.lock.Unlock()
if err := os.Rename(tmpFileName, f.finalName); err != nil {
return fmt.Errorf("failed to rename part file: %s: %s", tmpFileName, err)
}
return nil
}
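A minimal usage sketch of the lifecycle described in the type comment; the filename and payload here are illustrative:
// writePart is a hypothetical caller following the PartFile lifecycle:
// Exists -> CreateTempFile -> Write -> Finalize, with Close deferred for error paths.
func writePart() error {
	pf := NewPartFile("/tmp/my_query_20210119T150000_20210119T151500.part")
	if exists, err := pf.Exists(); err != nil {
		return err
	} else if exists {
		return nil // completed part already on disk; skip it, as DoQuery does
	}
	if err := pf.CreateTempFile(); err != nil {
		return err
	}
	defer pf.Close() // double close is a no-op, so this is safe even after Finalize
	if _, err := pf.Write([]byte("log line\n")); err != nil {
		return err
	}
	return pf.Finalize() // fsync, close, and rename away the ".tmp" suffix
}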

@@ -5,10 +5,12 @@ import (
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
@@ -62,6 +64,31 @@ type Query struct {
ColoredOutput bool
LocalConfig string
FetchSchemaFromStorage bool
// Parallelization parameters.
// The duration of each part/job.
ParallelDuration time.Duration
// Number of workers to start.
ParallelMaxWorkers int
// Path prefix of the name for each part file.
// The idea for this is to allow the user to download many different queries at the same
// time, and/or give a directory for the part files to be placed.
PartPathPrefix string
// By default (false value), if the part file has finished downloading, and another job with
// the same filename is run, it will skip the completed files. This will remove the completed
// files as each worker gets to that part file, so the part will be downloaded again.
OverwriteCompleted bool
// If true, the part files will be read in order, and the data will be output to stdout.
MergeParts bool
// If MergeParts is false, this parameter has no effect, part files will be kept.
// Otherwise, if this is true, the part files will not be deleted once they have been merged.
KeepParts bool
}
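For orientation, a rough sketch of how these fields map onto the help-text example earlier in this change; queryClient and out are assumed to be an existing client.Client and output.LogOutput, and the label matcher is illustrative:
q := &query.Query{
	QueryString:        `{app="my-app"}`,
	Start:              time.Now().Add(-10 * time.Hour),
	End:                time.Now(),
	ParallelDuration:   15 * time.Minute, // forty 15-minute jobs over 10 hours
	ParallelMaxWorkers: 4,
	PartPathPrefix:     "/tmp/my_query",
	MergeParts:         true,
}
q.DoQueryParallel(queryClient, out, false)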
// DoQuery executes the query and prints out the results
@@ -82,6 +109,24 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
var resp *loghttp.QueryResponse
var err error
var partFile *PartFile
if q.PartPathPrefix != "" {
var shouldSkip bool
partFile, shouldSkip = q.createPartFile()
// createPartFile will return true if the part file exists and
// OverwriteCompleted is false; therefore, we should exit the function
// here because we have nothing to do.
if shouldSkip {
return
}
}
if partFile != nil {
defer partFile.Close()
out = out.WithWriter(partFile)
}
if q.isInstant() {
resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet)
if err != nil {
@@ -160,11 +205,203 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
// fudge the timestamp forward in time to make sure to get the last entry from this batch in the next query
end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond)
}
}
}
if partFile != nil {
if err := partFile.Finalize(); err != nil {
log.Fatalln(err)
}
}
}
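// outputFilename builds the part file name from the path prefix and the
// UTC start and end of the range, e.g.
// /tmp/my_query_20210119T183000_20210119T184500.part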
func (q *Query) outputFilename() string {
return fmt.Sprintf(
"%s_%s_%s.part",
q.PartPathPrefix,
q.Start.UTC().Format("20060102T150405"),
q.End.UTC().Format("20060102T150405"),
)
}
// createPartFile returns a PartFile.
// The bool value shows if the part file already exists, and this range should be skipped.
func (q *Query) createPartFile() (*PartFile, bool) {
partFile := NewPartFile(q.outputFilename())
if !q.OverwriteCompleted {
// If we already have the completed file, no need to download it again.
// The user can delete the files if they want to download parts again.
exists, err := partFile.Exists()
if err != nil {
log.Fatalf("Query failed: %s\n", err)
}
if exists {
log.Printf("Skip range: %s - %s: already downloaded\n", q.Start, q.End)
return nil, true
}
}
if err := partFile.CreateTempFile(); err != nil {
log.Fatalf("Query failed: %s\n", err)
}
return partFile, false
}
// ceilingDivision returns ceil(d/m): the number of m-sized chunks needed to cover d.
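// For example, ceilingDivision(time.Hour, 14*time.Minute) == 5.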
func ceilingDivision(d, m time.Duration) int64 {
return int64((d + m - 1) / m)
}
// nextJob returns the start and end times of the next job.
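// For forward queries the window advances from the previous end, capped at q.End;
// for reverse queries it steps back from the previous start, floored at q.Start.
// For example, a reverse query over 15:00-16:01 with ParallelDuration=30m yields
// the windows 15:31-16:01, then 15:01-15:31, then 15:00-15:01.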
func (q *Query) nextJob(start, end time.Time) (time.Time, time.Time) {
if q.Forward {
start = end
return start, minTime(start.Add(q.ParallelDuration), q.End)
}
end = start
return maxTime(end.Add(-q.ParallelDuration), q.Start), end
}
type parallelJob struct {
q *Query
done chan struct{}
}
func newParallelJob(q *Query) *parallelJob {
return &parallelJob{
q: q,
done: make(chan struct{}),
}
}
func (j *parallelJob) run(c client.Client, out output.LogOutput, statistics bool) {
j.q.DoQuery(c, out, statistics)
j.done <- struct{}{}
}
func (q *Query) parallelJobs() []*parallelJob {
nJobs := ceilingDivision(q.End.Sub(q.Start), q.ParallelDuration)
jobs := make([]*parallelJob, nJobs)
// Normally `nextJob` will swap the start/end to get the next job. Here, we swap them
// on input so that we calculate the starting job instead of the next job.
start, end := q.nextJob(q.End, q.Start)
// Queue up jobs
for i := range jobs {
rq := *q
rq.Start = start
rq.End = end
jobs[i] = newParallelJob(&rq)
start, end = q.nextJob(start, end)
}
return jobs
}
// Waits for each job to finish in order, reads the part file and copies it to stdout
func (q *Query) mergeJobs(jobs []*parallelJob) error {
if !q.MergeParts {
return nil
}
for _, job := range jobs {
// wait for the next job to finish
<-job.done
f, err := os.Open(job.q.outputFilename())
if err != nil {
return fmt.Errorf("open file error: %w", err)
}
_, err = io.Copy(os.Stdout, f)
// Close the part file now rather than deferring, so that file descriptors
// do not accumulate for the duration of the merge loop.
f.Close()
if err != nil {
return fmt.Errorf("copying file error: %w", err)
}
if !q.KeepParts {
err := os.Remove(job.q.outputFilename())
if err != nil {
return fmt.Errorf("removing file error: %w", err)
}
}
}
return nil
}
// Starts `ParallelMaxWorkers` number of workers to process all of the `parallelJob`s
// This function is non-blocking. The caller should `Wait` on the returned `WaitGroup`.
func (q *Query) startWorkers(
jobs []*parallelJob,
c client.Client,
out output.LogOutput,
statistics bool,
) *sync.WaitGroup {
wg := sync.WaitGroup{}
jobsChan := make(chan *parallelJob, len(jobs))
// Queue up the jobs
// There is a possible optimization here to use an unbuffered channel,
// but the memory and CPU overhead for yet another goroutine makes me
// think that this optimization is not worth it. So I used a buffered
// channel instead.
for _, job := range jobs {
jobsChan <- job
}
close(jobsChan)
// Start workers
for w := 0; w < q.ParallelMaxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobsChan {
job.run(c, out, statistics)
}
}()
}
return &wg
}
func (q *Query) DoQueryParallel(c client.Client, out output.LogOutput, statistics bool) {
if q.ParallelDuration < 1 {
log.Fatalf("Parallel duration has to be a positive value\n")
}
jobs := q.parallelJobs()
wg := q.startWorkers(jobs, c, out, statistics)
if err := q.mergeJobs(jobs); err != nil {
log.Fatalf("Merging part files error: %s\n", err)
}
wg.Wait()
}
func minTime(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t1
}
return t2
}
func maxTime(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
length := -1
var entry []*loghttp.Entry

@@ -3,6 +3,7 @@ package query
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"reflect"
@@ -680,3 +681,229 @@ func TestLoadFromURL(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, schemaConfig)
}
func TestDurationCeilDiv(t *testing.T) {
tests := []struct {
name string
d time.Duration
m time.Duration
expect int64
}{
{
"10m / 5m = 2",
10 * time.Minute,
5 * time.Minute,
2,
},
{
"11m / 5m = 3",
11 * time.Minute,
5 * time.Minute,
3,
},
{
"1h / 15m = 4",
1 * time.Hour,
15 * time.Minute,
4,
},
{
"1h / 14m = 5",
1 * time.Hour,
14 * time.Minute,
5,
},
}
for _, tt := range tests {
t.Run(
tt.name,
func(t *testing.T) {
require.Equal(t, tt.expect, ceilingDivision(tt.d, tt.m))
},
)
}
}
func mustParseTime(value string) time.Time {
t, err := time.Parse("2006-01-02 15:04:05", value)
if err != nil {
panic(fmt.Errorf("invalid timestamp: %w", err))
}
return t
}
func cmpParallelJobSlice(t *testing.T, expected, actual []*parallelJob) {
require.Equal(t, len(expected), len(actual), "job slice lengths don't match")
for i, jobE := range expected {
jobA := actual[i]
require.Equal(t, jobE.q.Start, jobA.q.Start, "i=%d: job start not equal", i)
require.Equal(t, jobE.q.End, jobA.q.End, "i=%d: job end not equal", i)
require.Equal(t, jobE.q.Forward, jobA.q.Forward, "i=%d: job direction not equal", i)
}
}
func TestParallelJobs(t *testing.T) {
mkQuery := func(start, end string, d time.Duration, forward bool) *Query {
return &Query{
Start: mustParseTime(start),
End: mustParseTime(end),
ParallelDuration: d,
Forward: forward,
}
}
mkParallelJob := func(start, end string, forward bool) *parallelJob {
return &parallelJob{
q: mkQuery(start, end, time.Minute, forward),
}
}
tests := []struct {
name string
q *Query
jobs []*parallelJob
}{
{
"1h range, 30m period, forward",
mkQuery(
"2023-02-10 15:00:00",
"2023-02-10 16:00:00",
30*time.Minute,
true,
),
[]*parallelJob{
mkParallelJob(
"2023-02-10 15:00:00",
"2023-02-10 15:30:00",
true,
),
mkParallelJob(
"2023-02-10 15:30:00",
"2023-02-10 16:00:00",
true,
),
},
},
{
"1h range, 30m period, reverse",
mkQuery(
"2023-02-10 15:00:00",
"2023-02-10 16:00:00",
30*time.Minute,
false,
),
[]*parallelJob{
mkParallelJob(
"2023-02-10 15:30:00",
"2023-02-10 16:00:00",
false,
),
mkParallelJob(
"2023-02-10 15:00:00",
"2023-02-10 15:30:00",
false,
),
},
},
{
"1h1m range, 30m period, forward",
mkQuery(
"2023-02-10 15:00:00",
"2023-02-10 16:01:00",
30*time.Minute,
true,
),
[]*parallelJob{
mkParallelJob(
"2023-02-10 15:00:00",
"2023-02-10 15:30:00",
true,
),
mkParallelJob(
"2023-02-10 15:30:00",
"2023-02-10 16:00:00",
true,
),
mkParallelJob(
"2023-02-10 16:00:00",
"2023-02-10 16:01:00",
true,
),
},
},
{
"1h1m range, 30m period, reverse",
mkQuery(
"2023-02-10 15:00:00",
"2023-02-10 16:01:00",
30*time.Minute,
false,
),
[]*parallelJob{
mkParallelJob(
"2023-02-10 15:31:00",
"2023-02-10 16:01:00",
false,
),
mkParallelJob(
"2023-02-10 15:01:00",
"2023-02-10 15:31:00",
false,
),
mkParallelJob(
"2023-02-10 15:00:00",
"2023-02-10 15:01:00",
false,
),
},
},
{
"15m range, 30m period, forward",
mkQuery(
"2023-02-10 15:00:00",
"2023-02-10 15:15:00",
30*time.Minute,
true,
),
[]*parallelJob{
mkParallelJob(
"2023-02-10 15:00:00",
"2023-02-10 15:15:00",
true,
),
},
},
{
"15m range, 30m period, reverse",
mkQuery(
"2023-02-10 15:00:00",
"2023-02-10 15:15:00",
30*time.Minute,
false,
),
[]*parallelJob{
mkParallelJob(
"2023-02-10 15:00:00",
"2023-02-10 15:15:00",
false,
),
},
},
}
for _, tt := range tests {
tt := tt
t.Run(
tt.name,
func(t *testing.T) {
jobs := tt.q.parallelJobs()
cmpParallelJobSlice(t, tt.jobs, jobs)
},
)
}
}
