Skip to content
Toggle navigation
Projects
Groups
Snippets
Help
Toggle navigation
This project
Loading...
Sign in
cicTeam
/
Clarity_Tools_Selah
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit 1f22d1e8
authored
Nov 04, 2021
by
Selah Clarity
Browse Files
Options
Browse Files
Tag
Download
Plain Diff
Merge remote-tracking branch 'origin/master'
2 parents
384cd839
b42dddc6
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
150 additions
and
0 deletions
.gitignore
sql_to_schema.py
sql_to_schema_test.py
.gitignore
View file @
1f22d1e
*.pyc
*.pyc
.DS_Store
sql_to_schema.py
0 → 100644
View file @
1f22d1e
import
pandas
as
pd
import
numpy
as
np
import
sqlparse
def
filter_whitespace
(
tokenlist
):
tokenlistfilt
=
[]
for
token
in
tokenlist
:
if
(
not
(
token
.
ttype
==
sqlparse
.
tokens
.
Whitespace
)
or
(
token
.
ttype
==
sqlparse
.
tokens
.
Whitespace
.
Newline
)):
tokenlistfilt
.
append
(
token
)
return
tokenlistfilt
def
extract_tables
(
sql
):
tables
=
[]
sqlp
=
sqlparse
.
parse
(
sql
)
tks
=
sqlp
[
0
]
.
tokens
tksf
=
filter_whitespace
(
tks
)
cursor
=
0
;
chunk
=
tksf
[
cursor
]
state
=
"start"
while
(
state
!=
'done'
):
if
state
==
"start"
:
assert
chunk
.
value
==
'SELECT'
cursor
+=
1
;
chunk
=
tksf
[
cursor
]
assert
type
(
chunk
)
in
[
sqlparse
.
sql
.
IdentifierList
,
sqlparse
.
sql
.
Identifier
]
cursor
+=
1
;
chunk
=
tksf
[
cursor
]
assert
chunk
.
value
==
'FROM'
cursor
+=
1
;
chunk
=
tksf
[
cursor
]
state
=
"afterFROM"
if
state
==
"afterFROM"
:
assert
type
(
chunk
)
==
sqlparse
.
sql
.
Identifier
tables
.
append
(
chunk
)
cursor
+=
1
;
if
cursor
>=
len
(
tksf
):
state
=
"done"
else
:
state
=
"afterIdentifier"
chunk
=
tksf
[
cursor
]
if
state
==
"afterIdentifier"
:
chunk
=
tksf
[
cursor
]
assert
chunk
.
value
in
[
"INNER JOIN"
,
"LEFT OUTER JOIN"
]
cursor
+=
1
;
chunk
=
tksf
[
cursor
]
assert
type
(
chunk
)
==
sqlparse
.
sql
.
Identifier
tables
.
append
(
chunk
)
cursor
+=
1
;
chunk
=
tksf
[
cursor
]
assert
chunk
.
value
==
'ON'
cursor
+=
1
;
chunk
=
tksf
[
cursor
]
assert
type
(
chunk
)
==
sqlparse
.
sql
.
Comparison
cursor
+=
1
;
if
cursor
>=
len
(
tksf
):
state
=
"done"
else
:
state
=
"afterFROM"
tables2
=
[]
for
table
in
tables
:
tsplit
=
table
.
value
.
split
(
' '
)
if
len
(
tsplit
)
==
2
:
tables2
.
append
({
'name'
:
tsplit
[
0
],
'alias'
:
tsplit
[
1
]})
else
:
tables2
.
append
({
'name'
:
tsplit
[
0
]})
return
tables2
#%% for experimenting
if
__name__
==
'__main__'
:
pass
\ No newline at end of file
\ No newline at end of file
sql_to_schema_test.py
0 → 100644
View file @
1f22d1e
import
pandas
as
pd
import
numpy
as
np
import
unittest
import
sql_to_schema
as
sts
class
TestStuff
(
unittest
.
TestCase
):
def
test_filter_whitespace
(
self
):
sql
=
"SELECT rc.dateCooked, r.name FROM recipeCooked rc INNER JOIN recipe r ON r.recipeID = rc.recipeID"
import
sqlparse
tks
=
sqlparse
.
parse
(
sql
)[
0
]
.
tokens
tksf
=
sts
.
filter_whitespace
(
tks
)
self
.
assertEqual
(
tksf
[
0
]
.
value
,
'SELECT'
)
self
.
assertEqual
(
tksf
[
2
]
.
value
,
'FROM'
)
def
test_extract_table
(
self
):
sql
=
"SELECT rc.dateCooked FROM recipeCooked"
tables
=
sts
.
extract_tables
(
sql
)
self
.
assertEqual
(
len
(
tables
),
1
)
self
.
assertEqual
(
tables
[
0
][
'name'
],
'recipeCooked'
)
def
test_extract_tables
(
self
):
sql
=
"SELECT rc.dateCooked, r.name FROM recipeCooked rc INNER JOIN recipe r ON r.recipeID = rc.recipeID"
tables
=
sts
.
extract_tables
(
sql
)
self
.
assertEqual
(
tables
[
0
][
'name'
],
'recipeCooked'
)
self
.
assertEqual
(
tables
[
1
][
'name'
],
'recipe'
)
# def test_extract_schema(self):
# sql = "SELECT rc.dateCooked, r.name FROM recipeCooked rc INNER JOIN recipe r ON r.recipeID = rc.recipeID"
# scm = sts.extract_schema(sql)
# self.assertEqual(scm,None)
if
__name__
==
'__main__'
:
# tests_to_run = [
# "test_sqlite"
# # ,"try_a_mock"
# # ,"test_sqalch_insert_read"
# # ,"test_shitty_hang"
# ]
# suite = unittest.TestSuite()
# for test in tests_to_run:
# suite.addTest(TestStuff(test))
# runner = unittest.TextTestRunner()
# runner.run(suite)
unittest
.
main
()
Write
Preview
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment