From 9ae3b014572b19cbf58d303ab1e10b1e71010412 Mon Sep 17 00:00:00 2001 From: Sai Aditya Mukkamala Date: Thu, 30 May 2024 12:21:23 +0530 Subject: [PATCH] draft for error propagation for API calls --- .../servicenow/ServiceNowBaseConfig.java | 2 +- .../apiclient/ServiceNowAPIException.java | 33 +++ .../ServiceNowTableAPIClientImpl.java | 200 ++++++++++-------- .../connector/ServiceNowConnector.java | 103 +++++---- .../servicenow/restapi/RestAPIClient.java | 99 +++++++-- .../servicenow/restapi/RestAPIResponse.java | 114 ++++++---- .../source/ServiceNowInputFormat.java | 49 +++-- .../source/ServiceNowMultiInputFormat.java | 51 +++-- .../source/ServiceNowMultiRecordReader.java | 26 ++- .../servicenow/restapi/ExperimentTest.java | 102 +++++++++ .../ServiceNowMultiRecordReaderTest.java | 107 ++++++---- 11 files changed, 601 insertions(+), 285 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowAPIException.java create mode 100644 src/test/java/io/cdap/plugin/servicenow/restapi/ExperimentTest.java diff --git a/src/main/java/io/cdap/plugin/servicenow/ServiceNowBaseConfig.java b/src/main/java/io/cdap/plugin/servicenow/ServiceNowBaseConfig.java index eebb3861..945a9e39 100644 --- a/src/main/java/io/cdap/plugin/servicenow/ServiceNowBaseConfig.java +++ b/src/main/java/io/cdap/plugin/servicenow/ServiceNowBaseConfig.java @@ -137,7 +137,7 @@ public void validateTable(String tableName, SourceValueType valueType, FailureCo // Get the response JSON and fetch the header X-Total-Count. Set the value to recordCount requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT); - apiResponse = serviceNowTableAPIClient.executeGet(requestBuilder.build()); + apiResponse = serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build()); if (serviceNowTableAPIClient.parseResponseToResultListOfMap(apiResponse.getResponseBody()).isEmpty()) { // Removed config property as in case of MultiSource, only first table error was populating. collector.addFailure("Table: " + tableName + " is empty.", ""); diff --git a/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowAPIException.java b/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowAPIException.java new file mode 100644 index 00000000..253a32dd --- /dev/null +++ b/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowAPIException.java @@ -0,0 +1,33 @@ +package io.cdap.plugin.servicenow.apiclient; + +import org.apache.http.HttpResponse; + +import javax.annotation.Nullable; + +public class ServiceNowAPIException extends Exception { + + @Nullable private final HttpResponse httpResponse; + private final boolean isErrorRetryable; + + public ServiceNowAPIException( + Throwable t, @Nullable HttpResponse httpResponse, boolean isErrorRetryable) { + super(t); + this.httpResponse = httpResponse; + this.isErrorRetryable = isErrorRetryable; + } + + public ServiceNowAPIException( + String message, @Nullable HttpResponse httpResponse, boolean isErrorRetryable) { + super(message); + this.httpResponse = httpResponse; + this.isErrorRetryable = isErrorRetryable; + } + + public HttpResponse getHttpResponse() { + return httpResponse; + } + + public boolean isErrorRetryable() { + return isErrorRetryable; + } +} diff --git a/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java b/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java index 2e2dcc63..c52e5e54 100644 --- a/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java +++ b/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java @@ -58,13 +58,12 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -/** - * Implementation class for ServiceNow Table API. - */ +/** Implementation class for ServiceNow Table API. */ public class ServiceNowTableAPIClientImpl extends RestAPIClient { private static final Logger LOG = LoggerFactory.getLogger(ServiceNowTableAPIClientImpl.class); - private static final String DATE_RANGE_TEMPLATE = "%sBETWEENjavascript:gs.dateGenerate('%s','start')" + - "@javascript:gs.dateGenerate('%s','end')"; + private static final String DATE_RANGE_TEMPLATE = + "%sBETWEENjavascript:gs.dateGenerate('%s','start')" + + "@javascript:gs.dateGenerate('%s','end')"; private static final String FIELD_CREATED_ON = "sys_created_on"; private static final String FIELD_UPDATED_ON = "sys_updated_on"; private static final String OAUTH_URL_TEMPLATE = "%s/oauth_token.do"; @@ -77,23 +76,27 @@ public ServiceNowTableAPIClientImpl(ServiceNowConnectorConfig conf) { } public String getAccessToken() throws OAuthSystemException, OAuthProblemException { - return generateAccessToken(String.format(OAUTH_URL_TEMPLATE, conf.getRestApiEndpoint()), - conf.getClientId(), - conf.getClientSecret(), conf.getUser(), conf.getPassword()); + return generateAccessToken( + String.format(OAUTH_URL_TEMPLATE, conf.getRestApiEndpoint()), + conf.getClientId(), + conf.getClientSecret(), + conf.getUser(), + conf.getPassword()); } - /** - * Retries to get the access token and returns the same when OAuthSystemException is thrown - */ + /** Retries to get the access token and returns the same when OAuthSystemException is thrown */ public String getAccessTokenRetryableMode() throws ExecutionException, RetryException { Callable fetchToken = this::getAccessToken; - Retryer retryer = RetryerBuilder.newBuilder() - .retryIfExceptionOfType(OAuthSystemException.class) - .withWaitStrategy(WaitStrategies.fixedWait(ServiceNowConstants.BASE_DELAY, TimeUnit.MILLISECONDS)) - .withStopStrategy(StopStrategies.stopAfterAttempt(ServiceNowConstants.MAX_NUMBER_OF_RETRY_ATTEMPTS)) - .build(); + Retryer retryer = + RetryerBuilder.newBuilder() + .retryIfExceptionOfType(OAuthSystemException.class) + .withWaitStrategy( + WaitStrategies.fixedWait(ServiceNowConstants.BASE_DELAY, TimeUnit.MILLISECONDS)) + .withStopStrategy( + StopStrategies.stopAfterAttempt(ServiceNowConstants.MAX_NUMBER_OF_RETRY_ATTEMPTS)) + .build(); return retryer.call(fetchToken); } @@ -104,18 +107,24 @@ public String getAccessTokenRetryableMode() throws ExecutionException, RetryExce * @param tableName The ServiceNow table name * @param valueType The value type * @param startDate The start date - * @param endDate The end date - * @param offset The number of records to skip - * @param limit The number of records to be fetched + * @param endDate The end date + * @param offset The number of records to skip + * @param limit The number of records to be fetched * @return The list of Map; each Map representing a table row */ - public List> fetchTableRecords(String tableName, SourceValueType valueType, String startDate, - String endDate, int offset, int limit) throws IOException { - ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( - this.conf.getRestApiEndpoint(), tableName, false) - .setExcludeReferenceLink(true) - .setDisplayValue(valueType) - .setLimit(limit); + public List> fetchTableRecords( + String tableName, + SourceValueType valueType, + String startDate, + String endDate, + int offset, + int limit) + throws IOException, ServiceNowAPIException { + ServiceNowTableAPIRequestBuilder requestBuilder = + new ServiceNowTableAPIRequestBuilder(this.conf.getRestApiEndpoint(), tableName, false) + .setExcludeReferenceLink(true) + .setDisplayValue(valueType) + .setLimit(limit); if (offset > 0) { requestBuilder.setOffset(offset); @@ -126,7 +135,7 @@ public List> fetchTableRecords(String tableName, SourceValue try { String accessToken = getAccessToken(); requestBuilder.setAuthHeader(accessToken); - RestAPIResponse apiResponse = executeGet(requestBuilder.build()); + RestAPIResponse apiResponse = executeGetWithRetries(requestBuilder.build()); return parseResponseToResultListOfMap(apiResponse.getResponseBody()); } catch (OAuthSystemException e) { throw new RetryableException("Authentication error occurred", e); @@ -135,8 +144,8 @@ public List> fetchTableRecords(String tableName, SourceValue } } - private void applyDateRangeToRequest(ServiceNowTableAPIRequestBuilder requestBuilder, String startDate, - String endDate) { + private void applyDateRangeToRequest( + ServiceNowTableAPIRequestBuilder requestBuilder, String startDate, String endDate) { String dateRange = generateDateRangeQuery(startDate, endDate); if (!Strings.isNullOrEmpty(dateRange)) { requestBuilder.setQuery(dateRange); @@ -150,8 +159,10 @@ private String generateDateRangeQuery(String startDate, String endDate) { String dateRange = ""; try { - String createdOnDateRange = String.format(DATE_RANGE_TEMPLATE, FIELD_CREATED_ON, startDate, endDate); - String updatedOnDateRange = String.format(DATE_RANGE_TEMPLATE, FIELD_UPDATED_ON, startDate, endDate); + String createdOnDateRange = + String.format(DATE_RANGE_TEMPLATE, FIELD_CREATED_ON, startDate, endDate); + String updatedOnDateRange = + String.format(DATE_RANGE_TEMPLATE, FIELD_UPDATED_ON, startDate, endDate); dateRange = String.format("%s^OR%s", createdOnDateRange, updatedOnDateRange); } catch (Exception e) { LOG.error("Error in generateDateRangeQuery, hence ignoring the date range", e); @@ -167,12 +178,10 @@ private int getRecordCountFromHeader(RestAPIResponse apiResponse) { public List> parseResponseToResultListOfMap(String responseBody) { - JsonObject jo = GSON.fromJson(responseBody, JsonObject.class); JsonArray ja = jo.getAsJsonArray(ServiceNowConstants.RESULT); - Type type = new TypeToken>>() { - }.getType(); + Type type = new TypeToken>>() {}.getType(); return GSON.fromJson(ja, type); } @@ -184,11 +193,14 @@ private String getErrorMessage(String responseBody) { String errorMessage = error.get(ServiceNowConstants.MESSAGE).getAsString(); String errorDetail = error.get(ServiceNowConstants.ERROR_DETAIL).getAsString(); if (errorMessage != null && errorDetail != null) { - return String.format("%s:%s", - jo.getAsJsonObject(ServiceNowConstants.ERROR).get(ServiceNowConstants.MESSAGE) - .getAsString(), - jo.getAsJsonObject(ServiceNowConstants.ERROR).get(ServiceNowConstants.ERROR_DETAIL) - .getAsString()); + return String.format( + "%s:%s", + jo.getAsJsonObject(ServiceNowConstants.ERROR) + .get(ServiceNowConstants.MESSAGE) + .getAsString(), + jo.getAsJsonObject(ServiceNowConstants.ERROR) + .get(ServiceNowConstants.ERROR_DETAIL) + .getAsString()); } } return null; @@ -199,36 +211,48 @@ private String getErrorMessage(String responseBody) { } /** - * Attempt four times with an exponential delay of 120 seconds to fetch the list of records from ServiceNow table when - * RetryableException is thrown . + * Attempt four times with an exponential delay of 120 seconds to fetch the list of records from + * ServiceNow table when RetryableException is thrown . * * @param tableName The ServiceNow table name * @param valueType The value type * @param startDate The start date - * @param endDate The end date - * @param offset The number of records to skip - * @param limit The number of records to be fetched + * @param endDate The end date + * @param offset The number of records to skip + * @param limit The number of records to be fetched * @return The list of Map; each Map representing a table row */ - public List> fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType, - String startDate, String endDate, int offset, - int limit) throws IOException { + public List> fetchTableRecordsRetryableMode( + String tableName, + SourceValueType valueType, + String startDate, + String endDate, + int offset, + int limit) + throws IOException { final List> results = new ArrayList<>(); - Callable fetchRecords = () -> { - results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); - return true; - }; - - Retryer retryer = RetryerBuilder.newBuilder() - .retryIfExceptionOfType(RetryableException.class) - .withWaitStrategy(WaitStrategies.exponentialWait(ServiceNowConstants.WAIT_TIME, TimeUnit.MILLISECONDS)) - .withStopStrategy(StopStrategies.stopAfterAttempt(ServiceNowConstants.MAX_NUMBER_OF_RETRY_ATTEMPTS)) - .build(); + Callable fetchRecords = + () -> { + results.addAll( + fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); + return true; + }; + + Retryer retryer = + RetryerBuilder.newBuilder() + .retryIfExceptionOfType(RetryableException.class) + .withWaitStrategy( + WaitStrategies.exponentialWait( + ServiceNowConstants.WAIT_TIME, TimeUnit.MILLISECONDS)) + .withStopStrategy( + StopStrategies.stopAfterAttempt(ServiceNowConstants.MAX_NUMBER_OF_RETRY_ATTEMPTS)) + .build(); try { retryer.call(fetchRecords); } catch (RetryException | ExecutionException e) { - throw new IOException(String.format("Data Recovery failed for batch %s to %s.", offset, (offset + limit)), e); + throw new IOException( + String.format("Data Recovery failed for batch %s to %s.", offset, (offset + limit)), e); } return results; @@ -246,8 +270,11 @@ public Schema fetchTableSchema(String tableName, FailureCollector collector) { schema = fetchTableSchema(tableName); } catch (Exception e) { LOG.error("Failed to fetch schema on table {}", tableName, e); - collector.addFailure(String.format("Connection failed. Unable to fetch schema for table: %s. Cause: %s", - tableName, e.getMessage()), null); + collector.addFailure( + String.format( + "Connection failed. Unable to fetch schema for table: %s. Cause: %s", + tableName, e.getMessage()), + null); } return schema; } @@ -266,7 +293,7 @@ public SchemaResponse parseSchemaResponse(String responseBody) { * @throws OAuthSystemException */ public Schema fetchTableSchema(String tableName) - throws OAuthProblemException, OAuthSystemException, IOException { + throws OAuthProblemException, OAuthSystemException, IOException, ServiceNowAPIException { return fetchTableSchema(tableName, getAccessToken()); } @@ -277,14 +304,15 @@ public Schema fetchTableSchema(String tableName) * @param accessToken Access Token to use * @return schema for given ServiceNow table */ - public Schema fetchTableSchema(String tableName, String accessToken) throws IOException { - ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( - this.conf.getRestApiEndpoint(), tableName, true) - .setExcludeReferenceLink(true); + public Schema fetchTableSchema(String tableName, String accessToken) + throws IOException, ServiceNowAPIException { + ServiceNowTableAPIRequestBuilder requestBuilder = + new ServiceNowTableAPIRequestBuilder(this.conf.getRestApiEndpoint(), tableName, true) + .setExcludeReferenceLink(true); RestAPIResponse apiResponse; requestBuilder.setAuthHeader(accessToken); - apiResponse = executeGet(requestBuilder.build()); + apiResponse = executeGetWithRetries(requestBuilder.build()); SchemaResponse response = parseSchemaResponse(apiResponse.getResponseBody()); List columns = new ArrayList<>(); @@ -306,7 +334,7 @@ public Schema fetchTableSchema(String tableName, String accessToken) throws IOEx * @throws OAuthSystemException */ public int getTableRecordCount(String tableName) - throws OAuthProblemException, OAuthSystemException, IOException { + throws OAuthProblemException, OAuthSystemException, IOException, ServiceNowAPIException { return getTableRecordCount(tableName, getAccessToken()); } @@ -318,16 +346,17 @@ public int getTableRecordCount(String tableName) * @return the table record count * @throws IOException */ - public int getTableRecordCount(String tableName, String accessToken) throws IOException { - ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( - this.conf.getRestApiEndpoint(), tableName, false) - .setExcludeReferenceLink(true) - .setDisplayValue(SourceValueType.SHOW_DISPLAY_VALUE) - .setLimit(1); + public int getTableRecordCount(String tableName, String accessToken) + throws ServiceNowAPIException { + ServiceNowTableAPIRequestBuilder requestBuilder = + new ServiceNowTableAPIRequestBuilder(this.conf.getRestApiEndpoint(), tableName, false) + .setExcludeReferenceLink(true) + .setDisplayValue(SourceValueType.SHOW_DISPLAY_VALUE) + .setLimit(1); RestAPIResponse apiResponse = null; requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT); requestBuilder.setAuthHeader(accessToken); - apiResponse = executeGet(requestBuilder.build()); + apiResponse = executeGetWithRetries(requestBuilder.build()); return getRecordCountFromHeader(apiResponse); } @@ -335,12 +364,13 @@ public int getTableRecordCount(String tableName, String accessToken) throws IOEx * Create a new record in the ServiceNow Table * * @param tableName ServiceNow Table name - * @param entity Details of the Record to be created - * @description This function is being used in end-to-end (e2e) tests to fetch a record from the ServiceNow Table. + * @param entity Details of the Record to be created + * @description This function is being used in end-to-end (e2e) tests to fetch a record from the + * ServiceNow Table. */ public String createRecord(String tableName, HttpEntity entity) throws IOException { - ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( - this.conf.getRestApiEndpoint(), tableName, false); + ServiceNowTableAPIRequestBuilder requestBuilder = + new ServiceNowTableAPIRequestBuilder(this.conf.getRestApiEndpoint(), tableName, false); String systemID; RestAPIResponse apiResponse = null; try { @@ -359,8 +389,8 @@ public String createRecord(String tableName, HttpEntity entity) throws IOExcepti } private String getSystemId(RestAPIResponse restAPIResponse) { - CreateRecordAPIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(), - CreateRecordAPIResponse.class); + CreateRecordAPIResponse apiResponse = + GSON.fromJson(restAPIResponse.getResponseBody(), CreateRecordAPIResponse.class); return apiResponse.getResult().get(ServiceNowConstants.SYSTEM_ID).toString(); } @@ -372,16 +402,16 @@ private String getSystemId(RestAPIResponse restAPIResponse) { * @param query The query */ public Map getRecordFromServiceNowTable(String tableName, String query) - throws OAuthProblemException, OAuthSystemException, IOException { + throws OAuthProblemException, OAuthSystemException, ServiceNowAPIException { - ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( - this.conf.getRestApiEndpoint(), tableName, false) - .setQuery(query); + ServiceNowTableAPIRequestBuilder requestBuilder = + new ServiceNowTableAPIRequestBuilder(this.conf.getRestApiEndpoint(), tableName, false) + .setQuery(query); RestAPIResponse restAPIResponse; String accessToken = getAccessToken(); requestBuilder.setAuthHeader(accessToken); - restAPIResponse = executeGet(requestBuilder.build()); + restAPIResponse = executeGetWithRetries(requestBuilder.build()); APIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(), APIResponse.class); return apiResponse.getResult().get(0); diff --git a/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java b/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java index 5f1061c3..b4a3c4d4 100644 --- a/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java +++ b/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java @@ -40,6 +40,7 @@ import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.Constants; import io.cdap.plugin.common.ReferenceNames; +import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIRequestBuilder; import io.cdap.plugin.servicenow.restapi.RestAPIResponse; @@ -61,10 +62,7 @@ import javax.annotation.Nullable; import javax.ws.rs.core.MediaType; - -/** - * ServiceNow Connector Plugin - */ +/** ServiceNow Connector Plugin */ @Plugin(type = Connector.PLUGIN_TYPE) @Name(ServiceNowConstants.PLUGIN_NAME) @Description("Connection to access data in Servicenow tables.") @@ -88,21 +86,21 @@ public void test(ConnectorContext connectorContext) throws ValidationException { } @Override - public BrowseDetail browse(ConnectorContext connectorContext, BrowseRequest browseRequest) throws IOException { - ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config); + public BrowseDetail browse(ConnectorContext connectorContext, BrowseRequest browseRequest) + throws IOException { + ServiceNowTableAPIClientImpl serviceNowTableAPIClient = + new ServiceNowTableAPIClientImpl(config); try { String accessToken = serviceNowTableAPIClient.getAccessToken(); return browse(connectorContext, accessToken); - } catch (OAuthSystemException | OAuthProblemException e) { + } catch (OAuthSystemException | OAuthProblemException | ServiceNowAPIException e) { throw new IOException(e); } } - /** - * Browse Details for the given AccessToken. - */ - public BrowseDetail browse(ConnectorContext connectorContext, - String accessToken) throws IOException { + /** Browse Details for the given AccessToken. */ + public BrowseDetail browse(ConnectorContext connectorContext, String accessToken) + throws ServiceNowAPIException { int count = 0; FailureCollector collector = connectorContext.getFailureCollector(); config.validateCredentialsFields(collector); @@ -112,10 +110,12 @@ public BrowseDetail browse(ConnectorContext connectorContext, for (int i = 0; i < table.length; i++) { String name = table[i].getName(); String label = table[i].getLabel(); - BrowseEntity.Builder entity = (BrowseEntity.builder(name, name, ENTITY_TYPE_TABLE). - canBrowse(false).canSample(true)); - entity.addProperty(LABEL_NAME, BrowseEntityPropertyValue.builder(label, BrowseEntityPropertyValue. - PropertyType.STRING).build()); + BrowseEntity.Builder entity = + (BrowseEntity.builder(name, name, ENTITY_TYPE_TABLE).canBrowse(false).canSample(true)); + entity.addProperty( + LABEL_NAME, + BrowseEntityPropertyValue.builder(label, BrowseEntityPropertyValue.PropertyType.STRING) + .build()); browseDetailBuilder.addEntity(entity.build()); count++; } @@ -125,18 +125,21 @@ public BrowseDetail browse(ConnectorContext connectorContext, /** * @return the list of tables. */ - private TableList listTables(String accessToken) throws IOException { - ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( - config.getRestApiEndpoint(), OBJECT_TABLE_LIST, false); + private TableList listTables(String accessToken) throws ServiceNowAPIException { + ServiceNowTableAPIRequestBuilder requestBuilder = + new ServiceNowTableAPIRequestBuilder(config.getRestApiEndpoint(), OBJECT_TABLE_LIST, false); requestBuilder.setAuthHeader(accessToken); requestBuilder.setAcceptHeader(MediaType.APPLICATION_JSON); requestBuilder.setContentTypeHeader(MediaType.APPLICATION_JSON); - ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config); - RestAPIResponse apiResponse = serviceNowTableAPIClient.executeGet(requestBuilder.build()); + ServiceNowTableAPIClientImpl serviceNowTableAPIClient = + new ServiceNowTableAPIClientImpl(config); + RestAPIResponse apiResponse = + serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build()); return GSON.fromJson(apiResponse.getResponseBody(), TableList.class); } - public ConnectorSpec generateSpec(ConnectorContext connectorContext, ConnectorSpecRequest connectorSpecRequest) { + public ConnectorSpec generateSpec( + ConnectorContext connectorContext, ConnectorSpecRequest connectorSpecRequest) { ConnectorSpec.Builder specBuilder = ConnectorSpec.builder(); Map properties = new HashMap<>(); properties.put(io.cdap.plugin.common.ConfigUtil.NAME_USE_CONNECTION, "true"); @@ -144,45 +147,51 @@ public ConnectorSpec generateSpec(ConnectorContext connectorContext, ConnectorSp String tableName = connectorSpecRequest.getPath(); if (tableName != null) { properties.put(ServiceNowConstants.PROPERTY_TABLE_NAME, tableName); - properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(tableName)); + properties.put( + Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(tableName)); } Schema schema = getSchema(tableName); if (schema != null) { specBuilder.setSchema(schema); } - return specBuilder.addRelatedPlugin(new PluginSpec(ServiceNowConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, - properties)) - .addRelatedPlugin(new PluginSpec(ServiceNowConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties)).build(); + return specBuilder + .addRelatedPlugin( + new PluginSpec(ServiceNowConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, properties)) + .addRelatedPlugin( + new PluginSpec(ServiceNowConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties)) + .build(); } @Override - public List sample(ConnectorContext connectorContext, SampleRequest sampleRequest) - throws IOException { + public List sample( + ConnectorContext connectorContext, SampleRequest sampleRequest) throws IOException { String table = sampleRequest.getPath(); if (table == null) { throw new IllegalArgumentException("Path should contain table name."); } try { return getTableData(table, sampleRequest.getLimit()); - } catch (OAuthProblemException | OAuthSystemException e) { - throw new IOException("Unable to fetch the data."); + } catch (OAuthProblemException | OAuthSystemException | ServiceNowAPIException e) { + throw new IOException("Unable to fetch the data.", e); } } private List getTableData(String tableName, int limit) - throws OAuthProblemException, OAuthSystemException, IOException { - ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( - config.getRestApiEndpoint(), tableName, false) - .setExcludeReferenceLink(true) - .setDisplayValue(SourceValueType.SHOW_DISPLAY_VALUE) - .setLimit(limit); - ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config); + throws OAuthProblemException, OAuthSystemException, ServiceNowAPIException { + ServiceNowTableAPIRequestBuilder requestBuilder = + new ServiceNowTableAPIRequestBuilder(config.getRestApiEndpoint(), tableName, false) + .setExcludeReferenceLink(true) + .setDisplayValue(SourceValueType.SHOW_DISPLAY_VALUE) + .setLimit(limit); + ServiceNowTableAPIClientImpl serviceNowTableAPIClient = + new ServiceNowTableAPIClientImpl(config); String accessToken = serviceNowTableAPIClient.getAccessToken(); requestBuilder.setAuthHeader(accessToken); requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT); - RestAPIResponse apiResponse = serviceNowTableAPIClient.executeGet(requestBuilder.build()); - List> result = serviceNowTableAPIClient.parseResponseToResultListOfMap - (apiResponse.getResponseBody()); + RestAPIResponse apiResponse = + serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build()); + List> result = + serviceNowTableAPIClient.parseResponseToResultListOfMap(apiResponse.getResponseBody()); List recordList = new ArrayList<>(); Schema schema = getSchema(tableName); if (schema != null) { @@ -191,23 +200,25 @@ private List getTableData(String tableName, int limit) StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); for (Schema.Field field : tableFields) { String fieldName = field.getName(); - ServiceNowRecordConverter.convertToValue(fieldName, field.getSchema(), result.get(i), recordBuilder); + ServiceNowRecordConverter.convertToValue( + fieldName, field.getSchema(), result.get(i), recordBuilder); } StructuredRecord structuredRecord = recordBuilder.build(); recordList.add(structuredRecord); } } return recordList; - } @Nullable private Schema getSchema(String tableName) { SourceQueryMode mode = SourceQueryMode.TABLE; - List tableInfo = ServiceNowInputFormat.fetchTableInfo(mode, config, tableName, - null); - Schema schema = tableInfo.stream().findFirst().isPresent() ? tableInfo.stream().findFirst().get().getSchema() : - null; + List tableInfo = + ServiceNowInputFormat.fetchTableInfo(mode, config, tableName, null); + Schema schema = + tableInfo.stream().findFirst().isPresent() + ? tableInfo.stream().findFirst().get().getSchema() + : null; return schema; } } diff --git a/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIClient.java b/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIClient.java index 03581df8..4572df8b 100644 --- a/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIClient.java +++ b/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIClient.java @@ -16,9 +16,18 @@ package io.cdap.plugin.servicenow.restapi; +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.google.common.base.Predicate; import com.jcraft.jsch.IO; import io.cdap.plugin.servicenow.apiclient.NonRetryableException; import io.cdap.plugin.servicenow.apiclient.RetryableException; +import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException; +import io.cdap.plugin.servicenow.util.ServiceNowConstants; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; @@ -34,6 +43,7 @@ import org.apache.oltu.oauth2.common.exception.OAuthProblemException; import org.apache.oltu.oauth2.common.exception.OAuthSystemException; import org.apache.oltu.oauth2.common.message.types.GrantType; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +53,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; -/** - * An abstract class to call Rest API. - */ +/** An abstract class to call Rest API. */ public abstract class RestAPIClient { private static final Logger LOG = LoggerFactory.getLogger(RestAPIClient.class); @@ -67,6 +78,60 @@ public RestAPIResponse executeGet(RestAPIRequest request) throws IOException { } } + /** + * Executes the Rest API request and returns the response with retries. + * + * @param request the Rest API request. + * @return an instance of RestAPIResponse object. + * @throws ServiceNowAPIException + */ + public RestAPIResponse executeGetWithRetries(RestAPIRequest request) + throws ServiceNowAPIException { + Callable callable = () -> executeGet(request); + return handleExecution(getRetryer(), callable); + } + + private RestAPIResponse handleExecution( + Retryer retryer, Callable callable) + throws ServiceNowAPIException { + try { + RestAPIResponse response = retryer.call(callable); + // Execution is successful + if (response.hasException()) { + // Execution is successful and returned non retryable error + throw response.getException(); + } + return response; + } catch (RetryException e) { + // Execution successful, returned retryable error and retries exhausted + Attempt apiResponseAttempt = e.getLastFailedAttempt(); + if (apiResponseAttempt.hasException()) { + // last attempt has execution failure + throw new ServiceNowAPIException(apiResponseAttempt.getExceptionCause(), null, false); + } else { + // last execution attempt was successful but has an error response + // if execution is successful, it's expected to have a exception in response object + RestAPIResponse response = (RestAPIResponse) apiResponseAttempt.getResult(); + throw response.getException(); + } + } catch (ExecutionException e) { + // Execution failed with error + throw new ServiceNowAPIException(e, null, false); + } + } + + private Retryer getRetryer() { + return RetryerBuilder.newBuilder() + .retryIfResult( + restAPIResponse -> + restAPIResponse.hasException() && restAPIResponse.getException().isErrorRetryable()) + .withWaitStrategy( + WaitStrategies.exponentialWait(ServiceNowConstants.WAIT_TIME, TimeUnit.MILLISECONDS)) + .withStopStrategy( + StopStrategies.stopAfterAttempt(ServiceNowConstants.MAX_NUMBER_OF_RETRY_ATTEMPTS)) + .build(); + } + /** * Executes the Rest API request and returns the response. * @@ -78,7 +143,8 @@ public RestAPIResponse executePost(RestAPIRequest request) throws IOException { request.getHeaders().entrySet().forEach(e -> httpPost.addHeader(e.getKey(), e.getValue())); httpPost.setEntity(request.getEntity()); - // We're retrying all transport exceptions while executing the HTTP POST method and the generic transport + // We're retrying all transport exceptions while executing the HTTP POST method and the generic + // transport // exceptions in HttpClient are represented by the standard java.io.IOException class // https://hc.apache.org/httpclient-legacy/exception-handling.html try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) { @@ -99,20 +165,25 @@ public RestAPIResponse executePost(RestAPIRequest request) throws IOException { * @throws OAuthSystemException * @throws OAuthProblemException */ - protected String generateAccessToken(String restApiEndpoint, String clientId, String clientSecret, String user, - String password) throws OAuthSystemException, OAuthProblemException { + protected String generateAccessToken( + String restApiEndpoint, String clientId, String clientSecret, String user, String password) + throws OAuthSystemException, OAuthProblemException { String token = "NO-VALUE"; OAuthClient client = new OAuthClient(new URLConnectionClient()); - OAuthClientRequest request = OAuthClientRequest.tokenLocation(restApiEndpoint) - .setGrantType(GrantType.PASSWORD) - .setClientId(clientId) - .setClientSecret(clientSecret) - .setUsername(user) - .setPassword(password) - .buildBodyMessage(); + OAuthClientRequest request = + OAuthClientRequest.tokenLocation(restApiEndpoint) + .setGrantType(GrantType.PASSWORD) + .setClientId(clientId) + .setClientSecret(clientSecret) + .setUsername(user) + .setPassword(password) + .buildBodyMessage(); - token = client.accessToken(request, OAuth.HttpMethod.POST, OAuthJSONAccessTokenResponse.class).getAccessToken(); + token = + client + .accessToken(request, OAuth.HttpMethod.POST, OAuthJSONAccessTokenResponse.class) + .getAccessToken(); return token; } } diff --git a/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIResponse.java b/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIResponse.java index be0a0b1a..4b59e9f0 100644 --- a/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIResponse.java +++ b/src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIResponse.java @@ -20,12 +20,14 @@ import com.google.gson.JsonObject; import io.cdap.plugin.servicenow.apiclient.NonRetryableException; import io.cdap.plugin.servicenow.apiclient.RetryableException; +import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException; import io.cdap.plugin.servicenow.util.ServiceNowConstants; import org.apache.http.Header; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.util.EntityUtils; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -37,63 +39,78 @@ import java.util.Set; import java.util.stream.Collectors; -/** - * Pojo class to capture the API response. - */ +/** Pojo class to capture the API response. */ public class RestAPIResponse { private static final Gson GSON = new Gson(); - private static final String HTTP_ERROR_MESSAGE = "Http call to ServiceNow instance returned status code %d."; - private static final String REST_ERROR_MESSAGE = "Rest Api response has errors. Error message: %s."; - private static final Set SUCCESS_CODES = new HashSet<>(Collections.singletonList(HttpStatus.SC_OK)); - private static final Set RETRYABLE_CODES = new HashSet<>(Arrays.asList(429, - HttpStatus.SC_BAD_GATEWAY, - HttpStatus.SC_SERVICE_UNAVAILABLE, - HttpStatus.SC_REQUEST_TIMEOUT, - HttpStatus.SC_GATEWAY_TIMEOUT)); - private final int httpStatus; + private static final String HTTP_ERROR_MESSAGE = + "Http call to ServiceNow instance returned status code %d."; + private static final String REST_ERROR_MESSAGE = + "Rest Api response has errors. Error message: %s."; + private static final Set SUCCESS_CODES = + new HashSet<>(Collections.singletonList(HttpStatus.SC_OK)); + private static final Set RETRYABLE_CODES = + new HashSet<>( + Arrays.asList( + 429, + HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, + HttpStatus.SC_REQUEST_TIMEOUT, + HttpStatus.SC_GATEWAY_TIMEOUT)); private final Map headers; - private final String responseBody; + @Nullable private final String responseBody; + @Nullable private final ServiceNowAPIException exception; - public RestAPIResponse(int httpStatus, Map headers, String responseBody) { - this.httpStatus = httpStatus; + public RestAPIResponse( + Map headers, + @Nullable String responseBody, + @Nullable ServiceNowAPIException exception) { this.headers = headers; this.responseBody = responseBody; + this.exception = exception; } /** - * Parses HttpResponse into RestAPIResponse object when no errors occur. - * Throws a {@link RetryableException} if the error is retryable. - * Throws an {@link NonRetryableException} if the error is not retryable. + * Parses HttpResponse into RestAPIResponse object when no errors occur. Throws a {@link + * RetryableException} if the error is retryable. Throws an {@link NonRetryableException} if the + * error is not retryable. * * @param httpResponse The HttpResponse object to parse * @param headerNames The list of header names to be extracted * @return An instance of RestAPIResponse object. */ - public static RestAPIResponse parse(HttpResponse httpResponse, String... headerNames) throws IOException { - validateHttpResponse(httpResponse); - List headerNameList = headerNames == null ? Collections.emptyList() : Arrays.asList(headerNames); - int httpStatus = httpResponse.getStatusLine().getStatusCode(); + public static RestAPIResponse parse(HttpResponse httpResponse, String... headerNames) { + List headerNameList = + headerNames == null ? Collections.emptyList() : Arrays.asList(headerNames); Map headers = new HashMap<>(); if (!headerNameList.isEmpty()) { - headers.putAll(Arrays.stream(httpResponse.getAllHeaders()) - .filter(o -> headerNameList.contains(o.getName())) - .collect(Collectors.toMap(Header::getName, Header::getValue))); + headers.putAll( + Arrays.stream(httpResponse.getAllHeaders()) + .filter(o -> headerNameList.contains(o.getName())) + .collect(Collectors.toMap(Header::getName, Header::getValue))); + } + ServiceNowAPIException serviceNowAPIException = validateHttpResponse(httpResponse); + if (serviceNowAPIException != null) { + return new RestAPIResponse(headers, null, serviceNowAPIException); + } + + String responseBody = null; + try { + responseBody = EntityUtils.toString(httpResponse.getEntity()); + } catch (IOException e) { + return new RestAPIResponse(headers, null, new ServiceNowAPIException(e, httpResponse, false)); } - String responseBody = EntityUtils.toString(httpResponse.getEntity()); - validateRestApiResponse(responseBody); - return new RestAPIResponse(httpStatus, headers, responseBody); + + serviceNowAPIException = validateRestApiResponse(httpResponse, responseBody); + return new RestAPIResponse(headers, responseBody, serviceNowAPIException); } public static RestAPIResponse parse(HttpResponse httpResponse) throws IOException { return parse(httpResponse, new String[0]); } - public int getHttpStatus() { - return httpStatus; - } - - private static void validateRestApiResponse(String responseBody) { + private static ServiceNowAPIException validateRestApiResponse( + HttpResponse response, String responseBody) { JsonObject jo = GSON.fromJson(responseBody, JsonObject.class); // check if status is "failure" String status = null; @@ -101,33 +118,48 @@ private static void validateRestApiResponse(String responseBody) { status = jo.get(ServiceNowConstants.STATUS).getAsString(); } if (!ServiceNowConstants.FAILURE.equals(status)) { - return; + return null; } // check if failure is retryable - String errorMessage = jo.getAsJsonObject(ServiceNowConstants.ERROR).get(ServiceNowConstants.MESSAGE).getAsString(); + String errorMessage = + jo.getAsJsonObject(ServiceNowConstants.ERROR) + .get(ServiceNowConstants.MESSAGE) + .getAsString(); if (errorMessage.contains(ServiceNowConstants.MAXIMUM_EXECUTION_TIME_EXCEEDED)) { - throw new RetryableException(String.format(REST_ERROR_MESSAGE, errorMessage)); + return new ServiceNowAPIException( + String.format(REST_ERROR_MESSAGE, errorMessage), response, true); } else { - throw new NonRetryableException(String.format(REST_ERROR_MESSAGE, errorMessage)); + return new ServiceNowAPIException( + String.format(REST_ERROR_MESSAGE, errorMessage), response, false); } } - private static void validateHttpResponse(HttpResponse response) { + private static ServiceNowAPIException validateHttpResponse(HttpResponse response) { int code = response.getStatusLine().getStatusCode(); if (SUCCESS_CODES.contains(code)) { - return; + return null; } if (RETRYABLE_CODES.contains(code)) { - throw new RetryableException(String.format(HTTP_ERROR_MESSAGE, code)); + return new ServiceNowAPIException(String.format(HTTP_ERROR_MESSAGE, code), response, true); } - throw new NonRetryableException(String.format(HTTP_ERROR_MESSAGE, code)); + return new ServiceNowAPIException(String.format(HTTP_ERROR_MESSAGE, code), response, false); } public Map getHeaders() { return headers; } + @Nullable public String getResponseBody() { return responseBody; } + + @Nullable + public ServiceNowAPIException getException() { + return exception; + } + + public boolean hasException() { + return exception != null; + } } diff --git a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowInputFormat.java b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowInputFormat.java index 25a2b8b3..2a046bc2 100644 --- a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowInputFormat.java +++ b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowInputFormat.java @@ -18,6 +18,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl; import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig; import io.cdap.plugin.servicenow.util.ServiceNowConstants; @@ -42,37 +43,38 @@ import java.util.List; import javax.annotation.Nullable; -/** - * ServiceNow input format. - */ +/** ServiceNow input format. */ public class ServiceNowInputFormat extends InputFormat { private static final Logger LOG = LoggerFactory.getLogger(ServiceNowInputFormat.class); /** - * Updates the jobConfig with the ServiceNow table information, which will then be read in getSplit() function. + * Updates the jobConfig with the ServiceNow table information, which will then be read in + * getSplit() function. * * @param jobConfig the job configuration - * @param mode the query mode - * @param conf the database conf + * @param mode the query mode + * @param conf the database conf * @return Collection of ServiceNowTableInfo containing table and schema. */ - public static List setInput(Configuration jobConfig, SourceQueryMode mode, - ServiceNowSourceConfig conf) { + public static List setInput( + Configuration jobConfig, SourceQueryMode mode, ServiceNowSourceConfig conf) { ServiceNowJobConfiguration jobConf = new ServiceNowJobConfiguration(jobConfig); jobConf.setPluginConfiguration(conf); // Depending on conf value fetch the list of fields for each table and create schema object // return the schema object for each table as ServiceNowTableInfo - List tableInfos = fetchTableInfo(mode, conf.getConnection(), conf.getTableName(), - conf.getApplicationName()); + List tableInfos = + fetchTableInfo(mode, conf.getConnection(), conf.getTableName(), conf.getApplicationName()); jobConf.setTableInfos(tableInfos); return tableInfos; } - public static List fetchTableInfo(SourceQueryMode mode, ServiceNowConnectorConfig conf, - @Nullable String tableName, - @Nullable SourceApplication application) { + public static List fetchTableInfo( + SourceQueryMode mode, + ServiceNowConnectorConfig conf, + @Nullable String tableName, + @Nullable SourceApplication application) { // When mode = Table, fetch details from the table name provided in plugin config if (mode == SourceQueryMode.TABLE) { ServiceNowTableInfo tableInfo = getTableMetaData(tableName, conf); @@ -95,7 +97,8 @@ public static List fetchTableInfo(SourceQueryMode mode, Ser return tableInfos; } - private static ServiceNowTableInfo getTableMetaData(String tableName, ServiceNowConnectorConfig conf) { + private static ServiceNowTableInfo getTableMetaData( + String tableName, ServiceNowConnectorConfig conf) { // Call API to fetch first record from the table ServiceNowTableAPIClientImpl restApi = new ServiceNowTableAPIClientImpl(conf); @@ -104,9 +107,12 @@ private static ServiceNowTableInfo getTableMetaData(String tableName, ServiceNow try { schema = restApi.fetchTableSchema(tableName); recordCount = restApi.getTableRecordCount(tableName); - } catch (OAuthProblemException | OAuthSystemException | IOException e) { - throw new RuntimeException(String.format("Error in fetching table metadata due to reason: %s", e.getMessage()), - e); + } catch (OAuthProblemException + | OAuthSystemException + | IOException + | ServiceNowAPIException e) { + throw new RuntimeException( + String.format("Error in fetching table metadata due to reason: %s", e.getMessage()), e); } return new ServiceNowTableInfo(tableName, schema, recordCount); } @@ -153,10 +159,11 @@ public List getSplits(Configuration configuration) { } @Override - public RecordReader createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - ServiceNowJobConfiguration jobConfig = new ServiceNowJobConfiguration(taskAttemptContext.getConfiguration()); + public RecordReader createRecordReader( + InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + ServiceNowJobConfiguration jobConfig = + new ServiceNowJobConfiguration(taskAttemptContext.getConfiguration()); ServiceNowSourceConfig pluginConf = jobConfig.getPluginConf(); return new ServiceNowRecordReader(pluginConf); } diff --git a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiInputFormat.java b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiInputFormat.java index cadefe35..3b32d931 100644 --- a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiInputFormat.java +++ b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiInputFormat.java @@ -19,6 +19,7 @@ import com.google.common.base.Strings; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl; import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig; import io.cdap.plugin.servicenow.util.ServiceNowConstants; @@ -44,35 +45,36 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -/** - * ServiceNow input format. - */ +/** ServiceNow input format. */ public class ServiceNowMultiInputFormat extends InputFormat { private static final Logger LOG = LoggerFactory.getLogger(ServiceNowMultiInputFormat.class); /** - * Updates the jobConfig with the ServiceNow table information, which will then be read in getSplit() function. + * Updates the jobConfig with the ServiceNow table information, which will then be read in + * getSplit() function. * * @param jobConfig the job configuration - * @param conf the database conf + * @param conf the database conf * @return Collection of ServiceNowTableInfo containing table and schema. */ - public static Set setInput(Configuration jobConfig, - ServiceNowMultiSourceConfig conf) { + public static Set setInput( + Configuration jobConfig, ServiceNowMultiSourceConfig conf) { ServiceNowJobConfiguration jobConf = new ServiceNowJobConfiguration(jobConfig); jobConf.setMultiSourcePluginConfiguration(conf); // Depending on conf value fetch the list of fields for each table and create schema object // return the schema object for each table as ServiceNowTableInfo - Set tableInfos = fetchTablesInfo(conf.getConnection(), conf.getTableNames()); + Set tableInfos = + fetchTablesInfo(conf.getConnection(), conf.getTableNames()); jobConf.setTableInfos(tableInfos.stream().collect(Collectors.toList())); return tableInfos; } - static Set fetchTablesInfo(ServiceNowConnectorConfig conf, String tableNames) { + static Set fetchTablesInfo( + ServiceNowConnectorConfig conf, String tableNames) { Set tablesInfos = new LinkedHashSet<>(); @@ -88,7 +90,8 @@ static Set fetchTablesInfo(ServiceNowConnectorConfig conf, return tablesInfos; } - private static ServiceNowTableInfo getTableMetaData(String tableName, ServiceNowConnectorConfig conf) { + private static ServiceNowTableInfo getTableMetaData( + String tableName, ServiceNowConnectorConfig conf) { // Call API to fetch first record from the table ServiceNowTableAPIClientImpl restApi = new ServiceNowTableAPIClientImpl(conf); @@ -97,7 +100,10 @@ private static ServiceNowTableInfo getTableMetaData(String tableName, ServiceNow try { schema = restApi.fetchTableSchema(tableName); recordCount = restApi.getTableRecordCount(tableName); - } catch (OAuthProblemException | OAuthSystemException | IOException e) { + } catch (OAuthProblemException + | OAuthSystemException + | IOException + | ServiceNowAPIException e) { throw new RuntimeException(e); } LOG.debug("table {}, rows = {}", tableName, recordCount); @@ -106,16 +112,18 @@ private static ServiceNowTableInfo getTableMetaData(String tableName, ServiceNow public static Set getList(String value) { return Strings.isNullOrEmpty(value) - ? Collections.emptySet() - : Stream.of(value.split(",")) - .map(String::trim) - .filter(name -> !name.isEmpty()) - .collect(Collectors.toSet()); + ? Collections.emptySet() + : Stream.of(value.split(",")) + .map(String::trim) + .filter(name -> !name.isEmpty()) + .collect(Collectors.toSet()); } @Override - public List getSplits(JobContext jobContext) throws IOException, InterruptedException { - ServiceNowJobConfiguration jobConfig = new ServiceNowJobConfiguration(jobContext.getConfiguration()); + public List getSplits(JobContext jobContext) + throws IOException, InterruptedException { + ServiceNowJobConfiguration jobConfig = + new ServiceNowJobConfiguration(jobContext.getConfiguration()); int pageSize = jobConfig.getPluginConf().getPageSize().intValue(); List tableInfos = jobConfig.getTableInfos(); List resultSplits = new ArrayList<>(); @@ -140,9 +148,10 @@ public List getSplits(JobContext jobContext) throws IOException, Int } @Override - public RecordReader createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) { - ServiceNowJobConfiguration jobConfig = new ServiceNowJobConfiguration(taskAttemptContext.getConfiguration()); + public RecordReader createRecordReader( + InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { + ServiceNowJobConfiguration jobConfig = + new ServiceNowJobConfiguration(taskAttemptContext.getConfiguration()); ServiceNowMultiSourceConfig pluginConf = jobConfig.getMultiSourcePluginConf(); return new ServiceNowMultiRecordReader(pluginConf); diff --git a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReader.java b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReader.java index 364485a4..ccbe8abb 100644 --- a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReader.java +++ b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReader.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl; import io.cdap.plugin.servicenow.connector.ServiceNowRecordConverter; import io.cdap.plugin.servicenow.util.ServiceNowConstants; @@ -33,9 +34,7 @@ import java.util.ArrayList; import java.util.List; -/** - * Record reader that reads the entire contents of a ServiceNow table. - */ +/** Record reader that reads the entire contents of a ServiceNow table. */ public class ServiceNowMultiRecordReader extends ServiceNowBaseRecordReader { private final ServiceNowMultiSourceConfig multiSourcePluginConf; @@ -84,8 +83,7 @@ public StructuredRecord getCurrentValue() throws IOException { try { for (Schema.Field field : tableFields) { String fieldName = field.getName(); - ServiceNowRecordConverter.convertToValue(fieldName, field.getSchema(), row, - recordBuilder); + ServiceNowRecordConverter.convertToValue(fieldName, field.getSchema(), row, recordBuilder); } } catch (Exception e) { throw new IOException("Error decoding row from table " + tableName, e); @@ -96,10 +94,14 @@ public StructuredRecord getCurrentValue() throws IOException { @VisibleForTesting void fetchData() throws IOException { // Get the table data - results = restApi.fetchTableRecordsRetryableMode(tableName, multiSourcePluginConf.getValueType(), - multiSourcePluginConf.getStartDate(), - multiSourcePluginConf.getEndDate(), split.getOffset(), - multiSourcePluginConf.getPageSize()); + results = + restApi.fetchTableRecordsRetryableMode( + tableName, + multiSourcePluginConf.getValueType(), + multiSourcePluginConf.getStartDate(), + multiSourcePluginConf.getEndDate(), + split.getOffset(), + multiSourcePluginConf.getPageSize()); iterator = results.iterator(); } @@ -112,9 +114,11 @@ private void fetchSchema(ServiceNowTableAPIClientImpl restApi) { List schemaFields = new ArrayList<>(tableFields); schemaFields.add(Schema.Field.of(tableNameField, Schema.of(Schema.Type.STRING))); schema = Schema.recordOf(tableName, schemaFields); - } catch (OAuthProblemException | OAuthSystemException | IOException e) { + } catch (OAuthProblemException + | OAuthSystemException + | IOException + | ServiceNowAPIException e) { throw new RuntimeException(e); } } - } diff --git a/src/test/java/io/cdap/plugin/servicenow/restapi/ExperimentTest.java b/src/test/java/io/cdap/plugin/servicenow/restapi/ExperimentTest.java new file mode 100644 index 00000000..16cb4ee2 --- /dev/null +++ b/src/test/java/io/cdap/plugin/servicenow/restapi/ExperimentTest.java @@ -0,0 +1,102 @@ +package io.cdap.plugin.servicenow.restapi; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.google.common.base.Predicate; +import io.cdap.plugin.servicenow.util.ServiceNowConstants; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class ExperimentTest { + + public Boolean isRetryable = true; + public String errorMessage = "Error Handling request"; + public int cnt = 0; + + @Test + public void mainTest() throws TestException { + Callable func = + () -> { + return getResponse(errorMessage); + }; + Retryer retryer = getRetryer(); + try { + TestResponse response = retryer.call(func); + // Execution is successful + if (response.exception != null) { + // Execution successful but returned non retryable error + throw new TestException(response.exception); + } + } catch (RetryException e) { + // Execution successful but returned retryable error + Attempt attempt = e.getLastFailedAttempt(); + if (attempt.hasResult()) { + TestResponse response = (TestResponse) attempt.getResult(); + throw response.exception; + } else if (attempt.hasException()) { + throw new TestException(attempt.getExceptionCause()); + } + } catch (ExecutionException e) { + // Execution failed with error + System.out.println("Huh"); + throw new RuntimeException(e); + } + } + + public TestResponse getResponse(String message) throws InterruptedException { + System.out.println("Running request"); + cnt++; + //Thread.sleep(5*1000); + if (cnt == 5) { + throw new RuntimeException("runtime sssss"); + } + return new TestResponse(isRetryable, new TestException(message)); + } + + public Retryer getRetryer() { + Retryer retryer = + RetryerBuilder.newBuilder() + .retryIfResult( + new Predicate() { + @Override + public boolean apply(@Nullable TestResponse testResponse) { + return testResponse.exception != null && testResponse.isRetryable; + } + }) + .withWaitStrategy( + WaitStrategies.exponentialWait( + ServiceNowConstants.WAIT_TIME, TimeUnit.MILLISECONDS)) + .withStopStrategy( + StopStrategies.stopAfterAttempt(ServiceNowConstants.MAX_NUMBER_OF_RETRY_ATTEMPTS)) + .build(); + return retryer; + } + + public class TestResponse { + public Boolean isRetryable; + public TestException exception; + + public TestResponse(Boolean isRetryable, TestException exception) { + this.isRetryable = isRetryable; + this.exception = exception; + } + } + + public class TestException extends Exception { + public TestException(String message) { + super(message); + } + + public TestException(Throwable t) { + super(t); + } + } +} diff --git a/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReaderTest.java b/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReaderTest.java index 7614e2ed..33dd308b 100644 --- a/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReaderTest.java +++ b/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReaderTest.java @@ -20,6 +20,7 @@ import io.cdap.cdap.api.data.format.UnexpectedFormatException; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.plugin.PluginProperties; +import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableDataResponse; import io.cdap.plugin.servicenow.connector.ServiceNowRecordConverter; @@ -49,28 +50,30 @@ public class ServiceNowMultiRecordReaderTest { private static final String USER = "user"; private static final String PASSWORD = "password"; - @Rule - public ExpectedException thrown = ExpectedException.none(); + @Rule public ExpectedException thrown = ExpectedException.none(); private ServiceNowMultiSourceConfig serviceNowMultiSourceConfig; private ServiceNowMultiRecordReader serviceNowMultiRecordReader; @Before public void initializeTests() { - serviceNowMultiSourceConfig = Mockito.spy(new ServiceNowSourceConfigHelper.ConfigBuilder() - .setReferenceName("referenceName") - .setRestApiEndpoint(REST_API_ENDPOINT) - .setUser(USER) - .setPassword(PASSWORD) - .setClientId(CLIENT_ID) - .setClientSecret(CLIENT_SECRET) - .setTableNames("sys_user") - .setValueType("Actual") - .setStartDate("2021-01-01") - .setEndDate("2022-02-18") - .setTableNameField("tablename") - .buildMultiSource()); - - serviceNowMultiRecordReader = Mockito.spy(new ServiceNowMultiRecordReader(serviceNowMultiSourceConfig)); + serviceNowMultiSourceConfig = + Mockito.spy( + new ServiceNowSourceConfigHelper.ConfigBuilder() + .setReferenceName("referenceName") + .setRestApiEndpoint(REST_API_ENDPOINT) + .setUser(USER) + .setPassword(PASSWORD) + .setClientId(CLIENT_ID) + .setClientSecret(CLIENT_SECRET) + .setTableNames("sys_user") + .setValueType("Actual") + .setStartDate("2021-01-01") + .setEndDate("2022-02-18") + .setTableNameField("tablename") + .buildMultiSource()); + + serviceNowMultiRecordReader = + Mockito.spy(new ServiceNowMultiRecordReader(serviceNowMultiSourceConfig)); } @Test @@ -79,8 +82,9 @@ public void testConstructor() throws IOException { Assert.assertEquals("user", serviceNowMultiSourceConfig.getConnection().getUser()); Assert.assertEquals("sys_user", serviceNowMultiSourceConfig.getTableNames()); Assert.assertEquals("2021-01-01", serviceNowMultiSourceConfig.getStartDate()); - Assert.assertEquals("https://ven05127.service-now.com", serviceNowMultiSourceConfig.getConnection() - .getRestApiEndpoint()); + Assert.assertEquals( + "https://ven05127.service-now.com", + serviceNowMultiSourceConfig.getConnection().getRestApiEndpoint()); Assert.assertEquals("referenceName", serviceNowMultiSourceConfig.getReferenceName()); Assert.assertEquals("2022-02-18", serviceNowMultiSourceConfig.getEndDate()); PluginProperties properties = serviceNowMultiSourceConfig.getProperties(); @@ -91,8 +95,9 @@ public void testConstructor() throws IOException { @Test(expected = IllegalStateException.class) public void testConvertToValueInvalidFieldType() { - Schema fieldSchema = Schema.recordOf("record", Schema.Field.of("TimeField", - Schema.of(Schema.LogicalType.TIMESTAMP_MILLIS))); + Schema fieldSchema = + Schema.recordOf( + "record", Schema.Field.of("TimeField", Schema.of(Schema.LogicalType.TIMESTAMP_MILLIS))); StructuredRecord.Builder recordBuilder = StructuredRecord.builder(fieldSchema); Map map = new HashMap<>(); map.put("TimeField", "value"); @@ -141,34 +146,41 @@ public void testFetchData() throws IOException { ServiceNowTableAPIClientImpl restApi = Mockito.mock(ServiceNowTableAPIClientImpl.class); try { Mockito.when(restApi.fetchTableSchema(tableName)) - .thenReturn(Schema.recordOf(Schema.Field.of("calendar_integration", Schema.of(Schema.Type.STRING)))); + .thenReturn( + Schema.recordOf( + Schema.Field.of("calendar_integration", Schema.of(Schema.Type.STRING)))); serviceNowMultiRecordReader.initialize(split, null); - } catch (RuntimeException | OAuthProblemException | OAuthSystemException e) { + } catch (RuntimeException + | OAuthProblemException + | OAuthSystemException + | ServiceNowAPIException e) { Assert.assertTrue(e instanceof RuntimeException); } Mockito.doNothing().when(serviceNowMultiRecordReader).fetchData(); Collections.singletonList(new Object()); - serviceNowMultiRecordReader.iterator = Collections.singletonList(Collections.singletonMap("key", new String())). - iterator(); + serviceNowMultiRecordReader.iterator = + Collections.singletonList(Collections.singletonMap("key", new String())).iterator(); Assert.assertTrue(serviceNowMultiRecordReader.nextKeyValue()); } @Test(expected = IOException.class) - public void testFetchDataOnInvalidTable() throws IOException, OAuthProblemException, OAuthSystemException { - serviceNowMultiSourceConfig = ServiceNowSourceConfigHelper.newConfigBuilder() - .setReferenceName("referenceName") - .setRestApiEndpoint(REST_API_ENDPOINT) - .setUser(USER) - .setPassword(PASSWORD) - .setClientId(CLIENT_ID) - .setClientSecret(CLIENT_SECRET) - .setTableNames("") - .setValueType("Actual") - .setStartDate("2021-01-01") - .setEndDate("2022-02-18") - .setPageSize(10) - .setTableNameField("tablename") - .buildMultiSource(); + public void testFetchDataOnInvalidTable() + throws IOException, OAuthProblemException, OAuthSystemException, ServiceNowAPIException { + serviceNowMultiSourceConfig = + ServiceNowSourceConfigHelper.newConfigBuilder() + .setReferenceName("referenceName") + .setRestApiEndpoint(REST_API_ENDPOINT) + .setUser(USER) + .setPassword(PASSWORD) + .setClientId(CLIENT_ID) + .setClientSecret(CLIENT_SECRET) + .setTableNames("") + .setValueType("Actual") + .setStartDate("2021-01-01") + .setEndDate("2022-02-18") + .setPageSize(10) + .setTableNameField("tablename") + .buildMultiSource(); String tableName = serviceNowMultiSourceConfig.getTableNames(); ServiceNowTableAPIClientImpl restApi = Mockito.mock(ServiceNowTableAPIClientImpl.class); @@ -184,16 +196,21 @@ public void testFetchDataOnInvalidTable() throws IOException, OAuthProblemExcept map.put("sys_updated_by", "system"); map.put("sys_created_on", "2019-04-05 21:09:12"); results.add(map); - restApi.fetchTableRecords(tableName, serviceNowMultiSourceConfig.getValueType(), - serviceNowMultiSourceConfig.getStartDate(), serviceNowMultiSourceConfig.getEndDate(), - split.getOffset(), - serviceNowMultiSourceConfig.getPageSize()); + restApi.fetchTableRecords( + tableName, + serviceNowMultiSourceConfig.getValueType(), + serviceNowMultiSourceConfig.getStartDate(), + serviceNowMultiSourceConfig.getEndDate(), + split.getOffset(), + serviceNowMultiSourceConfig.getPageSize()); ServiceNowTableDataResponse response = new ServiceNowTableDataResponse(); response.setResult(results); try { Mockito.when(restApi.fetchTableSchema(tableName)) - .thenReturn(Schema.recordOf(Schema.Field.of("calendar_integration", Schema.of(Schema.Type.STRING)))); + .thenReturn( + Schema.recordOf( + Schema.Field.of("calendar_integration", Schema.of(Schema.Type.STRING)))); serviceNowMultiRecordReader.initialize(split, null); } catch (RuntimeException | OAuthProblemException | OAuthSystemException e) { Assert.assertTrue(e instanceof RuntimeException);