"""Containers for results from SPARCL Server.
These include results of client.retrieve() and client.find().
"""
from collections import UserList
#!import copy
from sparcl.utils import _AttrDict
# from sparcl.gather_2d import bin_spectra_records
import sparcl.exceptions as ex
import sparcl.specutils as su
from warnings import warn
import re
# Base container shared by the Retrieved and Found result classes.
class Results(UserList):
    """Header plus records returned by the SPARCL server.

    The list handed to the constructor is stored unchanged as the
    underlying ``UserList`` data.  Its first element is exposed as the
    header (``hdr``); the remaining elements are copied into ``recs`` with
    their field names converted to Science field names.

    Args:
        dict_list (:obj:`list`): Header dictionary followed by zero or
            more record dictionaries.  An empty list yields an empty
            header and no records.
        client (optional): Client whose ``fields`` attribute supplies the
            field-name mappings.  Defaults to None.
    """

    # Status messages of this form are merged into one by ``info``.
    _FOUND_RE = re.compile(r"Successfully found ([0-9]+)")

    # Placeholder used by ``reorder`` for IDs that have no record.
    # It carries a "_dr" key because the constructor reads rec["_dr"]
    # from every record it is given.
    _DUMMY_RECORD = {"sparcl_id": None, "specid": None, "_dr": "SDSS-DR17"}

    def __init__(self, dict_list, client=None):
        super().__init__(dict_list)
        # An empty input is the same state that clear() produces.
        self.hdr = dict_list[0] if dict_list else {}
        self.recs = dict_list[1:] if dict_list else []
        self.client = client
        self.fields = client.fields if client else []
        self.to_science_fields()
        # HACK 12/14/2023 -sp- to fix UUID problem presumably
        # produced on stack version upgrade (to Django 4.2, postgres 13+)
        # Done per AB for expediency since real solution will be easier
        # after field-renaming is removed.
        for rec in self.recs:
            if "sparcl_id" in rec:
                rec["sparcl_id"] = str(rec["sparcl_id"])

    # https://docs.python.org/3/library/collections.html#collections.deque.clear
    def clear(self):
        """Delete the contents of this collection."""
        super().clear()
        self.hdr = {}
        self.recs = []

    @property
    def info(self):
        """Info about this collection.

        e.g. Warnings, parameters used to get the collection, etc.

        Returns a copy of the header in which all
        "Successfully found N ..." status messages are consolidated into
        a single message carrying the summed count.  The stored header is
        left unmodified.  A header without a ``status``/``info`` entry is
        returned as a plain copy.
        """
        cln_hdr = self.hdr.copy()
        status = cln_hdr.get("status")
        if not isinstance(status, dict) or "info" not in status:
            return cln_hdr

        count = 0
        success_msg = ""
        info = []
        for msg in status["info"]:
            found = self._FOUND_RE.search(msg)
            if found:
                count += int(found.group(1))
                success_msg = msg
            else:
                # Includes "Successfully ..." messages without a count.
                info.append(msg)
        if count > 0:
            # Reuse the last success message, swapping in the total.
            info.append(re.sub(r"[0-9]+", str(count), success_msg, count=1))

        # Copy the nested dict so self.hdr["status"] is not rewritten.
        status = dict(status)
        status["info"] = info
        cln_hdr["status"] = status
        return cln_hdr

    @property
    def count(self):
        """Number of records in this collection."""
        return len(self.recs)

    @property
    def records(self):
        """Records in this collection. Each record is a dictionary."""
        return self.recs

    def json(self):
        """Return the underlying list exactly as passed to the constructor."""
        return self.data

    def unit_for(self, fieldname, data_release=None):
        """
        Look up the unit string for a specific field in a SPARCL header.

        Searches the ``UNITS`` block of a SPARCL ``Found`` or ``Retrieved``
        header object and returns the unit string(s) associated with the
        requested field, optionally filtered to a single data release.

        Args:
            fieldname (str): Science field name (e.g. 'flux', 'wavelength').
            data_release (str, optional): Data release str (e.g. 'BOSS-DR17').
                Defaults to None, which will return units for all available
                data releases.

        Returns:
            str or None: The unit string for the requested ``fieldname``.
                Returned when ``data_release`` is provided, or when all data
                releases share the same unit. Returns ``None`` if the field
                is dimensionless or categorical (e.g. ``'spectype'``).
            dict of {str : str or None}: A dict mapping each data release name
                to its unit string (or ``None``) for the requested
                ``fieldname``. Returned only when ``data_release`` is ``None``
                and units differ across data releases.

        Raises:
            UnknownField: If ``fieldname`` is absent from the ``UNITS`` of
                the requested data release, or of every data release when
                ``data_release`` is not given.

        Examples:
            >>> results.unit_for('flux')
            '1e-17 erg cm-2 s-1 AA-1'
            >>> results.unit_for('wave_sigma')
            {'SDSS-DR17': 'pixel', 'DESI-DR1': 'AA'}
            >>> results.unit_for('dec', data_release='DESI-DR1')
            'deg'
        """
        units = self.hdr.get("UNITS", {})

        if data_release:
            dr_fields = units.get(data_release, {})
            if fieldname not in dr_fields:
                raise ex.UnknownField(
                    f"Field {fieldname!r} not found in UNITS for "
                    f"{data_release!r}.")
            return dr_fields[fieldname]

        if not any(fieldname in fields for fields in units.values()):
            raise ex.UnknownField(
                f"Field {fieldname!r} not found in UNITS for any data "
                f"release.")

        per_dr = {rel: fields.get(fieldname) for rel, fields in units.items()}
        unique = set(per_dr.values())
        if len(unique) == 1:
            return unique.pop()
        return per_dr

    def _rename_records(self, rename):
        """Return renamed copies of ``self.recs``; drop unmappable records.

        ``rename(name, dr)`` is called for every field except ``"_dr"``.
        A record is discarded as soon as one of its fields maps to None.
        """
        renamed = []
        for rec in self.recs:
            dr = rec["_dr"]
            newrec = {}
            for name, value in rec.items():
                if name == "_dr":
                    # Keep DR around unchanged. It is needed to rename
                    # in the other direction later.
                    newrec[name] = value
                    continue
                new_name = rename(name, dr)
                if new_name is None:
                    break  # No name mapping, toss rec.
                newrec[new_name] = value
            else:
                renamed.append(_AttrDict(newrec))
        return renamed

    def to_science_fields(self):  # from_orig
        """Convert Internal field names to Science field names.

        SIDE-EFFECT: replaces ``self.recs``.  Records containing a field
        without a name mapping are dropped.
        """
        # The lambda defers the attribute lookup on self.fields, so a
        # collection with no records never touches it.
        self.recs = self._rename_records(
            lambda name, dr: self.fields._science_name(name, dr)
        )

    def to_internal_fields(self):
        """Convert Science field names to Internal field names in place.

        Every record in ``self.recs`` keeps its identity; only its keys
        change.  ``"_dr"`` is kept as-is, and a field for which no
        Internal name is known keeps its current name.
        """
        for rec in self.recs:
            dr = rec.get("_dr")
            renamed = {}
            # Build the new mapping separately: renaming keys while
            # iterating the same dict raises RuntimeError, and an
            # Internal name may collide with a not-yet-visited key.
            for sci_name, value in list(rec.items()):
                if sci_name == "_dr":
                    renamed[sci_name] = value
                    continue
                internal = self.fields._internal_name(sci_name, dr)
                renamed[sci_name if internal is None else internal] = value
            rec.clear()
            rec.update(renamed)

    def science_to_internal_fields(self):
        """Convert Science field names to Internal field names.

        SIDE-EFFECT: replaces ``self.recs``.  Records containing a field
        without a name mapping are dropped.

        Returns:
            list: The new ``self.recs``.
        """
        self.recs = self._rename_records(
            lambda name, dr: self.fields._internal_name(name, dr)
        )
        return self.recs

    def reorder(self, ids_og):
        """
        Reorder the retrieved records to be in the same
        order as the original IDs passed to client.retrieve().

        Repeated IDs are considered once, at their first position.  For
        every ID without a matching record a dummy record is placed at
        that position and a warning is issued and stored in the returned
        header.  The header of this collection is not modified.

        Args:
            ids_og (:obj:`list`): List of sparcl_ids or specIDs.

        Returns:
            reordered (:class:`~sparcl.Results.Retrieved`): Contains header and
            reordered records.

        Raises:
            NoIDs: If ``ids_og`` is empty.
            NoRecords: If this collection holds no records.
            TypeError: If the first ID is neither a string nor an integer.
        """
        import numbers

        if len(ids_og) <= 0:
            msg = (
                f"The list of IDs passed to the reorder method "
                f"does not contain any sparcl_ids or specIDs."
            )
            raise ex.NoIDs(msg)
        if len(self.recs) <= 0:
            msg = "The retrieved or found results did not contain any records."
            raise ex.NoRecords(msg)

        # The type of the first ID decides which record field to match.
        first = ids_og[0]
        if isinstance(first, str):
            id_field = "sparcl_id"
        elif isinstance(first, numbers.Integral):
            # Integral also accepts numpy integer scalars.
            id_field = "specid"
        else:
            raise TypeError(
                "IDs passed to the reorder method must be strings "
                "(sparcl_ids) or integers (specIDs); got "
                f"{type(first).__name__}."
            )

        # When several records share an ID the last one wins.
        rec_by_id = {rec.get(id_field): rec for rec in self.recs}

        reordered = []
        none_idx = []
        for pos, key in enumerate(dict.fromkeys(ids_og)):
            if key in rec_by_id:
                reordered.append(rec_by_id[key])
            else:
                none_idx.append(pos)
                reordered.append(dict(self._DUMMY_RECORD))

        # Work on a copy so the warning below does not leak into self.hdr.
        hdr = dict(self.hdr)
        if none_idx:
            msg = (
                f"{len(none_idx)} sparcl_ids or specIDs were "
                f"not found in "
                f'the database. Use "client.missing()" '
                f"to get a list of the unavailable IDs. "
                f"To maintain correct reordering, a dummy "
                f"record has been placed at the indices "
                f"where no record was found. Those "
                f"indices are: {none_idx}. The dummy "
                f"record will appear as follows: "
                f"{self._DUMMY_RECORD}. "
            )
            status = dict(hdr.get("status") or {})
            status["warnings"] = [msg]
            hdr["status"] = status
            warn(msg, stacklevel=2)

        # NOTE(review): the constructor runs to_science_fields() again on
        # these already-renamed records -- confirm that is intended.
        return self.__class__([hdr] + reordered, client=self.client)

    def to_specutils(self):
        """Convert results to a `specutils` object.

        Returns:
            to_specutils (:class:`~specutils.Spectrum`): a `specutils` object.
        """
        return su.to_specutils(self)

    def to_pandas(self):
        """Convert results to a pandas DataFrame object.

        Returns:
            to_pandas (:class:`~pandas.DataFrame`): a pandas DataFrame object.
        """
        import pandas as pd

        return pd.json_normalize(self.records)
# For results of client.retrieve()
class Retrieved(Results):
    """Container for spectra records together with their header."""

    def __init__(self, dict_list, client=None):
        # Construction is handled entirely by the Results base class.
        super().__init__(dict_list, client)

    def __repr__(self):
        """Short summary giving the number of records held."""
        return "Retrieved Results: {} records".format(len(self.recs))
#! def bin_spectra(self):
#! """Align flux from all records by common wavelength bin.
#!
#! A value of nan is used where a record does not contain a flux
#! value for a specific bin.
#!
#! Returns:
#! flux: 2d numpy array with shape (numRecords, numWavelengthBins)
#! Flux value for each record, each bin
#! wavs: 1d numpy array with shape (numWavelengthBins)
#! Wavelength values for each bin
#!
#! Example:
#! >>> client = sparcl.client.SparclClient()
#! >>> found = client.find(
#! constraints={"data_release": ['BOSS-DR16']},
#! limit=10)
#! >>> got = client.retrieve(found.ids)
#! >>> flux2d,wavs = got.bin_spectra()
#!
#! """
#! flux2d, wavs = bin_spectra_records(self.recs)
#! return flux2d, wavs
# For results of client.find()
class Found(Results):
    """Container for metadata records together with their header."""

    def __init__(self, dict_list, client=None):
        # Construction is handled entirely by the Results base class.
        super().__init__(dict_list, client)

    def __repr__(self):
        """Short summary giving the number of records held."""
        return "Find Results: {} records".format(len(self.recs))

    @property
    def ids(self):
        """List of unique identifiers of matched records."""
        # A record without a "sparcl_id" key contributes None.
        identifiers = []
        for rec in self.recs:
            identifiers.append(rec.get("sparcl_id"))
        return identifiers