How to use ptls.data_load.datasets
datasets
These datasets (subclasses of torch.utils.data.Dataset) provide the interface to the data.
For data prepared in memory use:
- MemoryMapDataset with i_filters
- AugmentationDataset with f_augmentations, if needed
- endpoint map dataset from ptls.frames
For small (map mode) parquet data use:
- ParquetDataset with i_filters
- PersistDataset
- AugmentationDataset with f_augmentations, if needed
- endpoint map dataset from ptls.frames
For large (iterable mode) parquet data use:
- ParquetDataset with i_filters
- AugmentationDataset with f_augmentations, if needed
- endpoint iterable dataset from ptls.frames
Other dataset orders and combinations are possible but have not been tested.
Simple example
A list of feature dicts is a simple example of data.
Python's list has the same interface as a map-style torch.utils.data.Dataset (indexing and len), so you can pass it directly to a dataloader.
import torch
from ptls.data_load.utils import collate_feature_dict

data_list = [
    {
        'mcc': torch.arange(seq_len),
        'id': f'user_{i}'
    }
    for i, seq_len in enumerate([4, 3, 6])
]

dl = torch.utils.data.DataLoader(
    dataset=data_list,
    collate_fn=collate_feature_dict,
    batch_size=2,
)

for batch in dl:
    print(batch.payload, batch.seq_lens, sep='\n')
In this example we use a simple list as the dataset.
Sometimes you need to make changes to the dataset. We propose a filter approach for this.
map and iterable
map and iterable are the two types of torch datasets.
See the PyTorch documentation on dataset types for more about map and iterable datasets.
The dataloader chooses how to iterate based on the type of its dataset.
In our pipeline the dataloader works with an endpoint dataset from ptls.frames, so the type of that endpoint dataset determines how iteration happens.
A map dataset provides better shuffling. An iterable dataset requires less memory.
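The difference between the two types can be sketched in plain Python. This is only an illustration of the two protocols, not ptls or torch code: MapStyle and IterableStyle are hypothetical names, and the records are made up.

```python
import random


class MapStyle:
    """Map-style: known length and random access by index."""

    def __init__(self, records):
        self._records = list(records)

    def __len__(self):
        return len(self._records)

    def __getitem__(self, idx):
        return self._records[idx]


class IterableStyle:
    """Iterable-style: records can only be read one after another."""

    def __init__(self, make_iterator):
        self._make_iterator = make_iterator

    def __iter__(self):
        return self._make_iterator()


records = [{'id': f'user_{i}'} for i in range(5)]
map_ds = MapStyle(records)
iter_ds = IterableStyle(lambda: iter(records))

# A map dataset can be fully shuffled: shuffle the indexes, then look items up.
order = list(range(len(map_ds)))
random.Random(0).shuffle(order)
shuffled_ids = [map_ds[i]['id'] for i in order]

# An iterable dataset is streamed in source order; nothing has to be stored.
streamed_ids = [rec['id'] for rec in iter_ds]
```

The map-style object needs all records to be addressable, which is what costs memory; the iterable one never holds more than the current record.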
Warning for multiprocessing dataloader
Each worker uses the same source data.
A map dataloader knows the dataset len and uses a sampler to randomly split all indexes from range(0, len) between workers, so each worker uses its own part of the data.
An iterable dataloader can only iterate over the source data. By default each worker iterates over the same data, and the output is multiplied by the worker count.
To avoid this, iterable datasets should implement a way to split the data between workers.
Multiprocessing split implementation:
- ParquetDataset implements the split and works correctly
- i_filters and f_augmentations don't contain data and work correctly
- Iterable endpoint datasets work correctly with an iterable source
- Iterable endpoint datasets multiply data with a map source
- PersistDataset iterates over its input during initialisation. This usually happens outside the dataloader, in the single main process, so it works correctly.
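The duplication problem and one possible fix can be sketched without torch. The workers here are simulated with a plain loop, and worker_view is a hypothetical helper; in a real torch IterableDataset the worker id and worker count would come from torch.utils.data.get_worker_info(). This is not how ParquetDataset implements its split.

```python
from itertools import islice


def worker_view(source, worker_id, num_workers):
    """Return only the records assigned to one worker (round-robin split)."""
    return islice(source, worker_id, None, num_workers)


source = [f'user_{i}' for i in range(7)]
num_workers = 3

# Without a split every simulated worker reads the whole source,
# so the combined output is multiplied by the worker count.
unsplit = [rec for _ in range(num_workers) for rec in source]

# With a split each record is read by exactly one worker.
split = [
    rec
    for worker_id in range(num_workers)
    for rec in worker_view(source, worker_id, num_workers)
]
```

A round-robin split over records is only one option; splitting by file or by chunk follows the same idea.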
i_filters and f_augmentations
- i_filters: iterable filters
- f_augmentations: augmentation functions
Filters
ptls provides filters for dataset transformation. All of them are in ptls.data_load.iterable_processing.
These filters are implemented in generator style. Call a filter object to get a generator with the modified records.
from ptls.data_load.iterable_processing import SeqLenFilter

i_filter = SeqLenFilter(min_seq_len=4)
for rec in i_filter(data_list):
    print(rec)
There were 3 records in the list; only 2 remain because SeqLenFilter drops the short sequence.
Many kinds of filters are possible: dropping records, multiplying records, transforming records.
i_filters can be chained. Datasets provide a convenient way to do it.
Many datasets in ptls.data_load.datasets support i_filters.
They take i_filters as a list of iterable_processing objects.
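Chaining generator-style filters amounts to feeding the output of one filter into the next. The sketch below uses plain Python lists instead of tensors; MinLenFilter, RenameFilter and apply_filters are hypothetical stand-ins written for illustration, not the ptls classes or the code a ptls dataset runs internally.

```python
class MinLenFilter:
    """Hypothetical filter: drops records whose sequence is too short."""

    def __init__(self, min_len, key='mcc'):
        self.min_len = min_len
        self.key = key

    def __call__(self, src):
        for rec in src:
            if len(rec[self.key]) >= self.min_len:
                yield rec


class RenameFilter:
    """Hypothetical filter: renames record keys according to a mapping."""

    def __init__(self, mapping):
        self.mapping = mapping

    def __call__(self, src):
        for rec in src:
            yield {self.mapping.get(k, k): v for k, v in rec.items()}


def apply_filters(src, i_filters):
    """Wrap the source with each filter in turn; nothing runs until iteration."""
    for i_filter in i_filters:
        src = i_filter(src)
    return src


data = [
    {'mcc': list(range(n)), 'id': f'user_{i}'}
    for i, n in enumerate([4, 3, 6])
]
result = list(apply_filters(data, [MinLenFilter(4), RenameFilter({'id': 'user_id'})]))
```

Because each filter is a generator, records flow through the whole chain one at a time and the source list is left unchanged.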
Augmentations
Sometimes we have to change items from the training data. Such changes are augmentations.
They are in ptls.data_load.augmentations.
Example:
from ptls.data_load.augmentations import RandomSlice

f_augmentation = RandomSlice(min_len=4, max_len=10)
for rec in data_list:
    new_rec = f_augmentation(rec)
    print(new_rec)
Here the RandomSlice augmentation takes a random slice from the source record.
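To make the idea of a random slice concrete, here is a minimal plain-Python sketch. It is not the RandomSlice implementation: random_slice is a hypothetical function, sequences are plain lists rather than tensors, and keeping the whole sequence when it is shorter than min_len is an assumption of this sketch.

```python
import random


def random_slice(rec, min_len, max_len, rng=random):
    """Cut the same random contiguous window out of every list feature."""
    seq_len = len(next(v for v in rec.values() if isinstance(v, list)))
    # Assumption: a sequence shorter than the drawn length is kept whole.
    new_len = min(rng.randint(min_len, max_len), seq_len)
    start = rng.randint(0, seq_len - new_len)
    return {
        k: v[start:start + new_len] if isinstance(v, list) else v
        for k, v in rec.items()
    }


rec = {
    'mcc': list(range(10)),
    'amount': list(range(100, 110)),
    'id': 'user_0',
}
out = random_slice(rec, min_len=4, max_len=6, rng=random.Random(0))
```

All sequence features are cut with the same window so they stay aligned, while scalar features such as the id pass through unchanged.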
Compare
i_filter | f_augmentation |
---|---|
May change a record. The result is always the same | May change a record. The result is random |
Place it before the persist stage to run it once and save total CPU resources | Don't place it before the persist stage, because that kills the randomness |
Can delete items | Cannot delete items |
Can yield new items | Cannot create new items |
Works as a generator and requires iterable processing | Works as a function and can be used in both map and iterable mode |
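The placement rule from the table can be checked with a small experiment. The sketch below is plain Python with hypothetical helpers (drop_short as a deterministic filter, random_crop as a random augmentation) and a plain list standing in for the persist stage.

```python
import random


def drop_short(src, min_len):
    """Filter-like: deterministic generator that may delete items."""
    for rec in src:
        if len(rec) >= min_len:
            yield rec


def random_crop(rec, rng):
    """Augmentation-like: one record in, one randomly changed record out."""
    start = rng.randint(0, len(rec) - 2)
    return rec[start:start + 2]


rng = random.Random(0)
source = [list(range(n)) for n in (8, 1, 9)]

# Filter before the persist stage: it runs once and its result is stored.
persisted = list(drop_short(source, min_len=2))

# Augmentation after persist: a fresh random crop on every access (epoch).
epochs_after = [[random_crop(rec, rng) for rec in persisted] for _ in range(20)]

# Augmentation before persist: the crop is drawn once and then frozen.
frozen = [random_crop(rec, rng) for rec in persisted]
epochs_before = [list(frozen) for _ in range(20)]
```

In epochs_before every epoch sees identical items, which is exactly the lost randomness the table warns about.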
In memory data
In-memory data is a common case. The data can be a list or a generator of feature dicts.
import torch
import random

data_list = [
    {
        'mcc': torch.arange(seq_len),
        'id': f'user_{i}'
    }
    for i, seq_len in enumerate([4, 3, 6])
]

def data_gen(n):
    for i in range(n):
        seq_len = random.randint(4, 8)
        yield {
            'mcc': torch.arange(seq_len),
            'id': f'user_{i}'
        }
ptls.data_load.datasets.MemoryMapDataset:
- implements map dataset
- iterates over the data and stores it in an internal list
- looks like a list
ptls.data_load.datasets.MemoryIterableDataset:
- implements iterable dataset
- just iterates over the data
- looks like a generator
Warning
Currently MemoryIterableDataset doesn't support splitting the initial data between workers. We don't recommend using it without modification.
Both datasets support either kind of input: a list or a generator. Since all datasets use the same format (list or generator) for input and output, they can be chained. This makes sense in some cases.
Data pipelines:
- list input with MemoryMapDataset: the dataset keeps the data modified by i_filters. The original data is unchanged. i_filters are applied once for each record. This gives fast item access but a slow start: you have to wait until all data has passed through i_filters.
- generator input with MemoryMapDataset: the dataset iterates over the generator and keeps the result in memory. More memory is used, but faster access is possible. i_filters are applied once for each record. Items taken from the generator are frozen if it uses randomness during generation.
- list input with MemoryIterableDataset: data access takes more time because i_filters are applied on each record access (for each epoch). The start is faster, since you don't wait until all data has passed through i_filters.
- generator input with MemoryIterableDataset: the generator output is modified by i_filters. Less memory is used. An infinite dataset is possible.
Example:
import torch
from ptls.data_load.datasets import MemoryMapDataset
from ptls.data_load.utils import collate_feature_dict
from ptls.data_load.iterable_processing import SeqLenFilter, FeatureRename

data_list = [
    {
        'mcc': torch.arange(seq_len),
        'id': f'user_{i}'
    }
    for i, seq_len in enumerate([4, 3, 6, 2, 8, 3, 5, 4])
]

dataset = MemoryMapDataset(
    data=data_list,
    i_filters=[
        SeqLenFilter(min_seq_len=4),
        FeatureRename({'id': 'user_id'}),
    ]
)

dl = torch.utils.data.DataLoader(
    dataset=dataset,
    collate_fn=collate_feature_dict,
    batch_size=10,
)

for batch in dl:
    print(batch.payload, batch.seq_lens, sep='\n')
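The cost difference between the map and iterable pipelines described above comes down to how often the filters run. A minimal plain-Python sketch, with a hypothetical CountingFilter and a list standing in for the stored data, makes the count visible; it does not use the ptls classes.

```python
class CountingFilter:
    """Hypothetical pass-through filter that counts the records it processes."""

    def __init__(self):
        self.calls = 0

    def __call__(self, src):
        for rec in src:
            self.calls += 1
            yield rec


data = ['a', 'b', 'c']
n_epochs = 2

# Map-like pipeline: filter everything up front, then read the stored result.
eager_filter = CountingFilter()
stored = list(eager_filter(data))
for _ in range(n_epochs):
    for rec in stored:
        pass

# Iterable-like pipeline: the filter runs again on every pass over the data.
lazy_filter = CountingFilter()
for _ in range(n_epochs):
    for rec in lazy_filter(data):
        pass
```

The eager variant pays the whole filtering cost before the first item is available; the lazy one starts immediately but repeats the work every epoch.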
Parquet file read
For large amounts of data, pyspark is a possible engine to prepare the data and convert it to the feature dict format.
See demo/pyspark-parquet.ipynb for an example of data preprocessing with pyspark and parquet file preparation.
ptls.data_load.datasets.ParquetDataset is a dataset which reads parquet files with feature dicts.
ptls.data_load.datasets.ParquetDataset:
- implements iterable dataset
- works correctly with a multiprocessing dataloader
- looks like a generator
- supports i_filters
You can feed ParquetDataset directly to a dataloader for iterable usage.
You can combine ParquetDataset with MemoryMapDataset for map usage.
ParquetDataset requires parquet file names. Usually spark saves many parquet files for one dataset, depending on the number of partitions.
You can get all file names with ptls.data_load.datasets.ParquetFiles or ptls.data_load.datasets.parquet_file_scan.
Having many files for one dataset allows you to:
- control the amount of data by reading more or fewer files
- split the data into train, valid and test
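Both points work on the list of file names alone, before any file is read. The sketch below is plain Python: split_files is a hypothetical helper and the file names are made up; in practice the names would come from ParquetFiles or parquet_file_scan, whose signatures are not shown here.

```python
def split_files(file_names, valid_share=0.1, test_share=0.1):
    """Split file names into train, valid and test parts by share."""
    files = sorted(file_names)
    n_valid = max(1, round(len(files) * valid_share))
    n_test = max(1, round(len(files) * test_share))
    n_train = len(files) - n_valid - n_test
    train = files[:n_train]
    valid = files[n_train:n_train + n_valid]
    test = files[n_train + n_valid:]
    return train, valid, test


# Made-up names in the style spark uses for partition files.
files = [f'part-{i:05d}.parquet' for i in range(10)]
train_files, valid_files, test_files = split_files(files, 0.2, 0.2)

# Control the amount of data by passing fewer files on.
small_train_files = train_files[:3]
```

Splitting by file keeps the parts disjoint only if records are not duplicated across files, which depends on how the data was written.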
Persist dataset
ptls.data_load.datasets.PersistDataset stores items from the source dataset in memory.
If your source data is an iterator (like a Python generator or ParquetDataset), all i_filters are called each time you access the data.
Persist the data in memory and i_filters are called only once.
Storing all dataset items may use a lot of memory, but data access is faster.
A persisted iterator has len and can be randomly accessed by index.
Augmentations
The class ptls.data_load.datasets.AugmentationDataset is a way to apply augmentations.
Example:
from ptls.data_load.datasets import AugmentationDataset, PersistDataset, ParquetDataset
from ptls.data_load.augmentations import AllTimeShuffle, DropoutTrx

train_data = AugmentationDataset(
    f_augmentations=[
        AllTimeShuffle(),
        DropoutTrx(trx_dropout=0.01),
    ],
    data=PersistDataset(
        data=ParquetDataset(...),
    ),
)
Here we use the iterable ParquetDataset as the source and load it into memory with PersistDataset.
Then, each time we access the data, we apply two augmentation functions to the items stored in the PersistDataset.
AugmentationDataset also works in iterable mode. The previous example then looks like this:
train_data = AugmentationDataset(
    f_augmentations=[
        AllTimeShuffle(),
        DropoutTrx(trx_dropout=0.01),
    ],
    data=ParquetDataset(...),
)
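A wrapper that supports both modes can be sketched in a few lines of plain Python. AugmentSketch is a hypothetical class, not the AugmentationDataset implementation, and the two augmentations are deterministic stand-ins so that the result is easy to check.

```python
class AugmentSketch:
    """Applies a list of functions to each record, on index or on iteration."""

    def __init__(self, data, f_augmentations):
        self.data = data
        self.f_augmentations = f_augmentations

    def _apply(self, rec):
        for f_augmentation in self.f_augmentations:
            rec = f_augmentation(rec)
        return rec

    def __len__(self):
        # Only meaningful when the source itself has a length (map mode).
        return len(self.data)

    def __getitem__(self, idx):
        # Map mode: augment the record at the moment it is accessed.
        return self._apply(self.data[idx])

    def __iter__(self):
        # Iterable mode: augment records as they stream from the source.
        for rec in self.data:
            yield self._apply(rec)


def reverse(rec):
    return rec[::-1]


def drop_last(rec):
    return rec[:-1]


map_ds = AugmentSketch([[1, 2, 3], [4, 5, 6]], [reverse, drop_last])
iter_ds = AugmentSketch((rec for rec in [[1, 2, 3]]), [reverse, drop_last])

second_item = map_ds[1]
streamed = list(iter_ds)
```

Because an augmentation is a plain function of one record, the same list of functions serves both access patterns, and the stored source items are never modified.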
Classes and functions
See docstrings for classes:
ptls.data_load.datasets.MemoryMapDataset
ptls.data_load.datasets.MemoryIterableDataset
ptls.data_load.datasets.ParquetFiles
ptls.data_load.datasets.ParquetDataset
ptls.data_load.datasets.PersistDataset
See docstrings for functions:
ptls.data_load.datasets.parquet_file_scan