Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class AbstractDatabaseLookupService extends AbstractControllerService {

static final String KEY = "key";

static final Set<String> REQUIRED_KEYS = Set.of(
KEY
);

static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
Expand All @@ -51,8 +50,11 @@ public class AbstractDatabaseLookupService extends AbstractControllerService {

static final PropertyDescriptor LOOKUP_KEY_COLUMN = new PropertyDescriptor.Builder()
.name("Lookup Key Column")
.description("The column in the table that will serve as the lookup key. This is the column that will be matched against "
+ "the property specified in the lookup processor. Note that this may be case-sensitive depending on the database.")
.description("The column(s) in the table that will serve as the lookup key. For composite key lookups, "
+ "specify a comma-separated list of column names (e.g. 'col1, col2'). When a single column is specified, "
+ "the lookup processor should pass a coordinate named 'key'. When multiple columns are specified, "
+ "the lookup processor should pass coordinates whose names match the column names. "
+ "Note that this may be case-sensitive depending on the database.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
Expand Down Expand Up @@ -93,6 +95,22 @@ public class AbstractDatabaseLookupService extends AbstractControllerService {

volatile String lookupKeyColumn;

volatile List<String> lookupKeyColumns;

/**
 * Splits a comma-separated list of lookup key column names into an ordered list.
 * Each entry is trimmed; empty entries are discarded. A null or blank input
 * yields an empty list.
 */
static List<String> parseKeyColumns(final String value) {
    if (value != null && !value.isBlank()) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(column -> !column.isEmpty())
                .collect(Collectors.toList());
    }
    return Collections.emptyList();
}

/**
 * Indicates whether more than one lookup key column has been configured,
 * i.e. lookups should match on a composite (multi-column) key.
 */
boolean isCompositeKey() {
    // Snapshot the volatile field once so both checks see the same value.
    final List<String> columns = lookupKeyColumns;
    return columns != null && columns.size() > 1;
}

@Override
public void migrateProperties(PropertyConfiguration config) {
config.renameProperty("dbrecord-lookup-dbcp-service", DBCP_SERVICE.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,38 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.db.JdbcProperties;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;

@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value", "record"})
@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, "
+ "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. Only one row "
+ "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. "
+ "Supports composite (multi-column) primary key lookups by specifying a comma-separated list of column names "
+ "in the Lookup Key Column property. When multiple key columns are configured, the lookup processor should "
+ "pass coordinates whose names match the column names. Only one row "
+ "will be returned for each lookup, duplicate database entries are ignored.")
public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {

private volatile Cache<Tuple<String, Object>, Record> cache;
private volatile Cache<List<Object>, Record> cache;

static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
.name("Lookup Value Columns")
Expand Down Expand Up @@ -91,26 +96,27 @@ protected void init(final ControllerServiceInitializationContext context) {
public void onEnabled(final ConfigurationContext context) {
this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
this.lookupKeyColumns = parseKeyColumns(this.lookupKeyColumn);
final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
final boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L;
if (this.cache == null || (cacheSize > 0 && clearCache)) {
if (durationNanos > 0) {
this.cache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfter(new Expiry<Tuple<String, Object>, Record>() {
.expireAfter(new Expiry<List<Object>, Record>() {
@Override
public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime) {
public long expireAfterCreate(List<Object> key, Record record, long currentTime) {
return durationNanos;
}

@Override
public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
public long expireAfterUpdate(List<Object> key, Record record, long currentTime, long currentDuration) {
return currentDuration;
}

@Override
public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
public long expireAfterRead(List<Object> key, Record record, long currentTime, long currentDuration) {
return currentDuration;
}
})
Expand All @@ -134,9 +140,22 @@ public Optional<Record> lookup(final Map<String, Object> coordinates, Map<String
return Optional.empty();
}

final Object key = coordinates.get(KEY);
if (StringUtils.isBlank(key.toString())) {
return Optional.empty();
// Build ordered list of key values from coordinates
final List<Object> keyValues = new ArrayList<>(lookupKeyColumns.size());
if (isCompositeKey()) {
for (final String col : lookupKeyColumns) {
final Object val = coordinates.get(col);
if (val == null || StringUtils.isBlank(val.toString())) {
return Optional.empty();
}
keyValues.add(val);
}
} else {
final Object key = coordinates.get(KEY);
if (key == null || StringUtils.isBlank(key.toString())) {
return Optional.empty();
}
keyValues.add(key);
}

final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
Expand All @@ -155,45 +174,103 @@ public Optional<Record> lookup(final Map<String, Object> coordinates, Map<String

final String lookupValueColumns = lookupValueColumnsSet.isEmpty() ? "*" : String.join(",", lookupValueColumnsSet);

Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);
// Cache key includes table name and all key values
final List<Object> cacheKey = new ArrayList<>(keyValues.size() + 1);
cacheKey.add(tableName);
cacheKey.addAll(keyValues);

// Not using the function param of cache.get so we can catch and handle the checked exceptions
Record foundRecord = cache.get(cacheLookupKey, k -> null);
Record foundRecord = cache.get(cacheKey, k -> null);

if (foundRecord == null) {
final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
final String whereClause = lookupKeyColumns.stream()
.map(col -> col + " = ?")
.collect(Collectors.joining(" AND "));
final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + whereClause;
try (final Connection con = dbcpService.getConnection(context);
final PreparedStatement st = con.prepareStatement(selectQuery)) {

st.setObject(1, key);
ResultSet resultSet = st.executeQuery();
ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null, defaultPrecision, defaultScale);
foundRecord = resultSetRecordSet.next();
for (int i = 0; i < keyValues.size(); i++) {
Object value = keyValues.get(i);
try {
final int sqlType = st.getParameterMetaData().getParameterType(i + 1);
if (value instanceof String) {
value = coerceValue((String) value, sqlType);
}
st.setObject(i + 1, value, sqlType);
} catch (final SQLException | NullPointerException e) {
if (value instanceof String) {
value = coerceStringValue((String) value);
}
st.setObject(i + 1, value);
}
}

final ResultSet resultSet = st.executeQuery();
try (final ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null, defaultPrecision, defaultScale)) {
foundRecord = resultSetRecordSet.next();
}

// Populate the cache if the record is present
if (foundRecord != null) {
cache.put(cacheLookupKey, foundRecord);
cache.put(cacheKey, foundRecord);
}

} catch (SQLException se) {
throw new LookupFailureException("Error executing SQL statement: " + selectQuery + "for value " + key.toString()
} catch (final SQLException se) {
throw new LookupFailureException("Error executing SQL statement: " + selectQuery + " for values " + keyValues
+ " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
} catch (IOException ioe) {
throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + "for value " + key.toString()
} catch (final IOException ioe) {
throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + " for values " + keyValues
+ " : " + (ioe.getCause() == null ? ioe.getMessage() : ioe.getCause().getMessage()), ioe);
}
}

return Optional.ofNullable(foundRecord);
}

/**
 * Best-effort coercion of a String coordinate value to the Java type implied by
 * the target column's JDBC type, so PreparedStatement.setObject binds correctly.
 * <p>
 * Per the JDBC type mapping, {@code Types.FLOAT} is a synonym for double
 * precision and maps to {@link Double}; only {@code Types.REAL} maps to
 * {@link Float}.
 *
 * @param value   the raw String value supplied in the lookup coordinates
 * @param sqlType the target column type, a constant from {@link java.sql.Types}
 * @return the coerced numeric value, or the original String when the type is
 *         non-numeric or the value cannot be parsed (the driver may still be
 *         able to convert it)
 */
static Object coerceValue(final String value, final int sqlType) {
    try {
        switch (sqlType) {
            case Types.INTEGER:
            case Types.SMALLINT:
            case Types.TINYINT:
                return Integer.valueOf(value);
            case Types.BIGINT:
                return Long.valueOf(value);
            case Types.REAL:
                return Float.valueOf(value);
            case Types.FLOAT:
                // JDBC FLOAT is double precision, not a Java float
            case Types.DOUBLE:
                return Double.valueOf(value);
            case Types.NUMERIC:
            case Types.DECIMAL:
                return new java.math.BigDecimal(value);
            default:
                return value;
        }
    } catch (final NumberFormatException e) {
        // Unparseable numeric: fall back to the raw String instead of
        // propagating NumberFormatException out of the lookup.
        return value;
    }
}

/**
 * Fallback coercion used when the target SQL type could not be determined:
 * integral-looking values are bound as Integer when they fit, otherwise Long;
 * anything non-integral is left as the original String.
 */
static Object coerceStringValue(final String value) {
    try {
        final long parsed = Long.parseLong(value);
        // Prefer Integer when the value fits, matching the narrower-first policy.
        if (parsed >= Integer.MIN_VALUE && parsed <= Integer.MAX_VALUE) {
            return (int) parsed;
        }
        return parsed;
    } catch (final NumberFormatException nfe) {
        return value;
    }
}

// True when the value is non-null and contains at least one non-whitespace character.
private static boolean isNotBlank(final String value) {
    return !(value == null || value.isBlank());
}

/**
 * Reports the coordinate names a caller must supply for a lookup: the configured
 * key column names (in order) for composite keys, otherwise the single
 * well-known "key" coordinate.
 */
@Override
public Set<String> getRequiredKeys() {
    // LinkedHashSet preserves the configured column order for composite keys.
    return isCompositeKey() ? new LinkedHashSet<>(lookupKeyColumns) : Set.of(KEY);
}

@Override
Expand Down
Loading
Loading