Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions morango/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,10 @@ class RecordMaxCounter(AbstractCounter):

store_model = models.ForeignKey(Store, on_delete=models.CASCADE)

@property
def unique_key(self):
return f"{self.instance_id}:{self.store_model_id}"

class Meta:
unique_together = ("store_model", "instance_id")

Expand Down
10 changes: 10 additions & 0 deletions morango/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import inspect
import sys
from collections import OrderedDict
from typing import Generator

from django.db.models import QuerySet
from django.db.models.fields.related import ForeignKey

from morango.constants import transfer_stages
Expand Down Expand Up @@ -82,6 +84,14 @@ def get_models(self, profile):
self.check_models_ready(profile)
return list(self.profile_models.get(profile, {}).values())

def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]:
"""
Method for future enhancement to iterate over model's and their querysets in a fashion
(particularly, an order) that is aware of FK dependencies.
"""
for model in self.get_models(profile):
yield model.syncing_objects.all()

def _insert_model_in_dependency_order(self, model, profile):
# When we add models to be synced, we need to make sure
# that models that depend on other models are synced AFTER
Expand Down
26 changes: 7 additions & 19 deletions morango/sync/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,35 @@
from morango.constants import transfer_statuses
from morango.registry import session_middleware
from morango.sync.operations import _deserialize_from_store
from morango.sync.operations import _serialize_into_store
from morango.sync.operations import OperationLogger
from morango.sync.stream.serialize import serialize_into_store
from morango.sync.utils import SyncSignalGroup
from morango.utils import _assert


logger = logging.getLogger(__name__)


def _self_referential_fk(klass_model):
"""
Return whether this model has a self ref FK, and the name for the field
"""
for f in klass_model._meta.concrete_fields:
if f.related_model:
if issubclass(klass_model, f.related_model):
return f.attname
return None


class MorangoProfileController(object):
def __init__(self, profile):
_assert(profile, "profile needs to be defined.")
self.profile = profile

def serialize_into_store(self, filter=None):
def serialize_into_store(self, sync_filter=None):
"""
Takes data from app layer and serializes the models into the store.
"""
with OperationLogger("Serializing records", "Serialization complete"):
_serialize_into_store(self.profile, filter=filter)
serialize_into_store(self.profile, sync_filter=sync_filter)

def deserialize_from_store(self, skip_erroring=False, filter=None):
def deserialize_from_store(self, skip_erroring=False, sync_filter=None):
"""
Takes data from the store and integrates into the application.
"""
with OperationLogger("Deserializing records", "Deserialization complete"):
# we first serialize to avoid deserialization merge conflicts
_serialize_into_store(self.profile, filter=filter)
serialize_into_store(self.profile, sync_filter=sync_filter)
_deserialize_from_store(
self.profile, filter=filter, skip_erroring=skip_erroring
self.profile, filter=sync_filter, skip_erroring=skip_erroring
)

def create_network_connection(self, base_url, **kwargs):
Expand Down Expand Up @@ -217,7 +205,7 @@ def proceed_to_and_wait_for(
if tries >= max_interval_tries:
sleep(max_interval)
else:
sleep(0.3 * (2 ** tries - 1))
sleep(0.3 * (2**tries - 1))
result = self.proceed_to(target_stage, context=context)
tries += 1
if callable(callback):
Expand Down
39 changes: 39 additions & 0 deletions morango/sync/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging
from contextlib import contextmanager

from django.db import connection
from django.db import transaction

from morango.sync.backends.utils import load_backend
from morango.sync.utils import lock_partitions


logger = logging.getLogger(__name__)

DBBackend = load_backend(connection)


@contextmanager
def begin_transaction(sync_filter, isolated=False, shared_lock=False):
"""
Starts a transaction, sets the transaction isolation level to repeatable read, and locks
affected partitions

:param sync_filter: The filter for filtering applicable records of the sync
:type sync_filter: morango.models.certificates.Filter|None
:param isolated: Whether to alter the transaction isolation to repeatable-read
:type isolated: bool
:param shared_lock: Whether the advisory lock should be exclusive or shared
:type shared_lock: bool
"""
if isolated:
# when isolation is requested, we modify the transaction isolation of the connection for the
# duration of the transaction
with DBBackend._set_transaction_repeatable_read():
with transaction.atomic(savepoint=False):
lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
yield
else:
with transaction.atomic():
lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
yield
Loading
Loading