"""
This is a setup script for mysql_example. It downloads a zip file of Illinois
campaign contributions and loads them into a MySQL database named
'contributions'.

Note: You must run this script before executing mysql_example.py.

Tables created:
* raw_table - raw import of entire CSV file
* donors - all distinct donors based on name and address
* recipients - all distinct campaign contribution recipients
* contributions - contribution amounts tied to donor and recipients tables
* processed_donors - lower-cased donor fields with name and address parts
  concatenated, derived from donors
"""
import os
import shutil
import warnings
import zipfile
from urllib.request import urlopen

import MySQLdb
# Ignore warnings of category MySQLdb.Warning from here on.
warnings.filterwarnings("ignore", category=MySQLdb.Warning)
# Local names of the downloaded archive and of the text file extracted from it.
contributions_zip_file = "Illinois-campaign-contributions.txt.zip"
contributions_txt_file = "Illinois-campaign-contributions.txt"

# Download the archive only when it is not already in the working directory.
if not os.path.exists(contributions_zip_file):
    print("downloading", contributions_zip_file, "(~60mb) ...")
    contributions_url = (
        "https://s3.amazonaws.com/dedupe-data/Illinois-campaign-contributions.txt.zip"
    )
    # Write to a temporary name and rename only once the transfer completed,
    # so an interrupted download cannot leave a truncated archive that the
    # existence check above would accept on the next run.
    partial_zip_file = contributions_zip_file + ".part"
    with urlopen(contributions_url) as response, open(
        partial_zip_file, "wb"
    ) as local_file:
        # Stream in chunks rather than holding the whole archive in memory.
        shutil.copyfileobj(response, local_file)
    os.replace(partial_zip_file, contributions_zip_file)

# Extract only when the text file is missing.
if not os.path.exists(contributions_txt_file):
    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(contributions_zip_file, "r") as zip_file:
        print("extracting %s" % contributions_zip_file)
        for member_name in zip_file.namelist():
            # Every member whose name contains ".txt" is extracted into the
            # current working directory.
            if ".txt" in member_name:
                zip_file.extract(member_name)
# Open the connection to the `contributions` database. Client options are read
# from the mysql.cnf file in the current working directory; local_infile is
# switched on for the LOAD DATA LOCAL INFILE import, and the session SQL mode
# is set to ALLOW_INVALID_DATES.
mysql_cnf_path = os.path.abspath(".") + "/mysql.cnf"
conn = MySQLdb.connect(
    db="contributions",
    read_default_file=mysql_cnf_path,
    local_infile=1,
    sql_mode="ALLOW_INVALID_DATES",
)

# Single cursor shared by every statement in this script.
c = conn.cursor()
print("importing raw data from csv...")

# Drop any tables left by an earlier run before they are recreated.
for stale_table in (
    "raw_table",
    "donors",
    "recipients",
    "contributions",
    "processed_donors",
):
    c.execute("DROP TABLE IF EXISTS %s" % stale_table)

# Staging table for the raw import. Apart from reciept_id every column is
# text, including the date columns, which are parsed when the contributions
# table is filled.
create_raw_table_sql = (
    "CREATE TABLE raw_table "
    "(reciept_id INT, last_name VARCHAR(70), first_name VARCHAR(35), "
    " address_1 VARCHAR(35), address_2 VARCHAR(36), city VARCHAR(20), "
    " state VARCHAR(15), zip VARCHAR(11), report_type VARCHAR(24), "
    " date_recieved VARCHAR(10), loan_amount VARCHAR(12), "
    " amount VARCHAR(23), receipt_type VARCHAR(23), "
    " employer VARCHAR(70), occupation VARCHAR(40), "
    " vendor_last_name VARCHAR(70), vendor_first_name VARCHAR(20), "
    " vendor_address_1 VARCHAR(35), vendor_address_2 VARCHAR(31), "
    " vendor_city VARCHAR(20), vendor_state VARCHAR(10), "
    " vendor_zip VARCHAR(10), description VARCHAR(90), "
    " election_type VARCHAR(10), election_year VARCHAR(10), "
    " report_period_begin VARCHAR(10), report_period_end VARCHAR(33), "
    " committee_name VARCHAR(70), committee_id VARCHAR(37)) "
    "CHARACTER SET utf8 COLLATE utf8_unicode_ci"
)
c.execute(create_raw_table_sql)
conn.commit()
# Bulk-load the extracted text file into raw_table.
#  * Fields are tab-separated and lines end in CRLF. The \t and \r\n escapes
#    are interpreted by Python, so the SQL text carries the literal control
#    characters.
#  * The first line of the file is skipped.
#  * The final field of each line is read into the user variable @dummy
#    instead of a table column (presumably a trailing, unused field --
#    TODO confirm against the data file).
load_raw_data_sql = (
    "LOAD DATA LOCAL INFILE %s INTO TABLE raw_table "
    "FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\r\n' "
    "IGNORE 1 LINES "
    "(reciept_id, last_name, first_name, "
    " address_1, address_2, city, state, "
    " zip, report_type, date_recieved, "
    " loan_amount, amount, receipt_type, "
    " employer, occupation, vendor_last_name, "
    " vendor_first_name, vendor_address_1, "
    " vendor_address_2, vendor_city, vendor_state, "
    " vendor_zip, description, election_type, "
    " election_year, "
    " report_period_begin, report_period_end, "
    " committee_name, committee_id, @dummy)"
)
# The %s placeholder is filled in by the driver with the file name.
c.execute(load_raw_data_sql, (contributions_txt_file,))
# Clean-up statements for raw_table, executed in the order listed.
raw_table_cleanup = (
    # Remove the very few records that mess up the demo: rows whose
    # date_recieved is shorter than 10 characters. (Demo purposes only! Don't
    # do something like this in production.)
    "DELETE FROM raw_table WHERE LENGTH(date_recieved) < 10",
    # Set report-period values shorter than 10 characters (empty or otherwise
    # incomplete date strings) to NULL.
    "UPDATE raw_table SET report_period_begin = NULL WHERE LENGTH(report_period_begin) < 10",
    "UPDATE raw_table SET report_period_end = NULL WHERE LENGTH(report_period_end) < 10",
    # Committee ID is required. Remove the 2 rows that don't have it.
    "DELETE FROM raw_table WHERE committee_id=''",
    # There's a record with a date stuck in the committee_id column, which
    # causes problems when inserting into the contributions table. Get rid of
    # it by dropping rows whose committee_id is longer than 9 characters.
    "DELETE FROM raw_table WHERE LENGTH( committee_id ) > 9",
)
for cleanup_sql in raw_table_cleanup:
    c.execute(cleanup_sql)
conn.commit()
print("creating donors table...")

# One row per distinct donor, keyed by an auto-incremented donor_id.
create_donors_sql = (
    "CREATE TABLE donors "
    "(donor_id INTEGER PRIMARY KEY AUTO_INCREMENT, "
    " last_name VARCHAR(70), first_name VARCHAR(35), "
    " address_1 VARCHAR(35), address_2 VARCHAR(36), "
    " city VARCHAR(20), state VARCHAR(15), "
    " zip VARCHAR(11), employer VARCHAR(70), "
    " occupation VARCHAR(40)) "
    "CHARACTER SET utf8 COLLATE utf8_unicode_ci"
)

# Fill donors with every distinct combination of the nine donor fields found
# in raw_table. Values are trimmed before the DISTINCT is applied, and the
# explicit column list matches the order of the SELECT expressions.
populate_donors_sql = (
    "INSERT INTO donors "
    "(first_name, last_name, address_1,"
    " address_2, city, state, zip, employer, occupation) "
    "SELECT DISTINCT "
    "TRIM(first_name), TRIM(last_name), TRIM(address_1), "
    "TRIM(address_2), TRIM(city), TRIM(state), TRIM(zip), "
    "TRIM(employer), TRIM(occupation) "
    "FROM raw_table"
)

c.execute(create_donors_sql)
c.execute(populate_donors_sql)
conn.commit()
print("creating indexes on donors table")

# Composite index over the name and address columns of donors.
index_donors_sql = (
    "CREATE INDEX donors_donor_info ON donors "
    "(last_name, first_name, address_1, address_2, city, "
    " state, zip)"
)
c.execute(index_donors_sql)
conn.commit()
print("creating recipients table...")

# One row per recipient committee.
create_recipients_sql = (
    "CREATE TABLE recipients "
    "(recipient_id INTEGER PRIMARY KEY AUTO_INCREMENT, name VARCHAR(70)) "
    "CHARACTER SET utf8 COLLATE utf8_unicode_ci"
)
c.execute(create_recipients_sql)

# No column list is given, so committee_id lands in recipient_id and
# committee_name in name. INSERT IGNORE skips rows that would otherwise raise
# an error (e.g. a committee_id that is already present as a primary key)
# instead of aborting the statement.
populate_recipients_sql = (
    "INSERT IGNORE INTO recipients "
    "SELECT DISTINCT committee_id, committee_name FROM raw_table"
)
c.execute(populate_recipients_sql)
conn.commit()
print("creating contributions table")

# Contribution facts, linked to donors and recipients through donor_id and
# recipient_id. The three date columns are real DATE columns here.
create_contributions_sql = (
    "CREATE TABLE contributions "
    "(contribution_id INT, donor_id INT, recipient_id INT, "
    " report_type VARCHAR(24), date_recieved DATE, "
    " loan_amount VARCHAR(12), amount VARCHAR(23), "
    " receipt_type VARCHAR(23), "
    " vendor_last_name VARCHAR(70), "
    " vendor_first_name VARCHAR(20), "
    " vendor_address_1 VARCHAR(35), vendor_address_2 VARCHAR(31), "
    " vendor_city VARCHAR(20), vendor_state VARCHAR(10), "
    " vendor_zip VARCHAR(10), description VARCHAR(90), "
    " election_type VARCHAR(10), election_year VARCHAR(10), "
    " report_period_begin DATE, report_period_end DATE) "
    "CHARACTER SET utf8 COLLATE utf8_unicode_ci"
)
c.execute(create_contributions_sql)

# Copy every raw row into contributions:
#  * reciept_id becomes contribution_id and committee_id becomes recipient_id
#    (the SELECT is positional);
#  * the text dates are parsed as month/day/four-digit-year;
#  * donor_id comes from joining each raw row to donors on all nine trimmed
#    donor fields.
# NOTE: the join relies on donors still holding empty strings rather than
# NULLs at this point, because `=` never matches a NULL; the NULL conversion
# of donors happens later in the script.
populate_contributions_sql = (
    "INSERT INTO contributions "
    "SELECT reciept_id, donors.donor_id, committee_id, "
    " report_type, STR_TO_DATE(date_recieved, '%m/%d/%Y'), "
    " loan_amount, amount, "
    " receipt_type, vendor_last_name , "
    " vendor_first_name, vendor_address_1, vendor_address_2, "
    " vendor_city, vendor_state, vendor_zip, description, "
    " election_type, election_year, "
    " STR_TO_DATE(report_period_begin, '%m/%d/%Y'), "
    " STR_TO_DATE(report_period_end, '%m/%d/%Y') "
    "FROM raw_table JOIN donors ON "
    "donors.first_name = TRIM(raw_table.first_name) AND "
    "donors.last_name = TRIM(raw_table.last_name) AND "
    "donors.address_1 = TRIM(raw_table.address_1) AND "
    "donors.address_2 = TRIM(raw_table.address_2) AND "
    "donors.city = TRIM(raw_table.city) AND "
    "donors.state = TRIM(raw_table.state) AND "
    "donors.employer = TRIM(raw_table.employer) AND "
    "donors.occupation = TRIM(raw_table.occupation) AND "
    "donors.zip = TRIM(raw_table.zip)"
)
# Executed without a parameter tuple, so the % signs of the date formats are
# passed through as written.
c.execute(populate_contributions_sql)
conn.commit()
print("creating indexes on contributions")

# The primary key is added after the bulk insert, followed by one index per
# foreign-key-style column.
for index_sql in (
    "ALTER TABLE contributions ADD PRIMARY KEY(contribution_id)",
    "CREATE INDEX donor_idx ON contributions (donor_id)",
    "CREATE INDEX recipient_idx ON contributions (recipient_id)",
):
    c.execute(index_sql)
conn.commit()
print("nullifying empty strings in donors")

# Replace '' with NULL in each of the nine descriptive donor columns; any
# other value is left as it is.
nullify_donors_sql = (
    "UPDATE donors "
    "SET "
    "first_name = CASE first_name WHEN '' THEN NULL ELSE first_name END, "
    "last_name = CASE last_name WHEN '' THEN NULL ELSE last_name END, "
    "address_1 = CASE address_1 WHEN '' THEN NULL ELSE address_1 END, "
    "address_2 = CASE address_2 WHEN '' THEN NULL ELSE address_2 END, "
    "city = CASE city WHEN '' THEN NULL ELSE city END, "
    "state = CASE state WHEN '' THEN NULL ELSE state END, "
    "employer = CASE employer WHEN '' THEN NULL ELSE employer END, "
    "occupation = CASE occupation WHEN '' THEN NULL ELSE occupation END, "
    "zip = CASE zip WHEN '' THEN NULL ELSE zip END"
)
c.execute(nullify_donors_sql)
conn.commit()
# Build processed_donors from donors:
#  * every text column is lower-cased;
#  * `name` joins first_name and last_name with a space, and `address` joins
#    address_1 and address_2 the same way; each is NULL only when both of its
#    parts are NULL;
#  * `person` is the result of ISNULL(first_name), i.e. 1 when first_name is
#    NULL and 0 otherwise.
# NOTE(review): a `person` flag that is 1 when the first name is *missing*
# reads inverted relative to the column name -- confirm against the consumer
# of this table.
create_processed_donors_sql = (
    "CREATE TABLE processed_donors AS "
    "(SELECT donor_id, "
    " LOWER(city) AS city, "
    " CASE WHEN (first_name IS NULL AND last_name IS NULL) "
    " THEN NULL "
    " ELSE LOWER(CONCAT_WS(' ', first_name, last_name)) "
    " END AS name, "
    " LOWER(zip) AS zip, "
    " LOWER(state) AS state, "
    " CASE WHEN (address_1 IS NULL AND address_2 IS NULL) "
    " THEN NULL "
    " ELSE LOWER(CONCAT_WS(' ', address_1, address_2)) "
    " END AS address, "
    " LOWER(occupation) AS occupation, "
    " LOWER(employer) AS employer, "
    " ISNULL(first_name) AS person "
    " FROM donors)"
)
c.execute(create_processed_donors_sql)

# Index the new table on donor_id.
index_processed_donors_sql = "CREATE INDEX donor_idx ON processed_donors (donor_id)"
c.execute(index_processed_donors_sql)
# Close the cursor first, then the connection, and report completion.
c.close()
conn.close()
print("done")