From eedd3d9fc2cb6f47aae879722ebf0df6d118eabb Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 9 Jul 2019 14:58:06 -0400 Subject: [PATCH] prune interval is configurable canary will suspend all operations on SIGINT but not exit, allowing you to shutdown the canary without it being restarted by docker/kubernetes SIGTERM will shutdown everything and end the process --- README.md | 12 +++++++++--- cmd/loki-canary/main.go | 11 ++++++++++- pkg/comparator/comparator.go | 7 +++++-- pkg/reader/reader.go | 12 ++++++++---- pkg/writer/writer.go | 7 +++++-- 5 files changed, 37 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 459228b40c..60678b4726 100644 --- a/README.md +++ b/README.md @@ -25,9 +25,9 @@ If the received log is: * The next in the array to be received, it is removed from the array and the (current time - log timestamp) is recorded in the `response_latency` histogram, this is the expected behavior for well behaving logs * Not the next in the array received, is is removed from the array, the response time is recorded in the `response_latency` histogram, and the `out_of_order_entries` counter is incremented - * Not in the array at all, the `unexpected_entries` counter is incremented + * Not in the array at all, it is checked against a separate list of received logs to either increment the `duplicate_entries` counter or the `unexpected_entries` counter. -In the background, loki-canary also runs a timer which iterates through all the entries in the internal array, if any are older than the duration specified by the `-wait` flag (default 60s), they are removed from the array and the `missing_entries` counter is incremented +In the background, loki-canary also runs a timer which iterates through all the entries in the internal array, if any are older than the duration specified by the `-wait` flag (default 60s), they are removed from the array and the `websocket_missing_entries` counter is incremented. 
Then an additional query is made directly to Loki for these missing entries to determine if they were actually missing or just didn't make it down the websocket. If they are not found in the follow-up query, the `missing_entries` counter is incremented. ## building and running @@ -74,6 +74,10 @@ You should also pass the `-labelname` and `-labelvalue` flags, these are used by If you get a high number of `unexpected_entries` you may not be waiting long enough and should increase `-wait` from 60s to something larger. +__Be cognizant__ of the relationship between `-pruneinterval` and `-interval`. For example, with an interval of 10ms (100 logs per second) and a prune interval of 60s, you will write 6000 logs per minute. If those logs are not received over the websocket, the canary will attempt to query Loki directly to see if they are completely lost. __However__, the query returns at most 1000 results, so you will not be able to retrieve all the logs even if they did make it to Loki. + +__Likewise__, if you lower `-pruneinterval` you risk effectively mounting a denial-of-service attack on Loki, as all your canaries will query for missing logs at whatever frequency `-pruneinterval` is set to. 
+ All options: ```nohighlight @@ -91,6 +95,8 @@ All options: Loki password -port int Port which loki-canary should expose metrics (default 3500) + -pruneinterval duration + Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki (default 1m0s) -size int Size in bytes of each log line (default 100) -tls @@ -99,4 +105,4 @@ All options: Loki username -wait duration Duration to wait for log entries before reporting them lost (default 1m0s) -``` \ No newline at end of file +``` diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 820cedaff3..442656e6d1 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "strconv" + "syscall" "time" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -29,6 +30,7 @@ func main() { interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries") size := flag.Int("size", 100, "Size in bytes of each log line") wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost") + pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki") buckets := flag.Int("buckets", 10, "Number of buckets in the response_latency histogram") flag.Parse() @@ -42,7 +44,7 @@ func main() { w := writer.NewWriter(os.Stdout, sentChan, *interval, *size) r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) - c := comparator.NewComparator(os.Stderr, *wait, 60*time.Second, *buckets, sentChan, receivedChan, r) + c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r) http.Handle("/metrics", promhttp.Handler()) go func() { @@ -53,11 +55,18 @@ func main() { }() interrupt := make(chan os.Signal, 1) + terminate := make(chan os.Signal, 1) 
signal.Notify(interrupt, os.Interrupt) + signal.Notify(terminate, syscall.SIGTERM) for { select { case <-interrupt: + _, _ = fmt.Fprintf(os.Stderr, "suspending indefinetely\n") + w.Stop() + r.Stop() + c.Stop() + case <-terminate: _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") w.Stop() r.Stop() diff --git a/pkg/comparator/comparator.go b/pkg/comparator/comparator.go index e78398e319..dd2451c0c6 100644 --- a/pkg/comparator/comparator.go +++ b/pkg/comparator/comparator.go @@ -95,8 +95,11 @@ func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.D } func (c *Comparator) Stop() { - close(c.quit) - <-c.done + if c.quit != nil { + close(c.quit) + <-c.done + c.quit = nil + } } func (c *Comparator) entrySent(time time.Time) { diff --git a/pkg/reader/reader.go b/pkg/reader/reader.go index 41ac31c817..dc80b2a121 100644 --- a/pkg/reader/reader.go +++ b/pkg/reader/reader.go @@ -92,8 +92,11 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool, } func (r *Reader) Stop() { - close(r.quit) - <-r.done + if r.quit != nil { + close(r.quit) + <-r.done + r.quit = nil + } } func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { @@ -105,8 +108,9 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { Scheme: scheme, Host: r.addr, Path: "/api/prom/query", - RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) + "&query=" + - url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)), + RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) + + "&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) + + "&limit=1000", } _, _ = fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String()) diff --git a/pkg/writer/writer.go b/pkg/writer/writer.go index 74024f7c09..8d9b163a80 100644 --- a/pkg/writer/writer.go +++ b/pkg/writer/writer.go @@ -41,8 +41,11 @@ func NewWriter(writer io.Writer, 
sentChan chan time.Time, entryInterval time.Dur } func (w *Writer) Stop() { - close(w.quit) - <-w.done + if w.quit != nil { + close(w.quit) + <-w.done + w.quit = nil + } } func (w *Writer) run() {