Source code for holoclean.errordetection.sql_dcerrordetector

from errordetector import ErrorDetection
from holoclean.global_variables import GlobalVariables
from holoclean.utils.parser_interface import DenialConstraint
import time


__metaclass__ = type


class SqlDCErrorDetection(ErrorDetection):
    """
    This class is a subclass of the ErrorDetection class and returns
    don't-know cells and clean cells based on the denial constraints.
    """

    def __init__(self, session):
        """
        This constructor converts all denial constraints
        to the form of SQL constraints.

        :param session: HoloClean session
        """
        super(SqlDCErrorDetection, self).__init__(
            session.holo_env, session.dataset)
        self.session = session
        self.index = GlobalVariables.index_name
        self.dc_parser = session.parser
        self.operationsarr = DenialConstraint.operationsArr
        self.noisy_cells = None
        self.dc_objects = session.dc_objects
        self.Denial_constraints = session.Denial_constraints

    # Internal Methods
    @staticmethod
    def _is_symmetric(dc_name):
        """
        Identifies whether a denial constraint is symmetric, i.e. whether
        it uses no ordering operators.

        :param dc_name: denial constraint
        :return: boolean value
        """
        result = True
        non_sym_ops = ['<=', '>=', '<', '>']
        for op in non_sym_ops:
            if op in dc_name:
                result = False
        return result

    def _get_noisy_cells_for_dc(self, dc_name):
        """
        Finds the noisy cells for the given denial constraint and inserts
        their (index, attribute) pairs into the C_dk_temp table.

        :param dc_name: denial constraint
        """
        if self.holo_obj.verbose:
            self.holo_obj.logger.info(
                'Denial Constraint Queries For ' + dc_name)
        t3 = time.time()
        dc_object = self.dc_objects[dc_name]
        temp_table = "tmp" + self.dataset.dataset_id

        # Create query for the temp table of violating tuple pairs
        query = "CREATE TABLE " + temp_table + " AS SELECT "
        for tuple_name in dc_object.tuple_names:
            query += tuple_name + "." + self.index + " as " + \
                tuple_name + "_ind,"
        query = query[:-1]
        query += " FROM "
        for tuple_name in dc_object.tuple_names:
            query += self.dataset.table_specific_name("Init") + \
                " as " + tuple_name + ","
        query = query[:-1]
        query += " WHERE "
        if len(dc_object.tuple_names) == 2:
            query += dc_object.tuple_names[0] + "." + self.index + \
                " != " + dc_object.tuple_names[1] + "." + self.index + \
                " AND "
        query += dc_object.cnf_form
        self.dataengine.query(query)
        t4 = time.time()
        if self.holo_obj.verbose:
            self.holo_obj.logger.info(
                "Time for executing query " + dc_name + ":" + str(t4 - t3))

        # For each predicate, collect the attributes it mentions per tuple
        tuple_attributes = {}
        for tuple_name in dc_object.tuple_names:
            tuple_attributes[tuple_name] = set()
        for predicate in dc_object.predicates:
            for component in predicate.components:
                if isinstance(component, str):
                    pass  # literal component, no attribute involved
                else:
                    tuple_attributes[component[0]].add(component[1])
        tuple_attributes_lists = {}
        tuple_attributes_dfs = {}
        for tuple_name in dc_object.tuple_names:
            tuple_attributes_lists[tuple_name] = \
                [[i] for i in tuple_attributes[tuple_name]]
            tuple_attributes_dfs[tuple_name] = \
                self.spark_session.createDataFrame(
                    tuple_attributes_lists[tuple_name], ['attr_name'])
            name = self.dataset.table_specific_name(
                tuple_name + "_attributes")
            attribute_dataframe = tuple_attributes_dfs[tuple_name]
            self.dataengine.dataframe_to_table(name, attribute_dataframe)
            distinct = \
                "(SELECT DISTINCT " + tuple_name + "_ind FROM " + \
                temp_table + ") AS row_table"
            query = "INSERT INTO " + \
                self.dataset.table_specific_name("C_dk_temp") + \
                " SELECT row_table." + tuple_name + "_ind as ind," \
                " a.attr_name as attr FROM " + \
                name + " AS a," + distinct
            self.dataengine.query(query)
            self.holo_obj.logger.info(
                'Denial Constraint Query Left ' + dc_name + ":" + query)
            drop_temp_table = "DROP TABLE " + name
            self.dataengine.query(drop_temp_table)
        drop_temp_table = "DROP TABLE " + temp_table
        self.dataengine.query(drop_temp_table)

    def _get_sym_noisy_cells_for_dc(self, dc_name):
        """
        Finds the noisy cells for a symmetric denial constraint and inserts
        their (index, attribute) pairs into the C_dk_temp table.

        :param dc_name: denial constraint
        """
        self.holo_obj.logger.info('Denial Constraint Queries For ' + dc_name)
        temp_table = "tmp" + self.dataset.dataset_id
        query = "CREATE TABLE " + temp_table + " AS SELECT " \
            "t1." + self.index + " as t1_ind, " \
            "t2." + self.index + " as t2_ind " \
            " FROM " + \
            self.dataset.table_specific_name("Init") + " as t1, " + \
            self.dataset.table_specific_name("Init") + " as t2 " + \
            "WHERE t1." + self.index + " != t2." + self.index + \
            " AND " + dc_name
        self.dataengine.query(query)

        t1_attributes = set()
        dc_predicates = self.dictionary_dc[dc_name]
        for predicate_index in range(0, len(dc_predicates)):
            predicate_type = dc_predicates[predicate_index][4]
            # predicate_type 0: we do not have a literal in this predicate
            # predicate_type 1: literal on the left side of the predicate
            # predicate_type 2: literal on the right side of the predicate
            if predicate_type == 0:
                relax_indices = range(2, 4)
            elif predicate_type == 1:
                relax_indices = range(3, 4)
            elif predicate_type == 2:
                relax_indices = range(2, 3)
            else:
                raise ValueError(
                    'predicate type can only be 0: if the predicate does '
                    'not have a literal, 1: if the predicate has a literal '
                    'on the left side, or 2: if the predicate has a literal '
                    'on the right side')
            for relax_index in relax_indices:
                name_attribute = \
                    dc_predicates[predicate_index][relax_index].split(".")
                if name_attribute[0] == "t1":
                    t1_attributes.add(name_attribute[1])

        left_attributes = [[i] for i in t1_attributes]
        t1_attributes_dataframe = self.spark_session.createDataFrame(
            left_attributes, ['attr_name'])
        t1_name = self.dataset.table_specific_name("T1_attributes")
        self.dataengine.dataframe_to_table(t1_name, t1_attributes_dataframe)

        # Left part of predicates
        distinct_left = \
            "(SELECT DISTINCT t1_ind FROM " + temp_table + ") AS row_table"
        query_left = "INSERT INTO " + \
            self.dataset.table_specific_name("C_dk_temp") + \
            " SELECT row_table.t1_ind as ind," \
            " a.attr_name as attr FROM " + \
            t1_name + " AS a," + distinct_left
        self.dataengine.query(query_left)
        self.holo_obj.logger.info(
            'Denial Constraint Query Left ' + dc_name + ":" + query_left)
        drop_temp_table = "DROP TABLE " + temp_table
        self.dataengine.query(drop_temp_table)
    # Getters
    def get_noisy_cells(self):
        """
        Returns a dataframe that consists of the index and attribute
        of each noisy cell.

        :return: spark dataframe
        """
        table_name = self.dataset.table_specific_name("C_dk_temp")
        query_for_creation_table = "CREATE TABLE " + table_name + \
            "(ind INT, attr VARCHAR(255));"
        self.dataengine.query(query_for_creation_table)
        for dc_name in self.dc_objects:
            self._get_noisy_cells_for_dc(dc_name)
        c_dk_dataframe = self.dataengine.get_table_to_dataframe(
            "C_dk_temp", self.dataset)
        self.noisy_cells = c_dk_dataframe['ind', 'attr'].distinct()
        return self.noisy_cells
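    # The dataframe returned by get_noisy_cells holds one (ind, attr) row
    # per don't-know cell, e.g. Row(ind=7, attr='city') (hypothetical
    # values); distinct() drops duplicates that arise when several denial
    # constraints flag the same cell.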
    def get_clean_cells(self):
        """
        Returns a dataframe that consists of the index and attribute
        of each clean cell, i.e. all cells not flagged as noisy.

        :return: spark dataframe
        """
        c_clean_dataframe = self.session.init_flat.subtract(self.noisy_cells)
        return c_clean_dataframe
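A minimal usage sketch (assumptions: session is an initialized HoloClean session with denial constraints already loaded, and the variable names are illustrative). Note that get_clean_cells subtracts the noisy_cells cached by get_noisy_cells, so the getters must be called in that order:

    detector = SqlDCErrorDetection(session)
    noisy = detector.get_noisy_cells()   # distinct (ind, attr) pairs flagged by some DC
    clean = detector.get_clean_cells()   # remaining (ind, attr) pairs of the flattened init data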