diff --git a/ax/adapter/tests/test_data_utils.py b/ax/adapter/tests/test_data_utils.py
index e38b0cdf1cd..f00f309df22 100644
--- a/ax/adapter/tests/test_data_utils.py
+++ b/ax/adapter/tests/test_data_utils.py
@@ -378,16 +378,16 @@ def test_extract_experiment_data_multiple_map(self) -> None:
             data_df[MAP_KEY].tolist(),
             [
                 1000.0,
-                1001.0,
-                1002.0,
-                1003.0,
                 0.0,
+                1001.0,
                 1.0,
+                1002.0,
                 2.0,
+                1003.0,
                 3.0,
                 1000.0,
-                1001.0,
                 0.0,
+                1001.0,
                 1.0,
             ],
         )
diff --git a/ax/analysis/plotly/tests/test_marginal_effects.py b/ax/analysis/plotly/tests/test_marginal_effects.py
index 25b299d5f77..107381844cd 100644
--- a/ax/analysis/plotly/tests/test_marginal_effects.py
+++ b/ax/analysis/plotly/tests/test_marginal_effects.py
@@ -43,7 +43,7 @@ def setUp(self) -> None:
             self.experiment.trials[i].mark_running(no_runner_required=True)
             self.experiment.attach_data(
                 Data(
-                    pd.DataFrame(
+                    df=pd.DataFrame(
                         {
                             "trial_index": [i] * num_arms,
                             "arm_name": [f"0_{j}" for j in range(num_arms)],
diff --git a/ax/api/tests/test_client.py b/ax/api/tests/test_client.py
index 232671b65ef..d02e5103b9c 100644
--- a/ax/api/tests/test_client.py
+++ b/ax/api/tests/test_client.py
@@ -540,9 +540,9 @@ def test_attach_data(self) -> None:
                         "arm_name": {0: "0_0", 1: "0_0", 2: "0_0"},
                         "metric_name": {0: "foo", 1: "foo", 2: "bar"},
                         "metric_signature": {0: "foo", 1: "foo", 2: "bar"},
-                        "mean": {0: 2.0, 1: 1.0, 2: 2.0},
+                        "mean": {0: 1.0, 1: 2.0, 2: 2.0},
                         "sem": {0: np.nan, 1: np.nan, 2: np.nan},
-                        "step": {0: 10.0, 1: np.nan, 2: np.nan},
+                        "step": {0: np.nan, 1: 10.0, 2: np.nan},
                     }
                 )
            ),
diff --git a/ax/benchmark/tests/test_benchmark_metric.py b/ax/benchmark/tests/test_benchmark_metric.py
index 363d6a7ab37..7bd2ebd2e26 100644
--- a/ax/benchmark/tests/test_benchmark_metric.py
+++ b/ax/benchmark/tests/test_benchmark_metric.py
@@ -295,8 +295,15 @@ def _test_fetch_trial_multiple_time_steps_with_simulator(self, batch: bool) -> N
         ).drop(columns=drop_cols)
         if returns_full_data:
             self.assertEqual(
-                df_or_map_df[df_or_map_df["step"] == 0].to_dict(),
-                expected_df.to_dict(),
+                df_or_map_df[df_or_map_df["step"] == 0]
+                .sort_values(["trial_index", "arm_name", "metric_name", "step"])
+                .reset_index(drop=True)
+                .to_dict(),
+                expected_df.sort_values(
+                    ["trial_index", "arm_name", "metric_name", "step"]
+                )
+                .reset_index(drop=True)
+                .to_dict(),
             )
         else:
             self.assertEqual(df_or_map_df.to_dict(), expected_df.to_dict())
diff --git a/ax/core/base_trial.py b/ax/core/base_trial.py
index 649c17c57e9..8e9a4d2853e 100644
--- a/ax/core/base_trial.py
+++ b/ax/core/base_trial.py
@@ -15,7 +15,7 @@ from typing import Any, TYPE_CHECKING
 
 from ax.core.arm import Arm
-from ax.core.data import Data, sort_by_trial_index_and_arm_name
+from ax.core.data import Data
 from ax.core.evaluations_to_data import raw_evaluations_to_data
 from ax.core.generator_run import GeneratorRun, GeneratorRunType
 from ax.core.metric import Metric, MetricFetchResult
@@ -442,8 +442,6 @@ def fetch_data(self, metrics: list[Metric] | None = None, **kwargs: Any) -> Data
         data = Metric._unwrap_trial_data_multi(
             results=self.fetch_data_results(metrics=metrics, **kwargs)
         )
-        if not data.has_step_column:
-            data.full_df = sort_by_trial_index_and_arm_name(data.full_df)
         return data
diff --git a/ax/core/data.py b/ax/core/data.py
index 0ebd6fdc01a..eb9ba84497a 100644
--- a/ax/core/data.py
+++ b/ax/core/data.py
@@ -8,13 +8,14 @@
 from __future__ import annotations
 
+import itertools
 from bisect import bisect_right
 from collections.abc import Iterable
 from copy import deepcopy
 from functools import cached_property
 from io import StringIO
 from logging import Logger
-from typing import Any, TypeVar
+from typing import Any
 
 import numpy as np
 import numpy.typing as npt
@@ -34,11 +35,40 @@
 logger: Logger = get_logger(__name__)
 
-TData = TypeVar("TData", bound="Data")
 DF_REPR_MAX_LENGTH = 1000
 
 MAP_KEY = "step"
 
 
+class DataRow:
+    def __init__(
+        self,
+        trial_index: int,
+        arm_name: str,
+        metric_name: str,
+        metric_signature: str,
+        mean: float,
+        se: float,
+        step: float | None = None,
+        start_time: int | None = None,
+        end_time: int | None = None,
+        n: int | None = None,
+    ) -> None:
+        self.trial_index: int = trial_index
+        self.arm_name: str = arm_name
+
+        self.metric_name: str = metric_name
+        self.metric_signature: str = metric_signature
+
+        self.mean: float = mean
+        self.se: float = se
+
+        self.step: float | None = step
+
+        self.start_time: int | None = start_time
+        self.end_time: int | None = end_time
+        self.n: int | None = n
+
+
 class Data(Base, SerializationMixin):
     """Class storing numerical data for an experiment.
 
@@ -102,8 +132,6 @@ class Data(Base, SerializationMixin):
         "start_time": pd.Timestamp,
         "end_time": pd.Timestamp,
         "n": int,
-        "frac_nonnull": np.float64,
-        "random_split": int,
         MAP_KEY: float,
     }
 
@@ -116,16 +144,19 @@ class Data(Base, SerializationMixin):
         "metric_signature",
     ]
 
-    full_df: pd.DataFrame
+    _data_rows: list[DataRow]
 
     def __init__(
-        self: TData,
+        self,
+        data_rows: Iterable[DataRow] | None = None,
         df: pd.DataFrame | None = None,
         _skip_ordering_and_validation: bool = False,
    ) -> None:
         """Initialize a ``Data`` object from the given DataFrame.
 
         Args:
+            data_rows: Iterable of DataRows. If provided, this will be used as the
+                source of truth for Data, over df.
             df: DataFrame with underlying data, and required columns. Data must
                 be unique at the level of ("trial_index", "arm_name",
                 "metric_name"), plus "step" if a "step" column is present. A
@@ -136,31 +167,94 @@
                 Intended only for use in `Data.filter`, where the contents of
                 the DataFrame are known to be ordered and valid.
         """
-        if df is None:
-            # Initialize with barebones DF with expected dtypes
-            self.full_df = pd.DataFrame.from_dict(
+        if data_rows is not None:
+            self._data_rows = list(data_rows)
+        elif df is not None:
+            # Unroll the df into a list of DataRows
+            if missing_columns := self.REQUIRED_COLUMNS - {*df.columns}:
+                raise ValueError(
+                    f"Dataframe must contain required columns {list(missing_columns)}."
+                )
+
+            self._data_rows = [
+                DataRow(
+                    # pyre-ignore[16] Intentional unsafe namedtuple access
+                    trial_index=row.trial_index,
+                    # pyre-ignore[16] Intentional unsafe namedtuple access
+                    arm_name=row.arm_name,
+                    # pyre-ignore[16] Intentional unsafe namedtuple access
+                    metric_name=row.metric_name,
+                    # pyre-ignore[16] Intentional unsafe namedtuple access
+                    metric_signature=row.metric_signature,
+                    # pyre-ignore[16] Intentional unsafe namedtuple access
+                    mean=row.mean,
+                    # pyre-ignore[16] Intentional unsafe namedtuple access
+                    se=row.sem,
+                    step=getattr(row, "step", None),
+                    start_time=getattr(row, "start_time", None),
+                    end_time=getattr(row, "end_time", None),
+                    n=getattr(row, "n", None),
+                )
+                # Using itertuples() instead of iterrows() for speed
+                for row in df.itertuples()
+            ]
+        else:
+            self._data_rows = []
+
+        self._memo_df: pd.DataFrame | None = None
+        self.has_step_column: bool = any(
+            row.step is not None for row in self._data_rows
+        )
+
+    @property
+    def empty(self) -> bool:
+        """Whether the data is empty."""
+        return len(self._data_rows) == 0
+
+    @cached_property
+    def full_df(self) -> pd.DataFrame:
+        """
+        Convert the DataRows into a pandas DataFrame. If step, start_time, or end_time
+        is None for all rows the column will be elided.
+        """
+        if len(self._data_rows) == 0:
+            return pd.DataFrame.from_dict(
                 {
                     col: pd.Series([], dtype=self.COLUMN_DATA_TYPES[col])
                     for col in self.REQUIRED_COLUMNS
                 }
             )
-        elif _skip_ordering_and_validation:
-            self.full_df = df
-        else:
-            columns = set(df.columns)
-            missing_columns = self.REQUIRED_COLUMNS - columns
-            if missing_columns:
-                raise ValueError(
-                    f"Dataframe must contain required columns {list(missing_columns)}."
-                )
-            # Drop rows where every input is null. Since `dropna` can be slow, first
-            # check trial index to see if dropping nulls might be needed.
-            if df["trial_index"].isnull().any():
-                df = df.dropna(axis=0, how="all", ignore_index=True)
-            df = self._safecast_df(df=df)
-            self.full_df = self._get_df_with_cols_in_expected_order(df=df)
-        self._memo_df = None
-        self.has_step_column = MAP_KEY in self.full_df.columns
+
+        # Detect whether any of the optional attributes are present and should be
+        # included as columns in the full DataFrame.
+        include_step = any(row.step is not None for row in self._data_rows)
+        include_start_time = any(row.start_time is not None for row in self._data_rows)
+        include_end_time = any(row.end_time is not None for row in self._data_rows)
+        include_n = any(row.n is not None for row in self._data_rows)
+
+        records = [
+            {
+                "trial_index": row.trial_index,
+                "arm_name": row.arm_name,
+                "metric_name": row.metric_name,
+                "metric_signature": row.metric_signature,
+                "mean": row.mean,
+                "sem": row.se,
+                **({"step": row.step} if include_step else {}),
+                **({"start_time": row.start_time} if include_start_time else {}),
+                **({"end_time": row.end_time} if include_end_time else {}),
+                **({"n": row.n} if include_n else {}),
+            }
+            for row in self._data_rows
+        ]
+
+        return sort_by_trial_index_and_arm_name(
+            df=self._get_df_with_cols_in_expected_order(
+                df=self._safecast_df(
+                    df=pd.DataFrame.from_records(records),
+                ),
+            )
+        )
 
     @classmethod
     def _get_df_with_cols_in_expected_order(cls, df: pd.DataFrame) -> pd.DataFrame:
@@ -175,7 +269,7 @@ def _get_df_with_cols_in_expected_order(cls, df: pd.DataFrame) -> pd.DataFrame:
         return df
 
     @classmethod
-    def _safecast_df(cls: type[TData], df: pd.DataFrame) -> pd.DataFrame:
+    def _safecast_df(cls, df: pd.DataFrame) -> pd.DataFrame:
         """Function for safely casting df to standard data types.
 
         Needed because numpy does not support NaNs in integer arrays.
@@ -255,7 +349,7 @@ def df(self) -> pd.DataFrame:
             return self._memo_df
 
         # Case: Empty data
-        if self.full_df.empty:
+        if self.empty:
             return self.full_df
 
         idxs = (
@@ -275,14 +369,14 @@ def df(self) -> pd.DataFrame:
         return self._memo_df
 
     @classmethod
-    def from_multiple_data(cls: type[TData], data: Iterable[Data]) -> TData:
+    def from_multiple_data(cls, data: Iterable[Data]) -> Data:
         """Combines multiple objects into one (with the concatenated
         underlying dataframe).
 
         Args:
             data: Iterable of Ax objects of this class to combine.
         """
-        dfs = [datum.full_df for datum in data if not datum.full_df.empty]
+        dfs = [datum.full_df for datum in data if not datum.empty]
 
         if len(dfs) == 0:
             return cls()
@@ -302,7 +396,8 @@ def metric_names(self) -> set[str]:
         """Set of metric names that appear in the underlying dataframe of
         this object.
         """
-        return set() if self.df.empty else set(self.df["metric_name"].values)
+
+        return {row.metric_name for row in self._data_rows}
 
     @property
     def metric_signatures(self) -> set[str]:
@@ -339,7 +434,7 @@ def filter(
             _skip_ordering_and_validation=True,
         )
 
-    def clone(self: TData) -> TData:
+    def clone(self) -> Data:
         """Returns a new Data object with the same underlying dataframe."""
         return self.__class__(df=deepcopy(self.full_df))
 
@@ -347,13 +442,13 @@ def __eq__(self, o: Data) -> bool:
         return type(self) is type(o) and dataframe_equals(self.full_df, o.full_df)
 
     def relativize(
-        self: TData,
+        self,
         status_quo_name: str = "status_quo",
         as_percent: bool = False,
         include_sq: bool = False,
         bias_correction: bool = True,
         control_as_constant: bool = False,
-    ) -> TData:
+    ) -> Data:
         """Relativize a data object w.r.t. a status_quo arm.
 
         Args:
@@ -391,11 +486,8 @@ def relativize(
     @cached_property
     def trial_indices(self) -> set[int]:
         """Return the set of trial indices in the data."""
-        if self._memo_df is not None:
-            # Use a smaller df if available
-            return set(self.df["trial_index"].unique())
-        # If no small df is available, use the full df
-        return set(self.full_df["trial_index"].unique())
+
+        return {row.trial_index for row in self._data_rows}
 
     def latest(self, rows_per_group: int = 1) -> Data:
         """Return a new Data with the most recently observed `rows_per_group`
@@ -437,12 +529,12 @@ def latest(self, rows_per_group: int = 1) -> Data:
         )
 
     def subsample(
-        self: TData,
+        self,
         keep_every: int | None = None,
         limit_rows_per_group: int | None = None,
         limit_rows_per_metric: int | None = None,
         include_first_last: bool = True,
-    ) -> TData:
+    ) -> Data:
         """Return a new Data that subsamples the `MAP_KEY` column in an
         equally-spaced manner. This function considers only the relative
         ordering of the `MAP_KEY` values, making it most suitable when these values are
@@ -504,6 +596,37 @@ def subsample(
         return self.__class__(df=subsampled_df)
 
 
+def combine_data_rows_favoring_recent(
+    last_rows: Iterable[DataRow], new_rows: Iterable[DataRow]
+) -> list[DataRow]:
+    """Combine last_rows and new_rows.
+
+    Deduplicate in favor of new_rows when there are multiple observations with
+    the same "trial_index", "metric_name", "arm_name", and "step".
+
+    Args:
+        last_rows: The rows of data currently attached to a trial
+        new_rows: A list of rows containing new data to be attached
+    """
+
+    deduped: dict[tuple[int, str, str, float | None], DataRow] = {}
+
+    # Loop over all rows without creating a new list in memory
+    for row in itertools.chain(last_rows, new_rows):
+        # NaN must be treated specially since NaN != NaN
+        if row.step is not None and np.isnan(row.step):
+            step_key = None
+        else:
+            step_key = row.step
+
+        key = (row.trial_index, row.metric_name, row.arm_name, step_key)
+        deduped[key] = row
+
+    return list(deduped.values())
+
+
+# This function is only used in ax/storage and can be removed
+# once storage is refactored to use DataRows.
 def combine_dfs_favoring_recent(
     last_df: pd.DataFrame, new_df: pd.DataFrame
 ) -> pd.DataFrame:
diff --git a/ax/core/evaluations_to_data.py b/ax/core/evaluations_to_data.py
index 3c442814401..56e894f5bd4 100644
--- a/ax/core/evaluations_to_data.py
+++ b/ax/core/evaluations_to_data.py
@@ -9,8 +9,7 @@
 from collections.abc import Mapping
 from enum import Enum
 
-import pandas as pd
-from ax.core.data import Data, MAP_KEY
+from ax.core.data import Data, DataRow
 from ax.core.types import FloatLike, SingleMetricData, TEvaluationOutcome
 from ax.exceptions.core import UserInputError
 
@@ -67,19 +66,24 @@ def raw_evaluations_to_data(
         trial_index: Index of the trial, for which the evaluations are.
         data_type: An element of the ``DataType`` enum.
     """
-    records = []
+    data_rows: list[DataRow] = []
+    metric_names_seen: set[str] = set()
+
     for arm_name, evaluation in raw_data.items():
         # TTrialEvaluation case ({metric_name -> (mean, SEM) or metric_name -> mean})
         if isinstance(evaluation, dict):
             for metric_name, outcome in evaluation.items():
                 mean, sem = _validate_and_extract_single_metric_data(dat=outcome)
-                records.append(
-                    {
-                        "arm_name": arm_name,
-                        "metric_name": metric_name,
-                        "mean": mean,
-                        "sem": sem,
-                    }
+                metric_names_seen.add(metric_name)
+                data_rows.append(
+                    DataRow(
+                        trial_index=trial_index,
+                        arm_name=arm_name,
+                        metric_name=metric_name,
+                        metric_signature=metric_name_to_signature.get(metric_name, ""),
+                        mean=float(mean),
+                        se=float(sem) if sem is not None else float("nan"),
+                    )
                 )
         elif isinstance(evaluation, list):
             # TMapTrialEvaluation case [(step, TTrialEvaluation)]
@@ -91,14 +95,19 @@
                 )
                 for metric_name, outcome in step_eval.items():
                     mean, sem = _validate_and_extract_single_metric_data(dat=outcome)
-                    records.append(
-                        {
-                            "arm_name": arm_name,
-                            "metric_name": metric_name,
-                            "mean": mean,
-                            "sem": sem,
-                            MAP_KEY: step,
-                        }
+                    metric_names_seen.add(metric_name)
+                    data_rows.append(
+                        DataRow(
+                            trial_index=trial_index,
+                            arm_name=arm_name,
+                            metric_name=metric_name,
+                            metric_signature=metric_name_to_signature.get(
+                                metric_name, ""
+                            ),
+                            mean=float(mean),
+                            se=float(sem) if sem is not None else float("nan"),
+                            step=float(step),
+                        )
                     )
         # SingleMetricData case: (mean, SEM) or mean
         else:
@@ -111,17 +120,19 @@
             # pyre-fixme[6]: Incmopatible parameter type (Pyre doesn't know that
            # this is in fact a SingleMetricData)
             mean, sem = _validate_and_extract_single_metric_data(dat=evaluation)
-            records.append(
-                {
-                    "arm_name": arm_name,
-                    "metric_name": metric_name,
-                    "mean": mean,
-                    "sem": sem,
-                }
+            metric_names_seen.add(metric_name)
+            data_rows.append(
+                DataRow(
+                    trial_index=trial_index,
+                    arm_name=arm_name,
+                    metric_name=metric_name,
+                    metric_signature=metric_name_to_signature.get(metric_name, ""),
+                    mean=float(mean),
+                    se=float(sem) if sem is not None else float("nan"),
+                )
             )
 
-    df = pd.DataFrame.from_records(records)
-    metrics_missing_signatures = set(df["metric_name"].unique()) - set(
+    metrics_missing_signatures = metric_names_seen - set(
         metric_name_to_signature.keys()
     )
     if len(metrics_missing_signatures) > 0:
@@ -130,7 +141,5 @@
             "metric_name_to_signature. Please provide a mapping for all metric "
             "names present in the evaluations to their respective signatures."
         )
-    df["metric_signature"] = df["metric_name"].map(metric_name_to_signature)
-    df["trial_index"] = trial_index
-    return Data(df=df)
+    return Data(data_rows=data_rows)
diff --git a/ax/core/experiment.py b/ax/core/experiment.py
index 2b09e4c460d..8f445a041b7 100644
--- a/ax/core/experiment.py
+++ b/ax/core/experiment.py
@@ -28,11 +28,7 @@
 )
 from ax.core.base_trial import BaseTrial
 from ax.core.batch_trial import BatchTrial
-from ax.core.data import (
-    combine_dfs_favoring_recent,
-    Data,
-    sort_by_trial_index_and_arm_name,
-)
+from ax.core.data import combine_data_rows_favoring_recent, Data
 from ax.core.generator_run import GeneratorRun
 from ax.core.metric import Metric, MetricFetchE, MetricFetchResult
 from ax.core.objective import MultiObjective
@@ -893,11 +889,9 @@ def attach_data(self, data: Data, **kwargs: Any) -> None:
             raise ValueError(
                 f"Unexpected arguments {unexpected_args} passed to `attach_data`."
             )
-        if data.full_df.empty:
+        if data.empty:
             raise ValueError("Data to attach is empty.")
-        metrics_not_on_exp = set(data.full_df["metric_name"].values) - set(
-            self.metrics.keys()
-        )
+        metrics_not_on_exp = data.metric_names - set(self.metrics.keys())
         if metrics_not_on_exp:
             logger.debug(
                 f"Attached data has some metrics ({metrics_not_on_exp}) that are "
@@ -914,12 +908,11 @@ def attach_data(self, data: Data, **kwargs: Any) -> None:
                 "because they have not been attached to the experiment."
             )
-        new_df = combine_dfs_favoring_recent(
-            last_df=self.data.full_df, new_df=data.full_df
+        self.data = Data(
+            data_rows=combine_data_rows_favoring_recent(
+                last_rows=self.data._data_rows, new_rows=data._data_rows
+            )
         )
-        if not data.has_step_column:
-            new_df = sort_by_trial_index_and_arm_name(df=new_df)
-        self.data = Data(df=new_df)
 
     def attach_fetch_results(
         self,
diff --git a/ax/core/tests/test_data.py b/ax/core/tests/test_data.py
index 8567135a126..2b432e7ae9b 100644
--- a/ax/core/tests/test_data.py
+++ b/ax/core/tests/test_data.py
@@ -119,29 +119,22 @@ def get_test_dataframe() -> pd.DataFrame:
     )
 
 
-class TestDataBase(TestCase):
-    # Also run with has_step_column = True below
-    has_step_column: bool = False
+class DataTest(TestCase):
+    """Tests for Data without a "step" column."""
 
     def setUp(self) -> None:
         super().setUp()
         self.data_without_df = Data()
-        df = get_test_dataframe()
-        if not self.has_step_column:
-            self.df = df
-            self.data_with_df = Data(df=self.df)
-        else:
-            df_1 = df.copy().assign(**{MAP_KEY: 0})
-            df_2 = df.copy().assign(**{MAP_KEY: 1})
-            self.df = pd.concat((df_1, df_2))
-            self.data_with_df = Data(df=self.df)
-
+        self.df = get_test_dataframe()
+        self.data_with_df = Data(df=self.df)
         self.metric_name_to_signature = {"a": "a_signature", "b": "b_signature"}
 
     def test_init(self) -> None:
+        # Test equality
         self.assertEqual(self.data_without_df, self.data_without_df)
         self.assertEqual(self.data_with_df, self.data_with_df)
 
+        # Test accessing values
         df = self.data_with_df.df
         self.assertEqual(
             float(df[df["arm_name"] == "0_0"][df["metric_name"] == "a"]["mean"].item()),
@@ -152,7 +145,14 @@ def test_init(self) -> None:
             0.5,
         )
 
-        self.assertEqual(self.data_with_df.has_step_column, self.has_step_column)
+        # Test has_step_column is False
+        self.assertFalse(self.data_with_df.has_step_column)
+
+        # Test empty initialization
+        empty = Data()
+        self.assertTrue(empty.empty)
+        self.assertEqual(set(empty.full_df.columns), empty.REQUIRED_COLUMNS)
+        self.assertFalse(empty.has_step_column)
 
     def test_clone(self) -> None:
         data = self.data_with_df
@@ -164,14 +164,9 @@ def test_clone(self) -> None:
         self.assertIsNot(data, data_clone)
         self.assertIsNot(data.df, data_clone.df)
         self.assertIsNone(data_clone._db_id)
-        if self.has_step_column:
-            self.assertIsNot(data.full_df, data_clone.full_df)
-            self.assertTrue(data.full_df.equals(data_clone.full_df))
 
     def test_BadData(self) -> None:
         data = {"bad_field": "0_0", "bad_field_2": {"x": 0, "y": "a"}}
-        if self.has_step_column:
-            data[MAP_KEY] = "0"
         df = pd.DataFrame([data])
         with self.assertRaisesRegex(
             ValueError, "Dataframe must contain required columns"
@@ -184,25 +179,13 @@ def test_EmptyData(self) -> None:
         self.assertTrue(df.empty)
         self.assertTrue(Data.from_multiple_data([]).df.empty)
 
-        if data.has_step_column:
-            self.assertTrue(data.full_df.empty)
-            expected_columns = Data.REQUIRED_COLUMNS.union({MAP_KEY})
-        else:
-            expected_columns = Data.REQUIRED_COLUMNS
+        expected_columns = Data.REQUIRED_COLUMNS
         self.assertEqual(set(df.columns), expected_columns)
 
     def test_from_multiple_with_generator(self) -> None:
         data = Data.from_multiple_data(self.data_with_df for _ in range(2))
         self.assertEqual(len(data.full_df), 2 * len(self.data_with_df.full_df))
-        self.assertEqual(data.has_step_column, self.has_step_column)
-
-    def test_extra_columns(self) -> None:
-        value = 3
-        extra_col_df = self.df.assign(foo=value)
-        data = Data(df=extra_col_df)
-        self.assertIn("foo", data.full_df.columns)
-        self.assertIn("foo", data.df.columns)
-        self.assertTrue((data.full_df["foo"] == value).all())
+        self.assertFalse(data.has_step_column)
 
     def test_get_df_with_cols_in_expected_order(self) -> None:
         with self.subTest("Wrong order"):
@@ -235,26 +218,6 @@ def test_trial_indices(self) -> None:
             set(self.data_with_df.full_df["trial_index"].unique()),
         )
 
-
-class TestMapData(TestDataBase):
-    has_step_column = True
-
-
-class DataTest(TestCase):
-    """Tests that are specific to Data without a "step" column."""
-
-    def setUp(self) -> None:
-        super().setUp()
-        self.df = get_test_dataframe()
-        self.metric_name_to_signature = {"a": "a_signature", "b": "b_signature"}
-
-    def test_init(self) -> None:
-        # Initialize empty
-        empty = Data()
-        self.assertTrue(empty.full_df.empty)
-        self.assertEqual(set(empty.full_df.columns), empty.REQUIRED_COLUMNS)
-        self.assertFalse(empty.has_step_column)
-
     def test_repr(self) -> None:
         self.assertEqual(
             str(Data(df=self.df)),
@@ -263,13 +226,6 @@ def test_repr(self) -> None:
         with patch(f"{Data.__module__}.DF_REPR_MAX_LENGTH", 500):
             self.assertEqual(str(Data(df=self.df)), REPR_500)
 
-    def test_OtherClassInequality(self) -> None:
-        class CustomData(Data):
-            pass
-
-        data = CustomData(df=self.df)
-        self.assertNotEqual(data, Data(self.df))
-
     def test_from_multiple(self) -> None:
         with self.subTest("Combinining non-empty Data"):
             data = Data.from_multiple_data([Data(df=self.df), Data(df=self.df)])
@@ -279,34 +235,6 @@ def test_from_multiple(self) -> None:
             data = Data.from_multiple_data([Data(), Data()])
             self.assertEqual(data, Data())
 
-        with self.subTest("Different types"):
-
-            class CustomData(Data):
-                pass
-
-            data = Data.from_multiple_data([CustomData(), CustomData()])
-            self.assertEqual(data, Data())
-            data = CustomData.from_multiple_data([Data(), CustomData()])
-            self.assertEqual(data, CustomData())
-
-    def test_FromMultipleDataMismatchedTypes(self) -> None:
-        # create two custom data types
-        class CustomDataA(Data):
-            pass
-
-        class CustomDataB(Data):
-            pass
-
-        # Test using `Data.from_multiple_data` to combine non-Data types
-        data = Data.from_multiple_data([CustomDataA(), CustomDataB()])
-        self.assertEqual(data, Data())
-
-        # multiple non-empty types
-        data_elt_A = CustomDataA(df=self.df)
-        data_elt_B = CustomDataB(df=self.df)
-        data = Data.from_multiple_data([data_elt_A, data_elt_B])
-        self.assertEqual(len(data.full_df), 2 * len(self.df))
-
     def test_filter(self) -> None:
         data = Data(df=self.df)
         # Test that filter throws when we provide metric names and metric signatures
diff --git a/ax/core/tests/test_experiment.py b/ax/core/tests/test_experiment.py
index 9488ee8bbf3..77490ace093 100644
--- a/ax/core/tests/test_experiment.py
+++ b/ax/core/tests/test_experiment.py
@@ -673,7 +673,7 @@ def test_fetch_and_store_data(self) -> None:
         # Verify we do get the stored data if there are an unimplemented metrics.
         # Remove attached data for nonexistent metric.
-        exp.data.full_df = exp.data.full_df.loc[lambda x: x["metric_name"] != "z"]
+        exp.data = Data(df=exp.data.full_df.loc[lambda x: x["metric_name"] != "z"])
 
         # Remove implemented metric that is `available_while_running`
         # (and therefore not pulled from cache).
@@ -685,7 +685,9 @@ def test_fetch_and_store_data(self) -> None:
         looked_up_df = looked_up_data.full_df
         self.assertFalse((looked_up_df["metric_name"] == "z").any())
         self.assertTrue(
-            batch.fetch_data().full_df.equals(
+            batch.fetch_data()
+            .full_df.sort_values(["arm_name", "metric_name"], ignore_index=True)
+            .equals(
                 looked_up_df.loc[lambda x: (x["trial_index"] == 0)].sort_values(
                     ["arm_name", "metric_name"], ignore_index=True
                 )
@@ -953,7 +955,7 @@ def test_lookup_data(self) -> None:
         with self.subTest("Empty trial indices"):
             looked_up = exp.lookup_data(trial_indices=[])
             self.assertIsInstance(looked_up, Data)
-            self.assertTrue(looked_up.full_df.empty)
+            self.assertTrue(looked_up.empty)
 
     def test_attach_and_sort_data(self) -> None:
         n = 4
diff --git a/ax/early_stopping/tests/test_strategies.py b/ax/early_stopping/tests/test_strategies.py
index 4d6f4fdc611..e5751fdb946 100644
--- a/ax/early_stopping/tests/test_strategies.py
+++ b/ax/early_stopping/tests/test_strategies.py
@@ -1147,22 +1147,22 @@ def test_early_stopping_with_n_best_protection_handles_ties(self) -> None:
         """
         # Create experiment with 5 trials
         exp = get_test_map_data_experiment(num_trials=5, num_fetches=3, num_complete=5)
-        data = exp.fetch_data()
+        data_df = exp.fetch_data().full_df
 
         # Manually set objective values to create ties
         # At progression=2, set trials 0, 1, 2 to have the same best value (30.0)
         # and trials 3, 4 to have worse values
-        progression_2_mask = (data.full_df["metric_name"] == "branin_map") & (
-            data.full_df[MAP_KEY] == 2
+        progression_2_mask = (data_df["metric_name"] == "branin_map") & (
+            data_df[MAP_KEY] == 2
         )
 
         # Set values: trials 0, 1, 2 all have value 30.0 (tied for best)
         # trials 3, 4 have worse values 90.0, 95.0
         for trial_idx, value in [(0, 30.0), (1, 30.0), (2, 30.0), (3, 90.0), (4, 95.0)]:
-            trial_mask = progression_2_mask & (data.full_df["trial_index"] == trial_idx)
-            data.full_df.loc[trial_mask, "mean"] = value
+            trial_mask = progression_2_mask & (data_df["trial_index"] == trial_idx)
+            data_df.loc[trial_mask, "mean"] = value
 
-        exp.attach_data(data=data)
+        exp.data = Data(df=data_df)
 
         # Use n_best_trials_to_complete=2, which is less than the 3 tied top trials
         # With rank-based logic, all 3 tied trials (rank=1) should be protected
@@ -1540,13 +1540,13 @@ def test_early_stopping_with_unaligned_results(self) -> None:
         exp = get_test_map_data_experiment(num_trials=5, num_fetches=3, num_complete=5)
         # manually "unalign" timestamps to simulate real-world scenario
         # where each curve reports results at different steps
-        data = exp.fetch_data()
+        data_df = exp.fetch_data().full_df
 
         unaligned_timestamps = [0, 1, 4, 1, 2, 3, 1, 3, 4, 0, 1, 2, 0, 2, 4]
-        data.full_df.loc[data.full_df["metric_name"] == "branin_map", MAP_KEY] = (
+        data_df.loc[data_df["metric_name"] == "branin_map", MAP_KEY] = (
             unaligned_timestamps
         )
-        exp.attach_data(data=data)
+        exp.data = Data(df=data_df)
 
         """
         Dataframe after interpolation:
diff --git a/ax/early_stopping/utils.py b/ax/early_stopping/utils.py
index f044aab2792..d434f4d7369 100644
--- a/ax/early_stopping/utils.py
+++ b/ax/early_stopping/utils.py
@@ -248,7 +248,7 @@ def estimate_early_stopping_savings(experiment: Experiment) -> float:
     """
     map_data = experiment.lookup_data()
-    if map_data.full_df.empty:
+    if map_data.empty:
         return 0
 
     # Get max progression (resources used) for each trial
     resources_used = map_data.full_df.groupby("trial_index")[MAP_KEY].max()
diff --git a/ax/metrics/tests/test_map_replay.py b/ax/metrics/tests/test_map_replay.py
index d26059a14ef..51e23470b71 100644
--- a/ax/metrics/tests/test_map_replay.py
+++ b/ax/metrics/tests/test_map_replay.py
@@ -87,7 +87,7 @@ def test_map_replay_non_uniform(self) -> None:
         # After assinging steps, we have following steps for branin_map:
         # Trial 0: steps [0.25, 0.95]
         # Trial 1: steps [0.25, 1.0]
-        full_df[MAP_KEY] = pd.Series([0.25, 0.95, 0.0, 0.25, 1.0, 0.0])
+        full_df[MAP_KEY] = pd.Series([0.25, 0.0, 0.95, 0.25, 0.0, 1.0])
         historical_data = Data(df=full_df)
         replay_metric = MapDataReplayMetric(
             name="test_metric",
diff --git a/ax/orchestration/tests/test_orchestrator.py b/ax/orchestration/tests/test_orchestrator.py
index 1dcf56fc8ef..2cf3c7d0bf5 100644
--- a/ax/orchestration/tests/test_orchestrator.py
+++ b/ax/orchestration/tests/test_orchestrator.py
@@ -1359,9 +1359,7 @@ def test_orchestrator_with_metric_with_new_data_after_completion(self) -> None:
             return_value=timedelta(hours=1),
         ):
             orchestrator.run_all_trials()
-        self.assertFalse(
-            orchestrator.experiment.lookup_data(trial_indices={0}).full_df.empty
-        )
+        self.assertFalse(orchestrator.experiment.lookup_data(trial_indices={0}).empty)
 
     def test_run_trials_in_batches(self) -> None:
         gs = self.two_sobol_steps_GS
diff --git a/ax/plot/pareto_utils.py b/ax/plot/pareto_utils.py
index 996c5c0fd99..a11af3c67dc 100644
--- a/ax/plot/pareto_utils.py
+++ b/ax/plot/pareto_utils.py
@@ -207,7 +207,7 @@ def get_observed_pareto_frontiers(
     ):
         # Make sure status quo is always included, for derelativization
         arm_names.append(experiment.status_quo.name)
-    data = Data(data.df[data.df["arm_name"].isin(arm_names)])
+    data = Data(df=data.df[data.df["arm_name"].isin(arm_names)])
     adapter = get_tensor_converter_adapter(experiment=experiment, data=data)
     pareto_observations = observed_pareto_frontier(adapter=adapter)
     # Convert to ParetoFrontierResults
diff --git a/ax/plot/scatter.py b/ax/plot/scatter.py
index a3ac1987146..27af4eb3c97 100644
--- a/ax/plot/scatter.py
+++ b/ax/plot/scatter.py
@@ -1731,7 +1731,7 @@ def tile_observations(
     if data is None:
         data = experiment.fetch_data()
     if arm_names is not None:
-        data = Data(data.df[data.df["arm_name"].isin(arm_names)])
+        data = Data(df=data.df[data.df["arm_name"].isin(arm_names)])
     m_ts = Generators.THOMPSON(
         data=data,
         search_space=experiment.search_space,
diff --git a/ax/plot/tests/test_fitted_scatter.py b/ax/plot/tests/test_fitted_scatter.py
index ef13bfd9016..9642dba9cd2 100644
--- a/ax/plot/tests/test_fitted_scatter.py
+++ b/ax/plot/tests/test_fitted_scatter.py
@@ -33,7 +33,7 @@ def test_fitted_scatter(self) -> None:
        model = Generators.BOTORCH_MODULAR(
            # Adapter kwargs
            experiment=exp,
-            data=Data.from_multiple_data([data, Data(df)]),
+            data=Data.from_multiple_data([data, Data(df=df)]),
        )
        # Assert that each type of plot can be constructed successfully
        scalarized_metric_config = [
diff --git a/ax/plot/tests/test_pareto_utils.py b/ax/plot/tests/test_pareto_utils.py
index bc97bbf3891..de2259a4f6d 100644
--- a/ax/plot/tests/test_pareto_utils.py
+++ b/ax/plot/tests/test_pareto_utils.py
@@ -107,7 +107,7 @@ def test_get_observed_pareto_frontiers(self) -> None:
         # For the check below, compute which arms are better than SQ
         df = experiment.fetch_data().df
         df["sem"] = np.nan
-        data = Data(df)
+        data = Data(df=df)
         sq_val = df[(df["arm_name"] == "status_quo") & (df["metric_name"] == "m1")][
             "mean"
         ].values[0]
diff --git a/ax/storage/json_store/tests/test_json_store.py b/ax/storage/json_store/tests/test_json_store.py
index 00726651976..96d2c613007 100644
--- a/ax/storage/json_store/tests/test_json_store.py
+++ b/ax/storage/json_store/tests/test_json_store.py
@@ -705,10 +705,6 @@ def test_decode_map_data_backward_compatible(self) -> None:
                 class_decoder_registry=CORE_CLASS_DECODER_REGISTRY,
             )
             self.assertEqual(len(map_data.full_df), 2)
-            # Even though the "epoch" and "timestamps" columns have not been
-            # renamed to "step", they are present
-            self.assertEqual(map_data.full_df["epoch"].tolist(), [0.0, 1.0])
-            self.assertEqual(map_data.full_df["timestamps"].tolist(), [3.0, 4.0])
             self.assertIsInstance(map_data, Data)
 
         with self.subTest("Single map key"):
@@ -729,8 +725,8 @@ def test_decode_map_data_backward_compatible(self) -> None:
                 decoder_registry=CORE_DECODER_REGISTRY,
                 class_decoder_registry=CORE_CLASS_DECODER_REGISTRY,
             )
-            self.assertIn("epoch", map_data.full_df.columns)
-            self.assertEqual(map_data.full_df["epoch"].tolist(), [0.0, 1.0])
+            self.assertEqual(len(map_data.full_df), 2)
+            self.assertIsInstance(map_data, Data)
 
         with self.subTest("No map key"):
             data_json = {
diff --git a/ax/storage/sqa_store/utils.py b/ax/storage/sqa_store/utils.py
index dfff1786dbf..3fc233ff15e 100644
--- a/ax/storage/sqa_store/utils.py
+++ b/ax/storage/sqa_store/utils.py
@@ -44,6 +44,7 @@
     # don't need to recur into them during `copy_db_ids`.
     "auxiliary_experiments_by_purpose",
     "_metric_fetching_errors",
+    "_data_rows",
 }
 
 SKIP_ATTRS_ERROR_SUFFIX = "Consider adding to COPY_DB_IDS_ATTRS_TO_SKIP if appropriate."
diff --git a/ax/utils/testing/core_stubs.py b/ax/utils/testing/core_stubs.py
index 6d6dff0e638..6a33f38ceb3 100644
--- a/ax/utils/testing/core_stubs.py
+++ b/ax/utils/testing/core_stubs.py
@@ -2581,7 +2581,7 @@ def get_branin_data_batch(
         for i in range(len(means))
         for metric in metrics
     ]
-    return Data(pd.DataFrame.from_records(records))
+    return Data(df=pd.DataFrame.from_records(records))
 
 
 def get_branin_data_multi_objective(
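
Reviewer note (not part of the patch): the snippet below is a rough sketch of how the row-based API introduced above in ax/core/data.py is intended to be used, inferred from the signatures in this diff (DataRow, Data(data_rows=...), combine_data_rows_favoring_recent). The trial index, arm/metric names, and values are made up for illustration.

    from ax.core.data import Data, DataRow, combine_data_rows_favoring_recent

    # Build Data directly from DataRow objects instead of a DataFrame.
    rows = [
        DataRow(
            trial_index=0,
            arm_name="0_0",
            metric_name="foo",
            metric_signature="foo",
            mean=1.0,
            se=float("nan"),
        ),
        DataRow(
            trial_index=0,
            arm_name="0_0",
            metric_name="foo",
            metric_signature="foo",
            mean=2.0,
            se=float("nan"),
            # "step" is optional; the column only appears in full_df if some row sets it.
            step=10.0,
        ),
    ]
    data = Data(data_rows=rows)

    # full_df is now derived lazily from the rows and sorted by trial_index/arm_name.
    print(data.full_df)
    print(data.metric_names)  # {"foo"}

    # Merging keeps the newer row when (trial_index, metric_name, arm_name, step) collide,
    # which is what Experiment.attach_data now relies on.
    merged = Data(
        data_rows=combine_data_rows_favoring_recent(
            last_rows=data._data_rows, new_rows=rows
        )
    )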