Skip to content

Commit aa40662

Browse files
authored
Merge pull request #22 from IdentityPython/fix/self_signing_cert
Self signing cert dynamic attributes
2 parents 68f001d + 4483d79 commit aa40662

15 files changed

+395
-179
lines changed

pymdoccbor/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.9.0"
1+
__version__ = "1.0.0"

pymdoccbor/mdoc/issuer.py

Lines changed: 95 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from datetime import datetime, timezone
66
from cryptography.hazmat.primitives import serialization
77
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
8+
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
9+
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
810
from pycose.keys import CoseKey, EC2Key
911
from typing import Union
1012

@@ -23,14 +25,15 @@ class MdocCborIssuer:
2325
"""
2426
def __init__(
2527
self,
26-
key_label: str = None,
27-
user_pin: str = None,
28-
lib_path: str = None,
29-
slot_id: int = None,
28+
key_label: str | None = None,
29+
user_pin: str | None = None,
30+
lib_path: str | None = None,
31+
slot_id: int | None = None,
3032
hsm: bool = False,
31-
alg: str = None,
32-
kid: str = None,
33+
alg: str | None = None,
34+
kid: str | None = None,
3335
private_key: Union[dict, CoseKey] = {},
36+
cert_info: dict | None = None,
3437
):
3538
"""
3639
Initialize a new MdocCborIssuer
@@ -67,16 +70,17 @@ def __init__(
6770
self.hsm = hsm
6871
self.alg = alg
6972
self.kid = kid
73+
self.cert_info = cert_info
7074

7175
def new(
7276
self,
7377
data: dict,
7478
doctype: str,
75-
validity: dict = None,
76-
devicekeyinfo: Union[dict, CoseKey, str] = None,
77-
cert_path: str = None,
78-
revocation: dict = None,
79-
status: dict = None
79+
validity: dict | None = None,
80+
devicekeyinfo: dict | CoseKey | str | None = None,
81+
cert_path: str | None = None,
82+
revocation: dict | None = None,
83+
status: dict | None = None
8084
) -> dict:
8185
"""
8286
create a new mdoc with signed mso
@@ -93,49 +97,86 @@ def new(
9397
"""
9498
if isinstance(devicekeyinfo, dict):
9599
devicekeyinfoCoseKeyObject = CoseKey.from_dict(devicekeyinfo)
96-
devicekeyinfo = {
97-
1: devicekeyinfoCoseKeyObject.kty.identifier,
98-
-1: devicekeyinfoCoseKeyObject.crv.identifier,
99-
-2: devicekeyinfoCoseKeyObject.x,
100-
-3: devicekeyinfoCoseKeyObject.y,
101-
}
100+
if devicekeyinfoCoseKeyObject.kty.identifier == 2: # EC2Key
101+
devicekeyinfo = {
102+
1: devicekeyinfoCoseKeyObject.kty.identifier,
103+
-1: devicekeyinfoCoseKeyObject.crv.identifier,
104+
-2: devicekeyinfoCoseKeyObject.x,
105+
-3: devicekeyinfoCoseKeyObject.y,
106+
}
107+
elif devicekeyinfoCoseKeyObject.kty.identifier == 1: # OKPKey
108+
devicekeyinfo = {
109+
1: devicekeyinfoCoseKeyObject.kty.identifier,
110+
-1: devicekeyinfoCoseKeyObject.crv.identifier,
111+
-2: devicekeyinfoCoseKeyObject.x,
112+
}
113+
elif devicekeyinfoCoseKeyObject.kty.identifier == 3: # RSAKey
114+
devicekeyinfo = {
115+
1: devicekeyinfoCoseKeyObject.kty.identifier,
116+
-1: devicekeyinfoCoseKeyObject.n,
117+
-2: devicekeyinfoCoseKeyObject.e,
118+
}
119+
else:
120+
raise TypeError("Unsupported key type in devicekeyinfo")
102121
if isinstance(devicekeyinfo, str):
103122
device_key_bytes = base64.urlsafe_b64decode(devicekeyinfo.encode("utf-8"))
104-
public_key:EllipticCurvePublicKey = serialization.load_pem_public_key(device_key_bytes)
105-
curve_name = public_key.curve.name
106-
curve_map = {
107-
"secp256r1": 1, # NIST P-256
108-
"secp384r1": 2, # NIST P-384
109-
"secp521r1": 3, # NIST P-521
110-
"brainpoolP256r1": 8, # Brainpool P-256
111-
"brainpoolP384r1": 9, # Brainpool P-384
112-
"brainpoolP512r1": 10, # Brainpool P-512
113-
# Add more curve mappings as needed
114-
}
115-
curve_identifier = curve_map.get(curve_name)
116-
117-
# Extract the x and y coordinates from the public key
118-
x = public_key.public_numbers().x.to_bytes(
119-
(public_key.public_numbers().x.bit_length() + 7)
120-
// 8, # Number of bytes needed
121-
"big", # Byte order
122-
)
123+
public_key = serialization.load_pem_public_key(device_key_bytes)
124+
125+
if isinstance(public_key, EllipticCurvePublicKey):
126+
curve_name = public_key.curve.name
127+
curve_map = {
128+
"secp256r1": 1, # NIST P-256
129+
"secp384r1": 2, # NIST P-384
130+
"secp521r1": 3, # NIST P-521
131+
"brainpoolP256r1": 8, # Brainpool P-256
132+
"brainpoolP384r1": 9, # Brainpool P-384
133+
"brainpoolP512r1": 10, # Brainpool P-512
134+
# Add more curve mappings as needed
135+
}
136+
curve_identifier = curve_map.get(curve_name)
123137

124-
y = public_key.public_numbers().y.to_bytes(
125-
(public_key.public_numbers().y.bit_length() + 7)
126-
// 8, # Number of bytes needed
127-
"big", # Byte order
128-
)
138+
# Extract the x and y coordinates from the public key
139+
x = public_key.public_numbers().x.to_bytes(
140+
(public_key.public_numbers().x.bit_length() + 7)
141+
// 8, # Number of bytes needed
142+
"big", # Byte order
143+
)
129144

130-
devicekeyinfo = {
131-
1: 2,
132-
-1: curve_identifier,
133-
-2: x,
134-
-3: y,
135-
}
145+
y = public_key.public_numbers().y.to_bytes(
146+
(public_key.public_numbers().y.bit_length() + 7)
147+
// 8, # Number of bytes needed
148+
"big", # Byte order
149+
)
136150

137-
else:
138-
devicekeyinfo: CoseKey = devicekeyinfo
151+
devicekeyinfo = {
152+
1: 2,
153+
-1: curve_identifier,
154+
-2: x,
155+
-3: y,
156+
}
157+
elif isinstance(public_key, Ed25519PublicKey):
158+
devicekeyinfo = {
159+
1: 1, # OKPKey
160+
-1: "Ed25519", # Curve identifier for Ed25519
161+
-2: public_key.public_bytes(
162+
encoding=serialization.Encoding.Raw,
163+
format=serialization.PublicFormat.Raw
164+
)
165+
}
166+
elif isinstance(public_key, RSAPublicKey):
167+
devicekeyinfo = {
168+
1: 3, # RSAKey
169+
-1: public_key.public_numbers().n.to_bytes(
170+
(public_key.public_numbers().n.bit_length() + 7) // 8,
171+
"big"
172+
),
173+
-2: public_key.public_numbers().e.to_bytes(
174+
(public_key.public_numbers().e.bit_length() + 7) // 8,
175+
"big"
176+
)
177+
}
178+
else:
179+
raise TypeError("Loaded public key is not an EllipticCurvePublicKey")
139180

140181
if self.hsm:
141182
msoi = MsoIssuer(
@@ -149,7 +190,8 @@ def new(
149190
alg=self.alg,
150191
kid=self.kid,
151192
validity=validity,
152-
revocation=revocation
193+
revocation=revocation,
194+
cert_info=self.cert_info
153195
)
154196

155197
else:
@@ -159,10 +201,11 @@ def new(
159201
alg=self.alg,
160202
cert_path=cert_path,
161203
validity=validity,
162-
revocation=revocation
204+
revocation=revocation,
205+
cert_info=self.cert_info
163206
)
164207

165-
mso = msoi.sign(doctype=doctype, device_key=devicekeyinfo,valid_from=datetime.now(timezone.utc))
208+
mso = msoi.sign(doctype=doctype, device_key=devicekeyinfo, valid_from=datetime.now(timezone.utc))
166209

167210
mso_cbor = mso.encode(
168211
tag=False,

pymdoccbor/mso/issuer.py

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,24 @@
55
import uuid
66
import logging
77

8-
logger = logging.getLogger("pymdoccbor")
9-
10-
from pycose.headers import Algorithm #, KID
11-
from pycose.keys import CoseKey, EC2Key
8+
from pycose.keys import CoseKey
9+
from pycose.headers import Algorithm
1210
from pycose.messages import Sign1Message
1311

1412
from typing import Union
1513

1614
from pymdoccbor.exceptions import MsoPrivateKeyRequired
1715
from pymdoccbor import settings
18-
from pymdoccbor.x509 import MsoX509Fabric
16+
from pymdoccbor.x509 import selfsigned_x509cert
1917
from pymdoccbor.tools import shuffle_dict
2018
from cryptography import x509
2119
from cryptography.hazmat.primitives import serialization
2220
from cryptography.x509 import Certificate
2321

2422

25-
from cbor_diag import *
26-
23+
logger = logging.getLogger("pymdoccbor")
2724

28-
class MsoIssuer(MsoX509Fabric):
25+
class MsoIssuer:
2926
"""
3027
MsoIssuer helper class to create a new mso
3128
"""
@@ -34,17 +31,18 @@ def __init__(
3431
self,
3532
data: dict,
3633
validity: dict,
37-
cert_path: str = None,
38-
key_label: str = None,
39-
user_pin: str = None,
40-
lib_path: str = None,
41-
slot_id: int = None,
42-
kid: str = None,
43-
alg: str = None,
44-
hsm: bool = False,
45-
private_key: Union[dict, CoseKey] = None,
46-
digest_alg: str = settings.PYMDOC_HASHALG,
47-
revocation: dict = None
34+
cert_path: str | None = None,
35+
key_label: str | None = None,
36+
user_pin: str | None = None,
37+
lib_path: str | None = None,
38+
slot_id: int | None = None,
39+
kid: str | None = None,
40+
alg: str | None = None,
41+
hsm: bool | None = False,
42+
private_key: dict | CoseKey | None = None,
43+
digest_alg: str | None = settings.PYMDOC_HASHALG,
44+
revocation: dict | None = None,
45+
cert_info: dict | None = None,
4846
) -> None:
4947
"""
5048
Initialize a new MsoIssuer
@@ -64,17 +62,17 @@ def __init__(
6462
:param revocation: dict: revocation status dict to include in the mso, it may include status_list and identifier_list keys
6563
"""
6664

67-
if not hsm:
68-
if private_key:
69-
if isinstance(private_key, dict):
70-
self.private_key = CoseKey.from_dict(private_key)
71-
if not self.private_key.kid:
72-
self.private_key.kid = str(uuid.uuid4())
73-
elif isinstance(private_key, CoseKey):
74-
self.private_key = private_key
75-
else:
76-
raise ValueError("private_key must be a dict or CoseKey object")
65+
if private_key:
66+
if isinstance(private_key, dict):
67+
self.private_key = CoseKey.from_dict(private_key)
68+
if not self.private_key.kid:
69+
self.private_key.kid = str(uuid.uuid4())
70+
elif isinstance(private_key, CoseKey):
71+
self.private_key = private_key
7772
else:
73+
raise ValueError("private_key must be a dict or CoseKey object")
74+
else:
75+
if not hsm:
7876
raise MsoPrivateKeyRequired("MSO Writer requires a valid private key")
7977

8078
if not validity:
@@ -85,9 +83,8 @@ def __init__(
8583

8684
self.data: dict = data
8785
self.hash_map: dict = {}
88-
self.cert_path = cert_path
8986
self.disclosure_map: dict = {}
90-
self.digest_alg: str = digest_alg
87+
self.digest_alg = digest_alg
9188
self.key_label = key_label
9289
self.user_pin = user_pin
9390
self.lib_path = lib_path
@@ -98,9 +95,20 @@ def __init__(
9895
self.validity = validity
9996
self.revocation = revocation
10097

98+
self.cert_path = cert_path
99+
self.cert_info = cert_info
100+
101+
if not self.cert_path and (not self.cert_info or not self.private_key):
102+
raise ValueError(
103+
"cert_path or cert_info with a private key must be provided to properly insert a certificate"
104+
)
105+
101106
alg_map = {"ES256": "sha256", "ES384": "sha384", "ES512": "sha512"}
102107

103-
hashfunc = getattr(hashlib, alg_map.get(self.alg))
108+
if self.alg not in alg_map:
109+
raise ValueError(f"Unsupported algorithm: {self.alg}")
110+
111+
hashfunc = getattr(hashlib, alg_map[self.alg])
104112

105113
digest_cnt = 0
106114
for ns, values in data.items():
@@ -157,9 +165,9 @@ def format_datetime_repr(self, dt: datetime.datetime) -> str:
157165

158166
def sign(
159167
self,
160-
device_key: Union[dict, None] = None,
161-
valid_from: Union[None, datetime.datetime] = None,
162-
doctype: str = None,
168+
device_key: dict | None = None,
169+
valid_from: datetime.datetime | None = None,
170+
doctype: str | None = None,
163171
) -> Sign1Message:
164172
"""
165173
Sign a mso and returns it
@@ -230,7 +238,14 @@ def sign(
230238
raise Exception(f"Certificate at {self.cert_path} failed parse")
231239
_cert = cert.public_bytes(getattr(serialization.Encoding, "DER"))
232240
else:
233-
_cert = self.selfsigned_x509cert()
241+
if not self.cert_info:
242+
raise ValueError("cert_info must be provided if cert_path is not set")
243+
244+
logger.warning(
245+
"A self-signed certificate will be created using the provided cert_info but this is not recommended for production use."
246+
)
247+
248+
_cert = selfsigned_x509cert(self.cert_info, self.private_key)
234249

235250
if self.hsm:
236251
# print("payload diganostic notation: \n",cbor2diag(cbor2.dumps(cbor2.CBORTag(24, cbor2.dumps(payload)))))

pymdoccbor/settings.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
import datetime
21
import os
32

4-
from datetime import timezone
5-
63
COSEKEY_HAZMAT_CRV_MAP = {
74
"secp256r1": "P_256",
85
"secp384r1": "P_384",
@@ -23,30 +20,8 @@
2320

2421
DIGEST_SALT_LENGTH = 32
2522

26-
2723
X509_DER_CERT = os.getenv("X509_DER_CERT", None)
2824

29-
# OR
30-
31-
X509_COUNTRY_NAME = os.getenv('X509_COUNTRY_NAME', "US")
32-
X509_STATE_OR_PROVINCE_NAME = os.getenv('X509_STATE_OR_PROVINCE_NAME', "California")
33-
X509_LOCALITY_NAME = os.getenv('X509_LOCALITY_NAME', "San Francisco")
34-
X509_ORGANIZATION_NAME = os.getenv('X509_ORGANIZATION_NAME', "My Company")
35-
X509_COMMON_NAME = os.getenv('X509_COMMON_NAME', "mysite.com")
36-
37-
X509_NOT_VALID_BEFORE = os.getenv('X509_NOT_VALID_BEFORE', datetime.datetime.now(timezone.utc))
38-
X509_NOT_VALID_AFTER_DAYS = os.getenv('X509_NOT_VALID_AFTER_DAYS', 10)
39-
X509_NOT_VALID_AFTER = os.getenv(
40-
'X509_NOT_VALID_AFTER',
41-
datetime.datetime.now(timezone.utc) + datetime.timedelta(
42-
days=X509_NOT_VALID_AFTER_DAYS
43-
)
44-
)
45-
46-
X509_SAN_URL = os.getenv(
47-
"X509_SAN_URL", "https://credential-issuer.example.org"
48-
)
49-
5025
CBORTAGS_ATTR_MAP = {
5126
"birth_date": 1004,
5227
"expiry_date": 1004,

0 commit comments

Comments
 (0)