feat(loki): extended tailing (#764)

* refactor(querier/ingester): TailRequest Lookback window

Moves the specifications of the Lookback Window out of the
logproto.QueryRequest into it's own type logproto.Lookback.
This is required, because the Lookback Window will be used in the TailRequest
as well.

* feat(querier): parse Lookback from HTTP Request

* feat(logcli): send Lookback Window spec with tail request

* feat(querier): include historic entries in tail mode

Extends tailing by sending a configurable amount of historic entries with
before the live entries. This enables a behaviour that is closer to kubectl logs
-f and docker logs -f.

It is implemented by running a regular Query before subscribing to the ingesters.

* fix: adapt tests to Lookback change

* feat(querier): check all errors to make the linter happy

* fix(logproto): flatten Lookback window spec

Flattens the Lookback window spec into the individual queries

* fix(ingester): adapt test to Lookback flatten
pull/785/head
sh0rez 7 years ago committed by GitHub
parent da6a13383a
commit 38cb093358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      cmd/logcli/client.go
  2. 21
      cmd/logcli/query.go
  3. 269
      pkg/logproto/logproto.pb.go
  4. 2
      pkg/logproto/logproto.proto
  5. 47
      pkg/querier/http.go
  6. 50
      pkg/querier/querier.go
  7. 11
      pkg/querier/tail.go

@ -21,12 +21,18 @@ const (
queryPath = "/api/prom/query?query=%s&limit=%d&start=%d&end=%d&direction=%s&regexp=%s"
labelsPath = "/api/prom/label"
labelValuesPath = "/api/prom/label/%s/values"
tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d"
tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d&limit=%d&start=%d"
)
func query(from, through time.Time, direction logproto.Direction) (*logproto.QueryResponse, error) {
path := fmt.Sprintf(queryPath, url.QueryEscape(*queryStr), *limit, from.UnixNano(),
through.UnixNano(), direction.String(), url.QueryEscape(*regexpStr))
path := fmt.Sprintf(queryPath,
url.QueryEscape(*queryStr), // query
*limit, // limit
from.UnixNano(), // start
through.UnixNano(), // end
direction.String(), // direction
url.QueryEscape(*regexpStr), // regexp
)
var resp logproto.QueryResponse
if err := doRequest(path, &resp); err != nil {
@ -105,7 +111,13 @@ func doRequest(path string, out interface{}) error {
}
func liveTailQueryConn() (*websocket.Conn, error) {
path := fmt.Sprintf(tailPath, url.QueryEscape(*queryStr), url.QueryEscape(*regexpStr), *delayFor)
path := fmt.Sprintf(tailPath,
url.QueryEscape(*queryStr), // query
url.QueryEscape(*regexpStr), // regexp
*delayFor, // delay_for
*limit, // limit
getStart(time.Now()).UnixNano(), // start
)
return wsConnect(path)
}

@ -12,6 +12,18 @@ import (
"github.com/grafana/loki/pkg/logproto"
)
func getStart(end time.Time) time.Time {
start := end.Add(-*since)
if *from != "" {
var err error
start, err = time.Parse(time.RFC3339Nano, *from)
if err != nil {
log.Fatalf("error parsing date '%s': %s", *from, err)
}
}
return start
}
func doQuery() {
if *tail {
tailQuery()
@ -24,14 +36,7 @@ func doQuery() {
)
end := time.Now()
start := end.Add(-*since)
if *from != "" {
var err error
start, err = time.Parse(time.RFC3339Nano, *from)
if err != nil {
log.Fatalf("error parsing --from date '%s': %s", *from, err)
}
}
start := getStart(end)
if *to != "" {
var err error

@ -6,18 +6,20 @@ package logproto
import (
context "context"
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
_ "github.com/gogo/protobuf/types"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
io "io"
math "math"
math_bits "math/bits"
reflect "reflect"
strconv "strconv"
strings "strings"
time "time"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
_ "github.com/gogo/protobuf/types"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -470,9 +472,11 @@ func (m *Entry) GetLine() string {
}
type TailRequest struct {
Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
Regex string `protobuf:"bytes,2,opt,name=regex,proto3" json:"regex,omitempty"`
DelayFor uint32 `protobuf:"varint,3,opt,name=delayFor,proto3" json:"delayFor,omitempty"`
Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
Regex string `protobuf:"bytes,2,opt,name=regex,proto3" json:"regex,omitempty"`
DelayFor uint32 `protobuf:"varint,3,opt,name=delayFor,proto3" json:"delayFor,omitempty"`
Limit uint32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit,omitempty"`
Start time.Time `protobuf:"bytes,5,opt,name=start,proto3,stdtime" json:"start"`
}
func (m *TailRequest) Reset() { *m = TailRequest{} }
@ -528,6 +532,20 @@ func (m *TailRequest) GetDelayFor() uint32 {
return 0
}
func (m *TailRequest) GetLimit() uint32 {
if m != nil {
return m.Limit
}
return 0
}
func (m *TailRequest) GetStart() time.Time {
if m != nil {
return m.Start
}
return time.Time{}
}
type TailResponse struct {
Stream *Stream `protobuf:"bytes,1,opt,name=stream,proto3" json:"stream,omitempty"`
DroppedStreams []*DroppedStream `protobuf:"bytes,2,rep,name=droppedStreams,proto3" json:"droppedStreams,omitempty"`
@ -656,53 +674,55 @@ func init() {
func init() { proto.RegisterFile("logproto.proto", fileDescriptor_7a8976f235a02f79) }
var fileDescriptor_7a8976f235a02f79 = []byte{
// 728 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4d, 0x4f, 0x13, 0x4f,
0x18, 0xdf, 0xe9, 0x7b, 0x9f, 0xbe, 0x40, 0xe6, 0xff, 0x17, 0x9a, 0xc6, 0x6c, 0x9b, 0x3d, 0x68,
0x43, 0x62, 0xd1, 0x4a, 0x44, 0xd1, 0xc4, 0x50, 0x91, 0x98, 0x68, 0xa2, 0x0e, 0x18, 0xcf, 0x5b,
0x3a, 0x94, 0x4d, 0xb6, 0x3b, 0x65, 0x77, 0x6a, 0xec, 0xcd, 0x8f, 0xc0, 0xcd, 0xaf, 0xe0, 0xa7,
0xf0, 0xcc, 0x91, 0x23, 0xa7, 0x2a, 0xcb, 0xc5, 0x70, 0xe2, 0xe6, 0xd5, 0xcc, 0xcb, 0x76, 0x17,
0x48, 0x04, 0x2f, 0xbb, 0xf3, 0x9b, 0xf9, 0x3d, 0xf3, 0xbc, 0xfd, 0x9e, 0x81, 0xaa, 0xcb, 0x06,
0x23, 0x9f, 0x71, 0xd6, 0x96, 0x5f, 0x5c, 0x88, 0x70, 0xbd, 0x31, 0x60, 0x6c, 0xe0, 0xd2, 0x65,
0x89, 0x7a, 0xe3, 0xdd, 0x65, 0xee, 0x0c, 0x69, 0xc0, 0xed, 0xe1, 0x48, 0x51, 0xeb, 0xf7, 0x06,
0x0e, 0xdf, 0x1b, 0xf7, 0xda, 0x3b, 0x6c, 0xb8, 0x3c, 0x60, 0x03, 0x16, 0x33, 0x05, 0x92, 0x40,
0xae, 0x14, 0xdd, 0xda, 0x84, 0xd2, 0xbb, 0x71, 0xb0, 0x47, 0xe8, 0xfe, 0x98, 0x06, 0x1c, 0xaf,
0x42, 0x3e, 0xe0, 0x3e, 0xb5, 0x87, 0x41, 0x0d, 0x35, 0xd3, 0xad, 0x52, 0x67, 0xbe, 0x3d, 0x0b,
0x65, 0x4b, 0x1e, 0x74, 0x4b, 0x67, 0xd3, 0x46, 0x44, 0x22, 0xd1, 0xc2, 0xaa, 0x42, 0x59, 0xdd,
0x13, 0x8c, 0x98, 0x17, 0x50, 0xeb, 0x37, 0x82, 0xf2, 0xfb, 0x31, 0xf5, 0x27, 0xd1, 0xcd, 0xff,
0x43, 0x76, 0x5f, 0xe0, 0x1a, 0x6a, 0xa2, 0x56, 0x91, 0x28, 0x20, 0x76, 0x5d, 0x67, 0xe8, 0xf0,
0x5a, 0xaa, 0x89, 0x5a, 0x15, 0xa2, 0x00, 0x5e, 0x83, 0x6c, 0xc0, 0x6d, 0x9f, 0xd7, 0xd2, 0x4d,
0xd4, 0x2a, 0x75, 0xea, 0x6d, 0x95, 0x74, 0x3b, 0x4a, 0xa5, 0xbd, 0x1d, 0x25, 0xdd, 0x2d, 0x1c,
0x4e, 0x1b, 0xc6, 0xc1, 0x8f, 0x06, 0x22, 0xca, 0x04, 0x3f, 0x82, 0x34, 0xf5, 0xfa, 0xb5, 0xcc,
0x3f, 0x58, 0x0a, 0x03, 0xfc, 0x00, 0x8a, 0x7d, 0xc7, 0xa7, 0x3b, 0xdc, 0x61, 0x5e, 0x2d, 0xdb,
0x44, 0xad, 0x6a, 0xe7, 0xbf, 0x38, 0xf7, 0x8d, 0xe8, 0x88, 0xc4, 0x2c, 0x11, 0xbc, 0x4f, 0x07,
0xf4, 0x73, 0x2d, 0xa7, 0x52, 0x92, 0xc0, 0x7a, 0x0a, 0x15, 0x9d, 0xb8, 0x2a, 0x05, 0x5e, 0xba,
0xb6, 0xa6, 0x71, 0x19, 0xd7, 0xa0, 0xfc, 0xc6, 0xee, 0x51, 0x37, 0xaa, 0x1a, 0x86, 0x8c, 0x67,
0x0f, 0xa9, 0x2e, 0x9a, 0x5c, 0xe3, 0x05, 0xc8, 0x7d, 0xb2, 0xdd, 0x31, 0x0d, 0x64, 0xd1, 0x0a,
0x44, 0x23, 0xeb, 0x2e, 0x54, 0xb4, 0xad, 0x76, 0x1c, 0x13, 0x85, 0xdf, 0xe2, 0x8c, 0xb8, 0x07,
0x39, 0xe5, 0x17, 0x5b, 0x90, 0x73, 0x85, 0x49, 0xa0, 0x1c, 0x74, 0xe1, 0x6c, 0xda, 0xd0, 0x3b,
0x44, 0xff, 0xf1, 0x1a, 0xe4, 0xa9, 0xc7, 0x7d, 0x47, 0xfa, 0x13, 0xe1, 0xcf, 0xc5, 0xe1, 0xbf,
0xf4, 0xb8, 0x3f, 0xe9, 0xce, 0x89, 0x4a, 0x0a, 0x55, 0x68, 0x1e, 0x89, 0x16, 0x16, 0x83, 0xac,
0xa4, 0xe0, 0x57, 0x50, 0x9c, 0x09, 0x55, 0xfa, 0xfa, 0x7b, 0x6f, 0xaa, 0xfa, 0xc6, 0x14, 0x0f,
0x64, 0x87, 0x62, 0x63, 0x7c, 0x1b, 0x32, 0xae, 0xe3, 0x51, 0x99, 0x7b, 0xb1, 0x5b, 0x38, 0x9b,
0x36, 0x24, 0x26, 0xf2, 0x6b, 0x7d, 0x80, 0xd2, 0xb6, 0xed, 0xb8, 0xd7, 0x8a, 0x4e, 0xf5, 0x2d,
0x95, 0xe8, 0x1b, 0xae, 0x43, 0xa1, 0x4f, 0x5d, 0x7b, 0xb2, 0xc9, 0x7c, 0xa9, 0xbb, 0x0a, 0x99,
0x61, 0x6b, 0x02, 0x65, 0x75, 0xad, 0xae, 0x6c, 0x0b, 0x72, 0xaa, 0x63, 0x3a, 0x97, 0xab, 0x1d,
0xd5, 0xe7, 0xf8, 0x39, 0x54, 0xfb, 0x3e, 0x1b, 0x8d, 0x68, 0x7f, 0x4b, 0x6b, 0x40, 0x15, 0x71,
0x31, 0xa1, 0xad, 0xe4, 0x39, 0xb9, 0x44, 0xb7, 0xbe, 0x22, 0xa8, 0x5c, 0x60, 0xe0, 0xc7, 0x90,
0xd9, 0xf5, 0xd9, 0xf0, 0x06, 0x65, 0x8c, 0x25, 0x2e, 0x2d, 0xf0, 0x0a, 0xa4, 0x38, 0x93, 0x59,
0xdf, 0xd4, 0x2e, 0xc5, 0x99, 0x90, 0x91, 0x16, 0x49, 0x5a, 0xd6, 0x4b, 0xa3, 0xa5, 0x3b, 0x50,
0x9c, 0x8d, 0x05, 0x2e, 0x41, 0x7e, 0xf3, 0x2d, 0xf9, 0xb8, 0x4e, 0x36, 0xe6, 0x0d, 0x5c, 0x86,
0x42, 0x77, 0xfd, 0xc5, 0x6b, 0x89, 0x50, 0x67, 0x1d, 0x72, 0xe2, 0x69, 0xa0, 0x3e, 0x5e, 0x85,
0x8c, 0x58, 0xe1, 0x5b, 0x71, 0xf2, 0x89, 0xc7, 0xa7, 0xbe, 0x70, 0x79, 0x5b, 0xbf, 0x25, 0x46,
0xe7, 0x3b, 0x82, 0xbc, 0x18, 0x2a, 0x87, 0xfa, 0xf8, 0x19, 0x64, 0xe5, 0x7c, 0xe1, 0x04, 0x3d,
0xf9, 0xd2, 0xd4, 0x17, 0xaf, 0xec, 0x47, 0xf7, 0xdc, 0x47, 0xe2, 0x69, 0x91, 0x43, 0x92, 0xb4,
0x4e, 0x4e, 0x5c, 0xd2, 0xfa, 0xc2, 0x34, 0x59, 0x06, 0x7e, 0x02, 0x19, 0xa1, 0x82, 0x64, 0xf8,
0x09, 0xb1, 0x25, 0xc3, 0x4f, 0x8a, 0x45, 0xb8, 0xed, 0xae, 0x1c, 0x9d, 0x98, 0xc6, 0xf1, 0x89,
0x69, 0x9c, 0x9f, 0x98, 0xe8, 0x4b, 0x68, 0xa2, 0x6f, 0xa1, 0x89, 0x0e, 0x43, 0x13, 0x1d, 0x85,
0x26, 0xfa, 0x19, 0x9a, 0xe8, 0x57, 0x68, 0x1a, 0xe7, 0xa1, 0x89, 0x0e, 0x4e, 0x4d, 0xe3, 0xe8,
0xd4, 0x34, 0x8e, 0x4f, 0x4d, 0xa3, 0x97, 0x93, 0xb7, 0x3d, 0xfc, 0x13, 0x00, 0x00, 0xff, 0xff,
0x37, 0x0c, 0xea, 0x1a, 0x0f, 0x06, 0x00, 0x00,
// 754 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x4f, 0x4f, 0x13, 0x4f,
0x18, 0xde, 0xe9, 0xff, 0xbe, 0xfd, 0x03, 0x99, 0xdf, 0x4f, 0x68, 0x1a, 0xb3, 0x6d, 0xf6, 0xa0,
0x0d, 0x89, 0x45, 0x2b, 0x11, 0x45, 0x13, 0x43, 0x45, 0x62, 0xa2, 0x89, 0x3a, 0x90, 0x78, 0xde,
0xd2, 0xa1, 0x6c, 0xb2, 0xed, 0x94, 0xdd, 0xa9, 0xb1, 0x37, 0x3f, 0x02, 0x37, 0x3f, 0x82, 0x9e,
0xfc, 0x08, 0x9e, 0x39, 0x72, 0xe4, 0x54, 0x65, 0xb9, 0x18, 0x4e, 0xdc, 0xbc, 0x9a, 0xf9, 0xb3,
0xdd, 0x05, 0x8c, 0x80, 0x97, 0x76, 0x9e, 0x99, 0xf7, 0x9d, 0x7d, 0x9f, 0x67, 0x9e, 0xf7, 0x85,
0xb2, 0xcb, 0x7a, 0x43, 0x8f, 0x71, 0xd6, 0x94, 0xbf, 0x38, 0x17, 0xe2, 0x6a, 0xad, 0xc7, 0x58,
0xcf, 0xa5, 0x8b, 0x12, 0x75, 0x46, 0xdb, 0x8b, 0xdc, 0xe9, 0x53, 0x9f, 0xdb, 0xfd, 0xa1, 0x0a,
0xad, 0xde, 0xe9, 0x39, 0x7c, 0x67, 0xd4, 0x69, 0x6e, 0xb1, 0xfe, 0x62, 0x8f, 0xf5, 0x58, 0x14,
0x29, 0x90, 0x04, 0x72, 0xa5, 0xc2, 0xad, 0x75, 0x28, 0xbc, 0x19, 0xf9, 0x3b, 0x84, 0xee, 0x8e,
0xa8, 0xcf, 0xf1, 0x32, 0x64, 0x7d, 0xee, 0x51, 0xbb, 0xef, 0x57, 0x50, 0x3d, 0xd9, 0x28, 0xb4,
0x66, 0x9b, 0xd3, 0x52, 0x36, 0xe4, 0x41, 0xbb, 0x70, 0x32, 0xa9, 0x85, 0x41, 0x24, 0x5c, 0x58,
0x65, 0x28, 0xaa, 0x7b, 0xfc, 0x21, 0x1b, 0xf8, 0xd4, 0xfa, 0x85, 0xa0, 0xf8, 0x76, 0x44, 0xbd,
0x71, 0x78, 0xf3, 0xff, 0x90, 0xde, 0x15, 0xb8, 0x82, 0xea, 0xa8, 0x91, 0x27, 0x0a, 0x88, 0x5d,
0xd7, 0xe9, 0x3b, 0xbc, 0x92, 0xa8, 0xa3, 0x46, 0x89, 0x28, 0x80, 0x57, 0x20, 0xed, 0x73, 0xdb,
0xe3, 0x95, 0x64, 0x1d, 0x35, 0x0a, 0xad, 0x6a, 0x53, 0x91, 0x6e, 0x86, 0x54, 0x9a, 0x9b, 0x21,
0xe9, 0x76, 0x6e, 0x7f, 0x52, 0x33, 0xf6, 0xbe, 0xd7, 0x10, 0x51, 0x29, 0xf8, 0x01, 0x24, 0xe9,
0xa0, 0x5b, 0x49, 0x5d, 0x23, 0x53, 0x24, 0xe0, 0x7b, 0x90, 0xef, 0x3a, 0x1e, 0xdd, 0xe2, 0x0e,
0x1b, 0x54, 0xd2, 0x75, 0xd4, 0x28, 0xb7, 0xfe, 0x8b, 0xb8, 0xaf, 0x85, 0x47, 0x24, 0x8a, 0x12,
0xc5, 0x7b, 0xb4, 0x47, 0x3f, 0x54, 0x32, 0x8a, 0x92, 0x04, 0xd6, 0x63, 0x28, 0x69, 0xe2, 0x4a,
0x0a, 0xbc, 0x70, 0xa9, 0xa6, 0x91, 0x8c, 0x5f, 0x11, 0x14, 0x5f, 0xd9, 0x1d, 0xea, 0x86, 0xb2,
0x61, 0x48, 0x0d, 0xec, 0x3e, 0xd5, 0xaa, 0xc9, 0x35, 0x9e, 0x83, 0xcc, 0x7b, 0xdb, 0x1d, 0x51,
0x5f, 0xaa, 0x96, 0x23, 0x1a, 0x5d, 0x57, 0x36, 0xf4, 0xcf, 0xb2, 0xa1, 0xa9, 0x6c, 0xd6, 0x6d,
0x28, 0xe9, 0x7a, 0x35, 0xdb, 0xa8, 0x38, 0x41, 0x36, 0x1f, 0x16, 0x67, 0xed, 0x40, 0x46, 0x91,
0xc5, 0x16, 0x64, 0x5c, 0x91, 0xe2, 0x2b, 0x52, 0x6d, 0x38, 0x99, 0xd4, 0xf4, 0x0e, 0xd1, 0xff,
0x78, 0x05, 0xb2, 0x74, 0xc0, 0x3d, 0x47, 0x72, 0x14, 0x9a, 0xcd, 0x44, 0x9a, 0x3d, 0x1f, 0x70,
0x6f, 0xdc, 0x9e, 0x11, 0xcf, 0x27, 0xac, 0xa8, 0xe3, 0x48, 0xb8, 0xb0, 0x18, 0xa4, 0x65, 0x08,
0x7e, 0x01, 0xf9, 0x69, 0x77, 0xc8, 0x6f, 0xfd, 0x9d, 0x59, 0x59, 0xdf, 0x98, 0xe0, 0xbe, 0xe4,
0x17, 0x25, 0xe3, 0x9b, 0x90, 0x72, 0x9d, 0x01, 0x95, 0x7a, 0xe7, 0xdb, 0xb9, 0x93, 0x49, 0x4d,
0x62, 0x22, 0x7f, 0xad, 0xcf, 0x08, 0x0a, 0x9b, 0xb6, 0xe3, 0x5e, 0x6a, 0x75, 0xe5, 0x96, 0x44,
0xcc, 0x2d, 0xb8, 0x0a, 0xb9, 0x2e, 0x75, 0xed, 0xf1, 0x3a, 0xf3, 0xe4, 0xb3, 0x95, 0xc8, 0x14,
0x47, 0xcd, 0x91, 0xfa, 0x63, 0x73, 0xa4, 0xaf, 0xdd, 0x1c, 0xd6, 0x18, 0x8a, 0xaa, 0x50, 0xfd,
0x58, 0x0d, 0xc8, 0x28, 0xe7, 0x69, 0x79, 0x2e, 0x3a, 0x53, 0x9f, 0xe3, 0xa7, 0x50, 0xee, 0x7a,
0x6c, 0x38, 0xa4, 0xdd, 0x0d, 0xed, 0x65, 0xf5, 0x2e, 0xf3, 0xb1, 0x1e, 0x89, 0x9f, 0x93, 0x73,
0xe1, 0xd6, 0x27, 0x04, 0xa5, 0x33, 0x11, 0xf8, 0x21, 0xa4, 0xb6, 0x3d, 0xd6, 0xbf, 0xc2, 0xcb,
0x44, 0x3c, 0x64, 0x06, 0x5e, 0x82, 0x04, 0x67, 0x52, 0xc7, 0xab, 0xe6, 0x25, 0x38, 0x13, 0xce,
0xd4, 0xbe, 0x4b, 0xca, 0x17, 0xd0, 0x68, 0xe1, 0x16, 0xe4, 0xa7, 0xed, 0x8d, 0x0b, 0x90, 0x5d,
0x7f, 0x4d, 0xde, 0xad, 0x92, 0xb5, 0x59, 0x03, 0x17, 0x21, 0xd7, 0x5e, 0x7d, 0xf6, 0x52, 0x22,
0xd4, 0x5a, 0x85, 0x8c, 0x18, 0x71, 0xd4, 0xc3, 0xcb, 0x90, 0x12, 0x2b, 0x7c, 0x23, 0x22, 0x1f,
0x1b, 0xa2, 0xd5, 0xb9, 0xf3, 0xdb, 0x7a, 0x26, 0x1a, 0xad, 0x6f, 0x08, 0xb2, 0x62, 0x38, 0x38,
0xd4, 0xc3, 0x4f, 0x20, 0x2d, 0xe7, 0x04, 0x8e, 0x85, 0xc7, 0x27, 0x66, 0x75, 0xfe, 0xc2, 0x7e,
0x78, 0xcf, 0x5d, 0x24, 0x5c, 0x20, 0xfb, 0x2e, 0x9e, 0x1d, 0x1f, 0x1c, 0xf1, 0xec, 0x33, 0x0d,
0x6a, 0x19, 0xf8, 0x11, 0xa4, 0x84, 0x0b, 0xe2, 0xe5, 0xc7, 0xec, 0x1b, 0x2f, 0x3f, 0x6e, 0x16,
0xf1, 0xd9, 0xf6, 0xd2, 0xc1, 0x91, 0x69, 0x1c, 0x1e, 0x99, 0xc6, 0xe9, 0x91, 0x89, 0x3e, 0x06,
0x26, 0xfa, 0x12, 0x98, 0x68, 0x3f, 0x30, 0xd1, 0x41, 0x60, 0xa2, 0x1f, 0x81, 0x89, 0x7e, 0x06,
0xa6, 0x71, 0x1a, 0x98, 0x68, 0xef, 0xd8, 0x34, 0x0e, 0x8e, 0x4d, 0xe3, 0xf0, 0xd8, 0x34, 0x3a,
0x19, 0x79, 0xdb, 0xfd, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x25, 0xbd, 0x8d, 0xbf, 0xd7, 0x06,
0x00, 0x00,
}
func (x Direction) String() string {
@ -987,6 +1007,12 @@ func (this *TailRequest) Equal(that interface{}) bool {
if this.DelayFor != that1.DelayFor {
return false
}
if this.Limit != that1.Limit {
return false
}
if !this.Start.Equal(that1.Start) {
return false
}
return true
}
func (this *TailResponse) Equal(that interface{}) bool {
@ -1154,11 +1180,13 @@ func (this *TailRequest) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s := make([]string, 0, 9)
s = append(s, "&logproto.TailRequest{")
s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n")
s = append(s, "Regex: "+fmt.Sprintf("%#v", this.Regex)+",\n")
s = append(s, "DelayFor: "+fmt.Sprintf("%#v", this.DelayFor)+",\n")
s = append(s, "Limit: "+fmt.Sprintf("%#v", this.Limit)+",\n")
s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -1235,6 +1263,14 @@ type PusherServer interface {
Push(context.Context, *PushRequest) (*PushResponse, error)
}
// UnimplementedPusherServer can be embedded to have forward compatible implementations.
type UnimplementedPusherServer struct {
}
func (*UnimplementedPusherServer) Push(ctx context.Context, req *PushRequest) (*PushResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Push not implemented")
}
func RegisterPusherServer(s *grpc.Server, srv PusherServer) {
s.RegisterService(&_Pusher_serviceDesc, srv)
}
@ -1367,6 +1403,20 @@ type QuerierServer interface {
Tail(*TailRequest, Querier_TailServer) error
}
// UnimplementedQuerierServer can be embedded to have forward compatible implementations.
type UnimplementedQuerierServer struct {
}
func (*UnimplementedQuerierServer) Query(req *QueryRequest, srv Querier_QueryServer) error {
return status.Errorf(codes.Unimplemented, "method Query not implemented")
}
func (*UnimplementedQuerierServer) Label(ctx context.Context, req *LabelRequest) (*LabelResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Label not implemented")
}
func (*UnimplementedQuerierServer) Tail(req *TailRequest, srv Querier_TailServer) error {
return status.Errorf(codes.Unimplemented, "method Tail not implemented")
}
func RegisterQuerierServer(s *grpc.Server, srv QuerierServer) {
s.RegisterService(&_Querier_serviceDesc, srv)
}
@ -1776,6 +1826,19 @@ func (m *TailRequest) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintLogproto(dAtA, i, uint64(m.DelayFor))
}
if m.Limit != 0 {
dAtA[i] = 0x20
i++
i = encodeVarintLogproto(dAtA, i, uint64(m.Limit))
}
dAtA[i] = 0x2a
i++
i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Start)))
n6, err6 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Start, dAtA[i:])
if err6 != nil {
return 0, err6
}
i += n6
return i, nil
}
@ -1798,11 +1861,11 @@ func (m *TailResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintLogproto(dAtA, i, uint64(m.Stream.Size()))
n4, err4 := m.Stream.MarshalTo(dAtA[i:])
if err4 != nil {
return 0, err4
n7, err7 := m.Stream.MarshalTo(dAtA[i:])
if err7 != nil {
return 0, err7
}
i += n4
i += n7
}
if len(m.DroppedStreams) > 0 {
for _, msg := range m.DroppedStreams {
@ -1837,19 +1900,19 @@ func (m *DroppedStream) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.From)))
n5, err5 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.From, dAtA[i:])
if err5 != nil {
return 0, err5
n8, err8 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.From, dAtA[i:])
if err8 != nil {
return 0, err8
}
i += n5
i += n8
dAtA[i] = 0x12
i++
i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.To)))
n6, err6 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.To, dAtA[i:])
if err6 != nil {
return 0, err6
n9, err9 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.To, dAtA[i:])
if err9 != nil {
return 0, err9
}
i += n6
i += n9
if len(m.Labels) > 0 {
dAtA[i] = 0x1a
i++
@ -2024,6 +2087,11 @@ func (m *TailRequest) Size() (n int) {
if m.DelayFor != 0 {
n += 1 + sovLogproto(uint64(m.DelayFor))
}
if m.Limit != 0 {
n += 1 + sovLogproto(uint64(m.Limit))
}
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Start)
n += 1 + l + sovLogproto(uint64(l))
return n
}
@ -2064,14 +2132,7 @@ func (m *DroppedStream) Size() (n int) {
}
func sovLogproto(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
return (math_bits.Len64(x|1) + 6) / 7
}
func sozLogproto(x uint64) (n int) {
return sovLogproto(uint64((x << 1) ^ uint64((int64(x) >> 63))))
@ -2188,6 +2249,8 @@ func (this *TailRequest) String() string {
`Query:` + fmt.Sprintf("%v", this.Query) + `,`,
`Regex:` + fmt.Sprintf("%v", this.Regex) + `,`,
`DelayFor:` + fmt.Sprintf("%v", this.DelayFor) + `,`,
`Limit:` + fmt.Sprintf("%v", this.Limit) + `,`,
`Start:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s
@ -3287,6 +3350,58 @@ func (m *TailRequest) Unmarshal(dAtA []byte) error {
break
}
}
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Limit", wireType)
}
m.Limit = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Limit |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Start, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])

@ -65,6 +65,8 @@ message TailRequest {
string query = 1;
string regex = 2;
uint32 delayFor = 3;
uint32 limit = 4;
google.protobuf.Timestamp start = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message TailResponse {

@ -22,7 +22,7 @@ import (
const (
defaultQueryLimit = 100
defaulSince = 1 * time.Hour
defaultSince = 1 * time.Hour
wsPingPeriod = 1 * time.Second
maxDelayForInTailing = 5
)
@ -70,28 +70,16 @@ func directionParam(values url.Values, name string, def logproto.Direction) (log
func httpRequestToQueryRequest(httpRequest *http.Request) (*logproto.QueryRequest, error) {
params := httpRequest.URL.Query()
now := time.Now()
queryRequest := logproto.QueryRequest{
Regex: params.Get("regexp"),
Query: params.Get("query"),
}
limit, err := intParam(params, "limit", defaultQueryLimit)
var err error
queryRequest.Limit, queryRequest.Start, queryRequest.End, err = httpRequestToLookback(httpRequest)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
return nil, err
}
queryRequest.Limit = uint32(limit)
queryRequest.Start, err = unixNanoTimeParam(params, "start", now.Add(-defaulSince))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryRequest.End, err = unixNanoTimeParam(params, "end", now)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryRequest.Direction, err = directionParam(params, "direction", logproto.BACKWARD)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
@ -106,6 +94,11 @@ func httpRequestToTailRequest(httpRequest *http.Request) (*logproto.TailRequest,
Regex: params.Get("regexp"),
Query: params.Get("query"),
}
var err error
tailRequest.Limit, tailRequest.Start, _, err = httpRequestToLookback(httpRequest)
if err != nil {
return nil, err
}
// delay_for is used to allow server to let slow loggers catch up.
// Entries would be accumulated in a heap until they become older than now()-<delay_for>
@ -119,6 +112,28 @@ func httpRequestToTailRequest(httpRequest *http.Request) (*logproto.TailRequest,
return &tailRequest, nil
}
func httpRequestToLookback(httpRequest *http.Request) (limit uint32, start, end time.Time, err error) {
params := httpRequest.URL.Query()
now := time.Now()
lim, err := intParam(params, "limit", defaultQueryLimit)
if err != nil {
return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
limit = uint32(lim)
start, err = unixNanoTimeParam(params, "start", now.Add(-defaultSince))
if err != nil {
return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
end, err = unixNanoTimeParam(params, "end", now)
if err != nil {
return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return
}
// QueryHandler is a http.HandlerFunc for queries.
func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
request, err := httpRequestToQueryRequest(r)

@ -109,7 +109,19 @@ func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(l
// Query does the heavy lifting for an actual query.
func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logproto.QueryResponse, error) {
iterators, err := q.getQueryIterators(ctx, req)
if err != nil {
return nil, err
}
iterator := iter.NewHeapIterator(iterators, req.Direction)
defer helpers.LogError("closing iterator", iterator.Close)
resp, _, err := ReadBatch(iterator, req.Limit)
return resp, err
}
func (q *Querier) getQueryIterators(ctx context.Context, req *logproto.QueryRequest) ([]iter.EntryIterator, error) {
ingesterIterators, err := q.queryIngesters(ctx, req)
if err != nil {
return nil, err
@ -121,11 +133,7 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr
}
iterators := append(ingesterIterators, chunkStoreIterators)
iterator := iter.NewHeapIterator(iterators, req.Direction)
defer helpers.LogError("closing iterator", iterator.Close)
resp, _, err := ReadBatch(iterator, req.Limit)
return resp, err
return iterators, nil
}
func (q *Querier) queryIngesters(ctx context.Context, req *logproto.QueryRequest) ([]iter.EntryIterator, error) {
@ -260,11 +268,31 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
tailClients[clients[i].addr] = clients[i].response.(logproto.Querier_TailClient)
}
return newTailer(time.Duration(req.DelayFor)*time.Second, tailClients, func(from, to time.Time, labels string) (iterator iter.EntryIterator, e error) {
return q.queryDroppedStreams(ctx, req, from, to, labels)
}, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr)
}, q.cfg.TailMaxDuration), nil
histReq := logproto.QueryRequest{
Query: req.Query,
Start: req.Start,
End: time.Now(),
Limit: req.Limit,
Direction: logproto.FORWARD,
Regex: req.Regex,
}
histIterators, err := q.getQueryIterators(ctx, &histReq)
if err != nil {
return nil, err
}
return newTailer(
time.Duration(req.DelayFor)*time.Second,
tailClients,
histIterators,
func(from, to time.Time, labels string) (iterator iter.EntryIterator, e error) {
return q.queryDroppedStreams(ctx, req, from, to, labels)
},
func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr)
},
q.cfg.TailMaxDuration,
), nil
}
// passed to tailer for querying dropped streams
@ -284,9 +312,9 @@ func (q *Querier) queryDroppedStreams(ctx context.Context, req *logproto.TailReq
Direction: logproto.FORWARD,
Start: start,
End: end,
Limit: 10000,
Query: req.Query,
Regex: req.Regex,
Limit: 10000,
}
clients, err := q.forGivenIngesters(replicationSet, func(client logproto.QuerierClient) (interface{}, error) {

@ -324,11 +324,16 @@ func (t *Tailer) getCloseErrorChan() <-chan error {
return t.closeErrChan
}
func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient,
func newTailer(
delayFor time.Duration,
querierTailClients map[string]logproto.Querier_TailClient,
historicEntries []iter.EntryIterator,
queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error),
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration) *Tailer {
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error),
tailMaxDuration time.Duration,
) *Tailer {
t := Tailer{
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD),
openStreamIterator: iter.NewHeapIterator(historicEntries, logproto.FORWARD),
//droppedStreamsIterator: &droppedStreamsIterator{},
querierTailClients: querierTailClients,
queryDroppedStreams: queryDroppedStreams,

Loading…
Cancel
Save