#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =============================================================================
# __ __ _____ _
# | \/ | | __ \| |
# | \ / |_ _ ___ ___ ___ | |__) | |__ ___ _ __ ___
# | |\/| | | | / __|/ _ \/ _ \ | ___/| '_ \ / _ \ '_ \ / _ \
# | | | | |_| \__ \ __/ (_) | | | | | | | __/ | | | (_) |
# |_| |_|\__,_|___/\___|\___/ |_| |_| |_|\___|_| |_|\___/
#
# @author: Nicolas Karasiak
# @site: www.karasiak.net
# @git: www.github.com/nkarasiak/MuseoPheno
# =============================================================================
"""
The :mod:`museopheno.sensors` module gathers sensors to ease index and time series computation.
`MuseoPheno`.
"""
import glob
import numpy as np
import gdal
import collections
from museotoolbox.processing import RasterMath as _RasterMath
from ..time_series import _are_bands_availables, expression_manager
[docs]class SensorManager:
"""
Manage sensor in order to produce temporal index and metadata.
Parameters
-----------
bands_order : list
list on how band are ordered (e.g. ['2','3','4','8'])
bands_names : list
list on band names (e.g. ['Blue','Green','Red','NIR']). Used to setraster metadata.
wavelengths : list
list on wavelenghts (e.g. [490,560,665,842]). Used to set raster metadata.
Example
--------
>>> modis = SensorManager(bands_order=['1','2'],wavelengths=['620-670','841-876'])
>>> modis.add_index('FirstBandRatio',expression='B1/(B1+B2)',condition='(B1+B2)!=0')
"""
[docs] def __init__(self, bands_order,
bands_names=False, wavelengths=False):
self.bands_names = bands_names
self.bands_order = bands_order
self.n_bands = len(self.bands_order)
self.wavelengths = wavelengths
self.available_indices = dict()
self._add_each_band_as_index()
self.configure_bands_order() # useless for now
def _check_index_exists(self, index_name):
if index_name not in self.available_indices.keys():
raise ValueError(
str(index_name) +
' is not an available index. Please select one of them : ' +
', '.join(
self.available_indices.keys()))
else:
return True
[docs] def get_index_expression(self, index_name):
"""
Return index expression
Parameters
-----------
index_name : str, mandatory
Example
--------
>>> get_index_expression('NDVI')
{'expression': '(B8-B4)/(B8+B4)', 'condition': '(B8+B4) != 0'}
"""
if self._check_index_exists(index_name):
return self.available_indices[index_name]
[docs] def SmoothSignal(self,input_dates,output_dates=False,fmt='%Y%m%d'):
from museopheno.time_series import SmoothSignal
return SmoothSignal(dates=input_dates,output_dates=output_dates, bands_order=self.bands_order, fmt=fmt)
[docs] def add_index(self, index_name, expression,
condition=False, compulsory=True):
"""
Add index for the current sensor, verify if band is available before adding the script.
Parameters
-----------
index_name : str
name of the index
expression : str
When calling a band, start with B (e.g. 'B8/B3+0.5')
condition : str
Condition to respect when computing the index (e.g. 'B3!=0')
compulsory : boolean, default True.
If compulsory, will return an error if band is not available with the current sensor.
Example
--------
>>> add_index('NBR',expression='(B08 - B12) / (B08 + B12)',condition='(B08+B12)!=0')
>>> dataset.get_index_expression('NBR')
{'expression': '(B08 - B12) / (B08 + B12)', 'condition': '(B08+B12)!=0'}
"""
if _are_bands_availables(
self.bands_order, expression, compulsory=compulsory):
self.available_indices[index_name] = dict(expression=expression)
if condition:
self.available_indices[index_name]['condition'] = condition
def _add_each_band_as_index(self):
for band in self.bands_order:
self.available_indices['B' +
str(band)] = dict(expression='B' + str(band))
[docs] def generate_index(self, X, expression, interpolate_nan=True,
divide_X_by=1, multiply_by=1, dtype=np.float32):
"""
Generate index from array
Parameters
-----------
X : array
array where each line is a pixel.
expression : str or dict
If str, contains only the expression (e.g. 'B8/B2')
If dict, please generate it from add_index function.
inteprolate_nan : boolean, default True
If nan value a linear interpolation is done.
divide_X_by : integer or float, default 1
Value to divide X before computing the index
multiply_by : integer or float, default 1.
Value to multiply the result (e.g. 100 to set the NDVI between -100 and 100)
dtype : numpy dtype, default np.float32
dtype of the output (e.g. np.int16 to store the NDVI in integer value)
Example
--------
>>> generate_index(X,expression='B8/B2')
"""
X_ = expression_manager(
X,
self.bands_order,
expression=expression,
interpolate_nan=interpolate_nan,
dtype=dtype,
multiply_by=multiply_by,
order_by=self.order_by)
return X_
[docs] def generate_raster(self, input_raster, output_raster, expression,
interpolate_nan=True, divide_X_by=1, multiply_by=1, dtype=np.float32):
"""
Generate index from raster
Parameters
-----------
intput_raster : path
path of the raster file.
output_raster : path
path to save the raster file. (e.g. '/tmp/myIndex.tif')
expression : str or dict
If str, contains only the expression (e.g. 'B8/B2')
If dict, please generate it from add_index function.
inteprolate_nan : boolean, default True
If nan value a linear interpolation is done.
divide_X_by : integer or float, default 1
Value to divide X before computing the index
multiply_by : integer or float, default 1.
Value to multiply the result (e.g. 100 to set the NDVI between -100 and 100)
dtype : numpy dtype, default np.float32
dtype of the output (e.g. np.int16 to store the NDVI in integer value)
Example
--------
>>> generateRaster(raster,'/tmp/my_index.tif',expression='B8/B2')
"""
rM = _RasterMath(input_raster, message='Computing index')
rM.add_function(
expression_manager,
output_raster,
dtype=dtype,
bands_order=self.bands_order,
expression=expression,
interpolate_nan=interpolate_nan,
multiply_by=multiply_by,
order_by=self.order_by)
rM.run()
[docs]class Sentinel2(SensorManager):
"""
Use Sentinel2 as sensor via :class:`museopheno.sensors.sensorManager`.
Parameters
-----------
n_bands : int, default 4
4 or 10.
bands_order : str or list, default 'default'
If 'default', bands_order is the first 10m-bands (2,3,4 and 8), and if n_bands is 10 bands, the first 20-m bands (5,6,...11,12)
Example
--------
>>> dataset = Sentinel2(n_bands=10)
>>> dataset.bands_names
['Blue',
'Green',
'Red',
'NIR',
'Vegetation Red Edge 1',
'Vegetation Red Edge 2',
'Vegetation Red Edge 3',
'Vegetation Red Edge 4',
'SWIR1',
'SWIR2']
>>> dataset.get_index_expression('NDVI')
{'expression': '(B8-B4)/(B8+B4)', 'condition': '(B8+B4) != 0'}
"""
[docs] def __init__(self, n_bands=10, bands_order='default'):
self.n_bands = n_bands
self.bands_order = bands_order
if not isinstance(self.bands_order, np.ndarray) and self.bands_order == 'default':
if self.n_bands == 4:
self.bands_order = np.array(['2', '3', '4', '8'])
elif self.n_bands == 10:
self.bands_order = np.array(
['2', '3', '4', '8', '5', '6', '7', '8A', '11', '12'])
else:
raise ValueError(
'If your Sentinel-2 raster has not 4 or 10 bands, please define bands_order parameter.')
super().__init__(self.bands_order)
self.wavelenghts = np.array([443, 490, 560, 665, 842, 705, 740,
783, 865, 945, 1375, 1610, 2190])
self._bands = np.array(['1','2', '3', '4', '8', '5', '6', '7', '8A', '9', '10', '11', '12'])
self._bands_idx = np.isin(self._bands,self.bands_order)
self.wavelenghts = self.wavelenghts[self._bands_idx]
self.bands_names = np.array(['Coastal aerosol',
'Blue',
'Green',
'Red',
'NIR',
'Vegetation Red Edge 1',
'Vegetation Red Edge 2',
'Vegetation Red Edge 3',
'Vegetation Red Edge 4',
'Water vapour',
'SWIR - Cirrus',
'SWIR1',
'SWIR2'])
self.bands_names = self.bands_names[self._bands_idx]
del self._bands,self._bands_idx
index = dict(
ACORVI=['( B8 - B4 + 0.05 ) / ( B8 + B4 + 0.05 ) ',
'(B8+B4+0.05) != 0'],
ACORVInarrow=[
'( B8A - B4 + 0.05 ) / ( B8A + B4 + 0.05 ) ',
'(B8A+B4+0.05) != 0'],
SAVI=['1.5 * (B8 - B4) / ( B8 + B4 + 0.5 )'],
EVI=['( 2.5 * ( B8 - B4 ) ) / ( ( B8 + 6 * B4 - 7.5 * B2 ) + 1 )'],
EVI2=[
'( 2.5 * (B8 - B4) ) / (B8 + 2.4 * B4 + 1) ',
'B8+2.4*B4+1 != 0'],
PSRI=['( (B4 - B2) / B5 )', 'B5 != 0'],
ARI=[
'( 1 / B3 ) - ( 1 / B5 )',
'np.logical_and(B3 != 0, B5 != 0)'],
ARI2=[
'( B8 / B2 ) - ( B8 / B3 )',
' np.logical_and(B2 != 0, B3 != 0)'],
MARI=['( (B5 - B5 ) - 0.2 * (B5 - B3) ) * (B5 / B4)', 'B4 != 0'],
CHLRE=['np.power(B5 / B7,-1.0)', 'B5!=0'],
MCARI=[
' ( ( B5 - B4 ) - 0.2 * ( B5 - B3 ) ) * ( B5 / B4 )',
'B4 != 0'],
MSI=['B11 / B8', 'B8 != 0'],
MSIB12=['B12 / B8', 'B8 != 0'],
NDrededgeSWIR=['( B6 - B12 ) / ( B6 + B12 )', '( B6 + B12 ) != 0'],
SIPI2=['( B8 - B3 ) / ( B8 - B4 )', 'B8 - B4 != 0'],
NDWI=['(B8-B11)/(B8+B11)', '(B8+B11) != 0'],
LCaroC=['B7 / ( B2-B5 )', '( B7 ) != 0'],
LChloC=['B7 / B5', 'B5 != 0'],
LAnthoC=['B7 / (B3 - B5) ', '( B3-B5 ) !=0'],
Chlogreen=['B8A / (B3 + B5)'],
NDRESWIR = ['(B6-B12)/(B6+B12)','(B6+B12) != 0'],
NDVI=['(B8-B4)/(B8+B4)', '(B8+B4) != 0'],
NDVInarrow=['(B8A-B4)/(B8A+B4)', '(B8A+B4) != 0'],
NDVIre=['(B8A-B5)/(B8A+B5)', '(B8A+B5) != 0'],
RededgePeakArea=['B4+B5+B6+B7+B8A'],
Rratio=['B4/(B2+B3+B4)'],
MTCI=['(B6 - B5)/(B5 - B4)', '(B5-B4) != 0'],
S2REP=['35 * ((((B7 + B4)/2) - B5)/(B6 - B5))'],
IRECI=['(B7-B4)/(B5/B6)', 'np.logical_and(B5 != 0, B6 != 0)'],
NBR=['(B08 - B12) / (B08 + B12)', '(B08+B12)!=0'],
REP=['700 + 40 * (( ( B4 + B7 ) / B2) -5) / ( B6 - B5 ) '],
WDRVI_B6B7 = ['(0.1*B7+B6) / (0.1*B7-B6)',' (0.1*B7-B6) != 0']
)
for idx in index.keys():
if len(index[idx]) == 1:
self.add_index(idx, index[idx], compulsory=False)
else:
self.add_index(
idx,
index[idx][0],
index[idx][1],
compulsory=False)
[docs] def generate_temporal_sampling(S2_dir, start_date=False, last_date=False, day_interval=5, save_csv=False):
"""
Generate sample time for gap-filling of Time Series
Parameters
-----------
S2_dir : str
path of the folder where zip or unzipped S2 tiles are.
start_date : False str or int, default false.
If str or int, use %Y%m%d format, e.g. '20180130'.
end_date : False, str or int, default False.
If str or int, use %Y%m%d format, e.g. '20181230'.
day_interval : int, default 5
The delta days interval from the first acquistion to the last
save_csv : False or str, default False
If str, path where the csv file will be saved.
"""
# =============================================================================
# List all subfolders which begins with SENTINEL2
# if no last_date and start_date given
# =============================================================================
AcquisitionDates = [start_date, last_date]
if last_date is False or start_date is False:
S2 = glob.glob(glob.os.path.join(S2_dir, 'SENTINEL2*/'))
# if no folder, looking for zip files
if S2 == []:
S2 = glob.glob(glob.os.path.join(S2_dir, 'SENTINEL2*.zip'))
else:
S2 = [glob.os.path.basename(glob.os.path.dirname(S2Folder))
for S2Folder in S2]
# ==========================================================================
# Detecting YYYYMMDD date format
# ==========================================================================
import re
regexYYYYMMDD = r"(?<!\d)(?:(?:20\d{2})(?:(?:(?:0[13578]|1[02])31)|(?:(?:0[1,3-9]|1[0-2])(?:29|30)))|(?:(?:20(?:0[48]|[2468][048]|[13579][26]))0229)|(?:20\d{2})(?:(?:0?[1-9])|(?:1[0-2]))(?:0?[1-9]|1\d|2[0-8]))(?!\d)"
p = re.compile(regexYYYYMMDD)
AcquisitionDates = sorted(
[p.findall(S2folder)[0] for S2folder in S2])
if start_date is False:
start_date = AcquisitionDates[0]
if last_date is False:
last_date = AcquisitionDates[-1]
from museopheno.time_series import generate_temporal_sampling
return generate_temporal_sampling(AcquisitionDates[0], AcquisitionDates[-1], day_interval=day_interval, save_csv=save_csv)
[docs] def compute_SITS(self, S2dir, out_SITS, resample_CSV=False, interpolation='linear', unzip=False,
out_cloud_mask=False, check_outlier=False, use_flatreflectance=True, n_jobs=1, ram=4000):
"""
Compute Satellite Image Time Series from Sentinel2 level 2A.
This script has only been tested on Theia distributed images.
Parameters
----------
S2Dir : Directory
Directory where zip files from THEIA L2A are unzipped, or zipped.
out_SITS : Output raster
Output name of your raster (tif format, int16)
resample_20mbands : boolean, default True
If True, resample the six 20m band at 10m.
If False, use only the four 10m bands.
interpolation : str, default 'linear'
Two choices : 'linear' or 'spline'
unzip : Bool, default False.
If True, unzip only mandatory images, plus xml and jpg thumbnails.
out_cloud_mask : False or str.
If False, cloud mask won't be saved.
Else, path of the output raster file.
check_outliers : bool, default True.
If True, check outliers (values below 0 in red band are considered as invalid).
use_flatreflectance ; boolean, default True
If False, ou Surface Reflectance (SRE)
n_jobs : int, default 1.
Number of cores used.
ram : int, default 2048
Available ram in mb.
References
-----------
- For the gap-filling, see https://zenodo.org/record/45572#.XaBjs3UzaV4
- For the level 2A images, see https://theia.cnes.fr/atdistrib/rocket/
"""
from museopheno.sensors.__computeS2SITS import _computeSITS
if self.n_bands == 4:
resample_20mbands = False
else:
resample_20mbands = True
_computeSITS(
S2dir,
out_SITS,
resample_20mbands=resample_20mbands,
resample_CSV=resample_CSV,
interpolation=interpolation,
unzip=unzip,
onlyROI=False,
out_cloudMask=out_cloud_mask,
checkOutliers=check_outlier,
use_flatreflectance=use_flatreflectance,
n_jobs=n_jobs,
ram=ram)
[docs]class Venus(SensorManager):
"""
Use Venus as sensor.
Parameters
-----------
n_bands : int, default 12
bands_order : str or list, default 'default'
If 'default', bands_order is the first 10m-bands (2,3,4 and 8), and if n_bands is 10 bands, the first 20-m bands (5,6,...11,12)
Example
--------
>>> dataset = Venus()
>>> dataset.bands_names
>>> dataset.get_index_expression('NDVI')
{'expression': '(B8-B4)/(B8+B4)', 'condition': '(B8+B4) != 0'}
>>>
"""
[docs] def __init__(self, n_bands=12, bands_order='default'):
self.n_bands = n_bands
self.bands_order = bands_order
if bands_order == 'default':
if self.n_bands == 4:
self.bands_order = np.array(['1', '2', '3', '4','5','6','7','8','9','10','11','12'])
else:
raise ValueError(
'If your FormoSat2 raster has not 4 bands, please define bands_order parameter.')
super().__init__(bands_order)
self.wavelengths = [420, 443, 490, 555, 638, 638, 672, 702, 742, 782, 865, 910]
self.bands_order = ['B'+i for i in self.bands_order]
else:
super().__init__(n_bands, bands_order)
indices = dict(ACORVI=['( B4 - B3 + 0.05 ) / ( B4 + B3 + 0.05 ) ', '(B4+B3+0.05) != 0'],
SAVI=['1.5 * (B4 - B3) / ( B4 + B3 + 0.5 )'],
EVI=[
'( 2.5 * ( B4 - B3 ) ) / ( ( B4 + 6 * B3 - 7.5 * B1 ) + 1 )'],
EVI2=['( 2.5 * (B4 - B3) ) / (B4 + 2.4 * B3 + 1) ',
'B4+2.4*B3+1 != 0'],
ARI2=['( B4 / B1 ) - ( B4 / B2 )',
' np.logical_and(B1 != 0, B2 != 0)'],
NDVI=['(B4-B3)/(B4+B3)', '(B4+B3) != 0'],
Rratio=['B3/(B1+B2+B3)'])
collections.OrderedDicted(sorted(indices.items()))
for idx in indices.keys():
if len(indices[idx]) == 1:
self.add_index(idx, indices[idx], compulsory=False)
else:
self.add_index(
idx,
indices[idx][0],
indices[idx][1],
compulsory=False)