""" OOI Object """
import datetime
import logging
import os
import re
import threading
import time
import warnings
from io import StringIO
from queue import Queue
import gevent
import pandas as pd
import pytz
import requests
import s3fs
import urllib3
import xarray as xr
from dateutil import parser
from lxml.html import fromstring as html_parser
from yodapy.datasources.ooi.CAVA import CAVA
from yodapy.datasources.ooi.helpers import set_thread
from yodapy.utils.conn import (
download_url,
fetch_url,
fetch_xr,
get_download_urls,
instrument_to_query,
perform_ek60_download,
perform_ek60_processing,
)
from yodapy.utils.files import CREDENTIALS_FILE
from yodapy.utils.meta import delete_all_cache
from yodapy.utils.parser import (
get_instrument_list,
get_nc_urls,
parse_annotations_json,
parse_deployments_json,
parse_global_range_dataframe,
parse_parameter_streams_dataframe,
parse_raw_data_catalog,
parse_streams_dataframe,
parse_toc_instruments,
unix_time_millis,
)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(
level=logging.INFO, format="(%(threadName)-10s) %(message)s"
)
logger = logging.getLogger(__name__)
print_lock = threading.Lock()
DATA_TEAM_GITHUB_INFRASTRUCTURE = "https://raw.githubusercontent.com/ooi-data-review/datateam-portal-backend/master/infrastructure"
FILE_SYSTEM = s3fs.S3FileSystem(anon=True)
BUCKET_DATA = "io2data/data"
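# Note (assumption, inferred from how BUCKET_DATA is used in request_data below):
# the cloud copy appears to store one zarr dataset per stream, keyed by
# "<reference_designator>-<stream_method>-<stream_rd>" under io2data/data.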
class OOI(CAVA):
"""OOI Object for Ocean Observatories Initiative Data Retrieval.
Attributes:
ooi_name (str): Username for OOI API Data Access.
ooi_token (str): Token for OOI API Data Access.
source_name (str): Data source name.
regions (pandas.DataFrame): Table of OOI regions.
sites (pandas.DataFrame): Table of OOI sites.
instruments (pandas.DataFrame): Table of available instrument streams.
        global_ranges (pandas.DataFrame): Table of global ranges for each instrument stream.
deployments (pandas.DataFrame): Table of deployments for filtered instrument streams.
annotations (pandas.DataFrame): Table of annotations for filtered instrument streams.
start_date (list): List of start dates requested.
end_date (list): List of end dates requested.
last_request (list): List of requested urls and parameters.
last_m2m_urls (list): List of requested M2M urls.
cava_arrays (pandas.DataFrame): Cabled array team Arrays vocab table.
cava_sites (pandas.DataFrame): Cabled array team Sites vocab table.
cava_infrastructures (pandas.DataFrame): Cabled array team Infrastructures vocab table.
cava_instruments (pandas.DataFrame): Cabled array team Instruments vocab table.
cava_parameters (pandas.DataFrame): Cabled array team Parameters vocab table.
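
    Example:
        A minimal usage sketch, assuming ``OOI`` is importable from
        ``yodapy.datasources`` and that API credentials were stored
        beforehand with ``yodapy.utils.creds.set_credentials_file``
        (the search terms are illustrative)::

            from yodapy.datasources import OOI

            ooi = OOI()
            ooi.search(region="cabled", instrument="CTD")
            print(ooi.instruments)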
"""
def __init__(
self, ooi_username=None, ooi_token=None, cloud_source=False, **kwargs
):
super().__init__()
self._source_name = "OOI"
self._start_date = None
self._end_date = None
# Private global variables
self._OOI_M2M_VOCAB = (
"https://ooinet.oceanobservatories.org/api/m2m/12586/vocab"
)
self._OOI_M2M_TOC = "https://ooinet.oceanobservatories.org/api/m2m/12576/sensor/inv/toc"
self._OOI_M2M_STREAMS = (
"https://ooinet.oceanobservatories.org/api/m2m/12575/stream"
)
self._OOI_DATA_URL = (
"https://ooinet.oceanobservatories.org/api/m2m/12576/sensor/inv"
)
self._OOI_M2M_ANNOTATIONS = (
"https://ooinet.oceanobservatories.org/api/m2m/12580/anno/find"
)
self._OOI_M2M_DEPLOYMENT_QUERY = "https://ooinet.oceanobservatories.org/api/m2m/12587/events/deployment/query"
# From visualocean
self._OOI_VISUALOCEAN_M_STATS = (
"https://ooi-visualocean.whoi.edu/instruments/stats-monthly"
)
self._OOI_GLOBAL_RANGE = "https://raw.githubusercontent.com/ooi-integration/qc-lookup/master/data_qc_global_range_values.csv"
# From GitHub
self._OOI_PORTAL_REGIONS = (
f"{DATA_TEAM_GITHUB_INFRASTRUCTURE}/regions.csv"
)
self._OOI_PORTAL_SITES = f"{DATA_TEAM_GITHUB_INFRASTRUCTURE}/sites.csv"
# Not used
# self._OOI_VOCAB = 'https://raw.githubusercontent.com/ooi-integration/asset-management/master/vocab/vocab.csv'
self._regions = None
self._sites = None
# User inputs
self.ooi_username = ooi_username
self.ooi_token = ooi_token
# Private cache variables
self._rvocab = None
self._rglobal_range = None
self._rstreams = None
self._rtoc = None
self._raw_datadf = None
self._raw_data_url = None
# For bio-acoustic sonar
self._zplsc_data_catalog = None
self._raw_file_dict = None
self._data_type = None
self._current_data_catalog = None
self._filtered_data_catalog = None
self._q = None
self._raw_data = []
self._dataset_list = []
self._netcdf_urls = []
# Cloud copy
self._s3content = None
self._cloud_source = cloud_source
# ----------- Session Configs ---------------------
self._session = requests.Session()
self._pool_connections = kwargs.get("pool_connections", 100)
self._pool_maxsize = kwargs.get("pool_maxsize", 100)
self._adapter = requests.adapters.HTTPAdapter(
pool_connections=self._pool_connections,
pool_maxsize=self._pool_maxsize,
)
self._session.mount("https://", self._adapter)
self._session.verify = False
# --------------------------------------------------
self._request_urls = None
self._last_m2m_urls = []
self._last_download_list = None
self._last_downloaded_netcdfs = None
self._thread_list = []
self._setup()
@property
def regions(self):
""" Returns the OOI regions """
if not isinstance(self._regions, pd.DataFrame):
try:
self._regions = pd.read_csv(self._OOI_PORTAL_REGIONS).rename(
{
"reference_designator": "array_rd",
"name": "region_name",
},
axis="columns",
)
except Exception as e:
logger.error(e)
return self._regions
@property
def sites(self):
""" Returns the OOI sites """
if not isinstance(self._sites, pd.DataFrame):
try:
self._sites = (
pd.read_csv(self._OOI_PORTAL_SITES)
.dropna(subset=["longitude", "latitude"]) # noqa
.rename(
{
"reference_designator": "site_rd",
"name": "site_name",
},
axis="columns",
)
)
except Exception as e:
logger.error(e)
return self._sites
    @property
    def instruments(self):
        """ Returns the instruments dataframe """

        def thread_finished(t):
            # A setup thread is finished once it is no longer alive.
            return not t.is_alive()

        if all(map(thread_finished, self._thread_list)):
            if isinstance(self._filtered_data_catalog, pd.DataFrame):
                return get_instrument_list(self._filtered_data_catalog)
            if isinstance(self._current_data_catalog, pd.DataFrame):
                return get_instrument_list(self._current_data_catalog)
        else:
            message = "Please wait while we fetch the metadata ..."
            logger.info(message)
@property
def deployments(self):
""" Return instruments deployments """
instrument_list = self._current_data_catalog
if isinstance(self._filtered_data_catalog, pd.DataFrame):
instrument_list = self._filtered_data_catalog
if len(instrument_list) <= 50:
text = f"Fetching deployments from {len(instrument_list)} unique instrument streams..." # noqa
print(text) # noqa
logger.info(text)
dflist = [
self._get_deployments(inst)
for idx, inst in instrument_list.iterrows()
] # noqa
return pd.concat(dflist).reset_index(drop="index")
else:
raise Exception(
f"You have {len(instrument_list)} unique streams; too many to fetch deployments. Please filter by performing search."
) # noqa
@property
def annotations(self):
""" Return instruments annotations """
instrument_list = self._current_data_catalog
if isinstance(self._filtered_data_catalog, pd.DataFrame):
instrument_list = self._filtered_data_catalog
if len(instrument_list) <= 20:
text = f"Fetching annotations from {len(instrument_list)} unique instrument streams..." # noqa
print(text) # noqa
logger.info(text)
dflist = [
self._get_annotations(inst)
for idx, inst in instrument_list.iterrows()
] # noqa
return pd.concat(dflist).reset_index(drop="index")
else:
raise Exception(
f"You have {len(instrument_list)} unique streams; too many to fetch annotations. Please filter by performing search."
) # noqa
@property
def start_date(self):
""" Return requested start date(s) """
if isinstance(self._start_date, pd.Series):
return self._start_date
return "Start date(s) can't be found."
@property
def end_date(self):
""" Return requested end date(s) """
if isinstance(self._end_date, pd.Series):
return self._end_date
return "End date(s) can't be found."
@property
def source_name(self):
""" Return data source name """
return self._source_name
@property
def last_requests(self):
""" Return last request url and parameters """
if self._request_urls:
return self._request_urls
return "Data request has not been made."
@property
def last_m2m_urls(self):
""" Return last request m2m urls """
if self._last_m2m_urls:
return self._last_m2m_urls
return "Data request has not been made."
@property
def global_ranges(self):
""" Return global ranges """
return self._get_global_ranges()
    def view_instruments(self):
"""
**DEPRECATED.**
Shows the current instruments requested.
Use OOI.instruments attribute instead.
Returns:
DataFrame: Pandas dataframe of the instruments.
"""
warnings.warn(
"The function view_instruments is deprecated. Please use OOI.instruments attribute instead.",
DeprecationWarning,
stacklevel=2,
)
return self.instruments
    def view_regions(self):
"""
**DEPRECATED.**
Shows the regions within OOI.
Use OOI.regions attribute instead.
Returns:
DataFrame: Pandas dataframe of the regions.
"""
warnings.warn(
"The function view_regions is deprecated. Please use OOI.regions attribute instead.",
DeprecationWarning,
stacklevel=2,
)
return self.regions
    def view_sites(self):
"""
**DEPRECATED.**
Shows the sites within OOI.
Use OOI.sites attribute instead.
Returns:
DataFrame: Pandas dataframe of the sites.
"""
warnings.warn(
"The function view_sites is deprecated. Please use OOI.sites attribute instead.",
DeprecationWarning,
stacklevel=2,
)
return self.sites
def __repr__(self):
""" Prints out the representation of the OOI object """
inst_text = "Instrument Stream"
if isinstance(self._current_data_catalog, pd.DataFrame):
data_length = len(
self._current_data_catalog.drop_duplicates(
subset=[
"reference_designator",
"stream_method",
"stream_rd",
]
)
)
else:
data_length = 0
if isinstance(self._filtered_data_catalog, pd.DataFrame):
data_length = len(
self._filtered_data_catalog.drop_duplicates(
subset=[
"reference_designator",
"stream_method",
"stream_rd",
]
)
)
if data_length > 1:
inst_text = inst_text + "s"
return (
f"<Data Source: {self._source_name} ({data_length} {inst_text})>"
) # noqa
def __len__(self):
""" Prints the length of the object """
if isinstance(self._filtered_data_catalog, pd.DataFrame):
return len(
self._filtered_data_catalog.drop_duplicates(
subset=[
"reference_designator",
"stream_method",
"stream_rd",
]
)
)
else:
return 0
def _setup(self):
""" Setup the OOI Instance by fetching data catalog ahead of time """
logger.debug("Setting UFrame credentials.")
if not self.ooi_username or not self.ooi_token:
self._use_existing_credentials()
# Check if ooinet is available
try:
req = requests.get("https://ooinet.oceanobservatories.org")
if req.status_code == 200:
threads = [
("get-data-catalog", self._get_data_catalog),
("get-global-ranges", self._get_global_ranges),
("get-rawdata-filelist", self._get_rawdata_filelist),
] # noqa
for t in threads:
ft = set_thread(*t)
self._thread_list.append(ft)
else:
logger.warning(
f"Server not available, please try again later: {req.status_code}"
)
except Exception as e:
logger.error(f"Server not available, please try again later: {e}")
# Retrieve datasets info in the s3 bucket.
try:
self._s3content = [
os.path.basename(rd) for rd in FILE_SYSTEM.ls(BUCKET_DATA)
]
except Exception as e:
logger.error(e)
    def clear_cache(self):
# TODO: This should also delete netcdf urls from Uframe!
delete_all_cache(self._source_name)
    def request_data(
self, begin_date, end_date, data_type="netcdf", limit=-1, **kwargs
):
"""
Request data for filtered instruments.
Args:
begin_date (str): Begin date of desired data in ISO-8601 Format.
end_date (str): End date of desired data in ISO-8601 Format.
data_type (str): Desired data type. Either 'netcdf' or 'json'.
limit (int, optional): Desired data points. Required for 'json' ``data_type``. Max is 20000.
**kwargs: Optional Keyword arguments. \n
**time_check** - set to true (default) to ensure the request times fall within the stream data availability \n
**exec_dpa** - boolean value specifying whether to execute all data product algorithms to return L1/L2 parameters (Default is True) \n
**provenance** - boolean value specifying whether provenance information should be included in the data set (Default is True) \n
**email** - provide email.
Returns:
self: Modified OOI Object. Use ``raw()`` to see either data url for netcdf or json result for json.
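
        Example:
            A minimal sketch, assuming ``search()`` has already narrowed the
            catalog down to a few streams (at most 6 for a data request)::

                ooi.request_data(
                    begin_date="2018-01-01",
                    end_date="2018-01-07",
                    data_type="netcdf",
                )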
"""
self._data_type = data_type
begin_dates = list(map(lambda x: x.strip(" "), begin_date.split(",")))
end_dates = list(map(lambda x: x.strip(" "), end_date.split(",")))
data_catalog_copy = self._filtered_data_catalog.copy()
self._q = Queue()
# Limit the number of request
if len(data_catalog_copy) > 6:
text = f"Too many instruments to request data for! Max is 6, you have {len(data_catalog_copy)}" # noqa
logger.error(text)
raise Exception(text)
if len(begin_dates) == 1 and len(end_dates) == 1:
begin_dates = begin_dates[0]
end_dates = end_dates[0]
elif len(begin_dates) != len(end_dates):
logger.warning(
"Please provide the same number of begin and end dates"
)
raise ValueError(
"Please provide the same number of begin and end dates"
)
else:
begin_dates = pd.Series(begin_dates)
end_dates = pd.Series(end_dates)
        self._start_date = begin_dates
self._end_date = end_dates
request_urls = []
if self._cloud_source:
data_catalog_copy.loc[:, "user_begin"] = pd.to_datetime(
begin_dates
)
data_catalog_copy.loc[:, "user_end"] = pd.to_datetime(end_dates)
data_catalog_copy.loc[:, "full_rd"] = data_catalog_copy.apply(
lambda row: "-".join(
[
row["reference_designator"],
row["stream_method"],
row["stream_rd"],
]
),
axis=1,
)
data_catalog_copy.loc[:, "rd_path"] = data_catalog_copy[
"full_rd"
].apply(lambda row: "/".join([BUCKET_DATA, row]))
request_urls = data_catalog_copy["rd_path"].values.tolist()
for idx, row in data_catalog_copy.iterrows():
tempdf = pd.DataFrame(
FILE_SYSTEM.ls(row["rd_path"]), columns=["uri"]
)
tempdf.loc[:, "time"] = tempdf.apply(
lambda r: pd.to_datetime(os.path.basename(r["uri"])),
axis=1,
)
selected = tempdf[
(tempdf.time >= row["user_begin"])
& (tempdf.time <= row["user_end"])
]
if len(selected) > 0:
self._q.put([selected, row["user_begin"], row["user_end"]])
else:
data_catalog_copy["user_begin"] = begin_dates
data_catalog_copy["user_end"] = end_dates
# For bio-acoustic sonar only
self._zplsc_data_catalog = data_catalog_copy[
data_catalog_copy.instrument_name.str.contains(
"bio-acoustic sonar", case=False
)
]
data_catalog_copy = data_catalog_copy[
~data_catalog_copy.instrument_name.str.contains(
"bio-acoustic sonar", case=False
)
]
if len(data_catalog_copy) > 0:
request_urls = [
instrument_to_query(
ooi_url=self._OOI_DATA_URL,
site_rd=row.site_rd,
infrastructure_rd=row.infrastructure_rd,
instrument_rd=row.instrument_rd,
stream_method=row.stream_method,
stream_rd=row.stream_rd,
begin_ts=row.user_begin,
end_ts=row.user_end,
stream_start=row.begin_date,
stream_end=row.end_date,
application_type=data_type,
limit=limit,
**kwargs,
)
for idx, row in data_catalog_copy.iterrows()
]
prepared_requests = [
requests.Request(
"GET",
data_url,
auth=(self.ooi_username, self.ooi_token),
params=params,
)
for data_url, params in request_urls
] # noqa
for job in prepared_requests:
prepped = job.prepare()
self._last_m2m_urls.append(prepped.url)
self._q.put(prepped)
if len(self._raw_data) > 0:
self._raw_data = []
self._process_request()
# block until all tasks are done
self._q.join()
if isinstance(self._zplsc_data_catalog, pd.DataFrame):
if len(self._zplsc_data_catalog) > 0:
self._zplsc_data_catalog.loc[
:, "ref"
] = self._zplsc_data_catalog.reference_designator.apply(
lambda rd: rd[:14]
)
filtered_datadf = {}
for idx, row in self._zplsc_data_catalog.iterrows():
fullref = "-".join(
[
row["reference_designator"],
row["stream_method"],
row["stream_rd"],
]
)
filtered_datadf[fullref] = self._raw_datadf[row["ref"]][
row["user_begin"] : row["user_end"]
].copy()
filtered_rawdata = filtered_datadf[fullref]
filtered_rawdata.loc[
:, "urls"
] = filtered_rawdata.filename.apply(
lambda f: "/".join([self._raw_data_url[row["ref"]], f])
)
raw_file_dict = perform_ek60_download(filtered_datadf)
self._raw_file_dict = raw_file_dict
self._raw_data.append(raw_file_dict)
self._request_urls = request_urls
return self
    def search(
self,
region=None,
site=None,
node=None,
instrument=None,
stream_type="Science",
stream_method=None,
stream=None,
parameter=None,
):
"""
        Performs a search and filters the data catalog.
Args:
region (str): Region name. If multiple use comma separated.
site (str): Site name. If multiple use comma separated.
node (str): Node name. If multiple use comma separated.
instrument (str): Instrument name. If multiple use comma separated.
stream_type (str): Stream type. Either 'Science' or 'Engineering'. If multiple use comma separated.
stream_method (str): Stream method. If multiple use comma separated.
stream (str): Stream name. If multiple use comma separated.
parameter (str): Parameter name. If multiple use comma separated.
Returns:
self: Modified OOI Object
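
        Example:
            A minimal sketch; the search terms below are illustrative and are
            matched as case-insensitive substrings against the data catalog::

                ooi.search(region="cabled", instrument="CTD",
                           stream_method="streamed")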
"""
if isinstance(self._current_data_catalog, pd.DataFrame):
current_dcat = self._current_data_catalog
else:
current_dcat = self._get_data_catalog()
self._current_data_catalog = current_dcat
if self._cloud_source:
current_dcat = current_dcat[
current_dcat.apply(
lambda row: "-".join(
[
row["reference_designator"],
row["stream_method"],
row["stream_rd"],
]
)
in self._s3content,
axis=1,
)
].reset_index(drop="index")
if region:
region_search = list(
map(lambda x: x.strip(" "), region.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.array_name.astype(str).str.contains(
"|".join(region_search), flags=re.IGNORECASE
)
| current_dcat.site_rd.astype(str).str.contains(
"|".join(region_search), flags=re.IGNORECASE
)
| current_dcat.reference_designator.astype(str).str.contains(
"|".join(region_search), flags=re.IGNORECASE
)
] # noqa
if site:
site_search = list(
map(lambda x: x.strip(" "), site.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.site_name.astype(str).str.contains(
"|".join(site_search), flags=re.IGNORECASE
)
| current_dcat.site_rd.astype(str).str.contains(
"|".join(site_search), flags=re.IGNORECASE
)
| current_dcat.reference_designator.astype(str).str.contains(
"|".join(site_search), flags=re.IGNORECASE
)
] # noqa
if node:
node_search = list(
map(lambda x: x.strip(" "), node.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.infrastructure_name.astype(str).str.contains(
"|".join(node_search), flags=re.IGNORECASE
)
| current_dcat.infrastructure_rd.astype(str).str.contains(
"|".join(node_search), flags=re.IGNORECASE
)
| current_dcat.reference_designator.astype(str).str.contains(
"|".join(node_search), flags=re.IGNORECASE
)
] # noqa
if instrument:
instrument_search = list(
map(lambda x: x.strip(" "), instrument.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.instrument_name.astype(str).str.contains(
"|".join(instrument_search), flags=re.IGNORECASE
)
| current_dcat.instrument_rd.astype(str).str.contains(
"|".join(instrument_search), flags=re.IGNORECASE
)
| current_dcat.reference_designator.astype(str).str.contains(
"|".join(instrument_search), flags=re.IGNORECASE
)
] # noqa
if parameter:
parameter_search = list(
map(lambda x: x.strip(" "), parameter.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.display_name.astype(str).str.contains(
"|".join(parameter_search), flags=re.IGNORECASE
)
| current_dcat.parameter_rd.astype(str).str.contains(
"|".join(parameter_search), flags=re.IGNORECASE
)
] # noqa
if stream_type:
stream_type_search = list(
map(lambda x: x.strip(" "), stream_type.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.stream_type.astype(str).str.contains(
"|".join(stream_type_search), flags=re.IGNORECASE
)
] # noqa
if stream_method:
stream_method_search = list(
map(lambda x: x.strip(" "), stream_method.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.stream_method.astype(str).str.contains(
"|".join(stream_method_search), flags=re.IGNORECASE
)
] # noqa
if stream:
stream_search = list(
map(lambda x: x.strip(" "), stream.split(","))
) # noqa
current_dcat = current_dcat[
current_dcat.stream_rd.astype(str).str.contains(
"|".join(stream_search), flags=re.IGNORECASE
)
] # noqa
self._filtered_data_catalog = current_dcat.drop_duplicates(
subset=["reference_designator", "stream_method", "stream_rd"]
)[
[
"array_name",
"site_name",
"infrastructure_name",
"instrument_name",
"site_rd",
"infrastructure_rd",
"instrument_rd",
"reference_designator",
"stream_method",
"stream_type",
"stream_rd",
"begin_date",
"end_date",
]
].reset_index(
drop="index"
)
return self
    def clear(self):
"""
Clears the search filter.
Returns:
self: Modified OOI Object
"""
if isinstance(self._filtered_data_catalog, pd.DataFrame):
self._filtered_data_catalog = None
return self
    def raw(self):
""" Returns the raw result from data request in json format """
return self._raw_data
    def download_netcdfs(self, destination=os.path.curdir, timeout=3600):
"""
Download netcdf files from the catalog created from data request.
Args:
destination (str, optional): Location to save netcdf file. Default will save in current directory.
            timeout (int, optional): Maximum time to wait for the downloads before timing out, in seconds. Defaults to 3600s (1 hour).
Returns:
list: List of exported netcdf.
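
        Example:
            A minimal sketch, assuming a prior ``request_data()`` call with
            ``data_type='netcdf'``::

                nc_files = ooi.download_netcdfs(destination="./data")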
"""
if not isinstance(timeout, int):
raise TypeError(f"Expected int; {type(int)} given.")
download_list = self._prepare_download()
logger.info("Downloading netcdfs ...")
jobs = [
gevent.spawn(download_url, url, destination, self._session)
for url in download_list
]
gevent.joinall(jobs, timeout=timeout)
finished_netcdfs = [job.value for job in jobs]
if finished_netcdfs:
self._last_downloaded_netcdfs = [
os.path.join(os.path.abspath(destination), nc)
for nc in finished_netcdfs
] # noqa
return self._last_downloaded_netcdfs
    def to_xarray(self, **kwargs):
"""
Retrieve the OOI streams data and export to Xarray Datasets, saving in memory.
Args:
**kwargs: Keyword arguments for xarray open_mfdataset.
Returns:
list: List of xarray datasets
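
        Example:
            A minimal sketch, assuming a prior ``request_data()`` call with
            ``data_type='netcdf'``::

                datasets = ooi.to_xarray()
                first_ds = datasets[0]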
"""
ref_degs = self._filtered_data_catalog["reference_designator"].values
dataset_list = []
if self._data_type == "netcdf":
if not self._cloud_source:
if self._raw_file_dict:
mvbsnc_list = perform_ek60_processing(self._raw_file_dict)
for k, v in mvbsnc_list.items():
resdf = xr.open_mfdataset(
v,
concat_dim=["ping_time"],
combine="nested",
**kwargs,
)
resdf.attrs["id"] = k
dataset_list.append(resdf)
turls = self._perform_check()
if len(turls) > 0:
# TODO: Cache netcdf urls so that no need to re-request data
self._netcdf_urls = [get_nc_urls(turl) for turl in turls]
logger.info("Acquiring data from opendap urls ...")
jobs = [
gevent.spawn(fetch_xr, (url, ref_degs), **kwargs)
for url in turls
]
gevent.joinall(jobs, timeout=300)
for job in jobs:
dataset_list.append(job.value)
else:
            logger.warning(
                f"{self._data_type} cannot be converted to xarray dataset"
            )  # noqa
if dataset_list:
self._dataset_list = dataset_list
return self._dataset_list
    def check_status(self):
""" Function for user to manually check the status of the data """
if not self._q.empty():
return None
turls = []
filtered_data_urls = list(filter(lambda x: "allURLs" in x, self.raw()))
for durl in filtered_data_urls:
turl = self._check_data_status(durl)
if turl:
turls.append(turl)
if len(turls) == len(filtered_data_urls):
return turls
return None
    def data_availability(self):
"""
Plots data availability of desired instruments.
Returns:
pandas.DataFrame: Instrument Stream legend
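
        Example:
            A minimal sketch; ``search()`` must have been called first so the
            filtered catalog is not empty::

                legend = ooi.data_availability()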
"""
import matplotlib.pyplot as plt
import seaborn as sns
plt.clf()
plt.close("all")
inst = self._filtered_data_catalog.copy()
if isinstance(inst, pd.DataFrame):
            if 0 < len(inst) <= 50:
da_list = []
for idx, i in inst.iterrows():
if i.instrument_name not in [
"Bio-acoustic Sonar (Coastal)"
]:
da_list.append(self._fetch_monthly_stats(i))
else:
print(
f"{i.reference_designator} not available for data availability"
)
if len(da_list) > 0:
dadf = pd.concat(da_list)
dadf.loc[:, "unique_rd"] = dadf.apply(
lambda row: "-".join(
[
row.reference_designator,
row.stream_method,
row.stream_rd,
]
),
axis=1,
)
inst.loc[:, "unique_rd"] = inst.apply(
lambda row: "-".join(
[
row.reference_designator,
row.stream_method,
row.stream_rd,
]
),
axis=1,
)
name_df = inst[
[
"array_name",
"site_name",
"infrastructure_name",
"instrument_name",
"unique_rd",
]
]
raw_plotdf = pd.merge(dadf, name_df)
plotdf = raw_plotdf.pivot_table(
index="unique_rd", columns="month", values="percentage"
)
sns.set(style="white")
_, ax = plt.subplots(figsize=(20, 10))
ax.set_title("OOI Data Availability")
sns.heatmap(
plotdf,
annot=False,
fmt=".2f",
linewidths=1,
ax=ax,
square=True,
cmap=sns.light_palette("green"),
cbar_kws={
"orientation": "horizontal",
"shrink": 0.7,
"pad": 0.3,
"aspect": 30,
},
)
plt.ylabel("Instruments", rotation=0, labelpad=60)
plt.xlabel("Months", labelpad=30)
plt.yticks(rotation=0)
plt.tight_layout()
legend = raw_plotdf[
(
list(raw_plotdf.columns.values[-5:])
+ ["stream_method", "stream_rd"]
)
].drop_duplicates(subset="unique_rd")
legend.loc[:, "labels"] = legend.apply(
lambda row: [
row.array_name,
row.site_name,
row.infrastructure_name, # noqa
row.instrument_name,
row.stream_method,
row.stream_rd,
],
axis=1,
)
ldct = {}
for idx, row in legend.iterrows():
ldct[row.unique_rd] = row.labels
return pd.DataFrame.from_dict(ldct)
return None
elif len(inst) > 50:
raise Exception(
f"You have {len(inst)} unique streams; too many to fetch deployments. Please filter by performing search."
) # noqa
else:
logger.warning("Data catalog is empty.")
else:
logger.warning(
"Please find your desired instruments by using .search() method."
)
return self
def _perform_check(self):
""" Performing data status check every 10 seconds """
turls = self.check_status()
start = datetime.datetime.now()
while turls is None:
time.sleep(10)
end = datetime.datetime.now()
delta = end - start
logger.info(f"Data request time elapsed: {delta.seconds}s")
print(f"Data request time elapsed: {delta.seconds}s")
turls = self.check_status()
return turls
def _prepare_download(self):
""" Prepare netcdf download by parsing through the resulting raw urls """
import itertools
download_urls = [rurl["allURLs"][1] for rurl in self.raw()]
download_list = []
for durl in download_urls:
fname = "-".join(os.path.basename(durl).split("-")[1:])
nc_urls = get_download_urls(durl)
download_list.append(
list(
filter(
lambda x: (x.count(fname) > 1) and ("cal_" not in x),
nc_urls,
)
)
)
self._last_download_list = list(
itertools.chain.from_iterable(download_list)
) # noqa
return self._last_download_list
def _check_data_status(self, data):
""" Check if data is ready or not by looking for status.txt"""
urls = {
"thredds_url": data["allURLs"][0],
"status_url": data["allURLs"][1],
}
check_complete = "/".join([urls["status_url"], "status.txt"])
req = requests.get(check_complete)
status_code = req.status_code
if status_code != 200:
text = f'Your data ({urls["status_url"]}) is still compiling... Please wait.'
print(text) # noqa
logger.info(text) # noqa
return None
text = f'Request ({urls["status_url"]}) completed.'
print(text) # noqa
logger.info(text) # noqa
return urls["thredds_url"]
def _process_request(self):
""" Sets up thread workers, currently set to 5 """
for x in range(5):
thread = threading.Thread(
name="process-request", target=self._threader
)
# this ensures the thread will die when the main thread dies
# can set t.daemon to False if you want it to keep running
thread.daemon = True
thread.start()
def _perform_request(self, arg):
""" Function that perform task from queue """
# when this exits, the print_lock is released
with print_lock:
req = fetch_url(prepped_request=arg, session=self._session)
if req.json():
jsonres = req.json()
if "status_code" in jsonres:
jsonres["request_url"] = req.url
self._raw_data.append(jsonres)
logger.debug(arg)
def _perform_cloud_request(self, arg):
""" Function that perform task from queue """
# when this exits, the print_lock is released
with print_lock:
selected, start_dt, end_dt = arg
total_ds = xr.merge(
[
xr.open_zarr(store=s3fs.S3Map(sel.uri, s3=FILE_SYSTEM))
for idx, sel in selected.iterrows()
]
)
self._raw_data.append(selected)
if len(total_ds.coords) > 0:
self._dataset_list.append(
total_ds.sel(time=slice(start_dt, end_dt))
)
else:
message = f"{selected.iloc[0].uri} dates {start_dt} to {end_dt} is empty!"
logger.info(message)
logger.debug(selected)
def _threader(self):
""" Get job from the front of queue and pass to function """
while True:
nextq = self._q.get()
if self._cloud_source:
self._perform_cloud_request(nextq)
else:
self._perform_request(nextq)
self._q.task_done()
def _get_instruments_catalog(self):
""" Get instruments catalog """
if self._rtoc:
rtoc = self._rtoc
else:
rtoc = fetch_url(
requests.Request(
"GET",
self._OOI_M2M_TOC,
auth=(self.ooi_username, self.ooi_token),
).prepare(),
session=self._session,
)
self._rtoc = rtoc
toc_json = rtoc.json()
instruments_json = toc_json["instruments"]
return parse_toc_instruments(instruments_json)
def _get_vocab(self):
""" Get vocabulary """
if self._rvocab:
rvocab = self._rvocab
else:
rvocab = fetch_url(
requests.Request(
"GET",
self._OOI_M2M_VOCAB,
auth=(self.ooi_username, self.ooi_token),
).prepare(),
session=self._session,
)
self._rvocab = rvocab
return pd.DataFrame(rvocab.json())
def _get_rawdata_filelist(self):
raw_data_url = {
"CE04OSPS-PC01B": "https://rawdata.oceanobservatories.org/files/CE04OSPS/PC01B/ZPLSCB102_10.33.10.143",
"CE02SHBP-MJ01C": "https://rawdata.oceanobservatories.org/files/CE02SHBP/MJ01C/ZPLSCB101_10.33.13.7",
}
raw_datadf = {}
for ref, raw_url in raw_data_url.items():
req = requests.get(raw_url)
if req.status_code == 200:
page = html_parser(req.content)
else:
page = req.status_code
if not isinstance(page, int):
files = [
(
datetime.datetime.strptime(
a.get("href"), "OOI-D%Y%m%d-T%H%M%S.raw"
),
a.get("href"),
)
for a in page.xpath(
"//a[re:match(@href, '(\w)+\.raw')]",
namespaces={
"re": "http://exslt.org/regular-expressions"
},
)
]
file_df = pd.DataFrame(
files, columns=["datetime", "filename"]
).set_index("datetime")
raw_datadf[ref] = file_df
self._raw_datadf = raw_datadf
self._raw_data_url = raw_data_url
return self._raw_datadf, self._raw_data_url
def _get_data_catalog(self):
""" Get Data Catalog """
        if isinstance(self._current_data_catalog, pd.DataFrame):
return self._current_data_catalog
else:
instruments_catalog = self._get_instruments_catalog()
vocabdf = self._get_vocab()
streams = self._get_streams()
parameter_streamdf = self._get_parameter_streams()
raw_data_catalog = pd.merge(
instruments_catalog,
vocabdf,
left_on="reference_designator",
right_on="refdes",
)
raw_data_catalog = pd.merge(
raw_data_catalog,
streams,
left_on="stream",
right_on="stream_rd",
)
allcat = pd.merge(raw_data_catalog, parameter_streamdf)
self._current_data_catalog = parse_raw_data_catalog(allcat)
return self._current_data_catalog
def _get_global_ranges(self):
""" Get global ranges in OOI """
if self._rglobal_range:
rglobal_range = self._rglobal_range
else:
rglobal_range = fetch_url(
requests.Request("GET", self._OOI_GLOBAL_RANGE).prepare(),
session=self._session,
)
self._rglobal_range = rglobal_range
global_ranges = pd.read_csv(StringIO(rglobal_range.text))
return parse_global_range_dataframe(global_ranges)
def _get_streams(self):
""" Get OOI Streams """
if self._rstreams:
rstreams = self._rstreams
else:
rstreams = fetch_url(
requests.Request(
"GET",
self._OOI_M2M_STREAMS,
auth=(self.ooi_username, self.ooi_token),
).prepare(),
session=self._session,
)
self._rstreams = rstreams
streamsdf = pd.DataFrame.from_records(rstreams.json()).copy()
return parse_streams_dataframe(streamsdf)
def _get_deployments(self, inst):
""" Get deployment of the inst pandas Series object """
params = {"refdes": inst.reference_designator}
rdeployments = fetch_url(
requests.Request(
"GET",
self._OOI_M2M_DEPLOYMENT_QUERY, # noqa
auth=(self.ooi_username, self.ooi_token),
params=params,
).prepare(),
session=self._session,
)
if rdeployments:
return parse_deployments_json(rdeployments.json(), inst)
return None
def _get_annotations(self, inst):
""" Get annotations of the inst pandas Series object """
params = {
"beginDT": unix_time_millis(
parser.parse(inst.begin_date).replace(tzinfo=pytz.UTC)
), # noqa
"endDT": unix_time_millis(
parser.parse(inst.end_date).replace(tzinfo=pytz.UTC)
), # noqa
"method": inst.stream_method,
"refdes": inst.reference_designator,
"stream": inst.stream_rd,
}
rannotations = fetch_url(
requests.Request(
"GET",
self._OOI_M2M_ANNOTATIONS,
auth=(self.ooi_username, self.ooi_token),
params=params,
).prepare(),
session=self._session,
)
if rannotations:
return parse_annotations_json(rannotations.json())
return None
def _get_parameter_streams(self):
""" Get OOI Parameter Streams """
if self._rstreams:
rstreams = self._rstreams
else:
rstreams = fetch_url(
requests.Request(
"GET",
self._OOI_M2M_STREAMS,
auth=(self.ooi_username, self.ooi_token),
).prepare(),
session=self._session,
)
self._rstreams = rstreams
streamsdf = pd.DataFrame.from_records(rstreams.json()).copy()
return parse_parameter_streams_dataframe(streamsdf)
def _fetch_monthly_stats(self, inst):
""" Fetched monthly stats for instrument object """
rmstats = fetch_url(
requests.Request(
"GET",
f"{self._OOI_VISUALOCEAN_M_STATS}/{inst.reference_designator}.json",
).prepare(), # noqa
session=self._session,
)
mstatsdf = pd.DataFrame(rmstats.json()).copy()
mstatsdf.loc[:, "percentage"] = mstatsdf.percentage.apply(
lambda row: row * 100.0
)
mstatsdf.loc[:, "reference_designator"] = inst.reference_designator
return mstatsdf.rename(
{"stream": "stream_rd", "method": "stream_method"}, axis=1
)
def _use_existing_credentials(self):
if os.path.exists(CREDENTIALS_FILE):
import json
with open(CREDENTIALS_FILE) as f:
creds = json.load(f)["ooi"]
self.ooi_username = creds["username"]
self.ooi_token = creds["api_key"]
else:
logger.error(
"Please authenticate by using yodapy.utils.creds.set_credentials_file"
) # noqa