Source code for raster_tools.io

import os
import urllib
from pathlib import Path

import dask
import numpy as np
import rasterio as rio
import rioxarray as xrio
import xarray as xr
from affine import Affine
from dask.array.core import normalize_chunks as dask_chunks

from raster_tools.dtypes import F32, F64, I64, U8, is_bool, is_float, is_int
from raster_tools.exceptions import (
    AffineEncodingError,
    DimensionsError,
    RasterDataError,
    RasterIOError,
)
from raster_tools.masking import get_default_null_value
from raster_tools.utils import to_chunk_dict, validate_path


def _get_extension(path):
    return os.path.splitext(path)[-1].lower()


def _get_chunking_info_from_file(src_file):
    with rio.open(src_file) as src:
        tile_shape = (1, *src.block_shapes[0])
        shape = (src.count, *src.shape)
        dtype = np.dtype(src.dtypes[0])
        return tile_shape, shape, dtype


def _get_chunks(data=None, src_file=None):
    chunks = (1, "auto", "auto")
    if data is None:
        if src_file is None:
            return chunks
        tile_shape, shape, dtype = _get_chunking_info_from_file(src_file)
    else:
        shape = data.shape
        dtype = data.dtype
        tile_shape = None
        if dask.is_dask_collection(data):
            tile_shape = data.chunks
        elif src_file is not None:
            _, tile_shape, _ = _get_chunking_info_from_file(src_file)
    return dask_chunks(chunks, shape, dtype=dtype, previous_chunks=tile_shape)


[docs]def chunk(xrs, src_file=None): chunks = to_chunk_dict( _get_chunks( xrs.raster if isinstance(xrs, xr.Dataset) else xrs, src_file ) ) return xrs.chunk(chunks)
TIFF_EXTS = frozenset((".tif", ".tiff")) NC_EXTS = frozenset((".cdf", ".nc", ".nc4")) HDF_EXTS = frozenset((".hdf", ".h4", ".hdf4", ".he2", ".h5", ".hdf5", ".he5")) GRIB_EXTS = frozenset((".grib", ".grib2", ".grb", ".grb2", ".gb", ".gb2")) BATCH_EXTS = frozenset((".bch",)) # File extenstions that can't be read in yet READ_NOT_IMPLEMENTED_EXTS = NC_EXTS | HDF_EXTS | GRIB_EXTS # File extenstions that can't be written out yet WRITE_NOT_IMPLEMENTED_EXTS = NC_EXTS | HDF_EXTS | GRIB_EXTS IO_UNDERSTOOD_TYPES = (str, Path)
[docs]def is_batch_file(path): return _get_extension(path) in BATCH_EXTS
ESRI_DEFAULT_F32_NV = np.finfo(F32).min
[docs]def normalize_null_value(nv, dtype): # Make sure that ESRI's default F32 null value is properly # registered as F32 if dtype == F32 and nv is not None and np.isclose(nv, ESRI_DEFAULT_F32_NV): nv = F32.type(nv) # Some rasters have (u)int dtype and a null value that is a whole number # but it gets read in as a float. This can cause a lot of accidental type # promotions down the pipeline. Check for this case and correct it. if is_int(dtype) and is_float(nv) and float(nv).is_integer(): nv = int(nv) return nv
[docs]def open_raster_from_path_or_url(path): from raster_tools.raster import ( _try_to_get_null_value_xarray, normalize_xarray_data, ) if type(path) in IO_UNDERSTOOD_TYPES: path = str(path) else: raise RasterIOError( f"Could not resolve input to a raster path or URL: '{path}'" ) if urllib.parse.urlparse(path) == "": # Assume file path validate_path(path) ext = _get_extension(path) else: # URL ext = "" xrs = None # Try to let gdal open anything but NC, HDF, GRIB files if not ext or ext not in READ_NOT_IMPLEMENTED_EXTS: try: xrs = xrio.open_rasterio( path, chunks=to_chunk_dict(_get_chunks()), lock=False ) except rio.errors.RasterioIOError as e: raise RasterIOError( "Could not open given path as a raster." ) from e elif ext in READ_NOT_IMPLEMENTED_EXTS: raise NotImplementedError( "Reading of NetCDF, HDF, and GRIB files is not supported at this" " time." ) else: raise RasterIOError("Unknown file type") if isinstance(xrs, xr.Dataset): raise RasterDataError("Too many data variables in input data") assert isinstance( xrs, xr.DataArray ), "Resulting data structure must be a DataArray" if not dask.is_dask_collection(xrs): xrs = chunk(xrs, path) xrs = normalize_xarray_data(xrs) nv = _try_to_get_null_value_xarray(xrs) nv = normalize_null_value(nv, xrs.dtype) xrs = xrs.rio.write_nodata(nv) return xrs
[docs]def write_raster(xrs, path, no_data_value, **rio_gdal_kwargs): ext = _get_extension(path) rio_is_bool = False if ext in TIFF_EXTS or len(ext) == 0: if xrs.dtype == I64: # GDAL, and thus rioxarray and rasterio, doesn't support I64 so # cast up to float. This avoids to_raster throwing a TypeError. xrs = xrs.astype(F64) elif is_bool(xrs.dtype): # GDAL doesn't support boolean dtype either so convert to uint8 # 0-1 encoding. rio_is_bool = True xrs = xrs.astype(U8) if not ext or ext not in WRITE_NOT_IMPLEMENTED_EXTS: kwargs = {"lock": True, "compute": True, **rio_gdal_kwargs} if "blockheight" in kwargs: value = kwargs.pop("blockheight") kwargs["blockysize"] = value if "blockwidth" in kwargs: value = kwargs.pop("blockwidth") kwargs["blockxsize"] = value if rio_is_bool: # Store each entry using a single bit kwargs["nbits"] = 1 xrs.rio.to_raster(path, **kwargs) else: # TODO: populate raise NotImplementedError()
def _get_valid_variables(meta, ignore_too_many_dims): data_vars = list(meta.data_vars) valid = [] for v in data_vars: n = meta[v].squeeze().ndim if n > 3: if ignore_too_many_dims: continue else: raise DimensionsError( f"Too many dimensions for variable {v!r} with " f"{meta[v].ndim}." ) elif n in (2, 3): valid.append(v) else: raise DimensionsError( f"Too few dimensions for variable {v!r} with {n}." ) if not valid: raise ValueError("No valid raster variables found") return valid def _build_raster(path, variable, affine, crs, xarray_kwargs): from raster_tools.raster import data_to_raster if affine is None: affine = Affine(1, 0, 0, 0, -1, 0, 0) kwargs = xarray_kwargs.copy() kwargs["chunks"] = "auto" var = xr.open_dataset(path, **kwargs)[variable].squeeze() x = var[var.rio.x_dim].to_numpy() y = var[var.rio.y_dim].to_numpy() nv = var._FillValue if "_FillValue" in var.attrs else var.rio.nodata raster = data_to_raster(var.data, x=x, y=y, affine=affine, crs=crs, nv=nv) if nv is None or np.isnan(nv): raster = raster.set_null_value(get_default_null_value(raster.dtype)) return raster def _get_affine(ds): try: affine = ds.rio.transform() except TypeError as err: # Some datasets like gridMET improperly encode the transform. raise AffineEncodingError( "Error reading GeoTransform data:" f"{ds.coords[ds.rio.grid_mapping].attrs['GeoTransform']!r}" ) from err return affine
[docs]def open_dataset( path, crs=None, ignore_extra_dim_errors=False, xarray_kwargs=None, ): """Open a netCDF or GRIB dataset. This function opens a netCDF or GRIB dataset file and returns a dictionary of Raster objectds where each raster corrersponds to the variables in the the file. netCDF/GRIB files can be N-dimensional, while rasters only comprehend 2 to 3 dimensions (band, y, x), so it may not be possible to map all variables in a file to a raster. See the `ignore_extra_dim_errors` option below for more information. Parameters ---------- path : str THe path to the netCDF or GRIB dataset file. crs : str, rasterio.crs.CRS, optional A coordinate reference system definition to attach to the dataset. This can be an EPSG, PROJ, or WKT string. It can also be a `rasterio.crs.CRS` object. netCDF/GRIB files do not always encode a CRS. This option allows a CRS to be supplied, if known ahead of time. It can also be used to override the CRS encoded in the file. ignore_extra_dim_errors : bool, optional If ``True``, ignore dataset variables that cannot be mapped to a raster. An error is raised, otherwise. netCDF/GRIB files allow N-dimensional. Rasters only comprehend 2 or 3 dimensional data so it is not always possible to map a variable to a raster. The default is ``False``. xarray_kwargs : dict, optional Keyword arguments to supply to `xarray.open_dataset` when opening the file. Raises ------ raster_tools.io.AffineEncodingError Raised if the affine matrix is improperly encoded. ra Returns ------- dataset : dict of Raster A ``dict`` of Raster objects. The keys are the variable names in the dataset file and the values are the corresponding variable data as a raster. """ if xarray_kwargs is None: xarray_kwargs = {} xarray_kwargs["decode_coords"] = "all" tmp_ds = xr.open_dataset(path, **xarray_kwargs) data_vars = _get_valid_variables(tmp_ds, ignore_extra_dim_errors) crs = crs or tmp_ds.rio.crs affine = _get_affine(tmp_ds) tmp_ds = None ds = {} for v in data_vars: ds[v] = _build_raster(path, v, affine, crs, xarray_kwargs) return ds