Inserting data from a CSV into a Postgres database#
Our dataset of legislation information is a big big big CSV - over a million rows! To make it easier to manage later on - to process it in chunks, to query easily - it's probably best to move it into a database. We're going to use a Postgres database. On OS X an easy way to install PostgreSQL is to use Postgres.app.
Read in our data#
We'll start by reading in our dataset.
import pandas as pd
df = pd.read_csv("data/bills-with-urls.csv")
df.shape
df.head()
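If the full CSV is ever too big to comfortably load in one go, read_csv can also stream it in chunks. Here's a quick sketch of that option (the 100,000-row chunk size is arbitrary) - for the rest of this notebook we'll just work with the full dataframe.

# A rough sketch: read the CSV 100,000 rows at a time, counting rows as we go
total_rows = 0
for chunk in pd.read_csv("data/bills-with-urls.csv", chunksize=100_000):
    total_rows += len(chunk)
total_rows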
Create our database#
You could create the database by actually logging into postgres and running the commands below, but we'll do it by using the sqlalchemy and psycopg2 packages.
Our database is going to be called legislation. Just in case it's already been created, we're going to forcibly delete it and then re-create it.
from sqlalchemy import create_engine

engine = create_engine('postgresql://localhost:5432/postgres', isolation_level="AUTOCOMMIT")

with engine.connect() as conn:
    try:
        # If anyone is connected to it, we'll need to kick them off.
        conn.execute("""
            SELECT pg_terminate_backend(pg_stat_activity.pid)
            FROM pg_stat_activity
            WHERE pg_stat_activity.datname = 'legislation' AND pid <> pg_backend_pid();
        """)
    except:
        pass

    conn.execute("""
        DROP DATABASE IF EXISTS legislation;
    """)

    conn.execute("""
        CREATE DATABASE legislation;
    """)
We'll then connect to our database and create the table we'll be storing our bills in. If you need a reminder of which fields we created:
df.head(1).T
We'll also create a few extra fields to keep track of converting the bills to text:
- A content column to store the text of the bill
- An error column to store a note about any errors we encountered processing the bill
- A processed_at column to keep track of which ones have been attempted and which have not.
engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")

with engine.connect() as conn:
    conn.execute("""
        CREATE TABLE public.bills (
            id serial NOT NULL,
            bill_id numeric NOT NULL,
            code text,
            bill_number text,
            title text,
            description text,
            state varchar(2),
            session text,
            filename text,
            status numeric,
            status_date TIMESTAMPTZ,
            url text,
            content text,
            error text,
            processed_at TIMESTAMPTZ,
            PRIMARY KEY ("id")
        );
    """)
Insert our data into the database#
Now that we have our database created, we can insert all of our bills into it. We'll be using pandas' to_sql method. How many are we adding?
df.shape
...and add some empty columns#
We'll also specify some extra columns that we'll be using later, like content and error. We need to make them blank first, then set the dtype when we save to the database so that postgres knows they're text columns.
import numpy as np

# Start the new columns out as missing values - they'll be filled in later
# when we process each bill
df['error'] = np.nan
df['content'] = np.nan
df['processed_at'] = np.nan
df.head(2)
import sqlalchemy

engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")

df.to_sql('bills',
          engine,
          if_exists='replace',
          index_label='id',
          chunksize=10000,
          dtype={
              'bill_id': sqlalchemy.types.INTEGER(),
              'code': sqlalchemy.types.TEXT(),
              'bill_number': sqlalchemy.types.TEXT(),
              'title': sqlalchemy.types.TEXT(),
              'description': sqlalchemy.types.TEXT(),
              'state': sqlalchemy.types.TEXT(),
              'session': sqlalchemy.types.TEXT(),
              'filename': sqlalchemy.types.TEXT(),
              'status': sqlalchemy.types.INTEGER(),
              'status_date': sqlalchemy.types.TIMESTAMP(timezone=False),
              'url': sqlalchemy.types.TEXT(),
              'content': sqlalchemy.types.TEXT(),
              'error': sqlalchemy.types.TEXT(),
              'processed_at': sqlalchemy.types.TIMESTAMP(timezone=True),
          })
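Once to_sql finishes, we can confirm everything arrived by counting the rows in the table and comparing it to df.shape - a quick check using pd.read_sql.

pd.read_sql("SELECT COUNT(*) AS row_count FROM bills", engine)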
# Add indices to speed up processing
with engine.connect() as conn:
    conn.execute("""
        CREATE INDEX idx_bill_id ON bills (bill_id);
        CREATE INDEX idx_processed_at ON bills (processed_at);
    """)