Source code for museopheno.time_series

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =============================================================================
#  __  __                        _____  _
# |  \/  |                      |  __ \| |
# | \  / |_   _ ___  ___  ___   | |__) | |__   ___ _ __   ___
# | |\/| | | | / __|/ _ \/ _ \  |  ___/| '_ \ / _ \ '_ \ / _ \
# | |  | | |_| \__ \  __/ (_) | | |    | | | |  __/ | | | (_) |
# |_|  |_|\__,_|___/\___|\___/  |_|    |_| |_|\___|_| |_|\___/
#
# @author:  Nicolas Karasiak
# @site:    www.karasiak.net
# @git:     www.github.com/nkarasiak/MuseoPheno
# =============================================================================
"""
The :mod:`museopheno.time_series` module gathers functions to compute expression and 
smooth time series.
"""
import datetime as dt

import math
import numpy as np
np.seterr(divide='ignore')

from scipy import interpolate
from scipy import signal
from scipy.ndimage import median_filter
from scipy.optimize import minimize, Bounds

from museopheno.time_series import __dl as fun_dl # double logistic by M. Fauvel

import re

[docs]def get_phenology_metrics(X,sos=0.2,eos=0.8,min_from_year=False): """ Return indices of phenology metrics (start of season and end of season). Parameters ---------- X : array array can be two dimensions (a line per sample) sos : float, optional (default=0.2) Percentage (0.2 for 20%) of the season amplitude eos : float, optional (default=0.8) Percentage (0.8 for 80%) of the season amplitude min_from_year : bool, optional (default=False) If True, metrics thresolds will be computed using the min from the year If False, metrics thresolds will be computed using the min from the season (start or end). Returns ------- Array with as many lines as samples. First column is SOS index, second is EOS index. """ if X.ndim==2 and X.shape[0] != 1: feats = np.zeros((X.shape[0],2)) for feature in range(X.shape[0]): feat = PhenologyMetrics(X[feature,...],sos,eos,min_from_year=min_from_year) feats[feature,:] = feat.sos,feat.eos else: feat = PhenologyMetrics(X,sos,eos,min_from_year=min_from_year) feats = np.asarray([feat.sos,feat.eos]).reshape(-1,2) return feats
[docs]class PhenologyMetrics: """ Get phenology metrics from one feature. start of season, end of season, length of season and amplitude. Parameters ------------ X : array array (1 dim) sos : float, optional (default=0.2) Percentage (0.2 for 20%) of the season amplitude eos : float, optional (default=0.8) Percentage (0.8 for 80%) of the season amplitude min_from_year : bool, optional (default=False) If True, metrics thresolds will be computed using the min from the year If False, metrics thresolds will be computed using the min from the season (start or end). """
[docs] def __init__(self,X,sos=0.2,eos=0.8,min_from_year=False): # thresold self.sos_thresold = sos self.eos_thresold = eos # arg self._argmax = np.argmax(X) self._argmin_sos = np.argmin(X[...,:self._argmax]) self._argmin_eos = self._argmax+np.argmin(X[...,self._argmax:]) self._min_from_year = min_from_year """ if min_from_year is True: print('Warning : Be careful, this option is in development') """ # val self._min = np.amin((X[...,self._argmin_sos],X[...,self._argmin_eos])) self._max = X[...,self._argmax] # n_features stuck to 1 for the moment self.n_features = 1 if X.ndim ==2: if X.shape[0] != 1: raise ValueError('only one feature at a time') self.X = X.flatten() self._get_sos() self._get_eos() self._get_los()
# self._get_amp() def _get_sos(self): if self._min_from_year: amp_sos = self.X[...,self._argmax]-self._min else: amp_sos = self.X[...,self._argmax]-self.X[...,self._argmin_sos] thresold = self.sos_thresold*amp_sos idx = self._argmin_sos+np.where(self.X[...,self._argmin_sos:self._argmax+1]>=self.X[...,self._argmin_sos]+thresold)[0][0] self.sos = idx def _get_eos(self): if self._min_from_year: amp_eos = self.X[...,self._argmax]-self._min else: amp_eos = self.X[...,self._argmax]-self.X[...,self._argmin_eos] thresold = (1-self.eos_thresold)*amp_eos idx = self._argmax+np.where(self.X[...,self._argmax:self._argmin_eos+1]<=self.X[...,self._argmax]-thresold)[0][0] self.eos = idx def _get_los(self): self.los = self.eos-self.sos
# def _get_amp(self): # self.amp = self.X[self._argmax]-np.min([self._argmin_eos,self._argmin_sos]) def _convert_band_to_array_idx(X, expression, n_comp, date, bands_order, condition=False, order_by='date', nodata=-9999): X = np.copy(X) # bandsToChange = re.findall('B[0-9]*',str(expression)) bandsToChange = re.findall('B[0-9]*[A-Z]*', str(expression)) out = np.zeros(X[:, 0].shape) out[:] = nodata for band in set(bandsToChange): # set to order band originalBand = band[1:] # to remove B while originalBand[0] == '0': originalBand = originalBand[1:] bandIdx = np.where(np.in1d(bands_order, originalBand) == True)[0] if order_by == 'date': newBand = n_comp * date + bandIdx elif order_by == 'band': newBand = bandIdx*date+n_comp if condition: condition = condition.replace(band, "X[:,{}]".format(newBand)) if isinstance(expression,list): expression = expression[0] expression = expression.replace(band, "X[:,{}]" .format(newBand)) if condition: TF = eval(condition).flatten() out[TF] = eval(expression)[TF].flatten() else: out = eval(expression) return out.flatten() def _are_bands_availables(bands_order, expression, compulsory=True): """ Check if band needed for expression are available Parameters ----------- bands_order : list list of bands (e.g. ['2','3','4','8']) expression : str expression (e.g. 'B08/B02*100') Examples --------- >>> _are_bands_availables(['2','3','4','8'],'B08/B02*100') True >>> _are_bands_availables(['2','3','4','8'],'B8A/B04*100') ValueError: Band ['B8A'] is missing. """ bandsToChange = re.findall('B[0-9]*[A-Z]*', str(expression)) bands_order_withoutB = [] for b in bands_order: if b.capitalize().startswith('B'): b = b[1:] while b[0] == '0': b = b[1:] bands_order_withoutB.append(str(b).capitalize()) missing_bands = [] for band in bandsToChange: band_withoutBand0 = band[1:] while str(band_withoutBand0).find('0') == 0: band_withoutBand0 = band_withoutBand0[1:] if str(band_withoutBand0).capitalize() in bands_order_withoutB: pass else: missing_bands.append(band) if len(missing_bands) > 0: if compulsory: if len(missing_bands) > 1: verb, s = ['are', 's'] else: verb, s = ['is', ''] raise ValueError( 'Band{} {} {} missing.'.format( s, missing_bands, verb)) else: return False else: return True
[docs]def expression_manager(X, bands_order, expression, interpolate_nan=True, divide_X_by=1, multiply_by=1, order_by='date', dtype=np.float32): """ Generate expression/index from an array according to a bands_order, and expression. The easiest way to use it is to choose a sensor from :class:`museopheno.sensors`. Parameters ---------- X : array. array where each line is a pixel. bands_order : list list of band order (e.g. ['2','3','4','8']) expression : str or dict. If str, contains only the expression (e.g. 'B8/B2') If dict, contains a expression key and can contain a condition key. See `museopheno.sensors.sensorManager.addIndice` function. inteprolate_nan : boolean, default True, optional. If nan value a linear interpolation is done. divide_X_by : integer or float, default 1, optional. Value to divide X before computing the indice multiply_by : integer or float, default 1, optional. Value to multiply the result (e.g. 100 to set the NDVI between -100 and 100) order_by : str, default 'date', optional. if 'date', means your raster is stacked in this way : B1, B2 to Bx for the first date, then B1,B2 to Bx for the second date... if 'band', means your raster is stacked in this way : B1 first date, B1 second date... to B1 last date, then B2 first date... dtype : numpy dtype, default np.float32, optional. dtype of the output (e.g. np.int16 to store the NDVI in integer value) Example -------- >>> from museopheno import datasets, expressionManager >>> X = datasets.Sentinel2_3a_2018(get_only_sample=True) >>> indices.generateIndice(X,bands_order=['2','3','4','8','5','6','7','8A','11','12'],expression='B4/B8') """ def _nan_helper(y): return np.isnan(y), lambda z: z.nonzero()[0] if X.ndim == 1: X = X.reshape(1, -1) X = X / divide_X_by nDates = X.shape[1] / len(bands_order) if nDates != int(X.shape[1] / len(bands_order)): raise ValueError( 'bands_order is not a multiple of the number of columns of your array which contains {} bands.'.format( X.shape[1])) else: n_comp = int(X.shape[1] / nDates) nDates = int(nDates) outIndice = np.zeros((X.shape[0], nDates), dtype=np.float64) if _are_bands_availables(bands_order, expression): for date in range(nDates): if isinstance(expression, str): mathExp = expression condition = False else: mathExp = expression['expression'] if 'condition' in expression.keys(): condition = expression['condition'] else: condition = False dateIndice = _convert_band_to_array_idx( X, mathExp, n_comp, date, bands_order, condition=condition, order_by=order_by) outIndice[:, date] = dateIndice if interpolate_nan: outIndice = np.where( np.logical_or( outIndice == np.inf, outIndice == - np.inf), np.nan, outIndice) for i in range(outIndice.shape[0]): nans, x = _nan_helper(outIndice[i, :]) if not np.all(nans == False): outIndice[i, nans] = np.interp( x(nans), x(~nans), outIndice[i, ~nans]) if multiply_by != 1: outIndice *= multiply_by if dtype: outIndice = outIndice.astype(dtype) return outIndice
[docs]class SmoothSignal:
[docs] def __init__(self, dates, bands_order=False, order_by='date', output_dates=False, fmt='%Y%m%d'): """ Smooth time series signal. Parameters ----------- dates : list list of dates. E.g. ['20180101','20180201'] bands_order : list, optional. order_by : str Default is 'date', 'band'. output_dates fmt : str, optional. Input format of dates. Default is '%Y%m%d', so '20181231' Example -------- >>> x = np.asarray([3.4825737, 4.27786 , 5.0373, 4.7196426, 4.1233397, 4.0338645,2.7735472]) >>> y = [20180429, 20180513, 20180708, 20180815, 20180915, 20181015, 20181115] >>> new_dates = generate_temporal_sampling(y[0],y[-1],10) # temporal sampling every 10 days >>> timeseries = SmoothSignal(dates=y,output_dates=new_dates) >>> timeseries.interpolation(x,kind='cubic') array([3.4825737 , 4.08649468, 4.516693 , 4.80103881, 4.96740226, 5.0436535 , 5.0576627 , 5.0373 , 5.00281393, 4.94396649, 4.84289809, 4.68186017, 4.46665263, 4.25561504, 4.11417114, 4.07489294, 4.07273781, 4.02461571, 3.84743661, 3.45811045, 2.7735472 ]) >>> timeseries.savitzski_golay(x) array([3.52581844, 3.96414587, 4.30156892, 4.49484286, 4.63045714, 4.76607143, 4.90168571, 4.96423055, 4.95370595, 4.87011189, 4.77926706, 4.65216832, 4.48881567, 4.30187759, 4.16911641, 4.09053213, 4.04814943, 3.88019043, 3.58665514, 3.18010117, 2.7735472 ]) """ self.bands_order = bands_order self.order_by = order_by # input dates self.init_dates = dates self.init_n_dates = len(dates) self.init_datetime = self.convert_to_datetime(dates, fmt=fmt) self.init_dates_int = self._convert_date_to_integer( self.init_datetime, fmt=fmt) self.init_doy = self.convert_to_doy(dates, fmt=fmt) # output dates if output_dates is False: output_dates = dates self.output_dates = output_dates self.output_n_dates = len(output_dates) self.output_doy = self.convert_to_doy(output_dates, fmt=fmt) self.output_datetime = self.convert_to_datetime(output_dates, fmt=fmt) self.output_dates_int = self._convert_date_to_integer( self.output_datetime, fmt=fmt, start_date = self.init_datetime[0]) # if temporal sampling is not the same in output if np.unique(np.diff(self.output_dates_int)).size != 1: raise Warning('Please be careful, the output dates have not the same delta. This could cause some problems.') else: self.output_deltadays = int(np.unique(np.diff(self.output_dates_int))) # delta self.output_dates_delta = self.output_dates_int[1]-self.output_dates_int[0]
def _get_time_series_position_per_band(self, X): """ Yields the time series for each band (all the B2, all the B8...) """ if self.bands_order is not False: if int(X.shape[1]/self.init_n_dates) != len(self.bands_order): raise ValueError('mismatch') for idx_band in range(len(self.bands_order)): if self.order_by == 'band': input_bands_position = [ idx_band*self.init_n_dates+d for d in range(self.init_n_dates)] output_bands_position = [ idx_band*self.output_n_dates+d for d in range(self.output_n_dates)] elif self.order_by == 'date': input_bands_position = [ idx_band+ len(self.bands_order) *d for d in range( len(self.init_dates) ) ] output_bands_position = [ idx_band+len(self.bands_order) *d for d in range( len(self.output_dates) ) ] yield input_bands_position, output_bands_position else: input_bands_position = np.arange(len(self.init_dates)) output_bands_position = np.arange(len(self.output_dates)) yield input_bands_position, output_bands_position def _resize_if_flatten(self, X): if X.ndim == 1: X = X.reshape(1, -1) return X def _get_empty_output_array(self, X): X = self._resize_if_flatten(X) if self.bands_order is not False: multiply_by = len(self.bands_order) else: multiply_by = 1 out_x = np.empty( (X.shape[0], len(self.output_dates)*multiply_by), X.dtype) self._resize_if_flatten(out_x) return out_x def _convert_date_to_integer(self, dates, fmt='%Y%m%d', convert_to_datetime=False, start_date=False): if convert_to_datetime: dates = self.convert_to_datetime(dates, fmt) if start_date: if convert_to_datetime: self.day0 = dt.datetime.strptime(str(start_date), fmt=fmt) else: self.day0 = start_date else: self.day0 = dates[0] dates_start_at_zero = [[date-self.day0][0].days for date in dates] return dates_start_at_zero
[docs] def int_to_datetime(self, dates): """ Convert list of int to datetime. Parameters ----------- dates : list List of int """ datetimes = [self.day0+ dt.timedelta(self.output_dates_int[0]) + ( dt.timedelta(i) * self.output_deltadays ) for i in dates] return datetimes
[docs] def convert_to_doy(self, dates, fmt='%Y%m%d'): """ Convert list of dates to Day Of Year (DOY) number. Parameters ----------- dates : list List of dates fmt : str Format type of each date """ return [dt.datetime.strptime(str(date), fmt).timetuple().tm_yday for date in dates]
[docs] def convert_to_datetime(self, dates, fmt='%Y%m%d'): """ Convert list of dates to a list of dates with datetime type. Parameters ----------- dates : list List of dates fmt : str Format type of each date """ return [dt.datetime.strptime(str(date), fmt) for date in dates]
[docs] def double_logistic(self, X, kind='cubic', interpolation_params={}): """ Generate a double logistic curve similar to those of the MODIS phenology product. Parameters ------------ X : array_like A N-D array of real values. The length of y along the interpolation axis must be equal to the length of dates. kind : str, default 'linear' Specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'previous', 'next', where 'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation of zeroth, first, second or third order; 'previous' and 'next' simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is 'linear'. """ x = self._get_empty_output_array(X) X = self._resize_if_flatten(X) if X.ndim == 2: if X.shape[-1] != self.output_dates_int: X = self.interpolation(X,kind=kind,**interpolation_params) else: raise ValueError('X array must be of shape [2,-1].') params = np.asarray([0.0, 1.0, 75.0, 8.0, 250.0, 1.0])+ 10*np.random.rand(6) for n_row in range(X.shape[0]): increase_max = 100 solver = fun_dl.minimize(fun_dl.cost_function, params, args=(np.asarray(self.output_dates_int), X[n_row,:]), method='L-BFGS-B', jac=fun_dl.cost_function_grad, options={'gtol': 1e-10, 'ftol': 1e-10, 'maxiter':increase_max, 'maxcor':increase_max, 'maxfun':increase_max, 'maxls':1000}) x[n_row,:] = fun_dl.double_logistique(solver.x, np.asarray(self.output_dates_int)) return x
[docs] def interpolation(self, X, kind='linear', fill_value='extrapolate', **params): """ Based on :class:`scipy.interpolate.interp1d` Parameters ----------- X : array_like A N-D array of real values. The length of y along the interpolation axis must be equal to the length of dates. kind : str, default 'linear' Specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'previous', 'next', where 'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation of zeroth, first, second or third order; 'previous' and 'next' simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is 'linear'. fill_value : str, default 'extrapolate' Extrapolate References ---------- :class:`scipy.interpolate.interp1d` """ x = self._get_empty_output_array(X) X = self._resize_if_flatten(X) for in_band, out_band in self._get_time_series_position_per_band(X): tmp = interpolate.interp1d( self.init_dates_int, X[:, in_band], kind=kind, **params) x[:, out_band] = self._resize_if_flatten(tmp(self.output_dates_int)) return (x)
[docs] def iterative_median(self, X, window_length=3, interpolation_params={}, **params): """ Savitzski golay Based on :class:`scipy.signal.median_filter` References ---------- :class:`scipy.signal.savgol_filter` """ x = self._get_empty_output_array(X) X = self._resize_if_flatten(X) for in_band, out_band in self._get_time_series_position_per_band(X): tmp = interpolate.interp1d( self.init_dates_int, X[:, in_band], **interpolation_params) x[:, out_band] = self._resize_if_flatten(median_filter( tmp(self.output_dates_int), size = window_length)) return x
[docs] def savitzski_golay(self, X, window_length=3, polyorder=1, interpolation_params={}, **params): """ Savitzski golay Based on :class:`scipy.signal.savgol_filter` References ---------- :class:`scipy.signal.savgol_filter` """ x = self._get_empty_output_array(X) X = self._resize_if_flatten(X) for in_band, out_band in self._get_time_series_position_per_band(X): tmp = interpolate.interp1d( self.init_dates_int, X[:, in_band], **interpolation_params) x[:, out_band] = self._resize_if_flatten(signal.savgol_filter( tmp(self.output_dates_int), window_length, polyorder, **params)) return x
[docs]def generate_temporal_sampling(start_date, last_date, day_interval=5, save_csv=False, fmt='%Y%m%d'): """ Generate a custom temporal sampling for Satellite Image Time Series. Parameters ---------- start_date : int, default False. If specified, format (YYYYMMDD). last_date : int, default False. If specified, format (YYYYMMDD). day_interval : int, default 5 Integer, days delta to between each date. save_csv : False or str. If str, path to save the csv. fmt : str, default '%Y%m%d' Format type of the input dates. Default: '%Y%m%d' (e.g. 20181230) Example ------- >>> generateTemporalSampling(20181203,20190326,day_interval=5) array([20181203, 20181208, 20181213, 20181218, 20181223, 20181228, 20190102, 20190107, 20190112, 20190117, 20190122, 20190127, 20190201, 20190206, 20190211, 20190216, 20190221, 20190226, 20190303, 20190308, 20190313, 20190318, 20190323, 20190328]) >>> generateTemporalSampling('2018-03-12','2019-26-03',day_interval=15,fmt='%Y-%d-%m') array([20181203, 20181218, 20190102, 20190117, 20190201, 20190216, 20190303, 20190318, 20190402]) >>> generateTemporalSampling('2018-03-12','2019-26-03',day_interval=15,fmt='%Y-%d-%m',save_csv='/tmp/AcquisitionDates.csv') >>> np.loadtxt('/tmp/AcquisitionDates.csv',dtype=int) array([20181203, 20181218, 20190102, 20190117, 20190201, 20190216, 20190303, 20190318, 20190402]) """ if isinstance(start_date, int): start_date = str(start_date) if isinstance(last_date, int): last_date = str(last_date) start_date = dt.datetime.strptime(start_date, fmt) last_date = dt.datetime.strptime(last_date, fmt) custom_acquisition_dates = [start_date.strftime('%Y%m%d')] newDate = start_date while newDate < last_date: newDate = newDate + dt.timedelta(day_interval) custom_acquisition_dates.append(newDate.strftime('%Y%m%d')) if save_csv: np.savetxt(save_csv, np.asarray( custom_acquisition_dates, dtype=np.int), fmt='%d') else: return np.asarray(custom_acquisition_dates, dtype=np.int)