-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttp_signature_server.py
132 lines (102 loc) · 4.33 KB
/
http_signature_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from base64 import b64decode
from collections import defaultdict
from datetime import datetime
import re
from typing import Callable, DefaultDict, List, Tuple, Union
def verify_headers(
verify: Callable[[str, bytes, bytes], Union[None, bool]],
max_skew: int, method: str, path: str,
headers: Tuple[Tuple[str, str], ...]
) \
-> Union[
Tuple[str, Tuple[None, None]],
Tuple[None, Tuple[str, Tuple[Tuple[str, str], ...]]]]:
now = int(datetime.now().timestamp())
#########################
# Ensure signature header
try:
sig_header = next(value for key, value in headers if key.lower() == 'signature').strip()
except StopIteration:
return 'Missing signature header', (None, None)
#############################
# Ensure is of correct format
is_sig = re.match(r'^(([a-zA-Z]+=(("[^"]*")|\d+))(, (?=[a-zA-Z])|$))*$', sig_header)
if not is_sig:
return 'Invalid signature header', (None, None)
#################################
# Ensure have required parameters
param_key_values = re.findall(r'([a-zA-Z]+)=(?:(?:"([^"]*)")|(\d+))', sig_header)
params = dict(((key, (v_str or v_num)) for key, v_str, v_num in param_key_values))
if len(param_key_values) != len(params):
return 'Repeated parameter', (None, None)
try:
key_id_param = params['keyId']
except KeyError:
return f'Missing keyId parameter', (None, None)
try:
headers_param = params['headers']
except KeyError:
return f'Missing headers parameter', (None, None)
try:
signature_param = params['signature']
except KeyError:
return f'Missing signature parameter', (None, None)
try:
created_param = params['created']
except KeyError:
return f'Missing created parameter', (None, None)
try:
created = int(created_param)
except ValueError:
return 'Invalid created paramater', (None, None)
################################
# Ensure time skew not too large
if abs(now - created) > max_skew:
return 'Created skew too large', (None, None)
########################################
# Ensure required claimed-signed headers
claimed_signed_headers = re.findall(r'\S+', headers_param)
claimed_signed_headers_set = set(claimed_signed_headers)
if len(claimed_signed_headers) != len(claimed_signed_headers_set):
return 'Repeated signed header', (None, None)
if '(created)' not in claimed_signed_headers_set:
return 'Unsigned (created) pseudo-header', (None, None)
if '(request-target)' not in claimed_signed_headers_set:
return 'Unsigned (request-target) pseudo-header', (None, None)
###################################################
# Ensure have values for all claimed-signed headers
method_lower = method.lower()
available_headers = (
('(created)', created_param),
('(request-target)', f'{method_lower} {path}'),
) + tuple(
(key.lower(), value.strip()) for key, value in headers
)
available_headers_dict = dict(available_headers)
for header in claimed_signed_headers:
if header not in available_headers_dict:
return f'Missing signed {header} header value', (None, None)
##################
# Verify signature
def signature_input() -> Tuple[Tuple[str, str], ...]:
headers_lists: DefaultDict[str, List[str]] = defaultdict(list)
for key, value in available_headers:
headers_lists[key].append(value)
return tuple((key, ', '.join(headers_lists[key])) for key in claimed_signed_headers)
verified = verify(
key_id_param, b64decode(signature_param), '\n'.join(
f'{key}: {value}' for key, value in signature_input()
).encode('ascii'))
if verified is None:
return 'Unknown keyId', (None, None)
if not verified:
return 'Signature does not verify', (None, None)
##############################################
# Generate key value pairs of verified headers
def verified_headers() -> Tuple[Tuple[str, str], ...]:
key_values = []
for key, value in headers:
if key.lower() in claimed_signed_headers_set:
key_values.append((key, value))
return tuple(key_values)
return None, (key_id_param, verified_headers())