"""Landsat preprocessing."""
from pydantic import validate_call
from typing import Any, Literal, Optional, Union
import warnings
from pathlib import Path
from urllib.parse import urlparse
import fsspec
import dask
from xarray import DataArray, Dataset
import satpy
from pystac import Item
from remote_sensing_processor.common.common_functions import create_folder, persist
from remote_sensing_processor.common.common_raster import (
check_dtype,
clip_to_initial_bbox,
clipf,
get_first_proj,
get_initial_bbox,
prepare_nodata,
reproject,
reproject_match,
restore_nodata_from_nan,
unpack_bitmap,
write,
)
from remote_sensing_processor.common.dataset import check_output
from remote_sensing_processor.common.types import (
CRS,
DirectoryPath,
FilePath,
ListOfPath,
ListOfPystacItem,
NewPath,
Temperature,
)
from remote_sensing_processor.imagery.landsat.dataset import (
postprocess_landsat_dataset,
read_landsat_dataset,
)
from remote_sensing_processor.imagery.types import get_type
warnings.filterwarnings("ignore", message=".*divide by zero.*")
warnings.filterwarnings("ignore", message=".*invalid value encountered.*")
warnings.filterwarnings("ignore", message=".*You will likely lose important projection information.*")
[docs]
@validate_call
def landsat(
inputs: Union[ListOfPath, ListOfPystacItem],
output_path: Optional[Union[DirectoryPath, NewPath]] = None,
crs: Optional[Union[Literal["same"], CRS]] = None,
cloud_mask: Optional[bool] = True,
pansharpen: Optional[bool] = True,
keep_pan_band: Optional[bool] = False,
resample: Optional[str] = "gradient_search",
clip: Optional[FilePath] = None,
t: Optional[Temperature] = "k",
normalize: Optional[bool] = False,
write_stac: Optional[bool] = True,
) -> list[NewPath]:
"""
Preprocess Landsat imagery.
Parameters
----------
inputs : string or list of strings or STAC Item or list of STAC Items
Path to archive / directory or list of paths to archives / directories
or a STAC Item or list of STAC Items (e.g., from Planetary Computer).
output_path : string (optional)
Path to output directory. If not set, then will write to the same directory as archive.
Must be set if the inputs are remote STAC Items.
crs : any (optional)
CRS in which output data should be or `same` to get CRS from the first archive.
cloud_mask : bool (default = True)
Is cloud masking needed.
pansharpen : bool (default = True)
Is pansharpening needed. RSP uses Brovey transform for pansharpening Landsat 7, 8 and 9.
keep_pan_band : bool (default = False)
Keep a pansharpening band or delete it. The pansharpening band has the same wavelengths as optical bands,
so it does not contain any additional information to other bands. Affects only Landsat 7, 8 and 9.
resample : resampling method from satpy as a string (default = 'gradient_search')
Resampling method that will be used to upscale bands that cannot be upscaled in pansharpening operation.
You can read more about resampling methods
`here <https://satpy.readthedocs.io/en/stable/api/satpy.resample.html#resampling-algorithms>`_.
Affects only Landsat 7, 8 and 9.
clip : string (optional)
Path to a vector file to be used to crop the image.
t : string ('k' or 'c', default = 'k')
Convert thermal bands to kelvins or celsius (no fahrenheit lol).
normalize : bool (default = False)
Is min-max normalization to 0-1 range needed.
write_stac : bool (default = True)
If True, then output metadata is saved to a STAC file.
Returns
-------
list of pathlib.Paths
List of paths where preprocessed Landsat products are saved.
Examples
--------
>>> import remote_sensing_processor as rsp
>>> from glob import glob
>>> landsat_imgs = glob("/home/rsp_test/landsat/*.tar")
>>> print(landsat_imgs)
['/home/rsp_test/landsat/LC08_L1TP_160023_20210825_20210901_02_T1.tar',
'/home/rsp_test/landsat/LT05_L1TP_160023_20110814_20200820_02_T1.tar',
'/home/rsp_test/landsat/LE07_L1TP_159023_20210826_20210921_02_T1.tar',
'/home/rsp_test/landsat/LT05_L1TP_162023_20110812_20200820_02_T1.tar',
'/home/rsp_test/landsat/LM05_L1TP_161023_19930803_20211018_02_T2.tar']
>>> output_landsats = rsp.landsat(landsat_imgs)
Preprocessing of /home/rsp_test/landsat/LC08_L1TP_160023_20210825_20210901_02_T1.tar completed
Preprocessing of /home/rsp_test/landsat/LT05_L1TP_160023_20110814_20200820_02_T1.tar completed
Preprocessing of /home/rsp_test/landsat/LE07_L1TP_159023_20210826_20210921_02_T1.tar completed
Preprocessing of /home/rsp_test/landsat/LT05_L1TP_162023_20110812_20200820_02_T1.tar completed
Preprocessing of /home/rsp_test/landsat/LM05_L1TP_161023_19930803_20211018_02_T2.tar completed
>>> print(output_landsats)
['/home/rsp_test/landsat/LC08_L1TP_160023_20210825_20210901_02_T1/LC08_L1TP_160023_20210825_20210901_02_T1.json',
'/home/rsp_test/landsat/LT05_L1TP_160023_20110814_20200820_02_T1/LT05_L1TP_160023_20110814_20200820_02_T1.json',
'/home/rsp_test/landsat/LE07_L1TP_159023_20210826_20210921_02_T1/LE07_L1TP_159023_20210826_20210921_02_T1.json',
'/home/rsp_test/landsat/LT05_L1TP_162023_20110812_20200820_02_T1/LT05_L1TP_162023_20110812_20200820_02_T1.json',
'/home/rsp_test/landsat/LM05_L1TP_161023_19930803_20211018_02_T2/LM05_L1TP_161023_19930803_20211018_02_T2.json']
"""
paths = []
for path in inputs:
if crs == "same":
crs = get_first_proj(path)
outfile = landsat_proc(
path=path,
output_path=output_path,
crs=crs,
cloud_mask=cloud_mask,
pansharpen=pansharpen,
keep_pan_band=keep_pan_band,
resample=resample,
t=t,
clip=clip,
normalize=normalize,
write_stac=write_stac,
)
paths.append(outfile)
print("Preprocessing of " + str(path) + " completed")
return paths
def landsat_proc(
path: Union[Path, Item],
output_path: Optional[Union[DirectoryPath, NewPath]] = None,
crs: Optional[CRS] = None,
cloud_mask: Optional[bool] = True,
pansharpen: Optional[bool] = True,
keep_pan_band: Optional[bool] = False,
resample: Optional[str] = "gradient_search",
t: Optional[Temperature] = "k",
clip: Optional[FilePath] = None,
normalize: Optional[bool] = False,
write_stac: Optional[bool] = True,
) -> NewPath:
"""Process a single Landsat product."""
output_path = check_output(path, output_path, parent=True)
rsp_type = get_type(path)
files = get_files(path, rsp_type)
scene = satpy.Scene(filenames=files)
# Get band names
spectral_bands, thermal_bands, pan_band, qa_bands = get_bands(files)
if pan_band is not None and pansharpen:
all_bands = spectral_bands + thermal_bands + pan_band
else:
all_bands = spectral_bands + thermal_bands
if cloud_mask:
all_bands += qa_bands
ok_bands = [band for band in all_bands if band in scene.available_dataset_names()]
scene.load(ok_bands)
dataset = read_landsat_dataset(path, scene, rsp_type)
# Clipping data to bbox
bbox = get_initial_bbox(scene.to_xarray_dataset(), clip)
if bbox is not None:
scene = scene.crop(xy_bbox=bbox[0].bounds)
scene = persist(scene)
scene = scene.resample(resampler=resample)
scene = persist(scene)
img = scene.to_xarray_dataset()
# Rename bands according to STAC convention
img = img.rename(
{
band: asset
for asset in dataset.assets
for band in ok_bands
if band == dataset.assets[asset].ext.eo.bands[0].name
},
)
# Set up nodata value
img, _ = prepare_nodata(img, default_nodata=0)
# Normalizing and processing temperature bands
img = img.map(process, t=t, normalize=normalize)
img = persist(img)
# Pansharpening
pan = None
if pan_band is None:
pansharpen = False
keep_pan_band = False
if (pansharpen is True) or (keep_pan_band is True):
if pansharpen:
pan_c = get_pan_coef(img, dataset)
img = img.map(pansharp, pan_c=pan_c.data, rsp_type=rsp_type, normalize=normalize)
if not keep_pan_band:
img = img.drop_vars(["pan"])
img = persist(img)
else:
scene = satpy.Scene(filenames=files)
scene.load(pan_band)
pan = scene["B8"]
pan = clip_to_initial_bbox(pan, bbox)
pan = persist(pan)
pan, _ = prepare_nodata(pan, default_nodata=0)
pan = process(pan, t=t, normalize=normalize)
pan = persist(pan)
# Reading quality assessment band and masking clouds
if cloud_mask:
qa = img[qa_bands]
img = img.drop_vars(qa_bands)
img, pan = get_mask(img, qa, dataset, pan)
# Reprojecting
if crs is not None:
img = reproject(img, crs)
if (pansharpen is False) and (keep_pan_band is True):
pan = reproject(pan, crs)
# Clipping
if clip is not None:
img = clipf(img, clip)
if (pansharpen is False) and (keep_pan_band is True):
pan = clipf(pan, clip)
img = check_dtype(img)
if (pansharpen is False) and (keep_pan_band is True):
pan = check_dtype(pan)
img = restore_nodata_from_nan(img)
if (pansharpen is False) and (keep_pan_band is True):
pan = restore_nodata_from_nan(pan)
# Save
outfiles = []
results = []
# Creating an output folder
json_path = output_path / dataset.id / (dataset.id + ".json")
# Updating STAC dataset
postprocess_landsat_dataset(dataset, img, json_path, pansharpen, keep_pan_band, pan)
# Creating an output directory or cleaning the directory if already exists
create_folder(output_path / dataset.id)
# Writing files
for band in img:
pathres = output_path / dataset.id / dataset.assets[band].href
outfiles.append(pathres)
results.append(write(img[band], pathres, compute=False))
if (pansharpen is False) and (keep_pan_band is True):
pathres = output_path / dataset.id / dataset.assets["pan"].href
outfiles.append(pathres)
results.append(write(pan, pathres, compute=False))
dask.compute(*results)
if write_stac:
# Writing JSON metadata file
dataset.save_object(dest_href=json_path.as_posix())
return json_path
return json_path.parent
def get_files(path: Union[Path, Item], rsp_type: str) -> dict[str, dict[str, Any]]:
"""Get files from the input path."""
# Construct reader name
if "olitirs" in rsp_type:
sensor = "oli_tirs"
elif "etm" in rsp_type:
sensor = "etm"
elif "tm" in rsp_type:
sensor = "tm"
elif "mss" in rsp_type:
sensor = "mss"
else:
raise ValueError("Unknown Landsat product")
if "l1" in rsp_type:
level = "l1"
elif "l2" in rsp_type:
level = "l2"
else:
raise ValueError("Unknown Landsat product")
reader = sensor + "_" + level + "_tif"
if isinstance(path, Item):
urls = [path.assets[i].href for i in path.assets]
reference = {
"version": 1,
"templates": {},
"refs": {Path(urlparse(url).path).name: [url] for url in urls},
}
fs = fsspec.filesystem("reference", fo=reference)
files = satpy.find_files_and_readers(base_dir="", reader=reader, fs=fs)
for key in files:
files[key] = [satpy.readers.core.remote.FSFile(file, fs=fs) for file in files[key]]
elif path.is_file() and (
(".tar" in path.suffixes) or (".gz" in path.suffixes) or (".TAR" in path.suffixes) or (".GZ" in path.suffixes)
):
fs = fsspec.filesystem("tar", fo=path.as_posix())
files = satpy.find_files_and_readers(base_dir="", reader=reader, fs=fs)
for key in files:
files[key] = [satpy.readers.core.remote.FSFile(file, fs=fs) for file in files[key]]
elif path.is_dir():
files = satpy.find_files_and_readers(base_dir=path.as_posix(), reader=reader)
for key in files:
files[key] = [Path(file).resolve() for file in files[key]]
else:
raise ValueError(str(path) + " is not a directory, tar/tar.gz archive or STAC object.")
return files
def get_bands(files: dict[str, dict[str, Any]]) -> tuple[list[str], list[str], Optional[list[str]], list[str]]:
"""Get band names for a specific Landsat product."""
if "oli_tirs_l1_tif" in files:
spectral_bands = ["B1", "B2", "B3", "B4", "B5", "B6", "B7", "B9"]
thermal_bands = ["B10", "B11"]
elif "oli_tirs_l2_tif" in files:
spectral_bands = ["B1", "B2", "B3", "B4", "B5", "B6", "B7"]
thermal_bands = ["B10"]
elif "etm_l1_tif" in files:
spectral_bands = ["B1", "B2", "B3", "B4", "B5", "B7"]
thermal_bands = ["B6_VCID_1", "B6_VCID_2"]
elif "etm_l2_tif" in files or "tm_l1_tif" in files or "tm_l2_tif" in files:
spectral_bands = ["B1", "B2", "B3", "B4", "B5", "B7"]
thermal_bands = ["B6"]
else:
spectral_bands = ["B1", "B2", "B3", "B4", "B5", "B6", "B7"]
thermal_bands = []
pan_band = ["B8"] if "oli_tirs_l1_tif" in files or "etm_l1_tif" in files else None
if "oli_tirs_l2_tif" in files:
qa_bands = ["qa", "qa_radsat", "qa_aerosol", "qa_st"]
elif "etm_l2_tif" in files or "tm_l2_tif" in files:
qa_bands = ["qa", "qa_radsat", "qa_atmos_opacity", "qa_cloud", "qa_st"]
else:
qa_bands = ["qa", "qa_radsat"]
return spectral_bands, thermal_bands, pan_band, qa_bands
def process(img: DataArray, t: Optional[Temperature] = "k", normalize: Optional[bool] = False) -> DataArray:
"""Normalize and modify temperature bands."""
if img.name in ["lwir11", "lwir12"]:
# Change temperature units
if t == "c":
img = img - 273.15
deg = 273.15
elif t == "k":
deg = 0
else:
raise ValueError("wtf is a fahrenheit")
# Normalize temperature in range 175 k - 375 k
if normalize:
img = (img - (175 - deg)) / ((375 - deg) - (175 - deg))
elif normalize:
img = img * 0.01
return img.clip(0, None)
def get_mask(
img: Dataset,
qa: Dataset,
dataset: Item,
pan: Optional[DataArray] = None,
) -> tuple[Dataset, Optional[DataArray]]:
"""Gets a mask from Quality assessment bands."""
# QA Pixel band
qa_pixel = qa["qa"].astype("int16")
qa_pixel = unpack_bitmap(qa_pixel)
qa_pixel = (
(qa_pixel[0] == 1) # Fill
| (qa_pixel[1] == 1) # Dilated cloud
| (qa_pixel[2] == 1) # Cirrus
| (qa_pixel[3] == 1) # Cloud
| (qa_pixel[4] == 1) # Cloud shadow
| ((qa_pixel[8] == 0) & (qa_pixel[9] == 1)) # Medium cloud confidence
| ((qa_pixel[8] == 1) & (qa_pixel[9] == 1)) # High cloud confidence
| ((qa_pixel[10] == 1) & (qa_pixel[11] == 1)) # High cloud shadow confidence
| ((qa_pixel[14] == 1) & (qa_pixel[15] == 1)) # High cirrus confidence
)
img = img.where(qa_pixel.data == 0)
if pan is not None:
qa_pixel = reproject_match(qa_pixel, pan)
pan = pan.where(qa_pixel.data == 0)
# QA Radsat band
if "qa_radsat" in img:
qar = qa["qa_radsat"].astype("int16")
qar = unpack_bitmap(qar)
def mask_qar(data: DataArray) -> DataArray:
"""Mask saturarted areas in a specific band."""
btitle = dataset.assets[str(data.name)].ext.eo.bands[0].name[1:]
if btitle.isdigit() and int(btitle) < 9:
btitle = int(btitle) - 1
elif btitle == "6_VCID_1":
btitle = 5
elif btitle == "6_VCID_2":
btitle = 8
if isinstance(btitle, int):
data = data.where(qar[btitle].data == 0)
return data
img = img.map(mask_qar)
img = img.where(qar[9].data == 0)
if pan is not None:
qar = reproject_match(qar, pan)
pan = pan.where(qar[9].data == 0)
# QA Aerosol
if "qa_aerosol" in img:
qa_aerosol = qa["qa_aerosol"].astype("int8")
qa_aerosol = unpack_bitmap(qa_aerosol)
qa_aerosol = (qa_aerosol[6] == 1) and (qa_aerosol[7] == 1) # High aerosol level
img = img.where(qa_aerosol.data == 0)
if pan is not None:
qa_aerosol = reproject_match(qa_aerosol, pan)
pan = pan.where(qa_aerosol.data == 0)
# QA Cloud
if "qa_cloud" in img:
qa_cloud = qa["qa_cloud"].astype("int8")
qa_cloud = unpack_bitmap(qa_cloud)
qa_cloud = (
(qa_cloud[1] == 1) # Cloud
| (qa_cloud[2] == 1) # Cloud shadow
| (qa_cloud[3] == 1) # Adjacent to cloud
)
img = img.where(qa_cloud.data == 0)
if pan:
qa_cloud = reproject_match(qa_cloud, pan)
pan = pan.where(qa_cloud.data == 0)
# QA Atmospheric opacity
if "qa_atmos_opacity" in img:
qa_ao = qa["qa_atmos_opacity"]
img = img.where(qa_ao.data > 0.3)
if pan is not None:
qa_ao = reproject_match(qa_ao, pan)
pan = pan.where(qa_ao.data > 0.3)
# QA ST
if "qa_st" in img:
qa_st = qa["qa_st"]
img = img.where(qa_st.data > 15)
if pan is not None:
qa_st = reproject_match(qa_st, pan)
pan = pan.where(qa_st.data > 15)
img = persist(img)
if pan is not None:
pan = persist(pan)
# TODO: this is a fix for an odc.reproject error that replaces 0 with -1 if nodata not set, remove it when fixed
# mask.rio.write_nodata(-1, inplace=True)
return img, pan
def get_pan_coef(img: Dataset, dataset: Item) -> DataArray:
"""Calculates panchromatic coefficient."""
if dataset.common_metadata.description == "Landsat_olitirs_up_l1":
return (img.pan / ((0.42 * img.blue) + (0.98 * img.green) + (0.6 * img.red)) / 2).squeeze()
if dataset.common_metadata.description == "Landsat_etm_up_l1":
return (img.pan / ((0.42 * img.blue) + (0.98 * img.green) + (0.6 * img.red) + (1 * img.nir08)) / 3).squeeze()
raise ValueError(f"{dataset.common_metadata.description} cannot be processed")
def pansharp(img: DataArray, pan_c: DataArray, rsp_type: str, normalize: Optional[bool] = False) -> DataArray:
"""Performs pansharpening of a single band."""
if (rsp_type == "Landsat_olitirs_up_l1" and img.name in ["blue", "green", "red"]) or (
rsp_type == "Landsat_etm_up_l1" and img.name in ["blue", "green", "red", "nir"]
):
if normalize:
return (img * pan_c).clip(0, 1)
return (img * pan_c).clip(0, 100)
return img