mirror of
https://github.com/ilri/csv-metadata-quality.git
synced 2025-01-03 09:37:05 +01:00
Alan Orth
f6018c51b6
Apply recommended fix from fixit: RewriteToLiteral: It's slower to call list() than using the empty literal, because the name list must be looked up in the global scope in case it has been rebound.
398 lines
12 KiB
Python
Executable File
398 lines
12 KiB
Python
Executable File
# SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
import logging
|
|
import re
|
|
from unicodedata import normalize
|
|
|
|
import country_converter as coco
|
|
import pandas as pd
|
|
from colorama import Fore
|
|
from ftfy import TextFixerConfig, fix_text
|
|
|
|
from csv_metadata_quality.util import is_mojibake, is_nfc
|
|
|
|
|
|
def whitespace(field, field_name):
|
|
"""Fix whitespace issues.
|
|
|
|
Return string with leading, trailing, and consecutive whitespace trimmed.
|
|
"""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return
|
|
|
|
# Initialize an empty list to hold the cleaned values
|
|
values = []
|
|
|
|
# Try to split multi-value field on "||" separator
|
|
for value in field.split("||"):
|
|
# Strip leading and trailing whitespace
|
|
value = value.strip()
|
|
|
|
# Replace excessive whitespace (>2) with one space
|
|
pattern = re.compile(r"\s{2,}")
|
|
match = re.findall(pattern, value)
|
|
|
|
if match:
|
|
print(
|
|
f"{Fore.GREEN}Removing excessive whitespace ({field_name}): {Fore.RESET}{value}"
|
|
)
|
|
value = re.sub(pattern, " ", value)
|
|
|
|
# Save cleaned value
|
|
values.append(value)
|
|
|
|
# Create a new field consisting of all values joined with "||"
|
|
new_field = "||".join(values)
|
|
|
|
return new_field
|
|
|
|
|
|
def separators(field, field_name):
|
|
"""Fix for invalid and unnecessary multi-value separators, for example:
|
|
|
|
value|value
|
|
value|||value
|
|
value||value||
|
|
|
|
Prints the field with the invalid multi-value separator.
|
|
"""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return
|
|
|
|
# Initialize an empty list to hold the cleaned values
|
|
values = []
|
|
|
|
# Try to split multi-value field on "||" separator
|
|
for value in field.split("||"):
|
|
# Check if the value is blank and skip it
|
|
if value == "":
|
|
print(
|
|
f"{Fore.GREEN}Fixing unnecessary multi-value separator ({field_name}): {Fore.RESET}{field}"
|
|
)
|
|
|
|
continue
|
|
|
|
# After splitting, see if there are any remaining "|" characters
|
|
pattern = re.compile(r"\|")
|
|
match = re.findall(pattern, value)
|
|
|
|
if match:
|
|
print(
|
|
f"{Fore.GREEN}Fixing invalid multi-value separator ({field_name}): {Fore.RESET}{value}"
|
|
)
|
|
|
|
value = re.sub(pattern, "||", value)
|
|
|
|
# Save cleaned value
|
|
values.append(value)
|
|
|
|
# Create a new field consisting of all values joined with "||"
|
|
new_field = "||".join(values)
|
|
|
|
return new_field
|
|
|
|
|
|
def unnecessary_unicode(field):
|
|
"""Remove and replace unnecessary Unicode characters.
|
|
|
|
Removes unnecessary Unicode characters like:
|
|
- Zero-width space (U+200B)
|
|
- Replacement character (U+FFFD)
|
|
|
|
Replaces unnecessary Unicode characters like:
|
|
- Soft hyphen (U+00AD) → hyphen
|
|
- No-break space (U+00A0) → space
|
|
- Thin space (U+2009) → space
|
|
|
|
Return string with characters removed or replaced.
|
|
"""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return
|
|
|
|
# Check for zero-width space characters (U+200B)
|
|
pattern = re.compile(r"\u200B")
|
|
match = re.findall(pattern, field)
|
|
|
|
if match:
|
|
print(f"{Fore.GREEN}Removing unnecessary Unicode (U+200B): {Fore.RESET}{field}")
|
|
field = re.sub(pattern, "", field)
|
|
|
|
# Check for replacement characters (U+FFFD)
|
|
pattern = re.compile(r"\uFFFD")
|
|
match = re.findall(pattern, field)
|
|
|
|
if match:
|
|
print(f"{Fore.GREEN}Removing unnecessary Unicode (U+FFFD): {Fore.RESET}{field}")
|
|
field = re.sub(pattern, "", field)
|
|
|
|
# Check for no-break spaces (U+00A0)
|
|
pattern = re.compile(r"\u00A0")
|
|
match = re.findall(pattern, field)
|
|
|
|
if match:
|
|
print(
|
|
f"{Fore.GREEN}Replacing unnecessary Unicode (U+00A0): {Fore.RESET}{field}"
|
|
)
|
|
field = re.sub(pattern, " ", field)
|
|
|
|
# Check for soft hyphens (U+00AD), sometimes preceeded with a normal hyphen
|
|
pattern = re.compile(r"\u002D*?\u00AD")
|
|
match = re.findall(pattern, field)
|
|
|
|
if match:
|
|
print(
|
|
f"{Fore.GREEN}Replacing unnecessary Unicode (U+00AD): {Fore.RESET}{field}"
|
|
)
|
|
field = re.sub(pattern, "-", field)
|
|
|
|
# Check for thin spaces (U+2009)
|
|
pattern = re.compile(r"\u2009")
|
|
match = re.findall(pattern, field)
|
|
|
|
if match:
|
|
print(
|
|
f"{Fore.GREEN}Replacing unnecessary Unicode (U+2009): {Fore.RESET}{field}"
|
|
)
|
|
field = re.sub(pattern, " ", field)
|
|
|
|
return field
|
|
|
|
|
|
def duplicates(field, field_name):
|
|
"""Remove duplicate metadata values."""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return
|
|
|
|
# Try to split multi-value field on "||" separator
|
|
values = field.split("||")
|
|
|
|
# Initialize an empty list to hold the de-duplicated values
|
|
new_values = []
|
|
|
|
# Iterate over all values
|
|
for value in values:
|
|
# Check if each value exists in our list of values already
|
|
if value not in new_values:
|
|
new_values.append(value)
|
|
else:
|
|
print(
|
|
f"{Fore.GREEN}Removing duplicate value ({field_name}): {Fore.RESET}{value}"
|
|
)
|
|
|
|
# Create a new field consisting of all values joined with "||"
|
|
new_field = "||".join(new_values)
|
|
|
|
return new_field
|
|
|
|
|
|
def newlines(field, field_name):
|
|
"""Fix newlines.
|
|
|
|
Single metadata values should not span multiple lines because this is not
|
|
rendered properly in DSpace's XMLUI and even causes issues during import.
|
|
|
|
Implementation note: this currently only detects Unix line feeds (0x0a).
|
|
This is essentially when a user presses "Enter" to move to the next line.
|
|
Other newlines like the Windows carriage return are already handled with
|
|
the string stipping performed in the whitespace fixes.
|
|
|
|
Confusingly, in Vim '\n' matches a line feed when searching, but you must
|
|
use '\r' to *insert* a line feed, ie in a search and replace expression.
|
|
|
|
Return string with newlines removed.
|
|
"""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return
|
|
|
|
# Check for Unix line feed (LF)
|
|
match = re.findall(r"\n", field)
|
|
|
|
if match:
|
|
print(f"{Fore.GREEN}Removing newline ({field_name}): {Fore.RESET}{field}")
|
|
field = field.replace("\n", "")
|
|
|
|
return field
|
|
|
|
|
|
def comma_space(field, field_name):
|
|
"""Fix occurrences of commas missing a trailing space, for example:
|
|
|
|
Orth,Alan S.
|
|
|
|
This is a very common mistake in author and citation fields.
|
|
|
|
Return string with a space added.
|
|
"""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return
|
|
|
|
# Check for comma followed by a word character
|
|
match = re.findall(r",\w", field)
|
|
|
|
if match:
|
|
print(
|
|
f"{Fore.GREEN}Adding space after comma ({field_name}): {Fore.RESET}{field}"
|
|
)
|
|
field = re.sub(r",(\w)", r", \1", field)
|
|
|
|
return field
|
|
|
|
|
|
def normalize_unicode(field, field_name):
|
|
"""Fix occurrences of decomposed Unicode characters by normalizing them
|
|
with NFC to their canonical forms, for example:
|
|
|
|
Ouédraogo, Mathieu → Ouédraogo, Mathieu
|
|
|
|
Return normalized string.
|
|
"""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return
|
|
|
|
# Check if the current string is using normalized Unicode (NFC)
|
|
if not is_nfc(field):
|
|
print(f"{Fore.GREEN}Normalizing Unicode ({field_name}): {Fore.RESET}{field}")
|
|
field = normalize("NFC", field)
|
|
|
|
return field
|
|
|
|
|
|
def mojibake(field, field_name):
|
|
"""Attempts to fix mojibake (text that was encoded in one encoding and deco-
|
|
ded in another, perhaps multiple times). See util.py.
|
|
|
|
Return fixed string.
|
|
"""
|
|
|
|
# Skip fields with missing values
|
|
if pd.isna(field):
|
|
return field
|
|
|
|
# We don't want ftfy to change “smart quotes” to "ASCII quotes"
|
|
config = TextFixerConfig(uncurl_quotes=False)
|
|
|
|
if is_mojibake(field):
|
|
print(f"{Fore.GREEN}Fixing encoding issue ({field_name}): {Fore.RESET}{field}")
|
|
|
|
return fix_text(field, config)
|
|
else:
|
|
return field
|
|
|
|
|
|
def countries_match_regions(row, exclude):
|
|
"""Check for the scenario where an item has country coverage metadata, but
|
|
does not have the corresponding region metadata. For example, an item that
|
|
has country coverage "Kenya" should also have region "Eastern Africa" acc-
|
|
ording to the UN M.49 classification scheme.
|
|
|
|
See: https://unstats.un.org/unsd/methodology/m49/
|
|
|
|
Return fixed string.
|
|
"""
|
|
# Initialize some variables at global scope so that we can set them in the
|
|
# loop scope below and still be able to access them afterwards.
|
|
country_column_name = ""
|
|
region_column_name = ""
|
|
title_column_name = ""
|
|
|
|
# Instantiate a CountryConverter() object here. According to the docs it is
|
|
# more performant to do that as opposed to calling coco.convert() directly
|
|
# because we don't need to re-load the country data with each iteration.
|
|
cc = coco.CountryConverter()
|
|
|
|
# Set logging to ERROR so country_converter's convert() doesn't print the
|
|
# "not found in regex" warning message to the screen.
|
|
logging.basicConfig(level=logging.ERROR)
|
|
|
|
# Iterate over the labels of the current row's values to get the names of
|
|
# the title and citation columns. Then we check if the title is present in
|
|
# the citation.
|
|
for label in row.axes[0]:
|
|
# Find the name of the country column
|
|
match = re.match(r"^.*?country.*$", label)
|
|
if match is not None:
|
|
country_column_name = label
|
|
|
|
# Find the name of the region column, but make sure it's not subregion!
|
|
match = re.match(r"^.*?region.*$", label)
|
|
if match is not None and "sub" not in label:
|
|
region_column_name = label
|
|
|
|
# Find the name of the title column
|
|
match = re.match(r"^(dc|dcterms)\.title.*$", label)
|
|
if match is not None:
|
|
title_column_name = label
|
|
|
|
# Make sure the user has not asked to exclude any metadata fields. If so, we
|
|
# should return immediately.
|
|
column_names = [country_column_name, region_column_name, title_column_name]
|
|
if any(field in column_names for field in exclude):
|
|
return row
|
|
|
|
# Make sure we found the country and region columns
|
|
if country_column_name != "" and region_column_name != "":
|
|
# If we don't have any countries then we should return early before
|
|
# suggesting regions.
|
|
if row[country_column_name] is not None:
|
|
countries = row[country_column_name].split("||")
|
|
else:
|
|
return row
|
|
|
|
if row[region_column_name] is not None:
|
|
regions = row[region_column_name].split("||")
|
|
else:
|
|
regions = []
|
|
|
|
# An empty list for our regions so we can keep track for all countries
|
|
missing_regions = []
|
|
|
|
for country in countries:
|
|
# Look up the UN M.49 regions for this country code. CoCo seems to
|
|
# only list the direct region, ie Western Africa, rather than all
|
|
# the parent regions ("Sub-Saharan Africa", "Africa", "World")
|
|
un_region = cc.convert(names=country, to="UNRegion")
|
|
|
|
# Add the new un_region to regions if it is not "not found" and if
|
|
# it doesn't already exist in regions.
|
|
if un_region != "not found" and un_region not in regions:
|
|
if un_region not in missing_regions:
|
|
try:
|
|
print(
|
|
f"{Fore.YELLOW}Adding missing region ({un_region}): {Fore.RESET}{row[title_column_name]}"
|
|
)
|
|
except KeyError:
|
|
# If there is no title column in the CSV we will print
|
|
# the fix without the title instead of crashing.
|
|
print(
|
|
f"{Fore.YELLOW}Adding missing region ({un_region}): {Fore.RESET}<title field not present>"
|
|
)
|
|
|
|
missing_regions.append(un_region)
|
|
|
|
if len(missing_regions) > 0:
|
|
# Add the missing regions back to the row, paying attention to whether
|
|
# or not the row's region column is None (aka null) or just an empty
|
|
# string (length would be 0).
|
|
if row[region_column_name] is not None and len(row[region_column_name]) > 0:
|
|
row[region_column_name] = (
|
|
row[region_column_name] + "||" + "||".join(missing_regions)
|
|
)
|
|
else:
|
|
row[region_column_name] = "||".join(missing_regions)
|
|
|
|
return row
|