# # indexer.py # # Copyright 2018 Alan Orth. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # --- # # Connects to a DSpace Solr statistics core and ingests item views and downloads # into a PostgreSQL database for use by other applications (like an API). # # This script is written for Python 3.5+ and requires several modules that you # can install with pip (I recommend using a Python virtual environment): # # $ pip install SolrClient psycopg2-binary # # See: https://solrclient.readthedocs.io/en/latest/SolrClient.html # See: https://wiki.duraspace.org/display/DSPACE/Solr from .config import SOLR_SERVER from .database import DatabaseManager import json import psycopg2.extras import re import requests # Enumerate the cores in Solr to determine if statistics have been sharded into # yearly shards by DSpace's stats-util or not (for example: statistics-2018). def get_statistics_shards(): # Initialize an empty list for statistics core years statistics_core_years = [] # URL for Solr status to check active cores solr_query_params = { 'action': 'STATUS', 'wt': 'json' } solr_url = SOLR_SERVER + '/admin/cores' res = requests.get(solr_url, params=solr_query_params) if res.status_code == requests.codes.ok: data = res.json() # Iterate over active cores from Solr's STATUS response (cores are in # the status array of this response). for core in data['status']: # Pattern to match, for example: statistics-2018 pattern = re.compile('^statistics-[0-9]{4}$') if not pattern.match(core): continue # Append current core to list statistics_core_years.append(core) # Initialize a string to hold our shards (may end up being empty if the Solr # core has not been processed by stats-util). shards = str() if len(statistics_core_years) > 0: # Begin building a string of shards starting with the default one shards = '{}/statistics'.format(SOLR_SERVER) for core in statistics_core_years: # Create a comma-separated list of shards to pass to our Solr query # # See: https://wiki.apache.org/solr/DistributedSearch shards += ',{}/{}'.format(SOLR_SERVER, core) # Return the string of shards, which may actually be empty. Solr doesn't # seem to mind if the shards query parameter is empty and I haven't seen # any negative performance impact so this should be fine. return shards def index_views(): # get total number of distinct facets for items with a minimum of 1 view, # otherwise Solr returns all kinds of weird ids that are actually not in # the database. Also, stats are expensive, but we need stats.calcdistinct # so we can get the countDistinct summary. # # see: https://lucene.apache.org/solr/guide/6_6/the-stats-component.html solr_query_params = { 'q': 'type:2', 'fq': 'isBot:false AND statistics_type:view', 'facet': 'true', 'facet.field': 'id', 'facet.mincount': 1, 'facet.limit': 1, 'facet.offset': 0, 'stats': 'true', 'stats.field': 'id', 'stats.calcdistinct': 'true', 'shards': shards, 'rows': 0, 'wt': 'json' } solr_url = SOLR_SERVER + '/statistics/select' res = requests.get(solr_url, params=solr_query_params) try: # get total number of distinct facets (countDistinct) results_totalNumFacets = res.json()['stats']['stats_fields']['id']['countDistinct'] except TypeError: print('No item views to index, exiting.') exit(0) # divide results into "pages" (cast to int to effectively round down) results_per_page = 100 results_num_pages = int(results_totalNumFacets / results_per_page) results_current_page = 0 with DatabaseManager() as db: with db.cursor() as cursor: # create an empty list to store values for batch insertion data = [] while results_current_page <= results_num_pages: # "pages" are zero based, but one based is more human readable print('Indexing item views (page {} of {})'.format(results_current_page + 1, results_num_pages + 1)) solr_query_params = { 'q': 'type:2', 'fq': 'isBot:false AND statistics_type:view', 'facet': 'true', 'facet.field': 'id', 'facet.mincount': 1, 'facet.limit': results_per_page, 'facet.offset': results_current_page * results_per_page, 'shards': shards, 'rows': 0, 'wt': 'json', 'json.nl': 'map' # return facets as a dict instead of a flat list } solr_url = SOLR_SERVER + '/statistics/select' res = requests.get(solr_url, params=solr_query_params) # Solr returns facets as a dict of dicts (see json.nl parameter) views = res.json()['facet_counts']['facet_fields'] # iterate over the 'id' dict and get the item ids and views for item_id, item_views in views['id'].items(): data.append((item_id, item_views)) # do a batch insert of values from the current "page" of results sql = 'INSERT INTO items(id, views) VALUES %s ON CONFLICT(id) DO UPDATE SET views=excluded.views' psycopg2.extras.execute_values(cursor, sql, data, template='(%s, %s)') db.commit() # clear all items from the list so we can populate it with the next batch data.clear() results_current_page += 1 def index_downloads(): # get the total number of distinct facets for items with at least 1 download solr_query_params= { 'q': 'type:0', 'fq': 'isBot:false AND statistics_type:view AND bundleName:ORIGINAL', 'facet': 'true', 'facet.field': 'owningItem', 'facet.mincount': 1, 'facet.limit': 1, 'facet.offset': 0, 'stats': 'true', 'stats.field': 'owningItem', 'stats.calcdistinct': 'true', 'shards': shards, 'rows': 0, 'wt': 'json' } solr_url = SOLR_SERVER + '/statistics/select' res = requests.get(solr_url, params=solr_query_params) try: # get total number of distinct facets (countDistinct) results_totalNumFacets = res.json()['stats']['stats_fields']['owningItem']['countDistinct'] except TypeError: print('No item downloads to index, exiting.') exit(0) # divide results into "pages" (cast to int to effectively round down) results_per_page = 100 results_num_pages = int(results_totalNumFacets / results_per_page) results_current_page = 0 with DatabaseManager() as db: with db.cursor() as cursor: # create an empty list to store values for batch insertion data = [] while results_current_page <= results_num_pages: # "pages" are zero based, but one based is more human readable print('Indexing item downloads (page {} of {})'.format(results_current_page + 1, results_num_pages + 1)) solr_query_params = { 'q': 'type:0', 'fq': 'isBot:false AND statistics_type:view AND bundleName:ORIGINAL', 'facet': 'true', 'facet.field': 'owningItem', 'facet.mincount': 1, 'facet.limit': results_per_page, 'facet.offset': results_current_page * results_per_page, 'shards': shards, 'rows': 0, 'wt': 'json', 'json.nl': 'map' # return facets as a dict instead of a flat list } solr_url = SOLR_SERVER + '/statistics/select' res = requests.get(solr_url, params=solr_query_params) # Solr returns facets as a dict of dicts (see json.nl parameter) downloads = res.json()['facet_counts']['facet_fields'] # iterate over the 'owningItem' dict and get the item ids and downloads for item_id, item_downloads in downloads['owningItem'].items(): data.append((item_id, item_downloads)) # do a batch insert of values from the current "page" of results sql = 'INSERT INTO items(id, downloads) VALUES %s ON CONFLICT(id) DO UPDATE SET downloads=excluded.downloads' psycopg2.extras.execute_values(cursor, sql, data, template='(%s, %s)') db.commit() # clear all items from the list so we can populate it with the next batch data.clear() results_current_page += 1 with DatabaseManager() as db: with db.cursor() as cursor: # create table to store item views and downloads cursor.execute('''CREATE TABLE IF NOT EXISTS items (id INT PRIMARY KEY, views INT DEFAULT 0, downloads INT DEFAULT 0)''') # commit the table creation before closing the database connection db.commit() shards = get_statistics_shards() index_views() index_downloads() # vim: set sw=4 ts=4 expandtab: