diff --git a/reproschema/redcap2reproschema.py b/reproschema/redcap2reproschema.py index c6f0b20..33dfda3 100644 --- a/reproschema/redcap2reproschema.py +++ b/reproschema/redcap2reproschema.py @@ -1,3 +1,4 @@ +import json import re from pathlib import Path from typing import Any, Dict @@ -23,20 +24,24 @@ ) -def process_input_value_types(input_type_rc, value_type_rc) -> (str, str): +def process_input_value_types( + input_type_rc, value_type_rc +) -> (str, str, dict): """ - Process input type and value type to determine the final input type and value type, - that can be used by ReproSchema. + Process input type and value type to determine the final input type and value type. Args: input_type_rc (str): Input type from redcap form value_type_rc (str): Value type from redcap form Returns: - tuple: (input_type, value_type) + tuple: (input_type, value_type, additional_notes) input_type (str): Final input type for ReproSchema value_type (str): Final value type for ReproSchema + additional_notes (dict): Additional notes about custom types, or None """ + additional_notes = None + # If input type in redcap is set but not recognized, raise an error if input_type_rc not in INPUT_TYPE_MAP: raise ValueError( @@ -47,8 +52,20 @@ def process_input_value_types(input_type_rc, value_type_rc) -> (str, str): input_type = INPUT_TYPE_MAP.get(input_type_rc) if value_type_rc: - # Get value type using the new function - value_type = get_value_type(value_type_rc) + try: + # Try to get standard value type + value_type = get_value_type(value_type_rc) + except ValueError: + # If it fails, it's an unknown validation type + print( + f"Warning: Unrecognized validation type '{value_type_rc}', treating as string" + ) + value_type = "xsd:string" + additional_notes = { + "source": "redcap", + "column": "Text Validation Type OR Show Slider Number", + "value": value_type_rc, + } # Adjust input type based on validation if value_type == "xsd:date" and input_type_rc == "text": @@ -71,7 +88,7 @@ def process_input_value_types(input_type_rc, value_type_rc) -> (str, str): else: # if no validation type is set, default to string value_type = "xsd:string" - return input_type, value_type + return input_type, value_type, additional_notes def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]: @@ -83,13 +100,14 @@ def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]: input_type_rc (str): Input type from redcap form value_type (str): ReproSchema value type Returns: - dict: Response options + dict: Response options and additional notes if any """ input_type = INPUT_TYPE_MAP[input_type_rc] # Default response options response_options = {"valueType": [value_type]} + additional_notes = None - # Handle specific input_type_rc that modify other properties (not only inputType and valueType) + # Handle specific input_type_rc that modify other properties if input_type_rc == "yesno": response_options["choices"] = [ {"name": {"en": "Yes"}, "value": 1}, @@ -104,17 +122,34 @@ def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]: response_options["multipleChoice"] = True if row.get("choices") and input_type: - if input_type in ["radio", "select", "slider", "text"]: + # We're checking input_type (ReproSchema type) here + if input_type in ["radio", "select", "slider", "text", "static"]: choices, choices_val_type_l = process_choices( row.get("choices"), item_name=row["item_name"] ) if choices: - response_options.update( - { - "choices": choices, - "valueType": choices_val_type_l, + # We're checking input_type_rc (REDCap type) here + if input_type_rc == "descriptive": + print( + f"Info: Preserving choices for descriptive field {row['item_name']}" + ) + # Store as additional notes instead of in response options + # Serialize the choices to a string to comply with additionalNotesObj model + additional_notes = { + "source": "redcap", + "column": "Choices, Calculations, OR Slider Labels (Descriptive Field)", + "value": json.dumps( + choices + ), # Convert choices to a JSON string } - ) + else: + # For normal input types, process choices normally + response_options.update( + { + "choices": choices, + "valueType": choices_val_type_l, + } + ) if input_type == "slider": response_options.update( { @@ -126,33 +161,39 @@ def process_response_options(row, input_type_rc, value_type) -> Dict[str, Any]: pass # taken care below, it's not really choices else: print( - f"Warning/Error: Unexpected input type for choices in {row['item_name']}: input type {input_type} " + f"Warning: Unexpected input type for choices in {row['item_name']}: input type {input_type} " f"(original in redcap: {input_type_rc}), values: {row.get('choices')}" ) - # raise ValueError( - # f"Unexpected input type '{input_type}' (original in redcap: {input_type_rc}) " - # f"for item with choices in {item['item_name']}" - # ) for key in RESPONSE_COND: if row.get(key) is not None and str(row.get(key)).strip(): + # Min/max validations only apply to numeric types + if value_type not in ["xsd:integer", "xsd:decimal"]: + print( + f"Warning: {key} is not supported for non-numeric type {value_type}. Skipping." + ) + continue + try: - if value_type == "xsd:integer": - parsed_value = int(row[key]) - elif value_type == "xsd:decimal": - parsed_value = float(row[key]) + # Parse as float first to handle any numeric format + raw_value = float(row[key]) + + # If it's a whole number, store as integer for cleaner JSON + # Otherwise, keep as float + if raw_value.is_integer(): + parsed_value = int(raw_value) else: - print( - f"Warning: {key} is not supported for value types other than integer or decimal, value type provided is {value_type}" - ) - continue + parsed_value = raw_value + response_options[key] = parsed_value + except ValueError: print( - f"Warning/Error: Value {row[key]} is not a valid {value_type}" + f"Warning: Value '{row[key]}' for {key} is not a valid number" ) continue - return response_options + + return response_options, additional_notes def process_choices(choices_str, item_name): @@ -314,7 +355,7 @@ def process_row( if not input_type_rc: input_type_rc = "text" - input_type, value_type = process_input_value_types( + input_type, value_type, input_value_notes = process_input_value_types( input_type_rc, value_type_rc ) item_data = { @@ -325,9 +366,10 @@ def process_row( "ui": {"inputType": input_type}, } - item_data["responseOptions"] = process_response_options( + response_options, choices_notes = process_response_options( row, input_type_rc, value_type ) + item_data["responseOptions"] = response_options # setting readonly to true based on annotation and field type if row.get("annotation"): @@ -412,20 +454,60 @@ def process_row( elif row.get("visibility"): addProperties["isVis"] = normalize_condition(row.get("visibility")) + # Add custom validation type note and choices notes if present + if input_value_notes: + item_data.setdefault("additionalNotesObj", []).append( + input_value_notes + ) + if choices_notes: + item_data.setdefault("additionalNotesObj", []).append(choices_notes) + return item_data, preamble_info_propagate, compute, addProperties -def process_csv(csv_file) -> (Dict[str, Any], list): +def process_csv(csv_file, encoding=None) -> (Dict[str, Any], list): + """ + Process a REDCap CSV file and extract structured data for items and activities. + + Args: + csv_file: Path to the REDCap CSV file + encoding (str, optional): Specific encoding to use for the CSV file + + Returns: + tuple: (activities, protocol_activities_order) + activities: Dictionary containing activity data + protocol_activities_order: List of activity names in order + """ + if encoding: + try: + df = pd.read_csv(csv_file, encoding=encoding, low_memory=False) + print(f"Using specified encoding: {encoding}") + except UnicodeDecodeError: + raise ValueError( + f"Failed to read CSV with specified encoding: {encoding}" + ) + else: + # Try multiple encodings in order + encodings = ["utf-8-sig", "latin-1", "windows-1252", "cp1252"] + df = None - df = pd.read_csv( - csv_file, encoding="utf-8-sig", low_memory=False - ) # utf-8-sig handles BOM automatically + for encoding in encodings: + try: + df = pd.read_csv(csv_file, encoding=encoding, low_memory=False) + print(f"Successfully read CSV with {encoding} encoding") + break + except UnicodeDecodeError: + print( + f"Failed to decode with {encoding}, trying next encoding..." + ) - df.columns = df.columns.map( - lambda x: x.strip().strip('"') - ) # some cleaning might not be needed + if df is None: + raise ValueError( + "Failed to read CSV file with any of the attempted encodings. " + "Please check the file encoding or convert it to UTF-8." + ) - # Clean NaNs values in the dataframe + df.columns = df.columns.map(lambda x: x.strip().strip('"')) df = df.astype(str).replace("nan", "") # Validate required columns @@ -496,7 +578,7 @@ def process_csv(csv_file) -> (Dict[str, Any], list): def redcap2reproschema( - csv_file, yaml_file, output_path, schema_context_url=None + csv_file, yaml_file, output_path, schema_context_url=None, encoding=None ): """ Convert a REDCap data dictionary to Reproschema format. @@ -506,6 +588,7 @@ def redcap2reproschema( yaml_file (str/Path): Path to the YAML configuration file output_path (str/Path): Path to the output directory schema_context_url (str, optional): URL for the schema context + encoding (str, optional): Specific encoding to use for the CSV file Raises: ValueError: If required files are missing or invalid @@ -533,8 +616,8 @@ def redcap2reproschema( if schema_context_url is None: schema_context_url = CONTEXTFILE_URL - # Process the CSV file and getting information about the activities and items - activities, prot_activities_order = process_csv(csv_path) + # Process the CSV file with the specified encoding + activities, prot_activities_order = process_csv(csv_path, encoding) for activity_name, activity_data in activities.items(): create_activity_schema(