|
4 | 4 | import http.client |
5 | 5 | import json |
6 | 6 | import os |
| 7 | +import secrets |
7 | 8 | import ssl |
| 9 | +import string |
8 | 10 | import subprocess |
9 | 11 | import sys |
10 | 12 | import threading |
11 | 13 | import time |
12 | 14 | import traceback |
| 15 | +import uuid |
| 16 | +from pathlib import Path |
13 | 17 |
|
14 | 18 | import pytest |
15 | 19 |
|
16 | 20 | import OpenSSL.SSL |
17 | 21 | import requests |
18 | 22 | import trustme |
| 23 | +from cryptography.hazmat.backends import default_backend |
| 24 | +from cryptography.hazmat.primitives.serialization import ( |
| 25 | + BestAvailableEncryption, |
| 26 | + Encoding, |
| 27 | + PrivateFormat, |
| 28 | + load_pem_private_key, |
| 29 | +) |
19 | 30 |
|
20 | 31 | from .._compat import ( |
21 | 32 | IS_ABOVE_OPENSSL10, |
|
29 | 40 | ntob, |
30 | 41 | ntou, |
31 | 42 | ) |
32 | | -from ..server import HTTPServer, get_ssl_adapter_class |
| 43 | +from ..server import Gateway, HTTPServer, get_ssl_adapter_class |
33 | 44 | from ..testing import ( |
34 | 45 | ANY_INTERFACE_IPV4, |
35 | 46 | ANY_INTERFACE_IPV6, |
@@ -118,6 +129,14 @@ def make_tls_http_server(bind_addr, ssl_adapter, request): |
118 | 129 | return httpserver |
119 | 130 |
|
120 | 131 |
|
| 132 | +@pytest.fixture |
| 133 | +def private_key_password(): |
| 134 | + """Provide random 10 character password for private key.""" |
| 135 | + return ''.join( |
| 136 | + secrets.choice(string.ascii_letters + string.digits) for _ in range(10) |
| 137 | + ) |
| 138 | + |
| 139 | + |
121 | 140 | @pytest.fixture |
122 | 141 | def tls_http_server(request): |
123 | 142 | """Provision a server creator as a fixture.""" |
@@ -158,6 +177,36 @@ def tls_certificate_private_key_pem_path(tls_certificate): |
158 | 177 | yield cert_key_pem |
159 | 178 |
|
160 | 179 |
|
| 180 | +@pytest.fixture |
| 181 | +def tls_certificate_passwd_private_key_pem_path( |
| 182 | + tls_certificate, |
| 183 | + private_key_password, |
| 184 | +): |
| 185 | + """Provide a certificate private key PEM file path via fixture.""" |
| 186 | + key_as_bytes = tls_certificate.private_key_pem.bytes() |
| 187 | + private_key_object = load_pem_private_key( |
| 188 | + key_as_bytes, |
| 189 | + password=None, |
| 190 | + backend=default_backend(), |
| 191 | + ) |
| 192 | + encrypted_key_as_bytes = private_key_object.private_bytes( |
| 193 | + encoding=Encoding.PEM, |
| 194 | + format=PrivateFormat.PKCS8, |
| 195 | + encryption_algorithm=BestAvailableEncryption( |
| 196 | + password=private_key_password.encode('utf-8'), |
| 197 | + ), |
| 198 | + ) |
| 199 | + |
| 200 | + with tls_certificate.private_key_pem.tempfile() as tf: |
| 201 | + keyfile_temp_path = Path(tf) |
| 202 | + keyfile_temp_path.write_bytes(encrypted_key_as_bytes) |
| 203 | + |
| 204 | + yield keyfile_temp_path |
| 205 | + |
| 206 | + # removes the temporary file in teardown |
| 207 | + Path.unlink(keyfile_temp_path, missing_ok=True) |
| 208 | + |
| 209 | + |
161 | 210 | def _thread_except_hook(exceptions, args): |
162 | 211 | """Append uncaught exception ``args`` in threads to ``exceptions``.""" |
163 | 212 | if issubclass(args.exc_type, SystemExit): |
@@ -726,3 +775,96 @@ def test_http_over_https_error( |
726 | 775 | 'The underlying error is {underlying_error!r}'.format(**locals()) |
727 | 776 | ) |
728 | 777 | assert expected_error_text in err_text |
| 778 | + |
| 779 | + |
| 780 | +@pytest.mark.parametrize( |
| 781 | + 'adapter_type', |
| 782 | + ( |
| 783 | + 'builtin', |
| 784 | + 'pyopenssl', |
| 785 | + ) |
| 786 | + * 2, |
| 787 | +) |
| 788 | +def test_ssl_adapters_with_private_key_password( |
| 789 | + private_key_password, |
| 790 | + tls_certificate_chain_pem_path, |
| 791 | + tls_certificate_passwd_private_key_pem_path, |
| 792 | + adapter_type, |
| 793 | +): |
| 794 | + """Check that server starts using ssl adapter with password-protected private key.""" |
| 795 | + httpserver = HTTPServer( |
| 796 | + bind_addr=(ANY_INTERFACE_IPV4, EPHEMERAL_PORT), |
| 797 | + gateway=Gateway, |
| 798 | + ) |
| 799 | + |
| 800 | + tls_adapter_cls = get_ssl_adapter_class(name=adapter_type) |
| 801 | + tls_adapter = tls_adapter_cls( |
| 802 | + certificate=tls_certificate_chain_pem_path, |
| 803 | + private_key=tls_certificate_passwd_private_key_pem_path, |
| 804 | + private_key_password=secrets.choice( |
| 805 | + [ |
| 806 | + private_key_password, |
| 807 | + private_key_password.encode('utf-8'), |
| 808 | + ], |
| 809 | + ), |
| 810 | + ) |
| 811 | + |
| 812 | + httpserver.ssl_adapter = tls_adapter |
| 813 | + httpserver.prepare() |
| 814 | + |
| 815 | + assert httpserver.ready |
| 816 | + assert httpserver.requests._threads |
| 817 | + for thr in httpserver.requests._threads: |
| 818 | + assert thr.ready |
| 819 | + |
| 820 | + httpserver.stop() |
| 821 | + |
| 822 | + |
| 823 | +@pytest.mark.parametrize( |
| 824 | + 'adapter_type', |
| 825 | + ('builtin',), |
| 826 | +) |
| 827 | +def test_builtin_adapter_with_false_key_password( |
| 828 | + tls_certificate_chain_pem_path, |
| 829 | + tls_certificate_passwd_private_key_pem_path, |
| 830 | + adapter_type, |
| 831 | +): |
| 832 | + """Check that builtin ssl-adapter initialization fails when wrong private key password given.""" |
| 833 | + tls_adapter_cls = get_ssl_adapter_class(name=adapter_type) |
| 834 | + with pytest.raises(ssl.SSLError, match=r'\[SSL\] PEM.+'): |
| 835 | + tls_adapter_cls( |
| 836 | + certificate=tls_certificate_chain_pem_path, |
| 837 | + private_key=tls_certificate_passwd_private_key_pem_path, |
| 838 | + private_key_password=str(uuid.uuid4()), |
| 839 | + ) |
| 840 | + |
| 841 | + |
| 842 | +@pytest.mark.parametrize( |
| 843 | + 'adapter_type', |
| 844 | + ('pyopenssl',), |
| 845 | +) |
| 846 | +def test_openssl_adapter_with_false_key_password( |
| 847 | + tls_certificate_chain_pem_path, |
| 848 | + tls_certificate_passwd_private_key_pem_path, |
| 849 | + adapter_type, |
| 850 | +): |
| 851 | + """Check that server init fails when wrong private key password given.""" |
| 852 | + httpserver = HTTPServer( |
| 853 | + bind_addr=(ANY_INTERFACE_IPV4, EPHEMERAL_PORT), |
| 854 | + gateway=Gateway, |
| 855 | + ) |
| 856 | + |
| 857 | + tls_adapter_cls = get_ssl_adapter_class(name=adapter_type) |
| 858 | + tls_adapter = tls_adapter_cls( |
| 859 | + certificate=tls_certificate_chain_pem_path, |
| 860 | + private_key=tls_certificate_passwd_private_key_pem_path, |
| 861 | + private_key_password=str(uuid.uuid4()), |
| 862 | + ) |
| 863 | + |
| 864 | + httpserver.ssl_adapter = tls_adapter |
| 865 | + |
| 866 | + with pytest.raises(OpenSSL.SSL.Error, match=r'.+bad decrypt.+'): |
| 867 | + httpserver.prepare() |
| 868 | + |
| 869 | + assert not httpserver.requests._threads |
| 870 | + assert not httpserver.ready |
0 commit comments