Source code for inferpy.inference.variational.vi

import numpy as np
import tensorflow as tf
import inspect
import itertools
from tensorflow_probability.python import edward2 as ed

from . import loss_functions
import inferpy as inf
from inferpy.queries import Query
from inferpy import util
from inferpy import contextmanager

from ..inference import Inference

from inferpy.data.loaders import build_sample_dict


[docs]class VI(Inference): def __init__(self, qmodel, loss='ELBO', optimizer='AdamOptimizer', epochs=1000): """Creates a new Variational Inference object. Args: qmodel (`inferpy.ProbModel`): The q model loss (`str` or `function`): A function that computes the loss tensor from the expanded variables of p and q models, or a `str` that refears to a function with that name in the package `inferpy.inference.variational.loss_function` optimizer (`str` or `tf.train.Optimizer`): An optimizer object from `tf.train` optimizers, or a string that refers to the name of an optimizer in such module or package epochs (`int`): The number of epochs to run in the gradient descent process """ # store the qmodel in self.qmodel. Can be a callable with no parameters which returns the qmodel if callable(qmodel): if len(inspect.signature(qmodel).parameters) > 0: raise ValueError("input qmodel can only be a callable object if this does not has any input parameter") self.qmodel = qmodel() else: self.qmodel = qmodel # store the loss function in self.loss_fn # if it is a string, build the object automatically from loss_functions package if isinstance(loss, str): self.loss_fn = getattr(loss_functions, loss) else: self.loss_fn = loss self.epochs = epochs # store the optimizer function in self.optimizer # if it is a string, build a new optimizer from tf.train (default parametrization) if isinstance(optimizer, str): self.optimizer = getattr(tf.train, optimizer)() else: self.optimizer = optimizer # pmodel not established yet self.pmodel = None # The size of the plate when expand the models self.plate_size = None # The tensor to optimize the tf.Variables self.train_tensor = None # expanded variables and parameters self.expanded_variables = {"p": None, "q": None} self.expanded_parameters = {"p": None, "q": None} # tf variable to enable the interception of Random Variables by edward2 for global and local hidden self.enable_interceptor_global = tf.Variable(False, trainable=False, name="inferpy-interceptor-global-enabled") self.enable_interceptor_local = tf.Variable(False, trainable=False, name="inferpy-interceptor-local-enabled") # and initialize it in the default session util.session.get_session().run(tf.variables_initializer( [self.enable_interceptor_global, self.enable_interceptor_local])) # list for storing the loss evolution class Debug: pass self.debug = Debug() self.debug.losses = []
[docs] def compile(self, pmodel, data_size, extra_loss_tensor=None): # set the used pmodel self.pmodel = pmodel # and the plate size, which matches the data size self.plate_size = data_size # create the train tensor self.train_tensor = self._generate_train_tensor(extra_loss_tensor, plate_size=self.plate_size)
[docs] def update(self, data): # data must be a sample dictionary sample_dict = build_sample_dict(data) # ensure that the size of the data matches with the self.plate_size data_size = util.iterables.get_plate_size(self.pmodel.vars, sample_dict) if data_size != self.plate_size: raise ValueError("The size of the data must be equal to the plate size: {}".format(self.plate_size)) t = [] sess = util.get_session() # reshape data in case it does not match exactly with the shape used when building the random variable # i.e.: (..., 1) dimension clean_sample_dict = {k: np.reshape(v, self.expanded_variables["p"][k].observed_value.shape.as_list()) for k, v in sample_dict.items()} with contextmanager.observe(self.expanded_variables["p"], clean_sample_dict): with contextmanager.observe(self.expanded_variables["q"], clean_sample_dict): for i in range(self.epochs): sess.run(self.train_tensor) t.append(sess.run(self.debug.loss_tensor)) if i % 200 == 0: print("\n {} epochs\t {}".format(i, t[-1]), end="", flush=True) if i % 10 == 0: print(".", end="", flush=True) # set the protected _losses attribute for the losses property self.debug.losses += t
@property def losses(self): return self.debug.losses
[docs] def get_interceptable_condition_variables(self): return (self.enable_interceptor_global, self.enable_interceptor_local)
[docs] def posterior(self, target_names=None, data={}): return Query(self.expanded_variables["q"], target_names, data)
[docs] def posterior_predictive(self, target_names=None, data={}): # posterior_predictive uses pmodel variables, but global hidden (parameters) intercepted with qmodel variables. return Query(self.expanded_variables["p"], target_names, data, # just interested in intercept the global parameters, not the local hidden enable_interceptor_variables=(self.enable_interceptor_global, None))
######################## # Auxiliar functions ######################## def _generate_train_tensor(self, extra_loss_tensor, **kwargs): """ This function expand the p and q models. Then, it uses the loss function to create the loss tensor and store it into the debug object as a new attribute. Then, uses the optimizer to create the train tensor used in the gradient descent iterative process. It store the expanded random variables and parameters from the p and q models in self.expanded_variables and self.expanded_parameters dicts. Returns: The `tf.Tensor` train tensor used in the gradient descent iterative process. """ # expand the p and q models # expand de qmodel qvars, qparams = self.qmodel.expand_model(self.plate_size) # expand de pmodel, using the intercept.set_values function, to include the sample_dict and the expanded qvars # the True first value enable to use tf.condition and observe RandomVariables modifying a tf.Variable value with ed.interception(util.interceptor.set_values(**qvars)): pvars, pparams = self.pmodel.expand_model(self.plate_size) # create the loss tensor and trainable tensor for the gradient descent process loss_tensor = self.loss_fn(pvars, qvars, **kwargs) # if extra_loss_tensor is not None, it must be a tensor with the inf.layers.Sequential losses if extra_loss_tensor is not None: loss_tensor += extra_loss_tensor # use the optimizer to create the train tensor train = self.optimizer.minimize(loss_tensor) # save the expanded variables and parameters self.expanded_variables = { "p": pvars, "q": qvars } self.expanded_parameters = { "p": pparams, "q": qparams } # save the loss tensor for debug purposes self.debug.loss_tensor = loss_tensor # Initialize all variables which are not in the probmodel p, because they have been initialized before model_variables = [v for v in itertools.chain( self.pmodel.params.values(), # do not re-initialize prior p model parameters pparams.values(), # do not re-initialize posterior p model parameters self.qmodel.params.values(), # do not re-initialize prior q model parameters qparams.values(), # do not re-initialize posterior q model parameters )] inf.get_session().run( tf.variables_initializer([ v for v in tf.global_variables() if v not in model_variables and not v.name.startswith("inferpy-") ])) return train