pytorch_utils package

Submodules

pytorch_utils.data_modules module

class AugmentedBernoulliDataModule(augmented_bernoulli_dataset_configs: ~pytorch_utils.dataset_configurations.AugmentedBernoulliDatasetConfigs, preprocessing_pipeline: ~sklearn.pipeline.Pipeline | None = None, train_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, val_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, test_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, predict_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, data_module_logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>, preprocessing_pipeline_logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>, prepare_data_per_node: bool = False)[source]

Bases: LightningDataModule

For prediction: attributes prediction_df (and optionally prediction_scaling_factors, prediction_min_augmented_value and prediction_max_augmented_value) must be set as desired.

preprocessing_pandas_collate_fn(batch: List[DataFrameRow]) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None][source]

More computationally efficient than using collate_fn=lambda batch: pd.concat(batch, axis=0, sort=True) and setting fitted_transformers_pipeline=self.preprocessing_pipeline in AugmentedBernoulliDataset. This vectorized the transform operations.

transform(batch: DataFrame) DataFrame[source]

Implicit assumption in the following implementation: the preprocessing pipeline does not modify the number of rows.

transform_to_tensors(batch: DataFrame) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None][source]
augment_transform_to_tensors(batch: DataFrame, augmentation_scaling_factors: ndarray = array([1.]), min_augmented_value: float = -inf, max_augmented_value: float = inf) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None][source]
format_to_tensors(transformed_batch: DataFrame) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None][source]
property is_preprocessing_pipeline_fitted: bool

Boolean indicating whether pipeline is fitted

fit_preprocessing_pipeline(refit: bool = False) None[source]
property input_features_dtypes: Series
property output_features_dtypes: Series
property output_features
property output_real_features: List[str]
property output_categorical_features: Dict[str, Set[int]]
prepare_data() None[source]

Use this to download and prepare data. Downloading and saving data with multiple processes (distributed settings) will result in corrupted data. Lightning ensures this method is called only within a single process, so you can safely add your downloading logic within.

Warning

DO NOT set state to the model (use setup instead) since this is NOT called on every device

Example:

def prepare_data(self):
    # good
    download_data()
    tokenize()
    etc()

    # bad
    self.split = data_split
    self.some_state = some_other_state()

In a distributed environment, prepare_data can be called in two ways (using prepare_data_per_node)

  1. Once per node. This is the default and is only called on LOCAL_RANK=0.

  2. Once in total. Only called on GLOBAL_RANK=0.

Example:

# DEFAULT
# called once per node on LOCAL_RANK=0 of that node
class LitDataModule(LightningDataModule):
    def __init__(self):
        super().__init__()
        self.prepare_data_per_node = True


# call on GLOBAL_RANK=0 (great for shared file systems)
class LitDataModule(LightningDataModule):
    def __init__(self):
        super().__init__()
        self.prepare_data_per_node = False

This is called before requesting the dataloaders:

model.prepare_data()
initialize_distributed()
model.setup(stage)
model.train_dataloader()
model.val_dataloader()
model.test_dataloader()
model.predict_dataloader()
setup(stage: str) None[source]

Called at the beginning of fit (train + validate), validate, test, or predict. This is a good hook when you need to build models dynamically or adjust something about them. This hook is called on every process when using DDP.

Parameters:

stage – either 'fit', 'validate', 'test', or 'predict'

Example:

class LitModel(...):
    def __init__(self):
        self.l1 = None

    def prepare_data(self):
        download_data()
        tokenize()

        # don't do this
        self.something = else

    def setup(self, stage):
        data = load_data(...)
        self.l1 = nn.Linear(28, data.num_classes)
setup_datasets(ml_stage: Literal[MLStage.fit, MLStage.validate, MLStage.test, MLStage.predict], dataset: Dataset, validation_dataset: Dataset | None = None) None[source]
property batch_size
train_dataloader()[source]

An iterable or collection of iterables specifying training samples.

For more information about multiple dataloaders, see this section.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

For data processing use the following pattern:

However, the above are only necessary for distributed processing.

Warning

do not assign state in prepare_data

Note

Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

val_dataloader()[source]

An iterable or collection of iterables specifying validation samples.

For more information about multiple dataloaders, see this section.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

It’s recommended that all data downloads and preparation happen in prepare_data().

Note

Lightning tries to add the correct sampler for distributed and arbitrary hardware There is no need to set it yourself.

Note

If you don’t need a validation dataset and a validation_step(), you don’t need to implement this method.

test_dataloader()[source]

An iterable or collection of iterables specifying test samples.

For more information about multiple dataloaders, see this section.

For data processing use the following pattern:

However, the above are only necessary for distributed processing.

Warning

do not assign state in prepare_data

Note

Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Note

If you don’t need a test dataset and a test_step(), you don’t need to implement this method.

predict_dataloader()[source]

An iterable or collection of iterables specifying prediction samples.

For more information about multiple dataloaders, see this section.

It’s recommended that all data downloads and preparation happen in prepare_data().

Note

Lightning tries to add the correct sampler for distributed and arbitrary hardware There is no need to set it yourself.

Returns:

A torch.utils.data.DataLoader or a sequence of them specifying prediction samples.

clear_data()[source]
log(logger: Logger) None[source]
classmethod load_from_checkpoint(checkpoint_path, map_location=None, hparams_file=None, strict=None, **kwargs: Any) AugmentedBernoulliDataModule[source]

We override this method to correct a bug with map_location argument. See Github issue: https://github.com/Lightning-AI/lightning/issues/17945

pytorch_utils.dataset_configurations module

class DataSplitConfig(training_proportion: float = 1.0, validation_proportion: float = 0.0, test_proportion: float = 0.0, random_seed: int | None = None, stratify: ~typing.List[str] | None = None, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]

Bases: SingleLoggerDataclassLoggable

Configuration used to specify train, validation and test splits. The proportions should all lie in [0,1], with their sum smaller or equal to 1. In case the sum is strictly smaller than 1, only a random subset of the data is used.

training_proportion

proportion of dataframe to be used as training samples

Type:

float, default=1.

validation_proportion

proportion of dataframe to be used as validation samples

Type:

float, default=0.

test_proportion

proportion of dataframe to be used as test samples

Type:

float, default=0.

random_seed

random seed used for random splitting

Type:

Optional[int], default=None

stratify

list of column names used to stratify the data (see also sklearn.model_selection.train_test_split)

Type:

Optional[List[str]], default=None

training_proportion: float = 1.0
validation_proportion: float = 0.0
test_proportion: float = 0.0
random_seed: int | None = None
stratify: List[str] | None = None
logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
train_valid_test_split(df: DataFrame) Tuple[Index, Index, Index][source]
class DataAugmentationConfig(augmented_col: str, scaling_factors: ~numpy.ndarray = array([1.]), min_value: float = -inf, max_value: float = inf, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]

Bases: SingleLoggerDataclassLoggable

Configuration used to specify data augmentation on specific column (augmented_col). The idea of this data augmentation is to duplicate the data several times, with only augmented_col changed by multiplying the original values with a scaling factor.

This can be useful when there is some monotone relationship between a covariate (augmented_col) and the success of an event (0=success, 1=event).

augmented_col

Name of column to augment

Type:

str

scaling_factors

Numpy array of floats corresponding to scaling factors used for data augmentation

Type:

np.ndarray

augmented_col: str
scaling_factors: np.ndarray = array([1.])
min_value: float = -inf
max_value: float = inf
logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
augment_data(data: DataFrame) DataFrame[source]
static scaling_filter(data: DataFrame, scaling_factor: ndarray, col: str, min_value: float = -inf, max_value: float = inf) DataFrame[source]

Filter used to drop scaled values that are outside the range [min_value, max_value]. See method scale_col.

static scale_col(data: DataFrame, scaling_factor: ndarray, col: str, min_value: float = -inf, max_value: float = inf) DataFrame[source]

Return pandas dataframe identical to data with column col scaled by scaling_factor. The scaled values that are outside the range [min_value, max_value] are dropped.

static scaling_length(data: DataFrame, scaling_factor: ndarray, col: str, min_value: float = -inf, max_value: float = inf) int[source]

Length of the pandas dataframe obtained when calling method scale_col with the exact same input. The implementation does not require explicitly building the dataframe.

augmentation_length(data: DataFrame) int[source]

Length of the pandas dataframe obtained when calling method augment_data with the exact same input. The implementation does not require explicitly building the dataframe.

log(logger: Logger) None[source]
class AugmentedBernoulliDatasetConfig(data: ~pandas.core.frame.DataFrame, is_success: bool, data_augmentation_config: ~pytorch_utils.dataset_configurations.DataAugmentationConfig, split_config: ~pytorch_utils.dataset_configurations.DataSplitConfig = DataSplitConfig(training_proportion=1.0, validation_proportion=0.0, test_proportion=0.0, random_seed=None, stratify=None), metadata: ~typing.Dict[str, ~typing.Any] = <factory>, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]

Bases: SingleLoggerDataclassLoggable

Dataset configuration for augmented Bernoulli samples (binary outcomes: successful or not). There are 2 ways to construct an instance of AugmentedBernoulliDatasetConfig:

  1. either by calling the constructor and passing a pandas dataframe (with optional meatada) as input

  2. or by calling class method `from_meta_dataframe and passing a delta table as input

Method 1 is preferred for testing/debugging/prototyping while method 2 is preferred for production and traceable experimentations (clean metadata, etc…).

data

The Pandas dataframe containing the data.

Type:

pandas.DataFrame

is_success

Whether the samples correspond to successful events or not

Type:

bool

split_config

The configuration for splitting between train, validation and test

Type:

DataSplitConfig

data_augmentation_config

The configuration for data augmentation

Type:

DataAugmentationConfig

metadata

Any information regarding the source data that we wish to track/save

Type:

Dict[str, Any], default={}

data: pd.DataFrame
is_success: bool
data_augmentation_config: DataAugmentationConfig
split_config: DataSplitConfig = DataSplitConfig(training_proportion=1.0, validation_proportion=0.0, test_proportion=0.0, random_seed=None, stratify=None)
metadata: Dict[str, Any]
logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
classmethod from_meta_dataframe(meta_df: Any[pyspark.sql.DataFrame], is_success: bool, data_augmentation_config: DataAugmentationConfig, split_config: DataSplitConfig, logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>, spark_filter: Optional[str] = None, pandas_formatter: PandasFormatter = PandasIdentityFormatter()) AugmentedBernoulliDatasetConfig[source]

Use this method to construct an instance of AugmentedBernoulliDatasetConfig directly from a delta table with proper metadata.

delta_table

The delta table containing the data. To use a previous version of the data, call restoreToVersion(version: int) on delta_table before passing it to AugmentedBernoulliDatasetConfig.

Type:

delta.tables.DeltaTable

is_success

Whether the samples correspond to successful events or not

Type:

bool

split_config

The configuration for splitting between train, validation and test

Type:

DataSplitConfig

data_augmentation_config

The configuration for data augmentation

Type:

DataAugmentationConfig

pandas_formatter

Any formatting on pandas data (cast dtypes, etc…).

Type:

PandasFormatter

sample(n: int, replace: bool = False)[source]
property training_data
property training_data_length
property validation_data
property validation_data_length
property test_data
property test_data_length
property augmented_data
property augmented_data_length
property augmented_training_data
property augmented_training_data_length
property augmented_validation_data
property augmented_validation_data_length
property augmented_test_data
property augmented_test_data_length
property augmented_col
property data_augmentation_scaling_factors
property columns
property dtypes
clear_data() AugmentedBernoulliDatasetConfig[source]
log(logger: Logger) None[source]
class AugmentedBernoulliDatasetConfigs(augmented_bernoulli_dataset_configs=typing.List[pytorch_utils.dataset_configurations.AugmentedBernoulliDatasetConfig], label_col: str = 'success_labels', labels_dtype: ~typing.Type[~numpy.int32] = <class 'numpy.int32'>, sample_weight_col: str | None = None, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]

Bases: UserList

check_compatibility() None[source]
property augmented_col
property columns
property dtypes
sample(n: int, replace: bool = False)[source]
property all_data
property all_data_length
property all_training_data
property all_training_data_length
property all_validation_data
property all_validation_data_length
property all_test_data
property all_test_data_length
property all_augmented_data
property all_augmented_data_length
property all_augmented_training_data
property all_augmented_training_data_length
property all_augmented_validation_data
property all_augmented_validation_data_length
property all_augmented_test_data
property all_augmented_test_data_length
clear_data() AugmentedBernoulliDatasetConfigs[source]
log(logger: Logger) None[source]

pytorch_utils.datasets module

class MLStage(value)[source]

Bases: Enum

An enumeration.

fit = 1
validate = 2
test = 3
predict = 4
class AugmentedBernoulliDataset(data: ListDataFrameRows, is_success: bool | None, augmented_col: str, fitted_preprocessing_pipeline: Pipeline | None = None, data_augmentation_scaling_factors: ndarray = array([1.]), label_col: str = 'success_labels', labels_dtype: dtype = dtype('int32'), sample_weight_col: str | None = None, min_augmented_value: float = -inf, max_augmented_value: float = inf)[source]

Bases: Dataset

Implements storage-efficient data augmentation of Bernoulli samples (binary outcomes: successful or not) as well as data transformations (e.g., scaling, encoding, …). If is_success is set to None, only features are generated (labels are dropped). This is useful for prediction sets.

data: ListDataFrameRows
is_success: bool | None
augmented_col: str
fitted_preprocessing_pipeline: Pipeline | None = None
data_augmentation_scaling_factors: ndarray = array([1.])
label_col: str = 'success_labels'
labels_dtype: dtype = dtype('int32')
sample_weight_col: str | None = None
min_augmented_value: float = -inf
max_augmented_value: float = inf
classmethod from_config(config: AugmentedBernoulliDatasetConfig, ml_stage: Literal[MLStage.fit, MLStage.validate, MLStage.test], fitted_preprocessing_pipeline: Pipeline | None = None, label_col: str = 'success_labels', labels_dtype: dtype = dtype('int32'), sample_weight_col: str | None = None) AugmentedBernoulliDataset[source]
property dataframe: DataFrame
clear_data() AugmentedBernoulliDataset[source]
property raw_feature_names
property transformed_feature_names

pytorch_utils.exceptions module

exception MLError[source]

Bases: Exception

Exception class from which every exception in this library will derive. It enables other projects using this library to catch all errors coming from the library with a single “except” statement

exception CatalogError[source]

Bases: MLError

Raised when the catalog is not valid

exception InvalidDataFormatError[source]

Bases: MLError

Raised when the catalog is not valid

exception LocalDirNotWriteableException[source]

Bases: MLError

Raised when the local directory is not writeable

exception MissingConfigFileException[source]

Bases: MLError

Raised when a given configuration file cannot be found

exception BadCLIParameterException[source]

Bases: MLError

Raised when there is an issue with a parameter in the CLI (commnand-line interface)

exception BadConfigException[source]

Bases: MLError

Raised when a configuration file cannot be loaded, for instance due to wrong syntax or poor formatting.

exception BadConfigLogLevelException[source]

Bases: BadConfigException

Raised when the log level (log_level) does not exist

exception BadConfigSparkMasterException[source]

Bases: BadConfigException

Raised when the Spark master (spark_master) is not valid

exception BadConfigPathException[source]

Bases: BadConfigException

Raised when the configuration path parameter (conf_path) does not contain at least a directory/path where to find configuration files

exception BadConfigMissingInputException[source]

Bases: BadConfigException

Raised when the ‘input_data’/{‘products’, ‘transactions’} file-path is missing

exception BadConfigMissingOutputException[source]

Bases: BadConfigException

Raised when the ‘output_data’/’transactions’ file-path is missing

exception TaskNotFoundError[source]

Bases: MLError

Raised when task name is not found in entrypoints

exception MissingDatasetError[source]

Bases: MLError

Raised when a dataset is not found in the catalog

exception DataSetError[source]

Bases: MLError

Raised when there is an issue with a dataset

exception MissingConfigException[source]

Bases: MLError

Raised when a configuration is missing

exception BadConfigFormatException[source]

Bases: MLError

Raised when a configuration is not formatted correctly

exception InconsistentDatasetConfigurations[source]

Bases: MLError

Raised when several inconsistent dataset configurations are being used to build a dataset.

exception NotMonotone[source]

Bases: MLError

Raised when some outputs are not a monotone function of some inputs.

exception NotNonDecreasing[source]

Bases: MLError

Raised when some outputs are not an non-decreasing function of some inputs.

exception NotNonIncreasing[source]

Bases: MLError

Raised when some outputs are not an non-increasing function of some inputs.

pytorch_utils.metrics module

class WeightedMeanSquaredError(squared: bool = True, **kwargs: Any)[source]

Bases: MeanSquaredError

Analogue of torchmetrics.MeanSquaredError but with (optional) sample weights.

update(preds: Tensor, target: Tensor, sample_weights: Tensor | None = None) None[source]

Update state with predictions and targets.

class WeightedMeanAbsoluteError(**kwargs: Any)[source]

Bases: MeanAbsoluteError

Analogue of torchmetrics.MeanAbsoluteError but with (optional) sample weights.

update(preds: Tensor, target: Tensor, sample_weights: Tensor | None = None) None[source]

Update state with predictions and targets.

class WeightedBinaryCalibrationError(n_bins: int = 15, norm: Literal['l1', 'l2', 'max'] = 'l1', ignore_index: int | None = None, validate_args: bool = True, **kwargs: Any)[source]

Bases: BinaryCalibrationError

Analogue of torchmetrics.classification import BinaryCalibrationError but with (optional) sample weights.

update(preds: Tensor, target: Tensor, weights: Tensor | None = None) None[source]

Update metric states with predictions and targets.

compute() Tensor[source]

Compute metric.

calibration_curve(estimator_name: str, pos_label: int = 1) CalibrationDisplay[source]

pytorch_utils.miscellaneous module

class_full_name(cls)[source]
sort_hashable(array: List[T]) List[T][source]
class DataclassType(*args, **kwargs)[source]

Bases: Protocol

pytorch_utils.modules module

class LinearNonNeg(in_features: int, out_features: int, bias: bool = True, device=None, dtype=None)[source]

Bases: Linear

Alternative linear layer with nonnegative weights (bias unchanged). This ensures the outputs are always a non-decreasing function of the inputs (no matter the values of parameters self.weight and self.bias, which may vary during training).

The easiest way to implement this class with minimal code is to subclass torch.nn.Linear and apply a positive transformation (namely torch.nn.functional.elu shifted by 1) to the weights before applying the linear transformation in the forward method.

forward(input: Tensor) Tensor[source]

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class BatchNorm1dNonNeg(num_features: int, eps: float = 1e-05, momentum: float = 0.1, affine: bool = True, track_running_stats: bool = True, device=None, dtype=None)[source]

Bases: BatchNorm1d

Alternative batch normalization with nonnegative weights (bias unchanged). This ensures the outputs are always a non-decreasing function of the inputs when self.training=False (no matter the values of parameters self.weight and self.bias, which may vary during training).

The easiest way to implement this class with minimal code is to subclass torch.nn.BatchNorm1d and apply a positive transformation (namely torch.nn.functional.elu shifted by 1) to the weights before applying the batch norm transformation in the forward method.

forward(input: Tensor) Tensor[source]

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class LinearSemiNonNeg(in_features_non_neg, in_features_others, out_features, non_neg_inputs_name='non_neg_inputs', other_inputs_name='other_inputs')[source]

Bases: Module

Alternative linear layer combining a standard linear layer (torch.nn.Linear) together with a LinearNonNeg layer (by summing the two). The outputs are always a non-decreasing function of the inputs named non_neg_inputs_name (no matter the weights and biases). The outputs are not necessarily monotone w.r.t. the inputs named other_inputs_name.

forward(input: Mapping[str, Tensor])[source]

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class BiLinearSemiNonNeg(in_features_non_neg, in_features_others, out_features_non_neg, out_features_others, non_neg_inputs_name='non_neg_inputs', other_inputs_name='other_inputs')[source]

Bases: Module

forward(input: Mapping[str, Tensor])[source]

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class Partitioned(**module_partitions: Module)[source]

Bases: Module

Unlike torch.nn.Sequential wich “chains” outputs to inputs sequentially for each module in a provided list, this module simultaneously transforms every partition of the input in parallel using the corresponding module. The difference between torch.nn.Sequential and Partitioned is similar to the difference between a series and parallel electric circuit.

forward(input_partitions: Mapping[str, Tensor])[source]

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class ShiftedEmbedding(num_embeddings: int, embedding_dim: int, padding_idx: int | None = None, max_norm: float | None = None, norm_type: float = 2.0, scale_grad_by_freq: bool = False, sparse: bool = False, _weight: Tensor | None = None, _freeze: bool = False, device=None, dtype=None)[source]

Bases: Embedding

Custom embedding module that shifts all indices by 1. The original torch.nn.Embedding layer only accepts non-negative integers as inputs. This custom layer accepts non-negative integers and -1 as inputs. This is useful when -1 is used to encode unknown and/or missing values (i.e., using a sklearn.preprocessing.OrdinalEncoder with unknown_value=-1 and/or encoded_missing_value=-1).

The easiest way to implement this class with minimal code is to subclass torch.nn.Embedding and shift the inputs by 1 in forward method.

forward(input: Tensor) Tensor[source]

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class MeanImputationEmbedding(num_embeddings: int, embedding_dim: int, padding_idx: int | None = None, max_norm: float | None = None, norm_type: float = 2.0, scale_grad_by_freq: bool = False, sparse: bool = False, _weight: Tensor | None = None, _freeze: bool = False, device=None, dtype=None)[source]

Bases: Embedding

Custom embedding module that applies “mean imputation” when inputs negative.

The original torch.nn.Embedding layer only accepts non-negative integers as inputs. This custom layer also accepts negative integers as inputs. This is useful when for instance -1 is used to encode unknown and/or missing values (i.e., using a sklearn.preprocessing.OrdinalEncoder with unknown_value=-1 and/or encoded_missing_value=-1). When a negative input is provided, all embeddings are averaged (form of “mean imputation”).

forward(input: Tensor) Tensor[source]

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class MonotoneBernoulliProbability(real_features_non_decreasing: ~typing.List[str], real_features_non_increasing: ~typing.List[str], real_features_non_monotone: ~typing.List[str], categorical_feature_embeddings: ~typing.List[~pytorch_utils.utils.CategoricalFeatureEmbedding] = [], hidden_sizes_monotone: ~typing.List[int] = [], hidden_sizes_non_monotone: ~typing.List[int] = [], polynomial_real_features_expansions: ~typing.Dict[str, ~typing.List[int]] = {}, activation_layer_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'torch.nn.modules.activation.ReLU'>, activation_layer_non_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'torch.nn.modules.activation.ReLU'>, normalization_layer_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'pytorch_utils.modules.BatchNorm1dNonNeg'>, normalization_layer_non_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'torch.nn.modules.batchnorm.BatchNorm1d'>, dropout_rate_monotone: int = 0, dropout_rate_non_monotone: int = 0, optim_criterion_params: ~typing.Dict[str, ~typing.Any] = {'class': <class 'torch.nn.modules.loss.BCEWithLogitsLoss'>}, optimizer_params: ~typing.Dict[str, ~typing.Any] = {'betas': (0.9, 0.999), 'class': <class 'torch.optim.adam.Adam'>, 'eps': 1e-08, 'lr': 0.001}, validation_metrics: ~typing.Mapping[str, ~torchmetrics.metric.Metric] = {}, test_metrics: ~typing.Mapping[str, ~torchmetrics.metric.Metric] = {})[source]

Bases: LightningModule

Predicts the probability of success of an event conditioned on some features. The structure of the neural network enforces that the predicted probability is a monotone (i.e., non-decreasing and/or non-increasing) function of some specified features.

optimizer_params should at least contain the keys class and lr

Two ways of doing inference:
  • use self.predict directly

  • use method predict of pytorch_lightning.Trainer

module_scope: str
property learning_rate: float
forward(x: Mapping[str, Tensor]) Tensor[source]

Same as torch.nn.Module.forward().

Parameters:
  • *args – Whatever you decide to pass into the forward method.

  • **kwargs – Keyword arguments are also possible.

Returns:

Your model’s output

configure_optimizers()[source]

Choose what optimizers and learning-rate schedulers to use in your optimization. Normally you’d need one. But in the case of GANs or similar you might have multiple. Optimization with multiple optimizers only works in the manual optimization mode.

Returns:

Any of these 6 options.

  • Single optimizer.

  • List or Tuple of optimizers.

  • Two lists - The first list has multiple optimizers, and the second has multiple LR schedulers (or multiple lr_scheduler_config).

  • Dictionary, with an "optimizer" key, and (optionally) a "lr_scheduler" key whose value is a single LR scheduler or lr_scheduler_config.

  • None - Fit will run without any optimizer.

The lr_scheduler_config is a dictionary which contains the scheduler and its associated configuration. The default configuration is shown below.

lr_scheduler_config = {
    # REQUIRED: The scheduler instance
    "scheduler": lr_scheduler,
    # The unit of the scheduler's step size, could also be 'step'.
    # 'epoch' updates the scheduler on epoch end whereas 'step'
    # updates it after a optimizer update.
    "interval": "epoch",
    # How many epochs/steps should pass between calls to
    # `scheduler.step()`. 1 corresponds to updating the learning
    # rate after every epoch/step.
    "frequency": 1,
    # Metric to to monitor for schedulers like `ReduceLROnPlateau`
    "monitor": "val_loss",
    # If set to `True`, will enforce that the value specified 'monitor'
    # is available when the scheduler is updated, thus stopping
    # training if not found. If set to `False`, it will only produce a warning
    "strict": True,
    # If using the `LearningRateMonitor` callback to monitor the
    # learning rate progress, this keyword can be used to specify
    # a custom logged name
    "name": None,
}

When there are schedulers in which the .step() method is conditioned on a value, such as the torch.optim.lr_scheduler.ReduceLROnPlateau scheduler, Lightning requires that the lr_scheduler_config contains the keyword "monitor" set to the metric name that the scheduler should be conditioned on.

Metrics can be made available to monitor by simply logging it using self.log('metric_to_track', metric_val) in your LightningModule.

Note

Some things to know:

  • Lightning calls .backward() and .step() automatically in case of automatic optimization.

  • If a learning rate scheduler is specified in configure_optimizers() with key "interval" (default “epoch”) in the scheduler configuration, Lightning will call the scheduler’s .step() method automatically in case of automatic optimization.

  • If you use 16-bit precision (precision=16), Lightning will automatically handle the optimizer.

  • If you use torch.optim.LBFGS, Lightning handles the closure function automatically for you.

  • If you use multiple optimizers, you will have to switch to ‘manual optimization’ mode and step them yourself.

  • If you need to control how often the optimizer steps, override the optimizer_step() hook.

training_step(batch: Tuple[Mapping[str, Tensor], Tensor, Tensor | None], batch_idx: int) Tensor[source]

Here you compute and return the training loss and some additional metrics for e.g. the progress bar or logger.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

  • Tensor - The loss tensor

  • dict - A dictionary which can include any keys, but must include the key 'loss' in the case of automatic optimization.

  • None - In automatic optimization, this will skip to the next batch (but is not supported for multi-GPU, TPU, or DeepSpeed). For manual optimization, this has no special meaning, as returning the loss is not required.

In this step you’d normally do the forward pass and calculate the loss for a batch. You can also do fancier things like multiple forward passes or something model specific.

Example:

def training_step(self, batch, batch_idx):
    x, y, z = batch
    out = self.encoder(x)
    loss = self.loss(out, x)
    return loss

To use multiple optimizers, you can switch to ‘manual optimization’ and control their stepping:

def __init__(self):
    super().__init__()
    self.automatic_optimization = False


# Multiple optimizers (e.g.: GANs)
def training_step(self, batch, batch_idx):
    opt1, opt2 = self.optimizers()

    # do training_step with encoder
    ...
    opt1.step()
    # do training_step with decoder
    ...
    opt2.step()

Note

When accumulate_grad_batches > 1, the loss returned here will be automatically normalized by accumulate_grad_batches internally.

validation_step(batch: Tuple[Mapping[str, Tensor], Tensor, Tensor | None], batch_idx: int) Tensor[source]

Operates on a single batch of data from the validation set. In this step you’d might generate examples or calculate anything of interest like accuracy.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

  • Tensor - The loss tensor

  • dict - A dictionary. Can include any keys, but must include the key 'loss'.

  • None - Skip to the next batch.

# if you have one val dataloader:
def validation_step(self, batch, batch_idx): ...


# if you have multiple val dataloaders:
def validation_step(self, batch, batch_idx, dataloader_idx=0): ...

Examples:

# CASE 1: A single validation dataset
def validation_step(self, batch, batch_idx):
    x, y = batch

    # implement your own
    out = self(x)
    loss = self.loss(out, y)

    # log 6 example images
    # or generated text... or whatever
    sample_imgs = x[:6]
    grid = torchvision.utils.make_grid(sample_imgs)
    self.logger.experiment.add_image('example_images', grid, 0)

    # calculate acc
    labels_hat = torch.argmax(out, dim=1)
    val_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

    # log the outputs!
    self.log_dict({'val_loss': loss, 'val_acc': val_acc})

If you pass in multiple val dataloaders, validation_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.

# CASE 2: multiple validation dataloaders
def validation_step(self, batch, batch_idx, dataloader_idx=0):
    # dataloader_idx tells you which dataset this is.
    ...

Note

If you don’t need to validate you don’t need to implement this method.

Note

When the validation_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of validation, the model goes back to training mode and gradients are enabled.

test_step(batch: Tuple[Mapping[str, Tensor], Tensor, Tensor | None], batch_idx: int) Tensor[source]

Operates on a single batch of data from the test set. In this step you’d normally generate examples or calculate anything of interest such as accuracy.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

  • Tensor - The loss tensor

  • dict - A dictionary. Can include any keys, but must include the key 'loss'.

  • None - Skip to the next batch.

# if you have one test dataloader:
def test_step(self, batch, batch_idx): ...


# if you have multiple test dataloaders:
def test_step(self, batch, batch_idx, dataloader_idx=0): ...

Examples:

# CASE 1: A single test dataset
def test_step(self, batch, batch_idx):
    x, y = batch

    # implement your own
    out = self(x)
    loss = self.loss(out, y)

    # log 6 example images
    # or generated text... or whatever
    sample_imgs = x[:6]
    grid = torchvision.utils.make_grid(sample_imgs)
    self.logger.experiment.add_image('example_images', grid, 0)

    # calculate acc
    labels_hat = torch.argmax(out, dim=1)
    test_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

    # log the outputs!
    self.log_dict({'test_loss': loss, 'test_acc': test_acc})

If you pass in multiple test dataloaders, test_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.

# CASE 2: multiple test dataloaders
def test_step(self, batch, batch_idx, dataloader_idx=0):
    # dataloader_idx tells you which dataset this is.
    ...

Note

If you don’t need to test you don’t need to implement this method.

Note

When the test_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of the test epoch, the model goes back to training mode and gradients are enabled.

predict_step(batch: Mapping[str, Tensor], batch_idx: int, dataloader_idx: int = 0) Tensor[source]

Step function called during predict(). By default, it calls forward(). Override to add any processing logic.

The predict_step() is used to scale inference on multi-devices.

To prevent an OOM error, it is possible to use BasePredictionWriter callback to write the predictions to disk or database after each batch or on epoch end.

The BasePredictionWriter should be used while using a spawn based accelerator. This happens for Trainer(strategy="ddp_spawn") or training on 8 TPU cores with Trainer(accelerator="tpu", devices=8) as predictions won’t be returned.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

Predicted output (optional).

Example

class MyModel(LightningModule):

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        return self(batch)

dm = ...
model = MyModel()
trainer = Trainer(accelerator="gpu", devices=2)
predictions = trainer.predict(model, dm)
predict(features: Mapping[str, Tensor]) Tensor[source]
predict_from_pandas(features: DataFrame, data_module: AugmentedBernoulliDataModule, scaling_factors: ndarray = array([1.]), min_augmented_value: float = -inf, max_augmented_value: float = inf) Tensor[source]
probability_mapping(data_module: AugmentedBernoulliDataModule, other_features: DataFrame, min_value: float, max_value: float, nb_points: int = 100, trainer: Trainer | None = None) Tuple[DataFrame, ndarray, Tensor][source]

Computes the mapping between covariate data_module.augmented_col and the predicted probability of the neural network on the closed interval [min_value, max_value], all other_features being fixed. The mapping is discretized into nb_points points.

There are two ways to use this function: - call with trainer=None => method self.predict is used directly for inference - call with trainer=pytorch_lightning.Trainer(…) => method predict of pytorch_lightning.Trainer is used for inference

assert_monotone_probability(data_module: AugmentedBernoulliDataModule, other_features: DataFrame, non_decreasing: bool, min_value: float, max_value: float, nb_points: int = 100, trainer: Trainer | None = None, error_message: str = '')[source]
plot_probability_mapping(data_module: AugmentedBernoulliDataModule, other_features: DataFrame, min_value: float, max_value: float, nb_points: int = 100, x_title: str = 'Covariate', y_title: str = 'Predicted probability', title: str = 'Evolution of the predicted probability as a function of the covariate', trainer: Trainer | None = None) Figure[source]

Plots the mapping between covariate data_module.augmented_col and the predicted probability of the neural network on the closed interval [min_value, max_value], all other_features being fixed. The mapping is discretized into nb_points points.

There are two ways to use this function: - call with trainer=None => method self.predict is used directly for inference - call with trainer=pytorch_lightning.Trainer(…) => method predict of pytorch_lightning.Trainer is used for inference

class ProbabilityPredictor(module: Module, data_module: DataModule)[source]

Bases: Generic[Module, DataModule]

Just a pair (MonotoneBernoulliProbability, AugmentedBernoulliDataModule) with useful methods such as predict_from_pandas.

module: Module
data_module: DataModule
classmethod load_from_checkpoint(checkpoint_path: str, module_class: ~typing.Type[~pytorch_utils.modules.Module] = <class 'pytorch_utils.modules.MonotoneBernoulliProbability'>, data_module_class: ~typing.Type[~pytorch_utils.modules.DataModule] = <class 'pytorch_utils.data_modules.AugmentedBernoulliDataModule'>, clear_data: bool = False, compile_module: bool = False, compilation_kwargs: ~typing.Dict[str, ~typing.Any] = {}, **kwargs) ProbabilityPredictor[source]
clear_data() ProbabilityPredictor[Module, DataModule][source]
predict_from_pandas(context: DataFrame) ndarray[source]

pytorch_utils.utils module

class CategoricalFeatureEmbedding(feature_name: str, nb_distinct_values: int, embedding_size: int, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]

Bases: object

Collection that simplifies the constructor parameters.

feature_name: str
nb_distinct_values: int
embedding_size: int
logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
log(logger: Logger) None[source]
get_embedding_size(nb_categories: int, multiplicative_factor: float = 1.6, power_exponent: float = 0.56, max_size: int = 600) int[source]

Determine empirically good embedding sizes (formula taken from fastai: https://docs.fast.ai/tabular.model.html). :param nb_categories: number of categories :type nb_categories: int :param max_size: maximum embedding size. Defaults to 600. :type max_size: (int, optional)

Returns:

embedding size

Return type:

int

assert_monotone(inputs: ndarray, outputs: ndarray, non_decreasing: bool | None = None, error_message: str = '', tol: float = 1e-05) None[source]

Asserts if the outputs are a monontone function of the inputs. inputs should be a 1-dimensional. outputs should be a 1 or 2-dimensional array. If 2-dimensional, then every row is tested to be a monotone function of the inputs. If non_decreasing is None then an exception is raised only if the mapping is neither non-decreasing, nor non-increasing. If non_decreasing is True then an exception is raised only if the mapping is not non-decreasing. If non_decreasing is False then an exception is raised only if the mapping is not non-increasing.

Module contents