|
|
|
@ -87,19 +87,15 @@ func TestManager_AppendFailed(t *testing.T) { |
|
|
|
|
require.EqualError(t, res.Err(), "failed to flush") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_AppendMaxAge(t *testing.T) { |
|
|
|
|
func TestManager_AppendFailedWALClosed(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 100 * time.Millisecond, |
|
|
|
|
MaxSegments: 1, |
|
|
|
|
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
|
|
|
|
|
MaxAge: 30 * time.Second, |
|
|
|
|
MaxSegments: 10, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Create a mock clock.
|
|
|
|
|
clock := quartz.NewMock(t) |
|
|
|
|
m.clock = clock |
|
|
|
|
|
|
|
|
|
// Append 1B of data.
|
|
|
|
|
// Append some data.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
@ -111,40 +107,70 @@ func TestManager_AppendMaxAge(t *testing.T) { |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// The segment that was just appended to has neither reached the maximum
|
|
|
|
|
// age nor maximum size to be flushed.
|
|
|
|
|
require.Equal(t, 1, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
// Close the WAL.
|
|
|
|
|
m.Close() |
|
|
|
|
|
|
|
|
|
// Wait 100ms and append some more data.
|
|
|
|
|
clock.Advance(100 * time.Millisecond) |
|
|
|
|
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
// Should not be able to append more data as the WAL is closed.
|
|
|
|
|
res, err = m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.Nil(t, res) |
|
|
|
|
require.ErrorIs(t, err, ErrClosed) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_AppendFailedWALFull(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 30 * time.Second, |
|
|
|
|
MaxSegments: 10, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// The segment has reached the maximum age and should have been moved to
|
|
|
|
|
// pending list to be flushed.
|
|
|
|
|
require.Equal(t, 0, m.available.Len()) |
|
|
|
|
require.Equal(t, 1, m.pending.Len()) |
|
|
|
|
// Should be able to write 100KB of data, 10KB per segment.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// However, appending more data should fail as all segments are full and
|
|
|
|
|
// waiting to be flushed.
|
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.ErrorIs(t, err, ErrFull) |
|
|
|
|
require.Nil(t, res) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_AppendMaxSize(t *testing.T) { |
|
|
|
|
func TestManager_AppendMaxAgeExceeded(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 30 * time.Second, |
|
|
|
|
MaxAge: 100 * time.Millisecond, |
|
|
|
|
MaxSegments: 1, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Append 512B of data.
|
|
|
|
|
// Create a mock clock.
|
|
|
|
|
clock := quartz.NewMock(t) |
|
|
|
|
m.clock = clock |
|
|
|
|
|
|
|
|
|
// Append 1B of data.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
@ -159,8 +185,9 @@ func TestManager_AppendMaxSize(t *testing.T) { |
|
|
|
|
require.Equal(t, 1, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// Append another 512B of data.
|
|
|
|
|
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} |
|
|
|
|
// Wait 100ms and append some more data.
|
|
|
|
|
clock.Advance(100 * time.Millisecond) |
|
|
|
|
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
res, err = m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
@ -170,23 +197,23 @@ func TestManager_AppendMaxSize(t *testing.T) { |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// The segment has reached the maximum size and should have been moved to
|
|
|
|
|
// The segment has reached the maximum age and should have been moved to
|
|
|
|
|
// pending list to be flushed.
|
|
|
|
|
require.Equal(t, 0, m.available.Len()) |
|
|
|
|
require.Equal(t, 1, m.pending.Len()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_AppendWALClosed(t *testing.T) { |
|
|
|
|
func TestManager_AppendMaxSizeExceeded(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 30 * time.Second, |
|
|
|
|
MaxSegments: 10, |
|
|
|
|
MaxSegments: 1, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Append some data.
|
|
|
|
|
// Append 512B of data.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
@ -196,53 +223,26 @@ func TestManager_AppendWALClosed(t *testing.T) { |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// Close the WAL.
|
|
|
|
|
m.Close() |
|
|
|
|
// The segment that was just appended to has neither reached the maximum
|
|
|
|
|
// age nor maximum size to be flushed.
|
|
|
|
|
require.Equal(t, 1, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// Should not be able to append more data as the WAL is closed.
|
|
|
|
|
// Append another 512B of data.
|
|
|
|
|
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} |
|
|
|
|
res, err = m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.Nil(t, res) |
|
|
|
|
require.ErrorIs(t, err, ErrClosed) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_AppendWALFull(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 30 * time.Second, |
|
|
|
|
MaxSegments: 10, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// Should be able to write 100KB of data, 10KB per segment.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// However, appending more data should fail as all segments are full and
|
|
|
|
|
// waiting to be flushed.
|
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
|
TenantID: "1", |
|
|
|
|
Labels: lbs, |
|
|
|
|
LabelsStr: lbs.String(), |
|
|
|
|
Entries: entries, |
|
|
|
|
}) |
|
|
|
|
require.ErrorIs(t, err, ErrFull) |
|
|
|
|
require.Nil(t, res) |
|
|
|
|
// The segment has reached the maximum size and should have been moved to
|
|
|
|
|
// pending list to be flushed.
|
|
|
|
|
require.Equal(t, 0, m.available.Len()) |
|
|
|
|
require.Equal(t, 1, m.pending.Len()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_NextPending(t *testing.T) { |
|
|
|
@ -281,15 +281,19 @@ func TestManager_NextPending(t *testing.T) { |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_NextPendingClosed(t *testing.T) { |
|
|
|
|
func TestManager_NextPendingMaxAgeExceeded(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 30 * time.Second, |
|
|
|
|
MaxSegments: 10, |
|
|
|
|
MaxAge: 100 * time.Millisecond, |
|
|
|
|
MaxSegments: 1, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Append some data.
|
|
|
|
|
// Create a mock clock.
|
|
|
|
|
clock := quartz.NewMock(t) |
|
|
|
|
m.clock = clock |
|
|
|
|
|
|
|
|
|
// Append 1B of data.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
@ -301,40 +305,33 @@ func TestManager_NextPendingClosed(t *testing.T) { |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// There should be no segments waiting to be flushed as neither the maximum
|
|
|
|
|
// age nor maximum size has been exceeded.
|
|
|
|
|
// The segment that was just appended to has neither reached the maximum
|
|
|
|
|
// age nor maximum size to be flushed.
|
|
|
|
|
it, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Equal(t, 1, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// Close the WAL.
|
|
|
|
|
m.Close() |
|
|
|
|
|
|
|
|
|
// There should be one segment waiting to be flushed.
|
|
|
|
|
// Wait 100ms. The segment that was just appended to should have reached
|
|
|
|
|
// the maximum age.
|
|
|
|
|
clock.Advance(100 * time.Millisecond) |
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, it) |
|
|
|
|
|
|
|
|
|
// There are no more segments waiting to be flushed, and since the WAL is
|
|
|
|
|
// closed, successive calls should return ErrClosed.
|
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
require.ErrorIs(t, err, ErrClosed) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Equal(t, 0, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_NexPendingMaxAge(t *testing.T) { |
|
|
|
|
func TestManager_NextPendingWALClosed(t *testing.T) { |
|
|
|
|
m, err := NewManager(Config{ |
|
|
|
|
MaxAge: 100 * time.Millisecond, |
|
|
|
|
MaxAge: 30 * time.Second, |
|
|
|
|
MaxSegments: 1, |
|
|
|
|
MaxSegmentSize: 1024, // 1KB
|
|
|
|
|
}, NewMetrics(nil)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Create a mock clock.
|
|
|
|
|
clock := quartz.NewMock(t) |
|
|
|
|
m.clock = clock |
|
|
|
|
|
|
|
|
|
// Append 1B of data.
|
|
|
|
|
// Append some data.
|
|
|
|
|
lbs := labels.Labels{{Name: "a", Value: "b"}} |
|
|
|
|
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} |
|
|
|
|
res, err := m.Append(AppendRequest{ |
|
|
|
@ -346,22 +343,27 @@ func TestManager_NexPendingMaxAge(t *testing.T) { |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, res) |
|
|
|
|
|
|
|
|
|
// The segment that was just appended to has neither reached the maximum
|
|
|
|
|
// age nor maximum size to be flushed.
|
|
|
|
|
// There should be no segments waiting to be flushed as neither the maximum
|
|
|
|
|
// age nor maximum size has been exceeded.
|
|
|
|
|
it, err := m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
require.Equal(t, 1, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// Wait 100ms. The segment that was just appended to should have reached
|
|
|
|
|
// the maximum age.
|
|
|
|
|
clock.Advance(100 * time.Millisecond) |
|
|
|
|
// Close the WAL.
|
|
|
|
|
m.Close() |
|
|
|
|
|
|
|
|
|
// There should be one segment waiting to be flushed.
|
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, it) |
|
|
|
|
require.Equal(t, 0, m.available.Len()) |
|
|
|
|
require.Equal(t, 0, m.pending.Len()) |
|
|
|
|
|
|
|
|
|
// There are no more segments waiting to be flushed, and since the WAL is
|
|
|
|
|
// closed, successive calls should return ErrClosed.
|
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
|
it, err = m.NextPending() |
|
|
|
|
require.ErrorIs(t, err, ErrClosed) |
|
|
|
|
require.Nil(t, it) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestManager_Put(t *testing.T) { |
|
|
|
@ -493,5 +495,4 @@ wal_segments_flushing 0 |
|
|
|
|
wal_segments_pending 0 |
|
|
|
|
` |
|
|
|
|
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|