From 385a34e5d09ee31c91f08612727ec7a140914853 Mon Sep 17 00:00:00 2001 From: Alan Orth Date: Wed, 26 Sep 2018 23:10:29 +0300 Subject: [PATCH] indexer.py: Use psycopg2's execute_values to batch inserts Batch inserts are much faster than a series of individual inserts because they drastically reduce the overhead caused by round-trip communication with the server. My tests in development confirm: - cursor.execute(): 19 seconds - execute_values(): 14 seconds I'm currently only working with 4,500 rows, but I will experiment with larger data sets, as well as larger batches. For example, on the PostgreSQL mailing list a user reports doing 10,000 rows with a page size of 100. See: http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values See: https://github.com/psycopg/psycopg2/issues/491#issuecomment-276551038 --- indexer.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/indexer.py b/indexer.py index 26c9f2c..5b2585b 100755 --- a/indexer.py +++ b/indexer.py @@ -32,6 +32,7 @@ from database import database_connection import json +import psycopg2.extras from solr import solr_connection def index_views(): @@ -64,6 +65,9 @@ def index_views(): cursor = db.cursor() + # create an empty list to store values for batch insertion + data = [] + while results_current_page <= results_num_pages: print('Indexing item views (page {} of {})'.format(results_current_page, results_num_pages)) @@ -84,12 +88,16 @@ def index_views(): views = res.get_facets() # in this case iterate over the 'id' dict and get the item ids and views for item_id, item_views in views['id'].items(): - cursor.execute('''INSERT INTO items(id, views) VALUES(%s, %s) - ON CONFLICT(id) DO UPDATE SET downloads=excluded.views''', - (item_id, item_views)) + data.append((item_id, item_views)) + # do a batch insert of values from the current "page" of results + sql = 'INSERT INTO items(id, views) VALUES %s ON CONFLICT(id) DO UPDATE SET views=excluded.views' + 
psycopg2.extras.execute_values(cursor, sql, data, template='(%s, %s)') db.commit() + # clear all items from the list so we can populate it with the next batch + data.clear() + results_current_page += 1 cursor.close() @@ -119,6 +127,9 @@ def index_downloads(): cursor = db.cursor() + # create an empty list to store values for batch insertion + data = [] + while results_current_page <= results_num_pages: print('Indexing item downloads (page {} of {})'.format(results_current_page, results_num_pages)) @@ -136,12 +147,16 @@ def index_downloads(): downloads = res.get_facets() # in this case iterate over the 'owningItem' dict and get the item ids and downloads for item_id, item_downloads in downloads['owningItem'].items(): - cursor.execute('''INSERT INTO items(id, downloads) VALUES(%s, %s) - ON CONFLICT(id) DO UPDATE SET downloads=excluded.downloads''', - (item_id, item_downloads)) + data.append((item_id, item_downloads)) + # do a batch insert of values from the current "page" of results + sql = 'INSERT INTO items(id, downloads) VALUES %s ON CONFLICT(id) DO UPDATE SET downloads=excluded.downloads' + psycopg2.extras.execute_values(cursor, sql, data, template='(%s, %s)') db.commit() + # clear all items from the list so we can populate it with the next batch + data.clear() + results_current_page += 1 cursor.close()