Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Fixed nil pointer panic in integration tests caused by missing backoff configuration when using `sink.New` directly.

* Fixed PostgreSQL error "cannot determine type of empty array" when inserting rows with empty `repeated` (array) fields in `from-proto` mode. PostgreSQL requires explicit type casts for empty array literals, which are now automatically generated based on the column type.

## v4.12.0

### DatabaseChanges mode improvements
Expand Down
6 changes: 5 additions & 1 deletion db_proto/sql/postgres/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ func ValueToString(value any, bytesEncoding bytes.Encoding) (s string) {
s = "'" + v.Format(time.RFC3339) + "'"
case *timestamppb.Timestamp:
s = "'" + v.AsTime().Format(time.RFC3339) + "'"
// Handle array types for PostgreSQL
case []interface{}:
if len(v) == 0 {
s = "'{}'"
return
}

var elements []string
for _, elem := range v {
elements = append(elements, ValueToString(elem, bytesEncoding))
Expand Down
10 changes: 10 additions & 0 deletions db_proto/sql/schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,13 @@ func (t *Table) processColumns(descriptor protoreflect.MessageDescriptor) error

return nil
}

// ColumnByFieldName returns the column matching the given protobuf field name, or nil if not found.
func (t *Table) ColumnByFieldName(fieldName string) *Column {
for _, col := range t.Columns {
if col.FieldDescriptor != nil && string(col.FieldDescriptor.Name()) == fieldName {
return col
}
}
return nil
}
61 changes: 61 additions & 0 deletions tests/integration/db_proto_clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/modules/clickhouse"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestDbProtoClickhouseIntegration(t *testing.T) {
Expand All @@ -41,6 +42,12 @@ func TestDbProtoClickhouseIntegration(t *testing.T) {
}
}

equalsTypesTestRows := func(expected []*TypesTestRow) func(t *testing.T, dbx *sqlx.DB) {
return func(t *testing.T, dbx *sqlx.DB) {
require.Equal(t, expected, readRowsBy[TypesTestRow](t, dbx, "types_tests", "id"))
}
}

testCases := []struct {
name string
responses []*pbsubstreamsrpc.Response
Expand Down Expand Up @@ -81,6 +88,23 @@ func TestDbProtoClickhouseIntegration(t *testing.T) {
{rowMeta(t, 1, "2025-01-01"), "o1", "c1"},
}),
},
{
// This test case verifies that ClickHouse correctly handles empty repeated fields.
// Unlike PostgreSQL, ClickHouse can infer the array type from the column definition
// in the schema, so empty arrays work correctly.
//
// This test serves as a regression test to ensure ClickHouse continues to handle
// empty arrays properly.
"types_test with empty repeated string field",
streamMock(
relationsBlockData(t, "1a", "2025-01-01",
entityTypesTestWithEmptyRepeatedString(1),
),
),
equalsTypesTestRows([]*TypesTestRow{
{rowMeta(t, 1, "2025-01-01"), 1},
}),
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -278,3 +302,40 @@ type OrderRow struct {
OrderID string `db:"order_id"`
CustomerID string `db:"customer_ref_id"`
}

// TypesTestRow represents a row from the types_tests table for assertions
type TypesTestRow struct {
Meta
ID uint64 `db:"id"`
}

// entityTypesTestWithEmptyRepeatedString creates a TypesTest entity with an empty
// repeated string field to reproduce the "cannot determine type of empty array" error.
func entityTypesTestWithEmptyRepeatedString(id uint64) *pbrelations.Entity {
return &pbrelations.Entity{
Entity: &pbrelations.Entity_TypesTest{
TypesTest: &pbrelations.TypesTest{
Id: id,
// RepeatedStringField is intentionally left nil/empty to reproduce
// the PostgreSQL error: "cannot determine type of empty array"
RepeatedStringField: nil,
// TimestampField must be set for ClickHouse to avoid panic on nil timestamp
TimestampField: timestamppb.New(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)),
// These fields have type conversions and require valid numeric strings (not empty)
Str_2Int128: "0",
Str_2Uint128: "0",
Str_2Int256: "0",
Str_2Uint256: "0",
Str_2Decimal128: "0",
Str_2Decimal256: "0",
// Optional numeric conversion field - must be set to valid value for PostgreSQL
// (empty string fails with "invalid input syntax for type numeric")
OptionalStr_2Uint256: ptr("0"),
},
},
}
}

func ptr[T any](v T) *T {
return &v
}
141 changes: 141 additions & 0 deletions tests/integration/db_proto_postgres_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package tests

import (
"context"
"database/sql"
"fmt"
"strings"
"testing"

"github.com/cenkalti/backoff/v4"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/streamingfast/bstream"
sink "github.com/streamingfast/substreams/sink"
"github.com/streamingfast/substreams-sink-sql/db_proto"
pbrelations "github.com/streamingfast/substreams-sink-sql/pb/test/relations"
"github.com/streamingfast/substreams/manifest"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/stretchr/testify/require"
)

func TestDbProtoPostgresIntegration(t *testing.T) {
outputMessageDescriptor := (*pbrelations.Output)(nil).ProtoReflect().Descriptor()

postgresContainer := sharedDbChangesPostgresContainer

streamMock := func(responses ...*pbsubstreamsrpc.Response) []*pbsubstreamsrpc.Response {
return responses
}

equalsTypesTestRows := func(expected []*TypesTestRow) func(t *testing.T, dbx *sqlx.DB, schema string) {
return func(t *testing.T, dbx *sqlx.DB, schema string) {
rows := readRowsBy[TypesTestRow](t, dbx, fmt.Sprintf(`"%s"."types_tests"`, schema), "id")
require.Len(t, rows, len(expected))
for i, exp := range expected {
actual := rows[i]
// Compare time values using Equal() to ignore timezone location differences
require.True(t, exp.BlockTime.Equal(actual.BlockTime), "BlockTime mismatch at index %d: expected %v, got %v", i, exp.BlockTime, actual.BlockTime)
require.Equal(t, exp.IsDeleted, actual.IsDeleted)
require.Equal(t, exp.BlockNumber, actual.BlockNumber)
require.Equal(t, exp.ID, actual.ID)
}
}
}

testCases := []struct {
name string
responses []*pbsubstreamsrpc.Response
expected func(t *testing.T, dbx *sqlx.DB, schema string)
}{
{
// This test case reproduces the issue where PostgreSQL cannot determine the type
// of an empty array. When a repeated field is empty (nil or empty slice), the
// generated INSERT statement uses '{}' for the array, but PostgreSQL doesn't know
// what type to use for the empty array.
//
// Error: pq: cannot determine type of empty array
"types_test with empty repeated string field",
streamMock(
relationsBlockData(t, "1a", "2025-01-01",
entityTypesTestWithEmptyRepeatedString(1),
),
),
equalsTypesTestRows([]*TypesTestRow{
{rowMeta(t, 1, "2025-01-01"), 1},
}),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
pattern := make([]interface{}, len(tc.responses))
for i, resp := range tc.responses {
pattern[i] = resp
}
substreamsClientConfig := setupFakeSubstreamsServer(t, pattern...)
substreamsPackage := substreamsTestPackage(pbrelations.File_test_relations_relations_proto, outputMessageDescriptor)

baseSink, err := sink.New(
sink.SubstreamsModeProduction,
false,
substreamsPackage,
substreamsPackage.Modules.Modules[0],
manifest.ModuleHash{},
substreamsClientConfig,
logger,
tracer,
sink.WithBlockRange(bstream.MustParseRange("1-2", bstream.WithExclusiveEnd())),
sink.WithRetryBackOff(&backoff.StopBackOff{}),
)
require.NoError(t, err)

options := db_proto.SinkerFactoryOptions{
UseProtoOption: true,
UseConstraints: false,
UseTransactions: true,
BlockBatchSize: 1,
Parallel: false,
}.Defaults()

sinkerFactory := db_proto.SinkerFactory(
baseSink,
defaultOutputModuleName,
outputMessageDescriptor,
options,
)

testSchema := strings.ReplaceAll(strings.ToLower(tc.name), " ", "_")
createPostgresTestSchema(t, postgresContainer.ConnectionString, testSchema)

ctx := context.Background()
dbSinker, err := sinkerFactory(ctx, postgresContainer.ConnectionString, testSchema, logger, tracer)
require.NoError(t, err)

err = dbSinker.Run(ctx)
require.NoError(t, err)
require.NoError(t, dbSinker.Err())

db, err := sql.Open("postgres", postgresContainer.ConnectionString)
require.NoError(t, err)

dbx := sqlx.NewDb(db, "postgres").Unsafe()
defer dbx.Close()

if tc.expected != nil {
tc.expected(t, dbx, testSchema)
}
})
}
}

func createPostgresTestSchema(t *testing.T, dsn string, schemaName string) {
t.Helper()

db, err := sql.Open("postgres", dsn)
require.NoError(t, err)
defer db.Close()

_, err = db.Exec(fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS "%s";`, schemaName))
require.NoError(t, err)
}
3 changes: 2 additions & 1 deletion tests/integration/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,9 @@ type finalBlock string
var fixedBaseTime = time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)

// blockTime can be used in [blockScopedData] to specify the block time for the response.
// Always returns time in UTC to match database output.
func blockTime(t *testing.T, in string) time.Time {
return blockTimepb(t, in).AsTime()
return blockTimepb(t, in).AsTime().UTC()
}

// blockTimepb can be used in [blockScopedData] to specify the block time for the response.
Expand Down
Loading