How to create a custom dataset by loading text data from elasticsearch database on a remote server?

Hi,

I would like to fine-tune a sentence transformer model with some text data stored in elasticsearch on a server. These are very large text data. I am wondering what is the best way to load the text data in the elasticsearch to a dataset?

Thank you

Hi! You can create a generator function that queries the index and yields the results as dictionaries one at a time and pass this function to Dataset.from_generator (or IterableDataset.from_generator if disk space is limited) to generate the HF dataset.

1 Like

Hi Mario,

Thanks for the suggestion. I tired to use Dataset.from_generator(). but got an error like below. The minimal code to reproduce the error is:

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.connection import create_ssl_context
from datasets import Dataset

estd = Elasticsearch(  username = 'user',
            password = 'password',
            host = 'host',
            ssl_context = create_ssl_context())

def export_index( fields, index, size = 3000):
    source_query = { "_source": fields }
    result = helpers.scan(estd,
                            index = index,
                            query = source_query,
                            size = size)  # Max here is 10000, but setting it this high might result in timeouts
    return(result)

source_index = 'meeting_reports'
fields = ["text", "source_title"]

def my_gen():
    for doc in export_index(fields, source_index, size = 1000):
        yield {'text':doc['_source']['text']}

dataset = Dataset.from_generator(my_gen)

error message:

TypeError                                 Traceback (most recent call last)
Input In [11], in <module>
     23     for doc in export_index(fields, source_index, size = 1000):
     24         yield {'text':doc['_source']['text']}
---> 26 dataset = Dataset.from_generator(my_gen)

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/arrow_dataset.py:973, in Dataset.from_generator(generator, features, cache_dir, keep_in_memory, gen_kwargs, **kwargs)
    948 """Create a Dataset from a generator.
    949 
    950 Args:
   (...)
    969 ```
    970 """
    971 from .io.generator import GeneratorDatasetInputStream
--> 973 return GeneratorDatasetInputStream(
    974     generator=generator,
    975     features=features,
    976     cache_dir=cache_dir,
    977     keep_in_memory=keep_in_memory,
    978     gen_kwargs=gen_kwargs,
    979     **kwargs,
    980 ).read()

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/io/generator.py:22, in GeneratorDatasetInputStream.__init__(self, generator, features, cache_dir, keep_in_memory, streaming, gen_kwargs, **kwargs)
      9 def __init__(
     10     self,
     11     generator: Callable,
   (...)
     17     **kwargs,
     18 ):
     19     super().__init__(
     20         features=features, cache_dir=cache_dir, keep_in_memory=keep_in_memory, streaming=streaming, **kwargs
     21     )
---> 22     self.builder = Generator(
     23         cache_dir=cache_dir,
     24         features=features,
     25         generator=generator,
     26         gen_kwargs=gen_kwargs,
     27         **kwargs,
     28     )

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:1292, in GeneratorBasedBuilder.__init__(self, writer_batch_size, *args, **kwargs)
   1291 def __init__(self, *args, writer_batch_size=None, **kwargs):
-> 1292     super().__init__(*args, **kwargs)
   1293     # Batch size used by the ArrowWriter
   1294     # It defines the number of samples that are kept in memory before writing them
   1295     # and also the length of the arrow chunks
   1296     # None means that the ArrowWriter will use its default value
   1297     self._writer_batch_size = writer_batch_size or self.DEFAULT_WRITER_BATCH_SIZE

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:303, in DatasetBuilder.__init__(self, cache_dir, config_name, hash, base_path, info, features, use_auth_token, repo_id, data_files, data_dir, name, **config_kwargs)
    301 if data_dir is not None:
    302     config_kwargs["data_dir"] = data_dir
--> 303 self.config, self.config_id = self._create_builder_config(
    304     config_name=config_name,
    305     custom_features=features,
    306     **config_kwargs,
    307 )
    309 # prepare info: DatasetInfo are a standardized dataclass across all datasets
    310 # Prefill datasetinfo
    311 if info is None:

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:471, in DatasetBuilder._create_builder_config(self, config_name, custom_features, **config_kwargs)
    468     raise ValueError(f"BuilderConfig must have a name, got {builder_config.name}")
    470 # compute the config id that is going to be used for caching
--> 471 config_id = builder_config.create_config_id(
    472     config_kwargs,
    473     custom_features=custom_features,
    474 )
    475 is_custom = (config_id not in self.builder_configs) and config_id != "default"
    476 if is_custom:

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:169, in BuilderConfig.create_config_id(self, config_kwargs, custom_features)
    167             suffix = Hasher.hash(config_kwargs_to_add_to_suffix)
    168     else:
--> 169         suffix = Hasher.hash(config_kwargs_to_add_to_suffix)
    171 if custom_features is not None:
    172     m = Hasher()

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/fingerprint.py:237, in Hasher.hash(cls, value)
    235     return cls.dispatch[type(value)](cls, value)
    236 else:
--> 237     return cls.hash_default(value)

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/fingerprint.py:230, in Hasher.hash_default(cls, value)
    228 @classmethod
    229 def hash_default(cls, value: Any) -> str:
--> 230     return cls.hash_bytes(dumps(value))

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/utils/py_utils.py:625, in dumps(obj)
    623 file = StringIO()
    624 with _no_cache_fields(obj):
--> 625     dump(obj, file)
    626 return file.getvalue()

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/utils/py_utils.py:600, in dump(obj, file)
    598 def dump(obj, file):
    599     """pickle an object to a file"""
--> 600     Pickler(file, recurse=True).dump(obj)
    601     return

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:620, in Pickler.dump(self, obj)
    618     raise PicklingError(msg)
    619 else:
--> 620     StockPickler.dump(self, obj)
    621 return

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:487, in _Pickler.dump(self, obj)
    485 if self.proto >= 4:
    486     self.framer.start_framing()
--> 487 self.save(obj)
    488 self.write(STOP)
    489 self.framer.end_framing()

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
    558 f = self.dispatch.get(t)
    559 if f is not None:
--> 560     f(self, obj)  # Call unbound method with explicit self
    561     return
    563 # Check private dispatch table if any, or else
    564 # copyreg.dispatch_table

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
   1248     if is_dill(pickler, child=False) and pickler._session:
   1249         # we only care about session the first pass thru
   1250         pickler._first_pass = False
-> 1251     StockPickler.save_dict(pickler, obj)
   1252     log.info("# D2")
   1253 return

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
    969     self.write(MARK + DICT)
    971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
    996     for k, v in tmp:
    997         save(k)
--> 998         save(v)
    999     write(SETITEMS)
   1000 elif n:

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
    558 f = self.dispatch.get(t)
    559 if f is not None:
--> 560     f(self, obj)  # Call unbound method with explicit self
    561     return
    563 # Check private dispatch table if any, or else
    564 # copyreg.dispatch_table

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/utils/py_utils.py:891, in save_function(pickler, obj)
    888     if state_dict:
    889         state = state, state_dict
--> 891     dill._dill._save_with_postproc(
    892         pickler,
    893         (
    894             dill._dill._create_function,
    895             (obj.__code__, globs, obj.__name__, obj.__defaults__, closure),
    896             state,
    897         ),
    898         obj=obj,
    899         postproc_list=postproc_list,
    900     )
    901 else:
    902     closure = obj.func_closure

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1154, in _save_with_postproc(pickler, reduction, is_pickler_dill, obj, postproc_list)
   1152 if source:
   1153     pickler.write(pickler.get(pickler.memo[id(dest)][0]))
-> 1154     pickler._batch_setitems(iter(source.items()))
   1155 else:
   1156     # Updating with an empty dictionary. Same as doing nothing.
   1157     continue

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
    996     for k, v in tmp:
    997         save(k)
--> 998         save(v)
    999     write(SETITEMS)
   1000 elif n:

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:603, in _Pickler.save(self, obj, save_persistent_id)
    599     raise PicklingError("Tuple returned by %s must have "
    600                         "two to six elements" % reduce)
    602 # Save the reduce() output and finally memoize the object
--> 603 self.save_reduce(obj=obj, *rv)

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:717, in _Pickler.save_reduce(self, func, args, state, listitems, dictitems, state_setter, obj)
    715 if state is not None:
    716     if state_setter is None:
--> 717         save(state)
    718         write(BUILD)
    719     else:
    720         # If a state_setter is specified, call it instead of load_build
    721         # to update obj's with its previous state.
    722         # First, push state_setter and its tuple of expected arguments
    723         # (obj, state) onto the stack.

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
    558 f = self.dispatch.get(t)
    559 if f is not None:
--> 560     f(self, obj)  # Call unbound method with explicit self
    561     return
    563 # Check private dispatch table if any, or else
    564 # copyreg.dispatch_table

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
   1248     if is_dill(pickler, child=False) and pickler._session:
   1249         # we only care about session the first pass thru
   1250         pickler._first_pass = False
-> 1251     StockPickler.save_dict(pickler, obj)
   1252     log.info("# D2")
   1253 return

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
    969     self.write(MARK + DICT)
    971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
    996     for k, v in tmp:
    997         save(k)
--> 998         save(v)
    999     write(SETITEMS)
   1000 elif n:

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:603, in _Pickler.save(self, obj, save_persistent_id)
    599     raise PicklingError("Tuple returned by %s must have "
    600                         "two to six elements" % reduce)
    602 # Save the reduce() output and finally memoize the object
--> 603 self.save_reduce(obj=obj, *rv)

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:717, in _Pickler.save_reduce(self, func, args, state, listitems, dictitems, state_setter, obj)
    715 if state is not None:
    716     if state_setter is None:
--> 717         save(state)
    718         write(BUILD)
    719     else:
    720         # If a state_setter is specified, call it instead of load_build
    721         # to update obj's with its previous state.
    722         # First, push state_setter and its tuple of expected arguments
    723         # (obj, state) onto the stack.

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
    558 f = self.dispatch.get(t)
    559 if f is not None:
--> 560     f(self, obj)  # Call unbound method with explicit self
    561     return
    563 # Check private dispatch table if any, or else
    564 # copyreg.dispatch_table

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
   1248     if is_dill(pickler, child=False) and pickler._session:
   1249         # we only care about session the first pass thru
   1250         pickler._first_pass = False
-> 1251     StockPickler.save_dict(pickler, obj)
   1252     log.info("# D2")
   1253 return

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
    969     self.write(MARK + DICT)
    971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
    996     for k, v in tmp:
    997         save(k)
--> 998         save(v)
    999     write(SETITEMS)
   1000 elif n:

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
    558 f = self.dispatch.get(t)
    559 if f is not None:
--> 560     f(self, obj)  # Call unbound method with explicit self
    561     return
    563 # Check private dispatch table if any, or else
    564 # copyreg.dispatch_table

File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
   1248     if is_dill(pickler, child=False) and pickler._session:
   1249         # we only care about session the first pass thru
   1250         pickler._first_pass = False
-> 1251     StockPickler.save_dict(pickler, obj)
   1252     log.info("# D2")
   1253 return

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
    969     self.write(MARK + DICT)
    971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
    996     for k, v in tmp:
    997         save(k)
--> 998         save(v)
    999     write(SETITEMS)
   1000 elif n:

File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:578, in _Pickler.save(self, obj, save_persistent_id)
    576 reduce = getattr(obj, "__reduce_ex__", None)
    577 if reduce is not None:
--> 578     rv = reduce(self.proto)
    579 else:
    580     reduce = getattr(obj, "__reduce__", None)

TypeError: cannot pickle 'SSLContext' object

Please help, thanks very much

1 Like

I also have this issue with the from_generator() method when trying to use data from a nosql database.

Linking the relevant Discord thread (HF server): Discord