TorchDistributor

class pyspark.ml.torch.distributor.TorchDistributor(num_processes: int = 1, local_mode: bool = True, use_gpu: bool = True, _ssl_conf: str = 'pytorch.spark.distributor.ignoreSsl')

A class to support distributed training on PyTorch and PyTorch Lightning using PySpark.

New in version 3.4.0.

Changed in version 3.5.0: Supports Spark Connect.

Parameters
num_processesint, optional

An integer that determines how many concurrent tasks are allowed. For GPU-enabled training, spark.task.gpus is expected to be 1. The default is 1, so multiple cores/GPUs are not used unless explicitly requested.

local_modebool, optional

A boolean that determines whether training runs on the driver node (local mode) or on the executors. The default is True, so the executors are not used unless distributed training is explicitly requested.

use_gpubool, optional

A boolean that indicates whether training is run on the GPU. Note that GPU-enabled code looks different from CPU-only code (for example, in the choice of torch.distributed backend); see the CPU-only sketch below.
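
For example, a CPU-only run on the driver node might look like the following sketch, where train_fn is a placeholder for a training function that initializes torch.distributed with the gloo backend rather than nccl:

>>> distributor = TorchDistributor(
...     num_processes=2,
...     local_mode=True,
...     use_gpu=False)
>>> output = distributor.run(train_fn)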

Examples

Run PyTorch Training locally on GPU (using a PyTorch native function)

>>> def train(learning_rate):
...     import torch.distributed
...     torch.distributed.init_process_group(backend="nccl")
...     # ...
...     torch.distributed.destroy_process_group()
...     return model # or anything else
...
>>> distributor = TorchDistributor(
...     num_processes=2,
...     local_mode=True,
...     use_gpu=True)
>>> model = distributor.run(train, 1e-3)
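
For reference, a more complete training function might look like the sketch below; the torch.nn.Linear model and the omitted training loop are placeholders, and LOCAL_RANK is assumed to be provided by the torchrun-style launcher environment:

>>> def train(learning_rate):
...     import os
...     import torch
...     import torch.distributed as dist
...     from torch.nn.parallel import DistributedDataParallel as DDP
...     dist.init_process_group(backend="nccl")
...     local_rank = int(os.environ["LOCAL_RANK"])
...     torch.cuda.set_device(local_rank)
...     # placeholder model; replace with your own architecture
...     model = torch.nn.Linear(10, 1).to(local_rank)
...     ddp_model = DDP(model, device_ids=[local_rank])
...     optimizer = torch.optim.SGD(ddp_model.parameters(), lr=learning_rate)
...     # ... iterate over your DataLoader and call optimizer.step() ...
...     dist.destroy_process_group()
...     return model.cpu()  # move off the GPU before returning the result to the driver
...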

Run PyTorch Training on GPU (using a file with PyTorch code)

>>> distributor = TorchDistributor(
...     num_processes=2,
...     local_mode=False,
...     use_gpu=True)
>>> distributor.run("/path/to/train.py", "--learning-rate=1e-3")
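
The script path and flag above are placeholders; a hypothetical /path/to/train.py would parse its own command-line arguments (all passed as strings) and run the same kind of torch.distributed code shown above, for example:

# hypothetical contents of /path/to/train.py
import argparse
import torch.distributed as dist

parser = argparse.ArgumentParser()
parser.add_argument("--learning-rate", type=float, default=1e-3)
args = parser.parse_args()

dist.init_process_group(backend="nccl")
# ... build the model and train with args.learning_rate ...
dist.destroy_process_group()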

Run PyTorch Lightning Training on GPU

>>> num_proc = 2
>>> def train():
...     from pytorch_lightning import Trainer
...     # ...
...     # required: devices=1 and num_nodes=num_processes for multi-node training
...     # required: devices=num_processes and num_nodes=1 for single-node, multi-GPU training
...     trainer = Trainer(accelerator="gpu", devices=1, num_nodes=num_proc, strategy="ddp")
...     trainer.fit()
...     # ...
...     return trainer
...
>>> distributor = TorchDistributor(
...     num_processes=num_proc,
...     local_mode=True,
...     use_gpu=True)
>>> trainer = distributor.run(train)
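
The same training function can also be launched on the executors instead of the driver by setting local_mode=False (a sketch, assuming a GPU-enabled cluster with one GPU per Spark task):

>>> distributor = TorchDistributor(
...     num_processes=num_proc,
...     local_mode=False,
...     use_gpu=True)
>>> trainer = distributor.run(train)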

Methods

run(train_object, *args, **kwargs)

Runs distributed training.

Methods Documentation

run(train_object: Union[Callable, str], *args: Any, **kwargs: Any) → Optional[Any]

Runs distributed training.

Parameters
train_objectcallable object or str

Either a PyTorch function, PyTorch Lightning function, or the path to a python file that launches distributed training.

args :

If train_object is a python function rather than a path to a python file, args should be the positional input parameters to that function. It would look like

>>> model = distributor.run(train, 1e-3, 64)

where train is a function and 1e-3 and 64 are regular numeric inputs to the function.

If train_object is a python file, then args are the command-line arguments for that python file, all passed as strings. An example would be

>>> distributor.run("/path/to/train.py", "--learning-rate=1e-3", "--batch-size=64")

where, since the input is a path, all of the parameters are strings that can be handled by argparse in that python file.

kwargs :

If train_object is a python function rather than a path to a python file, kwargs should be the keyword input parameters to that function. It would look like

>>> model = distributor.run(train, tol=1e-3, max_iter=64)

where train is a function that takes two keyword arguments, tol and max_iter.

If train_object is a python file, then kwargs should not be set.

Returns
Returns the output of train_object called with args from the Spark rank 0 task if train_object is a Callable with an expected output. Returns None if train_object is a file.