Source code for sparcl.Results

"""Containers for results from SPARCL Server.
These include the results of client.retrieve() and client.find().
"""

from collections import UserList

#!import copy
from sparcl.utils import _AttrDict

# from sparcl.gather_2d import bin_spectra_records
import sparcl.exceptions as ex
import sparcl.specutils as su
from warnings import warn
import re


class Results(UserList):
    """Base container for a header plus records returned by the SPARCL Server.

    The first element of the list passed to the constructor is treated as
    the header (``hdr``); every following element is a record (``recs``).
    On construction the records are renamed from Internal field names to
    Science field names using ``client.fields``.

    Note:
        The underlying ``UserList`` data (see :meth:`json`) keeps the list
        exactly as it was passed in; only ``recs`` holds the renamed
        records.
    """

    def __init__(self, dict_list, client=None):
        """Split ``dict_list`` into header and records and rename fields.

        Args:
            dict_list (:obj:`list`): Header dict followed by record dicts.
                An empty (or None) list yields an empty header and no
                records, the same state :meth:`clear` produces.
            client (optional): Client whose ``fields`` attribute provides
                the field-name mappings. Defaults to None.
        """
        super().__init__(dict_list)
        # Guard the empty payload so construction agrees with clear()
        # instead of raising IndexError on dict_list[0].
        self.hdr = dict_list[0] if dict_list else {}
        self.recs = dict_list[1:] if dict_list else []
        self.client = client
        self.fields = client.fields if client else []
        self.to_science_fields()

        # HACK 12/14/2023 -sp- to fix UUID problem presumably
        # produced on stack version upgrade (to Django 4.2, postgres 13+)
        # Done per AB for expediency since real solution will be easier
        # after field-renaming is removed.
        for rec in self.recs:
            if "sparcl_id" in rec:
                rec["sparcl_id"] = str(rec["sparcl_id"])

    # https://docs.python.org/3/library/collections.html#collections.deque.clear
    def clear(self):
        """Delete the contents of this collection."""
        super().clear()
        self.hdr = {}
        self.recs = []

    @property
    def info(self):
        """Info about this collection.

        e.g. Warnings, parameters used to get the collection, etc.

        All "Successfully found N ..." status messages are consolidated
        into a single message carrying the summed count; every other
        message is passed through unchanged.

        Returns:
            dict: A copy of the header. The stored header is not modified.
        """
        cln_hdr = self.hdr.copy()
        status = cln_hdr.get("status")
        if not isinstance(status, dict) or "info" not in status:
            # Nothing to consolidate (e.g. after clear()).
            return cln_hdr

        total = 0
        last_found = None  # match object for the last success message
        info = []
        for msg in status["info"]:
            found = re.search(r"Successfully found ([0-9]+)", msg)
            if found is None:
                info.append(msg)
                continue
            total += int(found.group(1))
            last_found = found

        if last_found is not None:
            # Rewrite exactly the matched count, not just the first number
            # that happens to appear in the message.
            text = last_found.string
            info.append(
                text[: last_found.start(1)]
                + str(total)
                + text[last_found.end(1):]
            )

        # Build a fresh status dict: self.hdr.copy() is shallow, so assigning
        # into the shared one would rewrite the stored header as well.
        cln_hdr["status"] = {**status, "info": info}
        return cln_hdr

    @property
    def count(self):
        """Number of records in this collection."""
        return len(self.recs)

    @property
    def records(self):
        """Records in this collection. Each record is a dictionary."""
        return self.recs

    def json(self):
        """Return the underlying list exactly as passed to the constructor."""
        return self.data

    def unit_for(self, fieldname, data_release=None):
        """
        Look up the unit string for a specific field in a SPARCL header.

        Searches the ``UNITS`` block of a SPARCL ``Found`` or ``Retrieved``
        header object and returns the unit string(s) associated with the
        requested field, optionally filtered to a single data release.

        Args:
            fieldname (str): Science field name (e.g. 'flux', 'wavelength').

            data_release (str, optional): Data release str (e.g. 'BOSS-DR17').
                Defaults to None, which will return units for all available
                data releases.

        Returns:
            str or None:
                The unit string for the requested ``fieldname``. Returned
                when ``data_release`` is provided, or when all data releases
                share the same unit. Returns ``None`` if the field is
                dimensionless or categorical (e.g. ``'spectype'``).
            dict of {str : str or None}:
                A dict mapping each data release name to its unit string
                (or ``None``) for the request ``fieldname``. Returned only
                when ``data_release`` is ``None`` and units differ across
                data releases.

        Raises:
            UnknownField: If ``fieldname`` is not listed in ``UNITS`` for
                the requested data release (or for any data release when
                ``data_release`` is not given).

        Examples:
            >>> results.unit_for('flux')
            '1e-17 erg cm-2 s-1 AA-1'
            >>> results.unit_for('wave_sigma')
            {'SDSS-DR17': 'pixel', 'DESI-DR1': 'AA'}
            >>> results.unit_for('dec', data_release='DESI-DR1')
            'deg'
        """
        units = self.hdr.get("UNITS", {})

        if data_release:
            dr_fields = units.get(data_release, {})
            if fieldname not in dr_fields:
                raise ex.UnknownField(
                    f"Field {fieldname!r} not found in UNITS for "
                    f"{data_release!r}.")
            return dr_fields[fieldname]

        if not any(fieldname in fields for fields in units.values()):
            raise ex.UnknownField(
                f"Field {fieldname!r} not found in UNITS for any data "
                f"release.")

        per_dr = {rel: fields.get(fieldname) for rel, fields in units.items()}
        unique = set(per_dr.values())
        if len(unique) == 1:
            return unique.pop()
        return per_dr

    def _renamed_records(self, rename):
        """Return copies of ``recs`` with every key passed through ``rename``.

        ``rename(name, dr)`` is called for each key except ``"_dr"``, which
        is kept unchanged. A record is dropped when any of its keys maps to
        None (no name mapping available).
        """
        newrecs = []
        for rec in self.recs:
            dr = rec["_dr"]
            newrec = {}
            keep = True
            for name, value in rec.items():
                if name == "_dr":
                    # keep DR around unchanged. We need it to rename back
                    # to Internal Field Names later.
                    newrec[name] = value
                    continue
                new = rename(name, dr)
                if new is None:
                    keep = False  # We don't have name mapping, toss rec
                else:
                    newrec[new] = value
            if keep:
                newrecs.append(_AttrDict(newrec))
        return newrecs

    # Convert Internal field names to Science field names.
    # SIDE-EFFECT: modifies self.recs
    def to_science_fields(self):  # from_orig
        """Replace ``recs`` with copies keyed by Science field names."""
        # The lambda defers the attribute lookup on self.fields until a
        # record actually needs renaming (fields is [] without a client).
        self.recs = self._renamed_records(
            lambda name, dr: self.fields._science_name(name, dr)
        )

    # Convert Science field names to Internal field names.
    def to_internal_fields(self):
        """Rename the keys of every record in ``recs`` in place.

        Each key except ``"_dr"`` is replaced by the Internal field name
        returned by ``self.fields._internal_name``.
        """
        for rec in self.recs:
            dr = rec.get("_dr")
            renamed = {}
            # Iterate over a snapshot of the keys and pop by the *science*
            # name; collect first so a new name can never clobber a key
            # that has not been processed yet.
            # NOTE(review): a None mapping is stored under the key None,
            # as the original expression would; confirm whether such
            # records should be dropped instead.
            for sci_name in list(rec.keys()):
                if sci_name == "_dr":
                    # keep DR around unchanged. We need it to rename back
                    # to Internal Field Names later.
                    continue
                internal = self.fields._internal_name(sci_name, dr)
                renamed[internal] = rec.pop(sci_name)
            for internal, value in renamed.items():
                rec[internal] = value

    def science_to_internal_fields(self):
        """Replace ``recs`` with copies keyed by Internal field names.

        Returns:
            list: The new ``recs``.
        """
        self.recs = self._renamed_records(
            lambda name, dr: self.fields._internal_name(name, dr)
        )
        return self.recs

    def reorder(self, ids_og):
        """
        Reorder the retrieved records to be in the same order as the
        original IDs passed to client.retrieve().

        Args:
            ids_og (:obj:`list`): List of sparcl_ids or specIDs.

        Returns:
            reordered (:class:`~sparcl.Results.Retrieved`): Contains header
            and reordered records.

        Raises:
            NoIDs: If ``ids_og`` is empty.
            NoRecords: If this collection holds no records.
            TypeError: If the IDs are neither ``str`` (sparcl_id) nor
                ``int`` (specid).
        """
        if len(ids_og) <= 0:
            msg = (
                "The list of IDs passed to the reorder method "
                "does not contain any sparcl_ids or specIDs."
            )
            raise ex.NoIDs(msg)
        if len(self.recs) <= 0:
            msg = "The retrieved or found results did not contain any records."
            raise ex.NoRecords(msg)

        # The type of the first ID decides which record field to match on.
        first_id = ids_og[0]
        if isinstance(first_id, str):
            id_field = "sparcl_id"
        elif isinstance(first_id, int):
            id_field = "specid"
        else:
            raise TypeError(
                "IDs passed to the reorder method must be str (sparcl_id) "
                f"or int (specid), not {type(first_id).__name__}."
            )

        # Get the ids or specids from retrieved records
        ids_re = [rec.get(id_field) for rec in self.recs]
        # Position of each retrieved id (last occurrence wins)
        dict_re = {x: i for i, x in enumerate(ids_re)}
        # Get the indices of the original ids (de-duplicated, first
        # occurrence order). Set to None if not found
        idx = [dict_re.get(key) for key in dict.fromkeys(ids_og)]
        # Get the indices of None values
        none_idx = [i for i, v in enumerate(idx) if v is None]

        # Reorder the retrieved records, placing a dummy record wherever
        # no record was found
        dummy_record = ("{'sparcl_id': None, 'specid': None, "
                        "'_dr': 'SDSS-DR17'}")
        reordered = [
            self.recs[i]
            if i is not None
            else {"sparcl_id": None, "specid": None, "_dr": "SDSS-DR17"}
            for i in idx
        ]

        # Work on a copy of the header so the warning is attached to the
        # returned object only, not to this one.
        meta = dict(self.hdr)
        if len(none_idx) > 0:
            msg = (
                f"{len(none_idx)} sparcl_ids or specIDs were "
                f"not found in "
                f'the database. Use "client.missing()" '
                f"to get a list of the unavailable IDs. "
                f"To maintain correct reordering, a dummy "
                f"record has been placed at the indices "
                f"where no record was found. Those "
                f"indices are: {none_idx}. The dummy "
                f"record will appear as follows: "
                f"{dummy_record}. "
            )
            meta["status"] = {**meta.get("status", {}), "warnings": [msg]}
            warn(msg, stacklevel=2)

        # NOTE(review): the constructor runs to_science_fields() again on
        # these already-renamed records; confirm the name mapping is
        # stable under repeated application.
        return self.__class__([meta] + reordered, client=self.client)

    def to_specutils(self):
        """Convert results to a `specutils` object.

        Returns:
            to_specutils (:class:`~specutils.Spectrum`): a `specutils` object.
        """
        return su.to_specutils(self)

    def to_pandas(self):
        """Convert results to a pandas DataFrame object.

        Returns:
            to_pandas (:class:`~pandas.DataFrame`): a pandas DataFrame object.
        """
        # Imported lazily so pandas is only needed when this is called.
        import pandas as pd

        return pd.json_normalize(self.records)
# For results of retrieve()
class Retrieved(Results):
    """Container for the header and spectra records of a retrieve call."""

    def __init__(self, dict_list, client=None):
        # Nothing specific to add here; the base class does all the setup.
        super().__init__(dict_list, client=client)

    def __repr__(self):
        """Short summary showing how many records are held."""
        n_records = len(self.recs)
        return f"Retrieved Results: {n_records} records"
#! def bin_spectra(self): #! """Align flux from all records by common wavelength bin. #! #! A value of nan is used where a record does not contain a flux #! value for a specific bin. #! #! Returns: #! flux: 2d numpy array with shape (numRecords, numWavelengthBins) #! Flux value for each record, each bin #! wavs: 1d numpy array with shape (numWavelengthBins) #! Wavelength values for each bin #! #! Example: #! >>> client = sparcl.client.SparclClient() #! >>> found = client.find( #! constraints={"data_release": ['BOSS-DR16']}, #! limit=10) #! >>> got = client.retrieve(found.ids) #! >>> flux2d,wavs = got.bin_spectra() #! #! """ #! flux2d, wavs = bin_spectra_records(self.recs) #! return flux2d, wavs
class Found(Results):
    """Container for the header and metadata records of a find call."""

    def __init__(self, dict_list, client=None):
        # Nothing specific to add here; the base class does all the setup.
        super().__init__(dict_list, client=client)

    def __repr__(self):
        """Short summary showing how many records are held."""
        n_records = len(self.recs)
        return f"Find Results: {n_records} records"

    @property
    def ids(self):
        """List of unique identifiers of matched records."""
        found_ids = []
        for rec in self.recs:
            # Records without a "sparcl_id" contribute None.
            found_ids.append(rec.get("sparcl_id"))
        return found_ids