#!/usr/bin/env python3
#
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 - 2020 Karlsruhe Institute of Technology - Steinbuch Centre for Computing
# This code is distributed under the MIT License
# Please, see the LICENSE file
#
# @author: vykozlov
#
# Script to process selected data and
# return either PDF plot or JSON document.
# Used to build REST API.
#
## Ozone related information: ##
# time: index for time (e.g. hours since start time - 6 hourly spacing)
# lat: latitude index for geolocation
# level: index for pressure / altitude (e.g. hPa)
# t: temperature
# o3: ozone data
# tco3_zm: total column ozone, zonal mean
# ...
# ToDo: improve Error handling, that Errors are correctly returned by API
# e.g. raise OSError("no files to open")
import o3api.config as cfg
import o3api.plothelpers as phlp
import o3api.plots as o3plots
import logging
import matplotlib.style as mplstyle
mplstyle.use('fast') # faster?
import matplotlib.pyplot as plt
import numpy as np
import os
import pkg_resources
import pandas as pd
import re
import time
import yaml
# try to load the faster C-based loaders/dumper; fall back to the pure-Python ones
try:
from yaml import CLoader as Loader, CSafeLoader as SafeLoader, CDumper as Dumper
except ImportError:
from yaml import Loader, SafeLoader, Dumper
import cProfile
import io
import pstats
from flask import send_file
from flask import jsonify, make_response, request
from fpdf import FPDF, HTMLMixin
from functools import wraps
from io import BytesIO
from PyPDF3 import PdfFileMerger
# configure python logger
# NOTE(review): the logger name is the literal string '__name__', not the
# value of the module variable __name__ -- confirm that this is intended.
logger = logging.getLogger('__name__') #o3api
logging.basicConfig(format='%(asctime)s [%(levelname)s]: %(message)s')
logger.setLevel(cfg.log_level)
## Authorization
from flaat import Flaat
flaat = Flaat()
# list of trusted OIDC providers
flaat.set_trusted_OP_list(cfg.trusted_OP_list)
# configuration for API:
# names taken from cfg.api_conf; below they are used as keys of the
# 'kwargs' dictionaries passed to the API functions
PTYPE = cfg.api_conf['plot_t']
MODELS = cfg.api_conf['models']
BEGIN = cfg.api_conf['begin']
END = cfg.api_conf['end']
MONTH = cfg.api_conf['month']
LAT_MIN = cfg.api_conf['lat_min']
LAT_MAX = cfg.api_conf['lat_max']
REF_MEAS = cfg.api_conf['ref_meas']
REF_YEAR = cfg.api_conf['ref_year']
# plot type identifiers taken from cfg.netCDF_conf; below they are used as
# values of kwargs[PTYPE] and as keys of the per-model meta information
TCO3 = cfg.netCDF_conf['tco3']
TCO3Return = cfg.netCDF_conf['tco3_r']
VMRO3 = cfg.netCDF_conf['vmro3']
# configuration for plotting
plot_c = cfg.plot_conf
# key under which the plot style of a model/curve is stored
PLOT_ST = cfg.plot_conf['plot_st']
def _profile(func):
    """Decorate a function to profile every call with cProfile.

    The collected statistics are sorted by cumulative time and printed to
    stdout after each call. The profiler is always disabled and the report
    is always printed, also if the decorated function raises.

    :param func: function to profile
    :return: wrapped function returning whatever ``func`` returns
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        try:
            return func(*args, **kwargs)
        finally:
            # runs for normal returns and for exceptions: a failing call
            # must not leave the profiler enabled or lose its report
            pr.disable()
            s = io.StringIO()
            sortby = 'cumulative'
            ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
            ps.print_stats()
            print(s.getvalue())

    return wrapper
def _catch_error(f):
    """Decorate an API function to return an error response if it fails.

    Any exception raised by ``f`` is converted into a one-element list with
    the keys 'status', 'object' (exception type) and 'message'. In general,
    the API should return what is requested, i.e. JSON -> JSON, PDF -> PDF:
    if the ``Accept`` header of the request is exactly "application/pdf",
    the error is rendered as a PDF attachment ('Error.pdf'), otherwise it is
    returned as JSON. Both responses carry the status code 500.

    :param f: function to protect
    :return: wrapped function returning either the result of ``f`` or the
             error response
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            e_message = [{'status': 'Error',
                          'object': str(type(e)),
                          'message': '{}'.format(e)
                          }]
            logger.debug(e_message)

            # use .get(): a request without the "Accept" header must not
            # make the error handler itself fail, it is answered with JSON
            if request.headers.get('Accept') == "application/pdf":
                pdf = FPDF()
                pdf.add_page()
                pdf.set_font("Arial", size=14)
                for key, value in e_message[0].items():
                    pdf.write(18, txt="{} : {}".format(key, value))
                    pdf.ln()

                pdf_byte_str = pdf.output(dest='S').encode('latin-1')
                buffer_resp = BytesIO(bytes(pdf_byte_str))
                buffer_resp.seek(0)

                response = make_response(send_file(buffer_resp,
                                                   as_attachment=True,
                                                   attachment_filename='Error.pdf',
                                                   mimetype='application/pdf'), 500)
            else:
                response = make_response(jsonify(e_message), 500)

            logger.debug("Response: {}".format(dict(response.headers)))
            return response

    return wrap
def _timeit(func):
    """Decorate a function to log how long every call takes.

    The elapsed wall-clock time (seconds, measured with ``time.time()``) is
    written to the module logger at INFO level.

    :param func: function to measure
    :return: wrapped function returning whatever ``func`` returns
    """
    @wraps(func)
    def timed(*args, **kwargs):
        started = time.time()
        result = func(*args, **kwargs)
        finished = time.time()
        elapsed = finished - started
        logger.info("[TIME] One model processed: {}".format(elapsed))
        return result

    return timed
def __convert_plot_style(models_style, ptype):
    """Convert a list of per-model style dictionaries into one dictionary
    keyed by the model name

    Example:
      [{
        model: name,
        <ptype>: { <PLOT_ST>: {...} }
       },
       ...
      ]
      to
      { name: {...},
        ...
      }

    Style parameter names which are listed in plot_c[ptype][PLOT_ST]
    are replaced by the name configured there, all others are kept as is.

    :param models_style: input array of dictionaries
    :param ptype: plot type whose style is extracted
    :return: dictionary {model name: {style parameter: value}}
    """
    return {
        entry['model']: {
            plot_c[ptype][PLOT_ST].get(name, name): value
            for name, value in entry[ptype][PLOT_ST].items()
        }
        for entry in models_style
    }
def __dict_remove_elems(dict_in):
    """Remove dictionary elements whose value contains an E-Mail address

    The dictionary is modified in place: every value is converted to a
    string and searched for something looking like an E-Mail address;
    matching elements are logged (DEBUG) and deleted.

    :param dict_in: input dictionary
    :return: the same dictionary object without the elements with E-Mail
    """
    # https://stackoverflow.com/questions/17681670/extract-email-sub-strings-from-large-document/17681902
    email_pattern = re.compile(r'[\w\.-]+@[\w\.-]+\.\w+')

    # collect the keys first: a dictionary must not change size
    # while it is being iterated
    flagged = []
    for key, value in dict_in.items():
        found = email_pattern.search(str(value))
        if found is None:
            continue
        logger.debug(F"{key}:{value}, E-Mail: {found.group(0)}")
        flagged.append(key)

    for key in flagged:
        del dict_in[key]

    return dict_in
def __legalinfo_link(model):
    """Return the link to the legal information of a model

    At present the same link, cfg.O3AS_LEGALINFO_URL, is returned
    for every model, i.e. the parameter is not used.

    :param model: model name (currently ignored)
    :return: value of cfg.O3AS_LEGALINFO_URL
    """
    # linking to the section of the original data-source is disabled:
    #data_source = model.split(cfg.O3AS_MODELNAME_SPLIT)[0]
    #return (cfg.O3AS_LEGALINFO_URL + "#" + data_source)
    legal_url = cfg.O3AS_LEGALINFO_URL
    return legal_url
def __return_json(df, model, pfmt):
    """Build the JSON-serializable description of one curve

    Missing values (NaN) of the model column are dropped; the remaining
    index values are converted to strings.

    :param df: data (pandas.DataFrame) to process
    :param model: model to process (column of df)
    :param pfmt: plot format (e.g. linecolor, marker)
    :return: dictionary with the model name, legal info link,
             points (x,y) and the plot format stored under PLOT_ST
    """
    logger.debug(F"plotstyle: {pfmt}")

    legal_link = __legalinfo_link(model)
    curve = df[model].dropna()
    x_values = curve.index.map(str).tolist()
    y_values = curve.values.tolist()

    return {'model': model,
            'legalinfo': legal_link,
            'x': x_values,
            'y': y_values,
            PLOT_ST: pfmt}
@_catch_error
def get_api_info():
    """Return information about the package

    The values are read from the first lines of the PKG-INFO metadata of
    the installed distribution named like the top-level package of this
    module. Fields which are not found stay None.

    :return: The o3api package info
    :rtype: dict
    """
    module = __name__.split('.', 1)
    pkg = pkg_resources.get_distribution(module[0])
    meta = {
        'name': None,
        'version': None,
        'summary': None,
        'home-page': None,
        'author': None,
        'author-email': None,
        'license': None
    }

    top_lines = 10  # take only top 10 lines (otherwise may pick from content)
    for iline, line in enumerate(pkg.get_metadata_lines("PKG-INFO")):
        if iline >= top_lines:
            break
        # "Key: value"; partition() also copes with an empty value ("Key:"),
        # for which unpacking the result of split(": ") fails
        key, sep, value = line.partition(":")
        key = key.lower()  # to avoid inconsistency due to letter cases
        if sep and key in meta:
            meta[key] = value.strip()

    logger.debug(F"Found metadata: {meta}")
    return meta
@_catch_error
def get_data_types():
    """Get list of plot types with available data

    A plot type (TCO3, TCO3Return, VMRO3) is included, if
    get_models_list() returns at least one model for it.

    :return: list of plot types
    """
    candidates = (TCO3, TCO3Return, VMRO3)
    return [ptype for ptype in candidates
            if len(get_models_list(**{PTYPE: ptype})) > 0]
@_catch_error
def get_data_tco3_zm(*args, **kwargs):
    """Retrieve data to produce tco3_zm plot

    The plot type and the reference measurement/year from the
    configuration are added to kwargs, the raw data of the selected
    models are loaded and returned together with the plot style.

    :param kwargs: provided in the API call parameters
    :return: JSON document with data points (list with one element
             per model)
    """
    kwargs.update({PTYPE: TCO3,
                   REF_MEAS: cfg.O3AS_TCO3_REF_MEAS,
                   REF_YEAR: cfg.O3AS_TCO3_REF_YEAR})
    selected = phlp.cleanse_models(**kwargs)
    kwargs[MODELS] = selected

    processor = o3plots.ProcessForTCO3(**kwargs)
    raw_data = processor.get_raw_ensemble_pd(selected)

    styles = __convert_plot_style(get_plot_style(**kwargs), TCO3)

    return [__return_json(raw_data, name, styles[name]) for name in selected]
@_catch_error
def get_data_tco3_return(*args, **kwargs):
    """Retrieve data to produce tco3_return plot

    Same processing as for tco3_zm (o3plots.ProcessForTCO3 and its raw
    ensemble), but the plot type is set to TCO3Return and the plot style
    defined for TCO3Return is attached.

    :param kwargs: provided in the API call parameters
    :return: JSON document with data points (list with one element
             per model)
    """
    kwargs.update({PTYPE: TCO3Return,
                   REF_MEAS: cfg.O3AS_TCO3_REF_MEAS,
                   REF_YEAR: cfg.O3AS_TCO3_REF_YEAR})
    selected = phlp.cleanse_models(**kwargs)
    kwargs[MODELS] = selected

    processor = o3plots.ProcessForTCO3(**kwargs)
    raw_data = processor.get_raw_ensemble_pd(selected)

    styles = __convert_plot_style(get_plot_style(**kwargs), TCO3Return)

    return [__return_json(raw_data, name, styles[name]) for name in selected]
def get_data_vmro3_zm(*args, **kwargs):
    """Retrieve data to produce vmro3_zm plot

    Placeholder: nothing is processed yet, all arguments are ignored.

    :param kwargs: provided in the API call parameters
    :return: None (intended: JSON document with data points)
    """
    return None
@_catch_error
def get_models_info():
    """Return the list of available models with the meta info

    Every sub-directory of cfg.O3AS_DATA_BASEPATH containing at least one
    file with ".nc" in its name is treated as a model. Other entries of
    the base path (e.g. plain files) are skipped.

    For every model and plot type the meta info holds the flag 'isdata'
    and the plot style (color, marker, linestyle). Default styles are
    assigned in a round-robin manner and may be overwritten by the
    "metadata.yaml" file of the model.

    :return: The list of dictionaries describing available models
    :rtype: list
    """
    plot_types = get_plot_types()

    colors = ["black", "gray", "red", "chocolate",
              "orange", "gold", "olive", "green",
              "lime", "lightseagreen", "teal", "deepskyblue",
              "navy", "blue", "purple", "magenta"]
    line_styles = ["solid", "dotted", "dashed", "dashdot"]
    markers = [".", "o", "+", "x", "v", "^", "s", "*", "D"]

    models = []
    m_counter = 0  # number of models found so far, selects the default style

    for mdir in sorted(os.listdir(cfg.O3AS_DATA_BASEPATH)):
        m_path = os.path.join(cfg.O3AS_DATA_BASEPATH, mdir)
        # check for a directory BEFORE listing it: os.listdir() raises
        # NotADirectoryError for a plain file located in the base path
        if not os.path.isdir(m_path):
            continue
        m_files = os.listdir(m_path)
        if not any(".nc" in f for f in m_files):
            continue

        meta = {'model': mdir,
                'legalinfo': __legalinfo_link(mdir)}
        for pt in (TCO3, TCO3Return, VMRO3):
            meta[pt] = {"isdata": False,
                        PLOT_ST: {'color': '',
                                  'marker': '',
                                  'linestyle': ''}}

        # initialize with some default styles
        for pt in plot_types:
            style = meta[pt][PLOT_ST]
            style["color"] = colors[m_counter % len(colors)]
            style["marker"] = markers[m_counter % len(markers)]
            style["linestyle"] = line_styles[m_counter % len(line_styles)]

        # update with the info from metadata.yaml, if available
        if "metadata.yaml" in m_files:
            with open(os.path.join(m_path, "metadata.yaml"), "r") as stream:
                meta_yaml = yaml.load(stream, Loader=SafeLoader)

            # if plot info is in meta_yaml, update it in meta
            for pt in plot_types:
                if PLOT_ST in meta_yaml[pt].keys():
                    meta[pt][PLOT_ST].update(meta_yaml[pt][PLOT_ST])

            # if colors are defined for TCO3 and not others, use the same
            if PLOT_ST in meta_yaml[TCO3].keys():
                if PLOT_ST not in meta_yaml[TCO3Return].keys():
                    meta[TCO3Return][PLOT_ST].update(meta[TCO3][PLOT_ST])
                if PLOT_ST not in meta_yaml[VMRO3].keys():
                    meta[VMRO3][PLOT_ST].update(meta[TCO3][PLOT_ST])

        # the kind of available data is derived from the file names
        for f in m_files:
            if "tco3" in f:
                meta[TCO3]['isdata'] = True
                meta[TCO3Return]['isdata'] = True
            if "vmro3" in f:
                meta[VMRO3]['isdata'] = True

        models.append(meta)
        m_counter += 1

    return models
@_catch_error
def get_models_list(*args, **kwargs):
    """Return the list of available Ozone models

    If the plot type (PTYPE) is given in kwargs, only models with data
    for this plot type are listed. If 'select' is given, only models
    whose name contains this string (case-insensitive) are kept.

    :param kwargs: provided in the API call parameters
    :return: The sorted list of available models
    :rtype: list
    """
    models_info = get_models_info()

    if PTYPE in kwargs:
        ptype = kwargs[PTYPE]
        names = [m['model'] for m in models_info if m[ptype]['isdata']]
    else:
        names = [m['model'] for m in models_info]

    if 'select' in kwargs:
        pattern = kwargs['select'].lower()
        names = [name for name in names if pattern in name.lower()]

    return sorted(names)
@_catch_error
def get_plot_style(*args, **kwargs):
    """Return plot style for selected models and plot type

    If no models are given in kwargs (or the cleansed list is empty), all
    available models are used. If no plot type is given in kwargs, all
    plot types with available data are used.

    :param kwargs: provided in the API call parameters
    :return: list of dictionaries, each with the model name under 'model'
             and, per plot type, the plot style stored under PLOT_ST
    """
    models_info = get_models_info()

    models = phlp.cleanse_models(**kwargs) if MODELS in kwargs else []
    # if "models = []", i.e. empty list, get all available models instead
    if len(models) < 1:
        models = [m['model'] for m in models_info]

    plot_types = [kwargs[PTYPE]] if PTYPE in kwargs else get_data_types()

    plots_format = []
    for info in models_info:
        if info['model'] not in models:
            continue
        entry = {'model': info['model']}
        for pt in plot_types:
            entry[pt] = {PLOT_ST: info[pt][PLOT_ST]}
        plots_format.append(entry)

    return plots_format
@_catch_error
def get_model_detail(*args, **kwargs):
    """Return information about the Ozone model

    The meta info of the model is extended, for every plot type with data,
    by the metadata of the corresponding dataset ('original_metadata').
    Dataset attributes containing E-Mail addresses are removed.

    :param kwargs: provided in the API call parameters, 'model' is required
    :return: Info about the Ozone model
    :rtype: dict
    :raises KeyError: if the model is not among the available models
                      (turned into an error response by the decorator)
    """
    model = kwargs['model'].strip()

    model_info_dict = None
    for m in get_models_info():
        if m['model'] == model:
            model_info_dict = m

    # fail with a message naming the model instead of an obscure
    # KeyError on the plot type raised further below
    if model_info_dict is None:
        raise KeyError(F"Model '{model}' is not found")

    plot_types = get_data_types()
    for pt in plot_types:
        if model_info_dict[pt]['isdata']:
            # create dataset according to the plot type (tco3_zm, vmro3_zm, etc)
            data = o3plots.Dataset(pt, **kwargs)
            ds = data.get_dataset(model)
            original_meta = ds.to_dict(data=False)
            original_meta['attrs'] = __dict_remove_elems(original_meta['attrs'])
            model_info_dict[pt]['original_metadata'] = original_meta

    logger.debug(F"{model} model info: {model_info_dict}")
    return model_info_dict
def get_plot_types():
    """Get list of the provided plot methods

    :return: list with TCO3 and TCO3Return (VMRO3 is currently disabled)
    """
    # VMRO3 is not offered at the moment
    return [TCO3, TCO3Return]
#@_profile
@_catch_error
def plot_tco3_zm(*args, **kwargs):
    """Plot tco3_zm

    Prepares the plot styles of the selected models and of the additional
    curves (reference value, mean, mean +/- std, median), processes the
    data and returns them in the format requested in the "Accept" header.

    :param kwargs: provided in the API call parameters
    :return: Either PDF plot ("Accept: application/pdf") or JSON document
    """
    time_start = time.time()

    kwargs[PTYPE] = TCO3
    kwargs[MODELS] = phlp.cleanse_models(**kwargs)
    models_style = get_plot_style(**kwargs)

    # define plot styling for more curves (reference, mean, median)
    ref_label = 'Reference value' + ' (' + str(kwargs['ref_year']) + ')'
    models_stats_style = [
        {'model': 'reference_value',
         TCO3: {PLOT_ST: {'color': 'black',
                          'linestyle': 'dashed',
                          'label': ref_label}}},
        {'model': 'MMMean',
         TCO3: {PLOT_ST: {'color': 'green',
                          'linestyle': 'solid',
                          'linewidth': 4}}},
        {'model': 'MMMean-Std',
         TCO3: {PLOT_ST: {'color': 'green',
                          'linestyle': 'dotted',
                          'linewidth': 1}}},
        {'model': 'MMMean+Std',
         TCO3: {PLOT_ST: {'color': 'green',
                          'linestyle': 'dotted',
                          'linewidth': 1}}},
        {'model': 'MMMedian',
         TCO3: {PLOT_ST: {'color': 'blue',
                          'linestyle': 'dotted',
                          'linewidth': 4}}},
    ]
    models_style.extend(models_stats_style)
    ckwargs = __convert_plot_style(models_style, TCO3)

    # show lines, no marker, except REF_MEAS
    for model in ckwargs.keys():
        if model != kwargs[REF_MEAS]:
            ckwargs[model]['marker'] = ''

    data = o3plots.ProcessForTCO3(**kwargs)
    plot_data = data.get_ensemble_for_plot(kwargs[MODELS])

    logger.info(
        "[TIME] Time to prepare data for plotting: {}".format(time.time() -
                                                              time_start))

    # .get(): a request without the "Accept" header is answered with JSON
    # instead of failing on the missing header
    if request.headers.get('Accept') == "application/pdf":
        response = plot(plot_data, ckwargs, **kwargs)
    else:
        response = plot_json(plot_data, ckwargs, **kwargs)

    logger.info(
        "[TIME] Total time from getting the request: {}".format(time.time() -
                                                                time_start))
    return response
@_catch_error
def plot_tco3_return(*args, **kwargs):
    """Plot tco3_return

    The data are processed for every pre-defined region of
    cfg.tco3_return_regions and finally for the region given by the user;
    the results are concatenated and returned in the format requested in
    the "Accept" header.

    :param kwargs: provided in the API call parameters
    :return: Either PDF plot ("Accept: application/pdf") or JSON document
    """
    time_start = time.time()

    kwargs[PTYPE] = TCO3Return
    kwargs[MODELS] = phlp.cleanse_models(**kwargs)
    kwargs[BEGIN] = cfg.O3AS_TCO3Return_BEGIN_YEAR
    kwargs[END] = cfg.O3AS_TCO3Return_END_YEAR

    # remember the user selection, kwargs are overwritten per region below
    user_month = kwargs[MONTH]
    user_lat_min = kwargs[LAT_MIN]
    user_lat_max = kwargs[LAT_MAX]

    # DataFrame.append() is removed in pandas 2.0: collect the pieces and
    # concatenate them once.
    # assumes get_ensemble_for_plot() returns pd.DataFrame -- TODO confirm
    frames = []

    # First draw pre-defined regions
    for r, p in cfg.tco3_return_regions.items():
        kwargs['region'] = r
        kwargs.update(p)
        if MONTH not in p:
            kwargs[MONTH] = ''
        data = o3plots.ProcessForTCO3Return(**kwargs)
        frames.append(data.get_ensemble_for_plot(kwargs[MODELS]))

    # Then draw the user-defined region
    kwargs['region'] = 'User region'
    kwargs[MONTH] = user_month
    kwargs[LAT_MIN] = user_lat_min
    kwargs[LAT_MAX] = user_lat_max
    data = o3plots.ProcessForTCO3Return(**kwargs)
    frames.append(data.get_ensemble_for_plot(kwargs[MODELS]))

    plot_data = pd.concat(frames, sort=False)

    # define plot styling for the mean
    def _marker_style(marker, color, markersize):
        """Style of a curve drawn with unfilled markers"""
        return {PLOT_ST: {'marker': marker,
                          'color': color,
                          'markersize': markersize,
                          'mfc': 'none'}}

    models_style = get_plot_style(**kwargs)
    models_stats_style = [
        {'model': 'MMMean', TCO3Return: _marker_style('^', 'red', 14)},
        {'model': 'MMMean-Std', TCO3Return: _marker_style('_', 'red', 10)},
        {'model': 'MMMean+Std', TCO3Return: _marker_style('_', 'red', 10)},
        {'model': 'MMMedian', TCO3Return: _marker_style('o', 'blue', 10)},
    ]
    models_style.extend(models_stats_style)
    ckwargs = __convert_plot_style(models_style, TCO3Return)

    # show markers, no lines
    for model in ckwargs.keys():
        ckwargs[model]['linestyle'] = 'none'

    # .get(): a request without the "Accept" header is answered with JSON
    # instead of failing on the missing header
    if request.headers.get('Accept') == "application/pdf":
        # update MMMean plotstyle to plot with error bars
        cols = plot_data.columns
        mmmean_yerr = [0., 0.]
        if 'MMMean-Std' in cols and 'MMMean+Std' in cols:
            # NOTE(review): both differences look negative if
            # MMMean-Std <= MMMean <= MMMean+Std -- confirm the intended sign
            mmmean_yerr[0] = (plot_data['MMMean'] - plot_data['MMMean+Std'])
            mmmean_yerr[1] = (plot_data['MMMean-Std'] - plot_data['MMMean'])

        ckwargs['MMMean']['capsize'] = 6
        ckwargs['MMMean']['yerr'] = mmmean_yerr
        response = plot(plot_data, ckwargs, **kwargs)
    else:
        response = plot_json(plot_data, ckwargs, **kwargs)

    logger.info(
        "[TIME] Total time from getting the request: {}".format(time.time() -
                                                                time_start))
    return response
@_catch_error
def plot_vmro3_zm(*args, **kwargs):
    """Plot vmro3_zm

    Placeholder: neither the data nor the curve styles are prepared yet,
    plot() is called with None for both.
    NOTE(review): plot() reads ``data.columns``, so this call presumably
    ends in the error response of the decorator -- confirm.

    :param kwargs: provided in the API call parameters
    :return: Either PDF plot or JSON document
    """
    kwargs[PTYPE] = "vmro3_zm"

    # models info is retrieved, but not used yet
    get_models_info()

    return plot(None, None, **kwargs)
#@_profile
#@flaat.login_required() # Require only authorized people to call the function
def plot(data, ckwargs, **kwargs):
    """Main plotting routine

    Draws every column of ``data`` as a curve (the 'MMMean-Std' and
    'MMMean+Std' columns are not drawn as curves; for the TCO3 plot type
    the area between them is filled), adds a second page with the plot
    information and returns both pages as one PDF attachment.

    :param data: data to plot (pandas.DataFrame, one column per curve)
    :param ckwargs: dictionary for curve plotting (e.g. color, style),
                    keyed by the column name
    :param kwargs: provided in the API call parameters
    :return: PDF plot
    """
    plot_type = kwargs[PTYPE]

    # update the list of models as columns from pd.DataFrame
    # since additional columns can be added, e.g. 'reference_year', 'mean' etc
    models = data.columns

    logger.debug(F"headers: {dict(request.headers)}")
    logger.debug(F"kwargs: {kwargs}")

    figure_file = phlp.set_filename(**kwargs) + ".pdf"
    buffer_plot = BytesIO()  # store in IO buffer, not a file

    fig = plt.figure(num=1, figsize=(plot_c[plot_type]['fig_size']),
                     dpi=150, facecolor='w',
                     edgecolor='k')
    try:
        for model in models:
            if model != 'MMMean-Std' and model != 'MMMean+Std':
                data[model].plot(**ckwargs[model])

        if plot_type == TCO3:
            if 'MMMean-Std' in models and 'MMMean+Std' in models:
                plt.fill_between(data.index,
                                 data['MMMean-Std'],
                                 data['MMMean+Std'],
                                 color='green', alpha=0.2)

        phlp.set_figure_attr(fig, **kwargs)

        # create the figure
        fig_upd = plt.gcf()
        fig_upd.set_figwidth(plot_c[plot_type]['fig_size'][0], forward=True)
        plt.savefig(buffer_plot, format='pdf', bbox_inches='tight')
    finally:
        # always close the figure: it is created with the fixed number 1,
        # a figure left open by a failed request would be re-used
        # (with its old curves) by the next one
        plt.close()
    buffer_plot.seek(0)

    # create the metadata page + legal info
    buffer_meta = BytesIO()  # store in IO buffer, not a file

    # Instantiation of inherited class
    class infoFPDF(FPDF, HTMLMixin):
        pass

    pdf = infoFPDF('P', 'mm', 'A4')
    pdf.add_page()
    pdf.set_font('Times', '', 12)
    info_html = phlp.get_plot_info_html(**kwargs)
    pdf.write_html(info_html)
    pdf_output = pdf.output(dest='S')
    buffer_meta.write(pdf_output.encode('latin-1'))  # 'utf-8'
    buffer_meta.seek(0)

    # merge two pages in one PDF, add PDF meta
    merger = PdfFileMerger()
    merger.append(buffer_plot)
    merger.append(buffer_meta)
    merger.addMetadata({
        '/Creator': cfg.O3AS_MAIN_URL,
        '/Title': plot_c[plot_type]['ylabel'],
        '/Subject': plot_type + ' generated with ' + cfg.O3AS_MAIN_URL
    })
    buffer_out = BytesIO()
    merger.write(buffer_out)
    buffer_out.seek(0)

    response = send_file(buffer_out,
                         as_attachment=True,
                         attachment_filename=figure_file,
                         mimetype='application/pdf')
    return response
def plot_json(data, ckwargs, **kwargs):
    """Plotting routine returning JSON points

    :param data: data ready for plotting (pandas.DataFrame,
                 one column per curve)
    :param ckwargs: dictionary for curve plotting (e.g. color, style),
                    keyed by the column name
    :param kwargs: provided in the API call parameters
    :return: JSON document with data points and styles for plotting
             (list with one element per column of data)
    """
    # the plot type is looked up, but not needed to build the JSON
    plot_type = kwargs[PTYPE]

    logger.debug(F"headers: {dict(request.headers)}")
    logger.debug(F"kwargs: {kwargs}")

    # take curves from the columns of pd.DataFrame, as additional
    # columns can be added, e.g. 'reference_year', 'mean' etc
    return [__return_json(data, column, ckwargs[column])
            for column in data.columns]