diff --git a/docs/faq/question_08_unconstrained.md b/docs/faq/question_08_unconstrained.md new file mode 100644 index 000000000..aea197ee3 --- /dev/null +++ b/docs/faq/question_08_unconstrained.md @@ -0,0 +1,41 @@ +# Using the logit transformation +If you've ruled out simulator issues, you can try +training your density or ratio estimator in an unbounded space +using a logit transformation: + +- **For NPE**: The transformation maps bounded parameters θ +to unbounded space before training, then applies the inverse (sigmoid) +after training to ensure posterior samples stay within prior bounds. + +- **For NLE/NRE**: The transformation would need to map bounded +data x to unbounded space, which requires estimating data bounds +from simulations (more complex). + +To enable this for NPE: + +```python +density_estimator_build_fun = posterior_nn( + model="zuko_nsf", + hidden_features=60, + num_transforms=3, + z_score_theta="transform_to_unconstrained" # Transforms parameters to unconstrained space + x_dist=prior # For NPE, this specifies bounds for parameters (internally called 'x') +) +inference = NPE(prior, density_estimator=density_estimator_build_fun) +``` + +This ensures that your density estimator operates in a +transformed space where it respects prior bounds, +improving the efficiency of rejection sampling. + +Note: The `x_dist=prior` might seem confusing - internally, +sbi uses generic `x,y` notation where for NPE, `x` represents +parameters (θ) and `y` represents data. +This is why we pass the prior as `x_dist`. + +Important: + +- This transformation is currently only supported for zuko density estimators. +- For **NLE/NRE**, setting up this transformation is more +complex as it requires estimating bounds for the simulated data +rather than using prior bounds. diff --git a/sbi/neural_nets/net_builders/flow.py b/sbi/neural_nets/net_builders/flow.py index cb6a8b930..9014a266b 100644 --- a/sbi/neural_nets/net_builders/flow.py +++ b/sbi/neural_nets/net_builders/flow.py @@ -2,7 +2,7 @@ # under the Apache License Version 2.0, see from functools import partial -from typing import List, Optional, Sequence, Union +from typing import List, Literal, Optional, Sequence, Tuple, Union import torch import zuko @@ -13,10 +13,14 @@ rational_quadratic, # pyright: ignore[reportAttributeAccessIssue] ) from torch import Tensor, nn, relu, tanh, tensor, uint8 +from torch.distributions import Distribution +from zuko.lazy import Flow, LazyDistribution from sbi.neural_nets.estimators import NFlowsFlow, ZukoFlow, ZukoUnconditionalFlow from sbi.utils.nn_utils import MADEMoGWrapper, get_numel from sbi.utils.sbiutils import ( + biject_transform_zuko, + mcmc_transform, standardizing_net, standardizing_transform, standardizing_transform_zuko, @@ -31,8 +35,12 @@ def build_made( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: int = 50, num_mixture_components: int = 10, embedding_net: nn.Module = nn.Identity(), @@ -71,11 +79,7 @@ def build_made( transform_zx = standardizing_transform(batch_x, structured_x) transform = transforms.CompositeTransform([transform_zx, transform]) - z_score_y_bool, structured_y = z_score_parser(z_score_y) - if z_score_y_bool: - embedding_net = nn.Sequential( - standardizing_net(batch_y, structured_y), embedding_net - ) + embedding_net = _prepare_y_embedding(z_score_y, batch_y, embedding_net) distribution = MADEMoGWrapper( features=x_numel, @@ -102,8 +106,12 @@ def build_made( def build_maf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: int = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -169,11 +177,7 @@ def build_maf( standardizing_transform(batch_x, structured_x) ] + transform_list - z_score_y_bool, structured_y = z_score_parser(z_score_y) - if z_score_y_bool: - embedding_net = nn.Sequential( - standardizing_net(batch_y, structured_y), embedding_net - ) + embedding_net = _prepare_y_embedding(z_score_y, batch_y, embedding_net) # Combine transforms transform = transforms.CompositeTransform(transform_list) @@ -190,8 +194,12 @@ def build_maf( def build_maf_rqs( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: int = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -281,11 +289,7 @@ def build_maf_rqs( standardizing_transform(batch_x, structured_x) ] + transform_list - z_score_y_bool, structured_y = z_score_parser(z_score_y) - if z_score_y_bool: - embedding_net = nn.Sequential( - standardizing_net(batch_y, structured_y), embedding_net - ) + embedding_net = _prepare_y_embedding(z_score_y, batch_y, embedding_net) # Combine transforms. transform = transforms.CompositeTransform(transform_list) @@ -302,8 +306,12 @@ def build_maf_rqs( def build_nsf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: int = 50, num_transforms: int = 5, num_bins: int = 10, @@ -402,12 +410,7 @@ def mask_in_layer(i): standardizing_transform(batch_x, structured_x) ] + transform_list - z_score_y_bool, structured_y = z_score_parser(z_score_y) - if z_score_y_bool: - # Prepend standardizing transform to y-embedding. - embedding_net = nn.Sequential( - standardizing_net(batch_y, structured_y), embedding_net - ) + embedding_net = _prepare_y_embedding(z_score_y, batch_y, embedding_net) distribution = get_base_dist(x_numel, **kwargs) @@ -424,8 +427,12 @@ def mask_in_layer(i): def build_zuko_nice( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -479,8 +486,12 @@ def build_zuko_nice( def build_zuko_maf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -531,8 +542,12 @@ def build_zuko_maf( def build_zuko_nsf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -592,8 +607,12 @@ def build_zuko_nsf( def build_zuko_ncsf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -648,8 +667,12 @@ def build_zuko_ncsf( def build_zuko_sospf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -702,8 +725,12 @@ def build_zuko_sospf( def build_zuko_naf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -768,8 +795,12 @@ def build_zuko_naf( def build_zuko_unaf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -834,8 +865,12 @@ def build_zuko_unaf( def build_zuko_cnf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), @@ -888,8 +923,12 @@ def build_zuko_cnf( def build_zuko_gf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 3, embedding_net: nn.Module = nn.Identity(), @@ -945,8 +984,12 @@ def build_zuko_gf( def build_zuko_bpf( batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 3, embedding_net: nn.Module = nn.Identity(), @@ -1004,16 +1047,29 @@ def build_zuko_flow( which_nf: str, batch_x: Tensor, batch_y: Tensor, - z_score_x: Optional[str] = "independent", - z_score_y: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ] = "independent", hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, embedding_net: nn.Module = nn.Identity(), + x_dist: Optional[Distribution] = None, **kwargs, ) -> ZukoFlow: """ Fundamental building blocks to build a Zuko normalizing flow model. + The following cases are considered in the if statements down below: + + z_score_x is `independent, `structured` or None, in which case we just use + the normal standardizing transform. + z_score_x is `transform_to_unconstrained`, in this case, we check if `x_dist` is + provided and has a support property. If `x_dist` is not valid (i.e. None + or has no support property), we raise an error. + Args: which_nf (str): The type of normalizing flow to build. batch_x: Batch of xs, used to infer dimensionality and (optional) z-scoring. @@ -1024,97 +1080,153 @@ def build_zuko_flow( - `structured`: treat dimensions as related, therefore compute mean and std over the entire batch, instead of per-dimension. Should be used when each sample is, for example, a time series or an image. + - `transform_to_unconstrained`: Transforms to + an unbounded space if bounds from `x_dist` are given. z_score_y: Whether to z-score ys passing into the network, same options as z_score_x. hidden_features: The number of hidden features in the flow. Defaults to 50. num_transforms: The number of transformations in the flow. Defaults to 5. embedding_net: The embedding network to use. Defaults to nn.Identity(). + x_dist: The distribution over x, used to determine the bounds for the + unconstrained transformation. + - In Neural Posterior Estimation (NPE), `x_dist` typically corresponds + to the prior over x (e.g., a `BoxUniform`). + - For Neural Likelihood Estimation (NLE) or Neural Ratio Estimation (NRE), + `x_dist` may instead be a user-specified distribution. However, make sure + all the data lies within the support of the distribution if you want to + use the `transform_to_unconstrained` option for NLE and NRE. **kwargs: Additional keyword arguments to pass to the flow constructor. Returns: ZukoFlow: The constructed Zuko normalizing flow model. """ - check_data_device(batch_x, batch_y) x_numel = get_numel(batch_x, embedding_net=None) y_numel = get_numel(batch_y, embedding_net=embedding_net) - # keep only zuko kwargs + # Keep only zuko kwargs kwargs = {k: v for k, v in kwargs.items() if k not in nflow_specific_kwargs} if isinstance(hidden_features, int): hidden_features = [hidden_features] * num_transforms + # Get base transforms from specified flow + base, base_transforms = _get_base_and_transforms( + which_nf, x_numel, y_numel, hidden_features, num_transforms, **kwargs + ) + + # Get x transforms (z-score or logit transform) + x_transforms = _prepare_x_transforms(z_score_x, batch_x, x_dist) + + # Combine all transforms + transforms = x_transforms + base_transforms + + # Maybe add y-z-scoring via embedding network + embedding_net = _prepare_y_embedding(z_score_y, batch_y, embedding_net) + + # Create final neural network + neural_net = zuko.flows.Flow(transforms, base) + + flow = ZukoFlow( + neural_net, + embedding_net, + input_shape=batch_x[0].shape, + condition_shape=batch_y[0].shape, + ) + + return flow + + +def _get_base_and_transforms( + which_nf: str, + x_numel: int, + y_numel: int, + hidden_features: Sequence[int], + num_transforms: int, + **kwargs, +) -> Tuple[LazyDistribution, tuple]: + """ + Build the base zuko flow and extract its transforms. + + Args: + which_nf: The type of normalizing flow to build. + x_numel: Number of elements in x. + y_numel: Number of elements in y. + hidden_features: Hidden features as a sequence. + num_transforms: Number of transforms. + **kwargs: Additional arguments for flow constructor. + + Returns: + tuple of flow base and its transforms. + """ build_nf = getattr(zuko.flows, which_nf) if which_nf == "CNF": - flow_built = build_nf( + flow: Flow = build_nf( features=x_numel, context=y_numel, hidden_features=hidden_features, **kwargs ) + # CNF has a single continuous transform + base_transforms = (flow.transform,) else: - flow_built = build_nf( + flow: Flow = build_nf( features=x_numel, context=y_numel, hidden_features=hidden_features, transforms=num_transforms, **kwargs, ) + # Regular flows have multiple discrete transforms + base_transforms = tuple(flow.transform.transforms) - # Continuous normalizing flows (CNF) only have one transform, - # so we need to handle them slightly differently. - if which_nf == "CNF": - transform = flow_built.transform + return flow.base, base_transforms - z_score_x_bool, structured_x = z_score_parser(z_score_x) - if z_score_x_bool: - transform = ( - standardizing_transform_zuko(batch_x, structured_x), - transform, - ) - z_score_y_bool, structured_y = z_score_parser(z_score_y) - if z_score_y_bool: - # Prepend standardizing transform to y-embedding. - embedding_net = nn.Sequential( - standardizing_transform_zuko(batch_y, structured_y), embedding_net - ) +def _prepare_x_transforms( + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ], + batch_x: Tensor, + x_dist: Optional[Distribution], +) -> tuple: + """ + Prepare transforms to prepend for x processing. - # Combine transforms. - neural_net = zuko.flows.Flow(transform, flow_built.base) - else: - transforms = flow_built.transform.transforms + Args: + z_score_x: Type of x preprocessing. + batch_x: Batch of x data. + x_dist: Distribution for unconstrained transformation. - z_score_x_bool, structured_x = z_score_parser(z_score_x) - if z_score_x_bool: - transforms = ( - standardizing_transform_zuko(batch_x, structured_x), - *transforms, + Returns: + Tuple of transforms to prepend (empty tuple if no preprocessing). + """ + transforms = () + z_score_x_bool, structured_x = z_score_parser(z_score_x) + if z_score_x == "transform_to_unconstrained": + if x_dist is None: + raise ValueError( + "Transformation to unconstrained space requires a distribution " + "provided through `x_dist`." ) - - z_score_y_bool, structured_y = z_score_parser(z_score_y) - if z_score_y_bool: - # Prepend standardizing transform to y-embedding. - embedding_net = nn.Sequential( - standardizing_net(batch_y, structured_y), embedding_net + if not hasattr(x_dist, "support"): + raise ValueError( + "`x_dist` requires a `.support` attribute for" + "an unconstrained transformation." ) + transform_to_unconstrained = biject_transform_zuko(mcmc_transform(x_dist)) + transforms = (transform_to_unconstrained,) + elif z_score_x_bool: + z_score_transform = standardizing_transform_zuko(batch_x, structured_x) + transforms = (z_score_transform,) - # Combine transforms. - neural_net = zuko.flows.Flow(transforms, flow_built.base) - - flow = ZukoFlow( - neural_net, - embedding_net, - input_shape=batch_x[0].shape, - condition_shape=batch_y[0].shape, - ) - - return flow + return transforms def build_zuko_unconditional_flow( which_nf: str, batch_x: Tensor, - z_score_x: Optional[str] = "independent", + z_score_x: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ], hidden_features: Union[Sequence[int], int] = 50, num_transforms: int = 5, **kwargs, @@ -1148,46 +1260,21 @@ def build_zuko_unconditional_flow( if isinstance(hidden_features, int): hidden_features = [hidden_features] * num_transforms - build_nf = getattr(zuko.flows, which_nf) + base, base_transforms = _get_base_and_transforms( + which_nf, x_numel, 0, hidden_features, num_transforms, **kwargs + ) - if which_nf == "CNF": - flow_built = build_nf( - features=x_numel, hidden_features=hidden_features, **kwargs - ) - else: - flow_built = build_nf( - features=x_numel, - hidden_features=hidden_features, - transforms=num_transforms, - **kwargs, + z_score_x_bool, structured_x = z_score_parser(z_score_x) + if z_score_x_bool: + # TODO: Check whether first base transform, then z-score is correct (it's the + # other way around in the conditional flows). + transforms = ( + *base_transforms, + standardizing_transform_zuko(batch_x, structured_x), ) - # Continuous normalizing flows (CNF) only have one transform, - # so we need to handle them slightly differently. - if which_nf == "CNF": - transform = flow_built.transform - - z_score_x_bool, structured_x = z_score_parser(z_score_x) - if z_score_x_bool: - transform = ( - transform, - standardizing_transform_zuko(batch_x, structured_x), - ) - - # Combine transforms. - neural_net = zuko.flows.Flow(transform, flow_built.base) - else: - transforms = flow_built.transform.transforms - - z_score_x_bool, structured_x = z_score_parser(z_score_x) - if z_score_x_bool: - transforms = ( - *transforms, - standardizing_transform_zuko(batch_x, structured_x), - ) - - # Combine transforms. - neural_net = zuko.flows.Flow(transforms, flow_built.base) + # Combine transforms. + neural_net = zuko.flows.Flow(transforms, base) flow = ZukoUnconditionalFlow( neural_net, @@ -1197,6 +1284,30 @@ def build_zuko_unconditional_flow( return flow +def _prepare_y_embedding( + z_score_y: Literal[ + "none", "independent", "structured", "transform_to_unconstrained" + ], + batch_y: Tensor, + embedding_net: nn.Module, +) -> nn.Module: + """ + Prepend the embedding network for y, adding z-scoring if needed. + + Args: + z_score_y: Type of y preprocessing. + batch_y: Batch of y data. + embedding_net: Original embedding network. + + Returns: + Modified embedding network. + """ + z_score_y_bool, structured_y = z_score_parser(z_score_y) + if z_score_y_bool: + return nn.Sequential(standardizing_net(batch_y, structured_y), embedding_net) + return embedding_net + + class ContextSplineMap(nn.Module): """ Neural network from `context` to the spline parameters. diff --git a/sbi/utils/sbiutils.py b/sbi/utils/sbiutils.py index 778caaa4d..cc31e5126 100644 --- a/sbi/utils/sbiutils.py +++ b/sbi/utils/sbiutils.py @@ -5,7 +5,17 @@ import random import warnings from math import pi -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Sequence, + Tuple, + Type, + Union, +) import numpy as np import pyknos.nflows.transforms as nflows_tf @@ -101,7 +111,9 @@ def clamp_and_warn(name: str, value: float, min_val: float, max_val: float) -> f return clamped_val -def z_score_parser(z_score_flag: Optional["str"]) -> Tuple[bool, bool]: +def z_score_parser( + z_score_flag: Optional[str] = None, +) -> Tuple[bool, bool]: """Parses string z-score flag into booleans. Converts string flag into booleans denoting whether to z-score or not, and whether @@ -133,11 +145,15 @@ def z_score_parser(z_score_flag: Optional["str"]) -> Tuple[bool, bool]: # Got one of two valid z-scoring methods. z_score_bool = True structured_data = z_score_flag == "structured" - + elif z_score_flag == "transform_to_unconstrained": + # Dependent on the distribution, the biject_to function + # will provide e.g., a logit, exponential of z-scored distribution. + z_score_bool, structured_data = False, False else: # Return warning due to invalid option, defaults to not z-scoring. raise ValueError( - "Invalid z-scoring option. Use 'none', 'independent', or 'structured'." + "Invalid z-scoring option. Use 'none', 'independent' " + "'structured' or 'transform_to_unconstrained'." ) return z_score_bool, structured_data @@ -197,6 +213,35 @@ def standardizing_transform_zuko( ) +class CallableTransform: + """Wraps a PyTorch Transform to be used in Zuko UnconditionalTransform.""" + + def __init__(self, transform): + self.transform = transform + + def __call__(self): + return self.transform + + +def biject_transform_zuko( + transform: TorchTransform, +) -> zuko.flows.UnconditionalTransform: + """ + Wraps a pytorch transform in a Zuko unconditional transfrom on a bounded interval. + + Args: + transform: a bijective transformation for Zuko, depending on the input + (e.g., logit, exponential or z-scored) + + Returns: + Zuko bijective transformation + """ + return zuko.flows.UnconditionalTransform( + CallableTransform(transform), + buffer=True, + ) + + def z_standardization( batch_t: Tensor, structured_dims: bool = False, diff --git a/tests/density_estimator_test.py b/tests/density_estimator_test.py index 719bd8626..cebe7c848 100644 --- a/tests/density_estimator_test.py +++ b/tests/density_estimator_test.py @@ -8,10 +8,11 @@ import pytest import torch from torch import eye, zeros -from torch.distributions import MultivariateNormal +from torch.distributions import HalfNormal, MultivariateNormal from sbi.neural_nets.embedding_nets import CNNEmbedding from sbi.neural_nets.estimators.shape_handling import reshape_to_sample_batch_event +from sbi.neural_nets.estimators.zuko_flow import ZukoFlow from sbi.neural_nets.net_builders import ( build_categoricalmassestimator, build_made, @@ -34,6 +35,8 @@ build_zuko_sospf, build_zuko_unaf, ) +from sbi.neural_nets.net_builders.flow import build_zuko_flow +from sbi.utils.torchutils import BoxUniform # List of all density estimator builders for testing. model_builders = [ @@ -462,3 +465,50 @@ def test_mixed_density_estimator( # Test samples samples = density_estimator.sample(sample_shape, condition=conditions) assert samples.shape == (*sample_shape, batch_dim, *input_event_shape) + + +@pytest.mark.parametrize("which_nf", ["MAF", "CNF"]) +@pytest.mark.parametrize( + "x_dist", + [ + BoxUniform(low=-2 * torch.ones(5), high=2 * torch.ones(5)), + HalfNormal(scale=torch.ones(1) * 2), + MultivariateNormal(loc=zeros(5), covariance_matrix=eye(5)), + ], +) +def test_build_zuko_flow_with_valid_unconstrained_transform(which_nf, x_dist): + """Test that ZukoFlow builds successfully with valid `x_dist`.""" + # input dimension is 5 + batch_x = torch.randn(10, 5) + batch_y = torch.randn(10, 3) + + # Test case where x_dist is provided (should not raise an error) + flow = build_zuko_flow( + which_nf=which_nf, + batch_x=batch_x, + batch_y=batch_y, + z_score_x="transform_to_unconstrained", + z_score_y="transform_to_unconstrained", + x_dist=x_dist, + ) + assert isinstance(flow, ZukoFlow) + + +@pytest.mark.parametrize("which_nf", ["MAF", "CNF"]) +def test_build_zuko_flow_missing_x_dist_raises_error(which_nf): + """Test that ValueError is raised if `x_dist` is None when required.""" + batch_x = torch.randn(10, 5) + batch_y = torch.randn(10, 3) + + with pytest.raises( + ValueError, + match=r".*distribution.*x_dist.*", + ): + build_zuko_flow( + which_nf=which_nf, + batch_x=batch_x, + batch_y=batch_y, + z_score_x="transform_to_unconstrained", + z_score_y="transform_to_unconstrained", + x_dist=None, # No distribution provided + ) diff --git a/tests/linearGaussian_snle_test.py b/tests/linearGaussian_snle_test.py index 558e1488c..df0d1fc6a 100644 --- a/tests/linearGaussian_snle_test.py +++ b/tests/linearGaussian_snle_test.py @@ -502,3 +502,90 @@ def test_api_nle_sampling_methods( posterior.train(max_num_iters=10) posterior.sample(sample_shape=(num_samples,), show_progress_bars=False) + + +@pytest.mark.slow +@pytest.mark.parametrize("num_dim", (2,)) +@pytest.mark.parametrize("prior_str", ("uniform", "gaussian")) +@pytest.mark.parametrize("model_str", ("zuko_maf", "zuko_nsf")) +def test_c2st_nle_unconstrained_space( + num_dim: int, prior_str: str, model_str: str, mcmc_params_accurate: dict +): + """Test SNL on linear Gaussian in unconstrained space. + + Args: + num_dim: parameter dimension of the gaussian model + prior_str: one of "gaussian" or "uniform" + + """ + num_samples = 500 + num_simulations = 3000 + + # likelihood_mean will be likelihood_shift+theta + likelihood_shift = -1.0 * ones(num_dim) + # Use increased cov to avoid too small posterior cov for many trials. + likelihood_cov = 0.8 * eye(num_dim) + + if prior_str == "gaussian": + prior_mean = zeros(num_dim) + prior_cov = eye(num_dim) + prior = MultivariateNormal(loc=prior_mean, covariance_matrix=prior_cov) + else: + prior = BoxUniform(-2.0 * ones(num_dim), 2.0 * ones(num_dim)) + + def simulator(theta): + return linear_gaussian(theta, likelihood_shift, likelihood_cov) + + theta = prior.sample((num_simulations,)) + x = simulator(theta) + + # Estimate prior on x. + x_dist = BoxUniform(low=x.min(dim=0)[0], high=x.max(dim=0)[0]) + + # Use likelihood_nn with z_score_theta="transform_to_unconstrained" + density_estimator = likelihood_nn( + model_str, + hidden_features=60, + num_transforms=3, + z_score_x="transform_to_unconstrained", + x_dist=x_dist, + ) + inference = NLE(density_estimator=density_estimator) + + likelihood_estimator = inference.append_simulations(theta, x).train() + + # Test inference amortized over trials. + x_o = zeros((1, num_dim)) + if prior_str == "gaussian": + gt_posterior = true_posterior_linear_gaussian_mvn_prior( + x_o, likelihood_shift, likelihood_cov, prior_mean, prior_cov + ) + target_samples = gt_posterior.sample((num_samples,)) + elif prior_str == "uniform": + target_samples = samples_true_posterior_linear_gaussian_uniform_prior( + x_o, + likelihood_shift, + likelihood_cov, + prior=prior, + num_samples=num_samples, + ) + + potential_fn, theta_transform = likelihood_estimator_based_potential( + prior=prior, likelihood_estimator=likelihood_estimator, x_o=x_o + ) + posterior = MCMCPosterior( + proposal=prior, + potential_fn=potential_fn, + theta_transform=theta_transform, + method="slice_np_vectorized", + **mcmc_params_accurate, + ) + + samples = posterior.sample(sample_shape=(num_samples,)) + + # Check performance based on c2st accuracy. + check_c2st( + samples, + target_samples, + alg=f"nle_a-{prior_str}-prior-{model_str}", + ) diff --git a/tests/linearGaussian_snpe_test.py b/tests/linearGaussian_snpe_test.py index 6c9d1cf30..fd0c2bc9b 100644 --- a/tests/linearGaussian_snpe_test.py +++ b/tests/linearGaussian_snpe_test.py @@ -731,3 +731,61 @@ def simulator(theta): _ = inference.append_simulations(theta, x, proposal=proposal).train() posterior = inference.build_posterior().set_default_x(x_o) proposal = posterior + + +@pytest.mark.slow +@pytest.mark.parametrize( + "num_dim", + ((2), (1)), +) +@pytest.mark.parametrize("npe_method", [NPE_B, NPE_C]) +@pytest.mark.parametrize( + "density_estimator", + ["zuko_maf", "zuko_nsf"], +) +def test_density_estimators_unconstrained_space( + num_dim, npe_method: type, density_estimator +): + """Test NPE B/C in inconstrained space.""" + + x_o = zeros(1, num_dim) + num_samples = 1000 + num_simulations = 2500 + + # likelihood_mean will be likelihood_shift+theta + likelihood_shift = -1.0 * ones(num_dim) + likelihood_cov = 0.3 * eye(num_dim) + + prior = utils.BoxUniform(-2.0 * ones(num_dim), 2.0 * ones(num_dim)) + + target_samples = samples_true_posterior_linear_gaussian_uniform_prior( + x_o, likelihood_shift, likelihood_cov, prior, num_samples + ) + + def simulator(theta): + return linear_gaussian(theta, likelihood_shift, likelihood_cov) + + # Train in unconstrained space + + density_estimator_build_fun = posterior_nn( + model=density_estimator, + hidden_features=60, + num_transforms=3, + z_score_theta="transform_to_unconstrained", + x_dist=prior, + ) + + inference = npe_method(prior, density_estimator=density_estimator_build_fun) + + theta = prior.sample((num_simulations,)) + x = simulator(theta) + posterior_estimator = inference.append_simulations(theta, x).train( + training_batch_size=100 + ) + posterior = DirectPosterior( + prior=prior, posterior_estimator=posterior_estimator + ).set_default_x(x_o) + samples = posterior.sample((num_samples,)) + + # Compute the c2st and assert it is near chance level of 0.5. + check_c2st(samples, target_samples, alg=f"npe_{density_estimator}") diff --git a/tests/sbiutils_test.py b/tests/sbiutils_test.py index 51b27c357..73a56cffd 100644 --- a/tests/sbiutils_test.py +++ b/tests/sbiutils_test.py @@ -20,6 +20,7 @@ from sbi.inference.trainers.npe.npe_a import NPE_A_MDN from sbi.neural_nets import classifier_nn, likelihood_nn, posterior_nn from sbi.utils import BoxUniform, get_kde +from sbi.utils.sbiutils import z_score_parser def test_conditional_density_1d(): @@ -406,10 +407,79 @@ def test_kde(bandwidth, transform, sample_weights): @pytest.mark.parametrize( - "z_x", [True, False, None, "none", "independent", "structured"] + "z_x", + [ + True, + False, + None, + "none", + "independent", + "structured", + "transform_to_unconstrained", + pytest.param( + "invalid_value", + marks=pytest.mark.xfail( + raises=ValueError, + reason="Invalid z-scoring option should raise ValueError.", + ), + ), + ], ) @pytest.mark.parametrize( - "z_theta", [True, False, None, "none", "independent", "structured"] + "z_theta", + [ + True, + False, + None, + "none", + "independent", + "structured", + "transform_to_unconstrained", + pytest.param( + "invalid_value", + marks=pytest.mark.xfail( + raises=ValueError, + reason="Invalid z-scoring option should raise ValueError.", + ), + ), + ], +) +def test_z_score_parser(z_x, z_theta): + """Test the z_score_parser function.""" + if z_x is bool or z_theta is bool: + with pytest.warns( + UserWarning, + match="Boolean values for z-scoring are deprecated and will", + ): + z_score_parser(z_x) + z_score_parser(z_theta) + + result_x = z_score_parser(z_x) + result_theta = z_score_parser(z_theta) + + assert result_x is not None, f"z_score_parser({z_x}) returned None" + assert result_theta is not None, f"z_score_parser({z_theta}) returned None" + + +@pytest.mark.parametrize( + "z_x", + [ + None, + "none", + "independent", + "structured", + "transform_to_unconstrained", + ], +) +@pytest.mark.parametrize( + "z_theta", + [ + None, + "none", + "independent", + "structured", + "transform_to_unconstrained", + ], ) @pytest.mark.parametrize("builder", [likelihood_nn, posterior_nn, classifier_nn]) def test_z_scoring_structured(z_x, z_theta, builder): @@ -417,19 +487,41 @@ def test_z_scoring_structured(z_x, z_theta, builder): Test that z-scoring string args don't break API. """ # Generate some signals for test. - t = torch.arange(0, 1, 0.001) + t = torch.arange(0, 1, 0.1) x_sin = torch.sin(t * 2 * torch.pi * 5) t_batch = torch.stack([(x_sin * (i + 1)) + (i * 2) for i in range(10)]) + num_dim = t_batch.shape[1] + x_dist = BoxUniform(low=-2 * torch.ones(num_dim), high=2 * torch.ones(num_dim)) + # API tests + # TODO: Test breaks at "mnle" if builder in [likelihood_nn, posterior_nn]: - for model in ["mdn", "made", "maf", "nsf"]: + for model in [ + "mdn", + "made", + "maf", + "nsf", + "zuko_nice", + "zuko_nsf", + "zuko_maf", + "zuko_ncsf", + "zuko_bpf", + "maf_rqs", + "zuko_sospf", + "zuko_naf", + "zuko_unaf", + "zuko_gf", + "mlp_flowmatcher", + "resnet_flowmatcher", + ]: net = builder( model, z_score_theta=z_theta, z_score_x=z_x, hidden_features=2, num_transforms=1, + x_dist=x_dist, ) assert net(t_batch, t_batch) else: