Hi Mario,
Thanks for the suggestion. I tired to use Dataset.from_generator(). but got an error like below. The minimal code to reproduce the error is:
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.connection import create_ssl_context
from datasets import Dataset
estd = Elasticsearch( username = 'user',
password = 'password',
host = 'host',
ssl_context = create_ssl_context())
def export_index( fields, index, size = 3000):
source_query = { "_source": fields }
result = helpers.scan(estd,
index = index,
query = source_query,
size = size) # Max here is 10000, but setting it this high might result in timeouts
return(result)
source_index = 'meeting_reports'
fields = ["text", "source_title"]
def my_gen():
for doc in export_index(fields, source_index, size = 1000):
yield {'text':doc['_source']['text']}
dataset = Dataset.from_generator(my_gen)
error message:
TypeError Traceback (most recent call last)
Input In [11], in <module>
23 for doc in export_index(fields, source_index, size = 1000):
24 yield {'text':doc['_source']['text']}
---> 26 dataset = Dataset.from_generator(my_gen)
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/arrow_dataset.py:973, in Dataset.from_generator(generator, features, cache_dir, keep_in_memory, gen_kwargs, **kwargs)
948 """Create a Dataset from a generator.
949
950 Args:
(...)
969 ```
970 """
971 from .io.generator import GeneratorDatasetInputStream
--> 973 return GeneratorDatasetInputStream(
974 generator=generator,
975 features=features,
976 cache_dir=cache_dir,
977 keep_in_memory=keep_in_memory,
978 gen_kwargs=gen_kwargs,
979 **kwargs,
980 ).read()
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/io/generator.py:22, in GeneratorDatasetInputStream.__init__(self, generator, features, cache_dir, keep_in_memory, streaming, gen_kwargs, **kwargs)
9 def __init__(
10 self,
11 generator: Callable,
(...)
17 **kwargs,
18 ):
19 super().__init__(
20 features=features, cache_dir=cache_dir, keep_in_memory=keep_in_memory, streaming=streaming, **kwargs
21 )
---> 22 self.builder = Generator(
23 cache_dir=cache_dir,
24 features=features,
25 generator=generator,
26 gen_kwargs=gen_kwargs,
27 **kwargs,
28 )
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:1292, in GeneratorBasedBuilder.__init__(self, writer_batch_size, *args, **kwargs)
1291 def __init__(self, *args, writer_batch_size=None, **kwargs):
-> 1292 super().__init__(*args, **kwargs)
1293 # Batch size used by the ArrowWriter
1294 # It defines the number of samples that are kept in memory before writing them
1295 # and also the length of the arrow chunks
1296 # None means that the ArrowWriter will use its default value
1297 self._writer_batch_size = writer_batch_size or self.DEFAULT_WRITER_BATCH_SIZE
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:303, in DatasetBuilder.__init__(self, cache_dir, config_name, hash, base_path, info, features, use_auth_token, repo_id, data_files, data_dir, name, **config_kwargs)
301 if data_dir is not None:
302 config_kwargs["data_dir"] = data_dir
--> 303 self.config, self.config_id = self._create_builder_config(
304 config_name=config_name,
305 custom_features=features,
306 **config_kwargs,
307 )
309 # prepare info: DatasetInfo are a standardized dataclass across all datasets
310 # Prefill datasetinfo
311 if info is None:
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:471, in DatasetBuilder._create_builder_config(self, config_name, custom_features, **config_kwargs)
468 raise ValueError(f"BuilderConfig must have a name, got {builder_config.name}")
470 # compute the config id that is going to be used for caching
--> 471 config_id = builder_config.create_config_id(
472 config_kwargs,
473 custom_features=custom_features,
474 )
475 is_custom = (config_id not in self.builder_configs) and config_id != "default"
476 if is_custom:
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/builder.py:169, in BuilderConfig.create_config_id(self, config_kwargs, custom_features)
167 suffix = Hasher.hash(config_kwargs_to_add_to_suffix)
168 else:
--> 169 suffix = Hasher.hash(config_kwargs_to_add_to_suffix)
171 if custom_features is not None:
172 m = Hasher()
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/fingerprint.py:237, in Hasher.hash(cls, value)
235 return cls.dispatch[type(value)](cls, value)
236 else:
--> 237 return cls.hash_default(value)
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/fingerprint.py:230, in Hasher.hash_default(cls, value)
228 @classmethod
229 def hash_default(cls, value: Any) -> str:
--> 230 return cls.hash_bytes(dumps(value))
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/utils/py_utils.py:625, in dumps(obj)
623 file = StringIO()
624 with _no_cache_fields(obj):
--> 625 dump(obj, file)
626 return file.getvalue()
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/utils/py_utils.py:600, in dump(obj, file)
598 def dump(obj, file):
599 """pickle an object to a file"""
--> 600 Pickler(file, recurse=True).dump(obj)
601 return
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:620, in Pickler.dump(self, obj)
618 raise PicklingError(msg)
619 else:
--> 620 StockPickler.dump(self, obj)
621 return
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:487, in _Pickler.dump(self, obj)
485 if self.proto >= 4:
486 self.framer.start_framing()
--> 487 self.save(obj)
488 self.write(STOP)
489 self.framer.end_framing()
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
558 f = self.dispatch.get(t)
559 if f is not None:
--> 560 f(self, obj) # Call unbound method with explicit self
561 return
563 # Check private dispatch table if any, or else
564 # copyreg.dispatch_table
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
1248 if is_dill(pickler, child=False) and pickler._session:
1249 # we only care about session the first pass thru
1250 pickler._first_pass = False
-> 1251 StockPickler.save_dict(pickler, obj)
1252 log.info("# D2")
1253 return
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
969 self.write(MARK + DICT)
971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
996 for k, v in tmp:
997 save(k)
--> 998 save(v)
999 write(SETITEMS)
1000 elif n:
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
558 f = self.dispatch.get(t)
559 if f is not None:
--> 560 f(self, obj) # Call unbound method with explicit self
561 return
563 # Check private dispatch table if any, or else
564 # copyreg.dispatch_table
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/datasets/utils/py_utils.py:891, in save_function(pickler, obj)
888 if state_dict:
889 state = state, state_dict
--> 891 dill._dill._save_with_postproc(
892 pickler,
893 (
894 dill._dill._create_function,
895 (obj.__code__, globs, obj.__name__, obj.__defaults__, closure),
896 state,
897 ),
898 obj=obj,
899 postproc_list=postproc_list,
900 )
901 else:
902 closure = obj.func_closure
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1154, in _save_with_postproc(pickler, reduction, is_pickler_dill, obj, postproc_list)
1152 if source:
1153 pickler.write(pickler.get(pickler.memo[id(dest)][0]))
-> 1154 pickler._batch_setitems(iter(source.items()))
1155 else:
1156 # Updating with an empty dictionary. Same as doing nothing.
1157 continue
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
996 for k, v in tmp:
997 save(k)
--> 998 save(v)
999 write(SETITEMS)
1000 elif n:
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:603, in _Pickler.save(self, obj, save_persistent_id)
599 raise PicklingError("Tuple returned by %s must have "
600 "two to six elements" % reduce)
602 # Save the reduce() output and finally memoize the object
--> 603 self.save_reduce(obj=obj, *rv)
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:717, in _Pickler.save_reduce(self, func, args, state, listitems, dictitems, state_setter, obj)
715 if state is not None:
716 if state_setter is None:
--> 717 save(state)
718 write(BUILD)
719 else:
720 # If a state_setter is specified, call it instead of load_build
721 # to update obj's with its previous state.
722 # First, push state_setter and its tuple of expected arguments
723 # (obj, state) onto the stack.
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
558 f = self.dispatch.get(t)
559 if f is not None:
--> 560 f(self, obj) # Call unbound method with explicit self
561 return
563 # Check private dispatch table if any, or else
564 # copyreg.dispatch_table
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
1248 if is_dill(pickler, child=False) and pickler._session:
1249 # we only care about session the first pass thru
1250 pickler._first_pass = False
-> 1251 StockPickler.save_dict(pickler, obj)
1252 log.info("# D2")
1253 return
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
969 self.write(MARK + DICT)
971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
996 for k, v in tmp:
997 save(k)
--> 998 save(v)
999 write(SETITEMS)
1000 elif n:
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:603, in _Pickler.save(self, obj, save_persistent_id)
599 raise PicklingError("Tuple returned by %s must have "
600 "two to six elements" % reduce)
602 # Save the reduce() output and finally memoize the object
--> 603 self.save_reduce(obj=obj, *rv)
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:717, in _Pickler.save_reduce(self, func, args, state, listitems, dictitems, state_setter, obj)
715 if state is not None:
716 if state_setter is None:
--> 717 save(state)
718 write(BUILD)
719 else:
720 # If a state_setter is specified, call it instead of load_build
721 # to update obj's with its previous state.
722 # First, push state_setter and its tuple of expected arguments
723 # (obj, state) onto the stack.
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
558 f = self.dispatch.get(t)
559 if f is not None:
--> 560 f(self, obj) # Call unbound method with explicit self
561 return
563 # Check private dispatch table if any, or else
564 # copyreg.dispatch_table
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
1248 if is_dill(pickler, child=False) and pickler._session:
1249 # we only care about session the first pass thru
1250 pickler._first_pass = False
-> 1251 StockPickler.save_dict(pickler, obj)
1252 log.info("# D2")
1253 return
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
969 self.write(MARK + DICT)
971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
996 for k, v in tmp:
997 save(k)
--> 998 save(v)
999 write(SETITEMS)
1000 elif n:
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:560, in _Pickler.save(self, obj, save_persistent_id)
558 f = self.dispatch.get(t)
559 if f is not None:
--> 560 f(self, obj) # Call unbound method with explicit self
561 return
563 # Check private dispatch table if any, or else
564 # copyreg.dispatch_table
File ~/miniforge3/envs/es_env/lib/python3.10/site-packages/dill/_dill.py:1251, in save_module_dict(pickler, obj)
1248 if is_dill(pickler, child=False) and pickler._session:
1249 # we only care about session the first pass thru
1250 pickler._first_pass = False
-> 1251 StockPickler.save_dict(pickler, obj)
1252 log.info("# D2")
1253 return
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:972, in _Pickler.save_dict(self, obj)
969 self.write(MARK + DICT)
971 self.memoize(obj)
--> 972 self._batch_setitems(obj.items())
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:998, in _Pickler._batch_setitems(self, items)
996 for k, v in tmp:
997 save(k)
--> 998 save(v)
999 write(SETITEMS)
1000 elif n:
File ~/miniforge3/envs/es_env/lib/python3.10/pickle.py:578, in _Pickler.save(self, obj, save_persistent_id)
576 reduce = getattr(obj, "__reduce_ex__", None)
577 if reduce is not None:
--> 578 rv = reduce(self.proto)
579 else:
580 reduce = getattr(obj, "__reduce__", None)
TypeError: cannot pickle 'SSLContext' object
Please help, thanks very much