# Source code for ESOAsg.datacontainers.lightcurves

r"""Class to work on light curves
"""

import numpy as np
from astropy.io import fits
from astropy.io.fits.column import NUMPY2FITS
from astropy.table import Column, Table

from ESOAsg import msgs
from ESOAsg.core import fitsfiles
from ESOAsg.ancillary import checks

# Keywords looked up in the primary header by `LightCurves.check`: a warning is issued for each one missing
MANDATORY_PRIMARY_HEADER_KEYWORDS = ['TELESCOP', 'INSTRUME']


def _return_data_from_column(table, col_name):
    r"""Extract the data stored in the `col_name` column of a table.

    Args:
        table (`fits.BinTableHDU`, `fits.TableHDU`):
            table from which you want to extract the data
        col_name (`str`):
            column name from which you want to extract the data.

    Returns:
        column_data (np.array):
            numpy array containing the data. Set to `None` if no data are present
        column_length (int):
            length of column_data. Set to `0` if no data are present
    """
    raw_column = table.data[col_name]
    if len(raw_column) > 0:
        column_data = np.array(raw_column)
        return column_data, len(column_data)
    # A plain `int` is returned here: `np.int` was only an alias of the builtin `int` and it has been
    # removed from NumPy (>=1.24), so calling it raises an `AttributeError`
    return None, 0


def _return_attribute_from_column(table, col_name, attribute):
    r"""Look up an attribute of one column of a table

    Args:
        table (`fits.BinTableHDU`, `fits.TableHDU`):
            table containing the column of interest.
        col_name (`str`):
            name of the column to be inspected (used as key of `table.columns`).
        attribute (`str`):
            name of the attribute to be read from the column

    Returns:
        attribute_value:
            value of the requested attribute, or `None` if the column does not have it
    """
    selected_column = table.columns[col_name]
    # `getattr` with a default covers the "attribute not present" case in a single call
    return getattr(selected_column, attribute, None)


def _check_attribute_is_in_light_curve_columns(attribute):  # Written by Ema 16.04.2020
    r"""Check if an attribute of a `LightCurves` object is not `header`, `primary_header`, `others`, or `_datatype`

    More precisely, an attribute is considered a column if its name does not end with `header`, it is not
    `others`, and it does not start with an underscore.

    Args:
        attribute (`str`):
            attribute of the `LightCurves` object you want to check

    Returns:
        is_in_column (`bool`):
            `True` if the `attribute` is not `header`, `primary_header`, `others`, or `_datatype`

    Raises:
        AssertionError: if `attribute` is not a string

    """
    # `np.str` was an alias of the builtin `str` and it has been removed from NumPy (>=1.24).
    # `np.str_` is a subclass of `str`, thus it is still accepted by this check.
    assert isinstance(attribute, str), 'Not valid string for `attribute`'
    # Strings are compared by value: an identity test (`is not`) on a literal is implementation dependent
    is_in_column = (not attribute.endswith('header')
                    and attribute != 'others'
                    and not attribute.startswith('_'))
    return is_in_column


def _return_format_column(column):
    r"""Returns format of a column in the fits format

    see https://docs.astropy.org/en/stable/io/fits/usage/table.html : column creation for further details.

    Args:
        column (`astropy.table.Column`):
            a `Column` object

    Returns:
        column_format (`str`):
            a string containing the size and the dtype of `column`, i.e. the number of elements in the
            column followed by the fits code associated to its dtype in `NUMPY2FITS`

    Raises:
        AssertionError: if `column` is not an `astropy.table.Column`
        KeyError: if the dtype of `column` has no entry in `NUMPY2FITS`
    """
    assert isinstance(column, Column), r"Not a valid `column`"
    column_size = str(column.size)
    # The first character of `dtype.str` is the byte-order marker. Beside `<`, `>`, and `=`, numpy uses
    # `|` (not applicable) for single-byte types such as `|u1` or `|b1`: it needs to be removed as well,
    # given that the keys of `NUMPY2FITS` do not carry any byte-order information.
    column_dtype = column.dtype.str
    if column_dtype.startswith(('<', '>', '=', '|')):
        column_dtype = column_dtype[1:]
    column_type = NUMPY2FITS[column_dtype]
    column_format = column_size + column_type
    return column_format


def _light_curve_column_to_fits(col_name, column):
    r"""Convert an `astropy.table.Column` into a single-row `fits.Column` named `col_name` (in upper case)"""
    # ToDo Units: `column.unit` is not propagated to the fits column yet
    return fits.Column(name=col_name.upper(), format=_return_format_column(column),
                       array=[np.array(column.data)])


def save_into_fits(fits_file_name, primary_header, light_curve_headers, light_curve_names, light_curves,
                   overwrite=True):
    r"""Save one or more `LightCurves` objects into a multi-extension fits file

    The file contains a primary HDU followed by one `BinTableHDU` per light curve. Each table has a single
    row and one column for each attribute of the light curve that is not `None`, plus one column for each
    entry of `others`. Column names are the attribute names converted to upper case.

    Args:
        fits_file_name (`str`):
            name of the output file. The extension `.fits` is appended if not already present
        primary_header (`fits.Header`, `None`):
            header of the primary HDU. If `None` the default header of an empty `fits.PrimaryHDU` is used
        light_curve_headers (`fits.Header`, `list`, `None`):
            header (or list of headers) for the extension(s). If `None` no header is passed to the tables
        light_curve_names (`str`, `list`, `None`):
            name (or list of names) to be stored in the `EXTNAME` keyword of the extension(s). If `None`
            the keyword is not set
        light_curves (`LightCurves`, `list`):
            light curve (or list of light curves) to be saved
        overwrite (`bool`):
            passed to `HDUList.writeto`

    Returns:
        None

    Raises:
        AssertionError: if one of the inputs is not of the expected type
    """
    # `np.str` has been removed from NumPy (>=1.24). It was an alias of `str`, so the checks are equivalent
    assert isinstance(fits_file_name, str), r'Not a valid name for the fits file'
    if not fits_file_name.endswith('.fits'):
        # the result needs to be assigned back to `fits_file_name`, otherwise the extension is never added
        fits_file_name = fits_file_name + '.fits'

    # Light curves
    if not isinstance(light_curves, list):
        assert isinstance(light_curves, LightCurves), r'Not a LightCurves object'
        light_curves_list = [light_curves]
    else:
        for light_curve in light_curves:
            assert isinstance(light_curve, LightCurves), r'Not a LightCurves object'
        light_curves_list = light_curves

    # Names of the extensions
    if light_curve_names is None:
        light_curves_names_list = [None] * len(light_curves_list)
    elif not isinstance(light_curve_names, list):
        assert isinstance(light_curve_names, str), r'Not a valid name'
        light_curves_names_list = [light_curve_names]
    else:
        for light_curve_name in light_curve_names:
            # the name itself is tested (and not a leftover variable from the loop on the light curves)
            assert isinstance(light_curve_name, str), r'Not a valid name'
        # the full list is kept (and not only the last name)
        light_curves_names_list = light_curve_names

    # Headers of the extensions
    if light_curve_headers is None:
        light_curve_headers_list = [None] * len(light_curves_list)
    elif not isinstance(light_curve_headers, list):
        assert isinstance(light_curve_headers, fits.Header), r'Not a valid header'
        light_curve_headers_list = [light_curve_headers]
    else:
        for light_curve_header in light_curve_headers:
            assert isinstance(light_curve_header, fits.Header), r'Not a valid header'
        # the list of headers is kept (and not the list of light curves)
        light_curve_headers_list = light_curve_headers

    if len(light_curves_names_list) != len(light_curves_list):
        msgs.error(r'`light_curve_names` and `light_curves` must have the same length')
    if len(light_curve_headers_list) != len(light_curves_list):
        msgs.error(r'`light_curve_headers` and `light_curves` must have the same length')

    # Primary HDU
    primary_hdu = fits.PrimaryHDU()
    if primary_header is not None:
        assert isinstance(primary_header, fits.Header), r'Not a valid header'
        primary_hdu.header = primary_header
    hdul = fits.HDUList(primary_hdu)

    # saving the lightcurves:
    for light_curve_name, light_curve, light_curve_header in zip(light_curves_names_list, light_curves_list,
                                                                 light_curve_headers_list):
        msgs.info('Saving {}'.format(light_curve_name))
        list_of_columns = []
        # Columns stored as attributes of the light curve (the order is the one of `__dict__`)
        for attribute in light_curve.__dict__.keys():
            if _check_attribute_is_in_light_curve_columns(attribute):
                attribute_column = getattr(light_curve, attribute)
                if attribute_column is not None:
                    list_of_columns.append(_light_curve_column_to_fits(attribute, attribute_column))
        # Additional columns stored in the `others` table
        if light_curve.others is not None:
            for other_attribute in light_curve.others.colnames:
                list_of_columns.append(_light_curve_column_to_fits(other_attribute,
                                                                   light_curve.others[other_attribute]))
        if light_curve_header is not None:
            hdu = fits.BinTableHDU.from_columns(list_of_columns, header=light_curve_header, nrows=1)
        else:
            hdu = fits.BinTableHDU.from_columns(list_of_columns, nrows=1)
        if light_curve_name is not None:
            hdu.header['EXTNAME'] = light_curve_name
        hdul.append(hdu)
    hdul.writeto(fits_file_name, overwrite=overwrite, checksum=True)
class LightCurves:
    r"""A class used to define and make simple operations on time series

    This allows to perform some basic tasks on a time series and to save it in a format that is ESO Phase3
    compliant.

    Attributes:
        primary_header: header associated to the primary HDU
        header: header associated to the table containing the light curve
        time, time_bin, flux, error, background, quality: columns of the light curve. `None` if not defined
        others (`astropy.table.Table`): table collecting all the additional columns
        _datatype (`str`): set to `LightCurves`

    Methods:
        load_from_table: fill the object with the content of a fits table
        check: test if the object is compatible with the ESO standard
        to_fits: run `check` and save the object into a fits file
    """

    def __init__(self, primary_header=None, header=None, time=None, flux=None, error=None, time_bin=None,
                 background=None, quality=None, others=None):
        r"""Instantiate the class LightCurves

        Each field of the BINTABLE shall be further described in the extension header.
        Mandatory fields shall be:

            * time
            * flux
            * error

        in that particular order. Additional fields may be added.
        """
        self.primary_header = primary_header
        self.header = header
        # NOTE: the order of the assignments defines the order of `__dict__`, which is the order used when
        # the attributes are scanned to create the columns of the fits table
        self.time = time
        self.time_bin = time_bin
        self.flux = flux
        self.error = error
        self.background = background
        self.quality = quality
        # A new (empty) table is created for each instance when `others` is not provided
        self.others = others if others is not None else Table()
        self._datatype = 'LightCurves'

    def load_from_table(self, table, primary_header=None, copy_header=True, where_time='TIME',
                        where_time_bin='TIME_BIN', where_flux='FLUX', where_error='ERROR',
                        where_background='BACKGROUND', where_quality='QUAL'):
        r"""Given a table put it in a LightCurves object

        Args:
            table (`fits.BinTableHDU`):
                table containing the light curve. `fits.TableHDU` is not implemented yet
            primary_header:
                if not `None` and not empty, it is stored in `self.primary_header`
            copy_header (`bool`):
                if `True` the header of `table` (if not empty) is stored in `self.header`
            where_time (`str`):
                name of the column of `table` containing `time`
            where_time_bin (`str`):
                name of the column of `table` containing `time_bin`
            where_flux (`str`):
                name of the column of `table` containing `flux`
            where_error (`str`):
                name of the column of `table` containing `error`
            where_background (`str`):
                name of the column of `table` containing `background`
            where_quality (`str`):
                name of the column of `table` containing `quality`
        """
        if checks.table_is_valid(table):
            msgs.work('Reading input table')

        if primary_header is not None:
            if len(primary_header) > 0:
                self.primary_header = primary_header
            else:
                msgs.warning('Empty `primary_header` provided')

        if copy_header:
            if len(table.header) > 0:
                self.header = table.header
            else:
                msgs.warning('No header found in the table')

        if isinstance(table, fits.BinTableHDU):
            self._load_from_BinTableHDU(table, copy_header=copy_header, where_time=where_time,
                                        where_time_bin=where_time_bin, where_flux=where_flux,
                                        where_error=where_error, where_background=where_background,
                                        where_quality=where_quality)
        elif isinstance(table, fits.TableHDU):
            # ToDo implement TableHDU case
            msgs.error('To be implemented')
        else:
            msgs.error('Unknown table type')

    def _load_from_BinTableHDU(self, table, copy_header=True, where_time='TIME', where_time_bin='TIME_BIN',
                               where_flux='FLUX', where_error='ERROR', where_background='BACKGROUND',
                               where_quality='QUAL'):
        r"""Given a BinTableHDU put it in a LightCurves object

        Columns of `table` named as one of the `where_*` arguments are loaded in the corresponding
        attribute. All the remaining columns are added to `self.others`. `copy_header` is not used here.
        """
        # Explicit mapping between attributes and column names. An attribute without entry is skipped
        where_columns = {'time': where_time, 'time_bin': where_time_bin, 'flux': where_flux,
                         'error': where_error, 'background': where_background, 'quality': where_quality}
        # Going through all the columns names
        column_names = [column.name for column in table.columns]
        loaded_columns = set()

        # Loading time, time_bin, flux, error, background, qual (if possible)
        # A copy of the attribute names is used because the attributes are reassigned inside the loop
        for attribute in list(self.__dict__):
            if not _check_attribute_is_in_light_curve_columns(attribute):
                continue
            where_attribute = where_columns.get(attribute)
            if where_attribute is None or where_attribute not in column_names:
                continue
            # Loading attributes from table columns
            msgs.info('Loading {} from column {}'.format(attribute, where_attribute))
            # Check that data are present
            attribute_data, attribute_length = _return_data_from_column(table, where_attribute)
            if attribute_length == 0:
                # NOTE(review): the message states `None`, but an empty `Column` is assigned below.
                # Behaviour is kept as it was - to be confirmed which of the two is intended.
                msgs.warning('No data present. {} set to `None`'.format(attribute))
            # Checks for units and dtype
            attribute_unit = _return_attribute_from_column(table, where_attribute, 'unit')
            attribute_dtype = _return_attribute_from_column(table, where_attribute, 'dtype')
            attribute_column = Column(name=attribute, data=attribute_data, length=attribute_length,
                                      unit=attribute_unit, dtype=attribute_dtype)
            setattr(self, attribute, attribute_column)
            loaded_columns.add(where_attribute)

        # Dealing with others
        for other_attribute in column_names:
            if other_attribute in loaded_columns:
                continue
            other_attribute_data, other_attribute_length = _return_data_from_column(table, other_attribute)
            other_attribute_unit = _return_attribute_from_column(table, other_attribute, 'unit')
            other_attribute_dtype = _return_attribute_from_column(table, other_attribute, 'dtype')
            other_attribute_column = Column(name=other_attribute, data=other_attribute_data,
                                            length=other_attribute_length, unit=other_attribute_unit,
                                            dtype=other_attribute_dtype)
            self.others.add_column(other_attribute_column)
            loaded_columns.add(other_attribute)

    def check(self, autocorrect=False):
        r"""Checks that a LightCurves objects is in a format compatible with the ESO standard

        The tests performed are: `time`, `flux`, and `error` are defined; all the columns have the same
        size of `time`; `time` is positive and strictly increasing; no column contains +/-inf; the primary
        header is valid according to `checks.header_is_valid`. A warning (that does not change the returned
        value) is raised for each keyword in `MANDATORY_PRIMARY_HEADER_KEYWORDS` missing from the primary
        header.

        Args:
            autocorrect (`bool`):
                if `True` and `PRODCATG` is missing from the primary header, the keyword is set to
                `SCIENCE.LIGHTCURVE`

        Returns:
            good_light_curve (`bool`):
                `True` if all the tests are passed
        """
        good_light_curve = True

        # Checks that time, flux, and error are inside the LightCurve object
        for mandatory_attribute in ('time', 'flux', 'error'):
            if getattr(self, mandatory_attribute) is None:
                msgs.warning('`{}` is not defined'.format(mandatory_attribute))
                good_light_curve = False

        # Attributes that are columns and that are actually defined
        column_attributes = [attribute for attribute in self.__dict__
                             if _check_attribute_is_in_light_curve_columns(attribute)
                             and getattr(self, attribute) is not None]

        # The following tests use `time` as reference: they are skipped if `time` is not defined (this
        # case has been already flagged above) instead of failing with an `AttributeError`
        if self.time is not None:
            # Check that all columns have the same length
            test_length = self.time.size
            for attribute in column_attributes:
                if getattr(self, attribute).size != test_length:
                    msgs.warning('Inconsistent length in {}'.format(attribute))
                    good_light_curve = False
            for other_attribute in self.others.colnames:
                if self.others[other_attribute].size != test_length:
                    msgs.warning('Inconsistent length in `others[{}]`'.format(other_attribute))
                    good_light_curve = False

            # Check that time is monotonic and always positive
            # `np.min` raises a `ValueError` on empty arrays, thus the size is tested first
            if test_length > 0 and np.min(self.time.data) < 0.:
                msgs.warning('`time` must be positive')
                good_light_curve = False
            delta_time = np.diff(self.time.data)
            if np.any(delta_time <= 0):
                msgs.warning('`time` should increase monotonically')
                good_light_curve = False

        # Check that there are no +/-inf
        for attribute in column_attributes:
            if np.any(np.isinf(getattr(self, attribute).data)):
                msgs.warning('{} contains +/-inf values'.format(attribute))
                good_light_curve = False
        for other_attribute in self.others.colnames:
            if np.any(np.isinf(self.others[other_attribute].data)):
                msgs.warning('`others[{}]` contains +/-inf values'.format(other_attribute))
                good_light_curve = False

        # Check that primary HEADERS exists and do not contain null:
        if not checks.header_is_valid(self.primary_header):
            msgs.warning('Primary Header not valid')
            good_light_curve = False
        if self.primary_header is not None and 'PRODCATG' not in self.primary_header and autocorrect:
            msgs.warning('AUTOCORRECT: Setting `PRODCATG` keyword to:`SCIENCE.LIGHTCURVE`')
            self.primary_header['PRODCATG'] = 'SCIENCE.LIGHTCURVE'
        for mandatory_keyword in MANDATORY_PRIMARY_HEADER_KEYWORDS:
            # A missing primary header is treated as a header without keywords (`in None` is not valid)
            if self.primary_header is None or mandatory_keyword not in self.primary_header:
                msgs.warning('Keyword: {} missing from the Primary Header'.format(mandatory_keyword))
        return good_light_curve

    def to_fits(self, fits_file_name, light_curve_name='LIGHTCURVE', overwrite=True, autocorrect=False):
        r"""Check the LightCurves object and save it into a fits file

        Args:
            fits_file_name (`str`):
                name of the output file
            light_curve_name (`str`):
                name given to the extension containing the light curve
            overwrite (`bool`):
                passed to `save_into_fits`
            autocorrect (`bool`):
                passed to `check`
        """
        if not self.check(autocorrect=autocorrect):
            msgs.error('the LightCurve object does not respect the requirements from ESO')
        save_into_fits(fits_file_name, self.primary_header, self.header, light_curve_name, self,
                       overwrite=overwrite)