iseal-core/util/create-schema-dspace.py

291 lines
9.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
#
# create-schema-dspace.py 0.0.1
#
# SPDX-License-Identifier: GPL-3.0-only
#
# ---
#
# A quick and dirty script to read schema fields from a CSV file and create them
# them in the DSpace metadata registry using the REST API. Specify an email and
# for a DSpace user with administrator privileges when running:
#
# $ ./util/create-schema-dspace.py -e me@example.com -p 'fuu!'
#
# You can optionally specify the URL of a DSpace REST application (default is to
# use http://localhost:8080/rest).
#
# This script is written for Python 3 and requires several modules that you can
# install with pip (I recommend setting up a Python virtual environment first):
#
# $ pip install requests pandas
#
import argparse
import signal
import sys
import pandas as pd
import requests
def signal_handler(signal, frame):
sys.exit(1)
# Try to log in, for example:
#
# $ http -f POST http://localhost:8080/rest/login email=aorth@fuuu.com password='fuuuuuu'
#
def login(user, password):
request_url = rest_login_endpoint
headers = {"user-agent": rest_user_agent}
data = {"email": args.user, "password": args.password}
print(f"Logging in...")
try:
request = requests.post(rest_login_endpoint, headers=headers, data=data)
except requests.ConnectionError:
sys.stderr.write(f" Could not connect to REST API: {args.request_url}\n")
exit(1)
if request.status_code != requests.codes.ok:
sys.stderr.write(f" Login failed.\n")
exit(1)
return request.cookies["JSESSIONID"]
# Check the authentication status of the specified JSESSIONID.
def check_session(sessionid):
request_url = rest_status_endpoint
headers = {"user-agent": rest_user_agent, "Accept": "application/json"}
cookies = {"JSESSIONID": sessionid}
print(f"Checking session status...")
try:
request = requests.get(request_url, headers=headers, cookies=cookies)
except requests.ConnectionError:
sys.stderr.write(f" Could not connect to REST API: {args.request_url}\n")
exit(1)
if request.status_code == requests.codes.ok:
if not request.json()["authenticated"]:
sys.stderr.write(f" Session expired: {sessionid}\n")
exit(1)
else:
sys.stderr.write(f" Error checking session status.\n")
exit(1)
# Create a new schema by passing a Schema Object, for example:
#
# $ http POST http://localhost:8080/rest/registries/schema Cookie:JSESSIONID=549756EB08169F17697A56A7D56901B3 < schema.json
#
# See: https://wiki.lyrasis.org/display/DSDOC6x/REST+API#RESTAPI-SchemaObject
def create_schema(schema):
request_url = rest_schema_registry_endpoint
headers = {"user-agent": rest_user_agent}
cookies = {"JSESSIONID": session}
print(f" Attempting to create schema: {schema['prefix']}")
try:
request = requests.post(
request_url, headers=headers, json=schema, cookies=cookies
)
except requests.ConnectionError:
sys.stderr.write(f" Could not connect to REST API: {args.request_url}\n")
exit(1)
# Check the status
if request.status_code == requests.codes.ok:
return True
# DSpace responds with HTTP 500 if the schema exists
elif request.status_code == requests.codes.internal_server_error:
print(f" Schema already exists: {schema['prefix']}")
return False
# DSpace responds with HTTP 415 if we POST bad data like a malformed JSON
# or a request without headers, etc.
elif request.status_code == requests.codes.unsupported_media_type:
sys.stderr.write(f" Could not create schema: {schema['prefix']}\n")
sys.stderr.write(f" HTTP error code: {request.status_code}\n")
return False
# Unknown error
else:
sys.stderr.write(f" Could not create schema: {schema['prefix']}\n")
sys.stderr.write(f" HTTP error code: {request.status_code}\n")
return False
# Create a new metadata field by passing a MetadataField Object, for example:
#
# $ http POST http://localhost:8080/rest/registries/schema/fuuu/metadata-fields \
# Cookie:JSESSIONID=549756EB08169F17697A56A7D56901B3 < field.json
#
# See: https://wiki.lyrasis.org/display/DSDOC6x/REST+API#RESTAPI-MetadataFieldObject
def create_field(schema_prefix, field):
request_url = f"{rest_schema_registry_endpoint}/{schema_prefix}/metadata-fields"
headers = {"user-agent": rest_user_agent}
cookies = {"JSESSIONID": session}
print(f" Attempting to create field: {field['name']}")
try:
request = requests.post(
request_url,
headers=headers,
json=field,
cookies=cookies,
)
except requests.ConnectionError:
sys.stderr.write(f" Could not connect to REST API: {args.request_url}\n")
exit(1)
# Check the status
if request.status_code == requests.codes.ok:
return True
# DSpace responds with HTTP 500 if the field exists
elif request.status_code == requests.codes.internal_server_error:
print(f" Field already exists: {field['name']}")
return False
# DSpace responds with HTTP 415 if we POST bad data like a malformed JSON
# or a request without headers, etc.
elif request.status_code == requests.codes.unsupported_media_type:
sys.stderr.write(f" Could not create field: {field['name']}.\n")
sys.stderr.write(f" HTTP error code: {request.status_code}\n")
sys.stderr.write("\n")
return False
# DSpace responds with HTTP 404 if the schema does not exist
elif request.status_code == requests.codes.not_found:
sys.stderr.write(
f" Could not create field: {field['name']} (parent schema does not exist).\n"
)
return False
# Unknown error
else:
sys.stderr.write(f" Could not create field: {field['name']}\n")
sys.stderr.write(f" HTTP error code: {request.status_code}\n")
return False
def parse_fields(schema_df):
# Iterate over all rows (the "index, row" syntax allows us to access column
# headings in each row, which isn't possible if we just do row).
for index, row in schema_df.iterrows():
dspace_field_name = row["dspace field name"]
# Extract the prefix from the field name, ie is.link.url
dspace_field_prefix = dspace_field_name.split(".")[0]
element_name = row["element name"]
# Make sure we only try to create fields in IS and FSC schemas for now.
# In the future we may create fields for other schema extensions, but
# we don't want to create dcterms fields, for example.
if "is" in dspace_field_prefix or "fsc" in dspace_field_prefix:
# Extract the element, ie "link" in is.link.url
dspace_field_element = dspace_field_name.split(".")[1]
try:
# Extract the qualifier, ie "url" in is.link.url, if it exists
dspace_field_qualifier = dspace_field_name.split(".")[2]
except IndexError:
dspace_field_qualifier = None
# Create a list of tuples with the metadata field components
field_components = [
("element", dspace_field_element),
("qualifier", dspace_field_qualifier),
("name", row["dspace field name"]),
("description", element_name),
]
# Create a dict from the list of tuples аbove. We can pass the dict
# to requests directly and it will be converted to JSON.
field = dict(field_components)
if create_field(dspace_field_prefix, field):
print(f" Created field: {dspace_field_name}")
parser = argparse.ArgumentParser(
description="Create ISEAL and FSC schemas in a DSpace 6.x repository."
)
parser.add_argument("-d", "--debug", help="Print debug messages.", action="store_true")
parser.add_argument(
"-u",
"--rest-url",
help="URL of the DSpace 6.x REST API.",
default="http://localhost:8080/rest",
)
parser.add_argument("-e", "--user", help="Email of administrator user.")
parser.add_argument("-p", "--password", help="Email of administrator user.")
parser.add_argument(
"-s", "--jsessionid", help="JESSIONID, if previously authenticated."
)
args = parser.parse_args()
# DSpace 6.x REST API base URL and endpoints
rest_base_url = args.rest_url
rest_login_endpoint = f"{rest_base_url}/login"
rest_status_endpoint = f"{rest_base_url}/status"
rest_schema_registry_endpoint = f"{rest_base_url}/registries/schema"
rest_user_agent = "Alan Test Python Requests Bot"
session = args.jsessionid
# set the signal handler for SIGINT (^C)
signal.signal(signal.SIGINT, signal_handler)
# Try to login if no session was passed
if not args.jsessionid:
session = login(args.user, args.password)
else:
check_session(args.jsessionid)
if args.debug:
sys.stderr.write(f" Logged in, using JSESSIONID: {session}\n")
print("\nCreating schemas...")
iseal_schema = {
"namespace": "https://iseal-community.github.io/iseal-core",
"prefix": "is",
}
fsc_schema = {
"namespace": "https://iseal-community.github.io/iseal-core/fsc",
"prefix": "fsc",
}
if create_schema(iseal_schema):
print(" Created ISEAL Core schema")
if create_schema(fsc_schema):
print(" Created FSC schema")
print("\nCreating fields...")
for file in ["data/iseal-core.csv", "data/fsc.csv"]:
if args.debug:
sys.stderr.write(f" Opening {file}\n")
try:
df = pd.read_csv(file, usecols=["dspace field name", "element name"])
parse_fields(df)
except FileNotFoundError:
sys.stderr.write(f" Could not open {file}\n")