SSE: Fix goroutine leak in math operation expression parsing (#102380)

---------

Co-authored-by: Sam Jewell <sam.jewell@grafana.com>
pull/102456/head
Kyle Brandt 4 months ago committed by GitHub
parent 216b6e96a9
commit 3f2ef07872
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 46
      pkg/expr/mathexp/parse/lex.go
  2. 69
      pkg/expr/mathexp/parse/lex_test.go
  3. 5
      pkg/expr/mathexp/parse/parse.go

@ -67,13 +67,14 @@ type stateFn func(*lexer) stateFn
// lexer holds the state of the scanner.
type lexer struct {
input string // the string being scanned
state stateFn // the next lexing function to enter
pos Pos // current position in the input
start Pos // start position of this item
width Pos // width of last rune read from input
lastPos Pos // position of most recent item returned by nextItem
items chan item // channel of scanned items
input string // the string being scanned
state stateFn // the next lexing function to enter
pos Pos // current position in the input
start Pos // start position of this item
width Pos // width of last rune read from input
lastPos Pos // position of most recent item returned by nextItem
items chan item // channel of scanned items
done chan struct{} // channel to signal lexer shutdown
}
// next returns the next rune in the input.
@ -103,7 +104,11 @@ func (l *lexer) backup() {
// emit passes an item back to the client.
func (l *lexer) emit(t itemType) {
l.items <- item{t, l.start, l.input[l.start:l.pos]}
select {
case l.items <- item{t, l.start, l.input[l.start:l.pos]}:
case <-l.done:
return
}
l.start = l.pos
}
@ -139,7 +144,11 @@ func (l *lexer) lineNumber() int {
// errorf returns an error token and terminates the scan by passing
// back a nil pointer that will be the next state, terminating l.nextItem.
func (l *lexer) errorf(format string, args ...any) stateFn {
l.items <- item{itemError, l.start, fmt.Sprintf(format, args...)}
select {
case l.items <- item{itemError, l.start, fmt.Sprintf(format, args...)}:
case <-l.done:
return nil
}
return nil
}
@ -155,15 +164,32 @@ func lex(input string) *lexer {
l := &lexer{
input: input,
items: make(chan item),
done: make(chan struct{}),
}
go l.run()
return l
}
// Close terminates the lexer goroutine.
func (l *lexer) Close() {
select {
case <-l.done:
// already closed
default:
close(l.done)
}
}
// run runs the state machine for the lexer.
func (l *lexer) run() {
defer close(l.items)
for l.state = lexItem; l.state != nil; {
l.state = l.state(l)
select {
case <-l.done:
return
default:
l.state = l.state(l)
}
}
}

@ -6,7 +6,9 @@ package parse
import (
"fmt"
"runtime"
"testing"
"time"
)
// Make the types prettyprint.
@ -167,3 +169,70 @@ func TestLex(t *testing.T) {
}
}
}
// TestLexerClose verifies that a lexer can be explicitly closed
func TestLexerClose(t *testing.T) {
// Create a lexer with some input
lexer := lex("1 + 2")
// Read one item to verify it's working
item := lexer.nextItem()
if item.typ != itemNumber || item.val != "1" {
t.Errorf("unexpected first item: %v", item)
}
// Close the lexer explicitly
lexer.Close()
// Verify the lexer's channel closes
select {
case _, ok := <-lexer.items:
if ok {
t.Fatal("lexer.items channel should be closed after lexer.Close()")
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting for lexer.items channel to close")
}
}
// TestParseErrorNoLeak verifies that lexer goroutines are properly terminated when Parse encounters errors
func TestParseErrorNoLeak(t *testing.T) {
// Count initial goroutines
initialGoroutines := runtime.NumGoroutine()
// Create several trees with parsing errors to check for leaks
for i := 0; i < 10; i++ {
tree := New()
input := "invalid expression with $"
err := tree.Parse(input)
// Verify that Parse returned an error
if err == nil {
t.Fatal("expected error but got nil")
}
// Verify that tree.lex is nil after an error
if tree.lex != nil {
t.Fatal("tree.lex was not set to nil after error")
}
}
// Poll for goroutine count to stabilize
deadline := time.Now().Add(500 * time.Millisecond)
var finalGoroutines int
for time.Now().Before(deadline) {
finalGoroutines = runtime.NumGoroutine()
// If we're close to the initial count, we can exit early
if finalGoroutines <= initialGoroutines+2 {
break
}
time.Sleep(10 * time.Millisecond)
}
// Check if we've leaked goroutines (with a small buffer for normal variations)
if finalGoroutines > initialGoroutines+5 {
t.Fatalf("Goroutine leak detected: started with %d goroutines, ended with %d (difference of %d)",
initialGoroutines, finalGoroutines, finalGoroutines-initialGoroutines)
}
}

@ -140,7 +140,10 @@ func (t *Tree) startParse(funcs []map[string]Func, lex *lexer) {
// stopParse terminates parsing.
func (t *Tree) stopParse() {
t.lex = nil
if t.lex != nil {
t.lex.Close()
t.lex = nil
}
}
// Parse parses the expression definition string to construct a representation

Loading…
Cancel
Save