MinHash Deduplication

Hi @lhoestq,

I know you are very busy, but I was wondering if you would be able to help me out with some questions regarding the preprocessing scripts for the dataset used in Code Parrot.

In the Code Parrot research repository, there is an implementation of MinHash LSH for deduplicating datasets. The implementation uses a tuple, code_key, consisting of base_index, repo_name, and path, as a reference to get information for the duplicate clusters. The clusters are formatted as a list of dicts:

cluster = [{"base_index": el[0], "repo_name": el[1], "path": el[2]} for el in cluster]

In this example, the transformersbook/codeparrot dataset already contains the columns repo_name and path. How would you recommend handling the new case where the only column in the dataset is text (or content in the case of Code Parrot)?

In this case with only a text column, is it suitable to define the list of dicts as:

cluster = [{"base_index": el} for el in cluster]

Or do we need to add an additional column to be used as a reference? For example, if we were using a dataset such as Enron emails, which contains columns for text (string) and meta, would we define the list of dicts as:

cluster = [{"base_index": el, "meta": el} for el in cluster]

In this new case with the Enron emails dataset, the value el is an int and not subscriptable, so accessing it as el[0] throws a TypeError. The full get_duplicate_clusters function would then be:

def get_duplicate_clusters(self) -> List[List[Dict]]:
    """Export the duplicate clusters.
    For each cluster, the first element is the base element of the cluster.
    The base element has an estimation jaccard similarity higher than the threshold with all the other elements.

    Returns:
        duplicate_clusters (List[List[Dict]]):
            List of duplicate clusters.
    """
    duplicate_clusters = []
    for base, duplicates in self._duplicate_clusters.items():
        cluster = [base] + list(duplicates)
        # reformat the cluster to be a list of dict
        cluster = [{"base_index": el, "meta": el} for el in cluster]
        duplicate_clusters.append(cluster)
    return duplicate_clusters

And the _compute_min_hash function as:

def _compute_min_hash(element):
    index, data = element
    min_hash = get_min_hash([t for t in NON_ALPHA.split(data["text"]) if len(t.strip()) > 0])
    if min_hash is not None:
        return (index), min_hash
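
For readers following along: get_min_hash and NON_ALPHA come from minhash_deduplication.py and are not shown in this thread. As a rough, standard-library-only sketch of the underlying idea (the real script uses datasketch.MinHash; minhash_signature and estimated_jaccard below are illustrative names, not the script's API):

```python
import hashlib
import re

# Word-splitting regex in the spirit of the script's NON_ALPHA.
NON_ALPHA = re.compile("[^A-Za-z_0-9]")

def minhash_signature(text, num_perm=64):
    """Toy MinHash: one salted hash per slot, keeping the minimum over tokens."""
    tokens = {t for t in NON_ALPHA.split(text) if t.strip()}
    return [
        min(int(hashlib.md5(str(i).encode() + t.encode()).hexdigest(), 16) for t in tokens)
        for i in range(num_perm)
    ]

def estimated_jaccard(sig_a, sig_b):
    """The fraction of matching slots estimates the Jaccard similarity."""
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)
```

Identical texts produce identical signatures (estimate 1.0) and near-duplicates share most slots; the real script additionally skips documents shorter than min_num_tokens and feeds the signatures into an LSH index for candidate lookup.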

When I run the deduplicate_dataset function, it seems to correctly remove the duplicates:

def deduplicate_dataset(
    dataset: Type[Dataset], jaccard_threshold: float = 0.85
) -> Tuple[Type[Dataset], List[List[Dict]]]:
    """
    Example:
        >>> from datasets import load_dataset
        >>> from minhash_deduplication import deduplicate_dataset
        >>> ds = load_dataset("conceptofmind/pile_enron_emails", split="train")
        >>> ds_dedup, duplicate_clusters = deduplicate_dataset(ds, jaccard_threshold=0.95)
    """
    duplicate_clusters = make_duplicate_clusters(dataset, jaccard_threshold)
    duplicate_indices = set(x["base_index"] for cluster in duplicate_clusters for x in cluster)
    extreme_dict = {}
    extremes_clusters = find_extremes(duplicate_clusters, dataset, jaccard_threshold)
    for extremes in extremes_clusters:
        for element in extremes:
            extreme_dict[element["base_index"]] = element
    remove_indices = duplicate_indices - set(extreme_dict.keys())
    ds_filter = dataset.filter(lambda x, idx: idx not in remove_indices, with_indices=True)

    # update duplicate_clusters
    for cluster in duplicate_clusters:
        for element in cluster:
            element["is_extreme"] = element["base_index"] in extreme_dict
            if element["is_extreme"]:
                element["copies"] = extreme_dict[element["base_index"]]["copies"]

    return ds_filter, duplicate_clusters

I get the results:

Original dataset size: 237585
Number of duplicate clusters: 12566
Files in duplicate cluster: 29247
Unique files in duplicate cluster: 14791
Filtered dataset size: 223129
Time to deduplicate dataset: 104.63
Size of deduplicate dataset: 223129

Is there anything I could possibly be missing? Do I need to refactor any parts of the script differently to meet this general use case with enron emails, other than what I have listed above?

I think it would be very beneficial to the community to have an accessible deduplication script with MinHash LSH, or a reference/blog post on how to implement it with different datasets.

I greatly appreciate your time and consideration.

Thank you,

Enrico

cc @lvwerra, who probably knows this code better :slight_smile:

Overall I agree it would be useful for users to have reference documentation/code/a post for deduplication. cc @lvwerra @osanseviero @stevhliu in case you have some ideas.

Hi @conceptofmind

I don’t think any major changes would be necessary - I think the repo_name and path are just there for information.

I agree that deduplication is something very useful, and I have been discussing with @loubnabnl about integrating some functionality into datasets. This has come up a few times already, as it’s a pretty common data-cleaning step. Something like ds = ds.deduplicate(column="text", strategy="exact") could be valuable. What do you think @lhoestq?

Yup I agree, see the relevant issue here: https://github.com/huggingface/datasets/issues/2514, and another one at New Preprocessing Feature - Deduplication [Request] · Issue #4448 · huggingface/datasets · GitHub

Cool! If we end up with a deduplicate() function, we can document it in the Text Process section :slight_smile:

Hi @lhoestq @lvwerra @stevhliu,

I greatly appreciate the responses.

I noticed that in the Code Parrot preprocessing.py file there is an implementation for doing exact hash matching and deduplication based on the column:

def get_hash(example):
    """Get hash of the text field."""
    return {"hash": hashlib.md5(example["text"].strip().encode("utf-8")).hexdigest()}

def check_uniques(example, uniques):
    """Check if current hash is still in set of unique hashes and remove if true."""
    if example["hash"] in uniques:
        uniques.remove(example["hash"])
        return True
    else:
        return False

def preprocess(example):
    """Chain all preprocessing steps into one function to not fill cache."""
    results = dict()
    results.update(get_hash(example))
    return results

def filter(example, uniques):
    """Keep the example only if its hash is still in the set of unique hashes."""
    return check_uniques(example, uniques)

# Load dataset
ds = load_dataset(dataset_name, split="train")

# Run preprocessing
ds = ds.map(preprocess, num_proc=num_workers)

# Deduplicate hashes
uniques = set(ds.unique("hash"))

# Deduplicate data and apply heuristics
ds_filter = ds.filter(filter, fn_kwargs={"uniques": uniques})

I imagine this would already suffice in most cases for doing:

ds = ds.deduplicate(column="text", strategy="exact")

In the case of MinHash LSH, deduplicate() could be defined as something like:

ds = ds.deduplicate(
        column="text", 
        strategy="minhash", 
        threshold=0.95,
        min_num_tokens=10,
        num_perm=256
)

For readability, it may be better to have min_num_tokens and num_perm as optional arguments with sensible defaults. I personally think users should still be able to set the Jaccard threshold themselves, though.
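
To make the exact strategy concrete, here is a sketch of what such a helper could do on plain Python rows (deduplicate_exact is a hypothetical name; an actual datasets integration would operate on the Arrow-backed table instead):

```python
import hashlib

def deduplicate_exact(rows, column="text"):
    """Keep the first row for each distinct (stripped) value of `column`,
    mirroring the md5-and-set approach in the preprocessing script."""
    seen = set()
    kept = []
    for row in rows:
        h = hashlib.md5(row[column].strip().encode("utf-8")).hexdigest()
        if h not in seen:
            seen.add(h)
            kept.append(row)
    return kept
```
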

@lvwerra In the example in my initial post with Enron emails, the exact-deduplication step worked with very few alterations, but I ran into errors when adapting the minhash_deduplication.py file.

Do you believe that it would be sufficient to make the minimal changes to get_duplicate_clusters by altering the cluster variable to only include the list of dict [{"base_index": el} for el in cluster]?

For example:

def get_duplicate_clusters(self) -> List[List[Dict]]:
    duplicate_clusters = []
    for base, duplicates in self._duplicate_clusters.items():
        cluster = [base] + list(duplicates)
        # reformat the cluster to be a list of dict
        cluster = [{"base_index": el} for el in cluster]
        duplicate_clusters.append(cluster)
    return duplicate_clusters
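
As a quick sanity check on what that reformatting step produces, the loop can be isolated as a small helper (format_clusters is just an illustrative name for this sketch):

```python
def format_clusters(duplicate_clusters_map):
    """Mimic the get_duplicate_clusters loop for plain int keys."""
    duplicate_clusters = []
    for base, duplicates in duplicate_clusters_map.items():
        cluster = [base] + list(duplicates)
        # each element is now an int index, so it becomes the dict value directly
        cluster = [{"base_index": el} for el in cluster]
        duplicate_clusters.append(cluster)
    return duplicate_clusters
```

For example, format_clusters({0: {3}}) yields [[{"base_index": 0}, {"base_index": 3}]].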

And with that change, only return index and min_hash in _compute_min_hash:

def _compute_min_hash(element):
    index, data = element
    min_hash = get_min_hash([t for t in NON_ALPHA.split(data["text"]) if len(t.strip()) > 0])
    if min_hash is not None:
        return index, min_hash

Other than those two changes, I swapped in 'text' for 'content' as the column referenced in other parts of the script.

I believe that this is working properly to deduplicate the general case with Enron Emails, but it is always beneficial to get an additional confirmation in case I missed something.

I appreciate all of the help.

Thank you again,

Enrico

Hi Enrico,

Do you still encounter errors using minhash_deduplication.py?

It is a good idea. The only thing you need is to provide a unique key for each code snippet to the method DuplicationIndex.add.
In the very beginning, this key was composed only of repo_name and path; then I realized I would need the base_index. That’s why I used the tuple (it also facilitates debugging), but I agree with you that it is simpler to just use the index.

Tell me if you need more information!

Best,
Jia
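
(The unique-key indexing Jia describes can be illustrated with a toy banded-LSH table, where any hashable key, e.g. the dataset row index, is stored under buckets derived from bands of its MinHash signature; two documents are candidate duplicates when they share a bucket. This is a standard-library sketch for intuition only, with made-up names; the script's actual DuplicationIndex builds on datasketch:)

```python
import hashlib
from collections import defaultdict

def signature(text, num_perm=64):
    """Toy MinHash signature using salted md5 hashes over whitespace tokens."""
    tokens = {t for t in text.split() if t}
    return [
        min(int(hashlib.md5(str(i).encode() + t.encode()).hexdigest(), 16) for t in tokens)
        for i in range(num_perm)
    ]

class ToyDuplicationIndex:
    def __init__(self, num_bands=16):
        self.num_bands = num_bands
        self.buckets = defaultdict(set)  # (band number, band values) -> keys

    def add(self, key, sig):
        """Index `key` (any unique id, e.g. the dataset row index) under each band."""
        rows = len(sig) // self.num_bands
        for b in range(self.num_bands):
            self.buckets[(b, tuple(sig[b * rows:(b + 1) * rows]))].add(key)

    def query(self, sig):
        """Return keys sharing at least one band with `sig` (candidate duplicates)."""
        rows = len(sig) // self.num_bands
        out = set()
        for b in range(self.num_bands):
            out |= self.buckets.get((b, tuple(sig[b * rows:(b + 1) * rows])), set())
        return out
```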

Hi @liyongsea,

I first want to thank you greatly for your awesome implementation of MinHash LSH deduplication.

I’ll briefly walk through the steps I took when debugging. There will be some overlap with the above. The error I encountered when attempting to refactor the code for a general use case was in this function:

def get_duplicate_clusters(self) -> List[List[Dict]]:
    """Export the duplicate clusters.
    For each cluster, the first element is the base element of the cluster.
    The base element has an estimation jaccard similarity higher than the threshold with all the other elements.
    Returns:
        duplicate_clusters (List[List[Dict]]):
            List of duplicate clusters.
    """
    duplicate_clusters = []
    for base, duplicates in self._duplicate_clusters.items():
        cluster = [base] + list(duplicates)
        # reformat the cluster to be a list of dict
        cluster = [{"base_index": el[0], "repo_name": el[1], "path": el[2]} for el in cluster]
        duplicate_clusters.append(cluster)
    return duplicate_clusters

Specifically in this line:

cluster = [{"base_index": el[0], "repo_name": el[1], "path": el[2]} for el in cluster]

I removed the keys and values "repo_name": el[1] and "path": el[2] from the dictionary since I am only using base_index.

The line would then be:

cluster = [{"base_index": el[0]} for el in cluster]

This threw an error:

TypeError: 'int' object is not subscriptable

I changed the line to only include:

cluster = [{"base_index": el} for el in cluster]

So the final function would be:

def get_duplicate_clusters(self) -> List[List[Dict]]:
    """Export the duplicate clusters.
    For each cluster, the first element is the base element of the cluster.
    The base element has an estimation jaccard similarity higher than the threshold with all the other elements.

    Returns:
        duplicate_clusters (List[List[Dict]]):
            List of duplicate clusters.
    """
    duplicate_clusters = []
    for base, duplicates in self._duplicate_clusters.items():
        cluster = [base] + list(duplicates)
        # reformat the cluster to be a list of dict
        cluster = [{"base_index": el} for el in cluster]
        duplicate_clusters.append(cluster)
    return duplicate_clusters

I am only returning index and min_hash in _compute_min_hash. This seemed to work, but I am not 100% sure whether it properly deduplicates the data based only on index and text.

Here is a gist of the minor changes which I made to the minhash_deduplication.py file to work on a general text dataset such as enron_emails: general_deduplicate.py · GitHub

Do these changes seem correct to you? Do you have any further input on handling general use cases?

Additionally, here is a gist for the preprocessing.py file I am using, which seems to be working as intended: clean_data.py · GitHub

I wanted to ensure that I was correctly implementing this before spending the time and CPU cloud compute on deduplicating larger datasets and uploading them publicly to the hub.

I greatly appreciate your time and help.

Thank you very much,

Enrico

Hi Enrico,

I believe your error message comes from

return (index, data["repo_name"], data["path"]), min_hash

When you remove repo_name and path, the code_key is no longer a tuple but just the index.

In this case, your implementation is correct, and it is indeed more generic. Well done!

Best,
Jia

Hi @liyongsea ,

I appreciate you taking the time to answer my questions. Your work is greatly beneficial to the community.

I did additional testing last night and you are definitely correct. I tested this code on enron_emails using a tuple with index and meta, and it worked properly as well, although deduplication results differed slightly depending on whether only the index or a tuple was returned.

def get_duplicate_clusters(self) -> List[List[Dict]]:
    duplicate_clusters = []
    for base, duplicates in self._duplicate_clusters.items():
        cluster = [base] + list(duplicates)
        # reformat the cluster to be a list of dict
        cluster = [{"base_index": el[0], "meta": el[1]} for el in cluster]
        duplicate_clusters.append(cluster)
    return duplicate_clusters

def _compute_min_hash(element):
    index, data = element
    min_hash = get_min_hash([t for t in NON_ALPHA.split(data["text"]) if len(t.strip()) > 0])
    if min_hash is not None:
        return (index, data["meta"]["pile_set_name"]), min_hash

I noticed two additional things when testing the MinHash LSH deduplication script on different datasets. First, this warning would sometimes occur:

RuntimeWarning: Mean of empty slice. 
return _methods._mean(a, axis=axis, dtype=dtype,                                                                                                                        
RuntimeWarning: invalid value encountered in double_scalars 
ret = ret.dtype.type(ret / count)

It does not seem to have a significant impact on deduplication.

Second, this error would sometimes occur, depending on the dataset size, and break the script:

uniques = set(ds.unique("hash"))
File "pyarrow/table.pxi", line 394, in pyarrow.lib.ChunkedArray.unique
File "pyarrow/_compute.pyx", line 531, in pyarrow._compute.call_function
File "pyarrow/_compute.pyx", line 330, in pyarrow._compute.Function.call
File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 124, in pyarrow.lib.check_status
pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2147483648

I think this error may require me to open an issue on GitHub though.

Have you seen any similar errors before?

I am currently testing deduplication on the_pile dataset here: the_pile · Datasets at Hugging Face

Thank you again for your time and help.

Best,

Enrico

Hi @liyongsea ,

In addition to the warning and error above, I have also been experiencing a second error, which sometimes occurs on datasets around 300 GB in size:

Time to filter dataset: 866.99| 5244089/5244135 [1:22:04<00:00, 1083.08ex/s]
Size of filtered dataset: 52435971| 5237337/5244135 [1:21:50<00:04, 1364.48ex/s]
2889863it [33:35, 1635.90it/s]
Process ForkPoolWorker-1:
Killed
Traceback (most recent call last):
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

I believe this error may be related to this function:

def minhash_iter(dataset_iterator: Type[Dataset]):
    with mp.Pool() as pool:
        for data in pool.imap_unordered(
            _compute_min_hash,
            ThreadedIterator(dataset_iterator, max_queue_size=10000),
            chunksize=100,
        ):
            if data is not None:
                yield data

Thank you,

Enrico

Replacing set(ds.unique("hash")) with set(ds["hash"]) should solve this issue as the list created by ds.unique is too large in this case.

Hi @loubnabnl ,

I greatly appreciate the help and additional information. I will modify the script to use set(ds["hash"]) instead when deduplicating larger datasets. After further review, I believe both of the errors I experienced are connected to the size of the dataset.

Additionally, I was wondering whether you pre-tokenized the CodeParrot dataset before using the preprocessing.py script. If so, did you note any significant performance improvements from pre-tokenizing the dataset first?

Do you believe pre-tokenization is even required for performant MinHash LSH deduplication? Or can the preprocessing script be run as is on the original dataset or text without any tokenization at all?

I have tried deduplication with and without pre-tokenization but wanted to see your input.

I have been testing different cases for generic deduplication before working on a full Pile dataset deduplication. After reviewing Google’s attempts at cleaning large datasets, it looks like deduplicating the entire dataset will take around 2,000 GB of RAM.

Thank you again for everything you and the Hugging Face team do!

Best,

Enrico

Hi Enrico, I have used the script on a 400 GB dataset and it worked fine, but it needed more than 1 TB of RAM; I haven’t used it on a larger dataset. Regarding pre-tokenization, I did run some experiments and it gave a small speedup during training.

As for near-deduplication, we actually run it on the text samples, not on the pre-tokenized dataset; the "tokenization" in this script only refers to splitting the text into words based on a regex.

Hi @loubnabnl,

I appreciate you taking the time to answer my questions.

I will have to adjust my RAM usage to 3 or 4 TB instead for The Pile.

Thank you for confirming that near-deduplication is used only on the text samples.

I have been running tests with both near and exact deduplication on text samples and tokenized data, but I haven’t noticed much performance gain from pre-tokenization, in line with what you stated.

Do you mind telling me which cloud instance you used? GCP/Vertex instances with high RAM seem to consistently crash when downloading data from the Hugging Face Hub.

Thank you again for your help.

Enrico

I use GCP VMs c2d-highmem and m2-megamem. However, downloading the data from the Hub shouldn’t make your machine crash if you have enough disk space for the dataset.