diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index 29ff9879f4..b61af8e9d6 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -285,11 +285,6 @@ func collectSamplesFromRow(builder *labels.Builder, rec arrow.Record, i int) (pr // TODO: we add a lot of overhead by reading row by row. Switch to vectorized conversion. for colIdx := range int(rec.NumCols()) { col := rec.Column(colIdx) - // Ignore column values that are NULL or invalid - if col.IsNull(i) || !col.IsValid(i) { - return promql.Sample{}, false - } - field := rec.Schema().Field(colIdx) ident, err := semconv.ParseFQN(field.Name) if err != nil { @@ -300,12 +295,20 @@ func collectSamplesFromRow(builder *labels.Builder, rec arrow.Record, i int) (pr // Extract timestamp if ident.Equal(semconv.ColumnIdentTimestamp) { + // Ignore column values that are NULL or invalid + if col.IsNull(i) || !col.IsValid(i) { + return promql.Sample{}, false + } // [promql.Sample] expects milliseconds as timestamp unit sample.T = int64(col.(*array.Timestamp).Value(i) / 1e6) continue } if ident.Equal(semconv.ColumnIdentValue) { + // Ignore column values that are NULL or invalid + if col.IsNull(i) || !col.IsValid(i) { + return promql.Sample{}, false + } col, ok := col.(*array.Float64) if !ok { return promql.Sample{}, false diff --git a/pkg/engine/compat_test.go b/pkg/engine/compat_test.go index a0e98eb299..273eeeab3f 100644 --- a/pkg/engine/compat_test.go +++ b/pkg/engine/compat_test.go @@ -212,6 +212,7 @@ func TestVectorResultBuilder(t *testing.T) { rows := arrowtest.Rows{ {colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"}, {colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"}, + {colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(32), colInst.FQN(): nil, colJob.FQN(): "node-exporter"}, {colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(15), colInst.FQN(): "localhost:9100", colJob.FQN(): "prometheus"}, } @@ -225,12 +226,12 @@ func TestVectorResultBuilder(t *testing.T) { err := collectResult(context.Background(), pipeline, builder) require.NoError(t, err) - require.Equal(t, 3, builder.Len()) + require.Equal(t, 4, builder.Len()) md, _ := metadata.NewContext(t.Context()) result := builder.Build(stats.Result{}, md) vector := result.Data.(promql.Vector) - require.Equal(t, 3, len(vector)) + require.Equal(t, 4, len(vector)) expected := promql.Vector{ { @@ -248,6 +249,11 @@ func TestVectorResultBuilder(t *testing.T) { F: 15.0, Metric: labels.FromStrings("instance", "localhost:9100", "job", "prometheus"), }, + { + T: int64(1620000000000), + F: 32.0, + Metric: labels.FromStrings("job", "node-exporter"), + }, } require.Equal(t, expected, vector) })