Commit 81f20848 by Selah Clarity

reorg, move stuff to clarity_tools repo

1 parent 0a5a5fb6
-- Example for the project proposal.
-- Lists encounters from pat_enc for patients whose PAT_ID appears in the
-- global temp table ##COHORT, with the encounter-type name and department
-- name looked up (NULL when the lookup table has no matching row).
SELECT
    e.PAT_ID,
    e.PAT_ENC_CSN_ID,
    e.CONTACT_DATE,
    et.NAME AS ENC_TYPE,
    cd.DEPARTMENT_NAME
FROM pat_enc AS e
LEFT JOIN ZC_DISP_ENC_TYPE AS et  -- Encounter type
    ON e.ENC_TYPE_C = et.DISP_ENC_TYPE_C
LEFT JOIN CLARITY_DEP AS cd  -- Visit department
    ON e.DEPARTMENT_ID = cd.DEPARTMENT_ID
WHERE e.pat_id IN (SELECT PAT_ID FROM ##COHORT);
import unittest
import sqlalchemy
from unittest.mock import MagicMock
def get_mssql_engine(
    username="lynchse",
    host="clarityprod.uphs.upenn.edu",
    database="clarity_snapshot_db",
    domain="UPHS",
    port="1433",
    timeout=600,
    password=None,
):
    """Build a SQLAlchemy engine for a SQL Server database through pymssql.

    Args:
        username: Account name; combined with ``domain`` as ``domain\\username``.
        host: Server host name.
        database: Database name placed in the URL path.
        domain: Domain prefixed to the username.
        port: Server port.
        timeout: Value passed as the URL's ``timeout`` query parameter.
        password: Account password. When ``None`` the URL is built without a
            password component instead of embedding the text "None".

    Returns:
        Whatever ``sqlalchemy.create_engine`` returns for the assembled URL.
    """
    from urllib.parse import quote

    from sqlalchemy import create_engine

    user = domain + "\\" + username
    if password is None:
        credentials = user
    else:
        # Percent-encode the password so characters such as '@', ':' or '/'
        # are not mistaken for URL delimiters when the URL is parsed.
        credentials = f"{user}:{quote(password, safe='')}"
    clarity_engine = create_engine(
        f"mssql+pymssql://{credentials}@{host}:{port}/{database}?timeout={timeout}"
    )
    return clarity_engine
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'


def get_clarity_engine(credsfilename=selahcredsfilename, timeout=600):
    """Create an engine from a two-line credentials file.

    The first line of ``credsfilename`` is used as the username and the
    second line as the password (surrounding whitespace stripped); both are
    forwarded to ``get_mssql_engine`` together with ``timeout``.
    """
    with open(credsfilename, 'r') as handle:
        login, secret = (handle.readline().strip() for _ in range(2))
    return get_mssql_engine(username=login, password=secret, timeout=timeout)
##### BEGIN ACTUAL TESTS #####
class TestStuff(unittest.TestCase):
    """Live-database checks of how the global temp table ##COHORT behaves.

    Every test opens real connections through ``get_clarity_engine``.
    """

    def close_conn(self):
        """A connection used as a context manager can run a simple query."""
        eng = get_clarity_engine()
        with eng.connect() as conn:
            res = conn.execute('SELECT TOP 3 PAT_ID FROM PAT_ENC')
            self.assertEqual(len(list(res)), 3)

    def test_temp_table_after_reconnect(self):
        """##COHORT built on one connection is expected to be unreadable on the next."""
        eng = get_clarity_engine()
        with eng.connect() as conn:
            conn.execute('DROP TABLE IF EXISTS ##COHORT')
            conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            res = conn.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len(list(res)), 3)
        with eng.connect() as conn:
            with self.assertRaises(Exception):
                conn.execute('SELECT * FROM ##COHORT')

    def test_temp_table_both_handles(self):
        """The same create/read sequence works via a Connection and a raw cursor."""
        eng = get_clarity_engine()
        with eng.connect() as conn:
            conn.execute('DROP TABLE IF EXISTS ##COHORT')
            conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            res = conn.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len(list(res)), 3)
        # Keep a handle on the raw connection so it can be released; closing
        # only the cursor would leave the connection checked out.
        raw_conn = eng.raw_connection()
        try:
            with raw_conn.cursor() as cur:
                cur.execute('DROP TABLE IF EXISTS ##COHORT')
                cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
                cur.execute('SELECT * FROM ##COHORT')
                self.assertEqual(len([row for row in cur]), 3)
        finally:
            raw_conn.close()
if __name__ == '__main__':
    # Run a hand-picked list of TestStuff methods, in this order.
    tests_to_run = [
        "close_conn",
        "test_temp_table_after_reconnect",
        "test_temp_table_both_handles",
    ]
    suite = unittest.TestSuite()
    suite.addTests(TestStuff(test_name) for test_name in tests_to_run)
    runner = unittest.TextTestRunner()
    runner.run(suite)
    # unittest.main()
from sqlalchemy import create_engine
from getpass import getpass
import pymssql
import csv
import pandas as pd
#%% Andrew functions
def get_mssql_engine(
    username="lynchse",
    host="clarityprod.uphs.upenn.edu",
    database="clarity_snapshot_db",
    domain="UPHS",
    port="1433",
    timeout=7200,
    password=None,
):
    """Build a SQLAlchemy engine for a SQL Server database through pymssql.

    Args:
        username: Account name; combined with ``domain`` as ``domain\\username``.
        host: Server host name.
        database: Database name placed in the URL path.
        domain: Domain prefixed to the username.
        port: Server port.
        timeout: Value passed as the URL's ``timeout`` query parameter.
        password: Account password. When ``None`` the user is prompted for
            it with ``getpass``.

    Returns:
        Whatever ``sqlalchemy.create_engine`` returns for the assembled URL.
    """
    from urllib.parse import quote

    from sqlalchemy import create_engine

    if password is None:
        password = getpass("PW: ")
    user = domain + "\\" + username
    # Percent-encode the password so characters such as '@', ':' or '/'
    # are not mistaken for URL delimiters when the URL is parsed.
    safe_password = quote(password, safe="")
    return create_engine(
        f"mssql+pymssql://{user}:{safe_password}@{host}:{port}/{database}?timeout={timeout}"
    )
#%% My functions
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'


def get_clarity_engine(credsfilename=selahcredsfilename, timeout=600):
    """Create an engine from a two-line credentials file.

    The first line of ``credsfilename`` is used as the username and the
    second line as the password (surrounding whitespace stripped); both are
    forwarded to ``get_mssql_engine`` together with ``timeout``.
    """
    with open(credsfilename, 'r') as handle:
        login, secret = (handle.readline().strip() for _ in range(2))
    return get_mssql_engine(username=login, password=secret, timeout=timeout)
def clarity_to_csv(sqlfilename, csvfilenames, dbconn=None):
    """Run the SQL script in ``sqlfilename`` and write its results to CSV files.

    Args:
        sqlfilename: Path of the SQL script to read.
        csvfilenames: Output paths handed to ``clarity_to_csv_inner``.
        dbconn: Optional open connection to run the script on. When omitted,
            a connection is opened via ``get_clarity_engine`` and closed
            again once the script has run.
    """
    print("Running SQL from {}".format(sqlfilename))
    with open(sqlfilename, 'r') as sqlfile:
        sqltext = sqlfile.read()
    if dbconn is not None:
        clarity_to_csv_inner(sqltext, csvfilenames, dbconn)
        return
    # Build the engine (which reads the credentials file) only when the
    # caller did not supply a connection, and run on the connection that was
    # actually opened rather than on the missing ``dbconn``.
    eng = get_clarity_engine()
    with eng.connect() as sqalconn:
        clarity_to_csv_inner(sqltext, csvfilenames, sqalconn)
def clarity_to_csv_inner(sqltext, csvfilenames, sqalconn, verbose=False):
    """Execute each statement in ``sqltext`` and write row-returning results to CSV.

    Comments are stripped, the text is split into statements, and the
    statements are executed in order on ``sqalconn``. The i-th statement that
    returns rows is paired with ``csvfilenames[i]``; a falsy entry (such as
    ``None``) means the statement is run but no file is written. Each file
    gets a header row of the result's column keys followed by the rows.

    Args:
        sqltext: SQL script text, statements separated by semicolons.
        csvfilenames: Output paths, one per row-returning statement.
        sqalconn: Connection object providing ``execute(statement)``.
        verbose: When true, print each statement before running it.

    Raises:
        IndexError: If more statements return rows than ``csvfilenames`` has
            entries.
    """
    import sqlparse

    sqltext_cleaned = sqlparse.format(sqltext, strip_comments=True).strip()
    # sqlparse.split is quote-aware, so a ';' inside a string literal does not
    # cut a statement in two. Trailing semicolons are dropped so the text sent
    # to the server matches what a plain split on ';' used to produce.
    pieces = (piece.strip().rstrip(';').strip() for piece in sqlparse.split(sqltext_cleaned))
    sqlstatements = [piece for piece in pieces if piece]

    which_csvfile = 0
    for sqlstatement in sqlstatements:
        if verbose:
            print("\n\nProcessing statement: '{}'\n".format(sqlstatement))
        #TODO - time the query execution time
        results = sqalconn.execute(sqlstatement)
        try:
            if not results.returns_rows:
                continue
            if which_csvfile >= len(csvfilenames):
                raise IndexError(
                    "Statement returned rows but only {} CSV filename(s) were supplied: '{}'".format(
                        len(csvfilenames), sqlstatement
                    )
                )
            csvname = csvfilenames[which_csvfile]
            which_csvfile += 1
            if not csvname:
                continue  # caller asked to skip the output of this statement
            print("Writing CSV file {}".format(csvname))
            #TODO - time the file writing time
            with open(csvname, 'w', newline='\n', encoding='utf-8') as csvfile:
                mycsvwriter = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                mycsvwriter.writerow(results.keys())
                mycsvwriter.writerows(results)
            print("Done writing CSV file!")
        finally:
            # Release the result even when its rows were skipped or an error
            # occurred, so it does not stay open on the connection.
            results.close()
import unittest
import clarity_to_csv as ctc
import pandas as pd
from unittest.mock import MagicMock
import sqlparse
#%%
testquerydir = 'C:\\Users\\LynchSe\\Documents\\Repos\\Covid19Related\\selah\\clarity_to_csv_tests\\'
testdatadir = 'C:\\Users\\LynchSe\\Documents\\Data\\clarity_to_csv_tests\\'
#TODO spin up a sqlite database here
def line_count(filename):
    """Return the number of lines in the text file at ``filename``."""
    with open(filename) as handle:
        return sum(1 for _ in handle)
def remove_files(filenamelist):
    """Best-effort deletion of every path in ``filenamelist``.

    Falsy entries (the tests pass ``None`` placeholders) are skipped and
    files that do not exist are ignored. Any other failure is printed rather
    than raised, so cleanup never aborts the calling test.

    Args:
        filenamelist: Iterable of file paths; entries may be ``None``.
    """
    import os

    for filename in filenamelist:
        if not filename:
            continue
        try:
            # os.remove works on every platform and with paths containing
            # spaces, unlike spawning an external ``rm`` process.
            os.remove(filename)
        except FileNotFoundError:
            pass  # nothing to clean up
        except OSError as e:
            print(e)
class TestStuff(unittest.TestCase):
    """Live-database tests for the ``clarity_to_csv`` module (imported as ``ctc``).

    Each test clears its output files with ``remove_files`` first and then
    runs SQL on a connection obtained from ``ctc.get_clarity_engine()``.
    """

    def test_comment_with_semicolon(self):
        """Run SQL whose block comment contains semicolons; passes if nothing raises."""
        sqltext = '''
SELECT TOP 2 PAT_ID FROM PAT_ENC;
/*
SELECT 1; SELECT 2;
*/
'''
        outputs = [testdatadir + 'top2_pat_enc.csv']
        remove_files(outputs)
        engine = ctc.get_clarity_engine()
        with engine.connect() as sqalconn:
            ctc.clarity_to_csv_inner(sqltext, outputs, sqalconn)

    def test_none_csv(self):
        """A ``None`` entry in the CSV list is passed for the first result set."""
        script = testquerydir + "testCohort.sql"
        outputs = [
            None,  # it should just skip this one
            testdatadir + 'two_empis.csv',
        ]
        remove_files(outputs)
        engine = ctc.get_clarity_engine()
        with engine.connect() as sqalconn:
            ctc.clarity_to_csv(script, outputs, dbconn=sqalconn)

    def integration_test(self):
        """Run two script files back to back on one shared connection."""
        scripts = (
            testquerydir + "testCohort.sql",
            testquerydir + "readTestCohort.sql",
        )
        outputs = [testdatadir + 'test1.csv', testdatadir + 'test2.csv']
        remove_files(outputs)
        engine = ctc.get_clarity_engine()
        with engine.connect() as sqalconn:
            for script in scripts:
                ctc.clarity_to_csv(script, outputs, dbconn=sqalconn)

    def unicode_error(self):
        """Export a query over X_COVID19_LAB_ORDERS for three fixed PAT_IDs."""
        outputs = [testdatadir + 'test_cohort.csv']
        remove_files(outputs)
        # NOTE(review): the original carried a remark about hangs when
        # 'IF EXISTS' is omitted, but this query has no DROP statement; the
        # remark looks copied from test_cohort.
        sql_text = r'''
SELECT * FROM
X_COVID19_LAB_ORDERS l
WHERE PAT_ID IN ('055948350', '041356734', '057338121');
'''
        engine = ctc.get_clarity_engine()
        with engine.connect() as conn:
            ctc.clarity_to_csv_inner(sql_text, outputs, conn)

    def test_simple(self):
        """Two SELECTs produce two CSVs with a header plus 10 and 5 rows."""
        first_csv = testdatadir + 'test1.csv'
        second_csv = testdatadir + 'test2.csv'
        outputs = [first_csv, second_csv]
        remove_files(outputs)
        simple_sql_text = '''SELECT TOP 10 PAT_ID, PAT_ENC_CSN_ID FROM PAT_ENC;
SELECT TOP 5 CONTACT_DATE FROM PAT_ENC tablesample(0.01);
'''
        engine = ctc.get_clarity_engine()
        with engine.connect() as conn:
            ctc.clarity_to_csv_inner(simple_sql_text, outputs, conn)
        self.assertEqual(line_count(first_csv), 11)
        self.assertEqual(line_count(second_csv), 6)

    def test_wrapper(self):
        """Call ``clarity_to_csv`` without a connection so it opens its own."""
        script = testquerydir + "tinyTestQuery.sql"
        outputs = [testdatadir + name for name in ('test1.csv', 'test2.csv', 'test3.csv')]
        remove_files(outputs)
        ctc.clarity_to_csv(script, outputs)

    # I believe this is hanging
    def test_cohort(self):
        """Create, fill and read ##cohort in one script; expect header + 2 rows."""
        cohort_csv = testdatadir + 'test_cohort.csv'
        outputs = [cohort_csv]
        remove_files(outputs)
        # I think it hangs if I don't include 'IF EXISTS'. GRRR... why??
        sql_text = '''
DROP TABLE IF EXISTS ##cohort;
CREATE TABLE ##cohort (EMPI VARCHAR(90) NOT NULL);
INSERT INTO ##cohort (EMPI) values ('8001111117'),('1000000000');
SELECT * FROM ##cohort;
'''
        engine = ctc.get_clarity_engine()
        with engine.connect() as conn:
            ctc.clarity_to_csv_inner(sql_text, outputs, conn)
        self.assertEqual(line_count(cohort_csv), 3)  # a header and two values

    # TODO - deal with wrong number of csv's supplied
if __name__ == '__main__':
    # Run a hand-picked list of TestStuff methods; the commented names are
    # the other tests that can be switched on.
    tests_to_run = [
        "test_comment_with_semicolon",
        # "test_none_csv",
        # "integration_test",
        # "unicode_error",
        # "test_simple",
        # "test_wrapper",
        # "test_cohort",
    ]
    suite = unittest.TestSuite()
    suite.addTests(TestStuff(test_name) for test_name in tests_to_run)
    runner = unittest.TextTestRunner()
    runner.run(suite)
    # unittest.main()
-- Read back every row of the global temp table ##RADCOHORT.
-- NOTE(review): presumably relies on an earlier script having created the
-- table on the same session - confirm.
SELECT *
FROM ##RADCOHORT;
\ No newline at end of file \ No newline at end of file
-- Simple cohort: load three EMPI values into the global temp table ##COHORT
DROP TABLE IF EXISTS ##COHORT;

CREATE TABLE ##COHORT (
    EMPI VARCHAR(90) NOT NULL
);

INSERT INTO ##COHORT (EMPI)
VALUES
    ('8001111117'),
    ('1000000000'),
    ('8333333002');

SELECT *
FROM ##COHORT;

-- Copy the cohort into ##RADCOHORT with the column renamed to EMPI_GIVEN
DROP TABLE IF EXISTS ##RADCOHORT;

SELECT ##COHORT.EMPI AS EMPI_GIVEN
INTO ##RADCOHORT
FROM ##COHORT;

SELECT TOP 2 *
FROM ##RADCOHORT;
-- Three small row-returning statements over PAT_ENC.

-- First ten rows, two columns
SELECT TOP 10
    PAT_ID,
    PAT_ENC_CSN_ID
FROM PAT_ENC;

-- Distinct contact dates from a sample of the table
SELECT DISTINCT CONTACT_DATE
FROM PAT_ENC TABLESAMPLE (0.001);

-- All columns from a sample of the table
SELECT *
FROM PAT_ENC TABLESAMPLE (0.001);
import unittest
import sqlalchemy
from unittest.mock import MagicMock
def get_mssql_engine(
    username="lynchse",
    host="clarityprod.uphs.upenn.edu",
    database="clarity_snapshot_db",
    domain="UPHS",
    port="1433",
    timeout=600,
    password=None,
):
    """Build a SQLAlchemy engine for a SQL Server database through pymssql.

    Args:
        username: Account name; combined with ``domain`` as ``domain\\username``.
        host: Server host name.
        database: Database name placed in the URL path.
        domain: Domain prefixed to the username.
        port: Server port.
        timeout: Value passed as the URL's ``timeout`` query parameter.
        password: Account password. When ``None`` the URL is built without a
            password component instead of embedding the text "None".

    Returns:
        Whatever ``sqlalchemy.create_engine`` returns for the assembled URL.
    """
    from urllib.parse import quote

    from sqlalchemy import create_engine

    user = domain + "\\" + username
    if password is None:
        credentials = user
    else:
        # Percent-encode the password so characters such as '@', ':' or '/'
        # are not mistaken for URL delimiters when the URL is parsed.
        credentials = f"{user}:{quote(password, safe='')}"
    clarity_engine = create_engine(
        f"mssql+pymssql://{credentials}@{host}:{port}/{database}?timeout={timeout}"
    )
    return clarity_engine
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'


def get_clarity_engine(credsfilename=selahcredsfilename, timeout=600):
    """Create an engine from a two-line credentials file.

    The first line of ``credsfilename`` is used as the username and the
    second line as the password (surrounding whitespace stripped); both are
    forwarded to ``get_mssql_engine`` together with ``timeout``.
    """
    with open(credsfilename, 'r') as handle:
        login, secret = (handle.readline().strip() for _ in range(2))
    return get_mssql_engine(username=login, password=secret, timeout=timeout)
##### BEGIN ACTUAL TESTS #####
class TestStuff(unittest.TestCase):
    """Scratch tests against a throwaway SQLite file and the live database."""

    def test_sqlite(self):
        """Insert three EMPI values into a SQLite file, print them, delete the file."""
        import os
        from sqlalchemy import create_engine
        sqlite_db_filepath = "C:\\Users\\LynchSe\\Documents\\Data\\database.db"
        e = create_engine('sqlite:///{}'.format(sqlite_db_filepath))
        try:
            with e.connect() as c:
                c.execute('SELECT 1;')
                c.execute('CREATE TABLE IF NOT EXISTS COHORT (EMPI VARCHAR(90) NOT NULL);')
                c.execute("INSERT INTO COHORT (EMPI) values ('8001111117'),('1000000000'),('8333333002');")
                res = c.execute("SELECT EMPI FROM COHORT;")
                for line in res:
                    print(type(line))
                    print(line)
        finally:
            # Always remove the scratch database, even when the test fails:
            # the table is created with IF NOT EXISTS, so a leftover file
            # would carry rows into the next run. dispose() releases any
            # handle the engine still holds before the file is deleted.
            e.dispose()
            if os.path.exists(sqlite_db_filepath):
                os.remove(sqlite_db_filepath)

    def test_drop_cohort(self):
        """Drop ##cohort with a 10-unit timeout to see whether the call hangs."""
        eng = get_clarity_engine(timeout=10)
        print("Am I about to hang like a shitty database engine?")
        with eng.connect() as conn:
            conn.execute('DROP TABLE IF EXISTS ##cohort;')

    def test_sqalch_insert_read(self):
        """The same create/read sequence works via a Connection and a raw cursor."""
        eng = get_clarity_engine()
        with eng.connect() as conn:
            conn.execute('DROP TABLE IF EXISTS ##COHORT')
            conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            res = conn.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len(list(res)), 3)
        # Keep a handle on the raw connection so it can be released; closing
        # only the cursor would leave the connection checked out.
        raw_conn = eng.raw_connection()
        try:
            with raw_conn.cursor() as cur:
                cur.execute('DROP TABLE IF EXISTS ##COHORT')
                cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
                cur.execute('SELECT * FROM ##COHORT')
                self.assertEqual(len([row for row in cur]), 3)
        finally:
            raw_conn.close()
if __name__ == '__main__':
    # Run a hand-picked list of TestStuff methods; commented names are
    # currently switched off.
    tests_to_run = [
        "test_sqlite",
        # "try_a_mock",
        # "test_sqalch_insert_read",
        # "test_shitty_hang",
    ]
    suite = unittest.TestSuite()
    suite.addTests(TestStuff(test_name) for test_name in tests_to_run)
    runner = unittest.TextTestRunner()
    runner.run(suite)
    # unittest.main()
import unittest
import sqlalchemy
from unittest.mock import MagicMock
def get_mssql_engine(
    username="lynchse",
    host="clarityprod.uphs.upenn.edu",
    database="clarity_snapshot_db",
    domain="UPHS",
    port="1433",
    timeout=600,
    password=None,
):
    """Build a SQLAlchemy engine for a SQL Server database through pymssql.

    Args:
        username: Account name; combined with ``domain`` as ``domain\\username``.
        host: Server host name.
        database: Database name placed in the URL path.
        domain: Domain prefixed to the username.
        port: Server port.
        timeout: Value passed as the URL's ``timeout`` query parameter.
        password: Account password. When ``None`` the URL is built without a
            password component instead of embedding the text "None".

    Returns:
        Whatever ``sqlalchemy.create_engine`` returns for the assembled URL.
    """
    from urllib.parse import quote

    from sqlalchemy import create_engine

    user = domain + "\\" + username
    if password is None:
        credentials = user
    else:
        # Percent-encode the password so characters such as '@', ':' or '/'
        # are not mistaken for URL delimiters when the URL is parsed.
        credentials = f"{user}:{quote(password, safe='')}"
    clarity_engine = create_engine(
        f"mssql+pymssql://{credentials}@{host}:{port}/{database}?timeout={timeout}"
    )
    return clarity_engine
selahcredsfilename = 'C:\\Users\\LynchSe\\Documents\\selah_clarity_credentials.txt'


def get_clarity_engine(credsfilename=selahcredsfilename, timeout=600):
    """Create an engine from a two-line credentials file.

    The first line of ``credsfilename`` is used as the username and the
    second line as the password (surrounding whitespace stripped); both are
    forwarded to ``get_mssql_engine`` together with ``timeout``.
    """
    with open(credsfilename, 'r') as handle:
        login, secret = (handle.readline().strip() for _ in range(2))
    return get_mssql_engine(username=login, password=secret, timeout=timeout)
##### BEGIN ACTUAL TESTS #####
class TestStuff(unittest.TestCase):
    """Scratch tests against a throwaway SQLite file and the live database."""

    def test_sqlite(self):
        """Insert three EMPI values into a SQLite file, print them, delete the file."""
        import os
        from sqlalchemy import create_engine
        sqlite_db_filepath = "C:\\Users\\LynchSe\\Documents\\Data\\database.db"
        e = create_engine('sqlite:///{}'.format(sqlite_db_filepath))
        try:
            with e.connect() as c:
                c.execute('SELECT 1;')
                c.execute('CREATE TABLE IF NOT EXISTS COHORT (EMPI VARCHAR(90) NOT NULL);')
                c.execute("INSERT INTO COHORT (EMPI) values ('8001111117'),('1000000000'),('8333333002');")
                res = c.execute("SELECT EMPI FROM COHORT;")
                for line in res:
                    print(type(line))
                    print(line)
        finally:
            # Always remove the scratch database, even when the test fails:
            # the table is created with IF NOT EXISTS, so a leftover file
            # would carry rows into the next run. dispose() releases any
            # handle the engine still holds before the file is deleted.
            e.dispose()
            if os.path.exists(sqlite_db_filepath):
                os.remove(sqlite_db_filepath)

    def test_drop_cohort(self):
        """Drop ##cohort with a 10-unit timeout to see whether the call hangs."""
        eng = get_clarity_engine(timeout=10)
        print("Am I about to hang like a shitty database engine?")
        with eng.connect() as conn:
            conn.execute('DROP TABLE IF EXISTS ##cohort;')

    def test_sqalch_insert_read(self):
        """The same create/read sequence works via a Connection and a raw cursor."""
        eng = get_clarity_engine()
        with eng.connect() as conn:
            conn.execute('DROP TABLE IF EXISTS ##COHORT')
            conn.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
            res = conn.execute('SELECT * FROM ##COHORT')
            self.assertEqual(len(list(res)), 3)
        # Keep a handle on the raw connection so it can be released; closing
        # only the cursor would leave the connection checked out.
        raw_conn = eng.raw_connection()
        try:
            with raw_conn.cursor() as cur:
                cur.execute('DROP TABLE IF EXISTS ##COHORT')
                cur.execute('SELECT TOP 3 PAT_ID INTO ##COHORT FROM PAT_ENC')
                cur.execute('SELECT * FROM ##COHORT')
                self.assertEqual(len([row for row in cur]), 3)
        finally:
            raw_conn.close()
if __name__ == '__main__':
    # Run a hand-picked list of TestStuff methods; commented names are
    # currently switched off.
    tests_to_run = [
        "test_sqlite",
        # "try_a_mock",
        # "test_sqalch_insert_read",
        # "test_shitty_hang",
    ]
    suite = unittest.TestSuite()
    suite.addTests(TestStuff(test_name) for test_name in tests_to_run)
    runner = unittest.TextTestRunner()
    runner.run(suite)
    # unittest.main()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!