Download bill content from the web and process into our database with Tika#
When we're dealing with bill content, it comes in many, many types of documents: Word, PDF, web pages. It's an impossible mix!
Fortunately, there's a great tool called Apache Tika that takes almost any type of document and converts it into text. It even does OCR on image-based PDFs! Tika is a dream.
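As a rough sketch of what that conversion looks like in code: the tika Python package (the one this notebook imports further down) has a parser.from_buffer function that returns a dictionary, and the extracted text sits under its content key. The helper name extract_text and its parse argument are made up for illustration; parse is only there so you can try the function without a running Tika server.

```python
def extract_text(raw_bytes, parse=None):
    """Return the stripped text extracted from raw_bytes, or None if there is none."""
    if parse is None:
        # Imported lazily: tika-python needs Java and talks to a local Tika server
        from tika import parser
        parse = parser.from_buffer
    output = parse(raw_bytes) or {}
    text = output.get('content')
    return text.strip() if text else None


# Stand-in parser (hypothetical) so the sketch runs without Tika installed
def fake_parse(raw_bytes):
    return {'content': '\n\n  An Act relating to taxation  \n', 'metadata': {}}


print(extract_text(b'%PDF-1.4 ...', parse=fake_parse))
```

Leaving parse out makes the helper fall back to the real Tika parser, which is what the rest of this notebook uses.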
How many have been processed?#
Since you might run this notebook a few times, we'll start off by seeing how many of the rows we've processed so far.
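The count in this section works because COUNT(column) skips NULLs, so COUNT(processed_at) only counts the rows we've finished. Here's a small self-contained sketch of that behaviour, using an in-memory SQLite table as a stand-in for the Postgres bills table. The five rows are made up, and `* 1.0` stands in for the Postgres `::decimal` cast, which SQLite doesn't have.

```python
import sqlite3

# Hypothetical miniature of the bills table: 2 processed rows, 3 unprocessed
demo = sqlite3.connect(':memory:')
demo.execute("CREATE TABLE bills (id INTEGER PRIMARY KEY, processed_at TEXT)")
demo.executemany(
    "INSERT INTO bills (id, processed_at) VALUES (?, ?)",
    [(1, '2020-01-01'), (2, None), (3, '2020-01-02'), (4, None), (5, None)],
)

# COUNT(id) counts every row; COUNT(processed_at) ignores the NULLs
unprocessed, processed, processed_pct = demo.execute("""
    SELECT
        COUNT(id) - COUNT(processed_at) AS unprocessed,
        COUNT(processed_at) AS processed,
        COUNT(processed_at) * 1.0 / COUNT(id) AS processed_pct
    FROM bills
""").fetchone()
demo.close()

print(unprocessed, processed, processed_pct)  # 3 2 0.4
```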
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")
pd.read_sql("""
SELECT
COUNT(id) - COUNT(processed_at) AS unprocessed,
COUNT(processed_at) AS processed,
COUNT(processed_at) / COUNT(id)::decimal AS processed_pct
FROM bills;
""", engine)
Let's look at the processed rows#
While we're at it, let's take a look at the rows that have been processed. This helps you check that the content column is being filled in, and that the error column is populated here and there.
pd.read_sql("""
SELECT * from bills where processed_at is not null limit 5
""", engine)
Use Tika to process the bills into text#
Now we'll go through the entries in the Postgres database. For each unprocessed row, we'll pull down the URL and try to convert whatever it points at (PDF, Word doc, HTML page, etc.) into text. If that succeeds, we'll save the text into the content column. If it fails, we'll record the reason in the error column instead.
This use of Tika is a little overly complex - it uses a class, saves to a database, all sorts of magic. There's another simpler notebook if you're just interested in seeing how Tika works.
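Stripped of the class and the database, the success-or-error flow just described looks roughly like this. It's a sketch under some assumptions: fetch and parse are hypothetical callables passed in so it runs without the network or Tika, and Python's built-in ValueError, TimeoutError and ConnectionError stand in for the requests exceptions the real code catches.

```python
def fetch_and_convert(url, fetch, parse):
    """Return (content, error); exactly one of the two is set."""
    try:
        raw = fetch(url)
    except ValueError:        # stand-in for requests' MissingSchema
        return None, 'bad_url'
    except TimeoutError:      # stand-in for requests' Timeout
        return None, 'timeout'
    except ConnectionError:   # stand-in for requests' ConnectionError
        return None, 'connection'
    text = (parse(raw) or {}).get('content')
    if text:
        return text.strip(), None
    # The download worked but no text came back from the parser
    return None, 'tika'


def slow_fetch(url):
    raise TimeoutError('took too long')


url = 'http://example.com/bill.pdf'  # made-up URL
print(fetch_and_convert(url, lambda u: b'bytes', lambda raw: {'content': ' Section 1. '}))
print(fetch_and_convert(url, slow_fetch, lambda raw: {}))
```

The first call prints ('Section 1.', None) and the second prints (None, 'timeout'), which matches the two columns the real code fills in.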
import requests
import tika
from tika import parser
# Should we use OCR if normal processing fails?
USE_OCR = False
class Bill:
    def __init__(self, id, url, conn):
        self.id = id
        self.url = url
        try:
            # A little cleaning for URLs that have moved domains
            self.url = self.url.replace("www.rilin.state.ri.us", "webserver.rilin.state.ri.us")
            self.url = self.url.replace('legis.sd.gov', 'sdlegislature.gov')
        except AttributeError:
            # The URL is missing (None), so there is nothing to clean
            pass
        self.conn = conn

    def update_content(self):
        self.content = None
        self.error = None
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
            response = requests.get(self.url, headers=headers, allow_redirects=True, timeout=2)
            # Send the raw bytes of the response body to tika
            tika_output = parser.from_buffer(response.content)
            # If we get nothing back, try OCR
            if USE_OCR and ('content' not in tika_output or not tika_output['content']):
                # headers = { 'X-Tika-PDFOcrStrategy': 'ocr_only' }
                headers = { 'X-Tika-PDFextractInlineImages': 'true' }
                tika_output = parser.from_buffer(response.content, headers=headers)
            if 'content' in tika_output and tika_output['content']:
                self.content = tika_output['content'].strip()
            else:
                self.error = 'tika'
        except requests.exceptions.MissingSchema:
            self.error = 'bad_url'
        except requests.exceptions.Timeout:
            self.error = 'timeout'
        except requests.exceptions.ConnectionError:
            self.error = 'connection'
        # Save whether we succeeded or failed, so the row counts as processed
        self.save()

    def save(self):
        self.conn.execute("""
            UPDATE bills SET content=(%s), error=(%s), processed_at=NOW()
            WHERE id = (%s)
        """, (self.content, self.error, self.id))

    @classmethod
    def get(cls, conn, id):
        # Fetch a single bill by its id
        results = conn.execute("""
            SELECT id, url
            FROM bills
            WHERE id = (%s)
            LIMIT 1;
        """, (id,))
        result = list(results)[0]
        return cls(result[0], result[1], conn)

    @classmethod
    def unprocessed(cls, conn, limit=10):
        # Pull a random handful of unprocessed bills
        results = conn.execute("""
            SELECT id, url
            FROM bills
            TABLESAMPLE BERNOULLI (1)
            WHERE processed_at is null
            LIMIT (%s);
        """, (limit,))
        return [cls(result[0], result[1], conn) for result in results]

    @classmethod
    def process_queue(cls, conn, limit=10):
        todo = cls.unprocessed(conn, limit)
        for bill in todo:
            bill.update_content()


def connect_and_update(_):
    # Each worker opens its own connection, processes one batch, then cleans up
    engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")
    conn = engine.connect()
    Bill.process_queue(conn)
    conn.close()
    engine.dispose()
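The URL cleaning in the constructor is just a couple of string replacements, and it can be pulled out into a small function if you want to test it on its own. This is a sketch, not part of the notebook's class: clean_url and MOVED_HOSTS are names made up here, the two host pairs are the ones used above, and the example paths are invented.

```python
# Old host -> new host, using the two pairs from the notebook
MOVED_HOSTS = {
    'www.rilin.state.ri.us': 'webserver.rilin.state.ri.us',
    'legis.sd.gov': 'sdlegislature.gov',
}


def clean_url(url):
    """Swap in the new host for URLs whose domain has moved."""
    if not isinstance(url, str):
        # Missing URLs (None) pass through untouched
        return url
    for old, new in MOVED_HOSTS.items():
        url = url.replace(old, new)
    return url


# Made-up paths, just to show the substitution
print(clean_url('http://www.rilin.state.ri.us/example.pdf'))
print(clean_url('https://legis.sd.gov/example.html'))
print(clean_url(None))
```

Running it twice on the same URL gives the same result, since neither new host contains one of the old ones.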
Make sure Tika is started up#
We need to make sure Tika is started and running and working before we get down to business, so we'll pick one of the documents to give it a try.
engine = create_engine('postgresql://localhost:5432/legislation', isolation_level="AUTOCOMMIT")
try:
    # Close any connection left over from an earlier run of this cell
    conn.close()
except Exception:
    pass
conn = engine.connect()
bill = Bill.get(conn, 1137554)
bill.update_content()
print("Done")
conn.close()
Run many#
Now we'll process them all! We process them in batches of 10, so we need to see how many more batches to perform.
import math
conn = engine.connect()
result = conn.execute("SELECT COUNT(id) - COUNT(processed_at) AS unprocessed FROM bills")
remaining = list(result)[0][0]
conn.close()
batches = math.ceil(remaining / 10)
batches
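To make the rounding concrete, here's the same arithmetic as a tiny function. The name batches_needed is made up for this sketch, and the row counts are arbitrary.

```python
import math


def batches_needed(remaining, batch_size=10):
    """Number of batches needed to cover `remaining` rows, rounding up."""
    return math.ceil(remaining / batch_size)


for n in (0, 1, 10, 11, 95):
    print(n, '->', batches_needed(n))
# 0 -> 0
# 1 -> 1
# 10 -> 1
# 11 -> 2
# 95 -> 10
```

Treat the result as an estimate rather than a guarantee. As I read the unprocessed query, TABLESAMPLE BERNOULLI (1) samples the table before the WHERE filter is applied, so a batch can come back with fewer than 10 rows, especially near the end. Rerunning the notebook picks up whatever is left.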
Tika loves to display warning messages, and I haven't figured out how to suppress them. I run this JavaScript code to make the millions of warnings disappear off of the page.
%%javascript
setInterval(() => [...document.querySelectorAll('.output_stderr')].forEach(e => e.remove()), 5000)
Here goes! Let's go grab all those documents and insert their contents into our database. If our notebook crashes, no problem - we can just start all this again and it'll resume from where it left off.
%%capture --no-display --no-stdout
from multiprocessing import Pool
from tqdm.notebook import tqdm

# One task per batch; connect_and_update ignores the task value itself
tasks = list(range(batches))
pool = Pool(processes=30)
for _ in tqdm(pool.imap_unordered(connect_and_update, tasks), total=len(tasks)):
    pass
pool.close()
pool.join()
After a long, long, long while: we're all set! And if it gets interrupted partway through, don't worry: we can just restart this notebook, run from the top, and it'll resume from where we left off.
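If you'd like to see the fan-out pattern on its own, here's a toy version. To keep it runnable anywhere, it swaps in ThreadPool (threads) for the process Pool used above, and fake_batch is a made-up stand-in for connect_and_update that just pretends each batch handled 10 rows.

```python
from multiprocessing.pool import ThreadPool


def fake_batch(task_number):
    # Stand-in for connect_and_update: report which batch ran and how many rows
    return task_number, 10


demo_tasks = list(range(5))
rows_done = 0
finished = []
with ThreadPool(processes=3) as demo_pool:
    # imap_unordered hands back results as soon as each worker finishes,
    # so the order of `finished` can differ from the order of demo_tasks
    for task_number, rows in demo_pool.imap_unordered(fake_batch, demo_tasks):
        finished.append(task_number)
        rows_done += rows

print(rows_done)  # 50
```

Because results arrive in completion order, the loop is a natural place to hang a progress bar, which is what the tqdm wrapper does in the real cell.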