Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3b6584b
feat: read and write zarr methods to replace read_pickle
gampnico Apr 30, 2026
50cec31
feat: add support for zarr compatible downstream line
gampnico May 1, 2026
43fead9
refactor: rename pickle_jar to data_store
gampnico May 1, 2026
6988ffb
merge: merge branch 'master' into fix-pickle-doomsday
gampnico May 11, 2026
cf81e8a
fix: merge conflicts
gampnico May 11, 2026
b744fb2
feat: zarr-pickle interchange validation
gampnico May 18, 2026
18f7fa1
chore: minor type fixes
gampnico May 19, 2026
3c409b1
feat: framework for writing to zarr
gampnico May 19, 2026
7762443
fix: open datatree instead of zarr
gampnico May 20, 2026
25813ca
Merge pull request #2 from gampnico/ON-60-write-data-to-zarr
gampnico May 20, 2026
2b2b43f
fix: write storage mismatch
gampnico May 20, 2026
f502b66
merge: merge branch 'master' into ON-60-write-data-to-zarr
gampnico Jun 8, 2026
14fdfa4
feat: support write_store
gampnico Jun 9, 2026
e72f721
merge: merge branch 'master' into fix-pickle-doomsday
gampnico Jun 9, 2026
8123f9a
merge: merge branch 'ON-60-write-data-to-zarr' into fix-pickle-doomsday
gampnico Jun 9, 2026
b823991
fix: skip test which check for now-disabled warning
gampnico Jun 9, 2026
ea9f83e
refactor: support for pickles with multi-object lists
gampnico Jun 9, 2026
d3a2391
fix: add lock to file_downloader in run_prepro_levels
gampnico Jun 9, 2026
308a59d
fix: force zarr conversion
gampnico Jun 9, 2026
f46d1d3
merge: merge branch 'ON-63-tests-hang-when-calling-run_prepro_levels'…
gampnico Jun 10, 2026
fa56289
fix(tests): some failing tests
gampnico Jun 10, 2026
f44c535
refactor: replace rw methods for geometries
gampnico Jun 10, 2026
4691126
fix(tests): workaround for zarr async io messing up unittest-style tests
gampnico Jun 10, 2026
b911bdd
refactor: migrate entirely to read_store
gampnico Jun 10, 2026
4f55295
fix: zarr compatibility fixes
gampnico Jun 10, 2026
6f8bb18
fix(tests): multiprocessing fixes, zarr concurrency
gampnico Jun 11, 2026
e49a542
Merge pull request #3 from gampnico/merge-fix-pickle-ON-60-ON-63
gampnico Jun 11, 2026
c8bb654
merge: Merge pull request #4 from gampnico/ON-60-write-data-to-zarr
gampnico Jun 11, 2026
3010cc6
fix(workflow): replacement zarr groups
gampnico Jun 11, 2026
530401e
merge: merge pull request #5 from gampnico/merge-fix-pickle-ON-60-ON-63
gampnico Jun 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/_code/prepare_hef.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
df = workflow.calibrate_inversion_from_consensus(gdir, volume_m3_reference=refv)
np.testing.assert_allclose(refv, df.vol_oggm_m3, rtol=0.01)

cl = gdir.read_pickle('inversion_input')[-1]
cl = gdir.read_store('inversion_input')[-1]
mbmod = massbalance.ConstantMassBalance(gdir, y0=1985)
mbx = mbmod.get_annual_mb(cl['hgt']) * cfg.SEC_IN_YEAR * cfg.PARAMS['ice_density']
fdf = pd.DataFrame(index=np.arange(len(mbx))*cl['dx'])
Expand Down
2 changes: 1 addition & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ level 5.
prepro_border=80)
gdir = gdirs[0] # init_glacier_directories always returns a list

fls = gdir.read_pickle('model_flowlines')
fls = gdir.read_store('model_flowlines')
fls
[fl.order for fl in fls]

Expand Down
4 changes: 4 additions & 0 deletions oggm/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ def _log_param_change(self, key, value):
_doc = "The outlines of this glacier complex sub-entities (for RGI7C only!)."
BASENAMES['complex_sub_entities'] = ('complex_sub_entities.shp', _doc)

_doc = "A zarr store containing previous pickled data."
BASENAMES['data_store'] = ('data_store.zarr', _doc)



def set_logging_config(logging_level='INFO'):
"""Set the global logger parameters.
Expand Down
5 changes: 3 additions & 2 deletions oggm/cli/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ def run_benchmark(rgi_version=None, rgi_reg=None, border=None,
# Initialize OGGM and set up the run parameters
cfg.initialize(logging_level=logging_level, params=override_params)

# Use multiprocessing?
cfg.PARAMS['use_multiprocessing'] = platform.system() != 'Darwin'
# Allow multiprocessing override in tests/platform
if 'use_multiprocessing' not in override_params:
cfg.PARAMS['use_multiprocessing'] = platform.system() != 'Darwin'

# How many grid points around the glacier?
# Make it large if you expect your glaciers to grow large
Expand Down
33 changes: 19 additions & 14 deletions oggm/cli/prepro_levels.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,27 +392,32 @@ def _time_log():

# Try to avoid concurrency
if rgi_version == '70C':
fp = file_downloader('https://cluster.klima.uni-bremen.de/~oggm/'
'ref_mb_params/oggm_v1.6/inv_rgi7/'
'rgi7c_rgi_year_2025.1.csv')
rgi_year_by_id = pd.read_csv(fp, index_col=0)['rgi_year'].astype(int).astype(str)
rgidf['src_date'] = rgidf['rgi_id'].map(rgi_year_by_id) + '-01-01 00:00:00'
from oggm.utils._downloads import get_lock
with get_lock():
fp = file_downloader('https://cluster.klima.uni-bremen.de/~oggm/'
'ref_mb_params/oggm_v1.6/inv_rgi7/'
'rgi7c_rgi_year_2025.1.csv')
rgi_year_by_id = pd.read_csv(fp, index_col=0)['rgi_year'].astype(int).astype(str)
rgidf['src_date'] = rgidf['rgi_id'].map(rgi_year_by_id) + '-01-01 00:00:00'

# Add a new default source
if not dem_source:
fs_url = 'https://cluster.klima.uni-bremen.de/~oggm/gdirs/oggm_v1.6/rgitopo/2025.4/'
if rgi_version == '62':
fs = utils.file_downloader(fs_url + 'chosen_dem_RGI62_20251029.csv')
dfs = pd.read_csv(fs, index_col=0)
rgidf['dem_source'] = dfs.loc[rgidf['RGIId'], 'dem_source'].values
with get_lock():
fs = utils.file_downloader(fs_url + 'chosen_dem_RGI62_20251029.csv')
dfs = pd.read_csv(fs, index_col=0)
rgidf['dem_source'] = dfs.loc[rgidf['RGIId'], 'dem_source'].values
if rgi_version == '70G':
fs = utils.file_downloader(fs_url + 'chosen_dem_RGI70G_20251029.csv')
dfs = pd.read_csv(fs, index_col=0)
rgidf['dem_source'] = dfs.loc[rgidf['rgi_id'], 'dem_source'].values
with get_lock():
fs = utils.file_downloader(fs_url + 'chosen_dem_RGI70G_20251029.csv')
dfs = pd.read_csv(fs, index_col=0)
rgidf['dem_source'] = dfs.loc[rgidf['rgi_id'], 'dem_source'].values
if rgi_version == '70C':
fs = utils.file_downloader(fs_url + 'chosen_dem_RGI70C_20251029.csv')
dfs = pd.read_csv(fs, index_col=0)
rgidf['dem_source'] = dfs.loc[rgidf['rgi_id'], 'dem_source'].values
with get_lock():
fs = utils.file_downloader(fs_url + 'chosen_dem_RGI70C_20251029.csv')
dfs = pd.read_csv(fs, index_col=0)
rgidf['dem_source'] = dfs.loc[rgidf['rgi_id'], 'dem_source'].values

# L0 - go
if start_level == 0:
Expand Down
62 changes: 32 additions & 30 deletions oggm/core/centerlines.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ def compute_centerlines(gdir, heads=None):
raise InvalidParamsError('`force_one_flowline` is deprecated')

# open
geom = gdir.read_pickle('geometries')
geom = gdir.read_store('geometries')
grids_file = gdir.get_filepath('gridded_data')
with utils.ncDataset(grids_file) as nc:
# Variables
Expand Down Expand Up @@ -974,7 +974,7 @@ def compute_centerlines(gdir, heads=None):
'found!'.format(gdir.rgi_id))

# Write the data
gdir.write_pickle(cls, 'centerlines')
gdir.write_store(cls, 'centerlines')

if is_first_call:
# For diagnostics of filtered centerlines
Expand Down Expand Up @@ -1009,7 +1009,7 @@ def compute_downstream_line(gdir):
# Look for the starting points
try:
# Normal OGGM flowlines
p = gdir.read_pickle('centerlines')[-1].tail
p = gdir.read_store('centerlines')[-1].tail
head = (int(p.y), int(p.x))
except FileNotFoundError:
# Squeezes lines
Expand Down Expand Up @@ -1050,7 +1050,7 @@ def compute_downstream_line(gdir):
if line is None:
raise GeometryError('Downstream line not found')

cl = gdir.read_pickle('inversion_flowlines')[-1]
cl = gdir.read_store('inversion_flowlines')[-1]
if cl.line is not None:
# normal OGGM lines
lline, dline = _line_extend(cl.line, line, cl.dx)
Expand All @@ -1060,7 +1060,7 @@ def compute_downstream_line(gdir):
_, dline = _line_extend(shpg.LineString(), line, cl.dx)
out = dict(full_line=None, downstream_line=dline)

gdir.write_pickle(out, 'downstream_line')
gdir.write_store(out, 'downstream_line')


def _approx_parabola(x, y, y0=0):
Expand Down Expand Up @@ -1282,8 +1282,8 @@ def compute_downstream_bedshape(gdir):
return

# We make a flowline out of the downstream for simplicity
tpl = gdir.read_pickle('inversion_flowlines')[-1]
cl = gdir.read_pickle('downstream_line')['downstream_line']
tpl = gdir.read_store('inversion_flowlines')[-1]
cl = gdir.read_store('downstream_line')['downstream_line']
cl = Centerline(cl, dx=tpl.dx, map_dx=gdir.grid.dx)

# Topography
Expand Down Expand Up @@ -1316,11 +1316,13 @@ def compute_downstream_bedshape(gdir):
assert np.all(w0s >= w0_min), 'np.all(w0s >= w0_min)'

# write output
out = gdir.read_pickle('downstream_line')
out = gdir.read_store('downstream_line')
out['bedshapes'] = bs
out['surface_h'] = hgts
out['w0s'] = w0s
gdir.write_pickle(out, 'downstream_line')
from oggm.utils.geozarr import convert_pickles_to_datatree
convert_pickles_to_datatree({'downstream_line': gdir.read_store('downstream_line')})
gdir.write_store(out, 'downstream_line')


def _mask_to_polygon(mask, gdir=None):
Expand Down Expand Up @@ -1562,8 +1564,8 @@ def catchment_area(gdir):
"""

# Variables
cls = gdir.read_pickle('centerlines')
geom = gdir.read_pickle('geometries')
cls = gdir.read_store('centerlines')
geom = gdir.read_store('geometries')
glacier_pix = geom['polygon_pix']
fpath = gdir.get_filepath('gridded_data')
with utils.ncDataset(fpath) as nc:
Expand All @@ -1576,7 +1578,7 @@ def catchment_area(gdir):
if len(cls) == 1:
cl_catchments = [np.array(np.nonzero(glacier_mask == 1)).T]
geom['catchment_indices'] = cl_catchments
gdir.write_pickle(geom, 'geometries')
gdir.write_store(geom, 'geometries', use_pickle=True)
return

# Cost array
Expand Down Expand Up @@ -1659,7 +1661,7 @@ def catchment_area(gdir):

# Write the data
geom['catchment_indices'] = cl_catchments
gdir.write_pickle(geom, 'geometries')
gdir.write_store(geom, 'geometries', use_pickle=True)


@entity_task(log, writes=['flowline_catchments', 'catchments_intersects'])
Expand All @@ -1676,7 +1678,7 @@ def catchment_intersections(gdir):
where to write the data
"""

catchment_indices = gdir.read_pickle('geometries')['catchment_indices']
catchment_indices = gdir.read_store('geometries')['catchment_indices']

# Loop over the lines
mask = np.zeros((gdir.grid.ny, gdir.grid.nx))
Expand Down Expand Up @@ -1722,7 +1724,7 @@ def initialize_flowlines(gdir):
"""

# variables
cls = gdir.read_pickle('centerlines')
cls = gdir.read_store('centerlines')

# Initialise the flowlines
dx = cfg.PARAMS['flowline_dx']
Expand Down Expand Up @@ -1811,7 +1813,7 @@ def initialize_flowlines(gdir):
fl.set_flows_to(fls[cls.index(cl.flows_to)])

# Write the data
gdir.write_pickle(fls, 'inversion_flowlines')
gdir.write_store(fls, 'inversion_flowlines')
gdir.add_to_diagnostics('flowline_type', 'centerlines')
if do_filter:
out = diag_n_bad_slopes/diag_n_pix
Expand All @@ -1831,8 +1833,8 @@ def catchment_width_geom(gdir):
"""

# variables
flowlines = gdir.read_pickle('inversion_flowlines')
catchment_indices = gdir.read_pickle('geometries')['catchment_indices']
flowlines = gdir.read_store('inversion_flowlines')
catchment_indices = gdir.read_store('geometries')['catchment_indices']

# Topography is to filter the unrealistic lines afterwards.
# I take the non-smoothed topography
Expand Down Expand Up @@ -1928,8 +1930,8 @@ def catchment_width_geom(gdir):
fl.geometrical_widths = wlines
fl.is_rectangular = is_rectangular

# Overwrite pickle
gdir.write_pickle(flowlines, 'inversion_flowlines')
# Overwrite
gdir.write_store(flowlines, 'inversion_flowlines')


@entity_task(log, writes=['inversion_flowlines'])
Expand All @@ -1949,8 +1951,8 @@ def catchment_width_correction(gdir):
"""

# variables
fls = gdir.read_pickle('inversion_flowlines')
catchment_indices = gdir.read_pickle('geometries')['catchment_indices']
fls = gdir.read_store('inversion_flowlines')
catchment_indices = gdir.read_store('geometries')['catchment_indices']

# Topography for altitude-area distribution
# I take the non-smoothed topography and remove the borders
Expand Down Expand Up @@ -2077,8 +2079,8 @@ def catchment_width_correction(gdir):
for fl in fls:
fl.widths *= fac

# Overwrite centerlines
gdir.write_pickle(fls, 'inversion_flowlines')
# Overwrite
gdir.write_store(fls, 'inversion_flowlines')


@entity_task(log, writes=['inversion_flowlines'])
Expand All @@ -2099,7 +2101,7 @@ def terminus_width_correction(gdir, new_width=None):
"""

# variables
fls = gdir.read_pickle('inversion_flowlines')
fls = gdir.read_store('inversion_flowlines')
fl = fls[-1]
mapdx = gdir.grid.dx

Expand All @@ -2120,8 +2122,8 @@ def terminus_width_correction(gdir, new_width=None):
width[:-5] = width[:-5] * cor_factor
fl.widths = width

# Overwrite centerlines
gdir.write_pickle(fls, 'inversion_flowlines')
# Overwrite
gdir.write_store(fls, 'inversion_flowlines')


def intersect_downstream_lines(gdir, candidates=None):
Expand Down Expand Up @@ -2152,7 +2154,7 @@ def intersect_downstream_lines(gdir, candidates=None):
buffer = cfg.PARAMS['kbuffer']

# get main glacier downstream line and CRS
dline = gdir.read_pickle('downstream_line')['full_line']
dline = gdir.read_store('downstream_line')['full_line']
crs = gdir.grid

# return list
Expand All @@ -2165,7 +2167,7 @@ def intersect_downstream_lines(gdir, candidates=None):
continue

# get tributary glacier downstream line and CRS
_dline = trib.read_pickle('downstream_line')['full_line']
_dline = trib.read_store('downstream_line')['full_line']
_crs = trib.grid

# use salem to transform the grids
Expand Down Expand Up @@ -2449,5 +2451,5 @@ def fixed_dx_elevation_band_flowline(gdir, bin_variables=None,
fl.is_rectangular[-5:] = True
fl.is_trapezoid[-5:] = False

gdir.write_pickle([fl], 'inversion_flowlines')
gdir.write_store([fl], 'inversion_flowlines')
gdir.add_to_diagnostics('flowline_type', 'elevation_band')
19 changes: 15 additions & 4 deletions oggm/core/dynamic_spinup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import copy
import os
import shutil
import warnings

# External libs
Expand Down Expand Up @@ -259,7 +260,7 @@ def run_dynamic_spinup(gdir, init_model_filesuffix=None, init_model_yr=None,
init_model_fls = fmod.fls

if init_model_fls is None:
fls_spinup = gdir.read_pickle('model_flowlines',
fls_spinup = gdir.read_store('model_flowlines',
filesuffix=model_flowline_filesuffix)
else:
fls_spinup = copy.deepcopy(init_model_fls)
Expand Down Expand Up @@ -1118,7 +1119,7 @@ def dynamic_melt_f_run_with_dynamic_spinup(
# ATTENTION: it is assumed that the flowlines in gdir have the volume
# we want to match during calibrate_inversion_from_consensus when we
# set_local_variables
fls_ref = gdir.read_pickle('model_flowlines')
fls_ref = gdir.read_store('model_flowlines')
local_variables['vol_m3_ref'] = np.sum([f.volume_m3 for f in fls_ref])

# we are done with preparing the local_variables for the upcoming iterations
Expand Down Expand Up @@ -1150,7 +1151,7 @@ def dynamic_melt_f_run_with_dynamic_spinup(
np.all(getattr(fl_prov, 'bed_h') ==
getattr(fl_orig, 'bed_h'))
for fl_prov, fl_orig in
zip(fls_init, gdir.read_pickle('model_flowlines'))]):
zip(fls_init, gdir.read_store('model_flowlines'))]):
raise InvalidWorkflowError('If you want to perform a dynamic '
'melt_f calibration including an '
'inversion, it is not possible to '
Expand Down Expand Up @@ -1400,6 +1401,16 @@ def dynamic_melt_f_run_with_dynamic_spinup_fallback(
'model_flowlines_dyn_melt_f_calib.pkl')):
os.remove(os.path.join(gdir.dir,
'model_flowlines_dyn_melt_f_calib.pkl'))
zarr_fp = gdir.get_filepath("data_store").replace(".pkl", ".zarr")
zarr_group = os.path.join(zarr_fp, "model_flowlines__dyn_melt_f_calib")
if os.path.exists(zarr_group):
shutil.rmtree(zarr_group)
try:
import zarr as _zarr

_zarr.consolidate_metadata(zarr_fp)
except Exception:
pass

if target_yr is None:
target_yr = gdir.rgi_date + 1 # + 1 converted to hydro years
Expand Down Expand Up @@ -1869,7 +1880,7 @@ def run_dynamic_melt_f_calibration(
init_model_fls = fmod.fls

if init_model_fls is None:
fls_init = gdir.read_pickle('model_flowlines')
fls_init = gdir.read_store('model_flowlines')
else:
fls_init = copy.deepcopy(init_model_fls)

Expand Down
Loading
Loading