|
|
|
@ -255,9 +255,9 @@ func TestManager_NextPending(t *testing.T) { |
|
|
|
|
|
|
|
|
|
// There should be no segments waiting to be flushed as no data has been
|
|
|
|
|
// written.
|
|
|
|
|
it, err := m.NextPending() |
|
|
|
|
s, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Nil(t, s) |
|
|
|
|
|
|
|
|
|
// Append 1KB of data.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
@ -271,14 +271,78 @@ func TestManager_NextPending(t *testing.T) { |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// There should be a segment waiting to be flushed.
|
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
s, err = m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, it) |
|
|
|
|
require.NotNil(t, s) |
|
|
|
|
|
|
|
|
|
// There should be no more segments waiting to be flushed.
|
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
s, err = m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Nil(t, s) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_NextPendingAge(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 100 * time.Millisecond, |
|
|
|
|
MaxSegments: 1, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Create a mock clock.
|
|
|
|
|
clock := quartz.NewMock(t) |
|
|
|
|
m.clock = clock |
|
|
|
|
|
|
|
|
|
// Append 1B of data.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// Wait 100ms. The segment that was just appended to should have reached
|
|
|
|
|
// the maximum age.
|
|
|
|
|
clock.Advance(100 * time.Millisecond) |
|
|
|
|
s, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, s) |
|
|
|
|
require.Equal(t, 100*time.Millisecond, s.Age()) |
|
|
|
|
m.Put(s) |
|
|
|
|
|
|
|
|
|
// Append 1KB of data using two separate append requests, 1ms apart.
|
|
|
|
|
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} |
|
|
|
|
res, err = m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// Wait 1ms and then append the rest of the data.
|
|
|
|
|
clock.Advance(time.Millisecond) |
|
|
|
|
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} |
|
|
|
|
res, err = m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// The segment that was just appended to should have reached the maximum
|
|
|
|
|
// size.
|
|
|
|
|
s, err = m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, s) |
|
|
|
|
require.Equal(t, time.Millisecond, s.Age()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_NextPendingMaxAgeExceeded(t *testing.T) { |
|
|
|
@ -307,18 +371,18 @@ func TestManager_NextPendingMaxAgeExceeded(t *testing.T) { |
|
|
|
|
|
|
|
|
|
// The segment that was just appended to has neither reached the maximum
|
|
|
|
|
// age nor maximum size to be flushed.
|
|
|
|
|
it, err := m.NextPending() |
|
|
|
|
s, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Nil(t, s) |
|
|
|
|
require.Equal(t, 1, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// Wait 100ms. The segment that was just appended to should have reached
|
|
|
|
|
// the maximum age.
|
|
|
|
|
clock.Advance(100 * time.Millisecond) |
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
s, err = m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, it) |
|
|
|
|
require.NotNil(t, s) |
|
|
|
|
require.Equal(t, 0, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
} |
|
|
|
@ -345,24 +409,24 @@ func TestManager_NextPendingWALClosed(t *testing.T) { |
|
|
|
|
|
|
|
|
|
// There should be no segments waiting to be flushed as neither the maximum
|
|
|
|
|
// age nor maximum size has been exceeded.
|
|
|
|
|
it, err := m.NextPending() |
|
|
|
|
s, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Nil(t, s) |
|
|
|
|
|
|
|
|
|
// Close the WAL.
|
|
|
|
|
m.Close() |
|
|
|
|
|
|
|
|
|
// There should be one segment waiting to be flushed.
|
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
s, err = m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, it) |
|
|
|
|
require.NotNil(t, s) |
|
|
|
|
|
|
|
|
|
// There are no more segments waiting to be flushed, and since the WAL is
|
|
|
|
|
// closed, successive calls should return ErrClosed.
|
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
s, err = m.NextPending() |
|
|
|
|
require.ErrorIs(t, err, ErrClosed) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Nil(t, s) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -395,22 +459,22 @@ func TestManager_Put(t *testing.T) { |
|
|
|
|
require.Equal(t, 1, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// Getting the pending segment should remove it from the list.
|
|
|
|
|
it, err := m.NextPending() |
|
|
|
|
s, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, it) |
|
|
|
|
require.NotNil(t, s) |
|
|
|
|
require.Equal(t, 0, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// The segment should contain 1KB of data.
|
|
|
|
|
require.Equal(t, int64(1024), it.Writer.InputSize()) |
|
|
|
|
require.Equal(t, int64(1024), s.Writer.InputSize()) |
|
|
|
|
|
|
|
|
|
// Putting it back should add it to the available list.
|
|
|
|
|
m.Put(it) |
|
|
|
|
m.Put(s) |
|
|
|
|
require.Equal(t, 1, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// The segment should be reset.
|
|
|
|
|
require.Equal(t, int64(0), it.Writer.InputSize()) |
|
|
|
|
require.Equal(t, int64(0), s.Writer.InputSize()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_Metrics(t *testing.T) { |
|
|
|
@ -465,9 +529,9 @@ wal_segments_pending 1 |
|
|
|
|
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) |
|
|
|
|
|
|
|
|
|
// Get the segment from the pending list.
|
|
|
|
|
it, err := m.NextPending() |
|
|
|
|
s, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, it) |
|
|
|
|
require.NotNil(t, s) |
|
|
|
|
expected = ` |
|
|
|
|
# HELP wal_segments_available The number of WAL segments accepting writes. |
|
|
|
|
# TYPE wal_segments_available gauge |
|
|
|
@ -482,7 +546,7 @@ wal_segments_pending 0 |
|
|
|
|
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) |
|
|
|
|
|
|
|
|
|
// Reset the segment and put it back in the available list.
|
|
|
|
|
m.Put(it) |
|
|
|
|
m.Put(s) |
|
|
|
|
expected = ` |
|
|
|
|
# HELP wal_segments_available The number of WAL segments accepting writes. |
|
|
|
|
# TYPE wal_segments_available gauge |
|
|
|
|