Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 126 additions & 43 deletions reproschema/redcap2reproschema.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import re
from pathlib import Path
from typing import Any, Dict
Expand All @@ -23,20 +24,24 @@
)


def process_input_value_types(input_type_rc, value_type_rc) -> (str, str):
def process_input_value_types(
input_type_rc, value_type_rc
) -> (str, str, dict):
"""
Process input type and value type to determine the final input type and value type,
that can be used by ReproSchema.
Process input type and value type to determine the final input type and value type.

Args:
input_type_rc (str): Input type from redcap form
value_type_rc (str): Value type from redcap form

Returns:
tuple: (input_type, value_type)
tuple: (input_type, value_type, additional_notes)
input_type (str): Final input type for ReproSchema
value_type (str): Final value type for ReproSchema
additional_notes (dict): Additional notes about custom types, or None
"""
additional_notes = None

# If input type in redcap is set but not recognized, raise an error
if input_type_rc not in INPUT_TYPE_MAP:
raise ValueError(
Expand All @@ -47,8 +52,20 @@ def process_input_value_types(input_type_rc, value_type_rc) -> (str, str):
input_type = INPUT_TYPE_MAP.get(input_type_rc)

if value_type_rc:
# Get value type using the new function
value_type = get_value_type(value_type_rc)
try:
# Try to get standard value type
value_type = get_value_type(value_type_rc)
except ValueError:
# If it fails, it's an unknown validation type
print(
f"Warning: Unrecognized validation type '{value_type_rc}', treating as string"
)
value_type = "xsd:string"
additional_notes = {
"source": "redcap",
"column": "Text Validation Type OR Show Slider Number",
"value": value_type_rc,
}

# Adjust input type based on validation
if value_type == "xsd:date" and input_type_rc == "text":
Expand All @@ -71,7 +88,7 @@ def process_input_value_types(input_type_rc, value_type_rc) -> (str, str):
else: # if no validation type is set, default to string
value_type = "xsd:string"

return input_type, value_type
return input_type, value_type, additional_notes


def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]:
Expand All @@ -83,13 +100,14 @@ def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]:
input_type_rc (str): Input type from redcap form
value_type (str): ReproSchema value type
Returns:
dict: Response options
dict: Response options and additional notes if any
"""
input_type = INPUT_TYPE_MAP[input_type_rc]
# Default response options
response_options = {"valueType": [value_type]}
additional_notes = None

# Handle specific input_type_rc that modify other properties (not only inputType and valueType)
# Handle specific input_type_rc that modify other properties
if input_type_rc == "yesno":
response_options["choices"] = [
{"name": {"en": "Yes"}, "value": 1},
Expand All @@ -104,17 +122,34 @@ def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]:
response_options["multipleChoice"] = True

if row.get("choices") and input_type:
if input_type in ["radio", "select", "slider", "text"]:
# We're checking input_type (ReproSchema type) here
if input_type in ["radio", "select", "slider", "text", "static"]:
choices, choices_val_type_l = process_choices(
row.get("choices"), item_name=row["item_name"]
)
if choices:
response_options.update(
{
"choices": choices,
"valueType": choices_val_type_l,
# We're checking input_type_rc (REDCap type) here
if input_type_rc == "descriptive":
print(
f"Info: Preserving choices for descriptive field {row['item_name']}"
)
# Store as additional notes instead of in response options
# Serialize the choices to a string to comply with additionalNotesObj model
additional_notes = {
"source": "redcap",
"column": "Choices, Calculations, OR Slider Labels (Descriptive Field)",
"value": json.dumps(
choices
), # Convert choices to a JSON string
}
)
else:
# For normal input types, process choices normally
response_options.update(
{
"choices": choices,
"valueType": choices_val_type_l,
}
)
if input_type == "slider":
response_options.update(
{
Expand All @@ -126,33 +161,39 @@ def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]:
pass # taken care below, it's not really choices
else:
print(
f"Warning/Error: Unexpected input type for choices in {row['item_name']}: input type {input_type} "
f"Warning: Unexpected input type for choices in {row['item_name']}: input type {input_type} "
f"(original in redcap: {input_type_rc}), values: {row.get('choices')}"
)
# raise ValueError(
# f"Unexpected input type '{input_type}' (original in redcap: {input_type_rc}) "
# f"for item with choices in {item['item_name']}"
# )

for key in RESPONSE_COND:
if row.get(key) is not None and str(row.get(key)).strip():
# Min/max validations only apply to numeric types
if value_type not in ["xsd:integer", "xsd:decimal"]:
print(
f"Warning: {key} is not supported for non-numeric type {value_type}. Skipping."
)
continue

try:
if value_type == "xsd:integer":
parsed_value = int(row[key])
elif value_type == "xsd:decimal":
parsed_value = float(row[key])
# Parse as float first to handle any numeric format
raw_value = float(row[key])

# If it's a whole number, store as integer for cleaner JSON
# Otherwise, keep as float
if raw_value.is_integer():
parsed_value = int(raw_value)
else:
print(
f"Warning: {key} is not supported for value types other than integer or decimal, value type provided is {value_type}"
)
continue
parsed_value = raw_value

response_options[key] = parsed_value

except ValueError:
print(
f"Warning/Error: Value {row[key]} is not a valid {value_type}"
f"Warning: Value '{row[key]}' for {key} is not a valid number"
)
continue
return response_options

return response_options, additional_notes


def process_choices(choices_str, item_name):
Expand Down Expand Up @@ -314,7 +355,7 @@ def process_row(
if not input_type_rc:
input_type_rc = "text"

input_type, value_type = process_input_value_types(
input_type, value_type, input_value_notes = process_input_value_types(
input_type_rc, value_type_rc
)
item_data = {
Expand All @@ -325,9 +366,10 @@ def process_row(
"ui": {"inputType": input_type},
}

item_data["responseOptions"] = process_response_options(
response_options, choices_notes = process_response_options(
row, input_type_rc, value_type
)
item_data["responseOptions"] = response_options

# setting readonly to true based on annotation and field type
if row.get("annotation"):
Expand Down Expand Up @@ -412,20 +454,60 @@ def process_row(
elif row.get("visibility"):
addProperties["isVis"] = normalize_condition(row.get("visibility"))

# Add custom validation type note and choices notes if present
if input_value_notes:
item_data.setdefault("additionalNotesObj", []).append(
input_value_notes
)
if choices_notes:
item_data.setdefault("additionalNotesObj", []).append(choices_notes)

return item_data, preamble_info_propagate, compute, addProperties


def process_csv(csv_file) -> (Dict[str, Any], list):
def process_csv(csv_file, encoding=None) -> (Dict[str, Any], list):
"""
Process a REDCap CSV file and extract structured data for items and activities.

Args:
csv_file: Path to the REDCap CSV file
encoding (str, optional): Specific encoding to use for the CSV file

Returns:
tuple: (activities, protocol_activities_order)
activities: Dictionary containing activity data
protocol_activities_order: List of activity names in order
"""
if encoding:
try:
df = pd.read_csv(csv_file, encoding=encoding, low_memory=False)
print(f"Using specified encoding: {encoding}")
except UnicodeDecodeError:
raise ValueError(
f"Failed to read CSV with specified encoding: {encoding}"
)
else:
# Try multiple encodings in order
encodings = ["utf-8-sig", "latin-1", "windows-1252", "cp1252"]
df = None

df = pd.read_csv(
csv_file, encoding="utf-8-sig", low_memory=False
) # utf-8-sig handles BOM automatically
for encoding in encodings:
try:
df = pd.read_csv(csv_file, encoding=encoding, low_memory=False)
print(f"Successfully read CSV with {encoding} encoding")
break
except UnicodeDecodeError:
print(
f"Failed to decode with {encoding}, trying next encoding..."
)

df.columns = df.columns.map(
lambda x: x.strip().strip('"')
) # some cleaning might not be needed
if df is None:
raise ValueError(
"Failed to read CSV file with any of the attempted encodings. "
"Please check the file encoding or convert it to UTF-8."
)

# Clean NaNs values in the dataframe
df.columns = df.columns.map(lambda x: x.strip().strip('"'))
df = df.astype(str).replace("nan", "")

# Validate required columns
Expand Down Expand Up @@ -496,7 +578,7 @@ def process_csv(csv_file) -> (Dict[str, Any], list):


def redcap2reproschema(
csv_file, yaml_file, output_path, schema_context_url=None
csv_file, yaml_file, output_path, schema_context_url=None, encoding=None
):
"""
Convert a REDCap data dictionary to Reproschema format.
Expand All @@ -506,6 +588,7 @@ def redcap2reproschema(
yaml_file (str/Path): Path to the YAML configuration file
output_path (str/Path): Path to the output directory
schema_context_url (str, optional): URL for the schema context
encoding (str, optional): Specific encoding to use for the CSV file

Raises:
ValueError: If required files are missing or invalid
Expand Down Expand Up @@ -533,8 +616,8 @@ def redcap2reproschema(
if schema_context_url is None:
schema_context_url = CONTEXTFILE_URL

# Process the CSV file and getting information about the activities and items
activities, prot_activities_order = process_csv(csv_path)
# Process the CSV file with the specified encoding
activities, prot_activities_order = process_csv(csv_path, encoding)

for activity_name, activity_data in activities.items():
create_activity_schema(
Expand Down