Incrementally adding processed examples to a dataset

Hi,

I’m working with some large numerical weather prediction datasets that I’m adding to the Hub. The datasets are quite large (a few TBs), and I don’t necessarily have the disk space for both all the raw files and the processed outputs locally. I have the raw files streaming from the Hub successfully, although very slowly, and the postprocessing takes a while to run, so I was thinking that converting the raw files to Parquet could make training a lot faster. I’ve been experimenting with push_to_hub and have it working for a small number of examples, but I was wondering: can you push to the same split multiple times and just keep appending to it? Or is there some other way where I don’t need to have two full copies of my dataset on disk?

As far as I know, :hugs: Datasets doesn’t currently support push_to_hub for streaming datasets, and running push_to_hub again will overwrite your dataset.

However, it might be possible for you to implement something similar to how push_to_hub itself is implemented. Here it uses HfApi.upload_file (datasets/arrow_dataset.py at master · huggingface/datasets · GitHub) to upload each shard, but you could also use the new create_commit function: Upload files to the Hub. You’d also need to keep track of the dataset info (dataset size, number of examples for each split, number of bytes, etc., full list here: Main classes) so that you can upload it as well, like it’s done here: datasets/arrow_dataset.py at master · huggingface/datasets · GitHub
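
For example, here’s a rough sketch of that idea (untested; the repo names, shard size, and the process() postprocessing function are all placeholders): stream the raw data, write each processed chunk to a local Parquet shard, upload the shard with HfApi.upload_file, and delete it before building the next one, so only one shard is ever on disk:

import os
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi

api = HfApi()
# Stream the raw dataset so it never has to fit on disk
source = load_dataset("user/raw-dataset", split="train", streaming=True)

buffer, shard_idx, shard_size = [], 0, 1000
for example in source:
    buffer.append(process(example))  # process() stands in for your own postprocessing
    if len(buffer) == shard_size:
        shard_path = f"train-{shard_idx:05d}.parquet"
        Dataset.from_list(buffer).to_parquet(shard_path)
        api.upload_file(
            path_or_fileobj=shard_path,
            path_in_repo=f"data/{shard_path}",
            repo_id="user/processed-dataset",
            repo_type="dataset",
        )
        os.remove(shard_path)  # free the disk space before the next shard
        buffer, shard_idx = [], shard_idx + 1
# (remember to flush any final partial buffer the same way)

create_commit would let you batch several shards into a single commit instead of one commit per file, and you’d still want to update the dataset info at the end as described above.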

Maybe there’s an easier way though, so I’ll pass this along to the Datasets team to see if they have any other thoughts!

Hi,

Thanks for the response! Yeah, I was thinking of uploading the raw files, then streaming them back to generate the processed dataset, but there seems to be an odd issue with the raw data files stored here: openclimatefix/gfs-reforecast · Datasets at Hugging Face. If I stream in the data, I get a lot of NaN values for almost all the variables, but if I download the data file and open it locally, it works without issue. This doesn’t seem to be the case for another dataset I’ve uploaded here: openclimatefix/mrms · Datasets at Hugging Face, so something seems to be off with that one dataset. I could probably hack together downloading each data file individually, adding it to the shard, and then deleting the local copy; it just seems a lot more complicated than it should be.
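
For what it’s worth, that download-process-delete loop could look roughly like this (a sketch: the file list and postprocessing step are placeholders, and local_dir assumes a recent huggingface_hub version):

import os
import fsspec
import xarray as xr
from huggingface_hub import hf_hub_download

# In practice you'd loop over the full list of raw Zarr zips in the repo
for filename in ["data/forecasts/GFSv16/2022/02/2022021412.zarr.zip"]:
    local_path = hf_hub_download(
        repo_id="openclimatefix/gfs-reforecast",
        filename=filename,
        repo_type="dataset",
        local_dir="tmp_raw",  # keep it out of the shared cache so it's safe to delete
    )
    fs = fsspec.filesystem("zip", fo=local_path)
    dataset = xr.open_zarr(fs.get_mapper(""))
    # ... postprocess and append the examples to the current shard here ...
    os.remove(local_path)  # drop the raw copy before fetching the next file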

Thanks again for the response! And I’m curious to hear if the Datasets team has more thoughts on it.

Strange!

I noticed that if I try to load gfs-reforecast with streaming=True I get an error that points to a missing comma here: gfs-reforecast.py · openclimatefix/gfs-reforecast at e3a98d483374862c1c082c1294fe1abefbf5c760

I added the comma locally, and that got me a bit further. My laptop takes a really long time to run this line (gfs-reforecast.py · openclimatefix/gfs-reforecast at main), so I extracted only the first two variables of the dataset to make it more manageable (with dataset = dataset[["4LFTX.surface", "ABSV.0.01_mb"]]), just to verify that things were loading properly. I didn’t see any NaNs, but I haven’t tried it with all the other variables.

@lhoestq might have more thoughts?

Hi,

Yeah, the dataset script in that repo is slightly out of date, and the files are quite large; I was working from a local copy when I ran into this issue. For example, I was using a fixed version of that script to push this small example dataset, openclimatefix/gfs-surface-pressure-0.5deg · Datasets at Hugging Face, which is where I saw all the NaN values.

I just reran the script below today, and yeah, it doesn’t seem to give any errors now. It definitely did before when I was trying it out, but I guess it fixed itself? Thanks for trying it out!

I have modified that dataset script locally to make a few different changes and to more natively support different resolutions of NWP grid outputs; here is what I was using that showed the issue, with just a single Zarr file. This script checks the majority of the fields for NaNs when creating examples and prints them to the command line as it goes, but yeah, today it apparently works.

"""Archival NOAA NWP forecasting data covering most of 2016-2022. """
import numpy as np
import xarray as xr
import json

import datasets
from datasets import Array3D, Sequence, Value, Array2D


# Find for instance the citation on arxiv or on the dataset repo/website
_CITATION = """\
@InProceedings{ocf:gfs,
title = {GFS Forecast Dataset},
author={Jacob Bieker},
year={2022}
}
"""

# You can copy an official description
_DESCRIPTION = """\
This dataset consists of various NOAA datasets related to operational forecasts, including FNL Analysis files,
GFS operational forecasts, and the raw observations used to initialize the grid.
"""

_HOMEPAGE = "https://mtarchive.geol.iastate.edu/"

_LICENSE = "US Government data, Open license, no restrictions"

# The HuggingFace Datasets library doesn't host the datasets but only points to the original files.
# This can be an arbitrary nested dict/list of URLs (see below in `_split_generators` method)
_URLS = {
    "gfs_v16_2deg": ['data/forecasts/GFSv16/2022/02/2022021412.zarr.zip'],
    "raw": "raw.json",
    "analysis": "analysis.json",
}


class GraphWeather(datasets.GeneratorBasedBuilder):
    """Archival NOAA NWP forecast data covering most of 2016-2022."""

    DEFAULT_WRITER_BATCH_SIZE = 1
    VERSION = datasets.Version("1.0.0")

    BUILDER_CONFIGS = [
        datasets.BuilderConfig(name="analysis", version=VERSION, description="FNL 0.25 degree Analysis files"),
        datasets.BuilderConfig(name="raw_analysis", version=VERSION, description="FNL 0.25 degree Analysis files coupled with raw observations"),
        datasets.BuilderConfig(name="gfs_v16_1deg", version=VERSION, description="GFS v16 Forecasts from April 2021 through 2022 at 1 degree resolution"),
        datasets.BuilderConfig(name="gfs_v16_2deg", version=VERSION, description="GFS v16 Forecasts from April 2021 through 2022 at 2 degree resolution"),
        datasets.BuilderConfig(name="gfs_v16_0.5deg", version=VERSION, description="GFS v16 Forecasts from April 2021 through 2022 at 0.5 resolution"),
        datasets.BuilderConfig(name="gfs_v16", version=VERSION, description="GFS v16 Forecasts from April 2021 through 2022 at native 0.25 resolution"),
        datasets.BuilderConfig(name="raw_gfs_v16", version=VERSION, description="GFS v16 Forecasts from April 2021 through 2022, returned as a 696 channel image, coupled with raw observations"),
        datasets.BuilderConfig(name="gfs_v16_variables", version=VERSION, description="GFS v16 Forecasts from April 2021 through 2022 with one returned array per variable"),
    ]

    DEFAULT_CONFIG_NAME = "gfs_v16_2deg"  # It's not mandatory to have a default configuration. Just use one if it make sense.

    def _info(self):
        features = {}
        if "2deg" in self.config.name:
            LATITUDE = 91
            LONGITUDE = 180
        elif "1deg" in self.config.name:
            LATITUDE = 181
            LONGITUDE = 360
        elif "0.5deg" in self.config.name:
            LATITUDE = 361
            LONGITUDE = 720
        else:
            LATITUDE = 721
            LONGITUDE = 1440

        if "v16" in self.config.name:
            # Pressure-level variables: (name, number of levels); each gets
            # next/current 3D arrays plus the list of levels actually present
            mb_vars = [
                ("CLMR", 22), ("GRLE", 22), ("VVEL", 41), ("VGRD", 41),
                ("UGRD", 41), ("O3MR", 41), ("TMP", 41), ("DZDT", 41),
                ("HGT", 41), ("RH", 41), ("ICMR", 22), ("SNMR", 22),
                ("SPFH", 41), ("RWMR", 22), ("TCDC", 22), ("ABSV", 41),
            ]
            for name, levels in mb_vars:
                features[f"next_{name}_mb"] = Array3D(shape=(LATITUDE, LONGITUDE, levels), dtype="float32")
                features[f"current_{name}_mb"] = Array3D(shape=(LATITUDE, LONGITUDE, levels), dtype="float32")
                features[f"{name}_mb_levels"] = Sequence(Value("float32"))
            # Surface variables: next/current 2D arrays
            surface_vars = [
                "LFTX", "CRAIN", "HGT", "TMP", "VIS", "FRICV", "PRES", "CAPE",
                "CFRZR", "CNWAT", "SNOD", "ICETK", "CIN", "FLDCP", "WEASD",
                "ICEC", "PRATE", "SUNSD", "LAND", "4LFTX", "SFCR", "CSNOW",
                "HPBL", "CICEP", "GUST", "WILT", "CPOFP", "SOTYP", "ICETMP",
                "VEG", "HINDEX", "MCDC", "LCDC", "HCDC",
            ]
            for name in surface_vars:
                features[f"next_{name}"] = Array2D(shape=(LATITUDE, LONGITUDE), dtype="float32")
                features[f"current_{name}"] = Array2D(shape=(LATITUDE, LONGITUDE), dtype="float32")
            # Height-level variables: (name, number of stacked channels)
            height_vars = [
                ("max_wind", 6), ("2m_above_ground", 5), ("10m_above_ground", 2),
                ("20m_above_ground", 2), ("30m_above_ground", 2), ("40m_above_ground", 2),
                ("50m_above_ground", 2), ("80m_above_ground", 5), ("100m_above_ground", 3),
            ]
            for name, channels in height_vars:
                features[f"next_{name}"] = Array3D(shape=(LATITUDE, LONGITUDE, channels), dtype="float32")
                features[f"current_{name}"] = Array3D(shape=(LATITUDE, LONGITUDE, channels), dtype="float32")
            # Time and coordinate metadata
            features["timestamps"] = Sequence(Value("timestamp[ns]"))
            features["reftime"] = Value("timestamp[ns]")
            features["latitude"] = Sequence(Value("float32"))
            features["longitude"] = Sequence(Value("float32"))
        elif "analysis" in self.config.name:
            # TODO Add the variables one with all 322 variables, potentially combined by level
            features = {
                "current_state": datasets.Array3D((LATITUDE,LONGITUDE,322), dtype="float32"),
                "next_state": datasets.Array3D((LATITUDE,LONGITUDE,322), dtype="float32"),
                "timestamp": datasets.Sequence(datasets.Value("timestamp[ns]")),
                "latitude": datasets.Sequence(datasets.Value("float32")),
                "longitude": datasets.Sequence(datasets.Value("float32"))
                # These are the features of your dataset like images, labels ...
            }
        if "raw" in self.config.name:
            # Add the raw observation features, capping at 256,000 observations, padding if not enough
            raw_features = {"observations": datasets.Array2D((256000,1), dtype="float32"),
                            "observation_type": datasets.Array2D((256000,1), dtype="string"),
                            "observation_lat": datasets.Array2D((256000,1), dtype="float32"),
                            "observation_lon": datasets.Array2D((256000,1), dtype="float32"),
                            }
            features.update(raw_features)  # dict.update() mutates in place and returns None

        features = datasets.Features(features)

        return datasets.DatasetInfo(
            # This is the description that will appear on the datasets page.
            description=_DESCRIPTION,
            # This defines the different columns of the dataset and their types
            features=features,  # Here we define them above because they are different between the two configurations
            # Homepage of the dataset for documentation
            homepage=_HOMEPAGE,
            # License for the dataset if available
            license=_LICENSE,
            # Citation for the dataset
            citation=_CITATION,
        )

    def _split_generators(self, dl_manager):
        urls = _URLS[self.config.name]
        streaming = dl_manager.is_streaming
        #urls = dl_manager.download_and_extract(urls)
        return [
            datasets.SplitGenerator(
                name=datasets.Split.TRAIN,
                # These kwargs will be passed to _generate_examples
                gen_kwargs={
                    "filepath": urls,
                    "split": "train",
                    "streaming": streaming,
                })
        ]

    # method parameters are unpacked from `gen_kwargs` as given in `_split_generators`
    def _generate_examples(self, filepath, split, streaming):
        # Load the list of files for the type of data
        if "2deg" in self.config.name:
            coarsen_level = 8
        elif "1deg" in self.config.name:
            coarsen_level = 4
        elif "0.5deg" in self.config.name:
            coarsen_level = 2
        else:
            coarsen_level = 0
        filepaths = ['zip:///::https://huggingface.co/datasets/openclimatefix/gfs-reforecast/resolve/main/' + f for f in filepath]
        filepaths = filepaths[::2]
        if "v16" in self.config.name:
            idx = 0
            for f in filepaths:
                try:
                    print(f)
                    #urllib.request.urlretrieve(f, "file.zarr.zip")
                    dataset = xr.open_dataset(f, engine='zarr', chunks={})
                    for t in range(8):
                        if coarsen_level > 0:
                            data_t = dataset.isel(time=t).coarsen(latitude=coarsen_level, boundary="pad").mean().coarsen(longitude=coarsen_level).mean()
                            data_t1 = dataset.isel(time=(t+1)).coarsen(latitude=coarsen_level, boundary="pad").mean().coarsen(longitude=coarsen_level).mean()
                        else:
                            data_t = dataset.isel(time=t)
                            data_t1 = dataset.isel(time=(t+1))
                        values = get_values(data_t1, "next_")
                        values.update(get_values(data_t, "current_"))
                        values["timestamps"] = [values["reftime"] + data_t["time"].values, values["reftime"] + data_t1["time"].values]
                        idx += 1
                        yield idx, values
                except Exception as e:
                    print(e)
                    continue


def get_values(dataset, state):
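    """Extract all variables from one time step of the xarray Dataset, prefixing
    keys with `state` ("current_" or "next_") and logging fields that contain NaNs."""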
    names = ['CLMR', 'GRLE', 'VVEL', 'VGRD', 'UGRD', 'O3MR', 'CAPE', 'TMP', 'PLPL', 'DZDT', 'CIN', 'HGT', 'RH', 'ICMR', 'SNMR', 'SPFH', 'RWMR', 'TCDC', 'ABSV']
    features = {}
    ones_with_nans = []
    total = 0
    # For pressure level values
    for n in names:
        if len(sorted([float(var.split(".", 1)[-1].split("_")[0]) for var in dataset.data_vars if "mb" in var and n in var and "-" not in var])) > 0:
            features[n+"_mb_levels"] = []
            features[state+n+"_mb"] = []
            for value in sorted([float(var.split(".", 1)[-1].split("_")[0]) for var in dataset.data_vars if "mb" in var and n in var and "-" not in var]):
                # Levels are parsed as floats for now; cast whole-number levels to int to match the variable names
                if value >= 1:
                    value = int(value)
                var_name = f"{n}.{value}_mb"
                #print(var_name)
                if np.isnan(dataset[var_name].values).any():
                    ones_with_nans.append(var_name)
                features[state+n+"_mb"].append(dataset[var_name].values)
                features[n+"_mb_levels"].append(value)
                total += 1
            features[state+n+"_mb"] = np.stack(features[state+n+"_mb"], axis=-1)
            features[n+"_mb_levels"] = np.asarray(features[n+"_mb_levels"])
    print(f"After Presure Levels: {len(ones_with_nans)}/{total}")

    # For surface values
    for n in list(set([var.split(".", 1)[0] for var in dataset.data_vars if "surface" in var and "level" not in var and "2e06" not in var and "below" not in var and "atmos" not in var and "tropo" not in var and "iso" not in var and "planetary_boundary_layer" not in var])):
        features[state+n] = np.nan_to_num(dataset[n+".surface"].values)
        total += 1
        if np.isnan(dataset[n+".surface"].values).any():
            ones_with_nans.append(n+".surface")
    print(f"After Surface Levels: {len(ones_with_nans)}/{total}")

    # For Cloud levels
    for n in list(set([var.split(".", 1)[0] for var in dataset.data_vars if "sigma" not in var and "level" not in var and "2e06" not in var and "below" not in var and "atmos" not in var and "tropo" not in var and "iso" not in var and "planetary_boundary_layer" not in var])):
        if "LCDC" in n:# or "MCDC" in n or "HCDC" in n:
            features[state+n] = dataset["LCDC.low_cloud_layer"].values
            total += 1
            if np.isnan(dataset["LCDC.low_cloud_layer"].values).any():
                ones_with_nans.append("LCDC.low_cloud_layer")
        if "MCDC" in n:# or "HCDC" in n:
            features[state+n] = dataset["MCDC.middle_cloud_layer"].values
            total += 1
            if np.isnan(dataset["MCDC.middle_cloud_layer"].values).any():
                ones_with_nans.append("MCDC.middle_cloud_layer")
        if "HCDC" in n:
            features[state+n] = dataset["HCDC.high_cloud_layer"].values
            total += 1
            if np.isnan(dataset["HCDC.high_cloud_layer"].values).any():
                ones_with_nans.append("HCDC.high_cloud_layer")

    # Now for each of these
    features[state+"max_wind"] = []
    for n in sorted([var for var in dataset.data_vars if "max_wind" in var]):
        features[state+"max_wind"].append(dataset[n].values)
        total += 1
        if np.isnan(dataset[n].values).any():
            ones_with_nans.append(n)
    features[state+"max_wind"] = np.stack(features[state+"max_wind"], axis=-1)
    print(f"After Max Wind Levels: {len(ones_with_nans)}/{total}")

    for i in [2,10,20,30,40,50,80,100]:
        features[f"{state}{i}m_above_ground"] = []
        for n in sorted([var for var in dataset.data_vars if f"{i}_m_above_ground" in var]):
            features[f"{state}{i}m_above_ground"].append(dataset[n].values)
            total += 1
            if np.isnan(dataset[n].values).any():
                ones_with_nans.append(n)
        features[f"{state}{i}m_above_ground"] = np.stack(features[f"{state}{i}m_above_ground"], axis=-1)

    # Go through all features and see if any NaNs and compare
    feature_nans = []
    feature_total = 0
    for key, value in features.items():
        feature_total += 1
        if np.isnan(value).any():
            feature_nans.append(key)

    print(f"After Height Levels: {len(ones_with_nans)}/{total}")
    features["latitude"] = dataset["latitude"].values
    features["longitude"] = dataset["longitude"].values
    features["reftime"] = dataset["reftime"].values
    print(ones_with_nans)
    print(f"Feature NaNs: {len(feature_nans)}/{feature_total}")
    print(feature_nans)
    return features

And here is the script that generates a dataset like the one above with all the null values (although not exactly that one):

from datasets import load_dataset

# writer_batch_size=1, cache_dir="/run/media/jacob/data/hf_cache/",
dataset = load_dataset("graph_weather.py", writer_batch_size=1, split="train")
dataset.push_to_hub("openclimatefix/gfs-surface-pressure-2.5deg")