Inserting data from a CSV into a Postgres database

Our dataset of legislation information is a very large CSV: over a million rows! To make it easier to manage later on (processing it in chunks, querying it easily), it's best to move it into a database. We're going to use a Postgres database. On OS X, an easy way to install PostgreSQL is Postgres.app.

Read in our data

We'll start by reading in our dataset.

import pandas as pd

df = pd.read_csv("data/bills-with-urls.csv")
df.shape
(1253402, 11)
df.head()
bill_id code bill_number title description state session filename status status_date url
0 325258 HCR143 HCR143 House Concurrent Resolution Congratulating The... House Concurrent Resolution Congratulating The... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/HC... 4 2011-04-22 http://www.leg.state.vt.us/docs/2012/Acts/ACTR...
1 285625 H0291 H0291 An Act Relating To Raising The Penalties For A... An Act Relating To Raising The Penalties For A... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/H0... 1 2011-02-22 http://www.leg.state.vt.us/docs/2012/bills/Int...
2 398232 S0162 S0162 An Act Relating To Powers Of Attorney An Act Relating To Powers Of Attorney VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/S0... 1 2012-01-03 http://www.leg.state.vt.us/docs/2012/bills/Int...
3 243054 S0027 S0027 An Act Relating To The Role Of Municipalities ... An Act Relating To The Role Of Municipalities ... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/S0... 1 2011-01-25 http://www.leg.state.vt.us/docs/2012/bills/Int...
4 417691 H0784 H0784 An Act Relating To Approval Of The Adoption An... An Act Relating To Approval Of The Adoption An... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/H0... 4 2012-05-05 http://www.leg.state.vt.us/docs/2012/Acts/ACTM...
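
If loading every row at once is too heavy for your machine, pandas can also read a CSV in pieces. Here's a minimal sketch of that idea, using a tiny inline CSV in place of data/bills-with-urls.csv. The three-column sample and the chunk size of 2 are made up for illustration.

```python
import io

import pandas as pd

# A tiny stand-in for data/bills-with-urls.csv (only three of the real columns).
sample_csv = io.StringIO(
    "bill_id,state,status\n"
    "325258,VT,4\n"
    "285625,VT,1\n"
    "398232,VT,1\n"
    "243054,VT,1\n"
    "417691,VT,4\n"
)

# Passing chunksize makes read_csv yield DataFrames of at most that many rows
# instead of returning one big DataFrame.
chunk_sizes = []
for chunk in pd.read_csv(sample_csv, chunksize=2):
    chunk_sizes.append(len(chunk))

total_rows = sum(chunk_sizes)
print(chunk_sizes, total_rows)
```

For the real file you would pass the path instead of the StringIO object and pick a much larger chunk size.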

Create our database

You could create the database by logging into postgres and running the SQL commands below by hand, but we'll do it from Python using the sqlalchemy and psycopg2 packages.

Our database is going to be called legislation. Just in case it's already been created, we're going to forcibly delete it and then re-create it.
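
The connection URLs in this walkthrough differ only in the database name: postgres for the admin work, legislation for everything else. As an optional convenience, here's a sketch of building them with SQLAlchemy's URL.create instead of retyping the string. The helper name database_url is hypothetical, and the sketch assumes SQLAlchemy 1.4 or newer.

```python
from sqlalchemy.engine import URL


def database_url(database, host="localhost", port=5432):
    """Build a Postgres connection URL for the given database name.

    Hypothetical helper: the host and port defaults mirror the URLs
    used in this walkthrough.
    """
    return URL.create("postgresql", host=host, port=port, database=database)


admin_url = database_url("postgres")
legislation_url = database_url("legislation")

print(admin_url.render_as_string(hide_password=False))
print(legislation_url.render_as_string(hide_password=False))
```

Either URL object can be passed to create_engine in place of the string.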

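from sqlalchemy import create_engine, text

# Connect to the default postgres database: we can't drop the database we're connected to.
# AUTOCOMMIT is needed because DROP/CREATE DATABASE can't run inside a transaction.
# SQL strings are wrapped in text(), which SQLAlchemy 2.0 requires.
engine = create_engine('postgresql://localhost:5432/postgres', isolation_level="AUTOCOMMIT")
with engine.connect() as conn:
    try:
        # If anyone is connected to it, we'll need to kick them off.
        conn.execute(text("""
        SELECT pg_terminate_backend(pg_stat_activity.pid)
            FROM pg_stat_activity
            WHERE pg_stat_activity.datname = 'legislation' AND pid <> pg_backend_pid();
        """))
    except Exception:
        # Failing to disconnect other sessions isn't fatal, so carry on.
        pass
    conn.execute(text("""
        DROP DATABASE IF EXISTS legislation;
    """))
    conn.execute(text("""
        CREATE DATABASE legislation;
    """))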

We'll then connect to our database and create the table we'll be storing our bills in. If you need a reminder of which columns our dataset has:

df.head(1).T
0
bill_id 325258
code HCR143
bill_number HCR143
title House Concurrent Resolution Congratulating The...
description House Concurrent Resolution Congratulating The...
state VT
session 2011-2012 Session
filename bill_data/VT/2011-2012_Regular_Session/bill/HC...
status 4
status_date 2011-04-22
url http://www.leg.state.vt.us/docs/2012/Acts/ACTR...

We'll also create fields to keep track of converting the bills to text:

  • A content column to store the text of the bill
  • An error column to store a note about any errors we encountered processing the bill
  • A processed_at column to keep track of which ones have been attempted and which have not
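
The processed_at column is what lets later steps pick out the bills that still need work: anything with a NULL there hasn't been attempted yet. Here's a minimal sketch of that query. It runs against an in-memory SQLite database as a stand-in for Postgres so it works anywhere, count_unprocessed is a hypothetical helper name, and the three rows (including the processed_at value) are made up.

```python
from sqlalchemy import create_engine, text


def count_unprocessed(engine):
    """Return how many bills have never been attempted (processed_at IS NULL)."""
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT COUNT(*) FROM bills WHERE processed_at IS NULL")
        )
        return result.scalar_one()


# In-memory SQLite stand-in; the real table lives in the legislation database.
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE bills (bill_id INTEGER, processed_at TEXT)"))
    conn.execute(
        text("INSERT INTO bills (bill_id, processed_at) VALUES (:bill_id, :processed_at)"),
        [
            {"bill_id": 325258, "processed_at": None},
            {"bill_id": 285625, "processed_at": "2021-01-01 00:00:00"},
            {"bill_id": 398232, "processed_at": None},
        ],
    )

print(count_unprocessed(engine))
```

The same function should work unchanged when handed an engine that points at the Postgres database.
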

from sqlalchemy import create_engine, text

engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")
with engine.connect() as conn:
    # SQL strings are wrapped in text(), which SQLAlchemy 2.0 requires.
    conn.execute(text("""
        CREATE TABLE public.bills (
            id serial NOT NULL,
            bill_id numeric NOT NULL,
            code text,
            bill_number text,
            title text,
            description text,
            state varchar(2),
            session text,
            filename text,
            status numeric,
            status_date TIMESTAMPTZ,
            url text,

            content text,
            error text,
            processed_at TIMESTAMPTZ,
            PRIMARY KEY ("id")
        );
    """))

Insert our data into the database

Now that we have our database created, we can insert our bills into it. We'll be using pandas' to_sql method. How many rows are we adding?

df.shape
(1253402, 11)

...and add some empty columns

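We'll also add the extra columns that we'll be using later: content, error and processed_at. We need to make them blank first, then set the dtype when we save to the database so that postgres knows which type each column should be (an all-NaN column would otherwise be stored as a float). Note that if_exists='replace' makes pandas drop the bills table we created above and recreate it, so the dtype mapping is what determines the final column types.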

import numpy as np

df['error'] = np.nan
df['content'] = np.nan
df['processed_at'] = np.nan

df.head(2)
bill_id code bill_number title description state session filename status status_date url error content processed_at
0 325258 HCR143 HCR143 House Concurrent Resolution Congratulating The... House Concurrent Resolution Congratulating The... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/HC... 4 2011-04-22 http://www.leg.state.vt.us/docs/2012/Acts/ACTR... NaN NaN NaN
1 285625 H0291 H0291 An Act Relating To Raising The Penalties For A... An Act Relating To Raising The Penalties For A... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/H0... 1 2011-02-22 http://www.leg.state.vt.us/docs/2012/bills/Int... NaN NaN NaN
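
To see why the dtype mapping matters, it helps to look at what pandas thinks an all-NaN column is. This small sketch uses a two-row toy DataFrame (the name toy is just for illustration) rather than the full dataset.

```python
import numpy as np
import pandas as pd

# Two bill_ids from the dataset, plus blank columns added the same way as above.
toy = pd.DataFrame({"bill_id": [325258, 285625]})
toy["error"] = np.nan
toy["content"] = np.nan

# np.nan is a float, so pandas treats both new columns as float64.
print(toy.dtypes)
```

Without an explicit dtype, to_sql would infer a floating-point column type from these, which isn't what we want for text.
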

import sqlalchemy

engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")

df.to_sql('bills',
          engine,
          if_exists='replace',
          index_label='id',
          chunksize=10000,
          dtype={
            'bill_id': sqlalchemy.types.INTEGER(),
            'code': sqlalchemy.types.TEXT(),
            'bill_number': sqlalchemy.types.TEXT(),
            'title': sqlalchemy.types.TEXT(),
            'description': sqlalchemy.types.TEXT(),
            'state': sqlalchemy.types.TEXT(),
            'session': sqlalchemy.types.TEXT(),
            'filename': sqlalchemy.types.TEXT(),
            'status': sqlalchemy.types.INTEGER(),
            'status_date': sqlalchemy.types.TIMESTAMP(timezone=False),
            'url': sqlalchemy.types.TEXT(),
            'content': sqlalchemy.types.TEXT(),
            'error': sqlalchemy.types.TEXT(),
            'processed_at': sqlalchemy.types.TIMESTAMP(timezone=True),
          })
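
One thing worth knowing about if_exists='replace' is that it drops any existing table with that name before writing, so a schema created by hand beforehand doesn't survive. Here's a small sketch of that behavior, run against an in-memory SQLite database as a stand-in for Postgres. The toy DataFrame and the extra column are made up for illustration.

```python
import numpy as np
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine, inspect, text

# In-memory SQLite stand-in so the sketch runs without a Postgres server.
engine = create_engine("sqlite://")

# Pre-create a bills table by hand, including a column the DataFrame lacks.
with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE bills (id INTEGER PRIMARY KEY, bill_id NUMERIC, extra TEXT)"
    ))

toy = pd.DataFrame({"bill_id": [325258, 285625], "content": [np.nan, np.nan]})
toy.to_sql(
    "bills",
    engine,
    if_exists="replace",  # drop the hand-made table and recreate it
    index_label="id",
    dtype={
        "bill_id": sqlalchemy.types.INTEGER(),
        "content": sqlalchemy.types.TEXT(),
    },
)

# Inspect what the table looks like after to_sql has run.
columns = {col["name"]: str(col["type"]) for col in inspect(engine).get_columns("bills")}
print(columns)
```

The hand-made extra column is gone afterwards, and the remaining columns come from the DataFrame and the dtype mapping. If you would rather keep a hand-written schema, if_exists='append' inserts into the existing table instead.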

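from sqlalchemy import text

# Add indices to speed up processing
# (SQL strings are wrapped in text(), which SQLAlchemy 2.0 requires)
with engine.connect() as conn:
    conn.execute(text("CREATE INDEX idx_bill_id ON bills (bill_id);"))
    conn.execute(text("CREATE INDEX idx_processed_at ON bills (processed_at);"))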