Source code for holoclean.learning.softmax

import torch
from torch.nn import Parameter, ParameterList
from torch.autograd import Variable
from torch import optim
from torch.nn.functional import softmax
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from tqdm import tqdm
import numpy as np


class LogReg(torch.nn.Module):
    """
    Class to generate weights
    """

    def _setup_weights(self):
        """
        Initializes the weight tensors with random values and
        ties the init and DC weights if specified

        :return: Null
        """
        torch.manual_seed(42)
        # setup init
        self.weight_tensors = ParameterList()
        self.tensor_tuple = ()
        self.feature_id = []
        self.W = None
        for featurizer in self.featurizers:
            self.feature_id.append(featurizer.id)
            if featurizer.id == 'SignalInit':
                if self.tie_init:
                    signals_W = Parameter(torch.randn(1).expand(
                        1, self.output_dim))
                else:
                    signals_W = Parameter(torch.randn(1, self.output_dim))
            elif featurizer.id == 'SignalDC':
                if self.tie_dc:
                    signals_W = Parameter(
                        torch.randn(featurizer.count, 1).expand(
                            -1, self.output_dim))
                else:
                    signals_W = Parameter(
                        torch.randn(featurizer.count, self.output_dim))
            else:
                signals_W = Parameter(torch.randn(featurizer.count, 1).expand(
                    -1, self.output_dim))
            self.weight_tensors.append(signals_W)
        return
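
    # A minimal sketch (illustrative, not part of the model) of the tying
    # mechanism above: ``expand`` returns a view, so all output_dim columns
    # share one underlying scalar per feature and gradients accumulate into
    # that single parameter.
    #
    #     >>> w = torch.randn(3, 1)   # one weight per feature
    #     >>> W = w.expand(-1, 5)     # 3 x 5 view over the same storage
    #     >>> bool((W[:, 0] == W[:, 4]).all())
    #     True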

    def __init__(self, featurizers, input_dim_non_dc, input_dim_dc, output_dim,
                 tie_init, tie_dc):
        """
        Constructor for our logistic regression

        :param featurizers: list of featurizers
        :param input_dim_non_dc: number of init + cooccur features
        :param input_dim_dc: number of dc features
        :param output_dim: number of classes
        :param tie_init: boolean, determines weight tying for init features
        :param tie_dc: boolean, determines weight tying for dc features
        """
        super(LogReg, self).__init__()

        self.featurizers = featurizers

        self.input_dim_non_dc = input_dim_non_dc
        self.input_dim_dc = input_dim_dc
        self.output_dim = output_dim

        self.tie_init = tie_init
        self.tie_dc = tie_dc

        self._setup_weights()

    def forward(self, X, index, mask):
        """
        Runs the forward pass of our logreg.

        :param X: values of the features
        :param index: indices to mask at
        :param mask: tensor to remove possibility of choosing unused class
        :return: N x L output tensor - (X * W) summed over features, then masked
        """

        # Reties the weights - need to do on every pass

        self.concat_weights()

        # Calculates the N x L output matrix
        output = X.mul(self.W)
        output = output.sum(1)
        # Changes values to extremely negative at specified indices
        if index is not None and mask is not None:
            output.index_add_(0, index, mask)
        return output
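
    # Masking sketch (illustrative): adding a large negative constant to the
    # logits of impossible classes drives their softmax probability to ~0,
    # which is what index_add_ does above with the mask built in
    # SoftMax.setupMask.
    #
    #     >>> logits = torch.FloatTensor([[2.0, 1.0, 0.5]])
    #     >>> mask = torch.FloatTensor([[0.0, 0.0, -10e6]])  # class 3 invalid
    #     >>> softmax(logits + mask, 1)  # third probability is ~0.0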

    def concat_weights(self):
        """
        Reties the weight
        """
        for feature_index in range(0, len(self.weight_tensors)):
            if self.feature_id[feature_index] == 'SignalInit':
                tensor = self.weight_tensors[feature_index].expand(
                    1, self.output_dim)
            elif self.feature_id[feature_index] == 'SignalDC':
                tensor = self.weight_tensors[feature_index].expand(
                    -1, self.output_dim)
            else:
                tensor = self.weight_tensors[feature_index].expand(
                    -1, self.output_dim)
            if feature_index == 0:
                self.W = tensor + 0
            else:
                self.W = torch.cat((self.W, tensor), 0)
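
    # After concat_weights, self.W stacks one expanded block per featurizer
    # along dim 0, yielding an M x L weight matrix aligned with the feature
    # dimension of the N x M x L input tensor X.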


class SoftMax:

    def __init__(self, session, X_training):
        """
        Constructor for our softmax model

        :param session: session object
        :param X_training: x tensor used for training the model
        """
        self.session = session
        self.dataengine = session.holo_env.dataengine
        self.dataset = session.dataset
        self.holo_obj = session.holo_env
        self.spark_session = self.holo_obj.spark_session
        query = "SELECT COUNT(*) AS dc FROM " + \
            self.dataset.table_specific_name("Feature_id_map") + \
            " WHERE Type = 'DC'"
        self.DC_count = self.dataengine.query(query, 1).collect()[0].dc

        dataframe_offset = self.dataengine.get_table_to_dataframe(
            "Dimensions_clean", self.dataset)
        rows = dataframe_offset.collect()
        dimension_dict = {}
        for dimension in rows:
            dimension_dict[dimension['dimension']] = dimension['length']

        # X Tensor Dimensions (N * M * L)
        self.M = dimension_dict['M']
        self.N = dimension_dict['N']
        self.L = dimension_dict['L']
        self.testM = None
        self.testN = None
        self.testL = None

        # pytorch tensors
        self.X = X_training
        self.mask = None
        self.testmask = None
        self.setupMask()
        self.Y = None
        self.grdt = None
        self._setupY()
        self.model = None

    # Will create the Y tensor of size N x 1
    def _setupY(self):
        """
        Initializes a y tensor to compare to our model's output

        :return: Null
        """
        possible_values = self.dataengine.get_table_to_dataframe(
            "Observed_Possible_values_clean", self.dataset).collect()
        self.Y = torch.zeros(self.N, 1).type(torch.LongTensor)
        for value in possible_values:
            self.Y[value.vid - 1, 0] = value.domain_id - 1
        self.grdt = self.Y.numpy().flatten()
        return

    # Will create the X-value tensor of size N x M x L
    def _setupX(self, sparse=0):
        """
        Initializes an X tensor of features for the clean cells

        :param sparse: 0 if dense tensor, 1 if sparse
        :return: Null
        """
        feature_table = self.dataengine.get_table_to_dataframe(
            "Feature_clean", self.dataset).collect()
        if sparse:
            coordinates = torch.LongTensor()
            values = torch.FloatTensor([])
            for factor in feature_table:
                coordinate = torch.LongTensor([[int(factor.vid) - 1],
                                               [int(factor.feature) - 1],
                                               [int(factor.assigned_val) - 1]])
                coordinates = torch.cat((coordinates, coordinate), 1)
                value = factor['count']
                values = torch.cat((values, torch.FloatTensor([value])), 0)
            self.X = torch.sparse.FloatTensor(
                coordinates, values, torch.Size([self.N, self.M, self.L]))
        else:
            self.X = torch.zeros(self.N, self.M, self.L)
            for factor in feature_table:
                self.X[factor.vid - 1, factor.feature - 1,
                       factor.assigned_val - 1] = factor['count']
        return
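
    # Sketch of the sparse layout used above (toy values, illustrative only):
    # COO format keeps one column of (vid, feature, assigned_val) indices per
    # nonzero entry plus a parallel values vector.
    #
    #     >>> idx = torch.LongTensor([[0, 1],    # vid
    #     ...                         [0, 2],    # feature
    #     ...                         [1, 0]])   # assigned_val
    #     >>> val = torch.FloatTensor([1.0, 1.0])
    #     >>> X = torch.sparse.FloatTensor(idx, val, torch.Size([2, 3, 2]))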

    def setuptrainingX(self, sparse=0):
        """
        Initializes an X tensor of features for the don't-know cells

        :param sparse: 0 if dense tensor, 1 if sparse
        :return: X tensor of features
        """
        dataframe_offset = self.dataengine.get_table_to_dataframe(
            "Dimensions_dk", self.dataset)
        rows = dataframe_offset.collect()
        dimension_dict = {}
        for dimension in rows:
            dimension_dict[dimension['dimension']] = dimension['length']

        # X Tensor Dimensions (N * M * L)
        self.testM = dimension_dict['M']
        self.testN = dimension_dict['N']
        self.testL = dimension_dict['L']

        feature_table = self.dataengine.get_table_to_dataframe(
            "Feature_dk", self.dataset).collect()
        if sparse:
            coordinates = torch.LongTensor()
            values = torch.FloatTensor([])
            for factor in feature_table:
                coordinate = torch.LongTensor([[int(factor.vid) - 1],
                                               [int(factor.feature) - 1],
                                               [int(factor.assigned_val) - 1]])
                coordinates = torch.cat((coordinates, coordinate), 1)
                value = factor['count']
                values = torch.cat((values, torch.FloatTensor([value])), 0)
            X = torch.sparse.FloatTensor(
                coordinates, values,
                torch.Size([self.testN, self.testM, self.testL]))
        else:
            X = torch.zeros(self.testN, self.testM, self.testL)
            for factor in feature_table:
                X[factor.vid - 1, factor.feature - 1,
                  factor.assigned_val - 1] = factor['count']
        return X

    def setupMask(self, clean=1, N=1, L=1):
        """
        Initializes a masking tensor for ignoring impossible classes

        :param clean: 1 if clean cells, 0 if don't-know cells
        :param N: number of examples
        :param L: number of classes
        :return: masking tensor
        """
        lookup = "Kij_lookup_clean" if clean else "Kij_lookup_dk"
        N = self.N if clean else N
        L = self.L if clean else L
        K_ij_lookup = self.dataengine.get_table_to_dataframe(
            lookup, self.dataset).select("vid", "k_ij").collect()
        mask = torch.zeros(N, L)
        for domain in K_ij_lookup:
            if domain.k_ij < L:
                mask[domain.vid - 1, domain.k_ij:] = -10e6
        if clean:
            self.mask = mask
        else:
            self.testmask = mask
        return mask
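
    # Mask layout sketch (toy numbers): with L = 4 classes and k_ij = 2 valid
    # candidates for a cell, that cell's row becomes
    #
    #     [0.0, 0.0, -10e6, -10e6]
    #
    # so the last two classes can never win the softmax.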

    def build_model(self, featurizers, input_dim_non_dc, input_dim_dc,
                    output_dim, tie_init=True, tie_DC=True):
        """
        Initializes the logreg part of our model

        :param featurizers: list of featurizers
        :param input_dim_non_dc: number of init + cooccur features
        :param input_dim_dc: number of dc features
        :param output_dim: number of classes
        :param tie_init: boolean to decide weight tying for init features
        :param tie_DC: boolean to decide weight tying for dc features
        :return: newly created LogReg model
        """
        model = LogReg(featurizers, input_dim_non_dc, input_dim_dc,
                       output_dim, tie_init, tie_DC)
        return model

    def train(self, model, loss, optimizer, x_val, y_val, mask=None):
        """
        Trains our model on the clean cells

        :param model: logistic regression model
        :param loss: loss function used for evaluating performance
        :param optimizer: optimizer for our neural net
        :param x_val: x tensor - features
        :param y_val: y tensor - output for comparison
        :param mask: masking tensor
        :return: cost of training
        """
        x = Variable(x_val, requires_grad=False)
        y = Variable(y_val, requires_grad=False)
        if mask is not None:
            mask = Variable(mask, requires_grad=False)
        index = torch.LongTensor(range(x_val.size()[0]))
        index = Variable(index, requires_grad=False)

        # Reset gradient
        optimizer.zero_grad()

        # Forward
        fx = model.forward(x, index, mask)
        output = loss.forward(fx, y.squeeze(1))

        # Backward
        output.backward()

        # Update parameters
        optimizer.step()

        return output.data[0]

    def predict(self, model, x_val, mask=None):
        """
        Runs our model on the test set

        :param model: trained logreg model
        :param x_val: test x tensor
        :param mask: masking tensor to restrict domain
        :return: predicted classes with probabilities
        """
        x = Variable(x_val, requires_grad=False)
        index = torch.LongTensor(range(x_val.size()[0]))
        index = Variable(index, requires_grad=False)
        if mask is not None:
            mask = Variable(mask, requires_grad=False)
        output = model.forward(x, index, mask)
        output = softmax(output, 1)
        return output

    def logreg(self, featurizers):
        """
        Trains our model on the clean cells and predicts values for them

        :param featurizers: list of featurizers
        :return: predictions
        """
        n_examples, n_features, n_classes = self.X.size()
        self.model = self.build_model(
            featurizers, self.M - self.DC_count, self.DC_count, n_classes)
        loss = torch.nn.CrossEntropyLoss(size_average=True)
        optimizer = optim.SGD(
            self.model.parameters(),
            lr=self.holo_obj.learning_rate,
            momentum=self.holo_obj.momentum,
            weight_decay=self.holo_obj.weight_decay)

        # Experiment with different batch sizes; there is no hard rule here
        batch_size = self.holo_obj.batch_size

        for i in tqdm(range(self.holo_obj.learning_iterations)):
            cost = 0.
            num_batches = n_examples // batch_size
            for k in range(num_batches):
                start, end = k * batch_size, (k + 1) * batch_size
                cost += self.train(self.model, loss, optimizer,
                                   self.X[start:end],
                                   self.Y[start:end],
                                   self.mask[start:end])

            predY = self.predict(self.model, self.X, self.mask)
            pred_labels = predY.data.numpy().argmax(axis=1)

            if self.holo_obj.verbose:
                print("Epoch %d, cost = %f, acc = %.2f%%"
                      % (i + 1, cost / num_batches,
                         100. * np.mean(pred_labels == self.grdt)))

        return self.predict(self.model, self.X, self.mask)

    def save_prediction(self, Y):
        """
        Stores our predicted values in the database

        :param Y: tensor with probability for each class
        :return: Null
        """
        k_inferred = self.session.holo_env.k_inferred
        if k_inferred > Y.size()[1]:
            k_inferred = Y.size()[1]
        max_result = torch.topk(Y, k_inferred, 1)
        max_indexes = max_result[1].data.tolist()
        max_prob = max_result[0].data.tolist()

        vid_to_value = []
        df_possible_values = self.dataengine.get_table_to_dataframe(
            'Possible_values_dk', self.dataset).select(
                "vid", "attr_name", "attr_val", "tid", "domain_id")

        # Save predictions up to the specified k unless the probability is 0.0
        for i in range(len(max_indexes)):
            for j in range(k_inferred):
                if max_prob[i][j]:
                    vid_to_value.append(
                        [i + 1, max_indexes[i][j] + 1, max_prob[i][j]])

        df_vid_to_value = self.spark_session.createDataFrame(
            vid_to_value, StructType([
                StructField("vid1", IntegerType(), False),
                StructField("domain_id1", IntegerType(), False),
                StructField("probability", DoubleType(), False)
            ]))
        df1 = df_vid_to_value
        df2 = df_possible_values
        df_inference = df1.join(
            df2,
            [df1.vid1 == df2.vid, df1.domain_id1 == df2.domain_id],
            'inner').drop("vid1", "domain_id1")
        self.session.inferred_values = df_inference
        self.session.holo_env.logger.info(
            "The Inferred_values Data frame has been created")
        self.session.holo_env.logger.info(" ")
        return

    def log_weights(self):
        """
        Writes the learned weights to the logger

        :return: Null
        """
        self.model.concat_weights()
        weights = self.model.W
        self.session.holo_env.logger.info("Tensor weights")
        count = 0
        for weight in torch.index_select(
                weights, 1, Variable(torch.LongTensor([0]))):
            count += 1
            msg = "Feature " + str(count) + ": " + str(weight[0].data[0])
            self.session.holo_env.logger.info(msg)
        return