#!/usr/bin/env python
import logging
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import time
import torch.nn.functional as F
import torch
from dataengine import DataEngine
from dataset import Dataset
from featurization.database_worker import DatabaseWorker, PopulateTensor
from utils.pruning import Pruning
from utils.parser_interface import ParserInterface, DenialConstraint
import multiprocessing
from errordetection.errordetector_wrapper import ErrorDetectorsWrapper
from featurization.initfeaturizer import SignalInit
from featurization.dcfeaturizer import SignalDC
from featurization.cooccurrencefeaturizer import SignalCooccur
from global_variables import GlobalVariables
from learning.softmax import SoftMax
from learning.accuracy import Accuracy
# Defining arguments for HoloClean
arguments = [
(('-path', '--holoclean_path'),
{'metavar': 'HOLOCLEAN_PATH',
'dest': 'holoclean_path',
'default': '.',
'type': str,
'help': 'Path of HoloClean'}),
(('-u', '--db_user'),
{'metavar': 'DB_USER',
'dest': 'db_user',
'default': 'holocleanuser',
'type': str,
'help': 'User for DB used to persist state'}),
(('-p', '--password', '--pass'),
{'metavar': 'PASSWORD',
'dest': 'db_pwd',
'default': 'abcd1234',
'type': str,
'help': 'Password for DB used to persist state'}),
(('-h', '--host'),
{'metavar': 'HOST',
'dest': 'db_host',
'default': 'localhost',
'type': str,
'help': 'Host for DB used to persist state'}),
(('-d', '--database'),
{'metavar': 'DATABASE',
'dest': 'db_name',
'default': 'holo',
'type': str,
'help': 'Name of DB used to persist state'}),
(('-m', '--mysql_driver'),
{'metavar': 'MYSQL_DRIVER',
'dest': 'mysql_driver',
'default': 'holoclean/lib/mysql-connector-java-5.1.44-bin.jar',
'type': str,
'help': 'Path for MySQL driver'}),
(('-pg', '--pg_driver'),
{'metavar': 'PG_DRIVER',
'dest': 'pg_driver',
'default': 'holoclean/lib/postgresql-42.2.2.jar',
'type': str,
'help': 'Path for Postgresql PySpark driver'}),
(('-s', '--spark_cluster'),
{'metavar': 'SPARK',
'dest': 'spark_cluster',
'default': None,
'type': str,
'help': 'Spark cluster address'}),
(('-k', '--first_k'),
{'metavar': 'FIRST_K',
'dest': 'first_k',
'default': 1,
'type': int,
'help': 'The final output will show the first k results '
'(if it is 0 it will show everything)'}),
(('-l', '--learning-rate'),
{'metavar': 'LEARNING_RATE',
'dest': 'learning_rate',
'default': 0.001,
'type': float,
'help': 'The learning rate holoclean will use during training'}),
(('-pt1', '--pruning-threshold1'),
{'metavar': 'PRUNING_THRESHOLD1',
'dest': 'pruning_threshold1',
'default': 0.0,
'type': float,
'help': 'Threshold1 used for domain pruning step'}),
(('-pt2', '--pruning-threshold2'),
{'metavar': 'PRUNING_THRESHOLD2',
'dest': 'pruning_threshold2',
'default': 0.1,
'type': float,
'help': 'Threshold2 used for domain pruning step'}),
(('-pdb', '--pruning-dk-breakoff'),
{'metavar': 'DK_BREAKOFF',
'dest': 'pruning_dk_breakoff',
'default': 5,
'type': float,
'help': 'DK breakoff used for domain pruning step'}),
(('-pcb', '--pruning-clean-breakoff'),
{'metavar': 'CLEAN_BREAKOFF',
'dest': 'pruning_clean_breakoff',
'default': 5,
'type': float,
'help': 'Clean breakoff used for domain pruning step'}),
(('-it', '--learning-iterations'),
{'metavar': 'LEARNING_ITERATIONS',
'dest': 'learning_iterations',
'default': 20,
'type': int,
'help': 'Number of iterations used for softmax'}),
(('-w', '--weight_decay'),
{'metavar': 'WEIGHT_DECAY',
'dest': 'weight_decay',
'default': 0.9,
'type': float,
'help': 'The L2 penalty HoloClean will use during training'}),
(('-p', '--momentum'),
{'metavar': 'MOMENTUM',
'dest': 'momentum',
'default': 0.0,
'type': float,
'help': 'The momentum term in the loss function'}),
(('-b', '--batch-size'),
{'metavar': 'BATCH_SIZE',
'dest': 'batch_size',
'default': 1,
'type': int,
'help': 'The batch size during training'}),
(('-ki', '--k-inferred'),
{'metavar': 'K_INFERRED',
'dest': 'k_inferred',
'default': 1,
'type': int,
'help': 'Number of inferred values'}),
(('-t', '--timing-file'),
{'metavar': 'TIMING_FILE',
'dest': 'timing_file',
'default': None,
'type': str,
'help': 'File to save timing information'}),
]
# Flags for HoloClean mode
flags = [
(('-q', '--quiet'),
{'default': False,
'dest': 'quiet',
'action': 'store_true',
'help': 'quiet'}),
(tuple(['--verbose']),
{'default': False,
'dest': 'verbose',
'action': 'store_true',
'help': 'verbose'})
]
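# The tuples above pair CLI flag strings with argparse add_argument kwargs.
# A minimal sketch of how they could be registered (the parser itself is not
# defined in this module, so this is illustrative only). Note that '-h' and
# '-p' are reused ('-p' appears for both --password and --momentum), so
# add_help=False and renaming one short flag would be needed in practice:
#
#   import argparse
#   parser = argparse.ArgumentParser(add_help=False)
#   for flag_strings, opts in arguments + flags:
#       parser.add_argument(*flag_strings, **opts)
#   args = parser.parse_args()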
class HoloClean:
"""
Main entry point for HoloClean which creates a HoloClean Data Engine
and initializes Spark.
"""
def __init__(self, **kwargs):
"""
Constructor for HoloClean
:param kwargs: arguments for HoloClean
"""
# Initialize default execution arguments
arg_defaults = {}
for arg, opts in arguments:
if 'directory' in arg[0]:
arg_defaults['directory'] = opts['default']
else:
arg_defaults[opts['dest']] = opts['default']
# Initialize default execution flags
for arg, opts in flags:
arg_defaults[opts['dest']] = opts['default']
for key in kwargs:
arg_defaults[key] = kwargs[key]
# Initialize additional arguments
for (arg, default) in arg_defaults.items():
setattr(self, arg, kwargs.get(arg, default))
if self.verbose:
logging.basicConfig(filename="logger.log",
filemode='w', level=logging.INFO)
else:
logging.basicConfig(filename="logger.log",
filemode='w', level=logging.ERROR)
self.logger = logging.getLogger("__main__")
# Initialize dataengine and spark session
self.spark_session, self.spark_sql_ctxt = self._init_spark()
self.dataengine = self._init_dataengine()
# Init empty session collection
self.session = {}
# Internal methods
def _init_dataengine(self):
"""
Creates dataengine object
:return: dataengine
"""
data_engine = DataEngine(self)
return data_engine
def _init_spark(self):
"""
Set spark configuration
:return: Spark session
:return: Spark SQL context
"""
conf = SparkConf()
# Link PG driver to Spark
conf.set("spark.executor.extraClassPath",
self.holoclean_path + "/" + self.pg_driver)
conf.set("spark.driver.extraClassPath",
self.holoclean_path + "/" + self.pg_driver)
conf.set('spark.driver.memory', '20g')
conf.set('spark.executor.memory', '20g')
conf.set("spark.network.timeout", "6000")
conf.set("spark.rpc.askTimeout", "99999")
conf.set("spark.worker.timeout", "60000")
conf.set("spark.driver.maxResultSize", '70g')
conf.set("spark.ui.showConsoleProgress", "false")
if self.spark_cluster:
conf.set("spark.master", self.spark_cluster)
# Gets Spark context
sc = SparkContext(conf=conf)
sc.setLogLevel("OFF")
sql_ctxt = SQLContext(sc)
return sql_ctxt.sparkSession, sql_ctxt
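# A minimal construction sketch (illustrative only; the values shown are the
# defaults declared in `arguments` above, and any of them can be overridden
# via keyword arguments). A reachable PostgreSQL instance with these
# credentials is assumed:
#
#   hc = HoloClean(
#       holoclean_path='.',
#       db_user='holocleanuser',
#       db_pwd='abcd1234',
#       db_host='localhost',
#       db_name='holo',
#       verbose=True)
#   # hc.spark_session, hc.spark_sql_ctxt and hc.dataengine are now set up.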
class Session:
"""
Session class controls the entire pipeline of HoloClean
"""
def __init__(self, holo_env, name="session"):
"""
Constructor for HoloClean session
:param holo_env: HoloClean object
:param name: Name for the HoloClean session
"""
logging.basicConfig()
# Initialize members
self.name = name
self.holo_env = holo_env
self.Denial_constraints = [] # Denial Constraint strings
self.dc_objects = {} # Denial Constraint Objects
self.featurizers = []
self.error_detectors = []
self.cv = None
self.pruning = None
self.dataset = Dataset()
self.parser = ParserInterface(self)
self.inferred_values = None
self.feature_count = 0
def load_data(self, file_path):
"""
Loads a dataset from file into the database
:param file_path: path to data file
:return: pyspark dataframe
"""
if self.holo_env.verbose:
start = time.time()
self._ingest_dataset(file_path)
init = self.init_dataset
if self.holo_env.verbose:
end = time.time()
log = 'Time to Load Data: ' + str(end - start) + '\n'
print(log)
attributes = self.dataset.get_schema('Init')
for attribute in attributes:
self.holo_env.dataengine.add_db_table_index(
self.dataset.table_specific_name('Init'), attribute)
return init
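# Usage sketch (assumes `hc` is a HoloClean instance; the CSV path is
# hypothetical):
#
#   session = Session(hc)
#   init_df = session.load_data('data/hospital.csv')
#   init_df.show(5)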
def load_denial_constraints(self, file_path):
"""
Loads denial constraints from line-separated txt file
:param file_path: path to dc file
:return: string array of dc's
"""
new_denial_constraints, new_dc_objects = \
self.parser.load_denial_constraints(
file_path, self.Denial_constraints)
self.Denial_constraints.extend(new_denial_constraints)
self.dc_objects.update(new_dc_objects)
return self.Denial_constraints
def add_denial_constraint(self, dc):
"""
Adds denial constraints from string into
self.Denial_constraints
:param dc: string in dc format
:return: string array of dc's
"""
dc_object = DenialConstraint(dc, self.dataset.get_schema("Init"))
self.Denial_constraints.append(dc)
self.dc_objects[dc] = dc_object
return self.Denial_constraints
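# Illustrative denial constraint string in the two-tuple predicate format
# expected by DenialConstraint (the attribute names are hypothetical):
#
#   session.add_denial_constraint(
#       't1&t2&EQ(t1.ZipCode,t2.ZipCode)&IQ(t1.City,t2.City)')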
def remove_denial_constraint(self, index):
"""
Removes the denial constraint at the given index
:param index: index in list
:return: string array of dc's
"""
if len(self.Denial_constraints) == 0:
raise IndexError("No Denial Constraints Have Been Defined")
if index < 0 or index >= len(self.Denial_constraints):
raise IndexError("Given Index Out Of Bounds")
self.dc_objects.pop(self.Denial_constraints[index])
return self.Denial_constraints.pop(index)
def load_clean_data(self, file_path):
"""
Loads pre-defined clean cells from csv
:param file_path: path to file
:return: spark dataframe of clean cells
"""
clean = self.holo_env.spark_session.read.csv(file_path, header=True)
self.holo_env.dataengine.add_db_table('C_clean', clean, self.dataset)
return clean
def load_dirty_data(self, file_path):
"""
Loads pre-defined dirty cells from csv
:param file_path: path to file
:return: spark dataframe of dirty cells
"""
dirty = self.holo_env.spark_session.read.csv(file_path, header=True)
self.holo_env.dataengine.add_db_table('C_dk', dirty, self.dataset)
return dirty
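# Sketch: pre-labelled cells can be supplied instead of (or in addition to)
# running error detection. Both CSV paths are hypothetical; each file is
# expected to identify cells, e.g. by tuple index and attribute name:
#
#   clean_df = session.load_clean_data('data/clean_cells.csv')
#   dirty_df = session.load_dirty_data('data/dirty_cells.csv')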
def detect_errors(self, detector_list):
"""
Separates cells that violate DC's from those that don't
:param detector_list: List of error detectors
:return: clean dataframe
:return: don't know dataframe
"""
if self.holo_env.verbose:
start = time.time()
for detector in detector_list:
err_detector = ErrorDetectorsWrapper(detector)
self._add_error_detector(err_detector)
self._ds_detect_errors()
clean = self.clean_df
dk = self.dk_df
if self.holo_env.verbose:
end = time.time()
log = 'Time for Error Detection: ' + str(end - start) + '\n'
print(log)
return clean, dk
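# Usage sketch: detect_errors takes a list of detector objects, each of which
# gets wrapped in ErrorDetectorsWrapper. A DC-based detector class is assumed
# to be available elsewhere in the package (class name illustrative):
#
#   detector = SqlDCErrorDetection(session)
#   clean_df, dk_df = session.detect_errors([detector])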
def repair(self):
"""
Repairs the initial data; includes pruning, featurization, and softmax
:return: repaired dataset
"""
if self.holo_env.verbose:
start = time.time()
self._ds_domain_pruning(self.holo_env.pruning_threshold1,
self.holo_env.pruning_threshold2,
self.holo_env.pruning_dk_breakoff,
self.holo_env.pruning_clean_breakoff)
if self.holo_env.verbose:
end = time.time()
log = 'Time for Domain Pruning: ' + str(end - start) + '\n'
self.holo_env.logger.info("Total pruning time:"+str(end - start))
print(log)
start = time.time()
init_signal = SignalInit(self)
self._add_featurizer(init_signal)
dc_signal = SignalDC(self.Denial_constraints, self)
self._add_featurizer(dc_signal)
cooccur_signal = SignalCooccur(self)
self._add_featurizer(cooccur_signal)
# Try to learn and then infer; catch errors when the tensors are None
try:
self._ds_featurize(clean=1)
if self.holo_env.verbose:
end = time.time()
log = 'Time for Featurization: ' + str(end - start) + '\n'
print(log)
self.holo_env.logger.info("Time for Featurization:" +
str(end - start))
start = time.time()
soft = SoftMax(self, self.X_training)
soft.logreg(self.featurizers)
if self.holo_env.verbose:
end = time.time()
log = 'Time for Training Model: ' + str(end - start) + '\n'
print(log)
self.holo_env.logger.info('Time for Training Model: ' +
str(end - start))
start = time.time()
except Exception as e:
self.holo_env.logger.\
error('Error Creating Training Tensor: nothing to learn',
exc_info=e)
try:
self._ds_featurize(clean=0)
if self.holo_env.verbose:
end = time.time()
log = 'Time for Test Featurization: ' + str(end - start) + '\n'
print(log)
self.holo_env.logger.\
info('Time for Test Featurization dk: ' + str(end - start))
start = time.time()
Y = soft.predict(soft.model, self.X_testing,
soft.setupMask(0, self.N, self.L))
soft.save_prediction(Y)
if self.holo_env.verbose:
end = time.time()
log = 'Time for Inference: ' + str(end - start) + '\n'
print(log)
self.holo_env.logger.info('Time for Inference: ' +
str(end - start))
soft.log_weights()
except Exception as e:
self.holo_env.logger.\
error('Error Creating Prediction Tensor: nothing to infer',
exc_info=e)
return self._create_corrected_dataset()
def compare_to_truth(self, truth_path):
"""
Compares the repaired dataset to the ground truth and prints precision and recall
:param truth_path: path to clean version of dataset
"""
acc = Accuracy(self, truth_path)
acc.accuracy_calculation()
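# Usage sketch (the ground-truth CSV path is hypothetical); precision and
# recall are printed by Accuracy.accuracy_calculation():
#
#   session.compare_to_truth('data/hospital_clean.csv')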
def _ingest_dataset(self, src_path):
"""
Ingests a dataset from given source path
:param src_path: string literal of path to the .csv file of the dataset
:return: Null
"""
self.holo_env.logger.info('ingesting file:' + src_path)
self.init_dataset, self.attribute_map = \
self.holo_env.dataengine.ingest_data(src_path, self.dataset)
self.holo_env.logger.info(
'creating dataset with id:' +
self.dataset.print_id())
all_attr = self.dataset.get_schema('Init')
all_attr.remove(GlobalVariables.index_name)
number_of_tuples = len(self.init_dataset.collect())
tuples = [[i] for i in range(1, number_of_tuples + 1)]
attr = [[a] for a in all_attr]
tuples_dataframe = self.holo_env.spark_session.createDataFrame(
tuples, ['ind'])
attr_dataframe = self.holo_env.spark_session.createDataFrame(
attr, ['attr'])
self.init_flat = tuples_dataframe.crossJoin(attr_dataframe)
return
def _add_featurizer(self, new_featurizer):
"""
Add a new featurizer
:param new_featurizer: Derived class of Featurizer Object
representing one Feature Signal being used
:return: Null
"""
self.holo_env.logger.info('getting new signal for featurization...')
self.featurizers.append(new_featurizer)
self.holo_env.logger.info(
'getting new signal for featurization is finished')
return
def _add_error_detector(self, new_error_detector):
"""
Add a new error detector
:param new_error_detector : Derived class of ErrorDetectors
Object representing a method of separating the dataset into
a clean set and a dirty set
:return: Null
"""
self.error_detectors.append(new_error_detector)
self.holo_env.logger.info('Added new error detection')
return
# Data methods
def _ds_detect_errors(self):
"""
Uses the added error detectors to split the dataset into clean and
dirty sets; must be called after _add_error_detector
:return: Null
"""
clean_cells = []
dk_cells = []
self.holo_env.logger.info('starting error detection...')
# Get clean and dk cells from each error detector
for err_detector in self.error_detectors:
temp = err_detector.get_noisy_dknow_dataframe()
clean_cells.append(temp[1])
dk_cells.append(temp[0])
# Union all dk cells and intersect all clean cells
num_of_error_detectors = len(dk_cells)
union_dk_cells = dk_cells[0]
intersect_clean_cells = clean_cells[0]
for detector_counter in range(1, num_of_error_detectors):
union_dk_cells = union_dk_cells.union(
dk_cells[detector_counter])
intersect_clean_cells = intersect_clean_cells.intersect(
clean_cells[detector_counter])
del dk_cells
del clean_cells
self.clean_df = intersect_clean_cells
# Persist all clean and dk cells to Database
self.holo_env.dataengine.add_db_table(
'C_clean', intersect_clean_cells, self.dataset)
self.holo_env.logger.info('The table: ' +
self.dataset.table_specific_name('C_clean') +
" has been created")
self.holo_env.logger.info(" ")
self.dk_df = union_dk_cells
self.holo_env.dataengine.add_db_table(
'C_dk', union_dk_cells, self.dataset)
self.holo_env.logger.info('The table: ' +
self.dataset.table_specific_name('C_dk') +
" has been created")
self.holo_env.logger.info(" ")
self.holo_env.logger.info('error detection is finished')
return
def _ds_domain_pruning(self, pruning_threshold1, pruning_threshold2,
pruning_dk_breakoff, pruning_clean_breakoff):
"""
Prunes the domain based on threshold1 to give each cell
repair candidates
:param pruning_threshold1:
Float variable to limit possible values for training data
:param pruning_threshold2: Float variable to
limit possible values for dirty data
:param pruning_dk_breakoff: to limit possible values for don't-know
cells to less than k values
:param pruning_clean_breakoff: to limit possible values for
training data to less than k values
:return: Null
"""
self.holo_env.logger.info(
'starting domain pruning with threshold %s',
pruning_threshold1)
self.pruning = Pruning(
self,
pruning_threshold1, pruning_threshold2,
pruning_dk_breakoff, pruning_clean_breakoff)
self.holo_env.logger.info('Domain pruning is finished :')
return
def _parallel_queries(self,
number_of_threads=multiprocessing.cpu_count(),
clean=1):
"""
Runs queries in parallel
:param number_of_threads: the number of threads for parallel execution
:param clean: flag to execute queries for clean or don't know data
:return: Null
"""
list_of_threads = []
DatabaseWorker.table_names = []
for i in range(0, number_of_threads):
list_of_threads.append(DatabaseWorker(self))
for thread in list_of_threads:
thread.start()
for thread in list_of_threads:
thread.join()
feature_tables = list(DatabaseWorker.table_names)
self._create_dimensions(clean)
try:
X_tensor = torch.zeros(self.N, self.M, self.L)
# Creates Threads to populate tensor
tensor_threads = []
for thread_num in range(len(list_of_threads)):
thread = \
PopulateTensor(feature_tables[thread_num], X_tensor, self)
tensor_threads.append(thread)
thread.start()
for feature in self.featurizers:
if feature.direct_insert:
feature.insert_to_tensor(X_tensor, clean)
# Waits for Threads that are populating tensor to be finished
for thread in tensor_threads:
thread.join()
X_tensor = F.normalize(X_tensor, p=2, dim=1)
if clean:
self.X_training = X_tensor
self.holo_env.logger.\
info("The X-Tensor_training has been created")
self.holo_env.logger.info(" ")
else:
self.X_testing = X_tensor
self.holo_env.logger.\
info("The X-Tensor_testing has been created")
self.holo_env.logger.info(" ")
except Exception:
raise Exception("Error creating Tensor")
return
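# Sketch of the normalization applied above: F.normalize(X, p=2, dim=1)
# rescales, for every cell n and candidate value l, the M-dimensional feature
# vector X[n, :, l] to unit L2 norm. A tiny self-contained example:
#
#   import torch
#   import torch.nn.functional as F
#   X = torch.tensor([[[3.0, 0.0],
#                      [4.0, 0.0]]])      # shape N=1, M=2, L=2
#   F.normalize(X, p=2, dim=1)
#   # -> [[[0.6, 0.0], [0.8, 0.0]]]; the feature column [3, 4] is scaled to
#   #    unit norm while the all-zero column stays zero.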
def _ds_featurize(self, clean=1):
"""
Extracts dataset features and creates the X tensor for learning
or for inference
Tables created (clean=1): Dimensions_clean
Tables created (clean=0): Dimensions_dk
:param clean: Optional, default=1, if clean = 1 then
the featurization is for clean cells otherwise dirty cells
:return: Null
"""
self.holo_env.logger.info('Start executing queries for featurization')
self.holo_env.logger.info(' ')
num_of_threads = multiprocessing.cpu_count()
# Produce all queries needed to be run for featurization
for featurizer in self.featurizers:
queries_to_add = featurizer.get_query(clean)
for query in queries_to_add:
DatabaseWorker.queries.append(query)
# Create multiple threads to execute all queries
self._parallel_queries(num_of_threads, clean)
def _create_dimensions(self, clean=1):
"""
Finds the dimensions (N, M, L) for the tensor
:param clean: Optional, default=1, if clean = 1 then
the featurization is for clean cells otherwise dirty cells
:return: Null
"""
dimensions = 'Dimensions_clean' if clean == 1 else 'Dimensions_dk'
possible_values = 'Possible_values_clean' if clean == 1 \
else 'Possible_values_dk'
feature_id_map = 'Feature_id_map'
query_for_create_offset = "CREATE TABLE \
" + self.dataset.table_specific_name(dimensions) \
+ "(dimension VARCHAR(255), length INT);"
self.holo_env.dataengine.query(query_for_create_offset)
insert_signal_query = \
"INSERT INTO " + self.dataset.table_specific_name(dimensions) + \
" SELECT 'N' as dimension, (" \
" SELECT MAX(vid) " \
"FROM " + \
self.dataset.table_specific_name(possible_values) + \
") as length;"
self.holo_env.dataengine.query(insert_signal_query)
insert_signal_query = \
"INSERT INTO " + self.dataset.table_specific_name(dimensions) + \
" SELECT 'M' as dimension, (" \
" SELECT COUNT(*) " \
"FROM " \
+ self.dataset.table_specific_name(feature_id_map) + \
") as length;"
self.holo_env.dataengine.query(insert_signal_query)
insert_signal_query = \
"INSERT INTO " + self.dataset.table_specific_name(dimensions) + \
" SELECT 'L' as dimension, MAX(m) as length " \
"FROM (" \
" SELECT MAX(k_ij) m FROM " \
+ self.dataset.table_specific_name('Kij_lookup_clean') + \
" UNION " \
" SELECT MAX(k_ij) as m " \
"FROM " \
+ self.dataset.table_specific_name('Kij_lookup_dk') + \
" ) k_ij_union;"
self.holo_env.dataengine.query(insert_signal_query)
if clean:
dataframe_offset = \
self.holo_env.dataengine.get_table_to_dataframe(
"Dimensions_clean", self.dataset)
rows = dataframe_offset.collect()
dimension_dict = {}
for dimension in rows:
dimension_dict[dimension['dimension']] = dimension['length']
self.M = dimension_dict['M']
self.N = dimension_dict['N']
self.L = dimension_dict['L']
else:
dataframe_offset = self.holo_env.dataengine.get_table_to_dataframe(
"Dimensions_dk", self.dataset)
rows = dataframe_offset.collect()
dimension_dict = {}
for dimension in rows:
dimension_dict[dimension['dimension']] = dimension['length']
# X Tensor Dimensions (N * M * L)
self.M = dimension_dict['M']
self.N = dimension_dict['N']
self.L = dimension_dict['L']
return
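# Illustrative contents of a Dimensions_* table after the inserts above
# (the numbers are hypothetical): N is the number of random variables
# (max vid), M the number of features and L the largest candidate-domain
# size, so the feature tensor X has shape N x M x L.
#
#   dimension | length
#   ----------+-------
#   N         |   950
#   M         |   320
#   L         |    12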
def _create_corrected_dataset(self):
"""
Re-creates the original dataset with the repaired values
and saves it to the Repaired_dataset table in Postgres.
The MAP must be computed first if we inferred more than 1 value
:return: the original dataset with the repaired values from the
Inferred_values table
"""
if self.inferred_values:
# Should not be empty; it has at least the initial values
# Persist it to compute MAP as max prediction
self.holo_env.dataengine.add_db_table("Inferred_values",
self.inferred_values,
self.dataset)
else:
self.holo_env.logger.info(" No Inferred_values ")
self.holo_env.logger.info(" ")
return
init = self.init_dataset.collect()
attribute_map = {}
index = 0
for attribute in self.dataset.attributes['Init']:
attribute_map[attribute] = index
index += 1
corrected_dataset = []
for i in range(len(init)):
row = []
for attribute in self.dataset.attributes['Init']:
row.append(init[i][attribute])
corrected_dataset.append(row)
# Compute MAP if k_inferred > 1
if self.holo_env.k_inferred > 1:
inferred_table = self.dataset.table_specific_name('Inferred_Values')
map_inferred_query = \
"SELECT I.* FROM " + inferred_table + " AS I, " \
"(SELECT tid, attr_name, MAX(probability) AS max_prob " \
"FROM " + inferred_table + " " \
"GROUP BY tid, attr_name) AS M " \
"WHERE I.tid = M.tid " \
"AND I.attr_name = M.attr_name " \
"AND I.probability = M.max_prob"
self.inferred_map = self.holo_env.dataengine.query(
map_inferred_query, 1)
else:
# MAP is the inferred values when inferred k = 1
self.inferred_map = self.inferred_values
# persist another copy for now
# needed for accuracy calculations
self.holo_env.dataengine.add_db_table("Inferred_map",
self.inferred_map,
self.dataset)
# the corrections
value_predictions = self.inferred_map.collect()
# Replace values with the inferred values from multi value predictions
if value_predictions:
for j in range(len(value_predictions)):
tid = value_predictions[j]['tid'] - 1
column = attribute_map[value_predictions[j]['attr_name']]
corrected_dataset[tid][column] =\
value_predictions[j]['attr_val']
correct_dataframe = \
self.holo_env.spark_sql_ctxt.createDataFrame(
corrected_dataset, self.dataset.attributes['Init'])
self.holo_env.dataengine.add_db_table("Repaired_dataset",
correct_dataframe, self.dataset)
self.holo_env.logger.info("The Inferred_values, MAP, "
"and Repaired tables have been created")
self.holo_env.logger.info(" ")
return correct_dataframe
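# A minimal end-to-end driver sketch for this module (illustrative only; the
# dataset paths and the detector class are hypothetical, and a running
# PostgreSQL instance with the configured credentials is assumed):
#
#   if __name__ == "__main__":
#       hc = HoloClean(verbose=True)
#       session = Session(hc)
#       session.load_data('data/hospital.csv')
#       session.load_denial_constraints('data/hospital_constraints.txt')
#       session.detect_errors([SqlDCErrorDetection(session)])
#       repaired = session.repair()
#       session.compare_to_truth('data/hospital_clean.csv')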