1
0
mirror of https://github.com/ilri/csv-metadata-quality.git synced 2025-07-23 22:41:43 +02:00

Use uv build backend

uv's build backend expects our module to be in src.

See: https://docs.astral.sh/uv/concepts/build-backend/#modules
This commit is contained in:
2025-07-03 14:33:57 +03:00
parent 753f3340a3
commit be550e21f1
10 changed files with 3 additions and 8 deletions

View File

View File

@ -0,0 +1,13 @@
# SPDX-License-Identifier: GPL-3.0-only
from sys import argv
from csv_metadata_quality import app
def main():
app.run(argv)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,261 @@
# SPDX-License-Identifier: GPL-3.0-only
import argparse
import os
import re
import signal
import sys
from datetime import timedelta
import pandas as pd
import requests_cache
from colorama import Fore
import csv_metadata_quality.check as check
import csv_metadata_quality.experimental as experimental
import csv_metadata_quality.fix as fix
from csv_metadata_quality.version import VERSION
def parse_args(argv):
parser = argparse.ArgumentParser(description="Metadata quality checker and fixer.")
parser.add_argument(
"--agrovoc-fields",
"-a",
help="Comma-separated list of fields to validate against AGROVOC, for example: dcterms.subject,cg.coverage.country",
)
parser.add_argument(
"--drop-invalid-agrovoc",
"-d",
help="After validating metadata values against AGROVOC, drop invalid values.",
action="store_true",
)
parser.add_argument(
"--experimental-checks",
"-e",
help="Enable experimental checks like language detection",
action="store_true",
)
parser.add_argument(
"--input-file",
"-i",
help="Path to input file. Must be a UTF-8 CSV.",
required=True,
type=argparse.FileType("r", encoding="UTF-8"),
)
parser.add_argument(
"--output-file",
"-o",
help="Path to output file (always CSV).",
required=True,
type=argparse.FileType("w", encoding="UTF-8"),
)
parser.add_argument(
"--unsafe-fixes", "-u", help="Perform unsafe fixes.", action="store_true"
)
parser.add_argument(
"--version", "-V", action="version", version=f"CSV Metadata Quality v{VERSION}"
)
parser.add_argument(
"--exclude-fields",
"-x",
help="Comma-separated list of fields to skip, for example: dc.contributor.author,dcterms.bibliographicCitation",
)
args = parser.parse_args()
return args
def signal_handler(signal, frame):
sys.exit(1)
def run(argv):
args = parse_args(argv)
# set the signal handler for SIGINT (^C)
signal.signal(signal.SIGINT, signal_handler)
# Read all fields as strings so dates don't get converted from 1998 to 1998.0
df = pd.read_csv(args.input_file, dtype_backend="pyarrow", dtype="str")
# Check if the user requested to skip any fields
if args.exclude_fields:
# Split the list of excluded fields on ',' into a list. Note that the
# user should be careful to no include spaces here.
exclude = args.exclude_fields.split(",")
else:
exclude = []
# enable transparent request cache with thirty days expiry
expire_after = timedelta(days=30)
# Allow overriding the location of the requests cache, just in case we are
# running in an environment where we can't write to the current working di-
# rectory (for example from csv-metadata-quality-web).
REQUESTS_CACHE_DIR = os.environ.get("REQUESTS_CACHE_DIR", ".")
requests_cache.install_cache(
f"{REQUESTS_CACHE_DIR}/agrovoc-response-cache", expire_after=expire_after
)
# prune old cache entries
requests_cache.delete()
for column in df.columns:
if column in exclude:
print(f"{Fore.YELLOW}Skipping {Fore.RESET}{column}")
continue
if args.unsafe_fixes:
# Skip whitespace and newline fixes on abstracts and descriptions
# because there are too many with legitimate multi-line metadata.
match = re.match(r"^.*?(abstract|description).*$", column)
if match is None:
# Fix: whitespace
df[column] = df[column].apply(fix.whitespace, field_name=column)
# Fix: newlines
df[column] = df[column].apply(fix.newlines, field_name=column)
# Fix: missing space after comma. Only run on author and citation
# fields for now, as this problem is mostly an issue in names.
if args.unsafe_fixes:
match = re.match(r"^.*?(author|[Cc]itation).*$", column)
if match is not None:
df[column] = df[column].apply(fix.comma_space, field_name=column)
# Fix: perform Unicode normalization (NFC) to convert decomposed
# characters into their canonical forms.
if args.unsafe_fixes:
df[column] = df[column].apply(fix.normalize_unicode, field_name=column)
# Check: suspicious characters
df[column].apply(check.suspicious_characters, field_name=column)
# Fix: mojibake. If unsafe fixes are not enabled then we only check.
if args.unsafe_fixes:
df[column] = df[column].apply(fix.mojibake, field_name=column)
else:
df[column].apply(check.mojibake, field_name=column)
# Fix: unnecessary Unicode
df[column] = df[column].apply(fix.unnecessary_unicode)
# Fix: normalize DOIs
match = re.match(r"^.*?identifier\.doi.*$", column)
if match is not None:
df[column] = df[column].apply(fix.normalize_dois)
# Fix: invalid and unnecessary multi-value separators. Skip the title
# and abstract fields because "|" is used to indicate something like
# a subtitle.
match = re.match(r"^.*?(abstract|[Cc]itation|title).*$", column)
if match is None:
df[column] = df[column].apply(fix.separators, field_name=column)
# Run whitespace fix again after fixing invalid separators
df[column] = df[column].apply(fix.whitespace, field_name=column)
# Fix: duplicate metadata values
df[column] = df[column].apply(fix.duplicates, field_name=column)
# Check: invalid AGROVOC subject and optionally drop them
if args.agrovoc_fields:
# Identify fields the user wants to validate against AGROVOC
for field in args.agrovoc_fields.split(","):
if column == field:
df[column] = df[column].apply(
check.agrovoc, field_name=column, drop=args.drop_invalid_agrovoc
)
# Check: invalid language
match = re.match(r"^.*?language.*$", column)
if match is not None:
df[column].apply(check.language)
# Check: invalid ISSN
match = re.match(r"^.*?issn.*$", column)
if match is not None:
df[column].apply(check.issn)
# Check: invalid ISBN
match = re.match(r"^.*?isbn.*$", column)
if match is not None:
df[column].apply(check.isbn)
# Check: invalid date
match = re.match(r"^.*?(date|dcterms\.issued).*$", column)
if match is not None:
df[column].apply(check.date, field_name=column)
# Check: filename extension
if column == "filename":
df[column].apply(check.filename_extension)
# Check: SPDX license identifier
match = re.match(r"dcterms\.license.*$", column)
if match is not None:
df[column].apply(check.spdx_license_identifier)
### End individual column checks ###
# Check: duplicate items
# We extract just the title, type, and date issued columns to analyze
try:
duplicates_df = df.filter(
regex=r"dcterms\.title|dc\.title|dcterms\.type|dc\.type|dcterms\.issued|dc\.date\.issued"
)
check.duplicate_items(duplicates_df)
# Delete the temporary duplicates DataFrame
del duplicates_df
except IndexError:
pass
##
# Perform some checks on rows so we can consider items as a whole rather
# than simple on a field-by-field basis. This allows us to check whether
# the language used in the title and abstract matches the language indi-
# cated in the language field, for example.
#
# This is slower and apparently frowned upon in the Pandas community be-
# cause it requires iterating over rows rather than using apply over a
# column. For now it will have to do.
##
# Transpose the DataFrame so we can consider each row as a column
df_transposed = df.T
# Remember, here a "column" is an item (previously row). Perhaps I
# should rename column in this for loop...
for column in df_transposed.columns:
# Check: citation DOI
check.citation_doi(df_transposed[column], exclude)
# Check: title in citation
check.title_in_citation(df_transposed[column], exclude)
if args.unsafe_fixes:
# Fix: countries match regions
df_transposed[column] = fix.countries_match_regions(
df_transposed[column], exclude
)
else:
# Check: countries match regions
check.countries_match_regions(df_transposed[column], exclude)
if args.experimental_checks:
experimental.correct_language(df_transposed[column], exclude)
# Transpose the DataFrame back before writing. This is probably wasteful to
# do every time since we technically only need to do it if we've done the
# countries/regions fix above, but I can't think of another way for now.
df_transposed_back = df_transposed.T
# Write
df_transposed_back.to_csv(args.output_file, index=False)
# Close the input and output files before exiting
args.input_file.close()
args.output_file.close()
sys.exit(0)

560
src/csv_metadata_quality/check.py Executable file
View File

@ -0,0 +1,560 @@
# SPDX-License-Identifier: GPL-3.0-only
import logging
import re
from datetime import datetime
import country_converter as coco
import pandas as pd
import requests
from colorama import Fore
from pycountry import languages
from stdnum import isbn as stdnum_isbn
from stdnum import issn as stdnum_issn
from csv_metadata_quality.util import is_mojibake, load_spdx_licenses
def issn(field):
"""Check if an ISSN is valid.
Prints the ISSN if invalid.
stdnum's is_valid() function never raises an exception.
See: https://arthurdejong.org/python-stdnum/doc/1.11/index.html#stdnum.module.is_valid
"""
# Skip fields with missing values
if pd.isna(field):
return
# Try to split multi-value field on "||" separator
for value in field.split("||"):
if not stdnum_issn.is_valid(value):
print(f"{Fore.RED}Invalid ISSN: {Fore.RESET}{value}")
return
def isbn(field):
"""Check if an ISBN is valid.
Prints the ISBN if invalid.
stdnum's is_valid() function never raises an exception.
See: https://arthurdejong.org/python-stdnum/doc/1.11/index.html#stdnum.module.is_valid
"""
# Skip fields with missing values
if pd.isna(field):
return
# Try to split multi-value field on "||" separator
for value in field.split("||"):
if not stdnum_isbn.is_valid(value):
print(f"{Fore.RED}Invalid ISBN: {Fore.RESET}{value}")
return
def date(field, field_name):
"""Check if a date is valid.
In DSpace the issue date is usually 1990, 1990-01, or 1990-01-01, but it
could technically even include time as long as it is ISO8601.
Also checks for other invalid cases like missing and multiple dates.
Prints the date if invalid.
"""
if pd.isna(field):
print(f"{Fore.RED}Missing date ({field_name}).{Fore.RESET}")
return
# Try to split multi-value field on "||" separator
multiple_dates = field.split("||")
# We don't allow multi-value date fields
if len(multiple_dates) > 1:
print(
f"{Fore.RED}Multiple dates not allowed ({field_name}): {Fore.RESET}{field}"
)
return
try:
# Check if date is valid YYYY format
datetime.strptime(field, "%Y")
return
except ValueError:
pass
try:
# Check if date is valid YYYY-MM format
datetime.strptime(field, "%Y-%m")
return
except ValueError:
pass
try:
# Check if date is valid YYYY-MM-DD format
datetime.strptime(field, "%Y-%m-%d")
return
except ValueError:
pass
try:
# Check if date is valid YYYY-MM-DDTHH:MM:SSZ format
datetime.strptime(field, "%Y-%m-%dT%H:%M:%SZ")
return
except ValueError:
print(f"{Fore.RED}Invalid date ({field_name}): {Fore.RESET}{field}")
return
def suspicious_characters(field, field_name):
"""Warn about suspicious characters.
Look for standalone characters that could indicate encoding or copy/paste
errors for languages with accents. For example: foreˆt should be forêt.
"""
# Skip fields with missing values
if pd.isna(field):
return
# List of suspicious characters, for example: ́ˆ~`
suspicious_characters = ["\u00b4", "\u02c6", "\u007e", "\u0060"]
for character in suspicious_characters:
# Find the position of the suspicious character in the string
suspicious_character_position = field.find(character)
# Python returns -1 if there is no match
if suspicious_character_position != -1:
# Create a temporary new string starting from the position of the
# suspicious character
field_subset = field[suspicious_character_position:]
# Print part of the metadata value starting from the suspicious
# character and spanning enough of the rest to give a preview,
# but not too much to cause the line to break in terminals with
# a default of 80 characters width.
suspicious_character_msg = f"{Fore.YELLOW}Suspicious character ({field_name}): {Fore.RESET}{field_subset}"
print(f"{suspicious_character_msg:1.80}")
return
def language(field):
"""Check if a language is valid ISO 639-1 (alpha 2) or ISO 639-3 (alpha 3).
Prints the value if it is invalid.
"""
# Skip fields with missing values
if pd.isna(field):
return
# need to handle "Other" values here...
# Try to split multi-value field on "||" separator
for value in field.split("||"):
# After splitting, check if language value is 2 or 3 characters so we
# can check it against ISO 639-1 or ISO 639-3 accordingly.
if len(value) == 2:
if not languages.get(alpha_2=value):
print(f"{Fore.RED}Invalid ISO 639-1 language: {Fore.RESET}{value}")
elif len(value) == 3:
if not languages.get(alpha_3=value):
print(f"{Fore.RED}Invalid ISO 639-3 language: {Fore.RESET}{value}")
else:
print(f"{Fore.RED}Invalid language: {Fore.RESET}{value}")
return
def agrovoc(field, field_name, drop):
"""Check subject terms against AGROVOC REST API.
Function constructor expects the field as well as the field name because
many fields can now be validated against AGROVOC and we want to be able
to inform the user in which field the invalid term is.
Logic copied from agrovoc-lookup.py.
See: https://github.com/ilri/DSpace/blob/5_x-prod/agrovoc-lookup.py
Prints a warning if the value is invalid.
"""
# Skip fields with missing values
if pd.isna(field):
return
# Initialize an empty list to hold the validated AGROVOC values
values = []
# Try to split multi-value field on "||" separator
for value in field.split("||"):
request_url = "https://agrovoc.uniroma2.it/agrovoc/rest/v1/agrovoc/search"
request_params = {"query": value}
request = requests.get(request_url, params=request_params)
if request.status_code == requests.codes.ok:
data = request.json()
# check if there are any results
if len(data["results"]) == 0:
if drop:
print(
f"{Fore.GREEN}Dropping invalid AGROVOC ({field_name}): {Fore.RESET}{value}"
)
else:
print(
f"{Fore.RED}Invalid AGROVOC ({field_name}): {Fore.RESET}{value}"
)
# value is invalid AGROVOC, but we are not dropping
values.append(value)
else:
# value is valid AGROVOC so save it
values.append(value)
# Create a new field consisting of all values joined with "||"
new_field = "||".join(values)
return new_field
def filename_extension(field):
"""Check filename extension.
CSVs with a 'filename' column are likely meant as input for the SAFBuilder
tool, which creates a Simple Archive Format bundle for importing metadata
with accompanying PDFs or other files into DSpace.
This check warns if a filename has an uncommon extension (that is, other
than .pdf, .xls(x), .doc(x), ppt(x), case insensitive).
"""
# Skip fields with missing values
if pd.isna(field):
return
# Try to split multi-value field on "||" separator
values = field.split("||")
# List of common filename extentions
common_filename_extensions = [
".pdf",
".doc",
".docx",
".ppt",
".pptx",
".xls",
".xlsx",
]
# Iterate over all values
for value in values:
# Strip filename descriptions that are meant for SAF Bundler, for
# example: Annual_Report_2020.pdf__description:Report
if "__description" in value:
value = value.split("__")[0]
# Assume filename extension does not match
filename_extension_match = False
for filename_extension in common_filename_extensions:
# Check for extension at the end of the filename
pattern = re.escape(filename_extension) + r"$"
match = re.search(pattern, value, re.IGNORECASE)
if match is not None:
# Register the match and stop checking for this filename
filename_extension_match = True
break
if filename_extension_match is False:
print(f"{Fore.YELLOW}Filename with uncommon extension: {Fore.RESET}{value}")
return
def spdx_license_identifier(field):
"""Check if a license is a valid SPDX identifier.
Prints the value if it is invalid.
"""
# List of common non-SPDX licenses to ignore
# See: https://ilri.github.io/cgspace-submission-guidelines/dcterms-license/dcterms-license.txt
ignore_licenses = {
"All rights reserved; no re-use allowed",
"All rights reserved; self-archive copy only",
"Copyrighted; Non-commercial educational use only",
"Copyrighted; Non-commercial use only",
"Copyrighted; all rights reserved",
"Other",
}
# Skip fields with missing values
if pd.isna(field) or field in ignore_licenses:
return
spdx_licenses = load_spdx_licenses()
# Try to split multi-value field on "||" separator
for value in field.split("||"):
if value not in spdx_licenses:
print(f"{Fore.YELLOW}Non-SPDX license identifier: {Fore.RESET}{value}")
return
def duplicate_items(df):
"""Attempt to identify duplicate items.
First we check the total number of titles and compare it with the number of
unique titles. If there are less unique titles than total titles we expand
the search by creating a key (of sorts) for each item that includes their
title, type, and date issued, and compare it with all the others. If there
are multiple occurrences of the same title, type, date string then it's a
very good indicator that the items are duplicates.
"""
# Extract the names of the title, type, and date issued columns so we can
# reference them later. First we filter columns by likely patterns, then
# we extract the name from the first item of the resulting object, ie:
#
# Index(['dcterms.title[en_US]'], dtype='object')
#
# But, we need to consider that dc.title.alternative might come before the
# main title in the CSV, so use a negative lookahead to eliminate that.
#
# See: https://regex101.com/r/elyXkW/1
title_column_name = df.filter(
regex=r"^(dc|dcterms)\.title(?!\.alternative).*$"
).columns[0]
type_column_name = df.filter(regex=r"^(dcterms\.type|dc\.type).*$").columns[0]
date_column_name = df.filter(
regex=r"^(dcterms\.issued|dc\.date\.accessioned).*$"
).columns[0]
items_count_total = df[title_column_name].count()
items_count_unique = df[title_column_name].nunique()
if items_count_unique < items_count_total:
# Create a list to hold our items while we check for duplicates
items = []
for index, row in df.iterrows():
item_title_type_date = f"{row[title_column_name]}{row[type_column_name]}{row[date_column_name]}"
if item_title_type_date in items:
print(
f"{Fore.YELLOW}Possible duplicate ({title_column_name}): {Fore.RESET}{row[title_column_name]}"
)
else:
items.append(item_title_type_date)
def mojibake(field, field_name):
"""Check for mojibake (text that was encoded in one encoding and decoded in
in another, perhaps multiple times). See util.py.
Prints the string if it contains suspected mojibake.
"""
# Skip fields with missing values
if pd.isna(field):
return
if is_mojibake(field):
print(
f"{Fore.YELLOW}Possible encoding issue ({field_name}): {Fore.RESET}{field}"
)
return
def citation_doi(row, exclude):
"""Check for the scenario where an item has a DOI listed in its citation,
but does not have a cg.identifier.doi field.
Function prints a warning if the DOI field is missing, but there is a DOI
in the citation.
"""
# Check if the user requested us to skip any DOI fields so we can
# just return before going any further.
for field in exclude:
match = re.match(r"^.*?doi.*$", field)
if match is not None:
return
# Initialize some variables at global scope so that we can set them in the
# loop scope below and still be able to access them afterwards.
citation = ""
# Iterate over the labels of the current row's values to check if a DOI
# exists. If not, then we extract the citation to see if there is a DOI
# listed there.
for label in row.axes[0]:
# Skip fields with missing values
if pd.isna(row[label]):
continue
# If a DOI field exists we don't need to check the citation
match = re.match(r"^.*?doi.*$", label)
if match is not None:
return
# Check if the current label is a citation field and make sure the user
# hasn't asked to skip it. If not, then set the citation.
match = re.match(r"^.*?[cC]itation.*$", label)
if match is not None and label not in exclude:
citation = row[label]
if citation != "":
# Check the citation for "doi: 10.1186/1743-422X-9-218"
doi_match1 = re.match(r"^.*?doi:\s.*$", citation)
# Check the citation for a DOI URL (doi.org, dx.doi.org, etc)
doi_match2 = re.match(r"^.*?doi\.org.*$", citation)
if doi_match1 is not None or doi_match2 is not None:
print(
f"{Fore.YELLOW}DOI in citation, but missing a DOI field: {Fore.RESET}{citation}"
)
return
def title_in_citation(row, exclude):
"""Check for the scenario where an item's title is missing from its cita-
tion. This could mean that it is missing entirely, or perhaps just exists
in a different format (whitespace, accents, etc).
Function prints a warning if the title does not appear in the citation.
"""
# Initialize some variables at global scope so that we can set them in the
# loop scope below and still be able to access them afterwards.
title = ""
citation = ""
# Iterate over the labels of the current row's values to get the names of
# the title and citation columns. Then we check if the title is present in
# the citation.
for label in row.axes[0]:
# Skip fields with missing values
if pd.isna(row[label]):
continue
# Find the name of the title column
match = re.match(r"^(dc|dcterms)\.title.*$", label)
if match is not None and label not in exclude:
title = row[label]
# Find the name of the citation column
match = re.match(r"^.*?[cC]itation.*$", label)
if match is not None and label not in exclude:
citation = row[label]
if citation != "":
if title not in citation:
print(f"{Fore.YELLOW}Title is not present in citation: {Fore.RESET}{title}")
return
def countries_match_regions(row, exclude):
"""Check for the scenario where an item has country coverage metadata, but
does not have the corresponding region metadata. For example, an item that
has country coverage "Kenya" should also have region "Eastern Africa" acc-
ording to the UN M.49 classification scheme.
See: https://unstats.un.org/unsd/methodology/m49/
Function prints a warning if the appropriate region is not present.
"""
# Initialize some variables at global scope so that we can set them in the
# loop scope below and still be able to access them afterwards.
country_column_name = ""
region_column_name = ""
title_column_name = ""
# Instantiate a CountryConverter() object here. According to the docs it is
# more performant to do that as opposed to calling coco.convert() directly
# because we don't need to re-load the country data with each iteration.
cc = coco.CountryConverter()
# Set logging to ERROR so country_converter's convert() doesn't print the
# "not found in regex" warning message to the screen.
logging.basicConfig(level=logging.ERROR)
# Iterate over the labels of the current row's values to get the names of
# the title and citation columns. Then we check if the title is present in
# the citation.
for label in row.axes[0]:
# Find the name of the country column
match = re.match(r"^.*?country.*$", label)
if match is not None:
country_column_name = label
# Find the name of the region column, but make sure it's not subregion!
match = re.match(r"^.*?region.*$", label)
if match is not None and "sub" not in label:
region_column_name = label
# Find the name of the title column
match = re.match(r"^(dc|dcterms)\.title.*$", label)
if match is not None:
title_column_name = label
# Make sure the user has not asked to exclude any metadata fields. If so, we
# should return immediately.
column_names = [country_column_name, region_column_name, title_column_name]
if any(field in column_names for field in exclude):
return
# Make sure we found the country and region columns
if country_column_name != "" and region_column_name != "":
# If we don't have any countries then we should return early before
# suggesting regions.
if row[country_column_name] is not None:
countries = row[country_column_name].split("||")
else:
return
if row[region_column_name] is not None:
regions = row[region_column_name].split("||")
else:
regions = []
for country in countries:
# Look up the UN M.49 regions for this country code. CoCo seems to
# only list the direct region, ie Western Africa, rather than all
# the parent regions ("Sub-Saharan Africa", "Africa", "World")
un_region = cc.convert(names=country, to="UNRegion")
if un_region != "not found" and un_region not in regions:
try:
print(
f"{Fore.YELLOW}Missing region ({country} → {un_region}): {Fore.RESET}{row[title_column_name]}"
)
except KeyError:
print(
f"{Fore.YELLOW}Missing region ({country} → {un_region}): {Fore.RESET}<title field not present>"
)
return

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,99 @@
# SPDX-License-Identifier: GPL-3.0-only
import re
import pandas as pd
import py3langid as langid
from colorama import Fore
from pycountry import languages
def correct_language(row, exclude):
"""Analyze the text used in the title, abstract, and citation fields to pre-
dict the language being used and compare it with the item's dc.language.iso
field.
Function prints an error if the language field does not match the detected
language and returns the value in the language field if it does match.
"""
# Initialize some variables at global scope so that we can set them in the
# loop scope below and still be able to access them afterwards.
language = ""
sample_strings = []
title = None
# Iterate over the labels of the current row's values. Before we transposed
# the DataFrame these were the columns in the CSV, ie dc.title and dc.type.
for label in row.axes[0]:
# Skip fields with missing values
if pd.isna(row[label]):
continue
# Check if current row has multiple language values (separated by "||")
match = re.match(r"^.*?language.*$", label)
if match is not None:
# Skip fields with multiple language values
if "||" in row[label]:
return
language = row[label]
# Extract title if it is present (note that we don't allow excluding
# the title here because it complicates things).
match = re.match(r"^.*?title.*$", label)
if match is not None:
title = row[label]
# Append title to sample strings
sample_strings.append(row[label])
# Extract abstract if it is present
match = re.match(r"^.*?abstract.*$", label)
if match is not None and label not in exclude:
sample_strings.append(row[label])
# Extract citation if it is present
match = re.match(r"^.*?[cC]itation.*$", label)
if match is not None and label not in exclude:
sample_strings.append(row[label])
# Make sure language is not blank and is valid ISO 639-1/639-3 before proceeding with language prediction
if language != "":
# Check language value like "es"
if len(language) == 2:
if not languages.get(alpha_2=language):
return
# Check language value like "spa"
elif len(language) == 3:
if not languages.get(alpha_3=language):
return
# Language value is something else like "Span", do not proceed
else:
return
# Language is blank, do not proceed
else:
return
# Concatenate all sample strings into one string
sample_text = " ".join(sample_strings)
# Restrict the langid detection space to reduce false positives
langid.set_languages(
["ar", "de", "en", "es", "fr", "hi", "it", "ja", "ko", "pt", "ru", "vi", "zh"]
)
langid_classification = langid.classify(sample_text)
# langid returns an ISO 639-1 (alpha 2) representation of the detected language, but the current item's language field might be ISO 639-3 (alpha 3) so we should use a pycountry Language object to compare both represenations and give appropriate error messages that match the format used by in the input file.
detected_language = languages.get(alpha_2=langid_classification[0])
if len(language) == 2 and language != detected_language.alpha_2:
print(
f"{Fore.YELLOW}Possibly incorrect language {language} (detected {detected_language.alpha_2}): {Fore.RESET}{title}"
)
elif len(language) == 3 and language != detected_language.alpha_3:
print(
f"{Fore.YELLOW}Possibly incorrect language {language} (detected {detected_language.alpha_3}): {Fore.RESET}{title}"
)
else:
return

482
src/csv_metadata_quality/fix.py Executable file
View File

@ -0,0 +1,482 @@
# SPDX-License-Identifier: GPL-3.0-only
import logging
import re
from unicodedata import normalize
import country_converter as coco
import pandas as pd
from colorama import Fore
from ftfy import TextFixerConfig, fix_text
from csv_metadata_quality.util import is_mojibake, is_nfc
def whitespace(field, field_name):
"""Fix whitespace issues.
Return string with leading, trailing, and consecutive whitespace trimmed.
"""
# Skip fields with missing values
if pd.isna(field):
return
# Initialize an empty list to hold the cleaned values
values = []
# Try to split multi-value field on "||" separator
for value in field.split("||"):
# Strip leading and trailing whitespace
value = value.strip()
# Replace excessive whitespace (>2) with one space
pattern = re.compile(r"\s{2,}")
match = re.findall(pattern, value)
if match:
print(
f"{Fore.GREEN}Removing excessive whitespace ({field_name}): {Fore.RESET}{value}"
)
value = re.sub(pattern, " ", value)
# Save cleaned value
values.append(value)
# Create a new field consisting of all values joined with "||"
new_field = "||".join(values)
return new_field
def separators(field, field_name):
"""Fix for invalid and unnecessary multi-value separators, for example:
value|value
value|||value
value||value||
Prints the field with the invalid multi-value separator.
"""
# Skip fields with missing values
if pd.isna(field):
return
# Initialize an empty list to hold the cleaned values
values = []
# Try to split multi-value field on "||" separator
for value in field.split("||"):
# Check if the value is blank and skip it
if value == "":
print(
f"{Fore.GREEN}Fixing unnecessary multi-value separator ({field_name}): {Fore.RESET}{field}"
)
continue
# After splitting, see if there are any remaining "|" characters
pattern = re.compile(r"\|")
match = re.findall(pattern, value)
if match:
print(
f"{Fore.GREEN}Fixing invalid multi-value separator ({field_name}): {Fore.RESET}{value}"
)
value = re.sub(pattern, "||", value)
# Save cleaned value
values.append(value)
# Create a new field consisting of all values joined with "||"
new_field = "||".join(values)
return new_field
def unnecessary_unicode(field):
"""Remove and replace unnecessary Unicode characters.
Removes unnecessary Unicode characters like:
- Zero-width space (U+200B)
- Replacement character (U+FFFD)
Replaces unnecessary Unicode characters like:
- Soft hyphen (U+00AD) → hyphen
- No-break space (U+00A0) → space
- Thin space (U+2009) → space
Return string with characters removed or replaced.
"""
# Skip fields with missing values
if pd.isna(field):
return
# Check for zero-width space characters (U+200B)
pattern = re.compile(r"\u200B")
match = re.findall(pattern, field)
if match:
print(f"{Fore.GREEN}Removing unnecessary Unicode (U+200B): {Fore.RESET}{field}")
field = re.sub(pattern, "", field)
# Check for replacement characters (U+FFFD)
pattern = re.compile(r"\uFFFD")
match = re.findall(pattern, field)
if match:
print(f"{Fore.GREEN}Removing unnecessary Unicode (U+FFFD): {Fore.RESET}{field}")
field = re.sub(pattern, "", field)
# Check for no-break spaces (U+00A0)
pattern = re.compile(r"\u00A0")
match = re.findall(pattern, field)
if match:
print(
f"{Fore.GREEN}Replacing unnecessary Unicode (U+00A0): {Fore.RESET}{field}"
)
field = re.sub(pattern, " ", field)
# Check for soft hyphens (U+00AD), sometimes preceeded with a normal hyphen
pattern = re.compile(r"\u002D*?\u00AD")
match = re.findall(pattern, field)
if match:
print(
f"{Fore.GREEN}Replacing unnecessary Unicode (U+00AD): {Fore.RESET}{field}"
)
field = re.sub(pattern, "-", field)
# Check for thin spaces (U+2009)
pattern = re.compile(r"\u2009")
match = re.findall(pattern, field)
if match:
print(
f"{Fore.GREEN}Replacing unnecessary Unicode (U+2009): {Fore.RESET}{field}"
)
field = re.sub(pattern, " ", field)
return field
def duplicates(field, field_name):
"""Remove duplicate metadata values."""
# Skip fields with missing values
if pd.isna(field):
return
# Try to split multi-value field on "||" separator
values = field.split("||")
# Initialize an empty list to hold the de-duplicated values
new_values = []
# Iterate over all values
for value in values:
# Check if each value exists in our list of values already
if value not in new_values:
new_values.append(value)
else:
print(
f"{Fore.GREEN}Removing duplicate value ({field_name}): {Fore.RESET}{value}"
)
# Create a new field consisting of all values joined with "||"
new_field = "||".join(new_values)
return new_field
def newlines(field, field_name):
"""Fix newlines.
Single metadata values should not span multiple lines because this is not
rendered properly in DSpace's XMLUI and even causes issues during import.
Implementation note: this currently only detects Unix line feeds (0x0a).
This is essentially when a user presses "Enter" to move to the next line.
Other newlines like the Windows carriage return are already handled with
the string stipping performed in the whitespace fixes.
Confusingly, in Vim '\n' matches a line feed when searching, but you must
use '\r' to *insert* a line feed, ie in a search and replace expression.
Return string with newlines removed.
"""
# Skip fields with missing values
if pd.isna(field):
return
# Check for Unix line feed (LF)
match = re.findall(r"\n", field)
if match:
print(f"{Fore.GREEN}Removing newline ({field_name}): {Fore.RESET}{field}")
field = field.replace("\n", "")
return field
def comma_space(field, field_name):
"""Fix occurrences of commas missing a trailing space, for example:
Orth,Alan S.
This is a very common mistake in author and citation fields.
Return string with a space added.
"""
# Skip fields with missing values
if pd.isna(field):
return
# Check for comma followed by a word character
match = re.findall(r",\w", field)
if match:
print(
f"{Fore.GREEN}Adding space after comma ({field_name}): {Fore.RESET}{field}"
)
field = re.sub(r",(\w)", r", \1", field)
return field
def normalize_unicode(field, field_name):
"""Fix occurrences of decomposed Unicode characters by normalizing them
with NFC to their canonical forms, for example:
Ouédraogo, Mathieu → Ouédraogo, Mathieu
Return normalized string.
"""
# Skip fields with missing values
if pd.isna(field):
return
# Check if the current string is using normalized Unicode (NFC)
if not is_nfc(field):
print(f"{Fore.GREEN}Normalizing Unicode ({field_name}): {Fore.RESET}{field}")
field = normalize("NFC", field)
return field
def mojibake(field, field_name):
"""Attempts to fix mojibake (text that was encoded in one encoding and deco-
ded in another, perhaps multiple times). See util.py.
Return fixed string.
"""
# Skip fields with missing values
if pd.isna(field):
return field
# We don't want ftfy to change “smart quotes” to "ASCII quotes"
config = TextFixerConfig(uncurl_quotes=False)
if is_mojibake(field):
print(f"{Fore.GREEN}Fixing encoding issue ({field_name}): {Fore.RESET}{field}")
return fix_text(field, config)
else:
return field
def countries_match_regions(row, exclude):
"""Check for the scenario where an item has country coverage metadata, but
does not have the corresponding region metadata. For example, an item that
has country coverage "Kenya" should also have region "Eastern Africa" acc-
ording to the UN M.49 classification scheme.
See: https://unstats.un.org/unsd/methodology/m49/
Return fixed string.
"""
# Initialize some variables at global scope so that we can set them in the
# loop scope below and still be able to access them afterwards.
country_column_name = ""
region_column_name = ""
title_column_name = ""
# Instantiate a CountryConverter() object here. According to the docs it is
# more performant to do that as opposed to calling coco.convert() directly
# because we don't need to re-load the country data with each iteration.
cc = coco.CountryConverter()
# Set logging to ERROR so country_converter's convert() doesn't print the
# "not found in regex" warning message to the screen.
logging.basicConfig(level=logging.ERROR)
# Iterate over the labels of the current row's values to get the names of
# the title and citation columns. Then we check if the title is present in
# the citation.
for label in row.axes[0]:
# Find the name of the country column
match = re.match(r"^.*?country.*$", label)
if match is not None:
country_column_name = label
# Find the name of the region column, but make sure it's not subregion!
match = re.match(r"^.*?region.*$", label)
if match is not None and "sub" not in label:
region_column_name = label
# Find the name of the title column
match = re.match(r"^(dc|dcterms)\.title.*$", label)
if match is not None:
title_column_name = label
# Make sure the user has not asked to exclude any metadata fields. If so, we
# should return immediately.
column_names = [country_column_name, region_column_name, title_column_name]
if any(field in column_names for field in exclude):
return row
# Make sure we found the country and region columns
if country_column_name != "" and region_column_name != "":
# If we don't have any countries then we should return early before
# suggesting regions.
if row[country_column_name] is not None:
countries = row[country_column_name].split("||")
else:
return row
if row[region_column_name] is not None:
regions = row[region_column_name].split("||")
else:
regions = []
# An empty list for our regions so we can keep track for all countries
missing_regions = []
for country in countries:
# Look up the UN M.49 regions for this country code. CoCo seems to
# only list the direct region, ie Western Africa, rather than all
# the parent regions ("Sub-Saharan Africa", "Africa", "World")
un_region = cc.convert(names=country, to="UNRegion")
# Add the new un_region to regions if it is not "not found" and if
# it doesn't already exist in regions.
if un_region != "not found" and un_region not in regions:
if un_region not in missing_regions:
try:
print(
f"{Fore.YELLOW}Adding missing region ({un_region}): {Fore.RESET}{row[title_column_name]}"
)
except KeyError:
# If there is no title column in the CSV we will print
# the fix without the title instead of crashing.
print(
f"{Fore.YELLOW}Adding missing region ({un_region}): {Fore.RESET}<title field not present>"
)
missing_regions.append(un_region)
if len(missing_regions) > 0:
# Add the missing regions back to the row, paying attention to whether
# or not the row's region column is None (aka null) or just an empty
# string (length would be 0).
if row[region_column_name] is not None and len(row[region_column_name]) > 0:
row[region_column_name] = (
row[region_column_name] + "||" + "||".join(missing_regions)
)
else:
row[region_column_name] = "||".join(missing_regions)
return row
def normalize_dois(field):
"""Normalize DOIs.
DOIs are meant to be globally unique identifiers. They are case insensitive,
but in order to compare them robustly they should be normalized to a common
format:
- strip leading and trailing whitespace
- lowercase all ASCII characters
- convert all variations to https://doi.org/10.xxxx/xxxx URI format
Return string with normalized DOI.
See: https://www.crossref.org/documentation/member-setup/constructing-your-dois/
"""
# Skip fields with missing values
if pd.isna(field):
return
# Try to split multi-value field on "||" separator
values = field.split("||")
# Initialize an empty list to hold the de-duplicated values
new_values = []
# Iterate over all values (most items will only have one DOI)
for value in values:
# Strip leading and trailing whitespace
new_value = value.strip()
new_value = new_value.lower()
# Convert to HTTPS
pattern = re.compile(r"^http://")
match = re.findall(pattern, new_value)
if match:
new_value = re.sub(pattern, "https://", new_value)
# Convert dx.doi.org to doi.org
pattern = re.compile(r"dx\.doi\.org")
match = re.findall(pattern, new_value)
if match:
new_value = re.sub(pattern, "doi.org", new_value)
# Convert www.doi.org to doi.org
pattern = re.compile(r"www\.doi\.org")
match = re.findall(pattern, new_value)
if match:
new_value = re.sub(pattern, "doi.org", new_value)
# Convert erroneous %2f to /
pattern = re.compile("%2f")
match = re.findall(pattern, new_value)
if match:
new_value = re.sub(pattern, "/", new_value)
# Replace values like doi: 10.11648/j.jps.20140201.14
pattern = re.compile(r"^doi: 10\.")
match = re.findall(pattern, new_value)
if match:
new_value = re.sub(pattern, "https://doi.org/10.", new_value)
# Replace values like 10.3390/foods12010115
pattern = re.compile(r"^10\.")
match = re.findall(pattern, new_value)
if match:
new_value = re.sub(pattern, "https://doi.org/10.", new_value)
if new_value != value:
print(f"{Fore.GREEN}Normalized DOI: {Fore.RESET}{value}")
new_values.append(new_value)
new_field = "||".join(new_values)
return new_field

View File

@ -0,0 +1,65 @@
# SPDX-License-Identifier: GPL-3.0-only
import json
import os
from ftfy.badness import is_bad
def is_nfc(field):
"""Utility function to check whether a string is using normalized Unicode.
Python's built-in unicodedata library has the is_normalized() function, but
it was only introduced in Python 3.8. By using a simple utility function we
are able to run on Python >= 3.6 again.
See: https://docs.python.org/3/library/unicodedata.html
Return boolean.
"""
from unicodedata import normalize
return field == normalize("NFC", field)
def is_mojibake(field):
"""Determines whether a string contains mojibake.
We commonly deal with CSV files that were *encoded* in UTF-8, but decoded
as something else like CP-1252 (Windows Latin). This manifests in the form
of "mojibake", for example:
- CIAT Publicaçao
- CIAT Publicación
This uses the excellent "fixes text for you" (ftfy) library to determine
whether a string contains characters that have been encoded in one encoding
and decoded in another.
Inspired by this code snippet from Martijn Pieters on StackOverflow:
https://stackoverflow.com/questions/29071995/identify-garbage-unicode-string-using-python
Return boolean.
"""
if not is_bad(field):
# Nothing weird, should be okay
return False
try:
field.encode("sloppy-windows-1252")
except UnicodeEncodeError:
# Not CP-1252 encodable, probably fine
return False
else:
# Encodable as CP-1252, Mojibake alert level high
return True
def load_spdx_licenses():
"""Returns a Python list of SPDX short license identifiers."""
with open(os.path.join(os.path.dirname(__file__), "data/licenses.json")) as f:
licenses = json.load(f)
# List comprehension to extract the license ID for each license
return [license["licenseId"] for license in licenses["licenses"]]

View File

@ -0,0 +1,3 @@
# SPDX-License-Identifier: GPL-3.0-only
VERSION = "0.7.0"