bulk_insert.py
3.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#GOAL insert a csv into a temp table in Clarity
#This library takes a dataframe and imports it into Clarity
import pandas as pd
import numpy as np
import sqlalchemy
def create_table_sql(table_name, column_def):
    """Return SQL that drops and recreates *table_name*.

    Parameters
    ----------
    table_name : str
        Name of the table to (re)create.
    column_def : str
        Comma-separated column definitions, e.g. "id INT, name VARCHAR(20)".

    Returns
    -------
    str : a two-statement script: DROP TABLE IF EXISTS + CREATE TABLE.
    """
    # NOTE(review): table_name and column_def are interpolated directly into
    # the SQL -- identifiers cannot be bound as parameters, so callers must
    # pass trusted values only.
    return """
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name} ({table_column_def});
""".format(table_name=table_name, table_column_def=column_def)
def create_and_import(data, table_name, table_def, conn, max_insert=1000):
    """Create *table_name* in the database and bulk-insert *data* into it.

    Drops any existing table of the same name, recreates it from *table_def*,
    then streams INSERT statements of at most *max_insert* rows each.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows to insert; column order must match *table_def*.
    table_name : str
        Target table name (names with spaces are not supported).
    table_def : str
        Comma-separated column definitions, e.g. "id INT, name VARCHAR(20)".
    conn : object
        Database connection exposing .execute(sql) (e.g. a sqlalchemy
        connection).
    max_insert : int
        Maximum number of rows per INSERT statement.

    Returns
    -------
    bool : True on success, False if any insert chunk failed.
        (Previously the success path fell off the end and returned None
        while failure returned False.)
    """
    import time

    start = time.time()
    conn.execute(create_table_sql(table_name, table_def))
    dtypes = get_dtypes_from_table_def(table_def)
    insert_sql_generator = generate_insert_sql(
        table_name, data.columns, dtypes, data, max_insert=max_insert)
    for insert_chunk_sql in insert_sql_generator:
        try:
            conn.execute(insert_chunk_sql)
        except Exception as e:
            # Truncate the (potentially huge) statement so the error report
            # does not flood the screen.  getattr() guards against exception
            # types that carry no .statement attribute; str(e) replaces the
            # fragile private e._message() call.
            statement = getattr(e, 'statement', '') or ''
            print("There was an exception during insert:\n"
                  + str(e) + statement[:500] + '\n')
            print("Aborting create_and_import()")
            return False
    duration = time.time() - start
    print("{} lines imported in {:.1f} s".format(len(data), duration))
    return True
#WARNING - table and column names containing spaces are not supported
def get_dtypes_from_table_def(table_def):
    """Map each column in *table_def* to a formatting tag.

    Splits *table_def* on commas and classifies each column definition by
    substring: 'VARCHAR' -> 'STR', 'INT' -> 'NUM', 'DATETIME' -> 'DT'.
    (Matching is case-sensitive, as in the original regex version.)

    Parameters
    ----------
    table_def : str
        Comma-separated column definitions, e.g. "id INT, name VARCHAR(20)".

    Returns
    -------
    list[str] : one tag per column, in definition order.

    Raises
    ------
    ValueError : if a column definition matches none of the known types;
        the message names the offending definition (the original raised a
        bare Exception without saying which column failed).
    """
    column_types = []
    for column_def in (part.strip() for part in table_def.split(',')):
        # Plain substring tests replace the original re.search() calls --
        # no regex features were being used.
        if 'VARCHAR' in column_def:
            column_types.append('STR')
        elif 'INT' in column_def:
            column_types.append('NUM')
        elif 'DATETIME' in column_def:
            column_types.append('DT')
        else:
            raise ValueError(
                "Unrecognized column definition data type: {!r}".format(column_def))
    return column_types
def format_data_for_insert(rows, column_types):
    """Render *rows* as the body of a SQL VALUES clause.

    Each row becomes "(v1,v2,...)"; rows are joined with ",\\n".

    Parameters
    ----------
    rows : iterable of sequences
        The data rows (e.g. a numpy 2-D array or list of lists).
    column_types : sequence of str
        Per-column tags from get_dtypes_from_table_def():
        'STR'/'DT' values are single-quoted, everything else is rendered raw,
        and missing values become NULL.

    Returns
    -------
    str : the formatted rows, or "" when *rows* is empty.

    Notes
    -----
    * NULL handling: None and any float NaN become NULL.  The original
      `item is np.nan` identity test missed NaN values coming out of
      pandas/numpy that are not the np.nan singleton.
    * Single quotes inside STR/DT values are doubled so they cannot break
      (or inject into) the generated SQL.
    """
    formatted_rows = []
    for row in rows:
        items = []
        for item, fmt in zip(row, column_types):
            # NaN != NaN, so `item != item` catches every float NaN,
            # including numpy.float64 instances.
            if item is None or (isinstance(item, float) and item != item):
                items.append('NULL')
            elif fmt in ('STR', 'DT'):
                items.append("'{}'".format(str(item).replace("'", "''")))
            else:
                items.append("{}".format(item))
        formatted_rows.append("({})".format(",".join(items)))
    # join() replaces the original quadratic `+=` loop and its trailing
    # ",\n" slice-off.
    return ",\n".join(formatted_rows)
def generate_insert_sql(table_name, column_names, column_types, data, max_insert=1000):
    """Yield INSERT statements for *data*, at most *max_insert* rows each.

    Parameters
    ----------
    table_name : str
        Target table name.
    column_names : iterable of str
        Column names in insert order.
    column_types : sequence of str
        Per-column tags from get_dtypes_from_table_def().
    data : pandas.DataFrame or 2-D array-like
        Rows to insert; a DataFrame is converted via .values.
    max_insert : int
        Maximum number of rows per INSERT statement.

    Yields
    ------
    str : one INSERT statement per chunk.  Yields nothing for empty *data*
        (previously num_splits was 0 and np.array_split raised ValueError).
    """
    import math

    if isinstance(data, pd.DataFrame):
        data = data.values
    if len(data) == 0:
        # Nothing to insert; avoid np.array_split(data, 0), which raises.
        return
    column_names_str = ','.join(column_names)
    template = "INSERT INTO {table} ({column_names_str})\nvalues\n{rows_of_data}\n"
    num_splits = math.ceil(len(data) / max_insert)
    for chunk in np.array_split(data, num_splits):
        yield template.format(
            table=table_name,
            column_names_str=column_names_str,
            rows_of_data=format_data_for_insert(chunk, column_types),
        )
def collect_insert_sql(table_name, column_names, column_types, data, max_insert=1000):
    """Concatenate every chunked INSERT statement into one SQL script.

    Wraps generate_insert_sql(): each yielded chunk is terminated with
    ";\\n" and the chunks are joined in order into a single string.
    """
    chunks = generate_insert_sql(
        table_name, column_names, column_types, data, max_insert)
    return "".join(chunk + ";\n" for chunk in chunks)