|
| 1 | +import textwrap |
| 2 | +from typing import Optional |
| 3 | +from urllib.parse import urlparse |
| 4 | + |
| 5 | +from abnf.grammars.misc import load_grammar_rulelist |
| 6 | +from abnf.parser import Rule as _Rule, ParseError, NodeVisitor, Node |
| 7 | + |
| 8 | +from checks.tasks.shared import TranslatableTechTableItem, validate_email |
| 9 | + |
| 10 | + |
| 11 | +class CAAParseError(ValueError): |
| 12 | + """ |
| 13 | + Error thrown for any CAA parsing issues, |
| 14 | + with a message ID and context dict. |
| 15 | + """ |
| 16 | + |
| 17 | + def __init__(self, msg_id: str, context: dict[str, str]): |
| 18 | + self.msg_id = msg_id |
| 19 | + self.context = context |
| 20 | + |
| 21 | + def to_translatable_tech_table_item(self): |
| 22 | + return TranslatableTechTableItem(self.msg_id, self.context) |
| 23 | + |
| 24 | + |
| 25 | +def node_get_named_child_value(node: Node, name: str) -> Optional[str]: |
| 26 | + """Search an ABNF tree from a node, for a node with a certain name, return the value of first match.""" |
| 27 | + queue = [node] |
| 28 | + while queue: |
| 29 | + n, queue = queue[0], queue[1:] |
| 30 | + if n.name == name: |
| 31 | + return n.value |
| 32 | + else: |
| 33 | + queue.extend(n.children) |
| 34 | + return None |
| 35 | + |
| 36 | + |
| 37 | +# https://www.iana.org/assignments/acme/acme.xhtml#acme-validation-methods |
| 38 | +ACME_VALIDATION_METHODS = { |
| 39 | + "http-01", |
| 40 | + "dns-01", |
| 41 | + "http-01", |
| 42 | + "tls-alpn-01", |
| 43 | + "tls-alpn-01", |
| 44 | + "email-reply-00", |
| 45 | + "tkauth-01", |
| 46 | +} |
| 47 | + |
| 48 | +# RFC 8657 4 |
| 49 | +ACME_VALIDATION_CUSTOM_PREFIX = "ca-" |
| 50 | + |
| 51 | + |
| 52 | +@load_grammar_rulelist() |
| 53 | +class CAAValidationMethodsGrammar(_Rule): |
| 54 | + """ |
| 55 | + Grammar for validationmethods CAA parameter to the issue/issuewild property. |
| 56 | + Per RFC8657 4 |
| 57 | + """ |
| 58 | + |
| 59 | + grammar = textwrap.dedent( |
| 60 | + """ |
| 61 | + value = [*(label ",") label] |
| 62 | + label = 1*(ALPHA / DIGIT / "-") |
| 63 | + """ |
| 64 | + ) |
| 65 | + |
| 66 | + |
| 67 | +def validate_issue_validation_methods(parameter_value: str) -> set[str]: |
| 68 | + """ |
| 69 | + Validate the validationmethods parameter value for the issue/issuewild CAA property. |
| 70 | + Called from the issue/issuewild parser, once it extracted validationmethods. |
| 71 | + """ |
| 72 | + parse_result = CAAValidationMethodsGrammar("value").parse_all(parameter_value) |
| 73 | + # Careful: terms label/value are used as properties of the parse tree, but also as properties |
| 74 | + # in the original ABNF grammer, in opposite roles. Not confusing at all. |
| 75 | + validation_methods = {label.value for label in parse_result.children if label.name == "label"} |
| 76 | + for validation_method in validation_methods: |
| 77 | + if validation_method not in ACME_VALIDATION_METHODS and not validation_method.startswith( |
| 78 | + ACME_VALIDATION_CUSTOM_PREFIX |
| 79 | + ): |
| 80 | + raise CAAParseError(msg_id="invalid_property_issue_validation_method", context={"value": parameter_value}) |
| 81 | + return validation_methods |
| 82 | + |
| 83 | + |
| 84 | +@load_grammar_rulelist() |
| 85 | +class CAAPropertyIssueGrammar(_Rule): |
| 86 | + """ |
| 87 | + Grammar for issue/issuewild CAA property values. |
| 88 | + Per RFC8659 4.2 |
| 89 | + """ |
| 90 | + |
| 91 | + grammar = textwrap.dedent( |
| 92 | + """ |
| 93 | + issue-value = *WSP [issuer-domain-name *WSP] |
| 94 | + [";" *WSP [parameters *WSP]] |
| 95 | +
|
| 96 | + issuer-domain-name = label *("." label) |
| 97 | + label = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT)) |
| 98 | +
|
| 99 | + parameters = (parameter *WSP ";" *WSP parameters) / parameter |
| 100 | + parameter = tag *WSP "=" *WSP value |
| 101 | + tag = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT)) |
| 102 | + value = *(%x21-3A / %x3C-7E) |
| 103 | + """ |
| 104 | + ) |
| 105 | + |
| 106 | + |
| 107 | +class CAAPropertyIssueVisitor(NodeVisitor): |
| 108 | + """ |
| 109 | + This ABNF visitor walks over the tree of a CAAPropertyIssueGrammar |
| 110 | + parsed value to extract field values in a relatively simple way. |
| 111 | + """ |
| 112 | + |
| 113 | + def __init__(self): |
| 114 | + super().__init__() |
| 115 | + self.issuer_domain_name = None |
| 116 | + self.parameters = {} |
| 117 | + |
| 118 | + def visit_issue_value(self, node): |
| 119 | + for child_node in node.children: |
| 120 | + self.visit(child_node) |
| 121 | + |
| 122 | + def visit_issuer_domain_name(self, node): |
| 123 | + self.issuer_domain_name = node.value |
| 124 | + |
| 125 | + def visit_parameters(self, node): |
| 126 | + for child_node in node.children: |
| 127 | + self.visit(child_node) |
| 128 | + |
| 129 | + def visit_parameter(self, node): |
| 130 | + tag = node_get_named_child_value(node, "tag") |
| 131 | + value = node_get_named_child_value(node, "value") |
| 132 | + self.parameters[tag] = value |
| 133 | + |
| 134 | + |
| 135 | +def validate_property_issue(value: str): |
| 136 | + """Validate the value of issue/issuewild, using the ABNF grammar/visitor.""" |
| 137 | + parse_result = CAAPropertyIssueGrammar("issue-value").parse_all(value) |
| 138 | + visitor = CAAPropertyIssueVisitor() |
| 139 | + visitor.visit(parse_result) |
| 140 | + if "validationmethods" in visitor.parameters: |
| 141 | + validate_issue_validation_methods(visitor.parameters["validationmethods"]) |
| 142 | + |
| 143 | + |
| 144 | +def validate_property_iodef(value: str): |
| 145 | + """Validate iodef value per RFC8659 4.4""" |
| 146 | + try: |
| 147 | + url = urlparse(value) |
| 148 | + except ValueError: |
| 149 | + raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value}) |
| 150 | + if url.scheme in ["http", "https"]: |
| 151 | + # RFC8659 refers to RFC6546, which is unclear on requirements. Let's assume a netloc is needed. |
| 152 | + if not url.netloc: |
| 153 | + raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value}) |
| 154 | + elif url.scheme == "mailto": |
| 155 | + if not validate_email(url.path): |
| 156 | + raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value}) |
| 157 | + else: |
| 158 | + raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value}) |
| 159 | + |
| 160 | + |
| 161 | +def validate_property_contactemail(value: str): |
| 162 | + """Validate contactemail per CAB BR 1.6.3, requiring a single RFC 6532 3.2 address.""" |
| 163 | + if not validate_email(value): |
| 164 | + raise CAAParseError(msg_id="invalid_property_contactemail_value", context={"value": value}) |
| 165 | + |
| 166 | + |
| 167 | +@load_grammar_rulelist() |
| 168 | +class PhoneNumberRule(_Rule): |
| 169 | + """ |
| 170 | + Grammar for phone numbers per RFC3966. |
| 171 | + Includes https://www.rfc-editor.org/errata/eid203 |
| 172 | + local-number-digits and its dependencies were stripped out, |
| 173 | + as the ABNF parser had issues with it, and they are not used by us now. |
| 174 | + """ |
| 175 | + |
| 176 | + grammar = textwrap.dedent( |
| 177 | + """ |
| 178 | + telephone-uri = "tel:" telephone-subscriber |
| 179 | + telephone-subscriber = global-number |
| 180 | + global-number = global-number-digits *par |
| 181 | + par = parameter / extension / isdn-subaddress |
| 182 | + isdn-subaddress = ";isub=" 1*uric |
| 183 | + extension = ";ext=" 1*phonedigit |
| 184 | + context = ";phone-context=" descriptor |
| 185 | + descriptor = domainname / global-number-digits |
| 186 | + global-number-digits = "+" *phonedigit DIGIT *phonedigit |
| 187 | + domainname = *( domainlabel "." ) toplabel [ "." ] |
| 188 | + domainlabel = alphanum |
| 189 | + / alphanum *( alphanum / "-" ) alphanum |
| 190 | + toplabel = ALPHA / ALPHA *( alphanum / "-" ) alphanum |
| 191 | + parameter = ";" pname ["=" pvalue ] |
| 192 | + pname = 1*( alphanum / "-" ) |
| 193 | + pvalue = 1*paramchar |
| 194 | + paramchar = param-unreserved / unreserved / pct-encoded |
| 195 | + unreserved = alphanum / mark |
| 196 | + mark = "-" / "_" / "." / "!" / "~" / "*" / |
| 197 | + "'" / "(" / ")" |
| 198 | + pct-encoded = "%" HEXDIG HEXDIG |
| 199 | + param-unreserved = "[" / "]" / "/" / ":" / "&" / "+" / "$" |
| 200 | + phonedigit = DIGIT / [ visual-separator ] |
| 201 | + phonedigit-hex = HEXDIG / "*" / "#" / [ visual-separator ] |
| 202 | + visual-separator = "-" / "." / "(" / ")" |
| 203 | + alphanum = ALPHA / DIGIT |
| 204 | + reserved = ";" / "/" / "?" / ":" / "@" / "&" / |
| 205 | + "=" / "+" / "$" / "," |
| 206 | + uric = reserved / unreserved / pct-encoded |
| 207 | + """ |
| 208 | + ) |
| 209 | + |
| 210 | + |
| 211 | +def validate_property_contactphone(value: str): |
| 212 | + """Validate contactphone per CAB SC014, requiring an RFC3966 5.1.4 global number.""" |
| 213 | + parse_result = PhoneNumberRule("global-number").parse_all(value) |
| 214 | + if not parse_result: |
| 215 | + raise CAAParseError(msg_id="invalid_property_contactphone_value", context={"value": value}) |
| 216 | + |
| 217 | + |
| 218 | +@load_grammar_rulelist() |
| 219 | +class CAAPropertyIssueMailRule(_Rule): |
| 220 | + """ |
| 221 | + Grammar for CAA issuemail property per RFC9495. |
| 222 | + """ |
| 223 | + |
| 224 | + grammar = textwrap.dedent( |
| 225 | + """ |
| 226 | + issuemail-value = *WSP [issuer-domain-name *WSP] |
| 227 | + [";" *WSP [parameters *WSP]] |
| 228 | +
|
| 229 | + issuer-domain-name = label *("." label) |
| 230 | + label = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT)) |
| 231 | +
|
| 232 | + parameters = (parameter *WSP ";" *WSP parameters) / parameter |
| 233 | + parameter = tag *WSP "=" *WSP value |
| 234 | + tag = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT)) |
| 235 | + value = *(%x21-3A / %x3C-7E) |
| 236 | + """ |
| 237 | + ) |
| 238 | + |
| 239 | + |
| 240 | +def validate_property_issuemail(value: str): |
| 241 | + """Validate issuemail property per RFC9495.""" |
| 242 | + parse_result = CAAPropertyIssueMailRule("issuemail-value").parse_all(value) |
| 243 | + if not parse_result: |
| 244 | + raise CAAParseError(msg_id="invalid_property_issuemail_value", context={"value": value}) |
| 245 | + |
| 246 | + |
| 247 | +def validate_flags(flags: int): |
| 248 | + """Validate the flags per RFC8659 4.1, i.e. only allow 0/128""" |
| 249 | + if flags not in [0, 128]: |
| 250 | + raise CAAParseError(msg_id="invalid_flags_reserved_bits", context={"value": str(flags)}) |
| 251 | + |
| 252 | + |
| 253 | +# https://www.iana.org/assignments/pkix-parameters/pkix-parameters.xhtml#caa-properties |
| 254 | +CAA_PROPERTY_VALIDATORS = { |
| 255 | + "issue": validate_property_issue, |
| 256 | + "issuewild": validate_property_issue, |
| 257 | + "iodef": validate_property_iodef, |
| 258 | + "auth": None, |
| 259 | + "path": None, |
| 260 | + "policy": None, |
| 261 | + "contactemail": validate_property_contactemail, |
| 262 | + "contactphone": validate_property_contactphone, |
| 263 | + "issuevmc": validate_property_issue, |
| 264 | + "issuemail": validate_property_issuemail, |
| 265 | +} |
| 266 | + |
| 267 | + |
| 268 | +def validate_caa_record(flags: int, tag: str, value: str) -> None: |
| 269 | + """ |
| 270 | + Validate a CAA record. |
| 271 | + Returns None if the record is valid, raises CAAParseError when invalid. |
| 272 | + """ |
| 273 | + validate_flags(flags) |
| 274 | + try: |
| 275 | + validator = CAA_PROPERTY_VALIDATORS[tag.lower()] |
| 276 | + if validator is None: |
| 277 | + raise CAAParseError(msg_id="invalid_reserved_property", context={"value": tag}) |
| 278 | + validator(value) |
| 279 | + except ParseError as e: |
| 280 | + raise CAAParseError( |
| 281 | + msg_id="invalid_property_syntax", |
| 282 | + context={ |
| 283 | + "property_name": tag, |
| 284 | + "property_value": value, |
| 285 | + "invalid_character_position": e.start, |
| 286 | + "invalid_character": value[e.start], |
| 287 | + }, |
| 288 | + ) |
| 289 | + except KeyError: |
| 290 | + raise CAAParseError(msg_id="invalid_unknown_property", context={"value": tag}) |
0 commit comments