# bulk_insert.py (3.71 KB)
# GOAL: insert a CSV into a temp table in Clarity.
# This library takes a dataframe and imports it into Clarity.

import pandas as pd
import numpy as np
import sqlalchemy


def create_table_sql(table_name, column_def):
    """Build the SQL that drops any existing table and recreates it.

    Args:
        table_name: name of the table to (re)create.
        column_def: comma-separated column definition string.

    Returns:
        A combined DROP TABLE IF EXISTS / CREATE TABLE statement.
    """
    template = """
    DROP TABLE IF EXISTS {table_name};
    CREATE TABLE {table_name} ({table_column_def});
    """
    return template.format(table_name=table_name, table_column_def=column_def)


def create_and_import(data, table_name, table_def, conn, max_insert=1000):
    """(Re)create ``table_name`` per ``table_def`` and bulk-insert ``data``.

    Args:
        data: DataFrame whose columns line up with ``table_def``.
        table_name: target table name.
        table_def: comma-separated column definitions; used both for the
            CREATE TABLE and to derive per-column value formatting.
        conn: DB connection/engine exposing ``execute(sql)``.
        max_insert: maximum rows per INSERT statement.

    Returns:
        True on success, False if any insert chunk failed (import aborted).
    """
    import time
    start = time.time()
    conn.execute(create_table_sql(table_name, table_def))
    dtypes = get_dtypes_from_table_def(table_def)
    insert_sql_generator = generate_insert_sql(table_name, data.columns, dtypes, data, max_insert=max_insert)
    for insert_chunk_sql in insert_sql_generator:
        try:
            conn.execute(insert_chunk_sql)
        except Exception as e:
            # str(e) instead of the private e._message(): a generic Exception
            # has neither ._message() nor .statement, so the old handler could
            # itself raise AttributeError.  Truncate the statement (when one is
            # attached, e.g. sqlalchemy errors) so a long INSERT body doesn't
            # flood the screen.
            statement = getattr(e, 'statement', '') or ''
            print("There was an exception during insert:\n" + str(e) + statement[:500] + '\n')
            print("Aborting create_and_import()")
            return False
    duration = time.time() - start
    print("{} lines imported in {:.1f} s".format(len(data), duration))
    # Explicit True so success/failure is a consistent boolean (was None/False).
    return True

#WARNING - splits on ',' so type specs containing commas (e.g. DECIMAL(10,2)) break
def get_dtypes_from_table_def(table_def):
    """Map each column definition to a value-formatting category.

    Args:
        table_def: comma-separated column definitions, e.g.
            "name VARCHAR(50), id INT, seen DATETIME".

    Returns:
        A list with one of 'STR', 'NUM', 'DT' per column, in order.

    Raises:
        ValueError: if a column's type is not VARCHAR/INT/DATETIME-like.
            (ValueError subclasses Exception, so callers catching the old
            generic Exception keep working.)
    """
    column_types = []
    for column_def in (part.strip() for part in table_def.split(',')):
        # plain substring tests — the regexes had no metacharacters
        if 'VARCHAR' in column_def:
            column_types.append('STR')
        elif 'INT' in column_def:
            column_types.append('NUM')
        elif 'DATETIME' in column_def:
            column_types.append('DT')
        else:
            # name the offending column so the bad definition is findable
            raise ValueError("Unrecognized column definition data type: {!r}".format(column_def))
    return column_types


def format_data_for_insert(rows, column_types):
    """Render rows as a SQL VALUES body: "(v,...),\\n(v,...)".

    Args:
        rows: iterable of row sequences, one item per column.
        column_types: per-column categories from get_dtypes_from_table_def;
            'STR'/'DT' values are quoted, everything else rendered bare.

    Returns:
        The ",\\n"-joined "(...)" tuples, with no trailing separator
        (empty string for no rows).
    """
    rows_formatted = []
    for row in rows:
        items_fmttd = []
        for item, fmt in zip(row, column_types):
            # `item != item` is the NaN test: it catches any float NaN, not
            # just the np.nan singleton that the old `is np.nan` check
            # matched.  None is missing data too, so it also becomes NULL.
            if item is None or item != item:
                items_fmttd.append('NULL')
            elif fmt in ('STR', 'DT'):
                # double embedded single quotes so they don't terminate the
                # SQL string literal early
                items_fmttd.append("'{}'".format(str(item).replace("'", "''")))
            else:
                items_fmttd.append("{}".format(item))
        rows_formatted.append("({})".format(",".join(items_fmttd)))
    # join instead of the original quadratic += string build
    return ",\n".join(rows_formatted)


def generate_insert_sql(table_name, column_names, column_types, data, max_insert=1000):
    """Yield INSERT statements covering ``data`` in chunks of ``max_insert`` rows.

    Args:
        table_name: target table.
        column_names: iterable of column names.
        column_types: per-column categories ('STR'/'NUM'/'DT').
        data: DataFrame or 2-D array-like of row values.
        max_insert: maximum rows per INSERT statement.

    Yields:
        One "INSERT INTO ... values ..." string per chunk; nothing at all
        for empty data.
    """
    import math

    # isinstance is the idiomatic check and also accepts DataFrame subclasses
    if isinstance(data, pd.DataFrame):
        data = data.values

    # np.array_split raises ValueError on 0 sections, so bail out early
    # instead of crashing on an empty frame
    if len(data) == 0:
        return

    num_splits = math.ceil(len(data) / max_insert)
    data_split = np.array_split(data, num_splits)

    column_names_str = ','.join(column_names)
    insert_chunk_sql_template = "INSERT INTO {table} ({column_names_str})\nvalues\n{rows_of_data}\n"
    for insert_chunk in data_split:
        rows_of_data = format_data_for_insert(insert_chunk, column_types)
        yield insert_chunk_sql_template.format(table=table_name, column_names_str=column_names_str, rows_of_data=rows_of_data)


def collect_insert_sql(table_name, column_names, column_types, data, max_insert=1000):
    """Concatenate every generated INSERT chunk into one script.

    Each chunk is terminated with ";\\n" so the result can be executed as a
    single multi-statement string.
    """
    chunks = generate_insert_sql(table_name, column_names, column_types, data, max_insert)
    return "".join(chunk + ";\n" for chunk in chunks)