pytorch_utils package
Submodules
pytorch_utils.data_modules module
- class AugmentedBernoulliDataModule(augmented_bernoulli_dataset_configs: ~pytorch_utils.dataset_configurations.AugmentedBernoulliDatasetConfigs, preprocessing_pipeline: ~sklearn.pipeline.Pipeline | None = None, train_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, val_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, test_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, predict_dataloader_params: ~typing.Dict[str, ~typing.Any] = {}, data_module_logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>, preprocessing_pipeline_logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>, prepare_data_per_node: bool = False)[source]
Bases:
LightningDataModule
For prediction: attributes prediction_df (and optionally prediction_scaling_factors, prediction_min_augmented_value and prediction_max_augmented_value) must be set as desired.
- preprocessing_pandas_collate_fn(batch: List[DataFrameRow]) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None] [source]
More computationally efficient than using collate_fn=lambda batch: pd.concat(batch, axis=0, sort=True) and setting fitted_transformers_pipeline=self.preprocessing_pipeline in AugmentedBernoulliDataset, because it vectorizes the transform operations.
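A minimal usage sketch (dm and ds are hypothetical names for an already set-up AugmentedBernoulliDataModule and one of its datasets):

    from torch.utils.data import DataLoader

    # hypothetical objects: dm = AugmentedBernoulliDataModule(...); ds = one of its datasets
    loader = DataLoader(ds, batch_size=256, collate_fn=dm.preprocessing_pandas_collate_fn)
    # assuming labels are generated; otherwise only a mapping of feature tensors is returned
    features, labels, sample_weights = next(iter(loader))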
- transform(batch: DataFrame) DataFrame [source]
Implicit assumption in the following implementation: the preprocessing pipeline does not modify the number of rows.
- transform_to_tensors(batch: DataFrame) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None] [source]
- augment_transform_to_tensors(batch: DataFrame, augmentation_scaling_factors: ndarray = array([1.]), min_augmented_value: float = -inf, max_augmented_value: float = inf) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None] [source]
- format_to_tensors(transformed_batch: DataFrame) Mapping[str, Tensor] | Tuple[Mapping[str, Tensor], Tensor, Tensor | None] [source]
- property is_preprocessing_pipeline_fitted: bool
Boolean indicating whether the preprocessing pipeline is fitted
- property input_features_dtypes: Series
- property output_features_dtypes: Series
- property output_features
- property output_real_features: List[str]
- property output_categorical_features: Dict[str, Set[int]]
- prepare_data() None [source]
Use this to download and prepare data. Downloading and saving data with multiple processes (distributed settings) will result in corrupted data. Lightning ensures this method is called only within a single process, so you can safely add your downloading logic within.
Warning
DO NOT set state to the model (use setup instead) since this is NOT called on every device.

Example:

    def prepare_data(self):
        # good
        download_data()
        tokenize()
        etc()

        # bad
        self.split = data_split
        self.some_state = some_other_state()
In a distributed environment, prepare_data can be called in two ways (using prepare_data_per_node):

1. Once per node. This is the default and is only called on LOCAL_RANK=0.
2. Once in total. Only called on GLOBAL_RANK=0.

Example:

    # DEFAULT
    # called once per node on LOCAL_RANK=0 of that node
    class LitDataModule(LightningDataModule):
        def __init__(self):
            super().__init__()
            self.prepare_data_per_node = True


    # call on GLOBAL_RANK=0 (great for shared file systems)
    class LitDataModule(LightningDataModule):
        def __init__(self):
            super().__init__()
            self.prepare_data_per_node = False
This is called before requesting the dataloaders:
    model.prepare_data()
    initialize_distributed()
    model.setup(stage)
    model.train_dataloader()
    model.val_dataloader()
    model.test_dataloader()
    model.predict_dataloader()
- setup(stage: str) None [source]
Called at the beginning of fit (train + validate), validate, test, or predict. This is a good hook when you need to build models dynamically or adjust something about them. This hook is called on every process when using DDP.
- Parameters:
stage – either 'fit', 'validate', 'test', or 'predict'

Example:

    class LitModel(...):
        def __init__(self):
            self.l1 = None

        def prepare_data(self):
            download_data()
            tokenize()

            # don't do this
            self.something = else

        def setup(self, stage):
            data = load_data(...)
            self.l1 = nn.Linear(28, data.num_classes)
- setup_datasets(ml_stage: Literal[MLStage.fit, MLStage.validate, MLStage.test, MLStage.predict], dataset: Dataset, validation_dataset: Dataset | None = None) None [source]
- property batch_size
- train_dataloader()[source]
An iterable or collection of iterables specifying training samples.
For more information about multiple dataloaders, see this section.
The dataloader you return will not be reloaded unless you set Trainer.reload_dataloaders_every_n_epochs to a positive integer.
For data processing use the following pattern:
- download in prepare_data()
- process and split in setup()
However, the above are only necessary for distributed processing.
Warning
do not assign state in prepare_data.

This dataloader is requested during fit().
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
- val_dataloader()[source]
An iterable or collection of iterables specifying validation samples.
For more information about multiple dataloaders, see this section.
The dataloader you return will not be reloaded unless you set Trainer.reload_dataloaders_every_n_epochs to a positive integer.
It’s recommended that all data downloads and preparation happen in prepare_data().

This dataloader is requested during fit() and validate().
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
Note
If you don’t need a validation dataset and a validation_step(), you don’t need to implement this method.
- test_dataloader()[source]
An iterable or collection of iterables specifying test samples.
For more information about multiple dataloaders, see this section.
For data processing use the following pattern:
- download in prepare_data()
- process and split in setup()
However, the above are only necessary for distributed processing.
Warning
do not assign state in prepare_data.

This dataloader is requested during test().
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
Note
If you don’t need a test dataset and a test_step(), you don’t need to implement this method.
- predict_dataloader()[source]
An iterable or collection of iterables specifying prediction samples.
For more information about multiple dataloaders, see this section.
It’s recommended that all data downloads and preparation happen in prepare_data().

This dataloader is requested during predict().
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
- Returns:
A torch.utils.data.DataLoader or a sequence of them specifying prediction samples.
- classmethod load_from_checkpoint(checkpoint_path, map_location=None, hparams_file=None, strict=None, **kwargs: Any) AugmentedBernoulliDataModule [source]
We override this method to correct a bug with map_location argument. See Github issue: https://github.com/Lightning-AI/lightning/issues/17945
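A hedged usage sketch (the checkpoint path is hypothetical):

    dm = AugmentedBernoulliDataModule.load_from_checkpoint(
        "path/to/datamodule.ckpt",   # hypothetical checkpoint path
        map_location="cpu",
    )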
pytorch_utils.dataset_configurations module
- class DataSplitConfig(training_proportion: float = 1.0, validation_proportion: float = 0.0, test_proportion: float = 0.0, random_seed: int | None = None, stratify: ~typing.List[str] | None = None, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]
Bases:
SingleLoggerDataclassLoggable
Configuration used to specify train, validation and test splits. The proportions should all lie in [0, 1], with their sum smaller than or equal to 1. If the sum is strictly smaller than 1, only a random subset of the data is used.
- training_proportion
proportion of dataframe to be used as training samples
- Type:
float, default=1.
- validation_proportion
proportion of dataframe to be used as validation samples
- Type:
float, default=0.
- test_proportion
proportion of dataframe to be used as test samples
- Type:
float, default=0.
- random_seed
random seed used for random splitting
- Type:
Optional[int], default=None
- stratify
list of column names used to stratify the data (see also sklearn.model_selection.train_test_split)
- Type:
Optional[List[str]], default=None
- training_proportion: float = 1.0
- validation_proportion: float = 0.0
- test_proportion: float = 0.0
- random_seed: int | None = None
- stratify: List[str] | None = None
- logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
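A minimal construction sketch (the stratification column name is hypothetical):

    split_config = DataSplitConfig(
        training_proportion=0.7,
        validation_proportion=0.2,
        test_proportion=0.1,
        random_seed=42,
        stratify=["success_labels"],  # hypothetical column name
    )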
- class DataAugmentationConfig(augmented_col: str, scaling_factors: ~numpy.ndarray = array([1.]), min_value: float = -inf, max_value: float = inf, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]
Bases:
SingleLoggerDataclassLoggable
Configuration used to specify data augmentation on a specific column (augmented_col). The idea of this data augmentation is to duplicate the data several times, with only augmented_col changed by multiplying the original values with a scaling factor.
This can be useful when there is some monotone relationship between a covariate (augmented_col) and the binary outcome of an event (success or not).
- augmented_col
Name of column to augment
- Type:
str
- scaling_factors
Numpy array of floats corresponding to scaling factors used for data augmentation
- Type:
np.ndarray
- augmented_col: str
- scaling_factors: np.ndarray = array([1.])
- min_value: float = -inf
- max_value: float = inf
- logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
- static scaling_filter(data: DataFrame, scaling_factor: ndarray, col: str, min_value: float = -inf, max_value: float = inf) DataFrame [source]
Filter used to drop scaled values that are outside the range [min_value, max_value]. See method scale_col.
- static scale_col(data: DataFrame, scaling_factor: ndarray, col: str, min_value: float = -inf, max_value: float = inf) DataFrame [source]
Return pandas dataframe identical to data with column col scaled by scaling_factor. The scaled values that are outside the range [min_value, max_value] are dropped.
- static scaling_length(data: DataFrame, scaling_factor: ndarray, col: str, min_value: float = -inf, max_value: float = inf) int [source]
Length of the pandas dataframe obtained when calling method scale_col with the exact same input. The implementation does not require explicitly building the dataframe.
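A minimal construction sketch (the column name and scaling factors are illustrative only):

    import numpy as np

    augmentation_config = DataAugmentationConfig(
        augmented_col="price",                      # hypothetical monotone covariate
        scaling_factors=np.array([0.8, 1.0, 1.2]),  # each row is duplicated once per factor
        min_value=0.0,                              # scaled values below 0 are dropped
    )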
- class AugmentedBernoulliDatasetConfig(data: ~pandas.core.frame.DataFrame, is_success: bool, data_augmentation_config: ~pytorch_utils.dataset_configurations.DataAugmentationConfig, split_config: ~pytorch_utils.dataset_configurations.DataSplitConfig = DataSplitConfig(training_proportion=1.0, validation_proportion=0.0, test_proportion=0.0, random_seed=None, stratify=None), metadata: ~typing.Dict[str, ~typing.Any] = <factory>, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]
Bases:
SingleLoggerDataclassLoggable
Dataset configuration for augmented Bernoulli samples (binary outcomes: successful or not). There are two ways to construct an instance of AugmentedBernoulliDatasetConfig:

either by calling the constructor and passing a pandas dataframe (with optional metadata) as input
or by calling class method from_meta_dataframe and passing a delta table as input

Method 1 is preferred for testing/debugging/prototyping, while method 2 is preferred for production and traceable experimentation (clean metadata, etc.); see the constructor sketch after the attribute listing below.
- data
The Pandas dataframe containing the data.
- Type:
pandas.DataFrame
- is_success
Whether the samples correspond to successful events or not
- Type:
bool
- split_config
The configuration for splitting between train, validation and test
- Type:
- data_augmentation_config
The configuration for data augmentation
- Type:
- metadata
Any information regarding the source data that we wish to track/save
- Type:
Dict[str, Any], default={}
- data: pd.DataFrame
- is_success: bool
- data_augmentation_config: DataAugmentationConfig
- split_config: DataSplitConfig = DataSplitConfig(training_proportion=1.0, validation_proportion=0.0, test_proportion=0.0, random_seed=None, stratify=None)
- metadata: Dict[str, Any]
- logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
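A minimal sketch of construction method 1, reusing the hypothetical split_config and augmentation_config from the earlier sketches (successes_df is a hypothetical pandas DataFrame):

    config = AugmentedBernoulliDatasetConfig(
        data=successes_df,                             # hypothetical pandas DataFrame of successful events
        is_success=True,
        data_augmentation_config=augmentation_config,
        split_config=split_config,
        metadata={"source": "notebook prototype"},     # any traceability info
    )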
- classmethod from_meta_dataframe(meta_df: Any[pyspark.sql.DataFrame], is_success: bool, data_augmentation_config: DataAugmentationConfig, split_config: DataSplitConfig, logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>, spark_filter: Optional[str] = None, pandas_formatter: PandasFormatter = PandasIdentityFormatter()) AugmentedBernoulliDatasetConfig [source]
Use this method to construct an instance of AugmentedBernoulliDatasetConfig directly from a delta table with proper metadata.
- delta_table
The delta table containing the data. To use a previous version of the data, call restoreToVersion(version: int) on delta_table before passing it to AugmentedBernoulliDatasetConfig.
- Type:
delta.tables.DeltaTable
- is_success
Whether the samples correspond to successful events or not
- Type:
bool
- split_config
The configuration for splitting between train, validation and test
- Type:
- data_augmentation_config
The configuration for data augmentation
- Type:
- pandas_formatter
Any formatting on pandas data (cast dtypes, etc…).
- Type:
PandasFormatter
- property training_data
- property training_data_length
- property validation_data
- property validation_data_length
- property test_data
- property test_data_length
- property augmented_data
- property augmented_data_length
- property augmented_training_data
- property augmented_training_data_length
- property augmented_validation_data
- property augmented_validation_data_length
- property augmented_test_data
- property augmented_test_data_length
- property augmented_col
- property data_augmentation_scaling_factors
- property columns
- property dtypes
- clear_data() AugmentedBernoulliDatasetConfig [source]
- class AugmentedBernoulliDatasetConfigs(augmented_bernoulli_dataset_configs=typing.List[pytorch_utils.dataset_configurations.AugmentedBernoulliDatasetConfig], label_col: str = 'success_labels', labels_dtype: ~typing.Type[~numpy.int32] = <class 'numpy.int32'>, sample_weight_col: str | None = None, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]
Bases:
UserList
- property augmented_col
- property columns
- property dtypes
- property all_data
- property all_data_length
- property all_training_data
- property all_training_data_length
- property all_validation_data
- property all_validation_data_length
- property all_test_data
- property all_test_data_length
- property all_augmented_data
- property all_augmented_data_length
- property all_augmented_training_data
- property all_augmented_training_data_length
- property all_augmented_validation_data
- property all_augmented_validation_data_length
- property all_augmented_test_data
- property all_augmented_test_data_length
- clear_data() AugmentedBernoulliDatasetConfigs [source]
pytorch_utils.datasets module
- class MLStage(value)[source]
Bases:
Enum
An enumeration.
- fit = 1
- validate = 2
- test = 3
- predict = 4
- class AugmentedBernoulliDataset(data: ListDataFrameRows, is_success: bool | None, augmented_col: str, fitted_preprocessing_pipeline: Pipeline | None = None, data_augmentation_scaling_factors: ndarray = array([1.]), label_col: str = 'success_labels', labels_dtype: dtype = dtype('int32'), sample_weight_col: str | None = None, min_augmented_value: float = -inf, max_augmented_value: float = inf)[source]
Bases:
Dataset
Implements storage-efficient data augmentation of Bernoulli samples (binary outcomes: successful or not) as well as data transformations (e.g., scaling, encoding, …). If is_success is set to None, only features are generated (labels are dropped). This is useful for prediction sets.
- data: ListDataFrameRows
- is_success: bool | None
- augmented_col: str
- fitted_preprocessing_pipeline: Pipeline | None = None
- data_augmentation_scaling_factors: ndarray = array([1.])
- label_col: str = 'success_labels'
- labels_dtype: dtype = dtype('int32')
- sample_weight_col: str | None = None
- min_augmented_value: float = -inf
- max_augmented_value: float = inf
- classmethod from_config(config: AugmentedBernoulliDatasetConfig, ml_stage: Literal[MLStage.fit, MLStage.validate, MLStage.test], fitted_preprocessing_pipeline: Pipeline | None = None, label_col: str = 'success_labels', labels_dtype: dtype = dtype('int32'), sample_weight_col: str | None = None) AugmentedBernoulliDataset [source]
- property dataframe: DataFrame
- clear_data() AugmentedBernoulliDataset [source]
- property raw_feature_names
- property transformed_feature_names
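A hedged sketch of building a training dataset from an AugmentedBernoulliDatasetConfig (reusing the hypothetical config from the earlier sketch):

    train_dataset = AugmentedBernoulliDataset.from_config(
        config,
        ml_stage=MLStage.fit,
        fitted_preprocessing_pipeline=None,   # or an already fitted sklearn Pipeline
    )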
pytorch_utils.exceptions module
- exception MLError[source]
Bases:
Exception
Exception class from which every exception in this library will derive. It enables other projects using this library to catch all errors coming from the library with a single “except” statement.
- exception LocalDirNotWriteableException[source]
Bases:
MLError
Raised when the local directory is not writeable
- exception MissingConfigFileException[source]
Bases:
MLError
Raised when a given configuration file cannot be found
- exception BadCLIParameterException[source]
Bases:
MLError
Raised when there is an issue with a parameter in the CLI (command-line interface)
- exception BadConfigException[source]
Bases:
MLError
Raised when a configuration file cannot be loaded, for instance due to wrong syntax or poor formatting.
- exception BadConfigLogLevelException[source]
Bases:
BadConfigException
Raised when the log level (log_level) does not exist
- exception BadConfigSparkMasterException[source]
Bases:
BadConfigException
Raised when the Spark master (spark_master) is not valid
- exception BadConfigPathException[source]
Bases:
BadConfigException
Raised when the configuration path parameter (conf_path) does not contain at least a directory/path where to find configuration files
- exception BadConfigMissingInputException[source]
Bases:
BadConfigException
Raised when the ‘input_data’/{‘products’, ‘transactions’} file-path is missing
- exception BadConfigMissingOutputException[source]
Bases:
BadConfigException
Raised when the ‘output_data’/’transactions’ file-path is missing
- exception TaskNotFoundError[source]
Bases:
MLError
Raised when task name is not found in entrypoints
- exception MissingDatasetError[source]
Bases:
MLError
Raised when a dataset is not found in the catalog
- exception BadConfigFormatException[source]
Bases:
MLError
Raised when a configuration is not formatted correctly
- exception InconsistentDatasetConfigurations[source]
Bases:
MLError
Raised when several inconsistent dataset configurations are being used to build a dataset.
- exception NotMonotone[source]
Bases:
MLError
Raised when some outputs are not a monotone function of some inputs.
pytorch_utils.metrics module
- class WeightedMeanSquaredError(squared: bool = True, **kwargs: Any)[source]
Bases:
MeanSquaredError
Analogue of torchmetrics.MeanSquaredError but with (optional) sample weights.
- class WeightedMeanAbsoluteError(**kwargs: Any)[source]
Bases:
MeanAbsoluteError
Analogue of torchmetrics.MeanAbsoluteError but with (optional) sample weights.
- class WeightedBinaryCalibrationError(n_bins: int = 15, norm: Literal['l1', 'l2', 'max'] = 'l1', ignore_index: int | None = None, validate_args: bool = True, **kwargs: Any)[source]
Bases:
BinaryCalibrationError
Analogue of torchmetrics.classification.BinaryCalibrationError but with (optional) sample weights.
pytorch_utils.miscellaneous module
pytorch_utils.modules module
- class LinearNonNeg(in_features: int, out_features: int, bias: bool = True, device=None, dtype=None)[source]
Bases:
Linear
Alternative linear layer with nonnegative weights (bias unchanged). This ensures the outputs are always a non-decreasing function of the inputs (no matter the values of parameters self.weight and self.bias, which may vary during training).
The easiest way to implement this class with minimal code is to subclass torch.nn.Linear and apply a positive transformation (namely torch.nn.functional.elu shifted by 1) to the weights before applying the linear transformation in the forward method.
- forward(input: Tensor) Tensor [source]
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
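A minimal sketch of the idea described above (not necessarily the library's exact implementation): subclass torch.nn.Linear and map the raw weights through elu(w) + 1 > 0 before applying the affine transform.

    import torch
    import torch.nn.functional as F

    class LinearNonNegSketch(torch.nn.Linear):
        def forward(self, input: torch.Tensor) -> torch.Tensor:
            # elu(w) + 1 is strictly positive, so outputs are non-decreasing in every input
            return F.linear(input, F.elu(self.weight) + 1.0, self.bias)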
- class BatchNorm1dNonNeg(num_features: int, eps: float = 1e-05, momentum: float = 0.1, affine: bool = True, track_running_stats: bool = True, device=None, dtype=None)[source]
Bases:
BatchNorm1d
Alternative batch normalization with nonnegative weights (bias unchanged). This ensures the outputs are always a non-decreasing function of the inputs when self.training=False (no matter the values of parameters self.weight and self.bias, which may vary during training).
The easiest way to implement this class with minimal code is to subclass torch.nn.BatchNorm1d and apply a positive transformation (namely torch.nn.functional.elu shifted by 1) to the weights before applying the batch norm transformation in the forward method.
- forward(input: Tensor) Tensor [source]
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
- class LinearSemiNonNeg(in_features_non_neg, in_features_others, out_features, non_neg_inputs_name='non_neg_inputs', other_inputs_name='other_inputs')[source]
Bases:
Module
Alternative linear layer combining a standard linear layer (torch.nn.Linear) together with a LinearNonNeg layer (by summing the two). The outputs are always a non-decreasing function of the inputs named non_neg_inputs_name (no matter the weights and biases). The outputs are not necessarily monotone w.r.t. the inputs named other_inputs_name.
- forward(input: Mapping[str, Tensor])[source]
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
- class BiLinearSemiNonNeg(in_features_non_neg, in_features_others, out_features_non_neg, out_features_others, non_neg_inputs_name='non_neg_inputs', other_inputs_name='other_inputs')[source]
Bases:
Module
- forward(input: Mapping[str, Tensor])[source]
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
- class Partitioned(**module_partitions: Module)[source]
Bases:
Module
Unlike torch.nn.Sequential, which “chains” outputs to inputs sequentially for each module in a provided list, this module simultaneously transforms every partition of the input in parallel using the corresponding module. The difference between torch.nn.Sequential and Partitioned is similar to the difference between a series and a parallel electric circuit.
- forward(input_partitions: Mapping[str, Tensor])[source]
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
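A rough sketch of the behavior described above (how the library combines the partition outputs, e.g. returning a mapping or concatenating, is an assumption):

    import torch

    class PartitionedSketch(torch.nn.Module):
        def __init__(self, **module_partitions: torch.nn.Module):
            super().__init__()
            self.partitions = torch.nn.ModuleDict(module_partitions)

        def forward(self, input_partitions):
            # each named module only sees its own partition of the input mapping
            return {name: module(input_partitions[name])
                    for name, module in self.partitions.items()}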
- class ShiftedEmbedding(num_embeddings: int, embedding_dim: int, padding_idx: int | None = None, max_norm: float | None = None, norm_type: float = 2.0, scale_grad_by_freq: bool = False, sparse: bool = False, _weight: Tensor | None = None, _freeze: bool = False, device=None, dtype=None)[source]
Bases:
Embedding
Custom embedding module that shifts all indices by 1. The original torch.nn.Embedding layer only accepts non-negative integers as inputs. This custom layer accepts non-negative integers and -1 as inputs. This is useful when -1 is used to encode unknown and/or missing values (i.e., using a sklearn.preprocessing.OrdinalEncoder with unknown_value=-1 and/or encoded_missing_value=-1).
The easiest way to implement this class with minimal code is to subclass torch.nn.Embedding and shift the inputs by 1 in forward method.
- forward(input: Tensor) Tensor [source]
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
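A minimal sketch of the shifting idea (whether the constructor also reserves an extra embedding row for index -1 is an assumption here):

    import torch

    class ShiftedEmbeddingSketch(torch.nn.Embedding):
        def forward(self, input: torch.Tensor) -> torch.Tensor:
            # -1 (unknown/missing) becomes index 0; valid categories 0..n-1 become 1..n
            return super().forward(input + 1)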
- class MeanImputationEmbedding(num_embeddings: int, embedding_dim: int, padding_idx: int | None = None, max_norm: float | None = None, norm_type: float = 2.0, scale_grad_by_freq: bool = False, sparse: bool = False, _weight: Tensor | None = None, _freeze: bool = False, device=None, dtype=None)[source]
Bases:
Embedding
Custom embedding module that applies “mean imputation” when inputs are negative.
The original torch.nn.Embedding layer only accepts non-negative integers as inputs. This custom layer also accepts negative integers as inputs. This is useful when for instance -1 is used to encode unknown and/or missing values (i.e., using a sklearn.preprocessing.OrdinalEncoder with unknown_value=-1 and/or encoded_missing_value=-1). When a negative input is provided, all embeddings are averaged (form of “mean imputation”).
- forward(input: Tensor) Tensor [source]
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
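A rough sketch of the behavior described above (the exact averaging scheme used by the library is an assumption):

    import torch

    class MeanImputationEmbeddingSketch(torch.nn.Embedding):
        def forward(self, input: torch.Tensor) -> torch.Tensor:
            embedded = super().forward(input.clamp(min=0))
            mean_embedding = self.weight.mean(dim=0)      # average of all embedding rows
            # negative indices (unknown/missing) receive the mean embedding instead
            return torch.where((input < 0).unsqueeze(-1), mean_embedding, embedded)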
- class MonotoneBernoulliProbability(real_features_non_decreasing: ~typing.List[str], real_features_non_increasing: ~typing.List[str], real_features_non_monotone: ~typing.List[str], categorical_feature_embeddings: ~typing.List[~pytorch_utils.utils.CategoricalFeatureEmbedding] = [], hidden_sizes_monotone: ~typing.List[int] = [], hidden_sizes_non_monotone: ~typing.List[int] = [], polynomial_real_features_expansions: ~typing.Dict[str, ~typing.List[int]] = {}, activation_layer_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'torch.nn.modules.activation.ReLU'>, activation_layer_non_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'torch.nn.modules.activation.ReLU'>, normalization_layer_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'pytorch_utils.modules.BatchNorm1dNonNeg'>, normalization_layer_non_monotone: ~typing.Type[~torch.nn.modules.module.Module] = <class 'torch.nn.modules.batchnorm.BatchNorm1d'>, dropout_rate_monotone: int = 0, dropout_rate_non_monotone: int = 0, optim_criterion_params: ~typing.Dict[str, ~typing.Any] = {'class': <class 'torch.nn.modules.loss.BCEWithLogitsLoss'>}, optimizer_params: ~typing.Dict[str, ~typing.Any] = {'betas': (0.9, 0.999), 'class': <class 'torch.optim.adam.Adam'>, 'eps': 1e-08, 'lr': 0.001}, validation_metrics: ~typing.Mapping[str, ~torchmetrics.metric.Metric] = {}, test_metrics: ~typing.Mapping[str, ~torchmetrics.metric.Metric] = {})[source]
Bases:
LightningModule
Predicts the probability of success of an event conditioned on some features. The structure of the neural network enforces that the predicted probability is a monotone (i.e., non-decreasing and/or non-increasing) function of some specified features.
optimizer_params should at least contain the keys class and lr
- Two ways of doing inference:
use self.predict directly
use method predict of pytorch_lightning.Trainer
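A hedged construction sketch (feature names and layer sizes are illustrative only; optimizer_params contains at least the keys class and lr, as required above):

    import torch

    model = MonotoneBernoulliProbability(
        real_features_non_decreasing=["discount"],     # hypothetical features
        real_features_non_increasing=["price"],
        real_features_non_monotone=["age", "tenure"],
        hidden_sizes_monotone=[16, 16],
        hidden_sizes_non_monotone=[32],
        optimizer_params={"class": torch.optim.Adam, "lr": 1e-3},
    )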
- module_scope: str
- property learning_rate: float
- forward(x: Mapping[str, Tensor]) Tensor [source]
Same as torch.nn.Module.forward().

- Parameters:
*args – Whatever you decide to pass into the forward method.
**kwargs – Keyword arguments are also possible.
- Returns:
Your model’s output
- configure_optimizers()[source]
Choose what optimizers and learning-rate schedulers to use in your optimization. Normally you’d need one. But in the case of GANs or similar you might have multiple. Optimization with multiple optimizers only works in the manual optimization mode.
- Returns:
Any of the following options.

- Single optimizer.
- List or Tuple of optimizers.
- Two lists - the first list has multiple optimizers, and the second has multiple LR schedulers (or multiple lr_scheduler_config).
- Dictionary, with an "optimizer" key, and (optionally) a "lr_scheduler" key whose value is a single LR scheduler or lr_scheduler_config.
- None - Fit will run without any optimizer.

The lr_scheduler_config is a dictionary which contains the scheduler and its associated configuration. The default configuration is shown below.

    lr_scheduler_config = {
        # REQUIRED: The scheduler instance
        "scheduler": lr_scheduler,
        # The unit of the scheduler's step size, could also be 'step'.
        # 'epoch' updates the scheduler on epoch end whereas 'step'
        # updates it after an optimizer update.
        "interval": "epoch",
        # How many epochs/steps should pass between calls to
        # `scheduler.step()`. 1 corresponds to updating the learning
        # rate after every epoch/step.
        "frequency": 1,
        # Metric to monitor for schedulers like `ReduceLROnPlateau`
        "monitor": "val_loss",
        # If set to `True`, will enforce that the value specified in 'monitor'
        # is available when the scheduler is updated, thus stopping
        # training if not found. If set to `False`, it will only produce a warning.
        "strict": True,
        # If using the `LearningRateMonitor` callback to monitor the
        # learning rate progress, this keyword can be used to specify
        # a custom logged name.
        "name": None,
    }
When there are schedulers in which the .step() method is conditioned on a value, such as the torch.optim.lr_scheduler.ReduceLROnPlateau scheduler, Lightning requires that the lr_scheduler_config contains the keyword "monitor" set to the metric name that the scheduler should be conditioned on.

Metrics can be made available to monitor by simply logging them using self.log('metric_to_track', metric_val) in your LightningModule.

Note
Some things to know:
- Lightning calls .backward() and .step() automatically in case of automatic optimization.
- If a learning rate scheduler is specified in configure_optimizers() with key "interval" (default “epoch”) in the scheduler configuration, Lightning will call the scheduler’s .step() method automatically in case of automatic optimization.
- If you use 16-bit precision (precision=16), Lightning will automatically handle the optimizer.
- If you use torch.optim.LBFGS, Lightning handles the closure function automatically for you.
- If you use multiple optimizers, you will have to switch to ‘manual optimization’ mode and step them yourself.
- If you need to control how often the optimizer steps, override the optimizer_step() hook.
- training_step(batch: Tuple[Mapping[str, Tensor], Tensor, Tensor | None], batch_idx: int) Tensor [source]
Here you compute and return the training loss and some additional metrics for e.g. the progress bar or logger.
- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Tensor - The loss tensor
dict - A dictionary which can include any keys, but must include the key 'loss' in the case of automatic optimization.
None - In automatic optimization, this will skip to the next batch (but is not supported for multi-GPU, TPU, or DeepSpeed). For manual optimization, this has no special meaning, as returning the loss is not required.
In this step you’d normally do the forward pass and calculate the loss for a batch. You can also do fancier things like multiple forward passes or something model specific.
Example:
    def training_step(self, batch, batch_idx):
        x, y, z = batch
        out = self.encoder(x)
        loss = self.loss(out, x)
        return loss
To use multiple optimizers, you can switch to ‘manual optimization’ and control their stepping:
    def __init__(self):
        super().__init__()
        self.automatic_optimization = False


    # Multiple optimizers (e.g.: GANs)
    def training_step(self, batch, batch_idx):
        opt1, opt2 = self.optimizers()

        # do training_step with encoder
        ...
        opt1.step()

        # do training_step with decoder
        ...
        opt2.step()
Note
When accumulate_grad_batches > 1, the loss returned here will be automatically normalized by accumulate_grad_batches internally.
- validation_step(batch: Tuple[Mapping[str, Tensor], Tensor, Tensor | None], batch_idx: int) Tensor [source]
Operates on a single batch of data from the validation set. In this step you might generate examples or calculate anything of interest like accuracy.
- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Tensor - The loss tensor
dict - A dictionary. Can include any keys, but must include the key 'loss'.
None - Skip to the next batch.
    # if you have one val dataloader:
    def validation_step(self, batch, batch_idx): ...


    # if you have multiple val dataloaders:
    def validation_step(self, batch, batch_idx, dataloader_idx=0): ...
Examples:
    # CASE 1: A single validation dataset
    def validation_step(self, batch, batch_idx):
        x, y = batch

        # implement your own
        out = self(x)
        loss = self.loss(out, y)

        # log 6 example images
        # or generated text... or whatever
        sample_imgs = x[:6]
        grid = torchvision.utils.make_grid(sample_imgs)
        self.logger.experiment.add_image('example_images', grid, 0)

        # calculate acc
        labels_hat = torch.argmax(out, dim=1)
        val_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

        # log the outputs!
        self.log_dict({'val_loss': loss, 'val_acc': val_acc})
If you pass in multiple val dataloaders, validation_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.

    # CASE 2: multiple validation dataloaders
    def validation_step(self, batch, batch_idx, dataloader_idx=0):
        # dataloader_idx tells you which dataset this is.
        ...
Note
If you don’t need to validate you don’t need to implement this method.
Note
When the validation_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of validation, the model goes back to training mode and gradients are enabled.
- test_step(batch: Tuple[Mapping[str, Tensor], Tensor, Tensor | None], batch_idx: int) Tensor [source]
Operates on a single batch of data from the test set. In this step you’d normally generate examples or calculate anything of interest such as accuracy.
- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Tensor - The loss tensor
dict - A dictionary. Can include any keys, but must include the key 'loss'.
None - Skip to the next batch.
    # if you have one test dataloader:
    def test_step(self, batch, batch_idx): ...


    # if you have multiple test dataloaders:
    def test_step(self, batch, batch_idx, dataloader_idx=0): ...
Examples:
    # CASE 1: A single test dataset
    def test_step(self, batch, batch_idx):
        x, y = batch

        # implement your own
        out = self(x)
        loss = self.loss(out, y)

        # log 6 example images
        # or generated text... or whatever
        sample_imgs = x[:6]
        grid = torchvision.utils.make_grid(sample_imgs)
        self.logger.experiment.add_image('example_images', grid, 0)

        # calculate acc
        labels_hat = torch.argmax(out, dim=1)
        test_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

        # log the outputs!
        self.log_dict({'test_loss': loss, 'test_acc': test_acc})
If you pass in multiple test dataloaders, test_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.

    # CASE 2: multiple test dataloaders
    def test_step(self, batch, batch_idx, dataloader_idx=0):
        # dataloader_idx tells you which dataset this is.
        ...
Note
If you don’t need to test you don’t need to implement this method.
Note
When the test_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of the test epoch, the model goes back to training mode and gradients are enabled.
- predict_step(batch: Mapping[str, Tensor], batch_idx: int, dataloader_idx: int = 0) Tensor [source]
Step function called during predict(). By default, it calls forward(). Override to add any processing logic.

The predict_step() is used to scale inference on multi-devices.

To prevent an OOM error, it is possible to use the BasePredictionWriter callback to write the predictions to disk or database after each batch or on epoch end.

The BasePredictionWriter should be used while using a spawn based accelerator. This happens for Trainer(strategy="ddp_spawn") or training on 8 TPU cores with Trainer(accelerator="tpu", devices=8), as predictions won’t be returned.

- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Predicted output (optional).
Example
    class MyModel(LightningModule):
        def predict_step(self, batch, batch_idx, dataloader_idx=0):
            return self(batch)


    dm = ...
    model = MyModel()
    trainer = Trainer(accelerator="gpu", devices=2)
    predictions = trainer.predict(model, dm)
- predict_from_pandas(features: DataFrame, data_module: AugmentedBernoulliDataModule, scaling_factors: ndarray = array([1.]), min_augmented_value: float = -inf, max_augmented_value: float = inf) Tensor [source]
- probability_mapping(data_module: AugmentedBernoulliDataModule, other_features: DataFrame, min_value: float, max_value: float, nb_points: int = 100, trainer: Trainer | None = None) Tuple[DataFrame, ndarray, Tensor] [source]
Computes the mapping between covariate data_module.augmented_col and the predicted probability of the neural network on the closed interval [min_value, max_value], all other_features being fixed. The mapping is discretized into nb_points points.
There are two ways to use this function:

- call with trainer=None: method self.predict is used directly for inference
- call with trainer=pytorch_lightning.Trainer(…): method predict of pytorch_lightning.Trainer is used for inference
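A hedged sketch of the two invocation modes (other_features_df is a hypothetical DataFrame of fixed covariates; what the three returned objects contain is inferred from the signature):

    # 1) direct inference through self.predict
    mapping_df, covariate_grid, probabilities = model.probability_mapping(
        data_module, other_features_df, min_value=0.0, max_value=10.0, nb_points=100
    )

    # 2) inference through a pytorch_lightning.Trainer
    import pytorch_lightning as pl
    mapping_df, covariate_grid, probabilities = model.probability_mapping(
        data_module, other_features_df, min_value=0.0, max_value=10.0,
        nb_points=100, trainer=pl.Trainer(accelerator="cpu"),
    )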
- assert_monotone_probability(data_module: AugmentedBernoulliDataModule, other_features: DataFrame, non_decreasing: bool, min_value: float, max_value: float, nb_points: int = 100, trainer: Trainer | None = None, error_message: str = '')[source]
- plot_probability_mapping(data_module: AugmentedBernoulliDataModule, other_features: DataFrame, min_value: float, max_value: float, nb_points: int = 100, x_title: str = 'Covariate', y_title: str = 'Predicted probability', title: str = 'Evolution of the predicted probability as a function of the covariate', trainer: Trainer | None = None) Figure [source]
Plots the mapping between covariate data_module.augmented_col and the predicted probability of the neural network on the closed interval [min_value, max_value], all other_features being fixed. The mapping is discretized into nb_points points.
There are two ways to use this function:

- call with trainer=None: method self.predict is used directly for inference
- call with trainer=pytorch_lightning.Trainer(…): method predict of pytorch_lightning.Trainer is used for inference
- class ProbabilityPredictor(module: Module, data_module: DataModule)[source]
Bases:
Generic[Module, DataModule]

Just a pair (MonotoneBernoulliProbability, AugmentedBernoulliDataModule) with useful methods such as predict_from_pandas.
- module: Module
- data_module: DataModule
- classmethod load_from_checkpoint(checkpoint_path: str, module_class: ~typing.Type[~pytorch_utils.modules.Module] = <class 'pytorch_utils.modules.MonotoneBernoulliProbability'>, data_module_class: ~typing.Type[~pytorch_utils.modules.DataModule] = <class 'pytorch_utils.data_modules.AugmentedBernoulliDataModule'>, clear_data: bool = False, compile_module: bool = False, compilation_kwargs: ~typing.Dict[str, ~typing.Any] = {}, **kwargs) ProbabilityPredictor [source]
- clear_data() ProbabilityPredictor[Module, DataModule] [source]
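A hedged end-to-end sketch (the checkpoint path and features_df are hypothetical; inference goes through the documented module/data_module pair):

    predictor = ProbabilityPredictor.load_from_checkpoint(
        "path/to/model.ckpt",     # hypothetical checkpoint path
        clear_data=True,
    )
    probabilities = predictor.module.predict_from_pandas(
        features_df,              # hypothetical pandas DataFrame of raw features
        data_module=predictor.data_module,
    )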
pytorch_utils.utils module
- class CategoricalFeatureEmbedding(feature_name: str, nb_distinct_values: int, embedding_size: int, logger: ~pytorch_utils.logging.loggers.Logger = <pytorch_utils.logging.loggers.VoidLogger object>)[source]
Bases:
object
Parameter collection that bundles the settings of a single categorical feature embedding, simplifying constructor parameters.
- feature_name: str
- nb_distinct_values: int
- embedding_size: int
- logger: Logger = <pytorch_utils.logging.loggers.VoidLogger object>
- get_embedding_size(nb_categories: int, multiplicative_factor: float = 1.6, power_exponent: float = 0.56, max_size: int = 600) int [source]
Determine empirically good embedding sizes (formula taken from fastai: https://docs.fast.ai/tabular.model.html).

- Parameters:
nb_categories (int) – number of categories
max_size (int, optional) – maximum embedding size. Defaults to 600.
- Returns:
embedding size
- Return type:
int
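A usage sketch; the comment spells out the fastai rule of thumb that the defaults above suggest, but the exact formula used by the implementation is an assumption:

    # presumed formula: min(max_size, round(multiplicative_factor * nb_categories ** power_exponent))
    embedding_size = get_embedding_size(nb_categories=100)   # roughly 21 with the defaults, if the formula holds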
- assert_monotone(inputs: ndarray, outputs: ndarray, non_decreasing: bool | None = None, error_message: str = '', tol: float = 1e-05) None [source]
Asserts if the outputs are a monontone function of the inputs. inputs should be a 1-dimensional. outputs should be a 1 or 2-dimensional array. If 2-dimensional, then every row is tested to be a monotone function of the inputs. If non_decreasing is None then an exception is raised only if the mapping is neither non-decreasing, nor non-increasing. If non_decreasing is True then an exception is raised only if the mapping is not non-decreasing. If non_decreasing is False then an exception is raised only if the mapping is not non-increasing.