|
|
|
|
@ -2,6 +2,7 @@ package loki |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"context" |
|
|
|
|
"flag" |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
@ -177,6 +178,13 @@ func getRandomPorts(n int) []int { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestLoki_CustomRunOptsBehavior(t *testing.T) { |
|
|
|
|
t.Parallel() |
|
|
|
|
// Set an overall test timeout
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
|
|
|
|
defer cancel() |
|
|
|
|
|
|
|
|
|
// Create a channel to signal test completion
|
|
|
|
|
done := make(chan struct{}) |
|
|
|
|
ports := getRandomPorts(2) |
|
|
|
|
httpPort := ports[0] |
|
|
|
|
grpcPort := ports[1] |
|
|
|
|
@ -210,27 +218,33 @@ schema_config: |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
lokiHealthCheck := func() error { |
|
|
|
|
// wait for Loki HTTP server to be ready.
|
|
|
|
|
// retries at most 10 times (1 second in total) to avoid infinite loops when no timeout is set.
|
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
|
// waits until request to /ready doesn't error.
|
|
|
|
|
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d/ready", httpPort)) |
|
|
|
|
if err != nil { |
|
|
|
|
time.Sleep(time.Millisecond * 200) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// waits until /ready returns OK.
|
|
|
|
|
if resp.StatusCode != http.StatusOK { |
|
|
|
|
time.Sleep(time.Millisecond * 200) |
|
|
|
|
continue |
|
|
|
|
// Set an overall timeout for health check attempts
|
|
|
|
|
timeout := time.After(5 * time.Second) |
|
|
|
|
ticker := time.NewTicker(200 * time.Millisecond) |
|
|
|
|
defer ticker.Stop() |
|
|
|
|
|
|
|
|
|
// Loop until timeout or success
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-timeout: |
|
|
|
|
return fmt.Errorf("timeout waiting for loki HTTP to become healthy") |
|
|
|
|
case <-ticker.C: |
|
|
|
|
// Try to connect to the /ready endpoint
|
|
|
|
|
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://0.0.0.0:%d/ready", httpPort)) |
|
|
|
|
if err != nil { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Check if status is OK
|
|
|
|
|
if resp.StatusCode != http.StatusOK { |
|
|
|
|
resp.Body.Close() |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
resp.Body.Close() |
|
|
|
|
return nil // Loki is healthy
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Loki is healthy.
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return fmt.Errorf("loki HTTP not healthy") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
customHandlerInvoked := false |
|
|
|
|
@ -240,24 +254,76 @@ schema_config: |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Create a context for server shutdown
|
|
|
|
|
srvCtx, srvCancel := context.WithCancel(context.Background()) |
|
|
|
|
|
|
|
|
|
// Run Loki querier in a different go routine and with custom /config handler.
|
|
|
|
|
go func() { |
|
|
|
|
err := loki.Run(RunOpts{CustomConfigEndpointHandlerFn: customHandler}) |
|
|
|
|
defer close(done) |
|
|
|
|
runOpts := RunOpts{ |
|
|
|
|
CustomConfigEndpointHandlerFn: customHandler, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Create a separate goroutine to stop Loki when test is done or times out
|
|
|
|
|
go func() { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
t.Logf("Test timeout reached, stopping Loki server") |
|
|
|
|
loki.SignalHandler.Stop() |
|
|
|
|
case <-srvCtx.Done(): |
|
|
|
|
t.Logf("Test completed, stopping Loki server") |
|
|
|
|
loki.SignalHandler.Stop() |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
err := loki.Run(runOpts) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
err = lokiHealthCheck() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
// Run the test in a goroutine to handle timeout properly
|
|
|
|
|
go func() { |
|
|
|
|
// Using a separate function to handle any panics in the test goroutine
|
|
|
|
|
defer srvCancel() // Always cancel the context when this function exits
|
|
|
|
|
|
|
|
|
|
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d/config", httpPort)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
// Wait for Loki to be healthy
|
|
|
|
|
err = lokiHealthCheck() |
|
|
|
|
if err != nil { |
|
|
|
|
t.Errorf("Health check failed: %v", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
defer resp.Body.Close() |
|
|
|
|
// Make request to custom handler
|
|
|
|
|
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://0.0.0.0:%d/config", httpPort)) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Errorf("Failed to get config: %v", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
defer resp.Body.Close() |
|
|
|
|
|
|
|
|
|
bBytes, err := io.ReadAll(resp.Body) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Equal(t, string(bBytes), "abc") |
|
|
|
|
assert.True(t, customHandlerInvoked) |
|
|
|
|
bBytes, err := io.ReadAll(resp.Body) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Errorf("Failed to read response: %v", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Verify results
|
|
|
|
|
if string(bBytes) != "abc" { |
|
|
|
|
t.Errorf("Expected response 'abc', got '%s'", string(bBytes)) |
|
|
|
|
} |
|
|
|
|
if !customHandlerInvoked { |
|
|
|
|
t.Errorf("Custom handler was not invoked") |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// Wait for either test completion or timeout
|
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
t.Fatalf("Test timed out after 30 seconds") |
|
|
|
|
case <-done: |
|
|
|
|
// Test completed successfully
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Clean up metrics
|
|
|
|
|
unregisterLokiMetrics(loki) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|