diff --git a/pychunkedgraph/app/meshing/common.py b/pychunkedgraph/app/meshing/common.py index 8f1a0c20a..37813a3f3 100644 --- a/pychunkedgraph/app/meshing/common.py +++ b/pychunkedgraph/app/meshing/common.py @@ -183,7 +183,7 @@ def _remeshing(serialized_cg_info, lvl2_nodes): cv_mesh_dir = cg.meta.dataset_info["mesh"] cv_unsharded_mesh_dir = cg.meta.dataset_info["mesh_metadata"]["unsharded_mesh_dir"] cv_unsharded_mesh_path = os.path.join( - cg.meta.data_source.WATERSHED, cv_mesh_dir, cv_unsharded_mesh_dir + cg.meta.mesh_root, cv_mesh_dir, cv_unsharded_mesh_dir ) mesh_data = cg.meta.custom_data["mesh"] diff --git a/pychunkedgraph/app/meshing/tasks.py b/pychunkedgraph/app/meshing/tasks.py index a1f11ca68..f0b4d04cf 100644 --- a/pychunkedgraph/app/meshing/tasks.py +++ b/pychunkedgraph/app/meshing/tasks.py @@ -11,7 +11,7 @@ def remeshing(table_id, lvl2_nodes): cv_mesh_dir = cg.meta.dataset_info["mesh"] cv_unsharded_mesh_dir = cg.meta.dataset_info["mesh_metadata"]["unsharded_mesh_dir"] cv_unsharded_mesh_path = os.path.join( - cg.meta.data_source.WATERSHED, cv_mesh_dir, cv_unsharded_mesh_dir + cg.meta.mesh_root, cv_mesh_dir, cv_unsharded_mesh_dir ) mesh_data = cg.meta.custom_data["mesh"] diff --git a/pychunkedgraph/graph/meta.py b/pychunkedgraph/graph/meta.py index 83d670ffe..a9582b26d 100644 --- a/pychunkedgraph/graph/meta.py +++ b/pychunkedgraph/graph/meta.py @@ -16,8 +16,8 @@ from ..utils.redis import get_redis_connection -_datasource_fields = ("EDGES", "COMPONENTS", "WATERSHED", "DATA_VERSION", "CV_MIP") -_datasource_defaults = (None, None, None, None, 0) +_datasource_fields = ("EDGES", "COMPONENTS", "WATERSHED", "DATA_VERSION", "CV_MIP", "MESH") +_datasource_defaults = (None, None, None, None, 0, None) DataSource = namedtuple( "DataSource", _datasource_fields, @@ -80,6 +80,26 @@ def data_source(self): def custom_data(self): return self._custom_data + @property + def mesh_root(self) -> str: + """Fully-qualified parent of the mesh subdir. + + Returns `data_source.MESH` when set, otherwise falls back to + `data_source.WATERSHED` so graphs predating the MESH field keep working. + Every mesh-path construction site should read this instead of WATERSHED. + """ + return self._data_source.MESH or self._data_source.WATERSHED + + def update_mesh_root(self, mesh_root: str, client) -> None: + """Set `data_source.MESH` and persist the change via the given client. + + Pass `mesh_root=None` to clear MESH (meshes revert to living under + WATERSHED). `client` is the graph storage client (typically `cg.client`) + used to write the updated meta row. + """ + self._data_source = self._data_source._replace(MESH=mesh_root) + client.update_graph_meta(self) + @property def ws_cv(self): if self._ws_cv: @@ -240,6 +260,7 @@ def dataset_info(self) -> Dict: { "chunks_start_at_voxel_offset": True, "data_dir": self.data_source.WATERSHED, + "ws_data_dir": self.data_source.WATERSHED, "graph": { "chunk_size": self.graph_config.CHUNK_SIZE, "bounding_box": [2048, 2048, 512], @@ -250,6 +271,8 @@ def dataset_info(self) -> Dict: }, } ) + if self.data_source.MESH is not None: + info["mesh_data_dir"] = self.data_source.MESH mesh_dir = self.custom_data.get("mesh", {}).get("dir", None) if mesh_dir is not None: info.update({"mesh": mesh_dir}) diff --git a/pychunkedgraph/meshing/manifest/cache.py b/pychunkedgraph/meshing/manifest/cache.py index f38a830c2..640f5beb2 100644 --- a/pychunkedgraph/meshing/manifest/cache.py +++ b/pychunkedgraph/meshing/manifest/cache.py @@ -17,6 +17,13 @@ REDIS_PASSWORD = os.environ.get("MANIFEST_CACHE_REDIS_PASSWORD", "") REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0" +# Upper bound on cache entry lifetime. Cache misses fall through to +# authoritative storage (sharded archives / dynamic/ in CloudFiles), so +# expiry is correctness-equivalent to the LRU eviction already in effect. +MANIFEST_CACHE_TTL_SECONDS = int( + os.environ.get("MANIFEST_CACHE_TTL_SECONDS", 3 * 24 * 3600) +) + REDIS = redis.Redis.from_url(REDIS_URL, socket_connect_timeout=1) try: @@ -140,10 +147,18 @@ def _set_cached_initial_fragments( for node_id, fragment_info in fragments_d.items(): path, offset, size = fragment_info key = f"{self.namespace}:{node_id}" - pipeline.set(key, f"{path[prefix_idx:]}:{offset}:{size}") + pipeline.set( + key, + f"{path[prefix_idx:]}:{offset}:{size}", + ex=MANIFEST_CACHE_TTL_SECONDS, + ) for node_id in not_existing: - pipeline.set(f"{self.namespace}:{node_id}", DOES_NOT_EXIST) + pipeline.set( + f"{self.namespace}:{node_id}", + DOES_NOT_EXIST, + ex=MANIFEST_CACHE_TTL_SECONDS, + ) pipeline.execute() @@ -155,9 +170,17 @@ def _set_cached_dynamic_fragments( pipeline = REDIS.pipeline() for node_id, fragment in fragments_d.items(): - pipeline.set(f"{self.namespace}:{node_id}", fragment) + pipeline.set( + f"{self.namespace}:{node_id}", + fragment, + ex=MANIFEST_CACHE_TTL_SECONDS, + ) for node_id in not_existing: - pipeline.set(f"{self.namespace}:{node_id}", DOES_NOT_EXIST) + pipeline.set( + f"{self.namespace}:{node_id}", + DOES_NOT_EXIST, + ex=MANIFEST_CACHE_TTL_SECONDS, + ) pipeline.execute() diff --git a/pychunkedgraph/meshing/manifest/utils.py b/pychunkedgraph/meshing/manifest/utils.py index 67e600653..25a408bc8 100644 --- a/pychunkedgraph/meshing/manifest/utils.py +++ b/pychunkedgraph/meshing/manifest/utils.py @@ -106,7 +106,7 @@ def _get_dynamic_meshes(cg, node_ids: Sequence[np.uint64]) -> Tuple[Dict, List]: return result, not_existing mesh_dir = cg.meta.custom_data.get("mesh", {}).get("dir", "graphene_meshes") - mesh_path = f"{cg.meta.data_source.WATERSHED}/{mesh_dir}/dynamic" + mesh_path = f"{cg.meta.mesh_root}/{mesh_dir}/dynamic" cf = CloudFiles(mesh_path) manifest_cache = ManifestCache(cg.graph_id, initial=False) diff --git a/pychunkedgraph/meshing/mesh_analysis.py b/pychunkedgraph/meshing/mesh_analysis.py index 97bb28f5b..ce7ec0367 100644 --- a/pychunkedgraph/meshing/mesh_analysis.py +++ b/pychunkedgraph/meshing/mesh_analysis.py @@ -63,7 +63,7 @@ def compute_mesh_centroids_of_l2_ids(cg, l2_ids, flatten=False): "unsharded_mesh_dir" ] cv_unsharded_mesh_path = os.path.join( - cg.meta.data_source.WATERSHED, + cg.meta.mesh_root, cv_sharded_mesh_dir, cv_unsharded_mesh_dir, ) diff --git a/pychunkedgraph/meshing/meshgen_utils.py b/pychunkedgraph/meshing/meshgen_utils.py index 711c09322..59f2172f0 100644 --- a/pychunkedgraph/meshing/meshgen_utils.py +++ b/pychunkedgraph/meshing/meshgen_utils.py @@ -146,6 +146,10 @@ def get_json_info(cg): dummy_app_info = {"app": {"supported_api_versions": [0, 1]}} info = {**dataset_info, **dummy_app_info} info["mesh"] = cg.meta.custom_data.get("mesh", {}).get("dir", "graphene_meshes") + # CloudVolume's graphene driver resolves mesh paths as `data_dir + mesh`. + # When MESH is set, point data_dir at the mesh root so sharded readers + # fetch from the dedicated mesh location instead of WATERSHED. + info["data_dir"] = cg.meta.mesh_root info_str = dumps(info) return loads(info_str) diff --git a/workers/mesh_worker.py b/workers/mesh_worker.py index b8f1e0024..5e134d628 100644 --- a/workers/mesh_worker.py +++ b/workers/mesh_worker.py @@ -52,7 +52,7 @@ def callback(payload): return mesh_path = path.join( - cg.meta.data_source.WATERSHED, mesh_dir, cv_unsharded_mesh_dir + cg.meta.mesh_root, mesh_dir, cv_unsharded_mesh_dir )