Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bootstrap/sql/migrations/native/1.12.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,13 @@ CREATE INDEX idx_test_definition_enabled ON test_definition(enabled);
-- Backfill: rows whose JSON document has no $.enabled path get enabled = true.
UPDATE test_definition
   SET json = JSON_SET(json, '$.enabled', TRUE)
 WHERE json -> '$.enabled' IS NULL;

-- Reshape termsOfUse from a plain JSON string into an object:
--   "..."  ->  { "content": "...", "inherited": false }
-- Only rows where termsOfUse is currently a JSON string are touched, so rows
-- that already hold the object form (or have no termsOfUse) are left alone.
UPDATE data_contract_entity
   SET json = JSON_SET(
           json,
           '$.termsOfUse',
           JSON_OBJECT(
               'content', json ->> '$.termsOfUse',
               'inherited', FALSE
           )
       )
 WHERE JSON_TYPE(json -> '$.termsOfUse') = 'STRING';
10 changes: 10 additions & 0 deletions bootstrap/sql/migrations/native/1.12.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ UPDATE test_definition
SET json = jsonb_set(json::jsonb, '{enabled}', 'true'::jsonb, true)::json
WHERE json ->> 'enabled' IS NULL;

-- Reshape termsOfUse from a plain JSON string into an object:
--   "..."  ->  { "content": "...", "inherited": false }
-- Only rows where termsOfUse is currently a JSON string are touched, so rows
-- that already hold the object form (or have no termsOfUse) are left alone.
UPDATE data_contract_entity
   SET json = CAST(
           jsonb_set(
               CAST(json AS jsonb),
               '{termsOfUse}',
               jsonb_build_object(
                   'content', json ->> 'termsOfUse',
                   'inherited', FALSE
               )
           ) AS json
       )
 WHERE jsonb_typeof(CAST(json AS jsonb) -> 'termsOfUse') = 'string';

-- Enforce at most one audit_log_event row per change_event_id.
-- IF NOT EXISTS makes the statement a no-op when the index is already present.
CREATE UNIQUE INDEX IF NOT EXISTS idx_audit_log_event_change_event_id
ON audit_log_event (change_event_id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,23 @@
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.data.DataContract;
import org.openmetadata.schema.entity.datacontract.DataContractResult;
import org.openmetadata.schema.entity.domains.DataProduct;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.DataContractRepository;
import org.openmetadata.service.jdbi3.DataProductRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.EntityUtil;
Expand Down Expand Up @@ -48,54 +53,21 @@ public void startApp(JobExecutionContext jobExecutionContext) {

DataContractRepository repository =
(DataContractRepository) Entity.getEntityRepository(Entity.DATA_CONTRACT);
ListFilter filter = new ListFilter();

int limit = 100;
String after = null;
int totalProcessed = 0;
int totalErrors = 0;
boolean hasMore = true;

while (hasMore) {
ResultList<DataContract> dataContracts =
repository.listAfter(null, EntityUtil.Fields.EMPTY_FIELDS, filter, limit, after);
// Phase 1: Validate existing data contracts
LOG.info("Phase 1: Validating existing data contracts");
int[] phase1Results = validateExistingContracts(repository);
totalProcessed += phase1Results[0];
totalErrors += phase1Results[1];

List<DataContract> contractBatch = dataContracts.getData();
LOG.info("Processing batch of {} data contracts", contractBatch.size());

if (nullOrEmpty(contractBatch)) {
LOG.info("No more data contracts to process. Exiting.");
break;
}

for (DataContract dataContract : contractBatch) {
try {
LOG.debug("Validating data contract: {}", dataContract.getFullyQualifiedName());
RestUtil.PutResponse<DataContractResult> validationResponse =
repository.validateContract(dataContract);
DataContractResult validationResult = validationResponse.getEntity();

LOG.debug(
"Validation completed for {}: Status = {}",
dataContract.getFullyQualifiedName(),
validationResult.getContractExecutionStatus());
totalProcessed++;
} catch (Exception e) {
String msg =
String.format(
"Failed to validate data contract %s: %s",
dataContract.getFullyQualifiedName(), e.getMessage());
LOG.error(msg, e);
failureDetails.put(dataContract.getFullyQualifiedName(), msg);
totalErrors++;
}
}
after = dataContracts.getPaging() != null ? dataContracts.getPaging().getAfter() : null;
hasMore = after != null;

setStats(dataContracts.getPaging().getTotal(), totalProcessed, totalErrors);
updateStatsRecord(AppRunRecord.Status.RUNNING);
}
// Phase 2: Process Data Products with contracts to materialize inherited contracts
LOG.info("Phase 2: Processing Data Products to materialize inherited contracts for assets");
int[] phase2Results = processDataProductContracts(repository);
totalProcessed += phase2Results[0];
totalErrors += phase2Results[1];

LOG.info(
"DataContractValidationApp completed. Processed: {}, Errors: {}",
Expand All @@ -110,6 +82,158 @@ public void startApp(JobExecutionContext jobExecutionContext) {
}
}

/**
 * Phase 1: pages through all data contracts (100 per page) and validates each one.
 *
 * <p>A failure on a single contract is logged, recorded in {@code failureDetails} under the
 * contract's fully qualified name, and counted as an error; it does not stop the run. Run stats
 * are refreshed after every page.
 *
 * @param repository repository used to list and validate contracts
 * @return two-element array: index 0 is the number of contracts validated, index 1 the number
 *     that failed
 */
private int[] validateExistingContracts(DataContractRepository repository) {
  ListFilter filter = new ListFilter();
  int limit = 100;
  String after = null;
  int totalProcessed = 0;
  int totalErrors = 0;
  boolean hasMore = true;

  while (hasMore) {
    ResultList<DataContract> dataContracts =
        repository.listAfter(null, EntityUtil.Fields.EMPTY_FIELDS, filter, limit, after);

    List<DataContract> contractBatch = dataContracts.getData();

    // Guard before touching the batch: size() on a null list would throw.
    if (nullOrEmpty(contractBatch)) {
      LOG.info("No more data contracts to process.");
      break;
    }
    LOG.info("Processing batch of {} data contracts", contractBatch.size());

    for (DataContract dataContract : contractBatch) {
      try {
        LOG.debug("Validating data contract: {}", dataContract.getFullyQualifiedName());
        RestUtil.PutResponse<DataContractResult> validationResponse =
            repository.validateContract(dataContract);
        DataContractResult validationResult = validationResponse.getEntity();

        LOG.debug(
            "Validation completed for {}: Status = {}",
            dataContract.getFullyQualifiedName(),
            validationResult.getContractExecutionStatus());
        totalProcessed++;
      } catch (Exception e) {
        String msg =
            String.format(
                "Failed to validate data contract %s: %s",
                dataContract.getFullyQualifiedName(), e.getMessage());
        LOG.error(msg, e);
        failureDetails.put(dataContract.getFullyQualifiedName(), msg);
        totalErrors++;
      }
    }

    // Paging may be absent; treat that as "last page" for both the cursor and the stats.
    boolean hasPaging = dataContracts.getPaging() != null;
    after = hasPaging ? dataContracts.getPaging().getAfter() : null;
    hasMore = after != null;

    // Without paging info, fall back to the number of contracts seen so far.
    Integer totalRecords =
        hasPaging
            ? dataContracts.getPaging().getTotal()
            : Integer.valueOf(totalProcessed + totalErrors);
    setStats(totalRecords, totalProcessed, totalErrors);
    updateStatsRecord(AppRunRecord.Status.RUNNING);
  }

  return new int[] {totalProcessed, totalErrors};
}

/**
 * Phase 2: for every Data Product that has a contract, materializes and validates an inherited
 * contract on each of its assets that does not carry its own direct contract.
 *
 * <p>Failures are isolated at three levels (single asset, single Data Product, whole phase). Each
 * one is logged, recorded in {@code failureDetails}, and counted as an error.
 *
 * @param contractRepository repository used to look up, materialize and validate contracts
 * @return two-element array: index 0 is the number of assets materialized and validated, index 1
 *     the number of failures
 */
private int[] processDataProductContracts(DataContractRepository contractRepository) {
  int totalProcessed = 0;
  int totalErrors = 0;
  // Number of assets requested per call to getDataProductAssets.
  final int assetPageSize = 1000;

  try {
    DataProductRepository dataProductRepository =
        (DataProductRepository) Entity.getEntityRepository(Entity.DATA_PRODUCT);

    // Use listAll with null filter since data_product_entity doesn't have a deleted column
    List<DataProduct> allDataProducts =
        dataProductRepository.listAll(
            dataProductRepository.getFields("id,fullyQualifiedName"), new ListFilter(null));

    LOG.info("Found {} Data Products to process", allDataProducts.size());

    for (DataProduct dataProduct : allDataProducts) {
      try {
        // Check if this Data Product has a contract
        DataContract dpContract = contractRepository.getEntityDataContractSafely(dataProduct);
        if (dpContract == null) {
          continue;
        }

        LOG.debug(
            "Data Product {} has contract {}, checking assets",
            dataProduct.getName(),
            dpContract.getName());

        // Walk the assets page by page so Data Products with more than one page of assets
        // are not silently truncated.
        // NOTE(review): the third argument is assumed to be a zero-based offset - confirm.
        int offset = 0;
        boolean morePages = true;
        while (morePages) {
          ResultList<EntityReference> assetsResult =
              dataProductRepository.getDataProductAssets(
                  dataProduct.getId(), assetPageSize, offset);

          List<EntityReference> assets = assetsResult == null ? null : assetsResult.getData();
          if (nullOrEmpty(assets)) {
            break;
          }

          for (EntityReference assetRef : assets) {
            try {
              if (materializeAndValidateAsset(
                  contractRepository, dataProduct, dpContract, assetRef)) {
                totalProcessed++;
              }
            } catch (Exception e) {
              String msg =
                  String.format(
                      "Failed to process asset %s from Data Product %s: %s",
                      assetRef.getName(), dataProduct.getName(), e.getMessage());
              LOG.error(msg, e);
              // Fall back to the id so references without an FQN do not share one key.
              String failureKey =
                  assetRef.getFullyQualifiedName() != null
                      ? assetRef.getFullyQualifiedName()
                      : String.valueOf(assetRef.getId());
              failureDetails.put(failureKey, msg);
              totalErrors++;
            }
          }

          // Only continue when the reported total says more assets remain. offset grows by a
          // non-empty page each round, so the loop always terminates. If no total is reported,
          // stop after this page.
          offset += assets.size();
          Integer reportedTotal =
              assetsResult.getPaging() != null ? assetsResult.getPaging().getTotal() : null;
          morePages = reportedTotal != null && offset < reportedTotal;
        }
      } catch (Exception e) {
        String msg =
            String.format(
                "Failed to process Data Product %s: %s", dataProduct.getName(), e.getMessage());
        LOG.error(msg, e);
        failureDetails.put(dataProduct.getFullyQualifiedName(), msg);
        totalErrors++;
      }
    }
  } catch (Exception e) {
    LOG.error("Error processing Data Products: {}", e.getMessage(), e);
    failureDetails.put("dataProductProcessing", e.getMessage());
    // A phase-level failure must show up in the error count, not only in failureDetails.
    totalErrors++;
  }

  return new int[] {totalProcessed, totalErrors};
}

/**
 * Materializes and validates the inherited contract for one asset of a Data Product.
 *
 * @return {@code true} if a contract was materialized and validated, {@code false} if the asset
 *     was skipped because it already has its own direct (non-inherited) contract
 */
private boolean materializeAndValidateAsset(
    DataContractRepository contractRepository,
    DataProduct dataProduct,
    DataContract dpContract,
    EntityReference assetRef)
    throws Exception {
  // Get the asset entity
  EntityInterface asset =
      Entity.getEntity(assetRef.getType(), assetRef.getId(), "*", Include.NON_DELETED);

  // Check if asset has its own direct (non-inherited) contract
  DataContract assetContract = contractRepository.getEntityDataContractSafely(asset);
  if (assetContract != null && !Boolean.TRUE.equals(assetContract.getInherited())) {
    // Asset has its own direct contract, skip (it was validated in Phase 1)
    return false;
  }

  // Asset only has inherited contract - materialize and validate
  LOG.debug(
      "Materializing inherited contract for asset {} from Data Product {}",
      asset.getName(),
      dataProduct.getName());

  DataContract materializedContract =
      contractRepository.materializeInheritedContract(asset, dpContract.getName(), "system");

  // Get effective contract for validation (includes inherited rules)
  DataContract effectiveContract = contractRepository.getEffectiveDataContract(asset);

  // Validate using effective contract, store results in materialized contract
  RestUtil.PutResponse<DataContractResult> validationResponse =
      contractRepository.validateContractWithEffective(materializedContract, effectiveContract);

  LOG.debug(
      "Materialized and validated contract for {}: Status = {}",
      asset.getName(),
      validationResponse.getEntity().getContractExecutionStatus());
  return true;
}

private void setStats(Integer totalRecords, Integer successRecords, Integer failedRecords) {
StepStats jobStats =
new StepStats()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3270,11 +3270,11 @@ default String getNameHashColumn() {

@ConnectionAwareSqlQuery(
value =
"SELECT json FROM data_contract_entity WHERE JSON_EXTRACT(json, '$.entity.id') = :entityId AND JSON_EXTRACT(json, '$.entity.type') = :entityType LIMIT 1",
"SELECT json FROM data_contract_entity WHERE JSON_EXTRACT(json, '$.entity.id') = :entityId AND JSON_EXTRACT(json, '$.entity.type') = :entityType AND (JSON_EXTRACT(json, '$.deleted') IS NULL OR JSON_EXTRACT(json, '$.deleted') = false) LIMIT 1",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT json FROM data_contract_entity WHERE json#>>'{entity,id}' = :entityId AND json#>>'{entity,type}' = :entityType LIMIT 1",
"SELECT json FROM data_contract_entity WHERE json#>>'{entity,id}' = :entityId AND json#>>'{entity,type}' = :entityType AND (json->>'deleted' IS NULL OR json->>'deleted' = 'false') LIMIT 1",
connectionType = POSTGRES)
String getContractByEntityId(
@Bind("entityId") String entityId, @Bind("entityType") String entityType);
Expand Down
Loading
Loading