Commit 00038591 by Selah Clarity

simplify import data to clarity, clean up tests

1 parent ba1be159
......@@ -13,6 +13,38 @@ def create_table_sql(table_name, column_def):
return create_table_sql
def create_and_import(data, table_name, table_def, conn, max_insert=1000):
    """Create a table and load `data` into it, reporting the elapsed time.

    The CREATE statement comes from create_table_sql(table_name, table_def)
    and the per-column type codes from get_dtypes_from_table_def(table_def).
    Every statement produced by generate_insert_sql is executed on `conn`
    in the order it is produced.

    data: object exposing `.columns` and `len()` (the tests in this repo
        pass a pandas DataFrame).
    table_name: name of the table to create and fill.
    table_def: comma-separated SQL column definitions.
    conn: object with an `execute(sql)` method.
    max_insert: forwarded unchanged to generate_insert_sql.
    """
    import time

    started_at = time.time()

    # Create the target table first so the inserts have somewhere to land.
    conn.execute(create_table_sql(table_name, table_def))

    column_types = get_dtypes_from_table_def(table_def)
    print(column_types)

    insert_statements = generate_insert_sql(
        table_name, data.columns, column_types, data, max_insert=max_insert
    )
    for statement in insert_statements:
        conn.execute(statement)

    elapsed = time.time() - started_at
    print("{} lines imported in {:.1f} s".format(len(data), elapsed))
#WARNING - can't handle table names with spaces in them
def get_dtypes_from_table_def(table_def):
    """Translate SQL column definitions into insert type codes.

    `table_def` is a comma-separated list of column definitions such as
    ``PAT_ID VARCHAR(18) NOT NULL, BIRTH_DATE DATETIME``. For each column
    the result holds 'STR' (type contains VARCHAR), 'NUM' (type contains
    INT) or 'DT' (type contains DATETIME), in definition order.

    Only the type token that follows the column name is inspected, so a
    column name or trailing constraint that happens to contain a keyword
    (``APPOINTMENT_DATE DATETIME``, ``VARCHAR_LEN INT``) no longer decides
    the result. Column names may be bare, [bracketed] or "double-quoted".
    Type keywords are matched case-insensitively.

    Definitions are split on every comma, so types written with a comma
    inside parentheses (e.g. DECIMAL(10,2)) are not supported.

    Raises ValueError (an Exception subclass) when a definition has no
    recognisable type.
    """
    import re

    # <column name> <type token>; the name may be [bracketed] or "quoted".
    column_pattern = re.compile(r'(?:\[[^\]]*\]|"[^"]*"|\S+)\s+(\S+)')
    # Checked in this order, matching the original VARCHAR -> INT -> DATETIME chain.
    type_codes = (
        (re.compile(r'VARCHAR', re.IGNORECASE), 'STR'),
        (re.compile(r'INT', re.IGNORECASE), 'NUM'),
        (re.compile(r'DATETIME', re.IGNORECASE), 'DT'),
    )

    column_types = []
    for raw_def in table_def.split(','):
        found = column_pattern.match(raw_def.strip())
        type_token = found.group(1) if found else ''
        for keyword, code in type_codes:
            if keyword.search(type_token):
                column_types.append(code)
                break
        else:
            raise ValueError("Unrecognized column definition data type")
    return column_types
def format_data_for_insert(rows, column_types):
data_formatted = ""
for row in rows:
......
import pandas as pd
import numpy as np
import unittest
import sqlalchemy
import bulk_insert
#%%
# Shared fixtures for the interactive cells below: a target table name, five
# sample rows as plain lists, and the same rows wrapped in a DataFrame.
table_name = "##COHORT_BULK_INSERT_TEST"
test_data = [
[578,'29389','2011-09-03'],
[332,'11384','2011-09-07'],
[372,'14487','2011-09-07'],
[331,'41384','2011-09-07'],
[931,'24587','2011-10-03']
]
df_test_data = pd.DataFrame(test_data)
#%%
# Column definitions for the CREATE TABLE statement, plus the column names and
# type codes handed to the insert generators.
# NOTE(review): column_names lists MRN before PAT_ID while table_column_def
# declares PAT_ID first, and MRN is tagged 'NUM' here but VARCHAR in the
# definition - confirm this mismatch is intended.
table_column_def = '''
PAT_ID VARCHAR(18) NOT NULL,
MRN VARCHAR(30) NOT NULL,
DELIVERY_DATE DATETIME NOT NULL'''
column_names = ["MRN","PAT_ID","DELIVERY_DATE"]
column_types = ['NUM','STR','DT']
#%%
# Show the CREATE TABLE statement built for the fixture table.
import bulk_insert
print(bulk_insert.create_table_sql(table_name, table_column_def))
#%%
# Pull the first statement from the insert generator (5 rows, max_insert=3).
# presumably each item covers at most max_insert rows - verify in bulk_insert
insert_sql_generator = bulk_insert.generate_insert_sql(table_name, column_names, column_types, df_test_data, max_insert=3)
print(next(insert_sql_generator))
#%%
# Pull the second statement from the same generator.
print(next(insert_sql_generator))
#%%
#Dataframes should never be passed to this function
#print(bulk_insert.format_data_for_insert(df_test_data, ['NUM','STR','DT']))
# Format the raw list-of-lists rows instead.
print(bulk_insert.format_data_for_insert(test_data, ['NUM','STR','DT']))
#%%
# Same inputs as the generator cell, routed through collect_insert_sql.
print(bulk_insert.collect_insert_sql(table_name, column_names, column_types, df_test_data, max_insert=3))
#%% Test with clarity
import clarity_to_csv as ctc
#conn = ctc.get_clarity_engine().connect()
# Build every statement up front so the execution cell below only talks to
# the database.
create_create_table_sql = """
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name} ({table_column_def});
""".format(table_name=table_name, table_column_def=table_column_def)
import bulk_insert
insert_sql_generator = bulk_insert.generate_insert_sql(table_name, column_names, column_types, df_test_data, max_insert=3)
insert_sql1 = next(insert_sql_generator)
insert_sql2 = next(insert_sql_generator)
#%%
# Open the connection BEFORE the first execute: previously `conn` was assigned
# after the three execute calls, so running this file top to bottom raised
# NameError. The connection is left open on purpose because
# TestStuff.test_integration_2 further down reads this module-level `conn`.
conn = ctc.get_clarity_engine(host='claritydev.uphs.upenn.edu').connect()
#conn = ctc.get_clarity_engine(host='clarityprod.uphs.upenn.edu').connect()
conn.execute(create_create_table_sql)
conn.execute(insert_sql1)
conn.execute(insert_sql2)
#TODO - scramble this damn data so it isn't PHI
class TestStuff(unittest.TestCase):
    """Smoke test plus integration tests for bulk_insert.create_and_import.

    The integration tests read data files from a hard-coded local directory
    and execute SQL through clarity_to_csv.get_clarity_engine, so they only
    run on a machine that has those files and database access.
    """

    def test_something(self):
        """Sanity check that the test runner itself works."""
        self.assertEqual(2+1, 3)

    def test_integration_1(self):
        """Load three columns of a CSV cohort into a temp table."""
        # Integration test, desired workflow as of July 2021
        datadir = 'C:/Users/LynchSe/Documents/Data/Clarity_Tools_Selah/'
        dfp = pd.read_csv(datadir + "test_cohort_36559.csv")
        dfc = dfp[['PAT_ID', 'BIRTH_DATE', 'ENC_CNT_2017']]
        tabledef = '''PAT_ID VARCHAR(18) NOT NULL, BIRTH_DATE DATETIME, ENC_CNT_2017 INTEGER
'''
        import clarity_to_csv as ctc
        # Context manager closes the connection even when the import fails;
        # it was previously left open.
        with ctc.get_clarity_engine().connect() as conn:
            bulk_insert.create_and_import(dfc, '##cohort_sample2', tabledef, conn)

    def test_integration_2(self):
        """Load the `mrn` column of an Excel patient list into a temp table."""
        datadir = 'C:/Users/LynchSe/Documents/Data/Clarity_Tools_Selah/'
        dfcohort_inp1 = pd.read_excel(datadir + 'geobirth_patient_list_2018_2021.xls')
        #TODO - scramble this data so it isn't PHI
        table_def = 'MRN VARCHAR(100)'
        import clarity_to_csv as ctc
        # Open a dedicated connection (same dev host as the module-level
        # `conn`) so the test no longer depends on a global created by the
        # script cells above, and close it when the test ends.
        with ctc.get_clarity_engine(host='claritydev.uphs.upenn.edu').connect() as conn:
            bulk_insert.create_and_import(dfcohort_inp1[['mrn']], '##cohort_inp', table_def, conn, max_insert=1000)
# When executed as a script, hand control to unittest's runner.
# The commented lines are the manual alternative: build one TestStuff
# instance and call a single test method directly.
if __name__ == '__main__':
unittest.main()
# t = TestStuff()
# t.test_something()
......@@ -18,7 +18,7 @@ def get_mssql_engine(
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'
def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600):
def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600, host='clarityprod.uphs.upenn.edu'):
with open(credsfilename, 'r') as credsfile:
name = credsfile.readline().strip()
pw = credsfile.readline().strip()
......@@ -27,9 +27,11 @@ def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600):
##### BEGIN ACTUAL TESTS #####
#because we don't want to hit clarity more than necessary, we run tests one at a time
#because we are dealing with weird hanging issues, we run tests one at a time
#because we don't want to hit clarity more than necessary, we use dev server
class TestStuff(unittest.TestCase):
#Test a basic connect and execute
def test_basic_conn_execute(self):
eng = get_clarity_engine()
......@@ -38,33 +40,86 @@ class TestStuff(unittest.TestCase):
self.assertEqual(len(list(res)), 3)
def test_dev_conn_execute(self):
    """Connect to the dev host and check that a TOP 3 query yields 3 rows."""
    dev_engine = get_clarity_engine(host='claritydev.uphs.upenn.edu')
    with dev_engine.connect() as connection:
        result = connection.execute('SELECT TOP 3 PAT_ID FROM PAT_ENC')
        # Materialise the result while the connection is still open.
        row_count = len(list(result))
        self.assertEqual(row_count, 3)
#This hangs...sometimes...why? Hung on the second time I ran it.
#Is it that it hangs after the raw connection
# Creates ##COHORT on one connection, then expects a SELECT from it on a
# second connection (opened from the same engine after a pause) to raise.
def test_temp_table_persistence(self):
# NOTE(review): eng is assigned twice in a row; only the second (dev host)
# engine is used below. The first line looks like a leftover - confirm and drop.
eng = get_clarity_engine()
eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')
with eng.connect() as conn:
# Rebuild ##COHORT from three PAT_ENC rows and confirm all three come back.
conn.execute('DROP TABLE IF EXISTS ##COHORT')
conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
res = conn.execute('SELECT * FROM ##COHORT')
self.assertEqual(len(list(res)), 3)
#we expect the global temp table to disappear with new connection
import time
# Pause between the two connections; the prints trace progress because this
# test has been observed to hang.
print("Sleeping 3 seconds...")
time.sleep(3)
print("Done sleeping")
with eng.connect() as conn:
print("`eng.connect() as conn` finished executing")
# Any exception counts as success here; the caught exception is printed.
with self.assertRaises(Exception) as e:
res = conn.execute('SELECT * FROM ##COHORT')
print(e.exception)
# def test_raw_connection(self):
# eng = get_clarity_engine()
# with eng.raw_connection().cursor() as cur:
# cur.execute('DROP TABLE IF EXISTS ##COHORT')
# cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
# cur.execute('SELECT * FROM ##COHORT')
# self.assertEqual(len([row for row in cur]), 3)
# Diagnostic for the intermittent hang: a raw-connection cursor first, then a
# freshly created engine with two regular connections, printing progress
# markers between steps. The name has no `test` prefix, so it only runs when
# called explicitly.
def will_this_hang(self):
eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')
print("Try first connection as raw connection...")
# NOTE(review): nothing in this method closes or commits the raw connection
# whose cursor is used here - presumably it is still open when the second
# engine connects; confirm whether that is what the later SELECT waits on.
with eng.raw_connection().cursor() as cur:
print("connect executed")
cur.execute('DROP TABLE IF EXISTS ##COHORT')
cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
cur.execute('SELECT * FROM ##COHORT')
self.assertEqual(len([row for row in cur]), 3)
print("recreate engine")
# Second engine, this time with timeout=60 instead of the default.
eng = get_clarity_engine(host='claritydev.uphs.upenn.edu', timeout=60) #if I get rid of this line it doesn't hang
print("Try second connection as regular connection...")
with eng.connect() as conn:
print("connect executed")
# Expect the SELECT to raise on this connection; print the caught exception.
with self.assertRaises(Exception) as e:
res = conn.execute('SELECT * FROM ##COHORT') #I think this is where it hangs? or not
print(e.exception)
print("Try third connection as regular connection...")
with eng.connect() as conn:
print("connect executed")
# Recreate ##COHORT on this connection and confirm three rows come back.
conn.execute('DROP TABLE IF EXISTS ##COHORT')
conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
res = conn.execute('SELECT * FROM ##COHORT')
self.assertEqual(len(list(res)), 3)
#we expect the global temp table to disappear with new connection
if __name__ == '__main__':
def test_raw_connection(self):
    """Create and read ##COHORT through a raw DBAPI cursor on the dev host.

    The connection obtained from raw_connection() is now closed in a
    `finally` block; previously only the cursor was managed and the
    connection itself was never released.
    """
    eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')
    raw_conn = eng.raw_connection()
    try:
        with raw_conn.cursor() as cur:
            cur.execute('DROP TABLE IF EXISTS ##COHORT')
            cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            cur.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len([row for row in cur]), 3)
    finally:
        # The caller of raw_connection() is responsible for closing it.
        raw_conn.close()
unittest.main()
#%%
# When executed as a script, call the tests one at a time in a fixed order
# on a single TestStuff instance instead of going through unittest.main().
# will_this_hang has no `test` prefix, so it is only reached via this list.
if __name__ == '__main__':
# unittest.main()
t = TestStuff()
t.test_basic_conn_execute()
t.test_dev_conn_execute()
t.test_temp_table_persistence()
t.will_this_hang()
t.test_raw_connection()
......@@ -12,7 +12,7 @@ def get_mssql_engine(
database="clarity_snapshot_db",
domain="UPHS",
port="1433",
timeout=7200,
timeout=600, #2hr?
password=None,
):
from sqlalchemy import create_engine
......@@ -27,13 +27,14 @@ def get_mssql_engine(
#%% My functions
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'
def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600, host='clarityprod.uphs.upenn.edu'):
    """Build a database engine from a two-line credentials file.

    credsfilename: path to a text file whose first line is the username
        and whose second line is the password (both are stripped).
    timeout: forwarded to get_mssql_engine.
    host: server to connect to, forwarded to get_mssql_engine.

    Returns whatever get_mssql_engine returns. The engine is built exactly
    once, always with `host`; the earlier text also carried a second
    header and a first engine call that ignored `host`.
    """
    with open(credsfilename, 'r') as credsfile:
        nameline = credsfile.readline().strip()
        pwline = credsfile.readline().strip()
    return get_mssql_engine(username=nameline, password=pwline, timeout=timeout, host=host)
def clarity_to_csv(sqlfilename, csvfilenames, dbconn=None):
print("Running SQL from {}".format(sqlfilename))
with open(sqlfilename, 'r') as sqlfile:
......
......@@ -6,8 +6,8 @@ import sqlparse
#%%
# Directories holding the test queries and the test data.
# NOTE(review): both names are assigned twice; the Clarity_Tools_Selah values
# in the second pair overwrite the clarity_to_csv_tests values in the first.
# The first pair looks like a leftover - confirm and remove.
testquerydir = 'C:\\Users\\LynchSe\\Documents\\Repos\\Covid19Related\\selah\\clarity_to_csv_tests\\'
testdatadir = 'C:\\Users\\LynchSe\\Documents\\Data\\clarity_to_csv_tests\\'
testquerydir = 'C:\\Users\\LynchSe\\Documents\\Repos\\Covid19Related\\selah\\Clarity_Tools_Selah\\'
testdatadir = 'C:\\Users\\LynchSe\\Documents\\Data\\Clarity_Tools_Selah\\'
#TODO spin up a sqlite database here
......
from sqlalchemy import create_engine
#%%
# Scratch cells: open a SQLite database file through SQLAlchemy, make sure a
# COHORT table exists, insert three EMPI values and read them back.
sqlite_db_filepath = "C:\\Users\\LynchSe\\Documents\\Data\\database.db"
e = create_engine('sqlite:///{}'.format(sqlite_db_filepath))
c = e.connect()
# Trivial query to confirm the connection works.
c.execute('SELECT 1;')
c.execute('CREATE TABLE IF NOT EXISTS COHORT (EMPI VARCHAR(90) NOT NULL);')
# NOTE(review): this INSERT runs on every execution and nothing clears the
# table, so rows presumably accumulate in the database file across runs - confirm.
c.execute("INSERT INTO COHORT (EMPI) values ('8001111117'),('1000000000'),('8333333002');")
res = c.execute("SELECT EMPI FROM COHORT;")
#%%
# Print the Python type and the contents of each returned row.
for line in res:
print(type(line))
print(line)
#%%
# Close the connection opened in the first cell.
c.close()
import pandas as pd
import unittest
class TestStuff(unittest.TestCase):
    """Placeholder test case that only checks the test runner works."""

    def test_something(self):
        """Assert that 2 + 1 equals 3."""
        total = 2 + 1
        self.assertEqual(total, 3)
# When executed as a script, build one TestStuff instance and call its single
# test method directly; the unittest.main() runner is left commented out.
if __name__ == '__main__':
# unittest.main()
t = TestStuff()
t.test_something()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!