Skip to content

Commit 1118838

Browse files
authored
Flink: Dynamic Sink: Handle NoSuchNamespaceException properly (#14812)
1 parent 122c440 commit 1118838

File tree

3 files changed

+74
-2
lines changed

3 files changed

+74
-2
lines changed

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iceberg.Table;
3030
import org.apache.iceberg.catalog.Catalog;
3131
import org.apache.iceberg.catalog.TableIdentifier;
32+
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
3233
import org.apache.iceberg.exceptions.NoSuchTableException;
3334
import org.apache.iceberg.flink.FlinkSchemaUtil;
3435
import org.slf4j.Logger;
@@ -198,8 +199,8 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
198199
Table table = catalog.loadTable(identifier);
199200
update(identifier, table);
200201
return EXISTS;
201-
} catch (NoSuchTableException e) {
202-
LOG.debug("Table doesn't exist {}", identifier, e);
202+
} catch (NoSuchTableException | NoSuchNamespaceException e) {
203+
LOG.debug("Table or namespace doesn't exist {}", identifier, e);
203204
tableCache.put(
204205
identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
205206
return Tuple2.of(false, e);

flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,28 @@ void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
130130
assertThat(cacheItem).isNotNull();
131131
assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
132132
}
133+
134+
@Test
135+
void testNoSuchNamespaceExceptionHandling() {
136+
Catalog catalog = CATALOG_EXTENSION.catalog();
137+
TableIdentifier tableIdentifier = TableIdentifier.of("nonexistent_namespace", "myTable");
138+
TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
139+
140+
TableMetadataCache.ResolvedSchemaInfo result = cache.schema(tableIdentifier, SCHEMA, false);
141+
142+
assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
143+
assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
144+
}
145+
146+
@Test
147+
void testNoSuchTableExceptionHandling() {
148+
Catalog catalog = CATALOG_EXTENSION.catalog();
149+
TableIdentifier tableIdentifier = TableIdentifier.parse("default.nonexistent_table");
150+
TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
151+
152+
TableMetadataCache.ResolvedSchemaInfo result = cache.schema(tableIdentifier, SCHEMA, false);
153+
154+
assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
155+
assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
156+
}
133157
}

flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iceberg.Table;
2929
import org.apache.iceberg.catalog.Catalog;
3030
import org.apache.iceberg.catalog.Namespace;
31+
import org.apache.iceberg.catalog.SupportsNamespaces;
3132
import org.apache.iceberg.catalog.TableIdentifier;
3233
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
3334
import org.apache.iceberg.inmemory.InMemoryCatalog;
@@ -211,4 +212,50 @@ void testDropUnusedColumns() {
211212
assertThat(tableSchema.findField("data")).isNotNull();
212213
assertThat(tableSchema.findField("extra")).isNull();
213214
}
215+
216+
@Test
217+
void testNamespaceAndTableCreation() {
218+
Catalog catalog = CATALOG_EXTENSION.catalog();
219+
SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
220+
TableIdentifier tableIdentifier = TableIdentifier.of("new_namespace", "myTable");
221+
TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
222+
TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
223+
224+
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isFalse();
225+
assertThat(catalog.tableExists(tableIdentifier)).isFalse();
226+
227+
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
228+
tableUpdater.update(
229+
tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT);
230+
231+
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isTrue();
232+
233+
assertThat(catalog.tableExists(tableIdentifier)).isTrue();
234+
assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
235+
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
236+
}
237+
238+
@Test
239+
void testTableCreationWithExistingNamespace() {
240+
Catalog catalog = CATALOG_EXTENSION.catalog();
241+
SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
242+
Namespace namespace = Namespace.of("existing_namespace");
243+
namespaceCatalog.createNamespace(namespace);
244+
245+
TableIdentifier tableIdentifier = TableIdentifier.of("existing_namespace", "myTable");
246+
TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
247+
TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
248+
249+
assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
250+
assertThat(catalog.tableExists(tableIdentifier)).isFalse();
251+
252+
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
253+
tableUpdater.update(
254+
tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT);
255+
256+
assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
257+
assertThat(catalog.tableExists(tableIdentifier)).isTrue();
258+
assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
259+
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
260+
}
214261
}

0 commit comments

Comments
 (0)