diff --git a/CHANGELOG.md b/CHANGELOG.md index e427491..17519c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Fixed nil pointer panic in integration tests caused by missing backoff configuration when using `sink.New` directly. +* Fixed PostgreSQL error "cannot determine type of empty array" when inserting rows with empty `repeated` (array) fields in `from-proto` mode. PostgreSQL cannot infer the type of an untyped empty array expression, so empty arrays are now written as the `'{}'` literal, whose type PostgreSQL resolves from the target column. + ## v4.12.0 ### DatabaseChanges mode improvements diff --git a/db_proto/sql/postgres/types.go b/db_proto/sql/postgres/types.go index e6de487..bbf4237 100644 --- a/db_proto/sql/postgres/types.go +++ b/db_proto/sql/postgres/types.go @@ -159,8 +159,12 @@ func ValueToString(value any, bytesEncoding bytes.Encoding) (s string) { s = "'" + v.Format(time.RFC3339) + "'" case *timestamppb.Timestamp: s = "'" + v.AsTime().Format(time.RFC3339) + "'" - // Handle array types for PostgreSQL case []interface{}: + if len(v) == 0 { + s = "'{}'" + return + } + var elements []string for _, elem := range v { elements = append(elements, ValueToString(elem, bytesEncoding)) diff --git a/db_proto/sql/schema/table.go b/db_proto/sql/schema/table.go index 61b07c7..9b49707 100644 --- a/db_proto/sql/schema/table.go +++ b/db_proto/sql/schema/table.go @@ -130,3 +130,13 @@ func (t *Table) processColumns(descriptor protoreflect.MessageDescriptor) error return nil } + +// ColumnByFieldName returns the column matching the given protobuf field name, or nil if not found. 
+func (t *Table) ColumnByFieldName(fieldName string) *Column { + for _, col := range t.Columns { + if col.FieldDescriptor != nil && string(col.FieldDescriptor.Name()) == fieldName { + return col + } + } + return nil +} diff --git a/tests/integration/db_proto_clickhouse_test.go b/tests/integration/db_proto_clickhouse_test.go index b569326..e3e01bb 100644 --- a/tests/integration/db_proto_clickhouse_test.go +++ b/tests/integration/db_proto_clickhouse_test.go @@ -20,6 +20,7 @@ import ( pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/modules/clickhouse" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestDbProtoClickhouseIntegration(t *testing.T) { @@ -41,6 +42,12 @@ func TestDbProtoClickhouseIntegration(t *testing.T) { } } + equalsTypesTestRows := func(expected []*TypesTestRow) func(t *testing.T, dbx *sqlx.DB) { + return func(t *testing.T, dbx *sqlx.DB) { + require.Equal(t, expected, readRowsBy[TypesTestRow](t, dbx, "types_tests", "id")) + } + } + testCases := []struct { name string responses []*pbsubstreamsrpc.Response @@ -81,6 +88,23 @@ func TestDbProtoClickhouseIntegration(t *testing.T) { {rowMeta(t, 1, "2025-01-01"), "o1", "c1"}, }), }, + { + // This test case verifies that ClickHouse correctly handles empty repeated fields. + // Unlike PostgreSQL, ClickHouse can infer the array type from the column definition + // in the schema, so empty arrays work correctly. + // + // This test serves as a regression test to ensure ClickHouse continues to handle + // empty arrays properly. 
+ "types_test with empty repeated string field", + streamMock( + relationsBlockData(t, "1a", "2025-01-01", + entityTypesTestWithEmptyRepeatedString(1), + ), + ), + equalsTypesTestRows([]*TypesTestRow{ + {rowMeta(t, 1, "2025-01-01"), 1}, + }), + }, } for _, tc := range testCases { @@ -278,3 +302,40 @@ type OrderRow struct { OrderID string `db:"order_id"` CustomerID string `db:"customer_ref_id"` } + +// TypesTestRow represents a row from the types_tests table for assertions +type TypesTestRow struct { + Meta + ID uint64 `db:"id"` +} + +// entityTypesTestWithEmptyRepeatedString creates a TypesTest entity with an empty +// repeated string field to reproduce the "cannot determine type of empty array" error. +func entityTypesTestWithEmptyRepeatedString(id uint64) *pbrelations.Entity { + return &pbrelations.Entity{ + Entity: &pbrelations.Entity_TypesTest{ + TypesTest: &pbrelations.TypesTest{ + Id: id, + // RepeatedStringField is intentionally left nil/empty to reproduce + // the PostgreSQL error: "cannot determine type of empty array" + RepeatedStringField: nil, + // TimestampField must be set for ClickHouse to avoid panic on nil timestamp + TimestampField: timestamppb.New(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)), + // These fields have type conversions and require valid numeric strings (not empty) + Str_2Int128: "0", + Str_2Uint128: "0", + Str_2Int256: "0", + Str_2Uint256: "0", + Str_2Decimal128: "0", + Str_2Decimal256: "0", + // Optional numeric conversion field - must be set to valid value for PostgreSQL + // (empty string fails with "invalid input syntax for type numeric") + OptionalStr_2Uint256: ptr("0"), + }, + }, + } +} + +func ptr[T any](v T) *T { + return &v +} diff --git a/tests/integration/db_proto_postgres_test.go b/tests/integration/db_proto_postgres_test.go new file mode 100644 index 0000000..a54b3d3 --- /dev/null +++ b/tests/integration/db_proto_postgres_test.go @@ -0,0 +1,141 @@ +package tests + +import ( + "context" + "database/sql" + "fmt" + 
"strings" + "testing" + + "github.com/cenkalti/backoff/v4" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" + "github.com/streamingfast/bstream" + sink "github.com/streamingfast/substreams/sink" + "github.com/streamingfast/substreams-sink-sql/db_proto" + pbrelations "github.com/streamingfast/substreams-sink-sql/pb/test/relations" + "github.com/streamingfast/substreams/manifest" + pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" + "github.com/stretchr/testify/require" +) + +func TestDbProtoPostgresIntegration(t *testing.T) { + outputMessageDescriptor := (*pbrelations.Output)(nil).ProtoReflect().Descriptor() + + postgresContainer := sharedDbChangesPostgresContainer + + streamMock := func(responses ...*pbsubstreamsrpc.Response) []*pbsubstreamsrpc.Response { + return responses + } + + equalsTypesTestRows := func(expected []*TypesTestRow) func(t *testing.T, dbx *sqlx.DB, schema string) { + return func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readRowsBy[TypesTestRow](t, dbx, fmt.Sprintf(`"%s"."types_tests"`, schema), "id") + require.Len(t, rows, len(expected)) + for i, exp := range expected { + actual := rows[i] + // Compare time values using Equal() to ignore timezone location differences + require.True(t, exp.BlockTime.Equal(actual.BlockTime), "BlockTime mismatch at index %d: expected %v, got %v", i, exp.BlockTime, actual.BlockTime) + require.Equal(t, exp.IsDeleted, actual.IsDeleted) + require.Equal(t, exp.BlockNumber, actual.BlockNumber) + require.Equal(t, exp.ID, actual.ID) + } + } + } + + testCases := []struct { + name string + responses []*pbsubstreamsrpc.Response + expected func(t *testing.T, dbx *sqlx.DB, schema string) + }{ + { + // This test case reproduces the issue where PostgreSQL cannot determine the type + // of an empty array. When a repeated field is empty (nil or empty slice), the + // generated INSERT statement used to carry an untyped empty array that PostgreSQL + // could not type; ValueToString now emits '{}', which is coerced to the column type. 
+ // + // Error: pq: cannot determine type of empty array + "types_test with empty repeated string field", + streamMock( + relationsBlockData(t, "1a", "2025-01-01", + entityTypesTestWithEmptyRepeatedString(1), + ), + ), + equalsTypesTestRows([]*TypesTestRow{ + {rowMeta(t, 1, "2025-01-01"), 1}, + }), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pattern := make([]interface{}, len(tc.responses)) + for i, resp := range tc.responses { + pattern[i] = resp + } + substreamsClientConfig := setupFakeSubstreamsServer(t, pattern...) + substreamsPackage := substreamsTestPackage(pbrelations.File_test_relations_relations_proto, outputMessageDescriptor) + + baseSink, err := sink.New( + sink.SubstreamsModeProduction, + false, + substreamsPackage, + substreamsPackage.Modules.Modules[0], + manifest.ModuleHash{}, + substreamsClientConfig, + logger, + tracer, + sink.WithBlockRange(bstream.MustParseRange("1-2", bstream.WithExclusiveEnd())), + sink.WithRetryBackOff(&backoff.StopBackOff{}), + ) + require.NoError(t, err) + + options := db_proto.SinkerFactoryOptions{ + UseProtoOption: true, + UseConstraints: false, + UseTransactions: true, + BlockBatchSize: 1, + Parallel: false, + }.Defaults() + + sinkerFactory := db_proto.SinkerFactory( + baseSink, + defaultOutputModuleName, + outputMessageDescriptor, + options, + ) + + testSchema := strings.ReplaceAll(strings.ToLower(tc.name), " ", "_") + createPostgresTestSchema(t, postgresContainer.ConnectionString, testSchema) + + ctx := context.Background() + dbSinker, err := sinkerFactory(ctx, postgresContainer.ConnectionString, testSchema, logger, tracer) + require.NoError(t, err) + + err = dbSinker.Run(ctx) + require.NoError(t, err) + require.NoError(t, dbSinker.Err()) + + db, err := sql.Open("postgres", postgresContainer.ConnectionString) + require.NoError(t, err) + + dbx := sqlx.NewDb(db, "postgres").Unsafe() + defer dbx.Close() + + if tc.expected != nil { + tc.expected(t, dbx, testSchema) + } + }) + } +} + 
+func createPostgresTestSchema(t *testing.T, dsn string, schemaName string) { + t.Helper() + + db, err := sql.Open("postgres", dsn) + require.NoError(t, err) + defer db.Close() + + _, err = db.Exec(fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS "%s";`, schemaName)) + require.NoError(t, err) +} diff --git a/tests/integration/helpers_test.go b/tests/integration/helpers_test.go index 818441a..00b82ac 100644 --- a/tests/integration/helpers_test.go +++ b/tests/integration/helpers_test.go @@ -395,8 +395,9 @@ type finalBlock string var fixedBaseTime = time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) // blockTime can be used in [blockScopedData] to specify the block time for the response. +// Always returns time in UTC to match database output. func blockTime(t *testing.T, in string) time.Time { - return blockTimepb(t, in).AsTime() + return blockTimepb(t, in).AsTime().UTC() } // blockTimepb can be used in [blockScopedData] to specify the block time for the response.