From 8b3367bc81aea923f4b9dab8513f8214fbe0168d Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Mon, 23 Feb 2026 13:53:45 -0500 Subject: [PATCH 1/2] Summary Root cause: PostgreSQL cannot determine the type of an empty array literal (array[]) without an explicit type cast. ClickHouse can infer the type from the column definition. Fix implemented: 1. Added TypedEmptyArray struct in db_proto/sql/dialect.go to carry SQL type information for empty arrays 2. Added CreateEmptyArray method to Dialect interface that dialects implement to create appropriate empty array values 3. PostgreSQL implementation (db_proto/sql/postgres/dialect.go): - Returns a TypedEmptyArray with the SQL type (e.g., VARCHAR(255)[], INTEGER[]) 4. ClickHouse implementation (db_proto/sql/click_house/dialect.go): - Returns plain []interface{}{} since ClickHouse infers types from schema 5. Updated ValueToString in db_proto/sql/postgres/types.go to handle TypedEmptyArray by generating array[]::TYPE 6. Updated database.go to use dialect.CreateEmptyArray(fd, column) instead of []interface{}{} for empty repeated fields Test updates: - Fixed entityTypesTestWithEmptyRepeatedString to set OptionalStr_2Uint256 (separate pre-existing issue with empty strings in NUMERIC columns) - Fixed time comparison in PostgreSQL test to use time.Equal() instead of deep equality (timezone location differences) --- CHANGELOG.md | 2 + db_proto/sql/click_house/dialect.go | 7 + db_proto/sql/database.go | 11 +- db_proto/sql/dialect.go | 11 ++ db_proto/sql/postgres/dialect.go | 9 ++ db_proto/sql/postgres/types.go | 5 + db_proto/sql/schema/table.go | 10 ++ tests/integration/db_proto_clickhouse_test.go | 61 ++++++++ tests/integration/db_proto_postgres_test.go | 141 ++++++++++++++++++ tests/integration/helpers_test.go | 3 +- 10 files changed, 258 insertions(+), 2 deletions(-) create mode 100644 tests/integration/db_proto_postgres_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e427491..17519c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Fixed nil pointer panic in integration tests caused by missing backoff configuration when using `sink.New` directly. +* Fixed PostgreSQL error "cannot determine type of empty array" when inserting rows with empty `repeated` (array) fields in `from-proto` mode. PostgreSQL requires explicit type casts for empty array literals, which are now automatically generated based on the column type. + ## v4.12.0 ### DatabaseChanges mode improvements diff --git a/db_proto/sql/click_house/dialect.go b/db_proto/sql/click_house/dialect.go index eadcaa3..6ad2dcd 100644 --- a/db_proto/sql/click_house/dialect.go +++ b/db_proto/sql/click_house/dialect.go @@ -246,6 +246,13 @@ func (d *DialectClickHouse) AppendInlineFieldValues(fieldValues []any, fd protor return fieldValues, nil } +// CreateEmptyArray creates an empty array for ClickHouse. +// ClickHouse can infer the array type from the column definition, so we just return +// an empty slice. No special typing is needed unlike PostgreSQL. +func (d *DialectClickHouse) CreateEmptyArray(fd protoreflect.FieldDescriptor, column *schema.Column) any { + return []interface{}{} +} + func (d *DialectClickHouse) SchemaHash() string { h := fnv.New64a() diff --git a/db_proto/sql/database.go b/db_proto/sql/database.go index 21c2c4f..e88950d 100644 --- a/db_proto/sql/database.go +++ b/db_proto/sql/database.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema" pbSchema "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1" "github.com/streamingfast/substreams-sink-sql/proto" sink "github.com/streamingfast/substreams/sink" @@ -165,7 +166,15 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamicpb.M } fieldValues = append(fieldValues, values) } else { - fieldValues = append(fieldValues, []interface{}{}) + // Empty array - use dialect-specific typed empty array to handle + // databases that need explicit type information for empty arrays (e.g., PostgreSQL) + var column *schema.Column + if tableInfo != nil { + if table := dialect.GetTable(tableInfo.Name); table != nil { + column = table.ColumnByFieldName(string(fd.Name())) + } + } + fieldValues = append(fieldValues, dialect.CreateEmptyArray(fd, column)) } } else if fd.Kind() == protoreflect.MessageKind { if fv.Message().IsValid() { diff --git a/db_proto/sql/dialect.go b/db_proto/sql/dialect.go index 0c78c0b..1d46afb 100644 --- a/db_proto/sql/dialect.go +++ b/db_proto/sql/dialect.go @@ -16,6 +16,14 @@ const DialectFieldBlockTimestamp = "_block_timestamp_" const DialectFieldVersion = "_version_" const DialectFieldDeleted = "_deleted_" +// TypedEmptyArray represents an empty array with type information. +// This is needed because some databases (like PostgreSQL) cannot determine +// the type of an empty array literal without an explicit type cast. +type TypedEmptyArray struct { + // SQLType is the full SQL array type (e.g., "TEXT[]", "INTEGER[]") + SQLType string +} + type Dialect interface { SchemaHash() string FullTableName(table *schema.Table) string @@ -24,6 +32,9 @@ type Dialect interface { UseVersionField() bool UseDeletedField() bool AppendInlineFieldValues(fieldValues []any, fd protoreflect.FieldDescriptor, fv protoreflect.Value, dm *dynamicpb.Message) ([]any, error) + // CreateEmptyArray creates an empty array value with type information for the given field. + // This allows dialects to provide type-specific empty array representations. + CreateEmptyArray(fd protoreflect.FieldDescriptor, column *schema.Column) any } type BaseDialect struct { diff --git a/db_proto/sql/postgres/dialect.go b/db_proto/sql/postgres/dialect.go index 3ce53bd..2c1e7d4 100644 --- a/db_proto/sql/postgres/dialect.go +++ b/db_proto/sql/postgres/dialect.go @@ -238,6 +238,15 @@ func (d *DialectPostgres) AppendInlineFieldValues(fieldValues []any, fd protoref return fieldValues, nil } +// CreateEmptyArray creates a typed empty array for PostgreSQL. +// PostgreSQL cannot determine the type of an empty array literal (e.g., array[]) +// without an explicit type cast, so we return a TypedEmptyArray that carries +// the SQL type information needed for the cast. +func (d *DialectPostgres) CreateEmptyArray(fd protoreflect.FieldDescriptor, column *schema.Column) any { + sqlType := MapFieldType(fd, d.bytesEncoding, column) + return sql2.TypedEmptyArray{SQLType: string(sqlType)} +} + func (d *DialectPostgres) SchemaHash() string { h := fnv.New64a() diff --git a/db_proto/sql/postgres/types.go b/db_proto/sql/postgres/types.go index e6de487..66bc67c 100644 --- a/db_proto/sql/postgres/types.go +++ b/db_proto/sql/postgres/types.go @@ -8,6 +8,7 @@ import ( "time" "github.com/streamingfast/substreams-sink-sql/bytes" + sql2 "github.com/streamingfast/substreams-sink-sql/db_proto/sql" "github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema" v1 "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1" "google.golang.org/protobuf/encoding/protojson" @@ -159,6 +160,10 @@ func ValueToString(value any, bytesEncoding bytes.Encoding) (s string) { s = "'" + v.Format(time.RFC3339) + "'" case *timestamppb.Timestamp: s = "'" + v.AsTime().Format(time.RFC3339) + "'" + // Handle typed empty arrays for PostgreSQL (needed because PostgreSQL cannot determine + // the type of an empty array literal without an explicit cast) + case sql2.TypedEmptyArray: + s = "array[]::" + v.SQLType // Handle array types for PostgreSQL case []interface{}: var elements []string diff --git a/db_proto/sql/schema/table.go b/db_proto/sql/schema/table.go index 61b07c7..9b49707 100644 --- a/db_proto/sql/schema/table.go +++ b/db_proto/sql/schema/table.go @@ -130,3 +130,13 @@ func (t *Table) processColumns(descriptor protoreflect.MessageDescriptor) error return nil } + +// ColumnByFieldName returns the column matching the given protobuf field name, or nil if not found. +func (t *Table) ColumnByFieldName(fieldName string) *Column { + for _, col := range t.Columns { + if col.FieldDescriptor != nil && string(col.FieldDescriptor.Name()) == fieldName { + return col + } + } + return nil +} diff --git a/tests/integration/db_proto_clickhouse_test.go b/tests/integration/db_proto_clickhouse_test.go index b569326..e3e01bb 100644 --- a/tests/integration/db_proto_clickhouse_test.go +++ b/tests/integration/db_proto_clickhouse_test.go @@ -20,6 +20,7 @@ import ( pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/modules/clickhouse" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestDbProtoClickhouseIntegration(t *testing.T) { @@ -41,6 +42,12 @@ func TestDbProtoClickhouseIntegration(t *testing.T) { } } + equalsTypesTestRows := func(expected []*TypesTestRow) func(t *testing.T, dbx *sqlx.DB) { + return func(t *testing.T, dbx *sqlx.DB) { + require.Equal(t, expected, readRowsBy[TypesTestRow](t, dbx, "types_tests", "id")) + } + } + testCases := []struct { name string responses []*pbsubstreamsrpc.Response @@ -81,6 +88,23 @@ func TestDbProtoClickhouseIntegration(t *testing.T) { {rowMeta(t, 1, "2025-01-01"), "o1", "c1"}, }), }, + { + // This test case verifies that ClickHouse correctly handles empty repeated fields. + // Unlike PostgreSQL, ClickHouse can infer the array type from the column definition + // in the schema, so empty arrays work correctly. + // + // This test serves as a regression test to ensure ClickHouse continues to handle + // empty arrays properly. + "types_test with empty repeated string field", + streamMock( + relationsBlockData(t, "1a", "2025-01-01", + entityTypesTestWithEmptyRepeatedString(1), + ), + ), + equalsTypesTestRows([]*TypesTestRow{ + {rowMeta(t, 1, "2025-01-01"), 1}, + }), + }, } for _, tc := range testCases { @@ -278,3 +302,40 @@ type OrderRow struct { OrderID string `db:"order_id"` CustomerID string `db:"customer_ref_id"` } + +// TypesTestRow represents a row from the types_tests table for assertions +type TypesTestRow struct { + Meta + ID uint64 `db:"id"` +} + +// entityTypesTestWithEmptyRepeatedString creates a TypesTest entity with an empty +// repeated string field to reproduce the "cannot determine type of empty array" error. +func entityTypesTestWithEmptyRepeatedString(id uint64) *pbrelations.Entity { + return &pbrelations.Entity{ + Entity: &pbrelations.Entity_TypesTest{ + TypesTest: &pbrelations.TypesTest{ + Id: id, + // RepeatedStringField is intentionally left nil/empty to reproduce + // the PostgreSQL error: "cannot determine type of empty array" + RepeatedStringField: nil, + // TimestampField must be set for ClickHouse to avoid panic on nil timestamp + TimestampField: timestamppb.New(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)), + // These fields have type conversions and require valid numeric strings (not empty) + Str_2Int128: "0", + Str_2Uint128: "0", + Str_2Int256: "0", + Str_2Uint256: "0", + Str_2Decimal128: "0", + Str_2Decimal256: "0", + // Optional numeric conversion field - must be set to valid value for PostgreSQL + // (empty string fails with "invalid input syntax for type numeric") + OptionalStr_2Uint256: ptr("0"), + }, + }, + } +} + +func ptr[T any](v T) *T { + return &v +} diff --git a/tests/integration/db_proto_postgres_test.go b/tests/integration/db_proto_postgres_test.go new file mode 100644 index 0000000..a54b3d3 --- /dev/null +++ b/tests/integration/db_proto_postgres_test.go @@ -0,0 +1,141 @@ +package tests + +import ( + "context" + "database/sql" + "fmt" + "strings" + "testing" + + "github.com/cenkalti/backoff/v4" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" + "github.com/streamingfast/bstream" + sink "github.com/streamingfast/substreams/sink" + "github.com/streamingfast/substreams-sink-sql/db_proto" + pbrelations "github.com/streamingfast/substreams-sink-sql/pb/test/relations" + "github.com/streamingfast/substreams/manifest" + pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" + "github.com/stretchr/testify/require" +) + +func TestDbProtoPostgresIntegration(t *testing.T) { + outputMessageDescriptor := (*pbrelations.Output)(nil).ProtoReflect().Descriptor() + + postgresContainer := sharedDbChangesPostgresContainer + + streamMock := func(responses ...*pbsubstreamsrpc.Response) []*pbsubstreamsrpc.Response { + return responses + } + + equalsTypesTestRows := func(expected []*TypesTestRow) func(t *testing.T, dbx *sqlx.DB, schema string) { + return func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readRowsBy[TypesTestRow](t, dbx, fmt.Sprintf(`"%s"."types_tests"`, schema), "id") + require.Len(t, rows, len(expected)) + for i, exp := range expected { + actual := rows[i] + // Compare time values using Equal() to ignore timezone location differences + require.True(t, exp.BlockTime.Equal(actual.BlockTime), "BlockTime mismatch at index %d: expected %v, got %v", i, exp.BlockTime, actual.BlockTime) + require.Equal(t, exp.IsDeleted, actual.IsDeleted) + require.Equal(t, exp.BlockNumber, actual.BlockNumber) + require.Equal(t, exp.ID, actual.ID) + } + } + } + + testCases := []struct { + name string + responses []*pbsubstreamsrpc.Response + expected func(t *testing.T, dbx *sqlx.DB, schema string) + }{ + { + // This test case reproduces the issue where PostgreSQL cannot determine the type + // of an empty array. When a repeated field is empty (nil or empty slice), the + // generated INSERT statement uses '{}' for the array, but PostgreSQL doesn't know + // what type to use for the empty array. + // + // Error: pq: cannot determine type of empty array + "types_test with empty repeated string field", + streamMock( + relationsBlockData(t, "1a", "2025-01-01", + entityTypesTestWithEmptyRepeatedString(1), + ), + ), + equalsTypesTestRows([]*TypesTestRow{ + {rowMeta(t, 1, "2025-01-01"), 1}, + }), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pattern := make([]interface{}, len(tc.responses)) + for i, resp := range tc.responses { + pattern[i] = resp + } + substreamsClientConfig := setupFakeSubstreamsServer(t, pattern...) + substreamsPackage := substreamsTestPackage(pbrelations.File_test_relations_relations_proto, outputMessageDescriptor) + + baseSink, err := sink.New( + sink.SubstreamsModeProduction, + false, + substreamsPackage, + substreamsPackage.Modules.Modules[0], + manifest.ModuleHash{}, + substreamsClientConfig, + logger, + tracer, + sink.WithBlockRange(bstream.MustParseRange("1-2", bstream.WithExclusiveEnd())), + sink.WithRetryBackOff(&backoff.StopBackOff{}), + ) + require.NoError(t, err) + + options := db_proto.SinkerFactoryOptions{ + UseProtoOption: true, + UseConstraints: false, + UseTransactions: true, + BlockBatchSize: 1, + Parallel: false, + }.Defaults() + + sinkerFactory := db_proto.SinkerFactory( + baseSink, + defaultOutputModuleName, + outputMessageDescriptor, + options, + ) + + testSchema := strings.ReplaceAll(strings.ToLower(tc.name), " ", "_") + createPostgresTestSchema(t, postgresContainer.ConnectionString, testSchema) + + ctx := context.Background() + dbSinker, err := sinkerFactory(ctx, postgresContainer.ConnectionString, testSchema, logger, tracer) + require.NoError(t, err) + + err = dbSinker.Run(ctx) + require.NoError(t, err) + require.NoError(t, dbSinker.Err()) + + db, err := sql.Open("postgres", postgresContainer.ConnectionString) + require.NoError(t, err) + + dbx := sqlx.NewDb(db, "postgres").Unsafe() + defer dbx.Close() + + if tc.expected != nil { + tc.expected(t, dbx, testSchema) + } + }) + } +} + +func createPostgresTestSchema(t *testing.T, dsn string, schemaName string) { + t.Helper() + + db, err := sql.Open("postgres", dsn) + require.NoError(t, err) + defer db.Close() + + _, err = db.Exec(fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS "%s";`, schemaName)) + require.NoError(t, err) +} diff --git a/tests/integration/helpers_test.go b/tests/integration/helpers_test.go index 818441a..00b82ac 100644 --- a/tests/integration/helpers_test.go +++ b/tests/integration/helpers_test.go @@ -395,8 +395,9 @@ type finalBlock string var fixedBaseTime = time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) // blockTime can be used in [blockScopedData] to specify the block time for the response. +// Always returns time in UTC to match database output. func blockTime(t *testing.T, in string) time.Time { - return blockTimepb(t, in).AsTime() + return blockTimepb(t, in).AsTime().UTC() } // blockTimepb can be used in [blockScopedData] to specify the block time for the response. From f5020abde8aa5a1bb98b5e2445e2a28ab0f1d0d7 Mon Sep 17 00:00:00 2001 From: Charles Billette Date: Tue, 24 Feb 2026 09:06:48 -0500 Subject: [PATCH 2/2] Remove `CreateEmptyArray` method and simplify handling of empty arrays across dialects --- db_proto/sql/click_house/dialect.go | 7 ------- db_proto/sql/database.go | 11 +---------- db_proto/sql/dialect.go | 11 ----------- db_proto/sql/postgres/dialect.go | 9 --------- db_proto/sql/postgres/types.go | 11 +++++------ 5 files changed, 6 insertions(+), 43 deletions(-) diff --git a/db_proto/sql/click_house/dialect.go b/db_proto/sql/click_house/dialect.go index 6ad2dcd..eadcaa3 100644 --- a/db_proto/sql/click_house/dialect.go +++ b/db_proto/sql/click_house/dialect.go @@ -246,13 +246,6 @@ func (d *DialectClickHouse) AppendInlineFieldValues(fieldValues []any, fd protor return fieldValues, nil } -// CreateEmptyArray creates an empty array for ClickHouse. -// ClickHouse can infer the array type from the column definition, so we just return -// an empty slice. No special typing is needed unlike PostgreSQL. -func (d *DialectClickHouse) CreateEmptyArray(fd protoreflect.FieldDescriptor, column *schema.Column) any { - return []interface{}{} -} - func (d *DialectClickHouse) SchemaHash() string { h := fnv.New64a() diff --git a/db_proto/sql/database.go b/db_proto/sql/database.go index e88950d..21c2c4f 100644 --- a/db_proto/sql/database.go +++ b/db_proto/sql/database.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema" pbSchema "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1" "github.com/streamingfast/substreams-sink-sql/proto" sink "github.com/streamingfast/substreams/sink" @@ -166,15 +165,7 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamicpb.M } fieldValues = append(fieldValues, values) } else { - // Empty array - use dialect-specific typed empty array to handle - // databases that need explicit type information for empty arrays (e.g., PostgreSQL) - var column *schema.Column - if tableInfo != nil { - if table := dialect.GetTable(tableInfo.Name); table != nil { - column = table.ColumnByFieldName(string(fd.Name())) - } - } - fieldValues = append(fieldValues, dialect.CreateEmptyArray(fd, column)) + fieldValues = append(fieldValues, []interface{}{}) } } else if fd.Kind() == protoreflect.MessageKind { if fv.Message().IsValid() { diff --git a/db_proto/sql/dialect.go b/db_proto/sql/dialect.go index 1d46afb..0c78c0b 100644 --- a/db_proto/sql/dialect.go +++ b/db_proto/sql/dialect.go @@ -16,14 +16,6 @@ const DialectFieldBlockTimestamp = "_block_timestamp_" const DialectFieldVersion = "_version_" const DialectFieldDeleted = "_deleted_" -// TypedEmptyArray represents an empty array with type information. -// This is needed because some databases (like PostgreSQL) cannot determine -// the type of an empty array literal without an explicit type cast. -type TypedEmptyArray struct { - // SQLType is the full SQL array type (e.g., "TEXT[]", "INTEGER[]") - SQLType string -} - type Dialect interface { SchemaHash() string FullTableName(table *schema.Table) string @@ -32,9 +24,6 @@ type Dialect interface { UseVersionField() bool UseDeletedField() bool AppendInlineFieldValues(fieldValues []any, fd protoreflect.FieldDescriptor, fv protoreflect.Value, dm *dynamicpb.Message) ([]any, error) - // CreateEmptyArray creates an empty array value with type information for the given field. - // This allows dialects to provide type-specific empty array representations. - CreateEmptyArray(fd protoreflect.FieldDescriptor, column *schema.Column) any } type BaseDialect struct { diff --git a/db_proto/sql/postgres/dialect.go b/db_proto/sql/postgres/dialect.go index 2c1e7d4..3ce53bd 100644 --- a/db_proto/sql/postgres/dialect.go +++ b/db_proto/sql/postgres/dialect.go @@ -238,15 +238,6 @@ func (d *DialectPostgres) AppendInlineFieldValues(fieldValues []any, fd protoref return fieldValues, nil } -// CreateEmptyArray creates a typed empty array for PostgreSQL. -// PostgreSQL cannot determine the type of an empty array literal (e.g., array[]) -// without an explicit type cast, so we return a TypedEmptyArray that carries -// the SQL type information needed for the cast. -func (d *DialectPostgres) CreateEmptyArray(fd protoreflect.FieldDescriptor, column *schema.Column) any { - sqlType := MapFieldType(fd, d.bytesEncoding, column) - return sql2.TypedEmptyArray{SQLType: string(sqlType)} -} - func (d *DialectPostgres) SchemaHash() string { h := fnv.New64a() diff --git a/db_proto/sql/postgres/types.go b/db_proto/sql/postgres/types.go index 66bc67c..bbf4237 100644 --- a/db_proto/sql/postgres/types.go +++ b/db_proto/sql/postgres/types.go @@ -8,7 +8,6 @@ import ( "time" "github.com/streamingfast/substreams-sink-sql/bytes" - sql2 "github.com/streamingfast/substreams-sink-sql/db_proto/sql" "github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema" v1 "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1" "google.golang.org/protobuf/encoding/protojson" @@ -160,12 +159,12 @@ func ValueToString(value any, bytesEncoding bytes.Encoding) (s string) { s = "'" + v.Format(time.RFC3339) + "'" case *timestamppb.Timestamp: s = "'" + v.AsTime().Format(time.RFC3339) + "'" - // Handle typed empty arrays for PostgreSQL (needed because PostgreSQL cannot determine - // the type of an empty array literal without an explicit cast) - case sql2.TypedEmptyArray: - s = "array[]::" + v.SQLType - // Handle array types for PostgreSQL case []interface{}: + if len(v) == 0 { + s = "'{}'" + return + } + var elements []string for _, elem := range v { elements = append(elements, ValueToString(elem, bytesEncoding))