From 9ff2c284f41c97de560e8ae637b1993b4df81760 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Wed, 8 Apr 2026 08:03:25 +0000 Subject: [PATCH 01/27] feat(fdb): switch to pyfdb 2.0 --- .../datacube/switching_grid_helper.py | 32 +++++-------------- pyproject.toml | 4 +-- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/polytope_feature/datacube/switching_grid_helper.py b/polytope_feature/datacube/switching_grid_helper.py index 3abc909b1..ba11d8d20 100644 --- a/polytope_feature/datacube/switching_grid_helper.py +++ b/polytope_feature/datacube/switching_grid_helper.py @@ -16,30 +16,14 @@ def get_first_grib_message(req): # Make sure that we are accessing a single georef so that the grid is consistent assert "georef" in req.keys() - first_field = next(fdb.list(req, keys=True))["keys"] - - field = fdb.retrieve(first_field) - - # Normalize the retrieve() result into a plain `bytes` object - if hasattr(field, "read"): - # file-like object: read the contents - data = field.read() - else: - data = field - - # Convert common buffer types to bytes - if isinstance(data, bytes): - msg_bytes = data - elif isinstance(data, bytearray): - msg_bytes = bytes(data) - elif isinstance(data, memoryview): - msg_bytes = data.tobytes() - else: - # last resort: try to construct bytes (may raise) - try: - msg_bytes = bytes(data) - except Exception as e: - raise TypeError(f"Unsupported GRIB message type: {type(data)!r}") from e + # Use data_handle from the list element directly instead of a separate + # retrieve() call — avoids the list iterator polluting retrieve state. + first_element = next(fdb.list(req)) + dh = first_element.data_handle + if dh is None: + raise ValueError("List element has no data handle") + with dh: + msg_bytes = dh.read() gid = eccodes.codes_new_from_message(msg_bytes) return gid diff --git a/pyproject.toml b/pyproject.toml index bd21e4156..d6ca46559 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ tests = [ "h5py", "earthkit-data", "matplotlib", - "pyfdb<=0.1.3", + "pyfdb>=0.2.0", ] unstructured = [ @@ -65,7 +65,7 @@ catalogue = [ switching_grids = [ "eccodes", - "pyfdb<=0.1.3" + "pyfdb>=0.2.0" ] [project.urls] From 956cb15d9c6b1bf81ea214c921be452010c5826c Mon Sep 17 00:00:00 2001 From: Mathilde Leuridan Date: Fri, 10 Apr 2026 08:45:27 +0000 Subject: [PATCH 02/27] handle mixed step types --- .../datacube/backends/datacube.py | 1 + polytope_feature/datacube/backends/fdb.py | 2 + polytope_feature/datacube/datacube_axis.py | 109 ++++++++++++++---- .../datacube_type_change.py | 11 +- 4 files changed, 99 insertions(+), 24 deletions(-) diff --git a/polytope_feature/datacube/backends/datacube.py b/polytope_feature/datacube/backends/datacube.py index 58e761d74..9bd2bed41 100644 --- a/polytope_feature/datacube/backends/datacube.py +++ b/polytope_feature/datacube/backends/datacube.py @@ -138,6 +138,7 @@ def get_indices(self, path: DatacubePath, axis, lower, upper, method=None): """ path = self.fit_path(path) indexes = axis.find_indexes(path, self) + # print("WHAT ARE THE INDEXES HERE ", indexes) idx_between = axis.find_indices_between(indexes, lower, upper, self, method) diff --git a/polytope_feature/datacube/backends/fdb.py b/polytope_feature/datacube/backends/fdb.py index e3744e63c..6147203e9 100644 --- a/polytope_feature/datacube/backends/fdb.py +++ b/polytope_feature/datacube/backends/fdb.py @@ -175,6 +175,8 @@ def get(self, requests: TensorIndexTree, context=None): logging.debug("The requests we give GribJump are: %s", printed_list_to_gj) logging.info("Requests given to GribJump extract for %s", context) try: + # print("HERE") + # print(complete_list_complete_uncompressed_requests) iterator = self.gj.extract(complete_list_complete_uncompressed_requests, context) except Exception as e: if "BadValue: Grid hash mismatch" in str(e): diff --git a/polytope_feature/datacube/datacube_axis.py b/polytope_feature/datacube/datacube_axis.py index e6a5c4935..fd224740c 100644 --- a/polytope_feature/datacube/datacube_axis.py +++ b/polytope_feature/datacube/datacube_axis.py @@ -74,6 +74,7 @@ def find_standard_indexes(self, path, datacube): def find_indexes(self, path, datacube): indexes = self.find_standard_indexes(path, datacube) + print("WHAT ARE THE STANDARD INDEXES HERE ", indexes) for transformation in self.transformations[::-1]: indexes = transformation.find_modified_indexes(indexes, path, datacube, self) return indexes @@ -101,38 +102,104 @@ def _remap_val_to_axis_range(self, value): value = transformation._remap_val_to_axis_range(value, self) return value + # def find_standard_indices_between(self, indexes, low, up, datacube, method=None): + # indexes_between_ranges = [] + + # if self.name in datacube.complete_axes and self.name not in datacube.transformed_axes: + # # Find the range of indexes between lower and upper + # # https://pandas.pydata.org/docs/reference/api/pandas.Index.searchsorted.html + # # Assumes the indexes are already sorted (could sort to be sure) and monotonically increasing + # if method == "surrounding" or method == "nearest": + # start = indexes.searchsorted(low, "left") + # end = indexes.searchsorted(up, "right") + # start = max(start - 1, 0) + # end = min(end + 1, len(indexes)) + # indexes_between = indexes[start:end].to_list() + # indexes_between_ranges.extend(indexes_between) + # else: + # start = indexes.searchsorted(low, "left") + # end = indexes.searchsorted(up, "right") + # indexes_between = indexes[start:end].to_list() + # indexes_between_ranges.extend(indexes_between) + # else: + # if method == "surrounding" or method == "nearest": + # start = bisect.bisect_left(indexes, low) + # end = bisect.bisect_right(indexes, up) + # start = max(start - 1, 0) + # end = min(end + 1, len(indexes)) + # indexes_between = indexes[start:end] + # indexes_between_ranges.extend(indexes_between) + # else: + # lower_idx = bisect.bisect_left(indexes, low) + # upper_idx = bisect.bisect_right(indexes, up) + # indexes_between = indexes[lower_idx:upper_idx] + # indexes_between_ranges.extend(indexes_between) + # return indexes_between_ranges + + def _mixed_key(self, x): + if isinstance(x, pd.Timedelta): + return (0, x.total_seconds()) + elif isinstance(x, str): + return (1, x) + else: + return (2, x) + + def find_standard_indices_between(self, indexes, low, up, datacube, method=None): indexes_between_ranges = [] + # Ensure sorted with consistent key + try: + indexes = sorted(indexes, key=self._mixed_key) + except Exception: + indexes = list(indexes) + + # Wrap low/up with same key logic + low_k = self._mixed_key(low) + up_k = self._mixed_key(up) + + # Helper for bisect with key + class KeyWrapper: + def __init__(self, obj, key): + self.obj = obj + self.key = key + def __lt__(self, other): + return self.key(self.obj) < self.key(other.obj) + + wrapped = [KeyWrapper(x, self._mixed_key) for x in indexes] + low_w = KeyWrapper(low, self._mixed_key) + up_w = KeyWrapper(up, self._mixed_key) + if self.name in datacube.complete_axes and self.name not in datacube.transformed_axes: - # Find the range of indexes between lower and upper - # https://pandas.pydata.org/docs/reference/api/pandas.Index.searchsorted.html - # Assumes the indexes are already sorted (could sort to be sure) and monotonically increasing - if method == "surrounding" or method == "nearest": - start = indexes.searchsorted(low, "left") - end = indexes.searchsorted(up, "right") + # pandas searchsorted won't work reliably with mixed types + # so we fallback to bisect logic here too + if method in ("surrounding", "nearest"): + start = bisect.bisect_left(wrapped, low_w) + end = bisect.bisect_right(wrapped, up_w) start = max(start - 1, 0) end = min(end + 1, len(indexes)) - indexes_between = indexes[start:end].to_list() - indexes_between_ranges.extend(indexes_between) else: - start = indexes.searchsorted(low, "left") - end = indexes.searchsorted(up, "right") - indexes_between = indexes[start:end].to_list() - indexes_between_ranges.extend(indexes_between) + start = bisect.bisect_left(wrapped, low_w) + end = bisect.bisect_right(wrapped, up_w) + + indexes_between_ranges.extend(indexes[start:end]) + else: - if method == "surrounding" or method == "nearest": - start = bisect.bisect_left(indexes, low) - end = bisect.bisect_right(indexes, up) + if method in ("surrounding", "nearest"): + start = bisect.bisect_left(wrapped, low_w) + end = bisect.bisect_right(wrapped, up_w) start = max(start - 1, 0) end = min(end + 1, len(indexes)) - indexes_between = indexes[start:end] - indexes_between_ranges.extend(indexes_between) else: - lower_idx = bisect.bisect_left(indexes, low) - upper_idx = bisect.bisect_right(indexes, up) - indexes_between = indexes[lower_idx:upper_idx] - indexes_between_ranges.extend(indexes_between) + start = bisect.bisect_left(wrapped, low_w) + end = bisect.bisect_right(wrapped, up_w) + + indexes_between_ranges.extend(indexes[start:end]) + + print("WHAT ARE THE INDEXES BETWEEN ", indexes_between_ranges) + print("WHAT ARE THE LOW AND UP ", low, up) + print("WHAT ARE THE INDEXES ", indexes) + return indexes_between_ranges def find_indices_between(self, indexes_ranges, low, up, datacube, method=None): diff --git a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py index 186facb3b..fbfa8967b 100644 --- a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py +++ b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py @@ -30,7 +30,8 @@ def change_val_type(self, axis_name, values): return_idx = [self._final_transformation.transform_type(val) for val in values] if None in return_idx: return None - return_idx.sort() + # return_idx.sort() + return_idx.sort(key=lambda x: (isinstance(x, str), x)) return return_idx def make_str(self, value): @@ -182,13 +183,18 @@ def __init__(self, axis_name, new_type): self._new_type = new_type def transform_type(self, value): + print("AND WHAT ABOUT HERE ", value) if isinstance(value, int): return pd.Timedelta(hours=value) if isinstance(value, str) and value.isdigit(): return pd.Timedelta(hours=int(value)) + + elif isinstance(value, str) and "-" in value: + # Step range is not parsed here + return value - if isinstance(value, str): + elif isinstance(value, str): # Extract days, hours, minutes and seconds using regex h_match = re.search(r"(\d+)\s*h", value) m_match = re.search(r"(\d+)\s*m(?:in)?", value) @@ -197,7 +203,6 @@ def transform_type(self, value): hours = int(h_match.group(1)) if h_match else 0 minutes = int(m_match.group(1)) if m_match else 0 seconds = int(s_match.group(1)) if s_match else 0 - return pd.Timedelta(hours=hours, minutes=minutes, seconds=seconds) raise ValueError(f"Unsupported timestep format: {value}") From 56a0cad268ff498babb9a0b6c7a05b3ec1476c11 Mon Sep 17 00:00:00 2001 From: Mathilde Leuridan <90444327+mathleur@users.noreply.github.com> Date: Fri, 10 Apr 2026 10:46:44 +0200 Subject: [PATCH 03/27] Update datacube.py --- polytope_feature/datacube/backends/datacube.py | 1 - 1 file changed, 1 deletion(-) diff --git a/polytope_feature/datacube/backends/datacube.py b/polytope_feature/datacube/backends/datacube.py index 9bd2bed41..58e761d74 100644 --- a/polytope_feature/datacube/backends/datacube.py +++ b/polytope_feature/datacube/backends/datacube.py @@ -138,7 +138,6 @@ def get_indices(self, path: DatacubePath, axis, lower, upper, method=None): """ path = self.fit_path(path) indexes = axis.find_indexes(path, self) - # print("WHAT ARE THE INDEXES HERE ", indexes) idx_between = axis.find_indices_between(indexes, lower, upper, self, method) From 569591852cea0ce3f9bc9a9240863f86fb7539b5 Mon Sep 17 00:00:00 2001 From: Mathilde Leuridan <90444327+mathleur@users.noreply.github.com> Date: Fri, 10 Apr 2026 10:47:37 +0200 Subject: [PATCH 04/27] Update fdb.py --- polytope_feature/datacube/backends/fdb.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/polytope_feature/datacube/backends/fdb.py b/polytope_feature/datacube/backends/fdb.py index 6147203e9..e3744e63c 100644 --- a/polytope_feature/datacube/backends/fdb.py +++ b/polytope_feature/datacube/backends/fdb.py @@ -175,8 +175,6 @@ def get(self, requests: TensorIndexTree, context=None): logging.debug("The requests we give GribJump are: %s", printed_list_to_gj) logging.info("Requests given to GribJump extract for %s", context) try: - # print("HERE") - # print(complete_list_complete_uncompressed_requests) iterator = self.gj.extract(complete_list_complete_uncompressed_requests, context) except Exception as e: if "BadValue: Grid hash mismatch" in str(e): From fc45637fe3a4ca65792c5347c2fa48b94c48d283 Mon Sep 17 00:00:00 2001 From: Mathilde Leuridan <90444327+mathleur@users.noreply.github.com> Date: Fri, 10 Apr 2026 10:48:16 +0200 Subject: [PATCH 05/27] Update datacube_type_change.py --- .../transformations/datacube_type_change/datacube_type_change.py | 1 - 1 file changed, 1 deletion(-) diff --git a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py index fbfa8967b..c85c864c7 100644 --- a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py +++ b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py @@ -183,7 +183,6 @@ def __init__(self, axis_name, new_type): self._new_type = new_type def transform_type(self, value): - print("AND WHAT ABOUT HERE ", value) if isinstance(value, int): return pd.Timedelta(hours=value) From 13ec530bb47a3f2b0654ecdf18158499d239139a Mon Sep 17 00:00:00 2001 From: Mathilde Leuridan <90444327+mathleur@users.noreply.github.com> Date: Fri, 10 Apr 2026 10:49:08 +0200 Subject: [PATCH 06/27] Update datacube_axis.py --- polytope_feature/datacube/datacube_axis.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/polytope_feature/datacube/datacube_axis.py b/polytope_feature/datacube/datacube_axis.py index fd224740c..fd78612a9 100644 --- a/polytope_feature/datacube/datacube_axis.py +++ b/polytope_feature/datacube/datacube_axis.py @@ -74,7 +74,6 @@ def find_standard_indexes(self, path, datacube): def find_indexes(self, path, datacube): indexes = self.find_standard_indexes(path, datacube) - print("WHAT ARE THE STANDARD INDEXES HERE ", indexes) for transformation in self.transformations[::-1]: indexes = transformation.find_modified_indexes(indexes, path, datacube, self) return indexes @@ -195,10 +194,6 @@ def __lt__(self, other): end = bisect.bisect_right(wrapped, up_w) indexes_between_ranges.extend(indexes[start:end]) - - print("WHAT ARE THE INDEXES BETWEEN ", indexes_between_ranges) - print("WHAT ARE THE LOW AND UP ", low, up) - print("WHAT ARE THE INDEXES ", indexes) return indexes_between_ranges From e6621e7f97f7e5108fe24d159275267a830ddb08 Mon Sep 17 00:00:00 2001 From: mathleur Date: Fri, 10 Apr 2026 11:12:46 +0200 Subject: [PATCH 07/27] fix tests and rethink how to find indexes between, one slow way for mixed types which can be forwarded to fast bisection method for only one type --- polytope_feature/datacube/datacube_axis.py | 111 +++++++++------------ 1 file changed, 49 insertions(+), 62 deletions(-) diff --git a/polytope_feature/datacube/datacube_axis.py b/polytope_feature/datacube/datacube_axis.py index fd78612a9..784091e8e 100644 --- a/polytope_feature/datacube/datacube_axis.py +++ b/polytope_feature/datacube/datacube_axis.py @@ -101,6 +101,55 @@ def _remap_val_to_axis_range(self, value): value = transformation._remap_val_to_axis_range(value, self) return value + def _mixed_key(self, x): + + if isinstance(x, pd.Timedelta): + return (0, x.total_seconds()) + elif isinstance(x, str): + return (1, x) + else: + return (2, x) + + def _is_mixed(self, indexes): + types = {type(x) for x in indexes} + return len(types) > 1 + + def find_standard_indices_between(self, indexes, low, up, datacube, method=None): + + indexes_list = list(indexes) + + # If homogeneous → use fast path + if not self._is_mixed(indexes_list): + + if method in ("surrounding", "nearest"): + start = bisect.bisect_left(indexes_list, low) + end = bisect.bisect_right(indexes_list, up) + start = max(start - 1, 0) + end = min(end + 1, len(indexes_list)) + else: + start = bisect.bisect_left(indexes_list, low) + end = bisect.bisect_right(indexes_list, up) + + return indexes_list[start:end] + + # Mixed types → fallback (robust) + low_k = self._mixed_key(low) + up_k = self._mixed_key(up) + + filtered = [x for x in indexes_list if low_k <= self._mixed_key(x) <= up_k] + + if method in ("surrounding", "nearest") and filtered: + # add neighbors manually + first_idx = indexes_list.index(filtered[0]) + last_idx = indexes_list.index(filtered[-1]) + + start = max(first_idx - 1, 0) + end = min(last_idx + 2, len(indexes_list)) + + return indexes_list[start:end] + + return filtered + # def find_standard_indices_between(self, indexes, low, up, datacube, method=None): # indexes_between_ranges = [] @@ -135,68 +184,6 @@ def _remap_val_to_axis_range(self, value): # indexes_between_ranges.extend(indexes_between) # return indexes_between_ranges - def _mixed_key(self, x): - if isinstance(x, pd.Timedelta): - return (0, x.total_seconds()) - elif isinstance(x, str): - return (1, x) - else: - return (2, x) - - - def find_standard_indices_between(self, indexes, low, up, datacube, method=None): - indexes_between_ranges = [] - - # Ensure sorted with consistent key - try: - indexes = sorted(indexes, key=self._mixed_key) - except Exception: - indexes = list(indexes) - - # Wrap low/up with same key logic - low_k = self._mixed_key(low) - up_k = self._mixed_key(up) - - # Helper for bisect with key - class KeyWrapper: - def __init__(self, obj, key): - self.obj = obj - self.key = key - def __lt__(self, other): - return self.key(self.obj) < self.key(other.obj) - - wrapped = [KeyWrapper(x, self._mixed_key) for x in indexes] - low_w = KeyWrapper(low, self._mixed_key) - up_w = KeyWrapper(up, self._mixed_key) - - if self.name in datacube.complete_axes and self.name not in datacube.transformed_axes: - # pandas searchsorted won't work reliably with mixed types - # so we fallback to bisect logic here too - if method in ("surrounding", "nearest"): - start = bisect.bisect_left(wrapped, low_w) - end = bisect.bisect_right(wrapped, up_w) - start = max(start - 1, 0) - end = min(end + 1, len(indexes)) - else: - start = bisect.bisect_left(wrapped, low_w) - end = bisect.bisect_right(wrapped, up_w) - - indexes_between_ranges.extend(indexes[start:end]) - - else: - if method in ("surrounding", "nearest"): - start = bisect.bisect_left(wrapped, low_w) - end = bisect.bisect_right(wrapped, up_w) - start = max(start - 1, 0) - end = min(end + 1, len(indexes)) - else: - start = bisect.bisect_left(wrapped, low_w) - end = bisect.bisect_right(wrapped, up_w) - - indexes_between_ranges.extend(indexes[start:end]) - - return indexes_between_ranges - def find_indices_between(self, indexes_ranges, low, up, datacube, method=None): indexes_between_ranges = self.find_standard_indices_between(indexes_ranges, low, up, datacube, method) for transformation in self.transformations[::-1]: From 1fb4cb03182373f618ca79c1d6f74cae8796d5fa Mon Sep 17 00:00:00 2001 From: mathleur Date: Mon, 13 Apr 2026 13:47:41 +0200 Subject: [PATCH 08/27] remove cffi test dep --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d6ca46559..da696e673 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,6 @@ dependencies = [ tests = [ "pytest", "pytest-cov", - "cffi", "eccodes", "h5netcdf", "h5py", From c650c50e673f1af8c70056389a9fb5eb20aa4640 Mon Sep 17 00:00:00 2001 From: mathleur Date: Mon, 13 Apr 2026 14:08:03 +0200 Subject: [PATCH 09/27] qa --- .../datacube_type_change/datacube_type_change.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py index c85c864c7..58d5051cf 100644 --- a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py +++ b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py @@ -188,7 +188,7 @@ def transform_type(self, value): if isinstance(value, str) and value.isdigit(): return pd.Timedelta(hours=int(value)) - + elif isinstance(value, str) and "-" in value: # Step range is not parsed here return value From 4bf5ff7e6e12181a1f81e3a6e4db23746457d060 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Wed, 29 Apr 2026 19:53:32 +0000 Subject: [PATCH 10/27] Add service-backed switching grid lookup Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- .../datacube/switching_grid_helper.py | 218 +++--------------- .../datacube/switching_grid_local.py | 151 ++++++++++++ .../datacube/switching_grid_service.py | 57 +++++ polytope_feature/options.py | 8 +- 4 files changed, 246 insertions(+), 188 deletions(-) create mode 100644 polytope_feature/datacube/switching_grid_local.py create mode 100644 polytope_feature/datacube/switching_grid_service.py diff --git a/polytope_feature/datacube/switching_grid_helper.py b/polytope_feature/datacube/switching_grid_helper.py index ba11d8d20..faf766887 100644 --- a/polytope_feature/datacube/switching_grid_helper.py +++ b/polytope_feature/datacube/switching_grid_helper.py @@ -1,194 +1,42 @@ -import json -import math import os -import tempfile +from urllib.parse import urljoin -import eccodes +import requests -# from polytope_feature.options import MapperConfig +def lookup_grid_config_local(req): + from polytope_feature.datacube.switching_grid_local import lookup_grid_config_local as _lookup_grid_config_local -def get_first_grib_message(req): - import pyfdb + return _lookup_grid_config_local(req) - fdb = pyfdb.FDB() - # Make sure that we are accessing a single georef so that the grid is consistent - assert "georef" in req.keys() +def lookup_grid_config_remote(req, service_url, timeout=None, retries=None, retry_timeout=None): + url = urljoin(service_url.rstrip('/') + '/', 'lookup-grid-config') + if timeout is None: + timeout = float(os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_TIMEOUT', '1')) + if retries is None: + retries = int(os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_RETRIES', '1')) + if retry_timeout is None: + retry_timeout = float(os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_RETRY_TIMEOUT', '5')) - # Use data_handle from the list element directly instead of a separate - # retrieve() call — avoids the list iterator polluting retrieve state. - first_element = next(fdb.list(req)) - dh = first_element.data_handle - if dh is None: - raise ValueError("List element has no data handle") - with dh: - msg_bytes = dh.read() - - gid = eccodes.codes_new_from_message(msg_bytes) - return gid - - -def get_gridspec_lamebert_conformal(gid): - # Lambert lam grid - - to_rad = math.pi / 180 - - md5hash = eccodes.codes_get(gid, "md5GridSection") - - earth_round = (eccodes.codes_get(gid, "shapeOfTheEarth") == 0) or (eccodes.codes_get(gid, "shapeOfTheEarth") == 6) - - if earth_round: - if eccodes.codes_get(gid, "shapeOfTheEarth") == 6: - radius = 6371229 - elif eccodes.codes_get(gid, "shapeOfTheEarth") == 0: - radius = 6367470 - else: - # TODO: set the earth major and minor axis accordingly - pass - - nv = eccodes.codes_get(gid, "NV") - nx = eccodes.codes_get(gid, "Nx") - ny = eccodes.codes_get(gid, "Ny") - LoVInDegrees = eccodes.codes_get(gid, "LoV") / 1000000 - Dx = eccodes.codes_get(gid, "Dx") - Dy = eccodes.codes_get(gid, "Dy") - latFirstInRadians = eccodes.codes_get(gid, "latitudeOfFirstGridPoint") / 1000000 * to_rad - lonFirstInRadians = eccodes.codes_get(gid, "longitudeOfFirstGridPoint") / 1000000 * to_rad - LoVInRadians = eccodes.codes_get(gid, "LoV") / 1000000 * to_rad - Latin1InRadians = eccodes.codes_get(gid, "Latin1") / 1000000 * to_rad - Latin2InRadians = eccodes.codes_get(gid, "Latin2") / 1000000 * to_rad - LaDInRadians = eccodes.codes_get(gid, "LaD") / 1000000 * to_rad - - gridspec = { - "type": "lambert_conformal", - "earth_round": earth_round, - "radius": radius, - "nv": nv, - "nx": nx, - "ny": ny, - "LoVInDegrees": LoVInDegrees, - "Dx": Dx, - "Dy": Dy, - "latFirstInRadians": latFirstInRadians, - "lonFirstInRadians": lonFirstInRadians, - "LoVInRadians": LoVInRadians, - "Latin1InRadians": Latin1InRadians, - "Latin2InRadians": Latin2InRadians, - "LaDInRadians": LaDInRadians, - } - return (gridspec, md5hash) - - -def get_gridspec_icon(gid): - # ICON - # TODO: Need the following: - # uuid: Optional[str] = None - md5hash = eccodes.codes_get(gid, "md5GridSection") - gridspec = {} - return (gridspec, md5hash) - - -def get_gridspec_and_hash(gid): - grid_type = eccodes.codes_get(gid, "gridType") - if grid_type == "lambert_lam": - return get_gridspec_lamebert_conformal(gid) - elif grid_type == "icon": - return get_gridspec_icon(gid) - else: - raise ValueError(f"Unsupported grid type: {grid_type}") - - -# TODO: extract the right info and then write it to file, one for the grid hash and one for the actual config - - -def lookup_grid_config(req): - gid = get_first_grib_message(req) - req_georef = req["georef"] - - # Cache file stored alongside this module - GRID_CACHE_FILE = os.path.join(os.path.dirname(__file__), "grid_cache.json") - - def _load_cache(): - try: - with open(GRID_CACHE_FILE, "r", encoding="utf-8") as fh: - return json.load(fh) - except FileNotFoundError: - return {} - except Exception: - return {} - - def _save_cache(cache): - dirpath = os.path.dirname(GRID_CACHE_FILE) - os.makedirs(dirpath, exist_ok=True) - fd, tmp = tempfile.mkstemp(dir=dirpath, prefix=".grid_cache.") - try: - with os.fdopen(fd, "w", encoding="utf-8") as fh: - json.dump(cache, fh, indent=2, sort_keys=True) - os.replace(tmp, GRID_CACHE_FILE) - finally: - if os.path.exists(tmp): - try: - os.remove(tmp) - except Exception: - pass - - # Use a stable serialization of the georef as the cache key - try: - cache_key = json.dumps(req_georef, sort_keys=True, default=str) - except Exception: - cache_key = str(req_georef) - - cache = _load_cache() - - try: - if cache_key in cache: - entry = cache[cache_key] - return (entry.get("gridspec"), entry.get("md5hash")) - - gridspec, md5hash = get_gridspec_and_hash(gid) - cache[cache_key] = {"gridspec": gridspec, "md5hash": md5hash} + timeouts = [timeout] + [retry_timeout] * retries + last_error = None + for request_timeout in timeouts: try: - _save_cache(cache) - except Exception: - # Swallow cache write errors but continue to return computed value - pass - return (gridspec, md5hash) - finally: - eccodes.codes_release(gid) - - -# def gridspec_to_grid_config(gridspec, md5hash): -# if gridspec.get("type") == "lambert_conformal": -# mc = MapperConfig( -# name="mapper", -# type="lambert_conformal", -# md5_hash=md5hash, -# is_spherical=gridspec.get("earth_round"), -# radius=gridspec.get("radius"), -# nv=gridspec.get("nv"), -# nx=gridspec.get("nx"), -# ny=gridspec.get("ny"), -# LoVInDegrees=gridspec.get("LoVInDegrees"), -# Dx=gridspec.get("Dx"), -# Dy=gridspec.get("Dy"), -# latFirstInRadians=gridspec.get("latFirstInRadians"), -# lonFirstInRadians=gridspec.get("lonFirstInRadians"), -# LoVInRadians=gridspec.get("LoVInRadians"), -# Latin1InRadians=gridspec.get("Latin1InRadians"), -# Latin2InRadians=gridspec.get("Latin2InRadians"), -# LaDInRadians=gridspec.get("LaDInRadians"), -# ) -# return mc -# return None - -# def replace_grid_config_in_options(options, req): -# gridspec, md5hash = lookup_grid_config(req) -# grid_config = gridspec_to_grid_config(gridspec, md5hash) -# if grid_config is not None: -# for axis_conf in options.axis_config: -# for idx, transformation in enumerate(axis_conf.transformations): -# if getattr(transformation, "name", None) == "mapper": -# axis_conf.transformations[idx] = grid_config -# return True -# return False + response = requests.post(url, json={'request': req}, timeout=request_timeout) + response.raise_for_status() + payload = response.json() + return (payload['gridspec'], payload['md5hash']) + except (requests.Timeout, requests.ConnectionError) as exc: + last_error = exc + + if last_error is not None: + raise last_error + raise RuntimeError('dynamic grid remote lookup failed without a captured error') + + +def lookup_grid_config(req, service_url=None): + service_url = service_url or os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_URL') + if service_url: + return lookup_grid_config_remote(req, service_url) + return lookup_grid_config_local(req) diff --git a/polytope_feature/datacube/switching_grid_local.py b/polytope_feature/datacube/switching_grid_local.py new file mode 100644 index 000000000..d0500f5ec --- /dev/null +++ b/polytope_feature/datacube/switching_grid_local.py @@ -0,0 +1,151 @@ +import json +import math +import os +import tempfile + +import eccodes + + +def get_first_grib_message(req): + import pyfdb + + fdb = pyfdb.FDB() + + # Make sure that we are accessing a single georef so that the grid is consistent + assert 'georef' in req.keys() + + # Use data_handle from the list element directly instead of a separate + # retrieve() call — avoids the list iterator polluting retrieve state. + first_element = next(fdb.list(req)) + dh = first_element.data_handle + if dh is None: + raise ValueError('List element has no data handle') + with dh: + msg_bytes = dh.read() + + gid = eccodes.codes_new_from_message(msg_bytes) + return gid + + +def get_gridspec_lamebert_conformal(gid): + to_rad = math.pi / 180 + + md5hash = eccodes.codes_get(gid, 'md5GridSection') + + earth_round = (eccodes.codes_get(gid, 'shapeOfTheEarth') == 0) or (eccodes.codes_get(gid, 'shapeOfTheEarth') == 6) + + if earth_round: + if eccodes.codes_get(gid, 'shapeOfTheEarth') == 6: + radius = 6371229 + elif eccodes.codes_get(gid, 'shapeOfTheEarth') == 0: + radius = 6367470 + else: + radius = None + + nv = eccodes.codes_get(gid, 'NV') + nx = eccodes.codes_get(gid, 'Nx') + ny = eccodes.codes_get(gid, 'Ny') + LoVInDegrees = eccodes.codes_get(gid, 'LoV') / 1000000 + Dx = eccodes.codes_get(gid, 'Dx') + Dy = eccodes.codes_get(gid, 'Dy') + latFirstInRadians = eccodes.codes_get(gid, 'latitudeOfFirstGridPoint') / 1000000 * to_rad + lonFirstInRadians = eccodes.codes_get(gid, 'longitudeOfFirstGridPoint') / 1000000 * to_rad + LoVInRadians = eccodes.codes_get(gid, 'LoV') / 1000000 * to_rad + Latin1InRadians = eccodes.codes_get(gid, 'Latin1') / 1000000 * to_rad + Latin2InRadians = eccodes.codes_get(gid, 'Latin2') / 1000000 * to_rad + LaDInRadians = eccodes.codes_get(gid, 'LaD') / 1000000 * to_rad + + gridspec = { + 'type': 'lambert_conformal', + 'earth_round': earth_round, + 'radius': radius, + 'nv': nv, + 'nx': nx, + 'ny': ny, + 'LoVInDegrees': LoVInDegrees, + 'Dx': Dx, + 'Dy': Dy, + 'latFirstInRadians': latFirstInRadians, + 'lonFirstInRadians': lonFirstInRadians, + 'LoVInRadians': LoVInRadians, + 'Latin1InRadians': Latin1InRadians, + 'Latin2InRadians': Latin2InRadians, + 'LaDInRadians': LaDInRadians, + } + return (gridspec, md5hash) + + +def get_gridspec_icon(gid): + md5hash = eccodes.codes_get(gid, 'md5GridSection') + gridspec = {} + return (gridspec, md5hash) + + +def get_gridspec_and_hash(gid): + grid_type = eccodes.codes_get(gid, 'gridType') + if grid_type == 'lambert_lam': + return get_gridspec_lamebert_conformal(gid) + elif grid_type == 'icon': + return get_gridspec_icon(gid) + else: + raise ValueError(f'Unsupported grid type: {grid_type}') + + +def _grid_cache_file(): + return os.path.join(os.path.dirname(__file__), 'grid_cache.json') + + +def _load_cache(): + try: + with open(_grid_cache_file(), 'r', encoding='utf-8') as fh: + return json.load(fh) + except FileNotFoundError: + return {} + except Exception: + return {} + + +def _save_cache(cache): + grid_cache_file = _grid_cache_file() + dirpath = os.path.dirname(grid_cache_file) + os.makedirs(dirpath, exist_ok=True) + fd, tmp = tempfile.mkstemp(dir=dirpath, prefix='.grid_cache.') + try: + with os.fdopen(fd, 'w', encoding='utf-8') as fh: + json.dump(cache, fh, indent=2, sort_keys=True) + os.replace(tmp, grid_cache_file) + finally: + if os.path.exists(tmp): + try: + os.remove(tmp) + except Exception: + pass + + +def _cache_key(req_georef): + try: + return json.dumps(req_georef, sort_keys=True, default=str) + except Exception: + return str(req_georef) + + +def lookup_grid_config_local(req): + gid = get_first_grib_message(req) + req_georef = req['georef'] + cache = _load_cache() + cache_key = _cache_key(req_georef) + + try: + if cache_key in cache: + entry = cache[cache_key] + return (entry.get('gridspec'), entry.get('md5hash')) + + gridspec, md5hash = get_gridspec_and_hash(gid) + cache[cache_key] = {'gridspec': gridspec, 'md5hash': md5hash} + try: + _save_cache(cache) + except Exception: + pass + return (gridspec, md5hash) + finally: + eccodes.codes_release(gid) diff --git a/polytope_feature/datacube/switching_grid_service.py b/polytope_feature/datacube/switching_grid_service.py new file mode 100644 index 000000000..064563260 --- /dev/null +++ b/polytope_feature/datacube/switching_grid_service.py @@ -0,0 +1,57 @@ +import argparse +import json +import logging +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +from polytope_feature.datacube.switching_grid_local import lookup_grid_config_local + + +class SwitchingGridHandler(BaseHTTPRequestHandler): + def _send_json(self, status, payload): + body = json.dumps(payload).encode('utf-8') + self.send_response(status) + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_POST(self): + if self.path.rstrip('/') != '/lookup-grid-config': + self._send_json(404, {'error': 'not found'}) + return + + try: + content_length = int(self.headers.get('Content-Length', '0')) + raw = self.rfile.read(content_length) + payload = json.loads(raw.decode('utf-8') or '{}') + req = payload.get('request', payload) + gridspec, md5hash = lookup_grid_config_local(req) + self._send_json(200, {'gridspec': gridspec, 'md5hash': md5hash}) + except Exception as exc: + logging.exception('lookup-grid-config failed') + self._send_json(500, {'error': str(exc)}) + + def do_GET(self): + if self.path.rstrip('/') == '/healthz': + self._send_json(200, {'ok': True}) + return + self._send_json(404, {'error': 'not found'}) + + def log_message(self, format, *args): + logging.info('%s - %s', self.address_string(), format % args) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--host', default='127.0.0.1') + parser.add_argument('--port', type=int, default=8765) + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO) + server = ThreadingHTTPServer((args.host, args.port), SwitchingGridHandler) + logging.info('Starting switching-grid service on %s:%s', args.host, args.port) + server.serve_forever() + + +if __name__ == '__main__': + main() diff --git a/polytope_feature/options.py b/polytope_feature/options.py index 6117aa318..e914342f1 100644 --- a/polytope_feature/options.py +++ b/polytope_feature/options.py @@ -86,6 +86,7 @@ class Config(ConfigModel): use_catalogue: Optional[bool] = False engine_options: Optional[Dict[str, str]] = {} dynamic_grid: Optional[bool] = False + dynamic_grid_service_url: Optional[str] = None class PolytopeOptions(ABC): @@ -99,6 +100,7 @@ def get_polytope_options(options): compressed_axes_config = config_options.compressed_axes_config pre_path = config_options.pre_path dynamic_grid = config_options.dynamic_grid + dynamic_grid_service_url = config_options.dynamic_grid_service_url alternative_axes = config_options.alternative_axes use_catalogue = config_options.use_catalogue engine_options = config_options.engine_options @@ -107,7 +109,7 @@ def get_polytope_options(options): # TODO: look at the pre-path and query the eccodes function to get the new grid option # TODO: then change the grid option inside of the axis_config try: - replaced = replace_grid_config_in_options(config_options, pre_path) + replaced = replace_grid_config_in_options(config_options, pre_path, dynamic_grid_service_url) if replaced: axis_config = config_options.axis_config except Exception as e: @@ -152,10 +154,10 @@ def gridspec_to_grid_config(gridspec, md5hash): return None -def replace_grid_config_in_options(options, req): +def replace_grid_config_in_options(options, req, service_url=None): from polytope_feature.datacube.switching_grid_helper import lookup_grid_config - gridspec, md5hash = lookup_grid_config(req) + gridspec, md5hash = lookup_grid_config(req, service_url=service_url) grid_config = gridspec_to_grid_config(gridspec, md5hash) if grid_config is not None: for axis_conf in options.axis_config: From 16f1a500bc92cb6e595585f54d699309c62d4427 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Wed, 29 Apr 2026 19:53:32 +0000 Subject: [PATCH 11/27] Add switching grid service tests Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- tests/test_switching_grid_service.py | 123 +++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 tests/test_switching_grid_service.py diff --git a/tests/test_switching_grid_service.py b/tests/test_switching_grid_service.py new file mode 100644 index 000000000..c936d6cc8 --- /dev/null +++ b/tests/test_switching_grid_service.py @@ -0,0 +1,123 @@ +import json +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +import requests + +from polytope_feature.datacube.switching_grid_helper import lookup_grid_config, lookup_grid_config_remote +from polytope_feature.options import PolytopeOptions + + +class _MockHandler(BaseHTTPRequestHandler): + response_payload = { + 'gridspec': { + 'type': 'lambert_conformal', + 'earth_round': True, + 'radius': 6371229, + 'nv': 0, + 'nx': 10, + 'ny': 20, + 'LoVInDegrees': 1.0, + 'Dx': 1000.0, + 'Dy': 1000.0, + 'latFirstInRadians': 0.1, + 'lonFirstInRadians': 0.2, + 'LoVInRadians': 0.3, + 'Latin1InRadians': 0.4, + 'Latin2InRadians': 0.5, + 'LaDInRadians': 0.6, + }, + 'md5hash': 'abc123', + } + seen_request = None + + def do_POST(self): + length = int(self.headers.get('Content-Length', '0')) + payload = json.loads(self.rfile.read(length).decode('utf-8')) + _MockHandler.seen_request = payload + body = json.dumps(_MockHandler.response_payload).encode('utf-8') + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + return + + +class _MockServer: + def __enter__(self): + self.server = ThreadingHTTPServer(('127.0.0.1', 0), _MockHandler) + self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) + self.thread.start() + host, port = self.server.server_address + self.url = f'http://{host}:{port}' + return self.url + + def __exit__(self, exc_type, exc, tb): + self.server.shutdown() + self.server.server_close() + self.thread.join(timeout=5) + + +def test_lookup_grid_config_remote_service(): + req = {'georef': 'u1516b', 'class': 'd1'} + with _MockServer() as url: + gridspec, md5hash = lookup_grid_config(req, service_url=url) + + assert md5hash == 'abc123' + assert gridspec['type'] == 'lambert_conformal' + assert _MockHandler.seen_request == {'request': req} + + +def test_lookup_grid_config_remote_retries_on_timeout(monkeypatch): + req = {'georef': 'u1516b', 'class': 'd1'} + calls = [] + + class _Response: + def raise_for_status(self): + return None + + def json(self): + return _MockHandler.response_payload + + def _fake_post(url, json, timeout): + calls.append(timeout) + if len(calls) == 1: + raise requests.Timeout('slow first attempt') + return _Response() + + monkeypatch.setattr(requests, 'post', _fake_post) + + gridspec, md5hash = lookup_grid_config_remote(req, 'http://example.com') + + assert md5hash == 'abc123' + assert gridspec['type'] == 'lambert_conformal' + assert calls == [1.0, 5.0] + + +def test_dynamic_grid_service_replaces_mapper_config(): + options = { + 'axis_config': [ + { + 'axis_name': 'values', + 'transformations': [ + {'name': 'mapper', 'type': 'reduced_gaussian', 'resolution': 320, 'axes': ['latitude', 'longitude']} + ], + } + ], + 'compressed_axes_config': ['longitude', 'latitude'], + 'pre_path': {'class': 'd1', 'georef': 'u1516b'}, + 'dynamic_grid': True, + } + + with _MockServer() as url: + options['dynamic_grid_service_url'] = url + axis_config, *_ = PolytopeOptions.get_polytope_options(options) + + mapper = axis_config[0].transformations[0] + assert mapper.name == 'mapper' + assert mapper.type == 'lambert_conformal' + assert mapper.md5_hash == 'abc123' + assert mapper.axes == ['latitude', 'longitude'] From 7c6e758452724bc334e03e527d01848a2dc0d694 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Thu, 30 Apr 2026 16:23:58 +0000 Subject: [PATCH 12/27] Add logging to switching grid service Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- .../datacube/switching_grid_service.py | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/polytope_feature/datacube/switching_grid_service.py b/polytope_feature/datacube/switching_grid_service.py index 064563260..11593a983 100644 --- a/polytope_feature/datacube/switching_grid_service.py +++ b/polytope_feature/datacube/switching_grid_service.py @@ -5,18 +5,26 @@ from polytope_feature.datacube.switching_grid_local import lookup_grid_config_local +LOGGER = logging.getLogger(__name__) + class SwitchingGridHandler(BaseHTTPRequestHandler): def _send_json(self, status, payload): body = json.dumps(payload).encode('utf-8') - self.send_response(status) - self.send_header('Content-Type', 'application/json') - self.send_header('Content-Length', str(len(body))) - self.end_headers() - self.wfile.write(body) + try: + self.send_response(status) + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', str(len(body))) + self.end_headers() + self.wfile.write(body) + except BrokenPipeError: + LOGGER.warning('client disconnected before response could be sent') + except OSError: + LOGGER.exception('failed to send HTTP response') def do_POST(self): if self.path.rstrip('/') != '/lookup-grid-config': + LOGGER.warning('unknown POST path: %s', self.path) self._send_json(404, {'error': 'not found'}) return @@ -25,20 +33,30 @@ def do_POST(self): raw = self.rfile.read(content_length) payload = json.loads(raw.decode('utf-8') or '{}') req = payload.get('request', payload) + LOGGER.info('lookup-grid-config request received for georef=%s', req.get('georef', 'unknown')) gridspec, md5hash = lookup_grid_config_local(req) self._send_json(200, {'gridspec': gridspec, 'md5hash': md5hash}) + LOGGER.info('lookup-grid-config request succeeded for georef=%s', req.get('georef', 'unknown')) + except json.JSONDecodeError: + LOGGER.warning('invalid JSON payload received') + self._send_json(400, {'error': 'invalid JSON payload'}) + except (AssertionError, KeyError, ValueError) as exc: + LOGGER.warning('bad lookup-grid-config request: %s', exc) + self._send_json(400, {'error': str(exc)}) except Exception as exc: - logging.exception('lookup-grid-config failed') + LOGGER.exception('lookup-grid-config failed') self._send_json(500, {'error': str(exc)}) def do_GET(self): if self.path.rstrip('/') == '/healthz': + LOGGER.debug('health check requested') self._send_json(200, {'ok': True}) return + LOGGER.warning('unknown GET path: %s', self.path) self._send_json(404, {'error': 'not found'}) def log_message(self, format, *args): - logging.info('%s - %s', self.address_string(), format % args) + LOGGER.info('%s - %s', self.address_string(), format % args) def main(): @@ -49,8 +67,14 @@ def main(): logging.basicConfig(level=logging.INFO) server = ThreadingHTTPServer((args.host, args.port), SwitchingGridHandler) - logging.info('Starting switching-grid service on %s:%s', args.host, args.port) - server.serve_forever() + LOGGER.info('Starting switching-grid service on %s:%s', args.host, args.port) + try: + server.serve_forever() + except KeyboardInterrupt: + LOGGER.info('Stopping switching-grid service') + finally: + server.server_close() + LOGGER.info('Switching-grid service stopped') if __name__ == '__main__': From a79546d9ab07eba1aafefcf1a0e9d5497fad8b01 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 1 May 2026 01:25:28 +0000 Subject: [PATCH 13/27] fix: fail when dynamic grid has been requested but fails --- polytope_feature/options.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/polytope_feature/options.py b/polytope_feature/options.py index e914342f1..a6727e479 100644 --- a/polytope_feature/options.py +++ b/polytope_feature/options.py @@ -108,16 +108,9 @@ def get_polytope_options(options): if dynamic_grid: # TODO: look at the pre-path and query the eccodes function to get the new grid option # TODO: then change the grid option inside of the axis_config - try: - replaced = replace_grid_config_in_options(config_options, pre_path, dynamic_grid_service_url) - if replaced: - axis_config = config_options.axis_config - except Exception as e: - logging.warning( - "Dynamic grid replacement failed for georef '%s': %s. Using static grid config.", - pre_path.get("georef", "unknown"), - e, - ) + replaced = replace_grid_config_in_options(config_options, pre_path, dynamic_grid_service_url) + if replaced: + axis_config = config_options.axis_config return ( axis_config, compressed_axes_config, From eb2372d21f77e8204144f02af8f358a9313425ea Mon Sep 17 00:00:00 2001 From: mathleur Date: Mon, 4 May 2026 16:02:00 +0200 Subject: [PATCH 14/27] unique list of timesteps --- .../datacube_type_change/datacube_type_change.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py index 58d5051cf..7975e8048 100644 --- a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py +++ b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py @@ -240,7 +240,9 @@ def make_str(self, value): return_vals.append(f"{hours}h{minutes}m") else: return_vals.append(f"{hours}h{minutes}m{seconds}s") - return return_vals + # return return_vals + unique_list = list(dict.fromkeys(return_vals)) + return unique_list _type_to_datacube_type_change_lookup = { From d9f9d638fb868cd8aa1b6ad0f6c242c57e8667b0 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Tue, 5 May 2026 14:32:26 +0000 Subject: [PATCH 15/27] chore(config): meaningful error when axis engine is not specified --- polytope_feature/polytope.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/polytope_feature/polytope.py b/polytope_feature/polytope.py index 7f45a776e..02fbc739c 100644 --- a/polytope_feature/polytope.py +++ b/polytope_feature/polytope.py @@ -174,6 +174,8 @@ def slice(self, datacube, polytopes: List[ConvexPolytope]): return request def find_engine(self, ax): + if ax.name not in self.engine_options: + raise ValueError(f"No engine specified for axis {ax.name}") slicer_type = self.engine_options[ax.name] return self.engines[slicer_type] From d1737cc2a70554441661451cadcb80816b1df132 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Thu, 7 May 2026 17:07:40 +0000 Subject: [PATCH 16/27] chore: remove grid service (moved to polytope-server --- .../datacube/switching_grid_service.py | 81 ------------------- 1 file changed, 81 deletions(-) delete mode 100644 polytope_feature/datacube/switching_grid_service.py diff --git a/polytope_feature/datacube/switching_grid_service.py b/polytope_feature/datacube/switching_grid_service.py deleted file mode 100644 index 11593a983..000000000 --- a/polytope_feature/datacube/switching_grid_service.py +++ /dev/null @@ -1,81 +0,0 @@ -import argparse -import json -import logging -from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer - -from polytope_feature.datacube.switching_grid_local import lookup_grid_config_local - -LOGGER = logging.getLogger(__name__) - - -class SwitchingGridHandler(BaseHTTPRequestHandler): - def _send_json(self, status, payload): - body = json.dumps(payload).encode('utf-8') - try: - self.send_response(status) - self.send_header('Content-Type', 'application/json') - self.send_header('Content-Length', str(len(body))) - self.end_headers() - self.wfile.write(body) - except BrokenPipeError: - LOGGER.warning('client disconnected before response could be sent') - except OSError: - LOGGER.exception('failed to send HTTP response') - - def do_POST(self): - if self.path.rstrip('/') != '/lookup-grid-config': - LOGGER.warning('unknown POST path: %s', self.path) - self._send_json(404, {'error': 'not found'}) - return - - try: - content_length = int(self.headers.get('Content-Length', '0')) - raw = self.rfile.read(content_length) - payload = json.loads(raw.decode('utf-8') or '{}') - req = payload.get('request', payload) - LOGGER.info('lookup-grid-config request received for georef=%s', req.get('georef', 'unknown')) - gridspec, md5hash = lookup_grid_config_local(req) - self._send_json(200, {'gridspec': gridspec, 'md5hash': md5hash}) - LOGGER.info('lookup-grid-config request succeeded for georef=%s', req.get('georef', 'unknown')) - except json.JSONDecodeError: - LOGGER.warning('invalid JSON payload received') - self._send_json(400, {'error': 'invalid JSON payload'}) - except (AssertionError, KeyError, ValueError) as exc: - LOGGER.warning('bad lookup-grid-config request: %s', exc) - self._send_json(400, {'error': str(exc)}) - except Exception as exc: - LOGGER.exception('lookup-grid-config failed') - self._send_json(500, {'error': str(exc)}) - - def do_GET(self): - if self.path.rstrip('/') == '/healthz': - LOGGER.debug('health check requested') - self._send_json(200, {'ok': True}) - return - LOGGER.warning('unknown GET path: %s', self.path) - self._send_json(404, {'error': 'not found'}) - - def log_message(self, format, *args): - LOGGER.info('%s - %s', self.address_string(), format % args) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--host', default='127.0.0.1') - parser.add_argument('--port', type=int, default=8765) - args = parser.parse_args() - - logging.basicConfig(level=logging.INFO) - server = ThreadingHTTPServer((args.host, args.port), SwitchingGridHandler) - LOGGER.info('Starting switching-grid service on %s:%s', args.host, args.port) - try: - server.serve_forever() - except KeyboardInterrupt: - LOGGER.info('Stopping switching-grid service') - finally: - server.server_close() - LOGGER.info('Switching-grid service stopped') - - -if __name__ == '__main__': - main() From c3765f5696508c9044378a0dd92d53fdb9d33d04 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 8 May 2026 09:01:40 +0000 Subject: [PATCH 17/27] fix(switching grids): don't fdb read if cache hits, handle missing georef in local grid lookup, tests --- .../datacube/switching_grid_local.py | 102 +++++++++--------- tests/test_switching_grid_service.py | 58 ++++++++++ 2 files changed, 109 insertions(+), 51 deletions(-) diff --git a/polytope_feature/datacube/switching_grid_local.py b/polytope_feature/datacube/switching_grid_local.py index d0500f5ec..27023c96e 100644 --- a/polytope_feature/datacube/switching_grid_local.py +++ b/polytope_feature/datacube/switching_grid_local.py @@ -11,15 +11,12 @@ def get_first_grib_message(req): fdb = pyfdb.FDB() - # Make sure that we are accessing a single georef so that the grid is consistent - assert 'georef' in req.keys() - # Use data_handle from the list element directly instead of a separate # retrieve() call — avoids the list iterator polluting retrieve state. first_element = next(fdb.list(req)) dh = first_element.data_handle if dh is None: - raise ValueError('List element has no data handle') + raise ValueError("List element has no data handle") with dh: msg_bytes = dh.read() @@ -30,74 +27,74 @@ def get_first_grib_message(req): def get_gridspec_lamebert_conformal(gid): to_rad = math.pi / 180 - md5hash = eccodes.codes_get(gid, 'md5GridSection') + md5hash = eccodes.codes_get(gid, "md5GridSection") - earth_round = (eccodes.codes_get(gid, 'shapeOfTheEarth') == 0) or (eccodes.codes_get(gid, 'shapeOfTheEarth') == 6) + earth_round = (eccodes.codes_get(gid, "shapeOfTheEarth") == 0) or (eccodes.codes_get(gid, "shapeOfTheEarth") == 6) if earth_round: - if eccodes.codes_get(gid, 'shapeOfTheEarth') == 6: + if eccodes.codes_get(gid, "shapeOfTheEarth") == 6: radius = 6371229 - elif eccodes.codes_get(gid, 'shapeOfTheEarth') == 0: + elif eccodes.codes_get(gid, "shapeOfTheEarth") == 0: radius = 6367470 else: radius = None - nv = eccodes.codes_get(gid, 'NV') - nx = eccodes.codes_get(gid, 'Nx') - ny = eccodes.codes_get(gid, 'Ny') - LoVInDegrees = eccodes.codes_get(gid, 'LoV') / 1000000 - Dx = eccodes.codes_get(gid, 'Dx') - Dy = eccodes.codes_get(gid, 'Dy') - latFirstInRadians = eccodes.codes_get(gid, 'latitudeOfFirstGridPoint') / 1000000 * to_rad - lonFirstInRadians = eccodes.codes_get(gid, 'longitudeOfFirstGridPoint') / 1000000 * to_rad - LoVInRadians = eccodes.codes_get(gid, 'LoV') / 1000000 * to_rad - Latin1InRadians = eccodes.codes_get(gid, 'Latin1') / 1000000 * to_rad - Latin2InRadians = eccodes.codes_get(gid, 'Latin2') / 1000000 * to_rad - LaDInRadians = eccodes.codes_get(gid, 'LaD') / 1000000 * to_rad + nv = eccodes.codes_get(gid, "NV") + nx = eccodes.codes_get(gid, "Nx") + ny = eccodes.codes_get(gid, "Ny") + LoVInDegrees = eccodes.codes_get(gid, "LoV") / 1000000 + Dx = eccodes.codes_get(gid, "Dx") + Dy = eccodes.codes_get(gid, "Dy") + latFirstInRadians = eccodes.codes_get(gid, "latitudeOfFirstGridPoint") / 1000000 * to_rad + lonFirstInRadians = eccodes.codes_get(gid, "longitudeOfFirstGridPoint") / 1000000 * to_rad + LoVInRadians = eccodes.codes_get(gid, "LoV") / 1000000 * to_rad + Latin1InRadians = eccodes.codes_get(gid, "Latin1") / 1000000 * to_rad + Latin2InRadians = eccodes.codes_get(gid, "Latin2") / 1000000 * to_rad + LaDInRadians = eccodes.codes_get(gid, "LaD") / 1000000 * to_rad gridspec = { - 'type': 'lambert_conformal', - 'earth_round': earth_round, - 'radius': radius, - 'nv': nv, - 'nx': nx, - 'ny': ny, - 'LoVInDegrees': LoVInDegrees, - 'Dx': Dx, - 'Dy': Dy, - 'latFirstInRadians': latFirstInRadians, - 'lonFirstInRadians': lonFirstInRadians, - 'LoVInRadians': LoVInRadians, - 'Latin1InRadians': Latin1InRadians, - 'Latin2InRadians': Latin2InRadians, - 'LaDInRadians': LaDInRadians, + "type": "lambert_conformal", + "earth_round": earth_round, + "radius": radius, + "nv": nv, + "nx": nx, + "ny": ny, + "LoVInDegrees": LoVInDegrees, + "Dx": Dx, + "Dy": Dy, + "latFirstInRadians": latFirstInRadians, + "lonFirstInRadians": lonFirstInRadians, + "LoVInRadians": LoVInRadians, + "Latin1InRadians": Latin1InRadians, + "Latin2InRadians": Latin2InRadians, + "LaDInRadians": LaDInRadians, } return (gridspec, md5hash) def get_gridspec_icon(gid): - md5hash = eccodes.codes_get(gid, 'md5GridSection') + md5hash = eccodes.codes_get(gid, "md5GridSection") gridspec = {} return (gridspec, md5hash) def get_gridspec_and_hash(gid): - grid_type = eccodes.codes_get(gid, 'gridType') - if grid_type == 'lambert_lam': + grid_type = eccodes.codes_get(gid, "gridType") + if grid_type == "lambert_lam": return get_gridspec_lamebert_conformal(gid) - elif grid_type == 'icon': + elif grid_type == "icon": return get_gridspec_icon(gid) else: - raise ValueError(f'Unsupported grid type: {grid_type}') + raise ValueError(f"Unsupported grid type: {grid_type}") def _grid_cache_file(): - return os.path.join(os.path.dirname(__file__), 'grid_cache.json') + return os.path.join(os.path.dirname(__file__), "grid_cache.json") def _load_cache(): try: - with open(_grid_cache_file(), 'r', encoding='utf-8') as fh: + with open(_grid_cache_file(), "r", encoding="utf-8") as fh: return json.load(fh) except FileNotFoundError: return {} @@ -109,9 +106,9 @@ def _save_cache(cache): grid_cache_file = _grid_cache_file() dirpath = os.path.dirname(grid_cache_file) os.makedirs(dirpath, exist_ok=True) - fd, tmp = tempfile.mkstemp(dir=dirpath, prefix='.grid_cache.') + fd, tmp = tempfile.mkstemp(dir=dirpath, prefix=".grid_cache.") try: - with os.fdopen(fd, 'w', encoding='utf-8') as fh: + with os.fdopen(fd, "w", encoding="utf-8") as fh: json.dump(cache, fh, indent=2, sort_keys=True) os.replace(tmp, grid_cache_file) finally: @@ -130,18 +127,21 @@ def _cache_key(req_georef): def lookup_grid_config_local(req): - gid = get_first_grib_message(req) - req_georef = req['georef'] + # Make sure that we are accessing a single georef so that the grid is consistent + if "georef" not in req.keys(): + return + req_georef = req["georef"] cache = _load_cache() cache_key = _cache_key(req_georef) - try: - if cache_key in cache: - entry = cache[cache_key] - return (entry.get('gridspec'), entry.get('md5hash')) + if cache_key in cache: + entry = cache[cache_key] + return (entry.get("gridspec"), entry.get("md5hash")) + gid = get_first_grib_message(req) + try: gridspec, md5hash = get_gridspec_and_hash(gid) - cache[cache_key] = {'gridspec': gridspec, 'md5hash': md5hash} + cache[cache_key] = {"gridspec": gridspec, "md5hash": md5hash} try: _save_cache(cache) except Exception: diff --git a/tests/test_switching_grid_service.py b/tests/test_switching_grid_service.py index c936d6cc8..aab4c1363 100644 --- a/tests/test_switching_grid_service.py +++ b/tests/test_switching_grid_service.py @@ -2,8 +2,10 @@ import threading from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +import pytest import requests +from polytope_feature.datacube import switching_grid_local from polytope_feature.datacube.switching_grid_helper import lookup_grid_config, lookup_grid_config_remote from polytope_feature.options import PolytopeOptions @@ -97,6 +99,62 @@ def _fake_post(url, json, timeout): assert calls == [1.0, 5.0] +def test_lookup_grid_config_remote_raises_on_http_error(monkeypatch): + req = {'georef': 'u1516b', 'class': 'd1'} + + class _Response: + def raise_for_status(self): + raise requests.HTTPError('service rejected request') + + def _fake_post(url, json, timeout): + return _Response() + + monkeypatch.setattr(requests, 'post', _fake_post) + + with pytest.raises(requests.HTTPError, match='service rejected request'): + lookup_grid_config_remote(req, 'http://example.com') + + +def test_lookup_grid_config_without_georef_returns_none(monkeypatch): + req = {'class': 'd1'} + monkeypatch.delenv('POLYTOPE_DYNAMIC_GRID_SERVICE_URL', raising=False) + + assert lookup_grid_config(req) is None + + +def test_lookup_grid_config_local_saves_and_uses_cache(tmp_path, monkeypatch): + req = {'georef': 'u1516b', 'class': 'd1'} + cache_file = tmp_path / 'grid_cache.json' + gridspec = {'type': 'lambert_conformal', 'nx': 10, 'ny': 20} + calls = [] + releases = [] + + monkeypatch.setattr(switching_grid_local, '_grid_cache_file', lambda: str(cache_file)) + monkeypatch.setattr(switching_grid_local, 'get_first_grib_message', lambda request: calls.append(request) or 'gid') + monkeypatch.setattr(switching_grid_local, 'get_gridspec_and_hash', lambda gid: (gridspec, 'abc123')) + monkeypatch.setattr(switching_grid_local.eccodes, 'codes_release', lambda gid: releases.append(gid)) + + assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, 'abc123') + assert cache_file.exists() + + monkeypatch.setattr( + switching_grid_local, + 'get_first_grib_message', + lambda request: pytest.fail('cache hit should not read from FDB'), + ) + + assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, 'abc123') + assert calls == [req] + assert releases == ['gid'] + + +def test_get_gridspec_and_hash_rejects_unsupported_grid_type(monkeypatch): + monkeypatch.setattr(switching_grid_local.eccodes, 'codes_get', lambda gid, key: 'regular_ll') + + with pytest.raises(ValueError, match='Unsupported grid type: regular_ll'): + switching_grid_local.get_gridspec_and_hash('gid') + + def test_dynamic_grid_service_replaces_mapper_config(): options = { 'axis_config': [ From f322a99fcf34e2c70815658e03e9e17f64143be5 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 8 May 2026 09:33:00 +0000 Subject: [PATCH 18/27] chore(switching grids): add in memory caching and thread lock --- .../datacube/switching_grid_local.py | 32 ++- tests/test_switching_grid_service.py | 203 ++++++++++++------ 2 files changed, 160 insertions(+), 75 deletions(-) diff --git a/polytope_feature/datacube/switching_grid_local.py b/polytope_feature/datacube/switching_grid_local.py index 27023c96e..55b5cc8e9 100644 --- a/polytope_feature/datacube/switching_grid_local.py +++ b/polytope_feature/datacube/switching_grid_local.py @@ -2,10 +2,15 @@ import math import os import tempfile +import threading import eccodes +_GRID_CACHE = None +_GRID_CACHE_LOCK = threading.Lock() + + def get_first_grib_message(req): import pyfdb @@ -126,26 +131,41 @@ def _cache_key(req_georef): return str(req_georef) +def _get_cache_locked(): + global _GRID_CACHE + if _GRID_CACHE is None: + _GRID_CACHE = _load_cache() + return _GRID_CACHE + + def lookup_grid_config_local(req): # Make sure that we are accessing a single georef so that the grid is consistent if "georef" not in req.keys(): return req_georef = req["georef"] - cache = _load_cache() cache_key = _cache_key(req_georef) - if cache_key in cache: - entry = cache[cache_key] - return (entry.get("gridspec"), entry.get("md5hash")) + with _GRID_CACHE_LOCK: + cache = _get_cache_locked() + if cache_key in cache: + entry = cache[cache_key] + return (entry.get("gridspec"), entry.get("md5hash")) gid = get_first_grib_message(req) try: gridspec, md5hash = get_gridspec_and_hash(gid) + finally: + eccodes.codes_release(gid) + + with _GRID_CACHE_LOCK: + cache = _get_cache_locked() + if cache_key in cache: + entry = cache[cache_key] + return (entry.get("gridspec"), entry.get("md5hash")) + cache[cache_key] = {"gridspec": gridspec, "md5hash": md5hash} try: _save_cache(cache) except Exception: pass return (gridspec, md5hash) - finally: - eccodes.codes_release(gid) diff --git a/tests/test_switching_grid_service.py b/tests/test_switching_grid_service.py index aab4c1363..3bc6d90a1 100644 --- a/tests/test_switching_grid_service.py +++ b/tests/test_switching_grid_service.py @@ -1,5 +1,6 @@ import json import threading +from concurrent.futures import ThreadPoolExecutor from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import pytest @@ -10,37 +11,42 @@ from polytope_feature.options import PolytopeOptions +@pytest.fixture(autouse=True) +def reset_switching_grid_memory_cache(monkeypatch): + monkeypatch.setattr(switching_grid_local, "_GRID_CACHE", None) + + class _MockHandler(BaseHTTPRequestHandler): response_payload = { - 'gridspec': { - 'type': 'lambert_conformal', - 'earth_round': True, - 'radius': 6371229, - 'nv': 0, - 'nx': 10, - 'ny': 20, - 'LoVInDegrees': 1.0, - 'Dx': 1000.0, - 'Dy': 1000.0, - 'latFirstInRadians': 0.1, - 'lonFirstInRadians': 0.2, - 'LoVInRadians': 0.3, - 'Latin1InRadians': 0.4, - 'Latin2InRadians': 0.5, - 'LaDInRadians': 0.6, + "gridspec": { + "type": "lambert_conformal", + "earth_round": True, + "radius": 6371229, + "nv": 0, + "nx": 10, + "ny": 20, + "LoVInDegrees": 1.0, + "Dx": 1000.0, + "Dy": 1000.0, + "latFirstInRadians": 0.1, + "lonFirstInRadians": 0.2, + "LoVInRadians": 0.3, + "Latin1InRadians": 0.4, + "Latin2InRadians": 0.5, + "LaDInRadians": 0.6, }, - 'md5hash': 'abc123', + "md5hash": "abc123", } seen_request = None def do_POST(self): - length = int(self.headers.get('Content-Length', '0')) - payload = json.loads(self.rfile.read(length).decode('utf-8')) + length = int(self.headers.get("Content-Length", "0")) + payload = json.loads(self.rfile.read(length).decode("utf-8")) _MockHandler.seen_request = payload - body = json.dumps(_MockHandler.response_payload).encode('utf-8') + body = json.dumps(_MockHandler.response_payload).encode("utf-8") self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.send_header('Content-Length', str(len(body))) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) @@ -50,11 +56,11 @@ def log_message(self, format, *args): class _MockServer: def __enter__(self): - self.server = ThreadingHTTPServer(('127.0.0.1', 0), _MockHandler) + self.server = ThreadingHTTPServer(("127.0.0.1", 0), _MockHandler) self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) self.thread.start() host, port = self.server.server_address - self.url = f'http://{host}:{port}' + self.url = f"http://{host}:{port}" return self.url def __exit__(self, exc_type, exc, tb): @@ -64,17 +70,17 @@ def __exit__(self, exc_type, exc, tb): def test_lookup_grid_config_remote_service(): - req = {'georef': 'u1516b', 'class': 'd1'} + req = {"georef": "u1516b", "class": "d1"} with _MockServer() as url: gridspec, md5hash = lookup_grid_config(req, service_url=url) - assert md5hash == 'abc123' - assert gridspec['type'] == 'lambert_conformal' - assert _MockHandler.seen_request == {'request': req} + assert md5hash == "abc123" + assert gridspec["type"] == "lambert_conformal" + assert _MockHandler.seen_request == {"request": req} def test_lookup_grid_config_remote_retries_on_timeout(monkeypatch): - req = {'georef': 'u1516b', 'class': 'd1'} + req = {"georef": "u1516b", "class": "d1"} calls = [] class _Response: @@ -87,95 +93,154 @@ def json(self): def _fake_post(url, json, timeout): calls.append(timeout) if len(calls) == 1: - raise requests.Timeout('slow first attempt') + raise requests.Timeout("slow first attempt") return _Response() - monkeypatch.setattr(requests, 'post', _fake_post) + monkeypatch.setattr(requests, "post", _fake_post) - gridspec, md5hash = lookup_grid_config_remote(req, 'http://example.com') + gridspec, md5hash = lookup_grid_config_remote(req, "http://example.com") - assert md5hash == 'abc123' - assert gridspec['type'] == 'lambert_conformal' + assert md5hash == "abc123" + assert gridspec["type"] == "lambert_conformal" assert calls == [1.0, 5.0] def test_lookup_grid_config_remote_raises_on_http_error(monkeypatch): - req = {'georef': 'u1516b', 'class': 'd1'} + req = {"georef": "u1516b", "class": "d1"} class _Response: def raise_for_status(self): - raise requests.HTTPError('service rejected request') + raise requests.HTTPError("service rejected request") def _fake_post(url, json, timeout): return _Response() - monkeypatch.setattr(requests, 'post', _fake_post) + monkeypatch.setattr(requests, "post", _fake_post) - with pytest.raises(requests.HTTPError, match='service rejected request'): - lookup_grid_config_remote(req, 'http://example.com') + with pytest.raises(requests.HTTPError, match="service rejected request"): + lookup_grid_config_remote(req, "http://example.com") def test_lookup_grid_config_without_georef_returns_none(monkeypatch): - req = {'class': 'd1'} - monkeypatch.delenv('POLYTOPE_DYNAMIC_GRID_SERVICE_URL', raising=False) + req = {"class": "d1"} + monkeypatch.delenv("POLYTOPE_DYNAMIC_GRID_SERVICE_URL", raising=False) assert lookup_grid_config(req) is None -def test_lookup_grid_config_local_saves_and_uses_cache(tmp_path, monkeypatch): - req = {'georef': 'u1516b', 'class': 'd1'} - cache_file = tmp_path / 'grid_cache.json' - gridspec = {'type': 'lambert_conformal', 'nx': 10, 'ny': 20} +def test_lookup_grid_config_local_saves_and_uses_memory_cache(tmp_path, monkeypatch): + req = {"georef": "u1516b", "class": "d1"} + cache_file = tmp_path / "grid_cache.json" + gridspec = {"type": "lambert_conformal", "nx": 10, "ny": 20} calls = [] + load_calls = [] + save_calls = [] releases = [] + original_load_cache = switching_grid_local._load_cache + original_save_cache = switching_grid_local._save_cache + + def _load_cache(): + load_calls.append(True) + return original_load_cache() - monkeypatch.setattr(switching_grid_local, '_grid_cache_file', lambda: str(cache_file)) - monkeypatch.setattr(switching_grid_local, 'get_first_grib_message', lambda request: calls.append(request) or 'gid') - monkeypatch.setattr(switching_grid_local, 'get_gridspec_and_hash', lambda gid: (gridspec, 'abc123')) - monkeypatch.setattr(switching_grid_local.eccodes, 'codes_release', lambda gid: releases.append(gid)) + def _save_cache(cache): + save_calls.append(cache.copy()) + return original_save_cache(cache) - assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, 'abc123') + monkeypatch.setattr(switching_grid_local, "_grid_cache_file", lambda: str(cache_file)) + monkeypatch.setattr(switching_grid_local, "_load_cache", _load_cache) + monkeypatch.setattr(switching_grid_local, "_save_cache", _save_cache) + monkeypatch.setattr(switching_grid_local, "get_first_grib_message", lambda request: calls.append(request) or "gid") + monkeypatch.setattr(switching_grid_local, "get_gridspec_and_hash", lambda gid: (gridspec, "abc123")) + monkeypatch.setattr(switching_grid_local.eccodes, "codes_release", lambda gid: releases.append(gid)) + + assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, "abc123") assert cache_file.exists() monkeypatch.setattr( switching_grid_local, - 'get_first_grib_message', - lambda request: pytest.fail('cache hit should not read from FDB'), + "get_first_grib_message", + lambda request: pytest.fail("cache hit should not read from FDB"), ) - assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, 'abc123') + assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, "abc123") assert calls == [req] - assert releases == ['gid'] + assert len(load_calls) == 1 + assert len(save_calls) == 1 + assert releases == ["gid"] + + +def test_lookup_grid_config_local_retry_during_slow_lookup_saves_once(tmp_path, monkeypatch): + req = {"georef": "u1516b", "class": "d1"} + cache_file = tmp_path / "grid_cache.json" + original_save_cache = switching_grid_local._save_cache + barrier = threading.Barrier(2) + gid_lock = threading.Lock() + gids = [] + releases = [] + save_calls = [] + + def _get_first_grib_message(request): + with gid_lock: + gid = f"gid-{len(gids)}" + gids.append(gid) + # Simulate a client retry arriving while the first request is still doing + # the slow FDB read. Both requests miss the cache and do the read, but the + # second thread must not overwrite the cache after the first one saves it. + barrier.wait(timeout=6) + return gid + + def _get_gridspec_and_hash(gid): + return ({"type": "lambert_conformal", "gid": gid}, gid) + + def _save_cache(cache): + save_calls.append(cache.copy()) + return original_save_cache(cache) + + monkeypatch.setattr(switching_grid_local, "_grid_cache_file", lambda: str(cache_file)) + monkeypatch.setattr(switching_grid_local, "get_first_grib_message", _get_first_grib_message) + monkeypatch.setattr(switching_grid_local, "get_gridspec_and_hash", _get_gridspec_and_hash) + monkeypatch.setattr(switching_grid_local, "_save_cache", _save_cache) + monkeypatch.setattr(switching_grid_local.eccodes, "codes_release", lambda gid: releases.append(gid)) + + with ThreadPoolExecutor(max_workers=2) as pool: + results = list(pool.map(lambda _: switching_grid_local.lookup_grid_config_local(req), range(2))) + + saved_entry = next(iter(save_calls[0].values())) + assert results == [(saved_entry["gridspec"], saved_entry["md5hash"])] * 2 + assert len(gids) == 2 + assert len(releases) == 2 + assert len(save_calls) == 1 def test_get_gridspec_and_hash_rejects_unsupported_grid_type(monkeypatch): - monkeypatch.setattr(switching_grid_local.eccodes, 'codes_get', lambda gid, key: 'regular_ll') + monkeypatch.setattr(switching_grid_local.eccodes, "codes_get", lambda gid, key: "regular_ll") - with pytest.raises(ValueError, match='Unsupported grid type: regular_ll'): - switching_grid_local.get_gridspec_and_hash('gid') + with pytest.raises(ValueError, match="Unsupported grid type: regular_ll"): + switching_grid_local.get_gridspec_and_hash("gid") def test_dynamic_grid_service_replaces_mapper_config(): options = { - 'axis_config': [ + "axis_config": [ { - 'axis_name': 'values', - 'transformations': [ - {'name': 'mapper', 'type': 'reduced_gaussian', 'resolution': 320, 'axes': ['latitude', 'longitude']} + "axis_name": "values", + "transformations": [ + {"name": "mapper", "type": "reduced_gaussian", "resolution": 320, "axes": ["latitude", "longitude"]} ], } ], - 'compressed_axes_config': ['longitude', 'latitude'], - 'pre_path': {'class': 'd1', 'georef': 'u1516b'}, - 'dynamic_grid': True, + "compressed_axes_config": ["longitude", "latitude"], + "pre_path": {"class": "d1", "georef": "u1516b"}, + "dynamic_grid": True, } with _MockServer() as url: - options['dynamic_grid_service_url'] = url + options["dynamic_grid_service_url"] = url axis_config, *_ = PolytopeOptions.get_polytope_options(options) mapper = axis_config[0].transformations[0] - assert mapper.name == 'mapper' - assert mapper.type == 'lambert_conformal' - assert mapper.md5_hash == 'abc123' - assert mapper.axes == ['latitude', 'longitude'] + assert mapper.name == "mapper" + assert mapper.type == "lambert_conformal" + assert mapper.md5_hash == "abc123" + assert mapper.axes == ["latitude", "longitude"] From 7f12a4c8452bcaac301d29ca08c65479bc16794f Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 8 May 2026 13:24:50 +0000 Subject: [PATCH 19/27] chore: isort fixes --- polytope_feature/datacube/switching_grid_helper.py | 4 +++- polytope_feature/datacube/switching_grid_local.py | 1 - tests/test_switching_grid_service.py | 5 ++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/polytope_feature/datacube/switching_grid_helper.py b/polytope_feature/datacube/switching_grid_helper.py index faf766887..3b5e4a5e6 100644 --- a/polytope_feature/datacube/switching_grid_helper.py +++ b/polytope_feature/datacube/switching_grid_helper.py @@ -5,7 +5,9 @@ def lookup_grid_config_local(req): - from polytope_feature.datacube.switching_grid_local import lookup_grid_config_local as _lookup_grid_config_local + from polytope_feature.datacube.switching_grid_local import ( + lookup_grid_config_local as _lookup_grid_config_local, + ) return _lookup_grid_config_local(req) diff --git a/polytope_feature/datacube/switching_grid_local.py b/polytope_feature/datacube/switching_grid_local.py index 55b5cc8e9..3f8c9bac6 100644 --- a/polytope_feature/datacube/switching_grid_local.py +++ b/polytope_feature/datacube/switching_grid_local.py @@ -6,7 +6,6 @@ import eccodes - _GRID_CACHE = None _GRID_CACHE_LOCK = threading.Lock() diff --git a/tests/test_switching_grid_service.py b/tests/test_switching_grid_service.py index 3bc6d90a1..0da322dc0 100644 --- a/tests/test_switching_grid_service.py +++ b/tests/test_switching_grid_service.py @@ -7,7 +7,10 @@ import requests from polytope_feature.datacube import switching_grid_local -from polytope_feature.datacube.switching_grid_helper import lookup_grid_config, lookup_grid_config_remote +from polytope_feature.datacube.switching_grid_helper import ( + lookup_grid_config, + lookup_grid_config_remote, +) from polytope_feature.options import PolytopeOptions From 0635bc7b278d6f14aa562dbf2b2ce1dddae4f015 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 8 May 2026 13:55:38 +0000 Subject: [PATCH 20/27] chore: remove pre-commit default version to allow other dev envs --- .pre-commit-config.yaml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 65a9d0caf..e49a2e8ea 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,3 @@ -default_language_version: - python: python3.10 - repos: - repo: https://github.com/pycqa/isort rev: 7.0.0 @@ -16,4 +13,4 @@ repos: - repo: https://github.com/pycqa/flake8 rev: 7.3.0 # Use the latest version of flake8 hooks: - - id: flake8 \ No newline at end of file + - id: flake8 From 0866b375050b7341359e765fee1fbb7963b67471 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 8 May 2026 13:55:47 +0000 Subject: [PATCH 21/27] chore: black, flake8 --- .../datacube/switching_grid_helper.py | 16 ++++++++-------- polytope_feature/options.py | 1 - 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/polytope_feature/datacube/switching_grid_helper.py b/polytope_feature/datacube/switching_grid_helper.py index 3b5e4a5e6..092943960 100644 --- a/polytope_feature/datacube/switching_grid_helper.py +++ b/polytope_feature/datacube/switching_grid_helper.py @@ -13,32 +13,32 @@ def lookup_grid_config_local(req): def lookup_grid_config_remote(req, service_url, timeout=None, retries=None, retry_timeout=None): - url = urljoin(service_url.rstrip('/') + '/', 'lookup-grid-config') + url = urljoin(service_url.rstrip("/") + "/", "lookup-grid-config") if timeout is None: - timeout = float(os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_TIMEOUT', '1')) + timeout = float(os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_TIMEOUT", "1")) if retries is None: - retries = int(os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_RETRIES', '1')) + retries = int(os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_RETRIES", "1")) if retry_timeout is None: - retry_timeout = float(os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_RETRY_TIMEOUT', '5')) + retry_timeout = float(os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_RETRY_TIMEOUT", "5")) timeouts = [timeout] + [retry_timeout] * retries last_error = None for request_timeout in timeouts: try: - response = requests.post(url, json={'request': req}, timeout=request_timeout) + response = requests.post(url, json={"request": req}, timeout=request_timeout) response.raise_for_status() payload = response.json() - return (payload['gridspec'], payload['md5hash']) + return (payload["gridspec"], payload["md5hash"]) except (requests.Timeout, requests.ConnectionError) as exc: last_error = exc if last_error is not None: raise last_error - raise RuntimeError('dynamic grid remote lookup failed without a captured error') + raise RuntimeError("dynamic grid remote lookup failed without a captured error") def lookup_grid_config(req, service_url=None): - service_url = service_url or os.environ.get('POLYTOPE_DYNAMIC_GRID_SERVICE_URL') + service_url = service_url or os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_URL") if service_url: return lookup_grid_config_remote(req, service_url) return lookup_grid_config_local(req) diff --git a/polytope_feature/options.py b/polytope_feature/options.py index a6727e479..3b3c75735 100644 --- a/polytope_feature/options.py +++ b/polytope_feature/options.py @@ -1,5 +1,4 @@ import argparse -import logging from abc import ABC from typing import Dict, List, Literal, Optional, Tuple, Union From 4f48c7cd9133d158d96fd318cf54fe3c94201544 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 8 May 2026 22:36:09 +0000 Subject: [PATCH 22/27] fix: manual grib message size read for dynamic grid (remotefdb does not implement size) --- .../datacube/switching_grid_local.py | 56 ++++++++++++- tests/test_switching_grid_service.py | 82 +++++++++++++++++++ 2 files changed, 137 insertions(+), 1 deletion(-) diff --git a/polytope_feature/datacube/switching_grid_local.py b/polytope_feature/datacube/switching_grid_local.py index 3f8c9bac6..e94dfea05 100644 --- a/polytope_feature/datacube/switching_grid_local.py +++ b/polytope_feature/datacube/switching_grid_local.py @@ -8,6 +8,60 @@ _GRID_CACHE = None _GRID_CACHE_LOCK = threading.Lock() +_DEFAULT_MAX_GRIB_MESSAGE_BYTES = 512 * 1024 * 1024 + + +def _max_grib_message_bytes(): + return int(os.environ.get("POLYTOPE_MAX_GRIB_MESSAGE_BYTES", _DEFAULT_MAX_GRIB_MESSAGE_BYTES)) + + +def _read_exact(data_handle, length): + buffer = bytearray(length) + view = memoryview(buffer) + offset = 0 + + while offset < length: + if hasattr(data_handle, "readinto"): + bytes_read = data_handle.readinto(view[offset:]) + else: + chunk = data_handle.read(length - offset) + bytes_read = len(chunk) + view[offset : offset + bytes_read] = chunk + + if bytes_read <= 0: + raise EOFError(f"Short GRIB read: wanted {length} bytes, got {offset}") + offset += bytes_read + + return bytes(buffer) + + +def read_first_grib_message(data_handle): + header = _read_exact(data_handle, 8) + + if header[:4] != b"GRIB": + raise ValueError("Not a GRIB message") + + edition = header[7] + if edition == 1: + total_length = int.from_bytes(header[4:7], "big") + elif edition == 2: + header += _read_exact(data_handle, 8) + total_length = int.from_bytes(header[8:16], "big") + else: + raise ValueError(f"Unsupported GRIB edition: {edition}") + + if total_length < len(header): + raise ValueError(f"Invalid GRIB length: {total_length}") + + max_length = _max_grib_message_bytes() + if total_length > max_length: + raise ValueError(f"GRIB message length {total_length} exceeds maximum {max_length}") + + message = header + _read_exact(data_handle, total_length - len(header)) + if message[-4:] != b"7777": + raise ValueError("GRIB terminator missing") + + return message def get_first_grib_message(req): @@ -22,7 +76,7 @@ def get_first_grib_message(req): if dh is None: raise ValueError("List element has no data handle") with dh: - msg_bytes = dh.read() + msg_bytes = read_first_grib_message(dh) gid = eccodes.codes_new_from_message(msg_bytes) return gid diff --git a/tests/test_switching_grid_service.py b/tests/test_switching_grid_service.py index 0da322dc0..971a69cc7 100644 --- a/tests/test_switching_grid_service.py +++ b/tests/test_switching_grid_service.py @@ -72,6 +72,88 @@ def __exit__(self, exc_type, exc, tb): self.thread.join(timeout=5) +class _FakeDataHandle: + def __init__(self, data): + self.data = data + self.offset = 0 + + def readinto(self, buffer): + length = min(len(buffer), len(self.data) - self.offset) + if length <= 0: + return 0 + buffer[:length] = self.data[self.offset : self.offset + length] + self.offset += length + return length + + +class _FakeReadDataHandle: + def __init__(self, data): + self.data = data + self.offset = 0 + + def read(self, length): + chunk = self.data[self.offset : self.offset + length] + self.offset += len(chunk) + return chunk + + +def _grib1_message(payload=b""): + total_length = 8 + len(payload) + 4 + return b"GRIB" + total_length.to_bytes(3, "big") + b"\x01" + payload + b"7777" + + +def _grib2_message(payload=b""): + total_length = 16 + len(payload) + 4 + return b"GRIB" + b"\x00\x00\x00\x02" + total_length.to_bytes(8, "big") + payload + b"7777" + + +def test_read_first_grib_message_uses_grib1_encoded_length(): + message = _grib1_message(b"payload") + data_handle = _FakeDataHandle(message + b"NEXT") + + assert switching_grid_local.read_first_grib_message(data_handle) == message + assert data_handle.offset == len(message) + + +def test_read_first_grib_message_uses_grib2_encoded_length(): + message = _grib2_message(b"payload") + data_handle = _FakeDataHandle(message + b"NEXT") + + assert switching_grid_local.read_first_grib_message(data_handle) == message + assert data_handle.offset == len(message) + + +def test_read_first_grib_message_supports_read_without_readinto(): + message = _grib2_message(b"payload") + data_handle = _FakeReadDataHandle(message) + + assert switching_grid_local.read_first_grib_message(data_handle) == message + assert data_handle.offset == len(message) + + +def test_read_first_grib_message_rejects_missing_terminator(): + message = _grib2_message(b"payload")[:-4] + b"BAD!" + + with pytest.raises(ValueError, match="GRIB terminator missing"): + switching_grid_local.read_first_grib_message(_FakeDataHandle(message)) + + +def test_read_first_grib_message_rejects_short_reads(): + message = _grib2_message(b"payload")[:-1] + + with pytest.raises(EOFError, match="Short GRIB read"): + switching_grid_local.read_first_grib_message(_FakeDataHandle(message)) + + +def test_read_first_grib_message_rejects_oversized_message(monkeypatch): + monkeypatch.setenv("POLYTOPE_MAX_GRIB_MESSAGE_BYTES", "64") + oversized_length = 65 + header = b"GRIB" + b"\x00\x00\x00\x02" + oversized_length.to_bytes(8, "big") + + with pytest.raises(ValueError, match="exceeds maximum 64"): + switching_grid_local.read_first_grib_message(_FakeDataHandle(header)) + + def test_lookup_grid_config_remote_service(): req = {"georef": "u1516b", "class": "d1"} with _MockServer() as url: From 7b16de8de9011e1e366c9362656ad6432da2a71d Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 8 May 2026 23:44:42 +0000 Subject: [PATCH 23/27] fix: use first_element.length() --- .../datacube/switching_grid_local.py | 37 +++++++++++++++---- tests/test_switching_grid_service.py | 27 +++++++++----- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/polytope_feature/datacube/switching_grid_local.py b/polytope_feature/datacube/switching_grid_local.py index e94dfea05..59ef98639 100644 --- a/polytope_feature/datacube/switching_grid_local.py +++ b/polytope_feature/datacube/switching_grid_local.py @@ -35,8 +35,11 @@ def _read_exact(data_handle, length): return bytes(buffer) -def read_first_grib_message(data_handle): - header = _read_exact(data_handle, 8) +def _validate_grib_message(message): + if len(message) < 8: + raise EOFError(f"Short GRIB read: wanted at least 8 bytes, got {len(message)}") + + header = message[:8] if header[:4] != b"GRIB": raise ValueError("Not a GRIB message") @@ -44,24 +47,42 @@ def read_first_grib_message(data_handle): edition = header[7] if edition == 1: total_length = int.from_bytes(header[4:7], "big") + header_length = 8 elif edition == 2: - header += _read_exact(data_handle, 8) + if len(message) < 16: + raise EOFError(f"Short GRIB read: wanted at least 16 bytes, got {len(message)}") + header = message[:16] total_length = int.from_bytes(header[8:16], "big") + header_length = 16 else: raise ValueError(f"Unsupported GRIB edition: {edition}") - if total_length < len(header): + if total_length < header_length: raise ValueError(f"Invalid GRIB length: {total_length}") max_length = _max_grib_message_bytes() if total_length > max_length: raise ValueError(f"GRIB message length {total_length} exceeds maximum {max_length}") - message = header + _read_exact(data_handle, total_length - len(header)) - if message[-4:] != b"7777": + if len(message) < total_length: + raise EOFError(f"Short GRIB read: wanted {total_length} bytes, got {len(message)}") + + grib_message = message[:total_length] + if grib_message[-4:] != b"7777": raise ValueError("GRIB terminator missing") - return message + return grib_message + + +def read_first_grib_message(data_handle, data_length): + if data_length <= 0: + raise ValueError(f"Invalid data handle length: {data_length}") + + max_length = _max_grib_message_bytes() + if data_length > max_length: + raise ValueError(f"Data handle length {data_length} exceeds maximum {max_length}") + + return _validate_grib_message(_read_exact(data_handle, data_length)) def get_first_grib_message(req): @@ -76,7 +97,7 @@ def get_first_grib_message(req): if dh is None: raise ValueError("List element has no data handle") with dh: - msg_bytes = read_first_grib_message(dh) + msg_bytes = read_first_grib_message(dh, first_element.length()) gid = eccodes.codes_new_from_message(msg_bytes) return gid diff --git a/tests/test_switching_grid_service.py b/tests/test_switching_grid_service.py index 971a69cc7..0409b3443 100644 --- a/tests/test_switching_grid_service.py +++ b/tests/test_switching_grid_service.py @@ -109,25 +109,32 @@ def _grib2_message(payload=b""): def test_read_first_grib_message_uses_grib1_encoded_length(): message = _grib1_message(b"payload") - data_handle = _FakeDataHandle(message + b"NEXT") + data = message + b"NEXT" + data_handle = _FakeDataHandle(data) - assert switching_grid_local.read_first_grib_message(data_handle) == message - assert data_handle.offset == len(message) + assert switching_grid_local.read_first_grib_message(data_handle, len(data)) == message + assert data_handle.offset == len(data) def test_read_first_grib_message_uses_grib2_encoded_length(): message = _grib2_message(b"payload") - data_handle = _FakeDataHandle(message + b"NEXT") + data = message + b"NEXT" + data_handle = _FakeDataHandle(data) - assert switching_grid_local.read_first_grib_message(data_handle) == message - assert data_handle.offset == len(message) + assert switching_grid_local.read_first_grib_message(data_handle, len(data)) == message + assert data_handle.offset == len(data) + + +def test_read_first_grib_message_rejects_invalid_data_length(): + with pytest.raises(ValueError, match="Invalid data handle length"): + switching_grid_local.read_first_grib_message(_FakeDataHandle(b""), 0) def test_read_first_grib_message_supports_read_without_readinto(): message = _grib2_message(b"payload") data_handle = _FakeReadDataHandle(message) - assert switching_grid_local.read_first_grib_message(data_handle) == message + assert switching_grid_local.read_first_grib_message(data_handle, len(message)) == message assert data_handle.offset == len(message) @@ -135,14 +142,14 @@ def test_read_first_grib_message_rejects_missing_terminator(): message = _grib2_message(b"payload")[:-4] + b"BAD!" with pytest.raises(ValueError, match="GRIB terminator missing"): - switching_grid_local.read_first_grib_message(_FakeDataHandle(message)) + switching_grid_local.read_first_grib_message(_FakeDataHandle(message), len(message)) def test_read_first_grib_message_rejects_short_reads(): message = _grib2_message(b"payload")[:-1] with pytest.raises(EOFError, match="Short GRIB read"): - switching_grid_local.read_first_grib_message(_FakeDataHandle(message)) + switching_grid_local.read_first_grib_message(_FakeDataHandle(message), len(message) + 1) def test_read_first_grib_message_rejects_oversized_message(monkeypatch): @@ -151,7 +158,7 @@ def test_read_first_grib_message_rejects_oversized_message(monkeypatch): header = b"GRIB" + b"\x00\x00\x00\x02" + oversized_length.to_bytes(8, "big") with pytest.raises(ValueError, match="exceeds maximum 64"): - switching_grid_local.read_first_grib_message(_FakeDataHandle(header)) + switching_grid_local.read_first_grib_message(_FakeDataHandle(header), oversized_length) def test_lookup_grid_config_remote_service(): From 645bb3fecd624a6457e58dfe59b976116ec27cd0 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Mon, 11 May 2026 14:58:39 +0000 Subject: [PATCH 24/27] chore: remove dynamic grid resolution --- .../datacube/switching_grid_helper.py | 44 --- .../datacube/switching_grid_local.py | 245 ------------- polytope_feature/options.py | 24 -- pyproject.toml | 5 - tests/test_switching_grid_service.py | 338 ------------------ 5 files changed, 656 deletions(-) delete mode 100644 polytope_feature/datacube/switching_grid_helper.py delete mode 100644 polytope_feature/datacube/switching_grid_local.py delete mode 100644 tests/test_switching_grid_service.py diff --git a/polytope_feature/datacube/switching_grid_helper.py b/polytope_feature/datacube/switching_grid_helper.py deleted file mode 100644 index 092943960..000000000 --- a/polytope_feature/datacube/switching_grid_helper.py +++ /dev/null @@ -1,44 +0,0 @@ -import os -from urllib.parse import urljoin - -import requests - - -def lookup_grid_config_local(req): - from polytope_feature.datacube.switching_grid_local import ( - lookup_grid_config_local as _lookup_grid_config_local, - ) - - return _lookup_grid_config_local(req) - - -def lookup_grid_config_remote(req, service_url, timeout=None, retries=None, retry_timeout=None): - url = urljoin(service_url.rstrip("/") + "/", "lookup-grid-config") - if timeout is None: - timeout = float(os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_TIMEOUT", "1")) - if retries is None: - retries = int(os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_RETRIES", "1")) - if retry_timeout is None: - retry_timeout = float(os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_RETRY_TIMEOUT", "5")) - - timeouts = [timeout] + [retry_timeout] * retries - last_error = None - for request_timeout in timeouts: - try: - response = requests.post(url, json={"request": req}, timeout=request_timeout) - response.raise_for_status() - payload = response.json() - return (payload["gridspec"], payload["md5hash"]) - except (requests.Timeout, requests.ConnectionError) as exc: - last_error = exc - - if last_error is not None: - raise last_error - raise RuntimeError("dynamic grid remote lookup failed without a captured error") - - -def lookup_grid_config(req, service_url=None): - service_url = service_url or os.environ.get("POLYTOPE_DYNAMIC_GRID_SERVICE_URL") - if service_url: - return lookup_grid_config_remote(req, service_url) - return lookup_grid_config_local(req) diff --git a/polytope_feature/datacube/switching_grid_local.py b/polytope_feature/datacube/switching_grid_local.py deleted file mode 100644 index 59ef98639..000000000 --- a/polytope_feature/datacube/switching_grid_local.py +++ /dev/null @@ -1,245 +0,0 @@ -import json -import math -import os -import tempfile -import threading - -import eccodes - -_GRID_CACHE = None -_GRID_CACHE_LOCK = threading.Lock() -_DEFAULT_MAX_GRIB_MESSAGE_BYTES = 512 * 1024 * 1024 - - -def _max_grib_message_bytes(): - return int(os.environ.get("POLYTOPE_MAX_GRIB_MESSAGE_BYTES", _DEFAULT_MAX_GRIB_MESSAGE_BYTES)) - - -def _read_exact(data_handle, length): - buffer = bytearray(length) - view = memoryview(buffer) - offset = 0 - - while offset < length: - if hasattr(data_handle, "readinto"): - bytes_read = data_handle.readinto(view[offset:]) - else: - chunk = data_handle.read(length - offset) - bytes_read = len(chunk) - view[offset : offset + bytes_read] = chunk - - if bytes_read <= 0: - raise EOFError(f"Short GRIB read: wanted {length} bytes, got {offset}") - offset += bytes_read - - return bytes(buffer) - - -def _validate_grib_message(message): - if len(message) < 8: - raise EOFError(f"Short GRIB read: wanted at least 8 bytes, got {len(message)}") - - header = message[:8] - - if header[:4] != b"GRIB": - raise ValueError("Not a GRIB message") - - edition = header[7] - if edition == 1: - total_length = int.from_bytes(header[4:7], "big") - header_length = 8 - elif edition == 2: - if len(message) < 16: - raise EOFError(f"Short GRIB read: wanted at least 16 bytes, got {len(message)}") - header = message[:16] - total_length = int.from_bytes(header[8:16], "big") - header_length = 16 - else: - raise ValueError(f"Unsupported GRIB edition: {edition}") - - if total_length < header_length: - raise ValueError(f"Invalid GRIB length: {total_length}") - - max_length = _max_grib_message_bytes() - if total_length > max_length: - raise ValueError(f"GRIB message length {total_length} exceeds maximum {max_length}") - - if len(message) < total_length: - raise EOFError(f"Short GRIB read: wanted {total_length} bytes, got {len(message)}") - - grib_message = message[:total_length] - if grib_message[-4:] != b"7777": - raise ValueError("GRIB terminator missing") - - return grib_message - - -def read_first_grib_message(data_handle, data_length): - if data_length <= 0: - raise ValueError(f"Invalid data handle length: {data_length}") - - max_length = _max_grib_message_bytes() - if data_length > max_length: - raise ValueError(f"Data handle length {data_length} exceeds maximum {max_length}") - - return _validate_grib_message(_read_exact(data_handle, data_length)) - - -def get_first_grib_message(req): - import pyfdb - - fdb = pyfdb.FDB() - - # Use data_handle from the list element directly instead of a separate - # retrieve() call — avoids the list iterator polluting retrieve state. - first_element = next(fdb.list(req)) - dh = first_element.data_handle - if dh is None: - raise ValueError("List element has no data handle") - with dh: - msg_bytes = read_first_grib_message(dh, first_element.length()) - - gid = eccodes.codes_new_from_message(msg_bytes) - return gid - - -def get_gridspec_lamebert_conformal(gid): - to_rad = math.pi / 180 - - md5hash = eccodes.codes_get(gid, "md5GridSection") - - earth_round = (eccodes.codes_get(gid, "shapeOfTheEarth") == 0) or (eccodes.codes_get(gid, "shapeOfTheEarth") == 6) - - if earth_round: - if eccodes.codes_get(gid, "shapeOfTheEarth") == 6: - radius = 6371229 - elif eccodes.codes_get(gid, "shapeOfTheEarth") == 0: - radius = 6367470 - else: - radius = None - - nv = eccodes.codes_get(gid, "NV") - nx = eccodes.codes_get(gid, "Nx") - ny = eccodes.codes_get(gid, "Ny") - LoVInDegrees = eccodes.codes_get(gid, "LoV") / 1000000 - Dx = eccodes.codes_get(gid, "Dx") - Dy = eccodes.codes_get(gid, "Dy") - latFirstInRadians = eccodes.codes_get(gid, "latitudeOfFirstGridPoint") / 1000000 * to_rad - lonFirstInRadians = eccodes.codes_get(gid, "longitudeOfFirstGridPoint") / 1000000 * to_rad - LoVInRadians = eccodes.codes_get(gid, "LoV") / 1000000 * to_rad - Latin1InRadians = eccodes.codes_get(gid, "Latin1") / 1000000 * to_rad - Latin2InRadians = eccodes.codes_get(gid, "Latin2") / 1000000 * to_rad - LaDInRadians = eccodes.codes_get(gid, "LaD") / 1000000 * to_rad - - gridspec = { - "type": "lambert_conformal", - "earth_round": earth_round, - "radius": radius, - "nv": nv, - "nx": nx, - "ny": ny, - "LoVInDegrees": LoVInDegrees, - "Dx": Dx, - "Dy": Dy, - "latFirstInRadians": latFirstInRadians, - "lonFirstInRadians": lonFirstInRadians, - "LoVInRadians": LoVInRadians, - "Latin1InRadians": Latin1InRadians, - "Latin2InRadians": Latin2InRadians, - "LaDInRadians": LaDInRadians, - } - return (gridspec, md5hash) - - -def get_gridspec_icon(gid): - md5hash = eccodes.codes_get(gid, "md5GridSection") - gridspec = {} - return (gridspec, md5hash) - - -def get_gridspec_and_hash(gid): - grid_type = eccodes.codes_get(gid, "gridType") - if grid_type == "lambert_lam": - return get_gridspec_lamebert_conformal(gid) - elif grid_type == "icon": - return get_gridspec_icon(gid) - else: - raise ValueError(f"Unsupported grid type: {grid_type}") - - -def _grid_cache_file(): - return os.path.join(os.path.dirname(__file__), "grid_cache.json") - - -def _load_cache(): - try: - with open(_grid_cache_file(), "r", encoding="utf-8") as fh: - return json.load(fh) - except FileNotFoundError: - return {} - except Exception: - return {} - - -def _save_cache(cache): - grid_cache_file = _grid_cache_file() - dirpath = os.path.dirname(grid_cache_file) - os.makedirs(dirpath, exist_ok=True) - fd, tmp = tempfile.mkstemp(dir=dirpath, prefix=".grid_cache.") - try: - with os.fdopen(fd, "w", encoding="utf-8") as fh: - json.dump(cache, fh, indent=2, sort_keys=True) - os.replace(tmp, grid_cache_file) - finally: - if os.path.exists(tmp): - try: - os.remove(tmp) - except Exception: - pass - - -def _cache_key(req_georef): - try: - return json.dumps(req_georef, sort_keys=True, default=str) - except Exception: - return str(req_georef) - - -def _get_cache_locked(): - global _GRID_CACHE - if _GRID_CACHE is None: - _GRID_CACHE = _load_cache() - return _GRID_CACHE - - -def lookup_grid_config_local(req): - # Make sure that we are accessing a single georef so that the grid is consistent - if "georef" not in req.keys(): - return - req_georef = req["georef"] - cache_key = _cache_key(req_georef) - - with _GRID_CACHE_LOCK: - cache = _get_cache_locked() - if cache_key in cache: - entry = cache[cache_key] - return (entry.get("gridspec"), entry.get("md5hash")) - - gid = get_first_grib_message(req) - try: - gridspec, md5hash = get_gridspec_and_hash(gid) - finally: - eccodes.codes_release(gid) - - with _GRID_CACHE_LOCK: - cache = _get_cache_locked() - if cache_key in cache: - entry = cache[cache_key] - return (entry.get("gridspec"), entry.get("md5hash")) - - cache[cache_key] = {"gridspec": gridspec, "md5hash": md5hash} - try: - _save_cache(cache) - except Exception: - pass - return (gridspec, md5hash) diff --git a/polytope_feature/options.py b/polytope_feature/options.py index 3b3c75735..10791867e 100644 --- a/polytope_feature/options.py +++ b/polytope_feature/options.py @@ -84,8 +84,6 @@ class Config(ConfigModel): alternative_axes: Optional[List[GribJumpAxesConfig]] = [] use_catalogue: Optional[bool] = False engine_options: Optional[Dict[str, str]] = {} - dynamic_grid: Optional[bool] = False - dynamic_grid_service_url: Optional[str] = None class PolytopeOptions(ABC): @@ -98,18 +96,10 @@ def get_polytope_options(options): axis_config = config_options.axis_config compressed_axes_config = config_options.compressed_axes_config pre_path = config_options.pre_path - dynamic_grid = config_options.dynamic_grid - dynamic_grid_service_url = config_options.dynamic_grid_service_url alternative_axes = config_options.alternative_axes use_catalogue = config_options.use_catalogue engine_options = config_options.engine_options - if dynamic_grid: - # TODO: look at the pre-path and query the eccodes function to get the new grid option - # TODO: then change the grid option inside of the axis_config - replaced = replace_grid_config_in_options(config_options, pre_path, dynamic_grid_service_url) - if replaced: - axis_config = config_options.axis_config return ( axis_config, compressed_axes_config, @@ -144,17 +134,3 @@ def gridspec_to_grid_config(gridspec, md5hash): ) return mc return None - - -def replace_grid_config_in_options(options, req, service_url=None): - from polytope_feature.datacube.switching_grid_helper import lookup_grid_config - - gridspec, md5hash = lookup_grid_config(req, service_url=service_url) - grid_config = gridspec_to_grid_config(gridspec, md5hash) - if grid_config is not None: - for axis_conf in options.axis_config: - for idx, transformation in enumerate(axis_conf.transformations): - if getattr(transformation, "name", None) == "mapper": - axis_conf.transformations[idx] = grid_config - return True - return False diff --git a/pyproject.toml b/pyproject.toml index da696e673..489f44421 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,11 +62,6 @@ catalogue = [ "qubed" ] -switching_grids = [ - "eccodes", - "pyfdb>=0.2.0" -] - [project.urls] repository = "https://github.com/ecmwf/polytope" documentation = "https://polytope.readthedocs.io/en/latest/" diff --git a/tests/test_switching_grid_service.py b/tests/test_switching_grid_service.py deleted file mode 100644 index 0409b3443..000000000 --- a/tests/test_switching_grid_service.py +++ /dev/null @@ -1,338 +0,0 @@ -import json -import threading -from concurrent.futures import ThreadPoolExecutor -from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer - -import pytest -import requests - -from polytope_feature.datacube import switching_grid_local -from polytope_feature.datacube.switching_grid_helper import ( - lookup_grid_config, - lookup_grid_config_remote, -) -from polytope_feature.options import PolytopeOptions - - -@pytest.fixture(autouse=True) -def reset_switching_grid_memory_cache(monkeypatch): - monkeypatch.setattr(switching_grid_local, "_GRID_CACHE", None) - - -class _MockHandler(BaseHTTPRequestHandler): - response_payload = { - "gridspec": { - "type": "lambert_conformal", - "earth_round": True, - "radius": 6371229, - "nv": 0, - "nx": 10, - "ny": 20, - "LoVInDegrees": 1.0, - "Dx": 1000.0, - "Dy": 1000.0, - "latFirstInRadians": 0.1, - "lonFirstInRadians": 0.2, - "LoVInRadians": 0.3, - "Latin1InRadians": 0.4, - "Latin2InRadians": 0.5, - "LaDInRadians": 0.6, - }, - "md5hash": "abc123", - } - seen_request = None - - def do_POST(self): - length = int(self.headers.get("Content-Length", "0")) - payload = json.loads(self.rfile.read(length).decode("utf-8")) - _MockHandler.seen_request = payload - body = json.dumps(_MockHandler.response_payload).encode("utf-8") - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.send_header("Content-Length", str(len(body))) - self.end_headers() - self.wfile.write(body) - - def log_message(self, format, *args): - return - - -class _MockServer: - def __enter__(self): - self.server = ThreadingHTTPServer(("127.0.0.1", 0), _MockHandler) - self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) - self.thread.start() - host, port = self.server.server_address - self.url = f"http://{host}:{port}" - return self.url - - def __exit__(self, exc_type, exc, tb): - self.server.shutdown() - self.server.server_close() - self.thread.join(timeout=5) - - -class _FakeDataHandle: - def __init__(self, data): - self.data = data - self.offset = 0 - - def readinto(self, buffer): - length = min(len(buffer), len(self.data) - self.offset) - if length <= 0: - return 0 - buffer[:length] = self.data[self.offset : self.offset + length] - self.offset += length - return length - - -class _FakeReadDataHandle: - def __init__(self, data): - self.data = data - self.offset = 0 - - def read(self, length): - chunk = self.data[self.offset : self.offset + length] - self.offset += len(chunk) - return chunk - - -def _grib1_message(payload=b""): - total_length = 8 + len(payload) + 4 - return b"GRIB" + total_length.to_bytes(3, "big") + b"\x01" + payload + b"7777" - - -def _grib2_message(payload=b""): - total_length = 16 + len(payload) + 4 - return b"GRIB" + b"\x00\x00\x00\x02" + total_length.to_bytes(8, "big") + payload + b"7777" - - -def test_read_first_grib_message_uses_grib1_encoded_length(): - message = _grib1_message(b"payload") - data = message + b"NEXT" - data_handle = _FakeDataHandle(data) - - assert switching_grid_local.read_first_grib_message(data_handle, len(data)) == message - assert data_handle.offset == len(data) - - -def test_read_first_grib_message_uses_grib2_encoded_length(): - message = _grib2_message(b"payload") - data = message + b"NEXT" - data_handle = _FakeDataHandle(data) - - assert switching_grid_local.read_first_grib_message(data_handle, len(data)) == message - assert data_handle.offset == len(data) - - -def test_read_first_grib_message_rejects_invalid_data_length(): - with pytest.raises(ValueError, match="Invalid data handle length"): - switching_grid_local.read_first_grib_message(_FakeDataHandle(b""), 0) - - -def test_read_first_grib_message_supports_read_without_readinto(): - message = _grib2_message(b"payload") - data_handle = _FakeReadDataHandle(message) - - assert switching_grid_local.read_first_grib_message(data_handle, len(message)) == message - assert data_handle.offset == len(message) - - -def test_read_first_grib_message_rejects_missing_terminator(): - message = _grib2_message(b"payload")[:-4] + b"BAD!" - - with pytest.raises(ValueError, match="GRIB terminator missing"): - switching_grid_local.read_first_grib_message(_FakeDataHandle(message), len(message)) - - -def test_read_first_grib_message_rejects_short_reads(): - message = _grib2_message(b"payload")[:-1] - - with pytest.raises(EOFError, match="Short GRIB read"): - switching_grid_local.read_first_grib_message(_FakeDataHandle(message), len(message) + 1) - - -def test_read_first_grib_message_rejects_oversized_message(monkeypatch): - monkeypatch.setenv("POLYTOPE_MAX_GRIB_MESSAGE_BYTES", "64") - oversized_length = 65 - header = b"GRIB" + b"\x00\x00\x00\x02" + oversized_length.to_bytes(8, "big") - - with pytest.raises(ValueError, match="exceeds maximum 64"): - switching_grid_local.read_first_grib_message(_FakeDataHandle(header), oversized_length) - - -def test_lookup_grid_config_remote_service(): - req = {"georef": "u1516b", "class": "d1"} - with _MockServer() as url: - gridspec, md5hash = lookup_grid_config(req, service_url=url) - - assert md5hash == "abc123" - assert gridspec["type"] == "lambert_conformal" - assert _MockHandler.seen_request == {"request": req} - - -def test_lookup_grid_config_remote_retries_on_timeout(monkeypatch): - req = {"georef": "u1516b", "class": "d1"} - calls = [] - - class _Response: - def raise_for_status(self): - return None - - def json(self): - return _MockHandler.response_payload - - def _fake_post(url, json, timeout): - calls.append(timeout) - if len(calls) == 1: - raise requests.Timeout("slow first attempt") - return _Response() - - monkeypatch.setattr(requests, "post", _fake_post) - - gridspec, md5hash = lookup_grid_config_remote(req, "http://example.com") - - assert md5hash == "abc123" - assert gridspec["type"] == "lambert_conformal" - assert calls == [1.0, 5.0] - - -def test_lookup_grid_config_remote_raises_on_http_error(monkeypatch): - req = {"georef": "u1516b", "class": "d1"} - - class _Response: - def raise_for_status(self): - raise requests.HTTPError("service rejected request") - - def _fake_post(url, json, timeout): - return _Response() - - monkeypatch.setattr(requests, "post", _fake_post) - - with pytest.raises(requests.HTTPError, match="service rejected request"): - lookup_grid_config_remote(req, "http://example.com") - - -def test_lookup_grid_config_without_georef_returns_none(monkeypatch): - req = {"class": "d1"} - monkeypatch.delenv("POLYTOPE_DYNAMIC_GRID_SERVICE_URL", raising=False) - - assert lookup_grid_config(req) is None - - -def test_lookup_grid_config_local_saves_and_uses_memory_cache(tmp_path, monkeypatch): - req = {"georef": "u1516b", "class": "d1"} - cache_file = tmp_path / "grid_cache.json" - gridspec = {"type": "lambert_conformal", "nx": 10, "ny": 20} - calls = [] - load_calls = [] - save_calls = [] - releases = [] - original_load_cache = switching_grid_local._load_cache - original_save_cache = switching_grid_local._save_cache - - def _load_cache(): - load_calls.append(True) - return original_load_cache() - - def _save_cache(cache): - save_calls.append(cache.copy()) - return original_save_cache(cache) - - monkeypatch.setattr(switching_grid_local, "_grid_cache_file", lambda: str(cache_file)) - monkeypatch.setattr(switching_grid_local, "_load_cache", _load_cache) - monkeypatch.setattr(switching_grid_local, "_save_cache", _save_cache) - monkeypatch.setattr(switching_grid_local, "get_first_grib_message", lambda request: calls.append(request) or "gid") - monkeypatch.setattr(switching_grid_local, "get_gridspec_and_hash", lambda gid: (gridspec, "abc123")) - monkeypatch.setattr(switching_grid_local.eccodes, "codes_release", lambda gid: releases.append(gid)) - - assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, "abc123") - assert cache_file.exists() - - monkeypatch.setattr( - switching_grid_local, - "get_first_grib_message", - lambda request: pytest.fail("cache hit should not read from FDB"), - ) - - assert switching_grid_local.lookup_grid_config_local(req) == (gridspec, "abc123") - assert calls == [req] - assert len(load_calls) == 1 - assert len(save_calls) == 1 - assert releases == ["gid"] - - -def test_lookup_grid_config_local_retry_during_slow_lookup_saves_once(tmp_path, monkeypatch): - req = {"georef": "u1516b", "class": "d1"} - cache_file = tmp_path / "grid_cache.json" - original_save_cache = switching_grid_local._save_cache - barrier = threading.Barrier(2) - gid_lock = threading.Lock() - gids = [] - releases = [] - save_calls = [] - - def _get_first_grib_message(request): - with gid_lock: - gid = f"gid-{len(gids)}" - gids.append(gid) - # Simulate a client retry arriving while the first request is still doing - # the slow FDB read. Both requests miss the cache and do the read, but the - # second thread must not overwrite the cache after the first one saves it. - barrier.wait(timeout=6) - return gid - - def _get_gridspec_and_hash(gid): - return ({"type": "lambert_conformal", "gid": gid}, gid) - - def _save_cache(cache): - save_calls.append(cache.copy()) - return original_save_cache(cache) - - monkeypatch.setattr(switching_grid_local, "_grid_cache_file", lambda: str(cache_file)) - monkeypatch.setattr(switching_grid_local, "get_first_grib_message", _get_first_grib_message) - monkeypatch.setattr(switching_grid_local, "get_gridspec_and_hash", _get_gridspec_and_hash) - monkeypatch.setattr(switching_grid_local, "_save_cache", _save_cache) - monkeypatch.setattr(switching_grid_local.eccodes, "codes_release", lambda gid: releases.append(gid)) - - with ThreadPoolExecutor(max_workers=2) as pool: - results = list(pool.map(lambda _: switching_grid_local.lookup_grid_config_local(req), range(2))) - - saved_entry = next(iter(save_calls[0].values())) - assert results == [(saved_entry["gridspec"], saved_entry["md5hash"])] * 2 - assert len(gids) == 2 - assert len(releases) == 2 - assert len(save_calls) == 1 - - -def test_get_gridspec_and_hash_rejects_unsupported_grid_type(monkeypatch): - monkeypatch.setattr(switching_grid_local.eccodes, "codes_get", lambda gid, key: "regular_ll") - - with pytest.raises(ValueError, match="Unsupported grid type: regular_ll"): - switching_grid_local.get_gridspec_and_hash("gid") - - -def test_dynamic_grid_service_replaces_mapper_config(): - options = { - "axis_config": [ - { - "axis_name": "values", - "transformations": [ - {"name": "mapper", "type": "reduced_gaussian", "resolution": 320, "axes": ["latitude", "longitude"]} - ], - } - ], - "compressed_axes_config": ["longitude", "latitude"], - "pre_path": {"class": "d1", "georef": "u1516b"}, - "dynamic_grid": True, - } - - with _MockServer() as url: - options["dynamic_grid_service_url"] = url - axis_config, *_ = PolytopeOptions.get_polytope_options(options) - - mapper = axis_config[0].transformations[0] - assert mapper.name == "mapper" - assert mapper.type == "lambert_conformal" - assert mapper.md5_hash == "abc123" - assert mapper.axes == ["latitude", "longitude"] From d881dec9b776a6912508430bd9d8d1f4c0fc59fc Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Tue, 12 May 2026 10:09:09 +0000 Subject: [PATCH 25/27] chore: revert pyfdb update to fix debian ci --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 489f44421..3f375d353 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ tests = [ "h5py", "earthkit-data", "matplotlib", - "pyfdb>=0.2.0", + "pyfdb<=0.1.3", ] unstructured = [ From caf7dae8970e72474161881c1d9dbbdd197c79a4 Mon Sep 17 00:00:00 2001 From: Mathilde Leuridan <90444327+mathleur@users.noreply.github.com> Date: Mon, 1 Jun 2026 15:44:49 +0200 Subject: [PATCH 26/27] Merge pull request #523 from ecmwf/feat/fix_timestep_types fix timestep types --- .pre-commit-config.yaml | 2 ++ polytope_feature/datacube/datacube_axis.py | 20 ++++++++++++++++++- .../datacube_type_change.py | 3 +++ polytope_feature/engine/hullslicer.py | 13 +++++++++--- 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e49a2e8ea..0c255051e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,3 +1,5 @@ +default_language_version: + python: python3.10 repos: - repo: https://github.com/pycqa/isort rev: 7.0.0 diff --git a/polytope_feature/datacube/datacube_axis.py b/polytope_feature/datacube/datacube_axis.py index 784091e8e..8af3da1e6 100644 --- a/polytope_feature/datacube/datacube_axis.py +++ b/polytope_feature/datacube/datacube_axis.py @@ -321,16 +321,34 @@ def __init__(self): self.type = np.timedelta64(0, "s") self.can_round = False + # def parse(self, value: Any) -> Any: + # if isinstance(value, np.str_): + # value = str(value) + # return pd.Timedelta(value) + + # def to_float(self, value: pd.Timedelta): + # if isinstance(value, np.timedelta64): + # return value.astype("timedelta64[s]").astype(int) + # else: + # return float(value.value / 10**9) + def parse(self, value: Any) -> Any: if isinstance(value, np.str_): value = str(value) + if isinstance(value, str) and "-" in value: + return value return pd.Timedelta(value) def to_float(self, value: pd.Timedelta): if isinstance(value, np.timedelta64): return value.astype("timedelta64[s]").astype(int) else: - return float(value.value / 10**9) + if isinstance(value, str): + if "-" in value: + return value + return float(value) + else: + return float(value.value / 10**9) def from_float(self, value): return pd.Timedelta(int(value), unit="s") diff --git a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py index 7975e8048..64de60d92 100644 --- a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py +++ b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py @@ -209,6 +209,9 @@ def transform_type(self, value): def make_str(self, value): return_vals = [] for val in value: + if isinstance(val, str) and "-" in val: + return_vals.append(val) + continue total_seconds = int(val.total_seconds()) hours, rem = divmod(total_seconds, 3600) minutes, seconds = divmod(rem, 60) diff --git a/polytope_feature/engine/hullslicer.py b/polytope_feature/engine/hullslicer.py index 63fb7cc7c..da2f45d9f 100644 --- a/polytope_feature/engine/hullslicer.py +++ b/polytope_feature/engine/hullslicer.py @@ -48,9 +48,16 @@ def _build_unsliceable_child(self, polytope, ax, node, datacube, lowers, next_no raise ValueError(errmsg) def find_values_between(self, polytope, ax, node, datacube, lower, upper): - tol = ax.tol - lower = ax.from_float(lower - tol) - upper = ax.from_float(upper + tol) + # tol = ax.tol + # lower = ax.from_float(lower - tol) + # upper = ax.from_float(upper + tol) + if isinstance(lower, str) and isinstance(upper, str): + pass + else: + tol = ax.tol + # print("WHAT IS LOW AND UP HERE ", lower, upper) + lower = ax.from_float(lower - tol) + upper = ax.from_float(upper + tol) flattened = node.flatten() method = polytope.method From f6a4516a2ff118ff54f283d3199fda8ceccd8531 Mon Sep 17 00:00:00 2001 From: mathleur Date: Tue, 2 Jun 2026 09:50:44 +0200 Subject: [PATCH 27/27] clean up --- polytope_feature/datacube/datacube_axis.py | 45 ------------------- .../datacube_type_change.py | 2 - polytope_feature/engine/hullslicer.py | 3 -- 3 files changed, 50 deletions(-) diff --git a/polytope_feature/datacube/datacube_axis.py b/polytope_feature/datacube/datacube_axis.py index 8af3da1e6..49cdb055f 100644 --- a/polytope_feature/datacube/datacube_axis.py +++ b/polytope_feature/datacube/datacube_axis.py @@ -150,40 +150,6 @@ def find_standard_indices_between(self, indexes, low, up, datacube, method=None) return filtered - # def find_standard_indices_between(self, indexes, low, up, datacube, method=None): - # indexes_between_ranges = [] - - # if self.name in datacube.complete_axes and self.name not in datacube.transformed_axes: - # # Find the range of indexes between lower and upper - # # https://pandas.pydata.org/docs/reference/api/pandas.Index.searchsorted.html - # # Assumes the indexes are already sorted (could sort to be sure) and monotonically increasing - # if method == "surrounding" or method == "nearest": - # start = indexes.searchsorted(low, "left") - # end = indexes.searchsorted(up, "right") - # start = max(start - 1, 0) - # end = min(end + 1, len(indexes)) - # indexes_between = indexes[start:end].to_list() - # indexes_between_ranges.extend(indexes_between) - # else: - # start = indexes.searchsorted(low, "left") - # end = indexes.searchsorted(up, "right") - # indexes_between = indexes[start:end].to_list() - # indexes_between_ranges.extend(indexes_between) - # else: - # if method == "surrounding" or method == "nearest": - # start = bisect.bisect_left(indexes, low) - # end = bisect.bisect_right(indexes, up) - # start = max(start - 1, 0) - # end = min(end + 1, len(indexes)) - # indexes_between = indexes[start:end] - # indexes_between_ranges.extend(indexes_between) - # else: - # lower_idx = bisect.bisect_left(indexes, low) - # upper_idx = bisect.bisect_right(indexes, up) - # indexes_between = indexes[lower_idx:upper_idx] - # indexes_between_ranges.extend(indexes_between) - # return indexes_between_ranges - def find_indices_between(self, indexes_ranges, low, up, datacube, method=None): indexes_between_ranges = self.find_standard_indices_between(indexes_ranges, low, up, datacube, method) for transformation in self.transformations[::-1]: @@ -321,17 +287,6 @@ def __init__(self): self.type = np.timedelta64(0, "s") self.can_round = False - # def parse(self, value: Any) -> Any: - # if isinstance(value, np.str_): - # value = str(value) - # return pd.Timedelta(value) - - # def to_float(self, value: pd.Timedelta): - # if isinstance(value, np.timedelta64): - # return value.astype("timedelta64[s]").astype(int) - # else: - # return float(value.value / 10**9) - def parse(self, value: Any) -> Any: if isinstance(value, np.str_): value = str(value) diff --git a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py index 64de60d92..c623b13da 100644 --- a/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py +++ b/polytope_feature/datacube/transformations/datacube_type_change/datacube_type_change.py @@ -30,7 +30,6 @@ def change_val_type(self, axis_name, values): return_idx = [self._final_transformation.transform_type(val) for val in values] if None in return_idx: return None - # return_idx.sort() return_idx.sort(key=lambda x: (isinstance(x, str), x)) return return_idx @@ -243,7 +242,6 @@ def make_str(self, value): return_vals.append(f"{hours}h{minutes}m") else: return_vals.append(f"{hours}h{minutes}m{seconds}s") - # return return_vals unique_list = list(dict.fromkeys(return_vals)) return unique_list diff --git a/polytope_feature/engine/hullslicer.py b/polytope_feature/engine/hullslicer.py index da2f45d9f..f3f8f69bb 100644 --- a/polytope_feature/engine/hullslicer.py +++ b/polytope_feature/engine/hullslicer.py @@ -48,9 +48,6 @@ def _build_unsliceable_child(self, polytope, ax, node, datacube, lowers, next_no raise ValueError(errmsg) def find_values_between(self, polytope, ax, node, datacube, lower, upper): - # tol = ax.tol - # lower = ax.from_float(lower - tol) - # upper = ax.from_float(upper + tol) if isinstance(lower, str) and isinstance(upper, str): pass else: