From 8ef4d7f29572740bc837a344d9a3c8b5a637f405 Mon Sep 17 00:00:00 2001 From: zhuxt2015 <594754793@qq.com> Date: Thu, 19 Mar 2026 12:33:17 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=8C=89=E7=85=A7SR?= =?UTF-8?q?=E8=A1=A8=E7=9A=84=E5=AD=97=E6=AE=B5=E9=A1=BA=E5=BA=8F=E5=86=99?= =?UTF-8?q?=E5=85=A5=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cdc/postgres/PostgresCDCBuilder.java | 25 ++++++++ .../org/dinky/cdc/sql/SQLSinkBuilder.java | 10 ++- .../main/java/org/dinky/data/model/Table.java | 3 + .../trans/ddl/CreateCDCSourceOperation.java | 63 ++++++++++++++++++- 4 files changed, 95 insertions(+), 6 deletions(-) diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java index a70647055d..9a671f53ed 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java @@ -118,4 +118,29 @@ public String generateUrl(String schema) { protected String getMetadataType() { return METADATA_TYPE; } + + @Override + public Map parseMetaDataConfig() { + String url = String.format( + "jdbc:postgres://%s:%d/%s", + config.getHostname(), config.getPort(), composeJdbcProperties(config.getJdbc())); + return parseMetaDataSingleConfig(url); + } + + private String composeJdbcProperties(Map jdbcProperties) { + if (jdbcProperties == null || jdbcProperties.isEmpty()) { + return ""; + } + + StringBuilder sb = new StringBuilder(); + sb.append('?'); + jdbcProperties.forEach((k, v) -> { + sb.append(k); + sb.append("="); + sb.append(v); + sb.append("&"); + }); + sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index 1248f131b7..82ee20f69f 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -100,12 +100,16 @@ private List addSinkInsert( FlinkTableObjectIdentifier targetTable, String sinkSchemaName, FlinkTableObjectIdentifier sinkTable) { - String pkList = StringUtils.join(getPKList(table), "."); - String flinkDDL = FlinkStatementUtil.getFlinkDDL(table, targetTable, config, sinkSchemaName, sinkTable, pkList); + Table sinkTableObject = table.getSinkTable(); + if (sinkTableObject == null) { + sinkTableObject = table; + } + String pkList = StringUtils.join(getPKList(sinkTableObject), "."); + String flinkDDL = FlinkStatementUtil.getFlinkDDL(sinkTableObject, targetTable, config, sinkSchemaName, sinkTable, pkList); logger.info(flinkDDL); customTableEnvironment.executeSql(flinkDDL); logger.info("Create {} FlinkSQL DDL successful...", targetTable); - return createInsertOperations(table, sourceTable, targetTable); + return createInsertOperations(sinkTableObject, sourceTable, targetTable); } @Override diff --git a/dinky-common/src/main/java/org/dinky/data/model/Table.java b/dinky-common/src/main/java/org/dinky/data/model/Table.java index 2b3555937f..8415684601 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/Table.java +++ b/dinky-common/src/main/java/org/dinky/data/model/Table.java @@ -67,6 +67,9 @@ public class Table implements Serializable, Comparable, Cloneable { private List columns; + /** The sink table for the source table */ + private Table sinkTable; + /** 驱动类型, @see org.dinky.metadata.enums.DriverType */ private String driverType; diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java index 6bac67085c..81af77cd04 100644 --- a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java +++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java @@ -129,7 +129,9 @@ public TableResult execute(Executor executor) { String tableName = schemaTableName.split("\\.")[1]; table.setColumns(driver.listColumnsSortByPK(realSchemaName, tableName)); schemaList.add(schema); - + Driver sinkRealDriver = getDriver(config, schemaName); + final List
sinkTables= getSinkTables(config, schemaName); + setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver); if (null != sinkDriver) { final String createTableOptions = config.getSink().get(FlinkCDCConfig.AUTO_CREATE_OPTIONS); Table sinkTable = (Table) table.clone(); @@ -151,6 +153,8 @@ public TableResult execute(Executor executor) { Driver driver = Driver.build(confMap.get("name"), confMap.get("type"), JsonUtils.toMap(confMap)); final List
tables = driver.listTables(schemaName); + Driver sinkRealDriver = getDriver(config, schemaName); + final List
sinkTables= getSinkTables(config, schemaName); for (Table table : tables) { if (!Asserts.isEquals(table.getType(), "VIEW")) { if (Asserts.isNotNullCollection(tableRegList)) { @@ -160,6 +164,7 @@ public TableResult execute(Executor executor) { table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName())); schema.getTables().add(table); schemaTableNameList.add(table.getSchemaTableName()); + setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver); break; } } @@ -167,6 +172,7 @@ public TableResult execute(Executor executor) { table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName())); schemaTableNameList.add(table.getSchemaTableName()); schema.getTables().add(table); + setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver); } } } @@ -223,19 +229,70 @@ public TableResult execute(Executor executor) { return tableResultBuilder.build(); } + private static void setSinkTable(Table table, List
sinkTables, SinkBuilder sinkBuilder, Driver sinkRealDriver) { + String sinkTableName = sinkBuilder.getSinkTableName(table); + for (Table sinkTable : sinkTables) { + String sinkTableSchema = sinkTable.getSchema(); + String sinkTableSchemaTableName = sinkTable.getSchemaTableName(); + String currentSinkTableName = sinkTableSchemaTableName; + if (Asserts.isContainsString(sinkTableSchemaTableName, ".")) { + currentSinkTableName = sinkTableSchemaTableName.split("\\.")[1]; + } + if (sinkTableName.equals(currentSinkTableName)) { + if (null != sinkRealDriver) { + sinkTable.setColumns(sinkRealDriver.listColumnsSortByPK(sinkTableSchema, currentSinkTableName)); + } + table.setSinkTable(sinkTable); + break; + } + } + } + + private List
getSinkTables(FlinkCDCConfig config, String schemaName) throws Exception { + List
sinkTables; + Driver sinkDriver = getDriver(config, schemaName); + if (null == sinkDriver) { + return new ArrayList<>(); + } + Map sink = config.getSink(); + String schema = schemaName; + String sinkDb = sink.get(FlinkCDCConfig.SINK_DB); + if (Asserts.isNotNullString(sinkDb)) { + schema = SqlUtil.replaceAllParam(sinkDb, "schemaName", schemaName); + } + sinkTables = sinkDriver.listTables(schema); + return sinkTables; + } + private Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception { Map sink = config.getSink(); String autoCreate = sink.get(FlinkCDCConfig.AUTO_CREATE); if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) { return null; } - String url = sink.get("url"); + return getDriver(config, schemaName); + } + + private Driver getDriver(FlinkCDCConfig config, String schemaName) throws Exception { + Map sink = config.getSink(); + String connector = sink.get("connector"); + String url; + if (Asserts.isEquals(connector, "starrocks")) { + url = sink.get("jdbc-url"); + } else if (Asserts.isEqualsIgnoreCase(connector, "doris")) { + url = "jdbc:mysql://" + sink.get("fenodes"); + } else if (Asserts.isEqualsIgnoreCase(connector, "jdbc")) { + url = sink.get("url"); + } else { + return null; + } String schema = schemaName; String sinkDb = sink.get(FlinkCDCConfig.SINK_DB); if (Asserts.isNotNullString(sinkDb)) { schema = SqlUtil.replaceAllParam(sinkDb, "schemaName", schemaName); } - Driver driver = Driver.build(sink.get("connector"), url, sink.get("username"), sink.get("password")); + + Driver driver = Driver.build(connector, url, sink.get("username"), sink.get("password")); if (null != driver && !driver.existSchema(schema)) { driver.createSchema(schema); } From 42330603b920b48df21f8a94cc89ff6f2c01c831 Mon Sep 17 00:00:00 2001 From: zhuxt2015 <594754793@qq.com> Date: Thu, 19 Mar 2026 12:47:12 +0800 Subject: [PATCH 2/4] =?UTF-8?q?[Test]=20=E4=B8=BASR=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E9=A1=BA=E5=BA=8F=E5=86=99=E5=85=A5=E5=8A=9F=E8=83=BD=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - StarRocksTypeConvertTest: 验证 datetime→TIMESTAMP 和 date→DATE 类型映射 - PostgresCDCBuilderTest: 覆盖 composeJdbcProperties 的边界场景(null/空/单参数/多参数) - CreateCDCSourceOperationTest: 通过反射测试 setSinkTable()(匹配、不匹配、带schema前缀、driver加载列、首个匹配优先) - TableTest: 覆盖新增 sinkTable 字段(默认null、存取、隔离性、重置) --- .../cdc/postgres/PostgresCDCBuilderTest.java | 125 ++++++++++++++ .../java/org/dinky/data/model/TableTest.java | 32 ++++ .../ddl/CreateCDCSourceOperationTest.java | 161 ++++++++++++++++++ .../convert/StarRocksTypeConvertTest.java | 127 ++++++++++++++ 4 files changed, 445 insertions(+) create mode 100644 dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java create mode 100644 dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java create mode 100644 dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java diff --git a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java new file mode 100644 index 0000000000..4e7ecab2bf --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.cdc.postgres; + +import static org.mockito.Mockito.when; + +import org.dinky.data.model.FlinkCDCConfig; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link PostgresCDCBuilder#parseMetaDataConfig()} and the private + * {@code composeJdbcProperties} helper introduced in this commit. + */ +public class PostgresCDCBuilderTest { + + @Mock + private FlinkCDCConfig config; + + private PostgresCDCBuilder builder; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + builder = new PostgresCDCBuilder(config); + when(config.getHostname()).thenReturn("localhost"); + when(config.getPort()).thenReturn(5432); + when(config.getDatabase()).thenReturn("testdb"); + when(config.getUsername()).thenReturn("user"); + when(config.getPassword()).thenReturn("pass"); + } + + /** No jdbc properties -> URL should not contain '?' */ + @Test + public void testParseMetaDataConfig_noJdbcProperties() { + when(config.getJdbc()).thenReturn(null); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertNotNull(url); + Assert.assertTrue("URL should start with jdbc:postgres://", url.startsWith("jdbc:postgres://")); + Assert.assertFalse("URL should not contain '?' when jdbc props are empty", url.contains("?")); + } + + /** Empty jdbc map -> same as null, no query string */ + @Test + public void testParseMetaDataConfig_emptyJdbcProperties() { + when(config.getJdbc()).thenReturn(Collections.emptyMap()); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertFalse("URL should not contain '?' for empty jdbc map", url.contains("?")); + } + + /** Single jdbc property -> ?key=value */ + @Test + public void testParseMetaDataConfig_singleJdbcProperty() { + Map jdbc = Collections.singletonMap("ssl", "true"); + when(config.getJdbc()).thenReturn(jdbc); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertTrue("URL should contain '?ssl=true'", url.contains("?ssl=true")); + // Must not end with trailing '&' + Assert.assertFalse("URL must not end with '&'", url.endsWith("&")); + } + + /** Multiple jdbc properties -> all encoded, no trailing '&' */ + @Test + public void testParseMetaDataConfig_multipleJdbcProperties() { + Map jdbc = new LinkedHashMap<>(); + jdbc.put("ssl", "true"); + jdbc.put("sslmode", "require"); + when(config.getJdbc()).thenReturn(jdbc); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertTrue("URL should contain ssl param", url.contains("ssl=true")); + Assert.assertTrue("URL should contain sslmode param", url.contains("sslmode=require")); + Assert.assertFalse("URL must not end with '&'", url.endsWith("&")); + // Exactly one '?' in the query string part + Assert.assertEquals("URL should have exactly one '?'", 1, url.chars().filter(c -> c == '?').count()); + } + + /** Host / port embedded correctly in URL */ + @Test + public void testParseMetaDataConfig_urlContainsHostAndPort() { + when(config.getJdbc()).thenReturn(null); + + Map result = builder.parseMetaDataConfig(); + + String url = result.get("url"); + Assert.assertTrue("URL should contain host", url.contains("localhost")); + Assert.assertTrue("URL should contain port", url.contains("5432")); + } +} diff --git a/dinky-common/src/test/java/org/dinky/data/model/TableTest.java b/dinky-common/src/test/java/org/dinky/data/model/TableTest.java index 33e17ed5bb..aac24f5f7b 100644 --- a/dinky-common/src/test/java/org/dinky/data/model/TableTest.java +++ b/dinky-common/src/test/java/org/dinky/data/model/TableTest.java @@ -87,6 +87,38 @@ void setUp() { flinkConfig = "#{schemaName}=schemaName, #{tableName}=tableName, #{abc}=abc, #{}=null, bcd=bcd"; } + // ---- sinkTable field (added in this commit) ---- + + @Test + void sinkTable_defaultIsNull() { + assertThat(table.getSinkTable(), equalTo(null)); + } + + @Test + void sinkTable_setAndGet() { + Table sink = new Table("sink_orders", "target_schema", null); + table.setSinkTable(sink); + assertThat(table.getSinkTable(), equalTo(sink)); + assertThat(table.getSinkTable().getName(), equalTo("sink_orders")); + } + + @Test + void sinkTable_doesNotAffectSourceTableFields() { + Table sink = new Table("sink_orders", "target_schema", null); + table.setSinkTable(sink); + // Source table fields must remain unchanged + assertThat(table.getName(), equalTo("TableNameOrigin")); + assertThat(table.getSchema(), equalTo("SchemaOrigin")); + } + + @Test + void sinkTable_canBeResetToNull() { + Table sink = new Table("sink_orders", "target_schema", null); + table.setSinkTable(sink); + table.setSinkTable(null); + assertThat(table.getSinkTable(), equalTo(null)); + } + @Test void getFlinkDDL() { String result = table.getFlinkDDL(flinkConfig, "NewTableName"); diff --git a/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java b/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java new file mode 100644 index 0000000000..bc5a6de469 --- /dev/null +++ b/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java @@ -0,0 +1,161 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.trans.ddl; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import org.dinky.cdc.SinkBuilder; +import org.dinky.data.model.Table; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for the private static method + * {@code CreateCDCSourceOperation#setSinkTable} introduced in this commit. + * + *

The method is exercised via reflection to avoid exposing it as package-visible. + */ +class CreateCDCSourceOperationTest { + + private Method setSinkTableMethod; + private SinkBuilder sinkBuilder; + + @BeforeEach + void setUp() throws Exception { + setSinkTableMethod = CreateCDCSourceOperation.class.getDeclaredMethod( + "setSinkTable", Table.class, List.class, SinkBuilder.class, + org.dinky.metadata.driver.Driver.class); + setSinkTableMethod.setAccessible(true); + + sinkBuilder = mock(SinkBuilder.class); + } + + private void invoke(Table table, List

sinkTables, SinkBuilder sb, + org.dinky.metadata.driver.Driver driver) throws Exception { + setSinkTableMethod.invoke(null, table, sinkTables, sb, driver); + } + + /** + * When the sinkBuilder returns a table name that matches one of the sink tables + * (plain name, no schema prefix), the source table's sinkTable is set. + */ + @Test + void testMatchBySinkTableName_plain() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("orders", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null); + + assertNotNull(sourceTable.getSinkTable()); + assertEquals("orders", sourceTable.getSinkTable().getName()); + } + + /** + * When the sink table carries a schema prefix (e.g. "public.orders"), the method + * must split on '.' and match using only the table-name part. + */ + @Test + void testMatchBySinkTableName_withSchemaPrefix() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("orders", "public", null); + // getSchemaTableName() returns "public.orders" + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null); + + assertNotNull(sourceTable.getSinkTable()); + } + + /** + * No matching sink table -> sinkTable stays null. + */ + @Test + void testNoMatch_sinkTableRemainsNull() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("customers", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null); + + assertNull(sourceTable.getSinkTable()); + } + + /** + * When sinkRealDriver is provided, columns should be loaded from it. + */ + @Test + void testMatchWithDriver_columnsAreSet() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable = new Table("orders", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + org.dinky.metadata.driver.Driver driver = mock(org.dinky.metadata.driver.Driver.class); + when(driver.listColumnsSortByPK("public", "orders")) + .thenReturn(Collections.emptyList()); + + invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, driver); + + assertNotNull(sourceTable.getSinkTable()); + verify(driver).listColumnsSortByPK("public", "orders"); + } + + /** + * Empty sink table list -> sinkTable stays null, no exception. + */ + @Test + void testEmptySinkTableList() throws Exception { + Table sourceTable = new Table("orders", "public", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Collections.emptyList(), sinkBuilder, null); + + assertNull(sourceTable.getSinkTable()); + } + + /** + * Multiple candidates; only the first match should be used (break after first match). + */ + @Test + void testFirstMatchWins() throws Exception { + Table sourceTable = new Table("orders", "public", null); + Table sinkTable1 = new Table("orders", "schema1", null); + Table sinkTable2 = new Table("orders", "schema2", null); + + when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); + + invoke(sourceTable, Arrays.asList(sinkTable1, sinkTable2), sinkBuilder, null); + + assertNotNull(sourceTable.getSinkTable()); + assertEquals("schema1", sourceTable.getSinkTable().getSchema()); + } +} diff --git a/dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java b/dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java new file mode 100644 index 0000000000..be9ce64a5b --- /dev/null +++ b/dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.metadata.convert; + +import org.dinky.data.enums.ColumnType; +import org.dinky.data.model.Column; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StarRocksTypeConvertTest { + + private StarRocksTypeConvert typeConvert; + + @Before + public void setUp() { + typeConvert = new StarRocksTypeConvert(); + } + + private Column buildColumn(String type, boolean nullable, boolean keyFlag) { + Column column = new Column(); + column.setType(type); + column.setNullable(nullable); + column.setKeyFlag(keyFlag); + return column; + } + + // ---- datetime -> TIMESTAMP (was STRING before fix) ---- + + @Test + public void testDatetimeConvertsToTimestamp() { + Column column = buildColumn("datetime", false, true); + Assert.assertEquals(ColumnType.TIMESTAMP, typeConvert.convert(column)); + } + + @Test + public void testDatetimeNullableConvertsToTimestamp() { + Column column = buildColumn("datetime", true, false); + Assert.assertEquals(ColumnType.TIMESTAMP, typeConvert.convert(column)); + } + + // ---- date -> DATE (was STRING before fix) ---- + + @Test + public void testDateConvertsToDate() { + Column column = buildColumn("date", false, true); + Assert.assertEquals(ColumnType.DATE, typeConvert.convert(column)); + } + + @Test + public void testDateNullableConvertsToDate() { + Column column = buildColumn("date", true, false); + Assert.assertEquals(ColumnType.DATE, typeConvert.convert(column)); + } + + // ---- other types remain unchanged ---- + + @Test + public void testIntConvertsToInt() { + Column column = buildColumn("int", false, true); + Assert.assertEquals(ColumnType.INT, typeConvert.convert(column)); + } + + @Test + public void testBigintNullableConvertsToLong() { + Column column = buildColumn("bigint", true, false); + Assert.assertEquals(ColumnType.JAVA_LANG_LONG, typeConvert.convert(column)); + } + + @Test + public void testVarcharConvertsToString() { + Column column = buildColumn("varchar(255)", false, true); + Assert.assertEquals(ColumnType.STRING, typeConvert.convert(column)); + } + + @Test + public void testDecimalConvertsToDecimal() { + Column column = buildColumn("decimal(10,2)", false, true); + Assert.assertEquals(ColumnType.DECIMAL, typeConvert.convert(column)); + } + + // ---- convertToDB ---- + + @Test + public void testConvertTimestampToDB() { + // TIMESTAMP has no entry in convertToDB switch -> falls through to varchar + Assert.assertEquals("varchar", typeConvert.convertToDB(ColumnType.TIMESTAMP)); + } + + @Test + public void testConvertDateToDB() { + Assert.assertEquals("varchar", typeConvert.convertToDB(ColumnType.DATE)); + } + + @Test + public void testConvertStringToDB() { + Assert.assertEquals("varchar", typeConvert.convertToDB(ColumnType.STRING)); + } + + @Test + public void testConvertIntToDB() { + Assert.assertEquals("int", typeConvert.convertToDB(ColumnType.INT)); + } + + @Test + public void testConvertLongToDB() { + Assert.assertEquals("bigint", typeConvert.convertToDB(ColumnType.LONG)); + } +} From 6f9edaf976c4ff574dff349b21e5ceadee722cbc Mon Sep 17 00:00:00 2001 From: zhuxt2015 <594754793@qq.com> Date: Thu, 19 Mar 2026 16:13:02 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=88spotless=20apply=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/dinky/cdc/sql/SQLSinkBuilder.java | 3 ++- .../org/dinky/cdc/postgres/PostgresCDCBuilderTest.java | 5 ++++- .../org/dinky/trans/ddl/CreateCDCSourceOperation.java | 7 ++++--- .../dinky/trans/ddl/CreateCDCSourceOperationTest.java | 10 ++++------ 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index 82ee20f69f..1bbb7d65f1 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -105,7 +105,8 @@ private List addSinkInsert( sinkTableObject = table; } String pkList = StringUtils.join(getPKList(sinkTableObject), "."); - String flinkDDL = FlinkStatementUtil.getFlinkDDL(sinkTableObject, targetTable, config, sinkSchemaName, sinkTable, pkList); + String flinkDDL = + FlinkStatementUtil.getFlinkDDL(sinkTableObject, targetTable, config, sinkSchemaName, sinkTable, pkList); logger.info(flinkDDL); customTableEnvironment.executeSql(flinkDDL); logger.info("Create {} FlinkSQL DDL successful...", targetTable); diff --git a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java index 4e7ecab2bf..bed24c94db 100644 --- a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java +++ b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/postgres/PostgresCDCBuilderTest.java @@ -108,7 +108,10 @@ public void testParseMetaDataConfig_multipleJdbcProperties() { Assert.assertTrue("URL should contain sslmode param", url.contains("sslmode=require")); Assert.assertFalse("URL must not end with '&'", url.endsWith("&")); // Exactly one '?' in the query string part - Assert.assertEquals("URL should have exactly one '?'", 1, url.chars().filter(c -> c == '?').count()); + Assert.assertEquals( + "URL should have exactly one '?'", + 1, + url.chars().filter(c -> c == '?').count()); } /** Host / port embedded correctly in URL */ diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java index 81af77cd04..ea1a645890 100644 --- a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java +++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java @@ -130,7 +130,7 @@ public TableResult execute(Executor executor) { table.setColumns(driver.listColumnsSortByPK(realSchemaName, tableName)); schemaList.add(schema); Driver sinkRealDriver = getDriver(config, schemaName); - final List
sinkTables= getSinkTables(config, schemaName); + final List
sinkTables = getSinkTables(config, schemaName); setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver); if (null != sinkDriver) { final String createTableOptions = config.getSink().get(FlinkCDCConfig.AUTO_CREATE_OPTIONS); @@ -154,7 +154,7 @@ public TableResult execute(Executor executor) { final List
tables = driver.listTables(schemaName); Driver sinkRealDriver = getDriver(config, schemaName); - final List
sinkTables= getSinkTables(config, schemaName); + final List
sinkTables = getSinkTables(config, schemaName); for (Table table : tables) { if (!Asserts.isEquals(table.getType(), "VIEW")) { if (Asserts.isNotNullCollection(tableRegList)) { @@ -229,7 +229,8 @@ public TableResult execute(Executor executor) { return tableResultBuilder.build(); } - private static void setSinkTable(Table table, List
sinkTables, SinkBuilder sinkBuilder, Driver sinkRealDriver) { + private static void setSinkTable( + Table table, List
sinkTables, SinkBuilder sinkBuilder, Driver sinkRealDriver) { String sinkTableName = sinkBuilder.getSinkTableName(table); for (Table sinkTable : sinkTables) { String sinkTableSchema = sinkTable.getSchema(); diff --git a/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java b/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java index bc5a6de469..6446836a21 100644 --- a/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java +++ b/dinky-core/src/test/java/org/dinky/trans/ddl/CreateCDCSourceOperationTest.java @@ -47,15 +47,14 @@ class CreateCDCSourceOperationTest { @BeforeEach void setUp() throws Exception { setSinkTableMethod = CreateCDCSourceOperation.class.getDeclaredMethod( - "setSinkTable", Table.class, List.class, SinkBuilder.class, - org.dinky.metadata.driver.Driver.class); + "setSinkTable", Table.class, List.class, SinkBuilder.class, org.dinky.metadata.driver.Driver.class); setSinkTableMethod.setAccessible(true); sinkBuilder = mock(SinkBuilder.class); } - private void invoke(Table table, List
sinkTables, SinkBuilder sb, - org.dinky.metadata.driver.Driver driver) throws Exception { + private void invoke(Table table, List
sinkTables, SinkBuilder sb, org.dinky.metadata.driver.Driver driver) + throws Exception { setSinkTableMethod.invoke(null, table, sinkTables, sb, driver); } @@ -119,8 +118,7 @@ void testMatchWithDriver_columnsAreSet() throws Exception { when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders"); org.dinky.metadata.driver.Driver driver = mock(org.dinky.metadata.driver.Driver.class); - when(driver.listColumnsSortByPK("public", "orders")) - .thenReturn(Collections.emptyList()); + when(driver.listColumnsSortByPK("public", "orders")).thenReturn(Collections.emptyList()); invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, driver); From efd9b829db883f35cd656b2227420335a8d4e193 Mon Sep 17 00:00:00 2001 From: zhuxt2015 <594754793@qq.com> Date: Fri, 20 Mar 2026 11:31:27 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=88=A0=E9=99=A4StarRocksTypeConvertTest.?= =?UTF-8?q?java?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../convert/StarRocksTypeConvertTest.java | 127 ------------------ 1 file changed, 127 deletions(-) delete mode 100644 dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java diff --git a/dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java b/dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java deleted file mode 100644 index be9ce64a5b..0000000000 --- a/dinky-metadata/dinky-metadata-starrocks/src/test/java/org/dinky/metadata/convert/StarRocksTypeConvertTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.metadata.convert; - -import org.dinky.data.enums.ColumnType; -import org.dinky.data.model.Column; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class StarRocksTypeConvertTest { - - private StarRocksTypeConvert typeConvert; - - @Before - public void setUp() { - typeConvert = new StarRocksTypeConvert(); - } - - private Column buildColumn(String type, boolean nullable, boolean keyFlag) { - Column column = new Column(); - column.setType(type); - column.setNullable(nullable); - column.setKeyFlag(keyFlag); - return column; - } - - // ---- datetime -> TIMESTAMP (was STRING before fix) ---- - - @Test - public void testDatetimeConvertsToTimestamp() { - Column column = buildColumn("datetime", false, true); - Assert.assertEquals(ColumnType.TIMESTAMP, typeConvert.convert(column)); - } - - @Test - public void testDatetimeNullableConvertsToTimestamp() { - Column column = buildColumn("datetime", true, false); - Assert.assertEquals(ColumnType.TIMESTAMP, typeConvert.convert(column)); - } - - // ---- date -> DATE (was STRING before fix) ---- - - @Test - public void testDateConvertsToDate() { - Column column = buildColumn("date", false, true); - Assert.assertEquals(ColumnType.DATE, typeConvert.convert(column)); - } - - @Test - public void testDateNullableConvertsToDate() { - Column column = buildColumn("date", true, false); - Assert.assertEquals(ColumnType.DATE, typeConvert.convert(column)); - } - - // ---- other types remain unchanged ---- - - @Test - public void testIntConvertsToInt() { - Column column = buildColumn("int", false, true); - Assert.assertEquals(ColumnType.INT, typeConvert.convert(column)); - } - - @Test - public void testBigintNullableConvertsToLong() { - Column column = buildColumn("bigint", true, false); - Assert.assertEquals(ColumnType.JAVA_LANG_LONG, typeConvert.convert(column)); - } - - @Test - public void testVarcharConvertsToString() { - Column column = buildColumn("varchar(255)", false, true); - Assert.assertEquals(ColumnType.STRING, typeConvert.convert(column)); - } - - @Test - public void testDecimalConvertsToDecimal() { - Column column = buildColumn("decimal(10,2)", false, true); - Assert.assertEquals(ColumnType.DECIMAL, typeConvert.convert(column)); - } - - // ---- convertToDB ---- - - @Test - public void testConvertTimestampToDB() { - // TIMESTAMP has no entry in convertToDB switch -> falls through to varchar - Assert.assertEquals("varchar", typeConvert.convertToDB(ColumnType.TIMESTAMP)); - } - - @Test - public void testConvertDateToDB() { - Assert.assertEquals("varchar", typeConvert.convertToDB(ColumnType.DATE)); - } - - @Test - public void testConvertStringToDB() { - Assert.assertEquals("varchar", typeConvert.convertToDB(ColumnType.STRING)); - } - - @Test - public void testConvertIntToDB() { - Assert.assertEquals("int", typeConvert.convertToDB(ColumnType.INT)); - } - - @Test - public void testConvertLongToDB() { - Assert.assertEquals("bigint", typeConvert.convertToDB(ColumnType.LONG)); - } -}