带有内联注释的代码:
import sqlparse
# Sample DDL that is handed to sqlparse.parse() further down: four
# CREATE TABLE statements. The first two use unquoted schema-qualified names;
# the last two use quoted names with IF NOT EXISTS, per-column ENCODE / SORTKEY
# clauses and a trailing DISTSTYLE clause after the column list.
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
"staff_flg" CHAR(1) DEFAULT '0'::bpchar SORTKEY ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
def get_table_name(tokens):
    """Return the value of the last token in *tokens* whose ``ttype`` is None.

    The caller passes every token that precedes the column list of a
    CREATE TABLE statement, so the last untyped token is the table name.

    Args:
        tokens: A sequence of objects exposing ``ttype`` and ``value``.

    Returns:
        The ``value`` of the right-most token with ``ttype is None``, or a
        single space ``" "`` when no such token exists.
    """
    # Scan from the right: the name sits immediately before the "(" group.
    untyped_values = (tok.value for tok in reversed(tokens) if tok.ttype is None)
    return next(untyped_values, " ")
def _split_top_level_commas(text):
    """Split *text* on commas that are outside parentheses and quotes.

    A plain ``str.split(",")`` would cut a definition such as
    ``price numeric(10,2)`` or ``flag char(1) DEFAULT 'a,b'`` in the middle;
    tracking nesting depth and quote state keeps each column in one piece.
    """
    parts = []
    current = []
    depth = 0
    quote = None  # the quote character we are currently inside, if any
    for ch in text:
        if quote is not None:
            if ch == quote:
                quote = None
        elif ch in ("'", '"'):
            quote = ch
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth = max(depth - 1, 0)
        elif ch == "," and depth == 0:
            parts.append("".join(current))
            current = []
            continue
        current.append(ch)
    parts.append("".join(current))
    return parts


parse = sqlparse.parse(line)
for stmt in parse:
    # Keep every token except plain whitespace. stmt.tokens can be iterated
    # directly, so no extra TokenList wrapper is needed.
    tokens = [t for t in stmt.tokens if t.ttype != sqlparse.tokens.Whitespace]
    is_create_stmt = False
    for i, token in enumerate(tokens):
        # Remember that a CREATE keyword was seen in this statement.
        if token.match(sqlparse.tokens.DDL, 'CREATE'):
            is_create_stmt = True
            continue
        # The first "(" group after CREATE holds the column definitions.
        if is_create_stmt and token.value.startswith("("):
            # The table name is the last untyped token before the "(" group.
            print(f"table: {get_table_name(tokens[:i])}")
            txt = token.value
            end = txt.rfind(")")
            # Strip the enclosing parentheses; tolerate a missing ")".
            body = txt[1:end] if end != -1 else txt[1:]
            for column in _split_top_level_commas(body):
                # split() treats newlines as whitespace, so words separated
                # only by a line break are not glued together.
                c = column.split()
                if len(c) < 2:
                    # Empty fragment (e.g. trailing comma) or a lone word:
                    # there is no "name type" pair to report.
                    continue
                c_name = c[0].replace('"', "")
                # Condensed type: first word after the name only. Use
                # " ".join(c[1:]) instead for the detailed type text.
                c_type = c[1]
                print(f"column: {c_name}")
                print(f"date type: {c_type}")
            print("---" * 20)
            # Only the first parenthesised group of a statement is parsed.
            break
输出:
table: public.actor
column: actor_id
date type: integer
column: first_name
date type: character
column: last_name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: public.category
column: category_id
date type: integer
column: name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: "sample_schema"."sample_table"
column: div_cd
date type: VARCHAR(2)
column: div_name
date type: VARCHAR(30)
column: org_cd
date type: VARCHAR(8)
column: org_name
date type: VARCHAR(60)
column: team_cd
date type: VARCHAR(2)
column: team_name
date type: VARCHAR(120)
column: personal_cd
date type: VARCHAR(7)
column: personal_name
date type: VARCHAR(300)
column: username
date type: VARCHAR(6)
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
table: "sample_schema"."ref_table"
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------