diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java index 59ce5e01d9c9..8b29fe67df5c 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java @@ -23,17 +23,16 @@ import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.processor.util.StandardValidators; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; public class AbstractDatabaseLookupService extends AbstractControllerService { static final String KEY = "key"; - static final Set REQUIRED_KEYS = Set.of( - KEY - ); - static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() .name("Database Connection Pooling Service") .description("The Controller Service that is used to obtain connection to database") @@ -51,8 +50,11 @@ public class AbstractDatabaseLookupService extends AbstractControllerService { static final PropertyDescriptor LOOKUP_KEY_COLUMN = new PropertyDescriptor.Builder() .name("Lookup Key Column") - .description("The column in the table that will serve as the lookup key. This is the column that will be matched against " - + "the property specified in the lookup processor. Note that this may be case-sensitive depending on the database.") + .description("The column(s) in the table that will serve as the lookup key. For composite key lookups, " + + "specify a comma-separated list of column names (e.g. 'col1, col2'). When a single column is specified, " + + "the lookup processor should pass a coordinate named 'key'. When multiple columns are specified, " + + "the lookup processor should pass coordinates whose names match the column names. " + + "Note that this may be case-sensitive depending on the database.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) @@ -93,6 +95,22 @@ public class AbstractDatabaseLookupService extends AbstractControllerService { volatile String lookupKeyColumn; + volatile List lookupKeyColumns; + + static List parseKeyColumns(final String value) { + if (value == null || value.isBlank()) { + return Collections.emptyList(); + } + return Arrays.stream(value.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + } + + boolean isCompositeKey() { + return lookupKeyColumns != null && lookupKeyColumns.size() > 1; + } + @Override public void migrateProperties(PropertyConfiguration config) { config.renameProperty("dbrecord-lookup-dbcp-service", DBCP_SERVICE.getName()); diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java index d933f8c4c222..acd5f9bfc296 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java @@ -34,7 +34,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.ResultSetRecordSet; -import org.apache.nifi.util.Tuple; import org.apache.nifi.util.db.JdbcProperties; import java.io.IOException; @@ -42,6 +41,8 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; @@ -49,6 +50,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION; @@ -56,11 +58,14 @@ @Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value", "record"}) @CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, " - + "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. Only one row " + + "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. " + + "Supports composite (multi-column) primary key lookups by specifying a comma-separated list of column names " + + "in the Lookup Key Column property. When multiple key columns are configured, the lookup processor should " + + "pass coordinates whose names match the column names. Only one row " + "will be returned for each lookup, duplicate database entries are ignored.") public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService { - private volatile Cache, Record> cache; + private volatile Cache, Record> cache; static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder() .name("Lookup Value Columns") @@ -91,6 +96,7 @@ protected void init(final ControllerServiceInitializationContext context) { public void onEnabled(final ConfigurationContext context) { this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue(); + this.lookupKeyColumns = parseKeyColumns(this.lookupKeyColumn); final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger(); final boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean(); final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L; @@ -98,19 +104,19 @@ public void onEnabled(final ConfigurationContext context) { if (durationNanos > 0) { this.cache = Caffeine.newBuilder() .maximumSize(cacheSize) - .expireAfter(new Expiry, Record>() { + .expireAfter(new Expiry, Record>() { @Override - public long expireAfterCreate(Tuple stringObjectTuple, Record record, long currentTime) { + public long expireAfterCreate(List key, Record record, long currentTime) { return durationNanos; } @Override - public long expireAfterUpdate(Tuple stringObjectTuple, Record record, long currentTime, long currentDuration) { + public long expireAfterUpdate(List key, Record record, long currentTime, long currentDuration) { return currentDuration; } @Override - public long expireAfterRead(Tuple stringObjectTuple, Record record, long currentTime, long currentDuration) { + public long expireAfterRead(List key, Record record, long currentTime, long currentDuration) { return currentDuration; } }) @@ -134,9 +140,22 @@ public Optional lookup(final Map coordinates, Map keyValues = new ArrayList<>(lookupKeyColumns.size()); + if (isCompositeKey()) { + for (final String col : lookupKeyColumns) { + final Object val = coordinates.get(col); + if (val == null || StringUtils.isBlank(val.toString())) { + return Optional.empty(); + } + keyValues.add(val); + } + } else { + final Object key = coordinates.get(KEY); + if (key == null || StringUtils.isBlank(key.toString())) { + return Optional.empty(); + } + keyValues.add(key); } final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue(); @@ -155,31 +174,53 @@ public Optional lookup(final Map coordinates, Map cacheLookupKey = new Tuple<>(tableName, key); + // Cache key includes table name and all key values + final List cacheKey = new ArrayList<>(keyValues.size() + 1); + cacheKey.add(tableName); + cacheKey.addAll(keyValues); // Not using the function param of cache.get so we can catch and handle the checked exceptions - Record foundRecord = cache.get(cacheLookupKey, k -> null); + Record foundRecord = cache.get(cacheKey, k -> null); if (foundRecord == null) { - final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?"; + final String whereClause = lookupKeyColumns.stream() + .map(col -> col + " = ?") + .collect(Collectors.joining(" AND ")); + final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + whereClause; try (final Connection con = dbcpService.getConnection(context); final PreparedStatement st = con.prepareStatement(selectQuery)) { - st.setObject(1, key); - ResultSet resultSet = st.executeQuery(); - ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null, defaultPrecision, defaultScale); - foundRecord = resultSetRecordSet.next(); + for (int i = 0; i < keyValues.size(); i++) { + Object value = keyValues.get(i); + try { + final int sqlType = st.getParameterMetaData().getParameterType(i + 1); + if (value instanceof String) { + value = coerceValue((String) value, sqlType); + } + st.setObject(i + 1, value, sqlType); + } catch (final SQLException | NullPointerException e) { + if (value instanceof String) { + value = coerceStringValue((String) value); + } + st.setObject(i + 1, value); + } + } + + final ResultSet resultSet = st.executeQuery(); + try (final ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null, defaultPrecision, defaultScale)) { + foundRecord = resultSetRecordSet.next(); + } // Populate the cache if the record is present if (foundRecord != null) { - cache.put(cacheLookupKey, foundRecord); + cache.put(cacheKey, foundRecord); } - } catch (SQLException se) { - throw new LookupFailureException("Error executing SQL statement: " + selectQuery + "for value " + key.toString() + } catch (final SQLException se) { + throw new LookupFailureException("Error executing SQL statement: " + selectQuery + " for values " + keyValues + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se); - } catch (IOException ioe) { - throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + "for value " + key.toString() + } catch (final IOException ioe) { + throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + " for values " + keyValues + " : " + (ioe.getCause() == null ? ioe.getMessage() : ioe.getCause().getMessage()), ioe); } } @@ -187,13 +228,49 @@ public Optional lookup(final Map coordinates, Map getRequiredKeys() { - return REQUIRED_KEYS; + if (isCompositeKey()) { + return new LinkedHashSet<>(lookupKeyColumns); + } + return Set.of(KEY); } @Override diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java index e90f015612cf..26bd641f3e14 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java @@ -32,24 +32,29 @@ import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.Tuple; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value"}) @CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, " + - "the specified lookup value column is returned. Only one value will be returned for each lookup, duplicate database entries are ignored.") + "the specified lookup value column is returned. Supports composite (multi-column) primary key lookups by " + + "specifying a comma-separated list of column names in the Lookup Key Column property. " + + "Only one value will be returned for each lookup, duplicate database entries are ignored.") public class SimpleDatabaseLookupService extends AbstractDatabaseLookupService implements StringLookupService { - private volatile Cache, String> cache; + private volatile Cache, String> cache; static final PropertyDescriptor LOOKUP_VALUE_COLUMN = new PropertyDescriptor.Builder() @@ -79,6 +84,7 @@ protected void init(final ControllerServiceInitializationContext context) { public void onEnabled(final ConfigurationContext context) { this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue(); + this.lookupKeyColumns = parseKeyColumns(this.lookupKeyColumn); int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger(); boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean(); final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L; @@ -86,19 +92,19 @@ public void onEnabled(final ConfigurationContext context) { if (durationNanos > 0) { this.cache = Caffeine.newBuilder() .maximumSize(cacheSize) - .expireAfter(new Expiry, Object>() { + .expireAfter(new Expiry, String>() { @Override - public long expireAfterCreate(Tuple stringObjectTuple, Object value, long currentTime) { + public long expireAfterCreate(List key, String value, long currentTime) { return durationNanos; } @Override - public long expireAfterUpdate(Tuple stringObjectTuple, Object value, long currentTime, long currentDuration) { + public long expireAfterUpdate(List key, String value, long currentTime, long currentDuration) { return currentDuration; } @Override - public long expireAfterRead(Tuple stringObjectTuple, Object value, long currentTime, long currentDuration) { + public long expireAfterRead(List key, String value, long currentTime, long currentDuration) { return currentDuration; } }) @@ -122,25 +128,59 @@ public Optional lookup(Map coordinates, Map keyValues = new ArrayList<>(lookupKeyColumns.size()); + if (isCompositeKey()) { + for (final String col : lookupKeyColumns) { + final Object val = coordinates.get(col); + if (val == null || StringUtils.isBlank(val.toString())) { + return Optional.empty(); + } + keyValues.add(val); + } + } else { + final Object key = coordinates.get(KEY); + if (key == null || StringUtils.isBlank(key.toString())) { + return Optional.empty(); + } + keyValues.add(key); } final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue(); final String lookupValueColumn = getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions(context).getValue(); - Tuple cacheLookupKey = new Tuple<>(tableName, key); + // Cache key includes table name and all key values + final List cacheKey = new ArrayList<>(keyValues.size() + 1); + cacheKey.add(tableName); + cacheKey.addAll(keyValues); // Not using the function param of cache.get so we can catch and handle the checked exceptions - String foundRecord = cache.get(cacheLookupKey, k -> null); + String foundRecord = cache.get(cacheKey, k -> null); if (foundRecord == null) { - final String selectQuery = "SELECT " + lookupValueColumn + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?"; + final String whereClause = lookupKeyColumns.stream() + .map(col -> col + " = ?") + .collect(Collectors.joining(" AND ")); + final String selectQuery = "SELECT " + lookupValueColumn + " FROM " + tableName + " WHERE " + whereClause; try (final Connection con = dbcpService.getConnection(context); final PreparedStatement st = con.prepareStatement(selectQuery)) { - st.setObject(1, key); + for (int i = 0; i < keyValues.size(); i++) { + Object value = keyValues.get(i); + try { + final int sqlType = st.getParameterMetaData().getParameterType(i + 1); + if (value instanceof String) { + value = coerceValue((String) value, sqlType); + } + st.setObject(i + 1, value, sqlType); + } catch (final SQLException | NullPointerException e) { + if (value instanceof String) { + value = coerceStringValue((String) value); + } + st.setObject(i + 1, value); + } + } + ResultSet resultSet = st.executeQuery(); if (!resultSet.next()) { @@ -155,11 +195,11 @@ public Optional lookup(Map coordinates, Map lookup(Map coordinates, Map getRequiredKeys() { - return REQUIRED_KEYS; + if (isCompositeKey()) { + return new LinkedHashSet<>(lookupKeyColumns); + } + return Set.of(KEY); } @Override diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.java index 7d961a947160..76276719d8a4 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.java @@ -41,8 +41,10 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE; @@ -186,4 +188,84 @@ private void assertPreparedStatementExpected() throws SQLException { final String statement = statementCaptor.getValue(); assertEquals(EXPECTED_STATEMENT, statement); } + + // --- Composite key tests --- + + private static final String COMPOSITE_KEY_COLUMN_1 = "system_id"; + private static final String COMPOSITE_KEY_COLUMN_2 = "fund_code"; + private static final String COMPOSITE_KEY_COLUMNS = COMPOSITE_KEY_COLUMN_1 + ", " + COMPOSITE_KEY_COLUMN_2; + private static final String COMPOSITE_KEY_VALUE_1 = "PML"; + private static final int COMPOSITE_KEY_VALUE_2 = 13128; + private static final String COMPOSITE_EXPECTED_STATEMENT = String.format( + "SELECT %s FROM %s WHERE %s = ? AND %s = ?", + LOOKUP_VALUE_COLUMN, TABLE_NAME, COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_COLUMN_2); + + private void setUpCompositeKey() { + runner.setProperty(lookupService, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, COMPOSITE_KEY_COLUMNS); + } + + @Test + void testCompositeKeyLookupEmpty() throws LookupFailureException, SQLException { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + setConnection(); + + final Map coordinates = Map.of( + COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_VALUE_1, + COMPOSITE_KEY_COLUMN_2, COMPOSITE_KEY_VALUE_2); + final Optional lookupFound = lookupService.lookup(coordinates); + + assertFalse(lookupFound.isPresent()); + verify(connection).prepareStatement(statementCaptor.capture()); + assertEquals(COMPOSITE_EXPECTED_STATEMENT, statementCaptor.getValue()); + } + + @Test + void testCompositeKeyLookupFound() throws LookupFailureException, SQLException { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + setConnection(); + setResultSetMetaData(); + + when(resultSet.next()).thenReturn(true); + when(resultSet.getObject(eq(LOOKUP_VALUE_COLUMN))).thenReturn(LOOKUP_VALUE); + + final Map coordinates = Map.of( + COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_VALUE_1, + COMPOSITE_KEY_COLUMN_2, COMPOSITE_KEY_VALUE_2); + final Optional lookupFound = lookupService.lookup(coordinates); + + assertTrue(lookupFound.isPresent()); + } + + @Test + void testCompositeKeyLookupMissingCoordinate() throws LookupFailureException, SQLException { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + // Only provide one of the two required keys + final Map coordinates = Map.of(COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_VALUE_1); + final Optional lookupFound = lookupService.lookup(coordinates); + + assertFalse(lookupFound.isPresent()); + } + + @Test + void testCompositeKeyGetRequiredKeys() { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + final Set requiredKeys = lookupService.getRequiredKeys(); + assertEquals(new LinkedHashSet<>(java.util.List.of(COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_COLUMN_2)), requiredKeys); + } + + @Test + void testSingleKeyGetRequiredKeys() { + runner.enableControllerService(lookupService); + + final Set requiredKeys = lookupService.getRequiredKeys(); + assertEquals(Set.of("key"), requiredKeys); + } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.java index e1dcf3061e1d..ffa314da8287 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.java @@ -37,8 +37,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -166,4 +168,85 @@ private void assertPreparedStatementExpected() throws SQLException { final String statement = statementCaptor.getValue(); assertEquals(EXPECTED_STATEMENT, statement); } + + // --- Composite key tests --- + + private static final String COMPOSITE_KEY_COLUMN_1 = "system_id"; + private static final String COMPOSITE_KEY_COLUMN_2 = "fund_code"; + private static final String COMPOSITE_KEY_COLUMNS = COMPOSITE_KEY_COLUMN_1 + ", " + COMPOSITE_KEY_COLUMN_2; + private static final String COMPOSITE_KEY_VALUE_1 = "PML"; + private static final int COMPOSITE_KEY_VALUE_2 = 13128; + private static final String COMPOSITE_EXPECTED_STATEMENT = String.format( + "SELECT %s FROM %s WHERE %s = ? AND %s = ?", + LOOKUP_VALUE_COLUMN, TABLE_NAME, COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_COLUMN_2); + + private void setUpCompositeKey() { + runner.setProperty(lookupService, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, COMPOSITE_KEY_COLUMNS); + } + + @Test + void testCompositeKeyLookupEmpty() throws LookupFailureException, SQLException { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + setConnection(); + + final Map coordinates = Map.of( + COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_VALUE_1, + COMPOSITE_KEY_COLUMN_2, COMPOSITE_KEY_VALUE_2); + final Optional lookupFound = lookupService.lookup(coordinates); + + assertFalse(lookupFound.isPresent()); + verify(connection).prepareStatement(statementCaptor.capture()); + assertEquals(COMPOSITE_EXPECTED_STATEMENT, statementCaptor.getValue()); + } + + @Test + void testCompositeKeyLookupFound() throws LookupFailureException, SQLException { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + setConnection(); + when(resultSet.next()).thenReturn(true); + when(resultSet.getObject(eq(LOOKUP_VALUE_COLUMN))).thenReturn(LOOKUP_VALUE); + + final Map coordinates = Map.of( + COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_VALUE_1, + COMPOSITE_KEY_COLUMN_2, COMPOSITE_KEY_VALUE_2); + final Optional lookupFound = lookupService.lookup(coordinates); + + assertTrue(lookupFound.isPresent()); + assertEquals(LOOKUP_VALUE, lookupFound.get()); + verify(connection).prepareStatement(statementCaptor.capture()); + assertEquals(COMPOSITE_EXPECTED_STATEMENT, statementCaptor.getValue()); + } + + @Test + void testCompositeKeyLookupMissingCoordinate() throws LookupFailureException, SQLException { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + // Only provide one of the two required keys + final Map coordinates = Map.of(COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_VALUE_1); + final Optional lookupFound = lookupService.lookup(coordinates); + + assertFalse(lookupFound.isPresent()); + } + + @Test + void testCompositeKeyGetRequiredKeys() { + setUpCompositeKey(); + runner.enableControllerService(lookupService); + + final Set requiredKeys = lookupService.getRequiredKeys(); + assertEquals(new LinkedHashSet<>(java.util.List.of(COMPOSITE_KEY_COLUMN_1, COMPOSITE_KEY_COLUMN_2)), requiredKeys); + } + + @Test + void testSingleKeyGetRequiredKeys() { + runner.enableControllerService(lookupService); + + final Set requiredKeys = lookupService.getRequiredKeys(); + assertEquals(Set.of("key"), requiredKeys); + } }