Download bill content from the web and process it into our database with Tika

When we're dealing with bill content, it comes in many, many types of documents: Word files, PDFs, web pages. It's an impossible mix!

Fortunately, there's a great tool called Apache Tika that takes almost any type of document and converts it into text. It can even OCR image-based PDFs (as long as Tesseract is installed)! Tika is a dream.

How many have been processed?

Since you might run this notebook a few times, we'll start off by seeing how many of the rows we've processed so far.

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")
pd.read_sql("""
    SELECT
        COUNT(id) - COUNT(processed_at) AS unprocessed,
        COUNT(processed_at) AS processed,
        COUNT(processed_at) / COUNT(id)::decimal AS processed_pct
    FROM bills;
""", engine)
   unprocessed  processed  processed_pct
0      1198810      54592       0.043555
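
The query works because `COUNT(column)` skips NULLs: `COUNT(processed_at)` only counts rows that have been stamped, while `COUNT(id)` counts every row. Here's a tiny illustration of that behavior. It uses an in-memory SQLite table instead of Postgres just so it runs anywhere, and the three rows are made-up toy data, not our bills.

```python
import sqlite3

# Toy stand-in for the bills table: three rows, only one of them processed.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bills (id INTEGER PRIMARY KEY, processed_at TEXT)")
conn.executemany(
    "INSERT INTO bills (id, processed_at) VALUES (?, ?)",
    [(1, "2019-11-17 20:16:24"), (2, None), (3, None)],
)

# COUNT(column) ignores NULLs, so the difference is the number of unprocessed rows.
# Multiplying by 1.0 forces a non-integer division (the job ::decimal does in Postgres).
row = conn.execute("""
    SELECT
        COUNT(id) - COUNT(processed_at) AS unprocessed,
        COUNT(processed_at) AS processed,
        COUNT(processed_at) * 1.0 / COUNT(id) AS processed_pct
    FROM bills
""").fetchone()
print(row)  # (2, 1, 0.3333333333333333)
```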

Let's look at the processed rows

While we're at it, let's take a look at the rows that have been processed. This helps you make sure the content column is being filled in, and that the error column shows up here and there when a download or conversion fails.

pd.read_sql("""
    SELECT * from bills where processed_at is not null limit 5
""", engine)
id bill_id code bill_number title description state session filename status status_date url error content processed_at
0 56 307787 H0454 H0454 An Act Relating To The Administration And Issu... An Act Relating To The Administration And Issu... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/H0... 1 2011-04-19 http://www.leg.state.vt.us/docs/2012/bills/Hou... None Microsoft Word - BillTemp.doc\n\n\nBILL AS PAS... 2019-11-17 20:16:24.520099+00:00
1 18 267749 H0229 H0229 An Act Relating To Requiring That Certain Surg... An Act Relating To Requiring That Certain Surg... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/H0... 1 2011-02-11 http://www.leg.state.vt.us/docs/2012/bills/Int... None Microsoft Word - GENERAL-#264910-v1-Dr_11-777;... 2019-11-17 20:39:25.924196+00:00
2 55 247704 H0141 H0141 An Act Relating To Improving Transparency In G... An Act Relating To Improving Transparency In G... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/H0... 1 2011-01-28 http://www.leg.state.vt.us/docs/2012/bills/Int... None Microsoft Word - GENERAL-#262040-v5A-H_141_-_2... 2019-11-17 20:39:27.023591+00:00
3 37 271790 HCR047 HCR047 House Concurrent Resolution Honoring The Town ... House Concurrent Resolution Honoring The Town ... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/HC... 4 2011-02-15 http://www.leg.state.vt.us/docs/2012/Acts/ACTR... None Microsoft Word - GENERAL-#266635-v1-Act_No__R-... 2019-11-17 20:16:24.720864+00:00
4 83 417770 HCR315 HCR315 House Concurrent Resolution Congratulating The... House Concurrent Resolution Congratulating The... VT 2011-2012 Session bill_data/VT/2011-2012_Regular_Session/bill/HC... 4 2012-03-23 http://www.leg.state.vt.us/docs/2012/Acts/ACTR... None Microsoft Word - GENERAL-#279477-v1-Act_No__R-... 2019-11-17 20:27:44.197148+00:00
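
Beyond eyeballing five rows, a `GROUP BY` on the error column shows which kinds of failure are most common. This is a sketch against a toy in-memory SQLite table (made-up rows) so it runs on its own; against our Postgres database the same query could go through `pd.read_sql` with `engine`. A NULL error on a processed row means the download and conversion worked, which the query labels `'ok'`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bills (id INTEGER PRIMARY KEY, error TEXT, processed_at TEXT)")
# Toy rows: two successes, two timeouts, one Tika failure, one not yet processed.
conn.executemany(
    "INSERT INTO bills (id, error, processed_at) VALUES (?, ?, ?)",
    [
        (1, None, "2019-11-17"),
        (2, None, "2019-11-17"),
        (3, "timeout", "2019-11-17"),
        (4, "timeout", "2019-11-17"),
        (5, "tika", "2019-11-17"),
        (6, None, None),
    ],
)

# Count processed rows per outcome; COALESCE turns a NULL error into 'ok'.
breakdown = conn.execute("""
    SELECT COALESCE(error, 'ok') AS outcome, COUNT(*) AS n
    FROM bills
    WHERE processed_at IS NOT NULL
    GROUP BY outcome
    ORDER BY n DESC, outcome
""").fetchall()
print(breakdown)  # [('ok', 2), ('timeout', 2), ('tika', 1)]
```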

Use Tika to process the bills into text

Now we'll go through the entries in the Postgres database. For each unprocessed row, we'll download whatever the URL points at (PDF, Word doc, HTML page, etc.) and try to convert it into text. If that works, we'll save the text into the content column. If it fails, we'll record the reason in the error column instead.

This use of Tika is a little overly complex: it uses a class, saves to a database, all sorts of magic. If you're just interested in seeing how Tika works, there's a simpler notebook for that.

import requests
import tika
from tika import parser
from sqlalchemy import create_engine

# Should we use OCR if normal processing fails?
USE_OCR = False

class Bill:

    def __init__(self, id, url, conn):
        self.id = id  # no trailing comma here, or the id becomes a tuple
        self.url = url
        try:
            # A little cleaning for URLs that have moved domains
            self.url = self.url.replace("www.rilin.state.ri.us", "webserver.rilin.state.ri.us")
            self.url = self.url.replace('legis.sd.gov', 'sdlegislature.gov')
        except AttributeError:
            # url is NULL in the database; the request below will fail and record an error
            pass
        self.conn = conn

    def update_content(self):
        self.content = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
            response = requests.get(self.url, headers=headers, allow_redirects=True, timeout=2)
            # Don't send 404/500 error pages to Tika as if they were bills
            response.raise_for_status()

            # Send the raw bytes of the download to Tika
            tika_output = parser.from_buffer(response.content)

            # If we get nothing back, try again with Tika extracting images inside PDFs for OCR
            if USE_OCR and not tika_output.get('content'):
                # headers = { 'X-Tika-PDFOcrStrategy': 'ocr_only' }
                headers = { 'X-Tika-PDFextractInlineImages': 'true' }
                tika_output = parser.from_buffer(response.content, headers=headers)

            if tika_output.get('content'):
                self.content = tika_output['content'].strip()
            else:
                self.error = 'tika'
        except requests.exceptions.MissingSchema:
            self.error = 'bad_url'
        except requests.exceptions.Timeout:
            self.error = 'timeout'
        except requests.exceptions.ConnectionError:
            self.error = 'connection'
        except requests.exceptions.HTTPError:
            self.error = 'http'

        self.save()

    def save(self):
        # Stamp processed_at whether we succeeded or failed, so the row isn't retried
        self.conn.execute("""
            UPDATE bills SET content = %s, error = %s, processed_at = NOW()
            WHERE id = %s
        """, (self.content, self.error, self.id))

    @classmethod
    def get(cls, conn, id):
        results = conn.execute("""
            SELECT id, url
            FROM bills
            WHERE id = %s
            LIMIT 1;
        """, (id,))
        result = results.fetchone()
        return cls(result[0], result[1], conn)

    @classmethod
    def unprocessed(cls, conn, limit=10):
        # Sample a random ~1% of the table so parallel workers (hopefully) pick different bills
        results = conn.execute("""
            SELECT id, url
            FROM bills
            TABLESAMPLE BERNOULLI (1)
            WHERE processed_at IS NULL
            LIMIT %s;
        """, (limit,))
        return [cls(result[0], result[1], conn) for result in results]

    @classmethod
    def process_queue(cls, conn, limit=10):
        todo = cls.unprocessed(conn, limit)
        for bill in todo:
            bill.update_content()


def connect_and_update(_):
    # Each worker process opens its own engine and connection
    engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")
    conn = engine.connect()
    try:
        Bill.process_queue(conn)
    finally:
        conn.close()
        engine.dispose()
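
The domain rewriting in `__init__` will probably grow as more states move servers. One option is to pull it into a small, table-driven helper that can be tested on its own. `DOMAIN_MOVES` and `clean_url` are hypothetical names, and the example URL is made up; the two mappings are the ones from the class above.

```python
from typing import Optional

# Old domain -> new domain, copied from the replacements in Bill.__init__.
DOMAIN_MOVES = {
    "www.rilin.state.ri.us": "webserver.rilin.state.ri.us",
    "legis.sd.gov": "sdlegislature.gov",
}

def clean_url(url: Optional[str]) -> Optional[str]:
    """Rewrite URLs whose host has moved; pass a missing (None) URL through unchanged."""
    if url is None:
        return None
    for old, new in DOMAIN_MOVES.items():
        url = url.replace(old, new)
    return url

print(clean_url("http://legis.sd.gov/example.pdf"))  # http://sdlegislature.gov/example.pdf
```

Because none of the new domains contain an old one, running `clean_url` twice gives the same result as running it once.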

Make sure Tika is started up

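We need to make sure Tika is up and working before we get down to business, so we'll process a single document as a test. The `tika` Python library downloads and launches a local Tika server the first time it's used, so this first call can take a while.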

engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")

# If this cell is being re-run, close the connection left over from last time
try:
    conn.close()
except NameError:
    pass

conn = engine.connect()
bill = Bill.get(conn, 1137554)
bill.update_content()
print("Done")
conn.close()
Done
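
If that test call hangs or fails, it can help to check whether a Tika server is answering at all. A minimal sketch, assuming the server is on Tika's usual default of `http://localhost:9998`, where a plain GET to `/tika` returns a short greeting; `tika_is_up` is a hypothetical helper, not part of the `tika` library.

```python
import urllib.request

def tika_is_up(url: str = "http://localhost:9998/tika", timeout: float = 2.0) -> bool:
    """Return True if something answers a GET at url with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError, HTTPError, refused connections and socket timeouts are all OSError subclasses
        return False

print(tika_is_up())
```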

Run many

Now we'll process them all! We process bills in batches of 10, so first we'll count how many batches we need.

import math

conn = engine.connect()
result = conn.execute("SELECT COUNT(id) - COUNT(processed_at) AS unprocessed FROM bills")
remaining = list(result)[0][0]
conn.close()

batches = math.ceil(remaining / 10)
batches
119881
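
`math.ceil(remaining / 10)` rounds up so the last, partial batch still gets run. The same ceiling division can be done with integers only, which avoids floats entirely. A small sketch; `BATCH_SIZE` and `batches_needed` are hypothetical names, and the batch size has to match the `limit` that `Bill.process_queue` uses.

```python
BATCH_SIZE = 10  # must match the default limit in Bill.unprocessed / Bill.process_queue

def batches_needed(remaining: int, batch_size: int = BATCH_SIZE) -> int:
    """Ceiling division with integers only: -(-a // b) equals ceil(a / b) for b > 0."""
    return -(-remaining // batch_size)

print(batches_needed(1198810))  # 119881
print(batches_needed(1198811))  # 119882
```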

Tika loves to display warning messages, and I haven't figured out how to suppress them. This JavaScript deletes the stderr output areas from the page every five seconds, so the millions of warnings don't pile up.

%%javascript
setInterval(() => [...document.querySelectorAll('.output_stderr')].forEach(e => e.remove()), 5000)

Here goes! Let's grab all those documents and insert their contents into our database. If the notebook crashes, no problem: we can just start all this again and it'll resume from where it left off, because we only ever select rows whose processed_at is still empty.

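%%capture --no-display --no-stdout
# Only stderr is captured (hidden); stdout and the progress bar still display

from multiprocessing import Pool
from tqdm.notebook import tqdm

tasks = list(range(batches))

# 30 worker processes, each running connect_and_update on one batch at a time
pool = Pool(processes=30)

for _ in tqdm(pool.imap_unordered(connect_and_update, tasks), total=len(tasks)):
    pass

pool.close()
pool.join()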

After a long, long, long while: we're all set!
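
Why does resuming work? Every batch only asks for rows whose `processed_at` is still NULL, and every attempt that finishes (success or failure) stamps `processed_at`, so a restarted run skips finished rows. Here's a toy simulation in SQLite. `process_batch` and `remaining` are hypothetical stand-ins for `Bill.process_queue` and the count query, the "work" is just marking the row, and it uses a plain `LIMIT` instead of the random `TABLESAMPLE` in the real code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bills (id INTEGER PRIMARY KEY, processed_at TEXT)")
conn.executemany("INSERT INTO bills (id) VALUES (?)", [(i,) for i in range(1, 26)])

def process_batch(conn, limit=10):
    """Stand-in for Bill.process_queue: take up to `limit` unprocessed rows and stamp them."""
    ids = [r[0] for r in conn.execute(
        "SELECT id FROM bills WHERE processed_at IS NULL LIMIT ?", (limit,))]
    for bill_id in ids:
        # The real code downloads and parses here, then saves content or error.
        conn.execute("UPDATE bills SET processed_at = datetime('now') WHERE id = ?", (bill_id,))
    return len(ids)

def remaining(conn):
    return conn.execute("SELECT COUNT(*) - COUNT(processed_at) FROM bills").fetchone()[0]

# The first run gets "interrupted" after a single batch.
process_batch(conn)
print(remaining(conn))  # 15

# Restarting simply keeps going until a batch comes back empty.
while process_batch(conn):
    pass
print(remaining(conn))  # 0
```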