# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 - 2020 Karlsruhe Institute of Technology - Steinbuch Centre for Computing
# This code is distributed under the MIT License
# Please, see the LICENSE file
#
# @author: vykozlov
import glob
import matplotlib.pyplot as plt
import numpy as np
import o3api.config as cfg
import o3api.plothelpers as phlp
import os
import logging
import pandas as pd
from scipy import signal
from statsmodels.tsa.seasonal import seasonal_decompose # accurate enough
import xarray as xr
import cProfile
import io
import pstats
from functools import wraps
logger = logging.getLogger('__name__') #o3api
logger.setLevel(cfg.log_level)
# configuration for netCDF
TIME = cfg.netCDF_conf['t_c']
LAT = cfg.netCDF_conf['lat_c']
TCO3 = cfg.netCDF_conf['tco3']
VMRO3 = cfg.netCDF_conf['vmro3']
TCO3Return = cfg.netCDF_conf['tco3_r']
# configuration for API
api_c = cfg.api_conf
def _profile(func):
"""Decorate function for profiling
"""
@wraps(func)
def wrapper(*args, **kwargs):
pr = cProfile.Profile()
pr.enable()
retval = func(*args, **kwargs)
pr.disable()
s = io.StringIO()
sortby = 'cumulative' #SortKey.CUMULATIVE # 'cumulative'
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
ps.print_stats()
print(s.getvalue())
return retval
return wrapper
[docs]def set_data_processing(plot_type, **kwargs):
"""Function to inizialize proper class for data processing
:param plot_type: The plot type (e.g. tco3_zm, vmro3_zm, ...)
:return: object corresponding to the plot type
"""
if plot_type == TCO3:
data = ProcessForTCO3(**kwargs)
elif plot_type == VMRO3:
data = ProcessForVMRO3(**kwargs)
elif plot_type == TCO3Return:
data = ProcessForTCO3Return(**kwargs)
return data
[docs]class Dataset:
"""Base Class to initialize the dataset
:param plot_type: The plot type (e.g. tco3_zm, vmro3_zm, ...)
"""
def __init__ (self, plot_type, **kwargs):
"""Constructor method
"""
self.plot_type = plot_type
self._data_pattern = self.plot_type + "*.nc"
self._datafiles = [] #None
def __set_datafiles(self, model):
"""Set the list of corresponding datafiles
:param model: The model to process
"""
# strip possible spaces in front and back, and then quotas
model = model.strip().strip('\"')
self._datafiles = glob.glob(os.path.join(cfg.O3AS_DATA_BASEPATH,
model,
self._data_pattern))
[docs] def get_dataset(self, model):
"""Load data from the datafile list
:param model: The model to process
:return: xarray dataset
:rtype: xarray
"""
# Check: http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance
# chunks={'latitude': 8} - very machine dependent!
# laptop (RAM 8GB) : 8, lsdf-gpu (128GB) : 64
# engine='h5netcdf' : need h5netcdf files? yes, but didn't see improve
# parallel=True : in theory should use dask.delayed
# to open and preprocess in parallel. Default is False
self.__set_datafiles(model)
chunk_size = int(os.getenv('O3API_CHUNK_SIZE', -1))
if chunk_size > 0:
ds = xr.open_mfdataset(self._datafiles,
chunks={LAT: chunk_size },
concat_dim=TIME,
data_vars='minimal', coords='minimal',
parallel=False)
else:
ds = xr.open_mfdataset(self._datafiles,
concat_dim=TIME,
data_vars='minimal',
coords='minimal',
parallel=False)
return ds
[docs]class DataSelection(Dataset):
"""Class to perform data selection, based on :class:`Dataset`.
:param begin: Year to start data scanning from
:param end: Year to finish data scanning
:param month: Month(s) to select, if not a whole year
:param lat_min: Minimum latitude to define the range (-90..90)
:param lat_max: Maximum latitude to define the range (-90..90)
"""
def __init__ (self, plot_type, **kwargs):
"""Constructor method
"""
super().__init__(plot_type, **kwargs)
self.begin = kwargs[api_c['begin']]
self.end = kwargs[api_c['end']]
self.month = kwargs[api_c['month']]
self.lat_min = kwargs[api_c['lat_min']]
self.lat_max = kwargs[api_c['lat_max']]
def __check_latitude_order(self, ds):
"""Function to check the latitude order,
returns them correctly ordered
:param ds: xarray dataset to check
:return: lat_0, lat_last
"""
# check in what order latitude is used, e.g. (-90..90) or (90..-90)
lat_0 = np.amin(ds.coords[LAT].values[0]) # min latitude
lat_last = np.amax(ds.coords[LAT].values[-1]) # max latitude
if lat_0 < lat_last:
lat_a = self.lat_min
lat_b = self.lat_max
else:
lat_a = self.lat_max
lat_b = self.lat_min
return lat_a, lat_b
[docs] def get_dataslice(self, model):
"""Function to select the slice of data according
to the time and latitude requested
:param model: The model to process
:return: xarray dataset selected according to the time and latitude
:rtype: xarray
"""
ds = super().get_dataset(model)
logger.info("Dataset is loaded from storage location: {}".format(ds))
# check in what order latitude is used, return them correspondently
lat_a, lat_b = self.__check_latitude_order(ds)
# select data according to the period and latitude
# BUG(?) ccmi-umukca-ucam complains about 31-12-year, but 30-12-year works
# CFTime360day date format has 30 days for every month???
# {}-01-01T00:00:00 .. {}-12-30T23:59:59
if len(self.month) > 0:
ds = ds.sel(time=ds.time.dt.month.isin(self.month))
ds_slice = ds.sel(time=slice("{}-01".format(self.begin),
"{}-12".format(self.end)),
lat=slice(lat_a,
lat_b)) # latitude
return ds_slice
[docs] def get_1980slice(self, model):
"""Function to select the slice for 1980 (reference year)
:param model: The model to process
:return: xarray dataset selected according to the time and latitude
:rtype: xarray
"""
ds = super().get_dataset(model)
# check in what order latitude is used, return them correspondently
lat_a, lat_b = self.__check_latitude_order(ds)
if len(self.month) > 0:
ds = ds.sel(time=ds.time.dt.month.isin(self.month))
ds_1980 = ds.sel(time=slice("1980-01", "1980-12"),
lat=slice(lat_a, lat_b)) # latitude
return ds_1980
[docs]class ProcessForTCO3(DataSelection):
"""Subclass of :class:`DataSelection` to calculate tco3_zm
"""
def __init__(self, **kwargs):
super().__init__(TCO3, **kwargs)
def __to_pd_series(self, ds, model):
"""Convert xarray to pandas series
:param ds: xarray dataset
:param model: The model to process for tco3_zm
:return dataset as pandas series
:rtype: pandas series
"""
# convert to pandas series to keep date information
if (type(ds.indexes[TIME]) is
pd.core.indexes.datetimes.DatetimeIndex) :
time_axis = ds.indexes[TIME].values
else:
time_axis = ds.indexes[TIME].to_datetimeindex()
curve = pd.Series(np.nan_to_num(ds[TCO3]),
index=pd.DatetimeIndex(time_axis),
name=model)
return curve
[docs] def get_raw_data(self, model):
"""Process the model to get tco3_zm raw data
:param model: The model to process for tco3_zm
:return: raw data points in preparation for plotting
:rtype: pandas series (pd.Series)
"""
# data selection according to time and latitude
ds_slice = super().get_dataslice(model)
ds_tco3 = ds_slice[[TCO3]].mean(dim=[LAT])
logger.debug("ds_tco3: {}".format(ds_tco3))
data = self.__to_pd_series(ds_tco3, model)
return data
[docs] def get_plot_data(self, model):
"""Plot tco3_zm data applying a smoothing function (boxcar)
:param model: The model to process for tco3_zm
:return: ready for plotting data
:rtype: pandas series (pd.Series)
"""
curve = self.get_raw_data(model)
time_axis = curve.index
curve_values = curve.values
boxcar_win = 3
boxcar = np.ones(boxcar_win)
boxcar_values = signal.convolve(curve_values,
boxcar,
mode='same')/np.sum(boxcar)
logger.debug("time_axis.shape = {}, boxcar_values.shape = {}"
.format(time_axis.shape, boxcar_values.shape))
boxcar_values[0] = curve_values[0]
boxcar_values[-1] = curve_values[-1]
#boxcar_values[1] = curve_values[1]
#boxcar_values[-2] = curve_values[-2]
logger.debug(F"boxcar_values[:5] : {boxcar_values[:5]}")
logger.debug(F"vs curve_values[:5]: {curve_values[:5]}")
logger.debug(F"boxcar_values[-5:] : {boxcar_values[-5:]}")
logger.debug(F"vs curve_values[-5:]: {curve_values[-5:]}")
boxcar_smooth = pd.Series(boxcar_values,
index=time_axis,
name=model)
return boxcar_smooth
[docs] def get_ref1980(self, model):
"""Process the model to get tco3_zm reference for 1980
:param model: The model to process for tco3_zm
:return: xarray dataset for 1980
:rtype: xarray
"""
# data selection according to 1980 and latitude
ds_slice = super().get_1980slice(model)
ds_tco3_1980 = ds_slice[[TCO3]].mean(dim=[LAT])
#logger.debug("ds_tco3_1980: {}".format(ds_tco3_1980.to_dataframe()))
ref1980 = ds_tco3_1980.to_dataframe().mean().values[0]
return ref1980
[docs]class ProcessForVMRO3(DataSelection):
"""Subclass of :class:`DataSelection` to calculate vmro3_zm
"""
def __init__(self, **kwargs):
super().__init__(VMRO3, **kwargs)
[docs] def get_plot_data(self, model):
"""Process the model to get vmro3_zm data for plotting
:param model: The model to process for vmro3_zm
:return: xarray dataset for plotting
:rtype: xarray
"""
# data selection according to time and latitude
ds_slice = super().get_dataslice(model)
# currently placeholder. another calculation might be needed
# 20-10-07 vkoz
ds_vmro3 = ds_slice[[VMRO3]]
logger.debug("ds_vmro3: {}".format(ds_vmro3))
return ds_vmro3.mean(dim=[LAT])
[docs]class ProcessForTCO3Return(DataSelection):
"""Subclass of :class:`DataSelection` to calculate tco3_return
"""
def __init__(self, **kwargs):
super().__init__(TCO3Return, **kwargs)
[docs] def get_plot_data(self, model):
"""Process the model to get tco3_return data for plotting
:param model: The model to process for tco3_return
:return: xarray dataset for plotting
:rtype: xarray
"""
# data selection according to time and latitude
ds_slice = super().get_dataslice(model)
# currently placeholder. another calculation might be needed
# 20-10-07 vkoz
ds_tco3_return = ds_slice[[TCO3Return]]
logger.debug("ds_tco3_return: {}".format(ds_tco3_return))
return ds_tco3_return.mean(dim=[LAT])