Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions .github/workflows/openneuro-meg-loso.yml
Original file line number Diff line number Diff line change
Expand Up @@ -565,23 +565,16 @@ jobs:
if [[ -n "${OPENNEURO_API_KEY:-}" ]]; then
printf '%s\n' "$OPENNEURO_API_KEY" | openneuro-py login || true
fi
python -m neureptrace.openneuro_meg print-download-includes \
python -m neureptrace.openneuro_meg download-selected \
--dataset "$DATASET" \
--bids-root "$OPENNEURO_DATA_ROOT" \
--subjects "$OPENNEURO_SUBJECTS" \
--runs "$OPENNEURO_RUNS" > "$OPENNEURO_OUTPUT_DIR.includes.txt"
while IFS= read -r include_path; do
[[ -z "$include_path" ]] && continue
target_path="$OPENNEURO_DATA_ROOT/$include_path"
if [[ "$include_path" == *_meg.fif && -s "$target_path" ]]; then
echo "Using cached $include_path"
continue
fi
echo "Downloading $include_path"
openneuro-py download \
--dataset "$DATASET" \
--target-dir "$OPENNEURO_DATA_ROOT" \
--include "$include_path"
done < "$OPENNEURO_OUTPUT_DIR.includes.txt"
--runs "$OPENNEURO_RUNS" \
--include-manifest "$OPENNEURO_OUTPUT_DIR.includes.txt" \
--batch-size 24 \
--max-attempts 3 \
--max-concurrent-downloads 2 \
--metadata-timeout 30

- name: Re-check selected raw files
if: steps.staged_check.outputs.ready != 'true'
Expand Down
196 changes: 196 additions & 0 deletions src/neureptrace/openneuro_meg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import argparse
import csv
import subprocess
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -188,6 +189,176 @@ def check_raw_files(dataset_id: str, *, bids_root: Path, subjects: str | Iterabl
return missing


def _chunked(values: Sequence[str], size: int) -> Iterable[tuple[str, ...]]:
    """Yield consecutive tuples of at most ``size`` items taken from ``values``.

    Order is preserved and the last tuple may be shorter than ``size``.
    Because this is a generator, the ``ValueError`` for a non-positive
    ``size`` is raised when iteration starts, not when the function is called.
    """
    if size <= 0:
        raise ValueError("chunk size must be positive.")
    total = len(values)
    offset = 0
    while offset < total:
        yield tuple(values[offset : offset + size])
        offset += size


def selected_download_includes(
    dataset_id: str,
    *,
    bids_root: Path,
    subjects: str | Iterable[str | int] | None = None,
    runs: str | Iterable[str] | None = None,
) -> tuple[str, ...]:
    """Return the expected include paths that are not yet usable under ``bids_root``.

    The candidate paths come from ``expected_relative_files`` for the given
    dataset, subjects and runs. A path is reported when ``bids_root / path``
    is not a file or is a file with size zero. Order follows the expected list.
    """

    def _needs_download(relative_path: str) -> bool:
        candidate = bids_root / relative_path
        return not (candidate.is_file() and candidate.stat().st_size > 0)

    expected = expected_relative_files(dataset_id, subjects=subjects, runs=runs)
    return tuple(filter(_needs_download, expected))


def _missing_include_paths(bids_root: Path, include_paths: Sequence[str]) -> tuple[str, ...]:
    """Return the entries of ``include_paths`` not yet materialized under ``bids_root``.

    An entry counts as materialized only when ``bids_root / entry`` is a file
    whose size is greater than zero. Input order is preserved.
    """

    def _is_materialized(relative_path: str) -> bool:
        candidate = bids_root / relative_path
        return candidate.is_file() and candidate.stat().st_size > 0

    return tuple(entry for entry in include_paths if not _is_materialized(entry))


def _run_openneuro_download(
    dataset_id: str,
    *,
    bids_root: Path,
    include_paths: Sequence[str],
    max_concurrent_downloads: int,
    metadata_timeout: float,
) -> None:
    """Run one ``openneuro-py download`` process for a batch of include paths.

    Args:
        dataset_id: Value passed to ``--dataset``.
        bids_root: Directory passed to ``--target-dir``.
        include_paths: Relative paths, each passed as its own ``--include``.
            An empty batch is a no-op.
        max_concurrent_downloads: Value passed to ``--max-concurrent-downloads``.
        metadata_timeout: Value passed to ``--metadata-timeout``.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
    """
    if not include_paths:
        # With no --include option the command would no longer be limited to
        # the selected files, so never launch it for an empty batch.
        return

    command = [
        "openneuro-py",
        "download",
        "--dataset",
        dataset_id,
        "--target-dir",
        str(bids_root),
        "--max-concurrent-downloads",
        str(int(max_concurrent_downloads)),
        "--metadata-timeout",
        str(float(metadata_timeout)),
    ]
    for include_path in include_paths:
        command.extend(["--include", include_path])
    # Flush so this line precedes the child's output when stdout is a pipe
    # (the child writes to the inherited stream directly).
    print(f"Downloading OpenNeuro batch with {len(include_paths)} include path(s).", flush=True)
    subprocess.run(command, check=True)


def _download_include_batch_adaptive(
    dataset_id: str,
    *,
    bids_root: Path,
    include_paths: Sequence[str],
    max_concurrent_downloads: int,
    metadata_timeout: float,
) -> tuple[str, ...]:
    """Download one batch, retrying still-missing files in progressively smaller batches.

    Only the paths from ``include_paths`` that are currently missing under
    ``bids_root`` are requested. After the download command returns (or fails),
    the local files are re-checked; anything still missing is split into
    chunks of roughly half the remaining count and each chunk is processed
    recursively. A batch of a single path is never retried here.

    Args:
        dataset_id: Dataset identifier forwarded to the download command.
        bids_root: Local directory the files are downloaded into and checked under.
        include_paths: Relative paths that make up this batch.
        max_concurrent_downloads: Forwarded to the download command.
        metadata_timeout: Forwarded to the download command.

    Returns:
        The paths from this batch that are still missing after all retries,
        or an empty tuple when everything is present.
    """
    missing_before = _missing_include_paths(bids_root, include_paths)
    if not missing_before:
        return ()

    try:
        _run_openneuro_download(
            dataset_id,
            bids_root=bids_root,
            include_paths=missing_before,
            max_concurrent_downloads=max_concurrent_downloads,
            metadata_timeout=metadata_timeout,
        )
    except subprocess.CalledProcessError as exc:
        # Do not abort on a failed process: the on-disk re-check below decides
        # what is still missing. Flushed so the message is not reordered
        # behind output of later child processes.
        print(
            f"OpenNeuro batch download failed with exit code {exc.returncode}; re-checking local files.",
            flush=True,
        )

    missing_after = _missing_include_paths(bids_root, missing_before)
    if not missing_after or len(missing_before) == 1:
        # Either the batch is complete, or it was already a single file and
        # cannot be split any further.
        return missing_after

    downloaded_count = len(missing_before) - len(missing_after)
    if downloaded_count > 0:
        print(
            f"OpenNeuro batch materialized {downloaded_count}/{len(missing_before)} requested file(s); "
            "retrying the remaining files in smaller batches.",
            flush=True,
        )
    else:
        print(
            "OpenNeuro batch did not materialize requested files; retrying in smaller batches.",
            flush=True,
        )

    # Halving (floor, minimum 1) guarantees every recursive batch is strictly
    # smaller than this one, so the recursion terminates.
    split_size = max(1, len(missing_after) // 2)
    remaining: list[str] = []
    for chunk in _chunked(missing_after, split_size):
        remaining.extend(
            _download_include_batch_adaptive(
                dataset_id,
                bids_root=bids_root,
                include_paths=chunk,
                max_concurrent_downloads=max_concurrent_downloads,
                metadata_timeout=metadata_timeout,
            )
        )
    return tuple(remaining)


def download_selected_files(
    dataset_id: str,
    *,
    bids_root: Path,
    subjects: str | Iterable[str | int] | None = None,
    runs: str | Iterable[str] | None = None,
    include_manifest: Path | None = None,
    batch_size: int = 24,
    max_attempts: int = 3,
    max_concurrent_downloads: int = 2,
    metadata_timeout: float = 30.0,
) -> tuple[str, ...]:
    """Download selected OpenNeuro files in bounded batches.

    Batching avoids hundreds of separate ``openneuro-py`` processes for large
    ds004330 runs while preserving exact include-path selection.

    Args:
        dataset_id: Dataset identifier; normalized with ``normalize_dataset_id``.
        bids_root: Download target directory; created if it does not exist.
        subjects: Subject selection forwarded to ``expected_relative_files``.
        runs: Run selection forwarded to ``expected_relative_files``.
        include_manifest: Optional file that receives every expected include
            path (one per line), whether or not it is already present locally.
        batch_size: Maximum number of include paths in each initial batch.
        max_attempts: Number of passes made over the files that remain missing.
        max_concurrent_downloads: Forwarded to each download batch.
        metadata_timeout: Forwarded to each download batch.

    Returns:
        An empty tuple when every selected file is present, otherwise the
        relative paths that are still missing after the last attempt.

    Raises:
        ValueError: If ``batch_size``, ``max_attempts`` or
            ``max_concurrent_downloads`` is not positive.
    """

    # Validate before any filesystem side effect so that bad arguments are
    # rejected consistently, including when nothing needs downloading.
    batch_size = int(batch_size)
    max_attempts = int(max_attempts)
    max_concurrent_downloads = int(max_concurrent_downloads)
    metadata_timeout = float(metadata_timeout)
    if batch_size <= 0:
        raise ValueError("batch_size must be positive.")
    if max_attempts <= 0:
        raise ValueError("max_attempts must be positive.")
    if max_concurrent_downloads <= 0:
        raise ValueError("max_concurrent_downloads must be positive.")

    dataset_id = normalize_dataset_id(dataset_id)
    bids_root.mkdir(parents=True, exist_ok=True)
    # Resolve the selection exactly once: later re-checks reuse this tuple, so
    # ``subjects``/``runs`` are consumed a single time (safe for one-shot
    # iterables) and every attempt checks the same set of paths.
    all_includes = tuple(expected_relative_files(dataset_id, subjects=subjects, runs=runs))
    if include_manifest is not None:
        include_manifest.parent.mkdir(parents=True, exist_ok=True)
        include_manifest.write_text("\n".join(all_includes) + ("\n" if all_includes else ""), encoding="utf-8")

    pending = list(_missing_include_paths(bids_root, all_includes))
    if not pending:
        print(f"All selected {dataset_id} files are already present under {bids_root}.")
        return ()

    for attempt in range(1, max_attempts + 1):
        # Flushed so progress lines stay in order with the output of the
        # download child processes when stdout is a pipe.
        print(
            f"Downloading {len(pending)} selected {dataset_id} file(s), attempt {attempt}/{max_attempts}.",
            flush=True,
        )
        for chunk in _chunked(pending, batch_size):
            _download_include_batch_adaptive(
                dataset_id,
                bids_root=bids_root,
                include_paths=chunk,
                max_concurrent_downloads=max_concurrent_downloads,
                metadata_timeout=metadata_timeout,
            )
        # Re-check the whole selection on disk rather than trusting the
        # per-batch return values.
        pending = list(_missing_include_paths(bids_root, all_includes))
        if not pending:
            print(f"Downloaded all selected {dataset_id} files.")
            return ()
        print(f"{len(pending)} selected {dataset_id} file(s) remain after attempt {attempt}.", flush=True)

    print(f"Missing {len(pending)} selected {dataset_id} file(s) after download attempts:")
    for relative_path in pending:
        print(bids_root / relative_path)
    return tuple(pending)


def invalid_raw_fif_files(
dataset_id: str,
*,
Expand Down Expand Up @@ -573,6 +744,16 @@ def main(argv: Sequence[str] | None = None) -> int:
check_parser.add_argument("--delete-invalid", action="store_true", help="Delete unreadable selected raw FIF files so they can be downloaded again.")
check_parser.set_defaults(func=_main_check_raw)

download_parser = subparsers.add_parser("download-selected", help="Download exactly the selected raw OpenNeuro files.")
_add_dataset_subject_run_args(download_parser)
download_parser.add_argument("--bids-root", type=Path, required=True)
download_parser.add_argument("--include-manifest", type=Path)
download_parser.add_argument("--batch-size", type=int, default=24)
download_parser.add_argument("--max-attempts", type=int, default=3)
download_parser.add_argument("--max-concurrent-downloads", type=int, default=2)
download_parser.add_argument("--metadata-timeout", type=float, default=30.0)
download_parser.set_defaults(func=_main_download_selected)

stage_parser = subparsers.add_parser("stage", help="Stage selected raw BIDS MEG files into per-subject MNE Epochs FIF files.")
_add_dataset_subject_run_args(stage_parser)
stage_parser.add_argument("--bids-root", type=Path, required=True)
Expand Down Expand Up @@ -635,6 +816,21 @@ def _main_check_raw(args) -> int:
return 1


def _main_download_selected(args) -> int:
    """Run ``download_selected_files`` from parsed CLI arguments.

    Returns exit status 0 when no selected file is left missing, 1 otherwise.
    """
    dataset = args.dataset
    options = {
        "bids_root": args.bids_root,
        "subjects": args.subjects,
        "runs": args.runs,
        "include_manifest": args.include_manifest,
        "batch_size": args.batch_size,
        "max_attempts": args.max_attempts,
        "max_concurrent_downloads": args.max_concurrent_downloads,
        "metadata_timeout": args.metadata_timeout,
    }
    still_missing = download_selected_files(dataset, **options)
    if still_missing:
        return 1
    return 0


def _main_stage(args) -> int:
include_labels = None
if args.include_labels:
Expand Down
79 changes: 79 additions & 0 deletions tests/test_openneuro_meg.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
_derive_metadata,
_drop_non_epochable_metadata,
_filter_metadata,
download_selected_files,
expected_relative_files,
invalid_raw_fif_files,
parse_runs,
parse_subjects,
run_files,
selected_download_includes,
subject_label,
)

Expand Down Expand Up @@ -83,6 +85,9 @@ def test_openneuro_workflow_exposes_every_configured_dataset():
assert "Resolve GitHub-hosted OpenNeuro cache keys" in workflow
assert "safe_cache_token" in workflow
assert "Cache staged OpenNeuro epochs on GitHub-hosted runners" in workflow
assert "download-selected" in workflow
assert "--batch-size 24" in workflow
assert "--max-concurrent-downloads 2" in workflow
assert "NeuRepTrace LOSO decode still running" in workflow
assert "Check selected staged epochs" in workflow
assert "raw-file check/download can be skipped" in workflow
Expand Down Expand Up @@ -164,6 +169,80 @@ def test_expected_relative_files_include_singsing_raw_and_events():
]


def test_selected_download_includes_skip_existing_files(tmp_path: Path):
    """Only the raw FIF is reported once the events TSV already exists locally."""
    meg_dir = tmp_path / "sub-01" / "meg"
    meg_dir.mkdir(parents=True)
    events_file = meg_dir / "sub-01_task-MMNHCS_run-0_events.tsv"
    events_file.write_text("onset\tduration\ttrial_type\n", encoding="utf-8")

    still_needed = selected_download_includes("ds006629", bids_root=tmp_path, subjects="1", runs="0")

    assert still_needed == ("sub-01/meg/sub-01_task-MMNHCS_run-0_meg.fif",)


def test_download_selected_files_batches_openneuro_includes(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """With ``batch_size=3`` the selection is fetched by two commands carrying 3 and 1 includes."""
    recorded: list[list[str]] = []

    def fake_run(command, *, check):
        # Stand-in for subprocess.run that creates every requested include path.
        assert check is True
        recorded.append(list(command))
        target_dir = Path(command[command.index("--target-dir") + 1])
        include_values = [
            command[position + 1] for position, token in enumerate(command) if token == "--include"
        ]
        for include_value in include_values:
            destination = target_dir / include_value
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_bytes(b"ok")

    monkeypatch.setattr("neureptrace.openneuro_meg.subprocess.run", fake_run)
    manifest = tmp_path / "includes.txt"

    missing = download_selected_files(
        "ds006629",
        bids_root=tmp_path,
        subjects="1,2",
        runs="0",
        include_manifest=manifest,
        batch_size=3,
        max_attempts=1,
        max_concurrent_downloads=2,
    )

    assert missing == ()
    assert len(recorded) == 2
    for command in recorded:
        assert command[:4] == ["openneuro-py", "download", "--dataset", "ds006629"]
    for command in recorded:
        assert command[command.index("--max-concurrent-downloads") + 1] == "2"
    include_counts = [command.count("--include") for command in recorded]
    assert include_counts == [3, 1]
    manifest_lines = manifest.read_text(encoding="utf-8").splitlines()
    assert manifest_lines == expected_relative_files("ds006629", subjects="1,2", runs="0")


def test_download_selected_files_splits_partial_successful_batches(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """A batch that only partly materializes is retried as smaller batches until complete."""
    recorded: list[list[str]] = []

    def fake_run(command, *, check):
        # Stand-in for subprocess.run that creates only the LAST include path
        # of each command, simulating a partially successful download.
        assert check is True
        recorded.append(list(command))
        target_dir = Path(command[command.index("--target-dir") + 1])
        include_values = [
            command[position + 1] for position, token in enumerate(command) if token == "--include"
        ]
        destination = target_dir / include_values[-1]
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(b"ok")

    monkeypatch.setattr("neureptrace.openneuro_meg.subprocess.run", fake_run)

    missing = download_selected_files(
        "ds006629",
        bids_root=tmp_path,
        subjects="1,2",
        runs="0",
        batch_size=4,
        max_attempts=1,
    )

    assert missing == ()
    # One full batch of four, then the three leftovers one at a time.
    include_counts = [command.count("--include") for command in recorded]
    assert include_counts == [4, 1, 1, 1]
    for expected_path in expected_relative_files("ds006629", subjects="1,2", runs="0"):
        assert (tmp_path / expected_path).is_file()


def test_ds004276_word_metadata_joins_behavior_file(tmp_path: Path):
behavior = pd.DataFrame(
{
Expand Down