Source code for dot.model

import logging

import numpy as np
import pymc3 as pm
from pymc3.smc import sample_smc

__all__ = ['Model']


class MeanModel(pm.gp.mean.Mean):
    """
    Mean model for Gaussian process regression on photometry with starspots
    """
    def __init__(self, light_curve, rotation_period, n_spots, contrast, t0,
                 latitude_cutoff=10, partition_lon=True):
        pm.gp.mean.Mean.__init__(self)

        if contrast is None:
            contrast = pm.TruncatedNormal("contrast", lower=0.01, upper=0.99,
                                          testval=0.4, mu=0.5, sigma=0.5)

        self.f0 = pm.TruncatedNormal("f0", mu=0, sigma=1,
                                     testval=np.percentile(light_curve.flux, 80),
                                     lower=-1, upper=2)

        self.eq_period = pm.TruncatedNormal("P_eq",
                                            lower=0.8 * rotation_period,
                                            upper=1.2 * rotation_period,
                                            mu=rotation_period,
                                            sigma=0.2 * rotation_period,
                                            testval=rotation_period)

        eps = 1e-5  # Small but non-zero number
        BoundedHalfNormal = pm.Bound(pm.HalfNormal, lower=eps, upper=0.8)
        self.shear = BoundedHalfNormal("shear", sigma=0.2, testval=0.01)

        self.comp_inclination = pm.Uniform("comp_inc",
                                           lower=np.radians(eps),
                                           upper=np.radians(90-eps),
                                           testval=np.radians(1))

        if partition_lon:
            lon_lims = 2 * np.pi * np.arange(n_spots + 1) / n_spots
            lower = lon_lims[:-1]
            upper = lon_lims[1:]
            testval = np.mean([lower, upper], axis=0)
        else:
            lower = 0
            upper = 2 * np.pi
            testval = 2 * np.pi * np.arange(n_spots) / n_spots + 0.01

        self.lon = pm.Uniform("lon",
                              lower=lower,
                              upper=upper,
                              shape=(1, n_spots),
                              testval=testval)
        self.lat = pm.Uniform("lat",
                              lower=np.radians(latitude_cutoff),
                              upper=np.radians(180 - latitude_cutoff),
                              shape=(1, n_spots),
                              testval=np.pi/2)

        self.rspot = BoundedHalfNormal("R_spot",
                                       sigma=0.2,
                                       shape=(1, n_spots),
                                       testval=0.3)
        self.contrast = contrast

        # Need to wrap this equation with a where statement so that there isn't
        # a divide by zero in the tensor math (even though these parameters are
        # bounded to prevent this from happening during sampling)
        self.spot_period = pm.math.where(self.shear < 1,
                                         self.eq_period / (1 - self.shear *
                                             pm.math.sin(self.lat - np.pi / 2) ** 2),
                                         self.eq_period)
        self.sin_lat = pm.math.sin(self.lat)
        self.cos_lat = pm.math.cos(self.lat)
        self.sin_c_inc = pm.math.sin(self.comp_inclination)
        self.cos_c_inc = pm.math.cos(self.comp_inclination)
        self.t0 = t0

    def __call__(self, X):
        phi = 2 * np.pi / self.spot_period * (X - self.t0) - self.lon

        spot_position_x = (pm.math.cos(phi - np.pi / 2) *
                           self.sin_c_inc *
                           self.sin_lat +
                           self.cos_c_inc *
                           self.cos_lat)
        spot_position_y = -(pm.math.sin(phi - np.pi/2) *
                            self.sin_lat)
        spot_position_z = (self.cos_lat *
                           self.sin_c_inc -
                           pm.math.sin(phi) *
                           self.cos_c_inc *
                           self.sin_lat)
        rsq = spot_position_x ** 2 + spot_position_y ** 2
        spot_model = self.f0 - pm.math.sum(self.rspot ** 2 * (1 - self.contrast) *
                                           pm.math.where(spot_position_z > 0,
                                                         pm.math.sqrt(1 - rsq),
                                                         0),
                                           axis=1)

        return spot_model


class DisableLogger():
    """
    Simple logger disabler to minimize info-level messages during PyMC3
    integration
    """
    def __init__(self, verbose):
        self.verbose = verbose

    def __enter__(self):
        if not self.verbose:
            logging.disable(logging.CRITICAL)

    def __exit__(self, a, b, c):
        if not self.verbose:
            logging.disable(logging.NOTSET)


[docs]class Model(object): def __init__(self, light_curve, rotation_period, n_spots, scale_errors=1, skip_n_points=1, latitude_cutoff=10, rho_factor=250, verbose=False, min_time=None, max_time=None, contrast=0.7, partition_lon=False): """ Construct a new instance of `~dot.Model`. Parameters ---------- light_curve : `~lightkurve.lightcurve.LightCurve` rotation_period : float Stellar rotation period n_spots : int Number of spots latitude_cutoff : float Don't place spots above/below this number of degrees from the pole verbose : bool Allow PyMC3 dialogs to print to stdout partition_lon : bool Enforce strict partitions on star in longitude for sampling skip_n_points : int (optional) Skip every n points for faster runs min_time : float or None (optional) Minimum time to consider in the model max_time : float or None (optional) Maximum time to consider in the model contrast : float or None (optional) Starspot contrast rho_factor : float (optional) Scale up the GP length scale by a factor `rho_factor` larger than the estimated `rotation_period` """ self.lc = light_curve self.pymc_model = None self.skip_n_points = skip_n_points self.rotation_period = rotation_period self.n_spots = n_spots self.verbose = verbose if min_time is None: min_time = self.lc.time.min() if max_time is None: max_time = self.lc.time.max() self.mask = (self.lc.time >= min_time) & (self.lc.time <= max_time) self.contrast = contrast self.scale_errors = scale_errors self.pymc_model = None self.pymc_gp = None self.pymc_gp_white = None self.pymc_gp_matern = None self._initialize_model(latitude_cutoff=latitude_cutoff, rho_factor=rho_factor, partition_lon=partition_lon) def _initialize_model(self, latitude_cutoff, partition_lon, rho_factor): """ Construct a PyMC3 model instance for use with samplers. Parameters ---------- latitude_cutoff : float Don't place spots above/below this number of degrees from the pole partition_lon : bool Enforce strict partitions on star in longitude for sampling rho_factor : float Scale up the GP length scale by a factor `rho_factor` larger than the estimated `rotation_period` """ with pm.Model(name='dot') as model: mean_func = MeanModel( self.lc, self.rotation_period, n_spots=self.n_spots, latitude_cutoff=latitude_cutoff, contrast=self.contrast, t0=self.lc.time[self.mask][::self.skip_n_points].mean(), partition_lon=partition_lon ) x = self.lc.time[self.mask][::self.skip_n_points] y = self.lc.flux[self.mask][::self.skip_n_points] yerr = self.scale_errors * self.lc.flux_err[self.mask][::self.skip_n_points] ls = rho_factor * self.rotation_period mean_err = yerr.mean() gp_white = pm.gp.Marginal(mean_func=mean_func, cov_func=pm.gp.cov.WhiteNoise(mean_err)) gp_matern = pm.gp.Marginal(cov_func=mean_err ** 2 * pm.gp.cov.Matern32(1, ls=ls)) gp = gp_white + gp_matern gp.marginal_likelihood("y", X=x[:, None], y=y, noise=yerr) self.pymc_model = model self.pymc_gp = gp self.pymc_gp_white = gp_white self.pymc_gp_matern = gp_matern self.mean_model = mean_func(x[:, None]) return self.pymc_model def __enter__(self): """ Mocking the pymc3 context manager for models """ self._check_model() return self.pymc_model.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): """ Mocking the pymc3 context manager for models """ self._check_model() return self.pymc_model.__exit__(exc_type, exc_val, exc_tb) def _check_model(self): """ Check that a model instance exists on this object """ if self.pymc_model is None: raise ValueError('Must first call `Model._initialize_model` first.')
[docs] def sample_smc(self, draws, random_seed=42, **kwargs): """ Sample the posterior distribution of the model given the data using Sequential Monte Carlo. Parameters ---------- draws : int Draws for the SMC sampler random_seed : int Random seed parallel : bool If True, run in parallel cores : int If `parallel`, run on this many cores Returns ------- trace : `~pymc3.backends.base.MultiTrace` """ self._check_model() with DisableLogger(self.verbose): with self.pymc_model: trace = sample_smc(draws, random_seed=random_seed, **kwargs) return trace
[docs] def sample_nuts(self, trace_smc, draws, cores=96, target_accept=0.99, **kwargs): """ Sample the posterior distribution of the model given the data using the No U-Turn Sampler. Parameters ---------- trace_smc : `~pymc3.backends.base.MultiTrace` Results from the SMC sampler draws : int Draws for the SMC sampler cores : int Run on this many cores target_accept : float Increase this number up to unity to decrease divergences Returns ------- trace : `~pymc3.backends.base.MultiTrace` Results of the NUTS sampler """ self._check_model() with DisableLogger(self.verbose): with self.pymc_model: trace = pm.sample(draws, start=trace_smc.point(-1), cores=cores, target_accept=target_accept, **kwargs) summary = pm.summary(trace) return trace, summary
[docs] def optimize(self, start=None, plot=False, **kwargs): """ Optimize the free parameters in `Model` using `~scipy.optimize.minimize` via `~exoplanet.optimize` Thanks x1000 to Daniel Foreman-Mackey for making this possible. """ from exoplanet import optimize with self.pymc_model: map_soln = optimize(start=start, **kwargs) if plot: best_fit = self(map_soln) import matplotlib.pyplot as plt ax = plt.gca() ax.errorbar(self.lc.time[self.mask][::self.skip_n_points], self.lc.flux[self.mask][::self.skip_n_points], self.lc.flux_err[self.mask][::self.skip_n_points], fmt='.', color='k', ecolor='silver', label='obs') ax.plot(self.lc.time[self.mask][::self.skip_n_points], best_fit, label='dot') ax.set(xlabel='Time', ylabel='Flux') ax.legend(loc='lower left') return map_soln
[docs] def __call__(self, point=None, **kwargs): """ Evaluate the model with input parameters at ``point`` Thanks x1000 to Daniel Foreman-Mackey for making this possible. """ from exoplanet import eval_in_model with self.pymc_model: result = eval_in_model( self.mean_model, point=point, **kwargs ) return result