"""Containers for results from SPARCL Server.
These include results of client.retrieve() client.find().
"""
from collections import UserList
#!import copy
from sparcl.utils import _AttrDict
# from sparcl.gather_2d import bin_spectra_records
import sparcl.exceptions as ex
from warnings import warn
[docs]
class Results(UserList):
def __init__(self, dict_list, client=None):
super().__init__(dict_list)
self.hdr = dict_list[0]
self.recs = dict_list[1:]
self.client = client
self.fields = client.fields
self.to_science_fields()
# HACK 12/14/2023 -sp- to fix UUID problem presumably
# produced on stack version upgrade (to Django 4.2, postgres 13+)
# Done per AB for expediency since real solution will be easier
# after field-renaming is removed.
for rec in self.recs:
if "sparcl_id" in rec:
rec["sparcl_id"] = str(rec["sparcl_id"])
# END __init__()
# https://docs.python.org/3/library/collections.html#collections.deque.clear
[docs]
def clear(self):
"""Delete the contents of this collection."""
super().clear()
self.hdr = {}
self.recs = []
@property
def info(self):
"""Info about this collection.
e.g. Warnings, parameters used to get the collection, etc."""
return self.hdr
@property
def count(self):
"""Number of records in this collection."""
return len(self.recs)
@property
def records(self):
"""Records in this collection. Each record is a dictionary."""
return self.recs
def json(self):
return self.data
# Convert Internal field names to Science field names.
# SIDE-EFFECT: modifies self.recs
def to_science_fields(self): # from_orig
newrecs = list()
for rec in self.recs:
newrec = dict()
dr = rec["_dr"]
keep = True
for orig in rec.keys():
if orig == "_dr":
# keep DR around unchanged. We need it to rename back
# to Internal Field Names later.
newrec[orig] = rec[orig]
else:
new = self.fields._science_name(orig, dr)
if new is None:
keep = False # We don't have name mapping, toss rec
newrec[new] = rec[orig]
if keep:
newrecs.append(_AttrDict(newrec))
self.recs = newrecs
# Convert Science field names to Internal field names.
def to_internal_fields(self):
for rec in self.recs:
dr = rec.get("_dr")
for new in rec.keys():
if new == "_dr":
# keep DR around unchanged. We need it to rename back
# to Internal Field Names later.
continue
new = self.fields._internal_name(new, dr)
rec[new] = rec.pop(new)
def science_to_internal_fields(self):
newrecs = list()
for rec in self.recs:
newrec = dict()
dr = rec["_dr"]
keep = True
for sci_name in rec.keys():
if sci_name == "_dr":
# keep DR around unchanged. We need it to rename back
# to Internal Field Names later.
newrec[sci_name] = rec[sci_name]
else:
new = self.fields._internal_name(sci_name, dr)
if new is None:
keep = False
newrec[new] = rec[sci_name]
if keep:
newrecs.append(_AttrDict(newrec))
self.recs = newrecs
return self.recs
[docs]
def reorder(self, ids_og):
"""
Reorder the retrieved records to be in the same
order as the original IDs passed to client.retrieve().
Args:
ids_og (:obj:`list`): List of sparcl_ids or specIDs.
Returns:
reordered (:class:`~sparcl.Results.Retrieved`): Contains header and
reordered records.
# none_idx (:obj:`list`): List of indices where record is None.
"""
if len(ids_og) <= 0:
msg = (
f"The list of IDs passed to the reorder method "
f"does not contain any sparcl_ids or specIDs."
)
raise ex.NoIDs(msg)
elif len(self.recs) <= 0:
msg = (
"The retrieved or found results did not "
"contain any records."
)
raise ex.NoRecords(msg)
else:
# Transform science fields to internal fields
new_recs = self.science_to_internal_fields()
# Get the ids or specids from retrieved records
if type(ids_og[0]) is str:
ids_re = [f["sparcl_id"] for f in new_recs]
elif type(ids_og[0]) is int:
ids_re = [f["specid"] for f in new_recs]
# Enumerate the original ids
dict_og = {x: i for i, x in enumerate(ids_og)}
# Enumerate the retrieved ids
dict_re = {x: i for i, x in enumerate(ids_re)}
# Get the indices of the original ids. Set to None if not found
idx = [dict_re.get(key, None) for key in dict_og.keys()]
# Get the indices of None values
none_idx = [i for i, v in enumerate(idx) if v is None]
# Reorder the retrieved records
reordered = [self.recs[i] for i in idx if i is not None]
# Insert dummy record(s) if applicable
dummy_record = "{'id': None, 'specid': None, '_dr': 'SDSS-DR16'}"
for i in none_idx:
reordered.insert(
i, {"sparcl_id": None, "specid": None, "_dr": "SDSS-DR16"}
)
reordered.insert(0, self.hdr)
meta = reordered[0]
if len(none_idx) > 0:
msg = (
f"{len(none_idx)} sparcl_ids or specIDs were "
f"not found in "
f'the database. Use "client.missing()" '
f"to get a list of the unavailable IDs. "
f"To maintain correct reordering, a dummy "
f"record has been placed at the indices "
f"where no record was found. Those "
f"indices are: {none_idx}. The dummy "
f"record will appear as follows: "
f"{dummy_record}. "
)
meta["status"].update({"warnings": [msg]})
warn(msg, stacklevel=2)
return Results(reordered, client=self.client)
# For results of retrieve()
[docs]
class Retrieved(Results):
"""Holds spectra records (and header)."""
def __init__(self, dict_list, client=None):
super().__init__(dict_list, client=client)
def __repr__(self):
return f"Retrieved Results: {len(self.recs)} records"
#! def bin_spectra(self):
#! """Align flux from all records by common wavelength bin.
#!
#! A value of nan is used where a record does not contain a flux
#! value for a specific bin.
#!
#! Returns:
#! flux: 2d numpy array with shape (numRecords, numWavelengthBins)
#! Flux value for each record, each bin
#! wavs: 1d numpy array with shape (numWavelengthBins)
#! Wavelength values for each bin
#!
#! Example:
#! >>> client = sparcl.client.SparclClient()
#! >>> found = client.find(
#! constraints={"data_release": ['BOSS-DR16']},
#! limit=10)
#! >>> got = client.retrieve(found.ids)
#! >>> flux2d,wavs = got.bin_spectra()
#!
#! """
#! flux2d, wavs = bin_spectra_records(self.recs)
#! return flux2d, wavs
[docs]
class Found(Results):
"""Holds metadata records (and header)."""
def __init__(self, dict_list, client=None):
super().__init__(dict_list, client=client)
def __repr__(self):
return f"Find Results: {len(self.recs)} records"
@property
def ids(self):
"""List of unique identifiers of matched records."""
#! dr = list(self.fields.all_drs)[0]
return [d.get("sparcl_id") for d in self.recs]