Commit adfc8de4 by Selah Lynch

Merge remote-tracking branch 'origin/master'

Parents: 38599a9a, 643d13e5
#GOAL insert a csv into a temp table in Clarity
#This library takes a dataframe and imports it into Clarity
import pandas as pd
import numpy as np
import sqlalchemy
def create_table_sql(table_name, column_def):
    create_table_sql = """
    DROP TABLE IF EXISTS {table_name};
    CREATE TABLE {table_name} ({table_column_def});
    """.format(table_name=table_name, table_column_def=column_def)
    return create_table_sql
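#For example (editor's sketch, hypothetical names), create_table_sql('##cohort', 'PAT_ID VARCHAR(18)')
#returns SQL equivalent to:
#  DROP TABLE IF EXISTS ##cohort;
#  CREATE TABLE ##cohort (PAT_ID VARCHAR(18));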
def create_and_import(data, table_name, table_def, conn, max_insert=1000):
    import time
    start = time.time()
    cts = create_table_sql(table_name, table_def)
    conn.execute(cts)
    dtypes = get_dtypes_from_table_def(table_def)
    insert_sql_generator = generate_insert_sql(table_name, data.columns, dtypes, data, max_insert=max_insert)
    for insert_chunk_sql in insert_sql_generator:
        try:
            conn.execute(insert_chunk_sql)
        except Exception as e:
            #truncate the error so a long SQL insert doesn't obscure the screen
            print("There was an exception during insert:\n" + str(e)[:500] + '\n')
            print("Aborting create_and_import()")
            return False
    end = time.time()
    duration = end - start
    line_cnt = len(data)
    print("{} lines imported in {:.1f} s".format(line_cnt, duration))
    return True
#WARNING - can't handle table names with spaces in them
def get_dtypes_from_table_def(table_def):
    import re
    column_defs = [var_def.strip() for var_def in table_def.split(',')]
    column_types = []
    for column_def in column_defs:
        if re.search(r'VARCHAR', column_def):
            column_types.append('STR')
        elif re.search(r'INT', column_def):
            column_types.append('NUM')
        elif re.search(r'DATETIME', column_def):
            column_types.append('DT')
        else:
            raise Exception("Unrecognized column definition data type: {}".format(column_def))
    return column_types
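#For example (editor's sketch, hypothetical column defs):
#  get_dtypes_from_table_def('MRN VARCHAR(30), ENC_CNT INT, DEL_DATE DATETIME')
#returns ['STR', 'NUM', 'DT'].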
def format_data_for_insert(rows, column_types):
    data_formatted = ""
    for row in rows:
        row_fmt = zip(row, column_types)
        items_fmttd = []
        for (item, fmt) in row_fmt:
            if pd.isna(item): #catches NaN even when it isn't the np.nan singleton
                item_fmttd = 'NULL'
            elif fmt == 'STR' or fmt == 'DT':
                item_fmttd = "'{}'".format(item)
            else:
                item_fmttd = "{}".format(item)
            items_fmttd.append(item_fmttd)
        row_formatted = "({})".format(",".join(items_fmttd))
        data_formatted += row_formatted + ",\n"
    data_formatted = data_formatted[0:-2] #drop the trailing ",\n"
    return data_formatted
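#For example (mirrors the unit test in the test file below):
#  format_data_for_insert([[61, 'hello', '2020-08-21'], [np.nan, np.nan, np.nan]], ['NUM', 'STR', 'DT'])
#returns "(61,'hello','2020-08-21'),\n(NULL,NULL,NULL)".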
def generate_insert_sql(table_name, column_names, column_types, data, max_insert=1000):
    if isinstance(data, pd.DataFrame):
        data = data.values
    import math
    num_splits = max(1, math.ceil(len(data) / max_insert)) #at least 1, so empty data doesn't break array_split
    data_split = np.array_split(data, num_splits)
    column_names_str = ','.join(column_names)
    insert_chunk_sql_template = "INSERT INTO {table} ({column_names_str})\nvalues\n{rows_of_data}\n"
    for insert_chunk in data_split:
        rows_of_data = format_data_for_insert(insert_chunk, column_types)
        insert_chunk_sql = insert_chunk_sql_template.format(table=table_name, column_names_str=column_names_str, rows_of_data=rows_of_data)
        yield insert_chunk_sql
def collect_insert_sql(table_name, column_names, column_types, data, max_insert=1000):
    insert_sql_generator = generate_insert_sql(table_name, column_names, column_types, data, max_insert)
    insert_sql = ""
    for insert_chunk_sql in insert_sql_generator:
        insert_sql += insert_chunk_sql + ";\n"
    return insert_sql
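#A minimal end-to-end sketch (editor's addition, not in the commit). It assumes
#a live SQLAlchemy connection `conn`; the DataFrame and table names are hypothetical.
def _example_usage(conn):
    df = pd.DataFrame({'PAT_ID': ['1001', '1002'], 'ENC_CNT': [3, 5]})
    table_def = 'PAT_ID VARCHAR(18), ENC_CNT INT'
    create_and_import(df, '##example_cohort', table_def, conn, max_insert=500)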
import pandas as pd
import numpy as np
import unittest
import sqlalchemy
import bulk_insert
import clarity_to_csv as ctc
conn = ctc.get_clarity_engine(host='claritydev.uphs.upenn.edu').connect()
#conn = ctc.get_clarity_engine(host='clarityprod.uphs.upenn.edu').connect()
#TODO - scramble this damn data so it isn't PHI
class TestStuff(unittest.TestCase):
    #How to deal with this one, I don't know!!
    def test_error_in_burris_meds_insert(self):
        datadir = "C:/Users/LynchSe/Documents/Data/Burris_Geobirth/"
        dfmed = pd.read_csv(datadir + "from_burris_lab/Copy of GeoBirth_med_counts_20210713.csv")
        sgids = dfmed.SIMPLE_GENERIC_C.unique()
        dfsgids_raw = pd.DataFrame({'SIMPLE_GENERIC_C': sgids})
        dfsgids = dfsgids_raw.loc[18:23]
        table_def = '''
        SIMPLE_GENERIC_C INT
        '''
        #there were overflow issues
        bulk_insert.create_and_import(dfsgids, '##sgmedids', table_def, conn)

    def test_something(self):
        self.assertEqual(2 + 1, 3)

    def test_format_row_for_insert_nans(self):
        insert_chunk = np.array([[61, 'hello', '2020-08-21'], [np.nan, np.nan, np.nan]], dtype='object')
        column_types = ['NUM', 'STR', 'DT']
        rows_of_data = bulk_insert.format_data_for_insert(insert_chunk, column_types)
        self.assertEqual(rows_of_data, "(61,'hello','2020-08-21'),\n(NULL,NULL,NULL)")

    def test_burris_pull_enc_short(self):
        projdatadir = 'C:\\Users\\LynchSe\\Documents\\Data\\Burris_Geobirth\\'
        d = '2021_05_26'
        dfr = pd.read_csv(projdatadir + 'cohort_pat_delivery_{}.csv'.format(d)) #.sample(1000)
        df = dfr.iloc[30053:30069]
        table_def = '''
        HUP_MRN VARCHAR(30),
        PAT_ID VARCHAR(18),
        DELIVERY_DATE DATETIME,
        HOSPITAL VARCHAR(20)
        '''
        bulk_insert.create_and_import(df, '##test_burris_pull_enc_short', table_def, conn, max_insert=200)
        ## REMEMBER TO LOOK IN CLARITYDEV, NOT PROD

    def test_burris_pull_enc(self):
        projdatadir = 'C:\\Users\\LynchSe\\Documents\\Data\\Burris_Geobirth\\'
        d = '2021_05_26'
        df = pd.read_csv(projdatadir + 'cohort_pat_delivery_{}.csv'.format(d)) #.sample(1000)
        table_def = '''
        HUP_MRN VARCHAR(30),
        PAT_ID VARCHAR(18),
        DELIVERY_DATE DATETIME,
        HOSPITAL VARCHAR(20)
        '''
        bulk_insert.create_and_import(df, '##test_burris_pull_enc', table_def, conn, max_insert=1000)
        ## REMEMBER TO LOOK IN CLARITYDEV, NOT PROD

    def test_integration_1(self):
        #Integration test, desired workflow as of July 2021
        datadir = 'C:/Users/LynchSe/Documents/Data/Clarity_Tools_Selah/'
        dfp = pd.read_csv(datadir + "test_cohort_36559.csv")
        dfc = dfp[['PAT_ID', 'BIRTH_DATE', 'ENC_CNT_2017']]
        tabledef = '''PAT_ID VARCHAR(18) NOT NULL, BIRTH_DATE DATETIME, ENC_CNT_2017 INTEGER
        '''
        conn = ctc.get_clarity_engine().connect()
        bulk_insert.create_and_import(dfc, '##cohort_sample2', tabledef, conn)
        ## REMEMBER TO LOOK IN CLARITYDEV, NOT PROD

    def test_integration_2(self):
        datadir = 'C:/Users/LynchSe/Documents/Data/Clarity_Tools_Selah/'
        dfcohort_inp1 = pd.read_excel(datadir + 'geobirth_patient_list_2018_2021.xls')
        #TODO - scramble this data so it isn't PHI
        table_def = 'MRN VARCHAR(100)'
        bulk_insert.create_and_import(dfcohort_inp1[['mrn']], '##cohort_inp', table_def, conn, max_insert=1000)
        ## REMEMBER TO LOOK IN CLARITYDEV, NOT PROD

if __name__ == '__main__':
    # unittest.main()
    t = TestStuff()
    t.test_error_in_burris_meds_insert()
    # t.test_burris_pull_enc()
    # t.test_burris_pull_enc_short()
    # t.test_format_row_for_insert_nans()
    # t.test_something()
@@ -18,7 +18,7 @@ def get_mssql_engine(
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'

-def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600):
+def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600, host='clarityprod.uphs.upenn.edu'):
    with open(credsfilename, 'r') as credsfile:
        name = credsfile.readline().strip()
        pw = credsfile.readline().strip()
@@ -27,38 +27,80 @@ def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600):
##### BEGIN ACTUAL TESTS #####
#because we are dealing with weird hanging issues, we run tests one at a time
#because we don't want to hit clarity more than necessary, we use the dev server
class TestStuff(unittest.TestCase):
-    def close_conn(self):
+    def test_clarity_dev_connection(self):
        eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')

    #Test a basic connect and execute
    def test_basic_conn_execute(self):
        eng = get_clarity_engine()
        with eng.connect() as conn:
            res = conn.execute('SELECT TOP 3 PAT_ID FROM PAT_ENC')
            self.assertEqual(len(list(res)), 3)
        # conn.close()

-    def test_temp_table_after_reconnect(self):
-        eng = get_clarity_engine()
+    def test_dev_conn_execute(self):
+        eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')
        with eng.connect() as conn:
            res = conn.execute('SELECT TOP 3 PAT_ID FROM PAT_ENC')
            self.assertEqual(len(list(res)), 3)

    #This hangs...sometimes...why? Hung the second time I ran it.
    #Is it that it hangs after the raw connection?
    def test_temp_table_persistence(self):
        eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')
        with eng.connect() as conn:
            conn.execute('DROP TABLE IF EXISTS ##COHORT')
            conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            res = conn.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len(list(res)), 3)
        #we expect the global temp table to disappear with a new connection
        import time
        print("Sleeping 3 seconds...")
        time.sleep(3)
        print("Done sleeping")
        with eng.connect() as conn:
            print("`eng.connect() as conn` finished executing")
            with self.assertRaises(Exception) as e:
                res = conn.execute('SELECT * FROM ##COHORT')
-            # print(e.exception)
+            print(e.exception)

-    def test_temp_table_both_handles(self):
-        eng = get_clarity_engine()
+    def will_this_hang(self):
+        eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')
        print("Try first connection as raw connection...")
        with eng.raw_connection().cursor() as cur:
            print("connect executed")
            cur.execute('DROP TABLE IF EXISTS ##COHORT')
            cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            cur.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len([row for row in cur]), 3)
        print("recreate engine")
        eng = get_clarity_engine(host='claritydev.uphs.upenn.edu', timeout=60) #if I get rid of this line it doesn't hang
        print("Try second connection as regular connection...")
        with eng.connect() as conn:
            print("connect executed")
            with self.assertRaises(Exception) as e:
                res = conn.execute('SELECT * FROM ##COHORT') #I think this is where it hangs? or not
            print(e.exception)
        print("Try third connection as regular connection...")
        with eng.connect() as conn:
            print("connect executed")
            conn.execute('DROP TABLE IF EXISTS ##COHORT')
            conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            res = conn.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len(list(res)), 3)
        #we expect the global temp table to disappear with a new connection

    def test_raw_connection(self):
        eng = get_clarity_engine(host='claritydev.uphs.upenn.edu')
        with eng.raw_connection().cursor() as cur:
            cur.execute('DROP TABLE IF EXISTS ##COHORT')
            cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
@@ -66,20 +108,22 @@ class TestStuff(unittest.TestCase):
            self.assertEqual(len([row for row in cur]), 3)

-if __name__ == '__main__':
-    tests_to_run = [
-        "close_conn"
-        ,"test_temp_table_after_reconnect"
-        ,"test_temp_table_both_handles"
-    ]
-    suite = unittest.TestSuite()
-    for test in tests_to_run:
-        suite.addTest(TestStuff(test))
-    runner = unittest.TextTestRunner()
-    runner.run(suite)
-    # unittest.main()
+#%%
+if __name__ == '__main__':
+    # unittest.main()
+    t = TestStuff()
+    t.test_clarity_dev_connection()
+    t.test_basic_conn_execute()
+    t.test_dev_conn_execute()
+    t.test_temp_table_persistence()
+    t.will_this_hang()
+    t.test_raw_connection()
@@ -12,7 +12,7 @@ def get_mssql_engine(
    database="clarity_snapshot_db",
    domain="UPHS",
    port="1433",
-    timeout=7200,
+    timeout=600, #2hr?
    password=None,
):
    from sqlalchemy import create_engine
@@ -27,15 +27,43 @@ def get_mssql_engine(
#%% My functions
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'

-def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600):
+def get_clarity_engine(credsfilename = selahcredsfilename, timeout=600, host='clarityprod.uphs.upenn.edu'):
    with open(credsfilename, 'r') as credsfile:
        nameline = credsfile.readline().strip()
        pwline = credsfile.readline().strip()
-    clarity_engine = get_mssql_engine(username=nameline, password=pwline, timeout=timeout)
+    clarity_engine = get_mssql_engine(username=nameline, password=pwline, timeout=timeout, host=host)
    return clarity_engine

def sqltext_to_dfs(sqltext, dbconn):
    sql_stmts = extract_sql_statements(sqltext)
    dfs = []
    for stmt in sql_stmts:
        res = dbconn.execute(stmt)
        if res.returns_rows:
            columns = list(res.keys())
            values = []
            for row in res: #note - there is some repeated logic here
                values.append(list(row))
            df = pd.DataFrame(data=values, columns=columns)
            dfs.append(df)
    return tuple(dfs)

def sqlfile_to_dfs(sqlfilename, dbconn):
    with open(sqlfilename, 'r') as sqlfile:
        sqltext = sqlfile.read()
    dfs = sqltext_to_dfs(sqltext, dbconn)
    return dfs
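#A hedged usage sketch (editor's note, not part of the commit): each SELECT in
#the text becomes one DataFrame, so two statements yield a 2-tuple, e.g.
#  with get_clarity_engine().connect() as conn:
#      (df1, df2) = sqltext_to_dfs('SELECT TOP 3 PAT_ID FROM PAT_ENC; SELECT TOP 2 PAT_ID FROM PAT_ENC;', conn)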
def clarity_to_csv(sqlfilename, csvfilenames, dbconn=None):
    print("Running SQL from {}".format(sqlfilename))
    import time
    start = time.time()
    with open(sqlfilename, 'r') as sqlfile:
        sqltext = sqlfile.read()
    eng = get_clarity_engine()
@@ -43,16 +71,26 @@ def clarity_to_csv(sqlfilename, csvfilenames, dbconn=None):
        clarity_to_csv_inner(sqltext, csvfilenames, dbconn)
    else:
        with eng.connect() as sqalconn:
-            clarity_to_csv_inner(sqltext, csvfilenames, dbconn)
+            clarity_to_csv_inner(sqltext, csvfilenames, sqalconn)
    end = time.time()
    duration = end - start
    dtstr = time.strftime('%a %I:%M%p %Y-%m-%d')
    print("Query ran and exported at {} in {:.1f} s".format(dtstr, duration))
-def clarity_to_csv_inner(sqltext, csvfilenames, sqalconn, verbose=False):
+def extract_sql_statements(sqltext):
+    import sqlparse
+    sqltext_cleaned = sqlparse.format(sqltext, strip_comments=True).strip()
+    sqlstatements = sqltext_cleaned.split(';')
+    sqlstatements = [stmt.strip() for stmt in sqlstatements]
+    if sqlstatements[-1].strip() == '':
+        sqlstatements.pop() # often there is a final semicolon leading to an empty last statement
+    return sqlstatements
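#For example (editor's sketch, not in the commit): comments are stripped and
#the text splits on ';', so extract_sql_statements("SELECT 1; --note\nSELECT 2;")
#returns ['SELECT 1', 'SELECT 2'].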
+def clarity_to_csv_inner(sqltext, csvfilenames, sqalconn, verbose=False):
+    sqlstatements = extract_sql_statements(sqltext)
    which_statement = 0
    which_csvfile = 0
    for sqlstatement in sqlstatements:
@@ -69,7 +107,7 @@ def clarity_to_csv_inner(sqltext, csvfilenames, sqalconn, verbose=False):
        with open(csvname, 'w', newline='\n', encoding='utf-8') as csvfile:
            line_count = 0
            mycsvwriter = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
-            #TODO - write column names
+            #write column names
            mycsvwriter.writerow(results.keys())
            line_count += 1
            for row in results:
@@ -4,10 +4,9 @@ import pandas as pd
from unittest.mock import MagicMock
import sqlparse

#%%
-testquerydir = 'C:\\Users\\LynchSe\\Documents\\Repos\\Covid19Related\\selah\\clarity_to_csv_tests\\'
-testdatadir = 'C:\\Users\\LynchSe\\Documents\\Data\\clarity_to_csv_tests\\'
+testquerydir = 'C:\\Users\\LynchSe\\Documents\\Repos\\rClarity_Tools_Selah\\clarity_to_csv_tests\\'
+testdatadir = 'C:\\Users\\LynchSe\\Documents\\Data\\Clarity_Tools_Selah\\'

#TODO spin up a sqlite database here
@@ -17,17 +16,57 @@ def line_count(filename):
        return len(myfile.readlines())

def remove_files(filenamelist):
    import os
    for filename in filenamelist:
        if filename is None:
            continue #bare `next` was a no-op here; `continue` is what was meant
        elif os.path.isfile(filename):
            try:
                os.remove(filename) #portable; shelling out to `rm` fails on Windows
            except Exception as e:
                print(e)
        else:
            print("Skipping removal because not recognized as file - {}".format(filename))
class TestStuff(unittest.TestCase):
    def test_remove_file_not_there(self):
        #removing a file that isn't there should not raise an error
        remove_files(['poop.csv'])

    def test_integration_test(self):
        sqlfilename1 = testquerydir + "testCohort.sql"
        sqlfilename2 = testquerydir + "readTestCohort.sql"
        genericcsvs = [
            testdatadir + 'test1.csv',
            testdatadir + 'test2.csv',
        ]
        remove_files(genericcsvs)
        with ctc.get_clarity_engine().connect() as sqalconn:
            ctc.clarity_to_csv(sqlfilename1, genericcsvs, dbconn=sqalconn)
            ctc.clarity_to_csv(sqlfilename2, genericcsvs, dbconn=sqalconn)

    def test_save_to_dataframes(self):
        sqlfilename = testquerydir + "testCohort.sql"
        with ctc.get_clarity_engine().connect() as sqalconn:
            (df1, df2) = ctc.sqlfile_to_dfs(sqlfilename, sqalconn)
        self.assertEqual(len(df1), 3)
        self.assertEqual(len(df2), 2)

    def test_save_to_df_2col(self):
        sql_2col = '''SELECT TOP 10 PAT_ID, PAT_ENC_CSN_ID FROM PAT_ENC;
        SELECT TOP 5 CONTACT_DATE FROM PAT_ENC tablesample(0.01);
        '''
        with ctc.get_clarity_engine().connect() as sqalconn:
            (df1, df2) = ctc.sqltext_to_dfs(sql_2col, sqalconn)
        self.assertEqual(len(df1), 10)
        self.assertEqual(len(df2), 5)

    def test_comment_with_semicolon(self):
        sqltext = '''
        SELECT TOP 2 PAT_ID FROM PAT_ENC;
@@ -54,21 +93,7 @@ class TestStuff(unittest.TestCase):
            ctc.clarity_to_csv(sqlfilename, genericcsvs, dbconn=sqalconn)

-    def integration_test(self):
-        sqlfilename1 = testquerydir + "testCohort.sql"
-        sqlfilename2 = testquerydir + "readTestCohort.sql"
-        genericcsvs = [
-            testdatadir + 'test1.csv',
-            testdatadir + 'test2.csv',
-        ]
-        remove_files(genericcsvs)
-        with ctc.get_clarity_engine().connect() as sqalconn:
-            ctc.clarity_to_csv(sqlfilename1, genericcsvs, dbconn=sqalconn)
-            ctc.clarity_to_csv(sqlfilename2, genericcsvs, dbconn=sqalconn)

-    def unicode_error(self):
+    def test_unicode_error(self):
        genericcsvs = [
            testdatadir + 'test_cohort.csv'
        ]
@@ -131,25 +156,24 @@ class TestStuff(unittest.TestCase):
    #TODO - deal with wrong number of csv's supplied

#%%
if __name__ == '__main__':
-    tests_to_run = [
-        "test_comment_with_semicolon"
-        # , "test_none_csv"
-        # , "integration_test"
-        # , "unicode_error"
-        # , "test_simple"
-        # , "test_wrapper"
-        # , "test_cohort"
-    ]
-    suite = unittest.TestSuite()
-    for test in tests_to_run:
-        suite.addTest(TestStuff(test))
-    runner = unittest.TextTestRunner()
-    runner.run(suite)
+    t = TestStuff()
+    t.test_save_to_dataframes()
+    t.test_save_to_df_2col()
+    # t.test_remove_file_not_there()
+    # t.test_integration_test()
+    # t.test_comment_with_semicolon()
+    # t.test_none_csv()
+    # t.test_unicode_error()
+    # t.test_simple()
+    # t.test_wrapper()
+    # t.test_cohort()
    # unittest.main()
select * from INFORMATION_SCHEMA.COLUMNS where table_name = 'CLARITY_MEDICATION'
\ No newline at end of file
import sqlalchemy
import clarity_to_csv as ctc
#from sqlalchemy import create_engine
sqlalchemy.__version__
#%%
sqlite_db_filepath = "C:\\Users\\LynchSe\\Documents\\Data\\database.db"
e = sqlalchemy.create_engine('sqlite:///{}'.format(sqlite_db_filepath))
conn = e.connect()
#%%
eng_clarity = ctc.get_clarity_engine()
conn_clarity = eng_clarity.connect()
#%%
metadata_obj = sqlalchemy.MetaData()
#%%
mytable = sqlalchemy.schema.Table("mytable", metadata_obj,
    sqlalchemy.Column('mytable_id', sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column('value', sqlalchemy.String(50))
)
mytable.name
#%% HOLY SHIT, I CAN READ INFO ABOUT A TABLE FROM CLARITY!!!
pat_enc = sqlalchemy.Table('PAT_ENC', metadata_obj, autoload_with=eng_clarity)
list(pat_enc.columns)
#%% Can I easily import a cohort into clarity?
cohort = sqlalchemy.schema.Table("##cohort", metadata_obj,
    sqlalchemy.Column('PAT_ID', sqlalchemy.String(18), primary_key=True))
#%%
cohort.create(eng_clarity)
#%%
stmt = sqlalchemy.insert(cohort).values(PAT_ID='5931')
eng_clarity.execute(stmt)
#%%
rows = [{'PAT_ID':'1132'}, {'PAT_ID':'1133'}]
stmt = sqlalchemy.insert(cohort).values(rows)
eng_clarity.execute(stmt)
#TODO - test if this can handle a gazillion rows without being super slow
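#A hedged sketch (editor's addition) of one way to attack that TODO without one
#giant VALUES list: chunk the rows. The helper name and chunk_size are hypothetical.
def insert_in_chunks(engine, table, rows, chunk_size=1000):
    #one multi-row INSERT per chunk, using the same insert().values(...) API as above
    for i in range(0, len(rows), chunk_size):
        engine.execute(sqlalchemy.insert(table).values(rows[i:i + chunk_size]))
#e.g. insert_in_chunks(eng_clarity, cohort, [{'PAT_ID': str(n)} for n in range(5000)])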
#%%
stmt = sqlalchemy.select(cohort)
res = eng_clarity.execute(stmt)
#%%
import pandas as pd
df = pd.read_sql(stmt, eng_clarity)
df2 = pd.read_sql(stmt.where(cohort.c.PAT_ID < 2000), eng_clarity)
df3 = pd.read_sql(stmt.where(cohort.c.PAT_ID.in_([5931, 1132])), eng_clarity)
#%% now how do I select from PAT_ENC based on a list of ids
#TODO - import into temp table and use join
#TODO - compare with IN clause?
from sqlalchemy import create_engine
#%%
sqlite_db_filepath = "C:\\Users\\LynchSe\\Documents\\Data\\database.db"
e = create_engine('sqlite:///{}'.format(sqlite_db_filepath))
c = e.connect()
c.execute('SELECT 1;')
c.execute('CREATE TABLE IF NOT EXISTS COHORT (EMPI VARCHAR(90) NOT NULL);')
c.execute("INSERT INTO COHORT (EMPI) values ('8001111117'),('1000000000'),('8333333002');")
res = c.execute("SELECT EMPI FROM COHORT;")
#%%
for line in res:
    print(type(line))
    print(line)
#%%
c.close()
import pandas as pd
import unittest

class TestStuff(unittest.TestCase):
    def test_something(self):
        self.assertEqual(2 + 1, 3)

if __name__ == '__main__':
    # unittest.main()
    t = TestStuff()
    t.test_something()