Skip to content
Merged
150 changes: 111 additions & 39 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,66 @@
#include "util/Serializer/TripleSerializer.h"

// ____________________________________________________________________________
LocatedTriples::iterator& DeltaTriples::LocatedTripleHandles::forPermutation(
template <bool isInternal>
LocatedTriples::iterator&
DeltaTriples::State<isInternal>::LocatedTripleHandles::forPermutation(
Permutation::Enum permutation) {
return handles_[static_cast<size_t>(permutation)];
}

// ____________________________________________________________________________
void DeltaTriples::clear() {
triplesInserted_.clear();
triplesDeleted_.clear();
ql::ranges::for_each(locatedTriples(), &LocatedTriplesPerBlock::clear);
auto clearImpl = [](auto& state) {
state.triplesInserted_.clear();
state.triplesDeleted_.clear();
ql::ranges::for_each(state.locatedTriples_, &LocatedTriplesPerBlock::clear);
};
clearImpl(state_);
clearImpl(internalState_);
}

// ____________________________________________________________________________
template <bool isInternal>
auto& DeltaTriples::getLocatedTriple() {
if constexpr (isInternal) {
return internalState_.locatedTriples_;
} else {
return locatedTriples();
}
}

// ____________________________________________________________________________
std::vector<DeltaTriples::LocatedTripleHandles>
template <bool isInternal>
std::vector<typename DeltaTriples::State<isInternal>::LocatedTripleHandles>
DeltaTriples::locateAndAddTriples(CancellationHandle cancellationHandle,
ql::span<const IdTriple<0>> triples,
bool insertOrDelete,
ad_utility::timer::TimeTracer& tracer) {
std::array<std::vector<LocatedTriples::iterator>, Permutation::ALL.size()>
constexpr const auto& allPermutations = Permutation::all<isInternal>();
auto& lt = getLocatedTriple<isInternal>();
std::array<std::vector<LocatedTriples::iterator>, allPermutations.size()>
intermediateHandles;
for (auto permutation : Permutation::ALL) {
for (auto permutation : allPermutations) {
tracer.beginTrace(std::string{Permutation::toString(permutation)});
tracer.beginTrace("locateTriples");
auto& perm = index_.getPermutation(permutation);
auto& basePerm = index_.getPermutation(permutation);
auto& perm = isInternal ? basePerm.internalPermutation() : basePerm;
auto locatedTriples = LocatedTriple::locateTriplesInPermutation(
triples, perm.metaData().blockData(), perm.keyOrder(), insertOrDelete,
cancellationHandle);
cancellationHandle->throwIfCancelled();
tracer.endTrace("locateTriples");
tracer.beginTrace("addToLocatedTriples");
intermediateHandles[static_cast<size_t>(permutation)] =
this->locatedTriples()[static_cast<size_t>(permutation)].add(
locatedTriples, tracer);
lt[static_cast<size_t>(permutation)].add(locatedTriples, tracer);
cancellationHandle->throwIfCancelled();
tracer.endTrace("addToLocatedTriples");
tracer.endTrace(Permutation::toString(permutation));
}
tracer.beginTrace("transformHandles");
std::vector<DeltaTriples::LocatedTripleHandles> handles{triples.size()};
for (auto permutation : Permutation::ALL) {
std::vector<typename State<isInternal>::LocatedTripleHandles> handles{
triples.size()};
for (auto permutation : allPermutations) {
for (size_t i = 0; i < triples.size(); i++) {
handles[i].forPermutation(permutation) =
intermediateHandles[static_cast<size_t>(permutation)][i];
Expand All @@ -72,12 +92,14 @@ DeltaTriples::locateAndAddTriples(CancellationHandle cancellationHandle,
}

// ____________________________________________________________________________
void DeltaTriples::eraseTripleInAllPermutations(LocatedTripleHandles& handles) {
template <bool isInternal>
void DeltaTriples::eraseTripleInAllPermutations(
typename State<isInternal>::LocatedTripleHandles& handles) {
auto& lt = getLocatedTriple<isInternal>();
// Erase for all permutations.
for (auto permutation : Permutation::ALL) {
for (auto permutation : Permutation::all<isInternal>()) {
auto ltIter = handles.forPermutation(permutation);
locatedTriples()[static_cast<int>(permutation)].erase(ltIter->blockIndex_,
ltIter);
lt[static_cast<int>(permutation)].erase(ltIter->blockIndex_, ltIter);
}
}

Expand All @@ -93,8 +115,9 @@ void DeltaTriples::insertTriples(CancellationHandle cancellationHandle,
AD_LOG_DEBUG << "Inserting"
<< " " << triples.size()
<< " triples (including idempotent triples)." << std::endl;
modifyTriplesImpl(std::move(cancellationHandle), std::move(triples), true,
triplesInserted_, triplesDeleted_, tracer);
modifyTriplesImpl<false>(std::move(cancellationHandle), std::move(triples),
true, state_.triplesInserted_,
state_.triplesDeleted_, tracer);
}

// ____________________________________________________________________________
Expand All @@ -104,8 +127,35 @@ void DeltaTriples::deleteTriples(CancellationHandle cancellationHandle,
AD_LOG_DEBUG << "Deleting"
<< " " << triples.size()
<< " triples (including idempotent triples)." << std::endl;
modifyTriplesImpl(std::move(cancellationHandle), std::move(triples), false,
triplesDeleted_, triplesInserted_, tracer);
modifyTriplesImpl<false>(std::move(cancellationHandle), std::move(triples),
false, state_.triplesDeleted_,
state_.triplesInserted_, tracer);
}

// ____________________________________________________________________________
void DeltaTriples::insertInternalTriples(
CancellationHandle cancellationHandle, Triples triples,
ad_utility::timer::TimeTracer& tracer) {
AD_LOG_DEBUG << "Inserting"
<< " " << triples.size()
<< " internal triples (including idempotent triples)."
<< std::endl;
modifyTriplesImpl<true>(std::move(cancellationHandle), std::move(triples),
true, internalState_.triplesInserted_,
internalState_.triplesDeleted_, tracer);
}

// ____________________________________________________________________________
void DeltaTriples::deleteInternalTriples(
CancellationHandle cancellationHandle, Triples triples,
ad_utility::timer::TimeTracer& tracer) {
AD_LOG_DEBUG << "Deleting"
<< " " << triples.size()
<< " internal triples (including idempotent triples)."
<< std::endl;
modifyTriplesImpl<true>(std::move(cancellationHandle), std::move(triples),
false, internalState_.triplesDeleted_,
internalState_.triplesInserted_, tracer);
}

// ____________________________________________________________________________
Expand Down Expand Up @@ -164,11 +214,12 @@ void DeltaTriples::rewriteLocalVocabEntriesAndBlankNodes(Triples& triples) {
}

// ____________________________________________________________________________
void DeltaTriples::modifyTriplesImpl(CancellationHandle cancellationHandle,
Triples triples, bool insertOrDelete,
TriplesToHandlesMap& targetMap,
TriplesToHandlesMap& inverseMap,
ad_utility::timer::TimeTracer& tracer) {
template <bool isInternal>
void DeltaTriples::modifyTriplesImpl(
CancellationHandle cancellationHandle, Triples triples, bool insertOrDelete,
typename State<isInternal>::TriplesToHandlesMap& targetMap,
typename State<isInternal>::TriplesToHandlesMap& inverseMap,
ad_utility::timer::TimeTracer& tracer) {
tracer.beginTrace("rewriteLocalVocabEntries");
rewriteLocalVocabEntriesAndBlankNodes(triples);
tracer.endTrace("rewriteLocalVocabEntries");
Expand All @@ -188,15 +239,16 @@ void DeltaTriples::modifyTriplesImpl(CancellationHandle cancellationHandle,
ql::ranges::for_each(triples, [this, &inverseMap](const IdTriple<0>& triple) {
auto handle = inverseMap.find(triple);
if (handle != inverseMap.end()) {
eraseTripleInAllPermutations(handle->second);
eraseTripleInAllPermutations<isInternal>(handle->second);
inverseMap.erase(triple);
}
});
tracer.endTrace("removeInverseTriples");
tracer.beginTrace("locatedAndAdd");

std::vector<LocatedTripleHandles> handles = locateAndAddTriples(
std::move(cancellationHandle), triples, insertOrDelete, tracer);
std::vector<typename State<isInternal>::LocatedTripleHandles> handles =
locateAndAddTriples<isInternal>(std::move(cancellationHandle), triples,
insertOrDelete, tracer);
tracer.endTrace("locatedAndAdd");
tracer.beginTrace("markTriples");

Expand All @@ -215,6 +267,15 @@ LocatedTriplesSnapshot::getLocatedTriplesForPermutation(
return locatedTriplesPerBlock_[static_cast<int>(permutation)];
}

// ____________________________________________________________________________
const LocatedTriplesPerBlock&
LocatedTriplesSnapshot::getInternalLocatedTriplesForPermutation(
Permutation::Enum permutation) const {
AD_CONTRACT_CHECK(permutation == Permutation::PSO ||
permutation == Permutation::POS);
return internalLocatedTriplesPerBlock_[static_cast<int>(permutation)];
}

// ____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() {
// NOTE: Both members of the `LocatedTriplesSnapshot` are copied, but the
Expand All @@ -224,7 +285,8 @@ SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() {
++nextSnapshotIndex_;
return SharedLocatedTriplesSnapshot{
std::make_shared<LocatedTriplesSnapshot>(LocatedTriplesSnapshot{
locatedTriples(), localVocab_.getLifetimeExtender(), snapshotIndex})};
locatedTriples(), internalState_.locatedTriples_,
localVocab_.getLifetimeExtender(), snapshotIndex})};
}

// ____________________________________________________________________________
Expand Down Expand Up @@ -328,24 +390,34 @@ SharedLocatedTriplesSnapshot DeltaTriplesManager::getCurrentSnapshot() const {
// _____________________________________________________________________________
void DeltaTriples::setOriginalMetadata(
Permutation::Enum permutation,
std::shared_ptr<const std::vector<CompressedBlockMetadata>> metadata) {
locatedTriples()
.at(static_cast<size_t>(permutation))
.setOriginalMetadata(std::move(metadata));
std::shared_ptr<const std::vector<CompressedBlockMetadata>> metadata,
bool setInternalMetadata) {
auto& locatedTriplesPerBlock =
setInternalMetadata
? internalState_.locatedTriples_.at(static_cast<size_t>(permutation))
: locatedTriples().at(static_cast<size_t>(permutation));
locatedTriplesPerBlock.setOriginalMetadata(std::move(metadata));
}

// _____________________________________________________________________________
void DeltaTriples::updateAugmentedMetadata() {
ql::ranges::for_each(locatedTriples(),
&LocatedTriplesPerBlock::updateAugmentedMetadata);
auto update = [](auto& lt) {
ql::ranges::for_each(lt, &LocatedTriplesPerBlock::updateAugmentedMetadata);
};
update(locatedTriples());
update(internalState_.locatedTriples_);
}

// _____________________________________________________________________________
void DeltaTriples::writeToDisk() const {
if (!filenameForPersisting_.has_value()) {
return;
}
auto toRange = [](const TriplesToHandlesMap& map) {
// TODO<RobinTF> Currently this only writes non-internal delta triples to
// disk. The internal triples will be regenerated when importing the rest
// again. In the future we might to also want to explicitly store the internal
// triples.
auto toRange = [](const State<false>::TriplesToHandlesMap& map) {
return map | ql::views::keys |
ql::views::transform(
[](const IdTriple<0>& triple) -> const std::array<Id, 4>& {
Expand All @@ -355,9 +427,9 @@ void DeltaTriples::writeToDisk() const {
};
std::filesystem::path tempPath = filenameForPersisting_.value();
tempPath += ".tmp";
ad_utility::serializeIds(
tempPath, localVocab_,
std::array{toRange(triplesDeleted_), toRange(triplesInserted_)});
ad_utility::serializeIds(tempPath, localVocab_,
std::array{toRange(state_.triplesDeleted_),
toRange(state_.triplesInserted_)});
std::filesystem::rename(tempPath, filenameForPersisting_.value());
}

Expand Down
Loading
Loading