Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pychunkedgraph/app/meshing/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _remeshing(serialized_cg_info, lvl2_nodes):
cv_mesh_dir = cg.meta.dataset_info["mesh"]
cv_unsharded_mesh_dir = cg.meta.dataset_info["mesh_metadata"]["unsharded_mesh_dir"]
cv_unsharded_mesh_path = os.path.join(
cg.meta.data_source.WATERSHED, cv_mesh_dir, cv_unsharded_mesh_dir
cg.meta.mesh_root, cv_mesh_dir, cv_unsharded_mesh_dir
)
mesh_data = cg.meta.custom_data["mesh"]

Expand Down
2 changes: 1 addition & 1 deletion pychunkedgraph/app/meshing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def remeshing(table_id, lvl2_nodes):
cv_mesh_dir = cg.meta.dataset_info["mesh"]
cv_unsharded_mesh_dir = cg.meta.dataset_info["mesh_metadata"]["unsharded_mesh_dir"]
cv_unsharded_mesh_path = os.path.join(
cg.meta.data_source.WATERSHED, cv_mesh_dir, cv_unsharded_mesh_dir
cg.meta.mesh_root, cv_mesh_dir, cv_unsharded_mesh_dir
)
mesh_data = cg.meta.custom_data["mesh"]

Expand Down
27 changes: 25 additions & 2 deletions pychunkedgraph/graph/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from ..utils.redis import get_redis_connection


_datasource_fields = ("EDGES", "COMPONENTS", "WATERSHED", "DATA_VERSION", "CV_MIP")
_datasource_defaults = (None, None, None, None, 0)
_datasource_fields = ("EDGES", "COMPONENTS", "WATERSHED", "DATA_VERSION", "CV_MIP", "MESH")
_datasource_defaults = (None, None, None, None, 0, None)
DataSource = namedtuple(
"DataSource",
_datasource_fields,
Expand Down Expand Up @@ -80,6 +80,26 @@ def data_source(self):
def custom_data(self):
return self._custom_data

@property
def mesh_root(self) -> str:
    """Root location under which the mesh subdirectory lives.

    Prefers `data_source.MESH`; when that is unset (or otherwise falsy) the
    value of `data_source.WATERSHED` is returned instead, which keeps graphs
    created before the MESH field existed working. Code that builds mesh
    paths should use this property rather than reading WATERSHED directly.
    """
    source = self._data_source
    dedicated_root = source.MESH
    if dedicated_root:
        return dedicated_root
    return source.WATERSHED

def update_mesh_root(self, mesh_root: "str | None", client) -> None:
    """Set `data_source.MESH` and persist the change via the given client.

    Args:
        mesh_root: New parent location for the mesh subdir. Pass `None` (or
            an empty string, which is stored as `None`) to clear MESH so
            that `mesh_root` falls back to `data_source.WATERSHED`.
        client: Graph storage client (typically `cg.client`); its
            `update_graph_meta` is called with this meta object to write
            the updated meta row.

    Raises:
        Whatever `client.update_graph_meta` raises. In that case the
        in-memory data source is restored to its previous value, so this
        object never advertises a mesh root that was not persisted.
    """
    previous = self._data_source
    # Store "unset" uniformly as None: readers that test `MESH is not None`
    # and readers that test truthiness then agree on whether MESH is set.
    self._data_source = previous._replace(MESH=mesh_root or None)
    try:
        client.update_graph_meta(self)
    except Exception:
        # Persisting failed: undo the in-memory change before propagating.
        self._data_source = previous
        raise

@property
def ws_cv(self):
if self._ws_cv:
Expand Down Expand Up @@ -240,6 +260,7 @@ def dataset_info(self) -> Dict:
{
"chunks_start_at_voxel_offset": True,
"data_dir": self.data_source.WATERSHED,
"ws_data_dir": self.data_source.WATERSHED,
"graph": {
"chunk_size": self.graph_config.CHUNK_SIZE,
"bounding_box": [2048, 2048, 512],
Expand All @@ -250,6 +271,8 @@ def dataset_info(self) -> Dict:
},
}
)
if self.data_source.MESH is not None:
info["mesh_data_dir"] = self.data_source.MESH
mesh_dir = self.custom_data.get("mesh", {}).get("dir", None)
if mesh_dir is not None:
info.update({"mesh": mesh_dir})
Expand Down
31 changes: 27 additions & 4 deletions pychunkedgraph/meshing/manifest/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
REDIS_PASSWORD = os.environ.get("MANIFEST_CACHE_REDIS_PASSWORD", "")
REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0"

# Maximum lifetime of a cache entry, in seconds (default: three days);
# override with the MANIFEST_CACHE_TTL_SECONDS environment variable.
# A miss simply falls through to the authoritative storage (sharded
# archives / dynamic/ in CloudFiles), so letting an entry expire is
# correctness-equivalent to the LRU eviction already in effect.
MANIFEST_CACHE_TTL_SECONDS = int(os.getenv("MANIFEST_CACHE_TTL_SECONDS", 3 * 24 * 3600))


REDIS = redis.Redis.from_url(REDIS_URL, socket_connect_timeout=1)
try:
Expand Down Expand Up @@ -140,10 +147,18 @@ def _set_cached_initial_fragments(
for node_id, fragment_info in fragments_d.items():
path, offset, size = fragment_info
key = f"{self.namespace}:{node_id}"
pipeline.set(key, f"{path[prefix_idx:]}:{offset}:{size}")
pipeline.set(
key,
f"{path[prefix_idx:]}:{offset}:{size}",
ex=MANIFEST_CACHE_TTL_SECONDS,
)

for node_id in not_existing:
pipeline.set(f"{self.namespace}:{node_id}", DOES_NOT_EXIST)
pipeline.set(
f"{self.namespace}:{node_id}",
DOES_NOT_EXIST,
ex=MANIFEST_CACHE_TTL_SECONDS,
)

pipeline.execute()

Expand All @@ -155,9 +170,17 @@ def _set_cached_dynamic_fragments(

pipeline = REDIS.pipeline()
for node_id, fragment in fragments_d.items():
pipeline.set(f"{self.namespace}:{node_id}", fragment)
pipeline.set(
f"{self.namespace}:{node_id}",
fragment,
ex=MANIFEST_CACHE_TTL_SECONDS,
)

for node_id in not_existing:
pipeline.set(f"{self.namespace}:{node_id}", DOES_NOT_EXIST)
pipeline.set(
f"{self.namespace}:{node_id}",
DOES_NOT_EXIST,
ex=MANIFEST_CACHE_TTL_SECONDS,
)

pipeline.execute()
2 changes: 1 addition & 1 deletion pychunkedgraph/meshing/manifest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def _get_dynamic_meshes(cg, node_ids: Sequence[np.uint64]) -> Tuple[Dict, List]:
return result, not_existing

mesh_dir = cg.meta.custom_data.get("mesh", {}).get("dir", "graphene_meshes")
mesh_path = f"{cg.meta.data_source.WATERSHED}/{mesh_dir}/dynamic"
mesh_path = f"{cg.meta.mesh_root}/{mesh_dir}/dynamic"
cf = CloudFiles(mesh_path)
manifest_cache = ManifestCache(cg.graph_id, initial=False)

Expand Down
2 changes: 1 addition & 1 deletion pychunkedgraph/meshing/mesh_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def compute_mesh_centroids_of_l2_ids(cg, l2_ids, flatten=False):
"unsharded_mesh_dir"
]
cv_unsharded_mesh_path = os.path.join(
cg.meta.data_source.WATERSHED,
cg.meta.mesh_root,
cv_sharded_mesh_dir,
cv_unsharded_mesh_dir,
)
Expand Down
4 changes: 4 additions & 0 deletions pychunkedgraph/meshing/meshgen_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ def get_json_info(cg):
dummy_app_info = {"app": {"supported_api_versions": [0, 1]}}
info = {**dataset_info, **dummy_app_info}
info["mesh"] = cg.meta.custom_data.get("mesh", {}).get("dir", "graphene_meshes")
# CloudVolume's graphene driver resolves mesh paths as `data_dir + mesh`.
# When MESH is set, point data_dir at the mesh root so sharded readers
# fetch from the dedicated mesh location instead of WATERSHED.
info["data_dir"] = cg.meta.mesh_root
info_str = dumps(info)
return loads(info_str)

Expand Down
2 changes: 1 addition & 1 deletion workers/mesh_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def callback(payload):
return

mesh_path = path.join(
cg.meta.data_source.WATERSHED, mesh_dir, cv_unsharded_mesh_dir
cg.meta.mesh_root, mesh_dir, cv_unsharded_mesh_dir
)


Expand Down
Loading