How to wrap a generator with HF dataset

Hey there, I have used seqio to get a well distributed mixture of samples from multiple dataset. However the resultant output from seqio is a python generator dict, which I cannot produce back into huggingface dataset.

The generator contains all the samples needed for training the model but I cannot convert it into a huggingface dataset.

The code looks like this:

for ex in seqio_data:
print(ex[“text”])

I need to convert the seqio_data (generator) into huggingface dataset.

Hi ! Right now to do this you have to define your dataset using a dataset script, in which you can define your generator.

We could however add something similar to ds = Dataset.from_iterable(seqio_data) to make it simpler though. What do you think ?

@lhoestq, thanks a ton for the help.

btw it would be cool to have a method like Dataset.from_iterable() similar to tensorflow datasets .from_generator() method.

if it design decision works out would love to take part in contributing to HF dataset. pls keep me in the loop.

1 Like

@lhoestq , hey i did as you instructed, but sadly i cannot get pass through the download_manager, as i dont have anything to download. i was skipping the def _split_generators(self, dl_manager): function. but i cannot get around it. I get a NotImplementedError:

the following is my code for the same:

import datasets 
import functools
import glob 
from datasets import load_from_disk
import seqio
import tensorflow as tf
import t5.data
from datasets import load_dataset
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils

TaskRegistry = seqio.TaskRegistry

data_path = glob.glob("/home/stephen/Desktop/MEGA_CORPUS/COMBINED_CORPUS/*", recursive=False)


def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_path=None):
    dataset = load_from_disk(dataset_path)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]


def dataset_fn(split, shuffle_files, seed=None, dataset_path=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_path=dataset_path),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_path)
    )

@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}


_CITATION = "Not ready yet"
_DESCRIPTION = "a custom seqio based mixed samples on a given temperature value, that again returns a dataset in HF dataset format well samples on the Mixture temperature"
_HOMEPAGE = "ldcil.org"

class CustomSeqio(datasets.GeneratorBasedBuilder):

    def _info(self):
        return datasets.DatasetInfo(
            description=_DESCRIPTION,
            features=datasets.Features(
                {
                    "text": datasets.Value("string"),
                }
            ),
            homepage="https://ldcil.org",
            citation=_CITATION,)

def generate_examples(self):
    seqio_train_list = []
    for lang in data_path:
        dataset_name = lang.split("/")[-1]
        dataset_shapes = None 

        TaskRegistry.add(
        str(dataset_name),
        source=seqio.FunctionDataSource(
            dataset_fn=functools.partial(dataset_fn, dataset_path=lang),
            splits=("train", "test"),
            caching_permitted=False,
            num_input_examples=dataset_shapes,
        ),
        preprocessors=[
        functools.partial(
        target_to_key, key_map={
        "targets": None,
        }, target_key="targets")],
            output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
            metric_fns=[]
        )

        seqio_train_dataset = seqio.get_mixture_or_task(dataset_name).get_dataset(
        sequence_length=None,
        split="train",
        shuffle=True,
        num_epochs=1,
        shard_info=seqio.ShardInfo(index=0, num_shards=10),
        use_cached=False,
        seed=42)
        seqio_train_list.append(seqio_train_dataset)
    
    lang_name_list = []
    for lang in data_path:
        lang_name = lang.split("/")[-1]
        lang_name_list.append(lang_name)

    seqio_mixture = seqio.MixtureRegistry.add(
        "seqio_mixture",
        lang_name_list,
        default_rate=0.7)
    
    seqio_mixture_dataset = seqio.get_mixture_or_task("seqio_mixture").get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42)

    for id, ex in  enumerate(seqio_mixture_dataset):
        yield id, {"text": ex["targets"].numpy().decode()}

and i load it by:

seqio_mixture = load_dataset("seqio_loader")

@lhoestq , just to make things clear …

the following is my original code, thats not in the HF dataset loading script:

import functools
import seqio
import tensorflow as tf
import t5.data
from datasets import load_from_disk
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils
import glob 

TaskRegistry = seqio.TaskRegistry



def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_path=None):
    dataset = load_from_disk(dataset_path)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]


def dataset_fn(split, shuffle_files, seed=None, dataset_path=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_path=dataset_path),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_path)
    )


@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}

data_path = glob.glob("/home/stephen/Desktop/MEGA_CORPUS/COMBINED_CORPUS/*", recursive=False)

seqio_train_list = []

for lang in data_path:
    dataset_name = lang.split("/")[-1]
    dataset_shapes = None 

    TaskRegistry.add(
    str(dataset_name),
    source=seqio.FunctionDataSource(
        dataset_fn=functools.partial(dataset_fn, dataset_path=lang),
        splits=("train", "test"),
        caching_permitted=False,
        num_input_examples=dataset_shapes,
    ),
    preprocessors=[
    functools.partial(
    target_to_key, key_map={
    "targets": None,
    }, target_key="targets")],
        output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
        metric_fns=[]
    )

    seqio_train_dataset = seqio.get_mixture_or_task(dataset_name).get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42)
    seqio_train_list.append(seqio_train_dataset)

lang_name_list = []
for lang in data_path:
    lang_name = lang.split("/")[-1]
    lang_name_list.append(lang_name)

seqio_mixture = seqio.MixtureRegistry.add(
  "seqio_mixture",
  lang_name_list,
  default_rate=0.7
)

seqio_mixture_dataset = seqio.get_mixture_or_task("seqio_mixture").get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42)

for _, ex in zip(range(15), seqio_mixture_dataset):
    print(ex["targets"].numpy().decode())

where the seqio_mixture_dataset is the generator that i wanted to be wrapped in HF dataset.

also additionally, could you please tell me how do i set the default_rate=0.7 args where seqio_mixture is defined to be made as a custom option in the HF load_dataset() method,

maybe like this:
seqio_mixture_dataset = datasets.load_dataset("seqio_loader",temperature=0.5)

You need to implement _split_generators, in your case it might just return a train split generator.

Regarding the temperature parameter, you can add it as part of a configuration of your dataset.
Every extra parameter passed to load_dataset is passed to your dataset configuration.

See Create a dataset loading script on how to define a configuration class. In _generate_examples, you can access the configuration with self.config