-
Notifications
You must be signed in to change notification settings - Fork 61
Rafactoring ACL attributes #464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 14 commits
2e62757
df3174c
0998615
30fd4e3
126d5bc
ff94d47
b252742
efa273b
f202f3d
38912b0
bad0a89
8af6483
0e4d19e
07858d8
7a9e7e0
d91c286
52d6f23
7c5ef0f
3f2f198
b54584e
f9e91e1
ac70581
e530d09
435963b
95852c0
79ba760
5ca290f
68d834c
b5547a3
d81a7c2
5ee0a05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import itertools | ||
import copy | ||
import types | ||
import typing as t | ||
from netutils.protocol_mapper import PROTO_NAME_TO_NUM, TCP_NAME_TO_NUM, UDP_NAME_TO_NUM | ||
from netutils.ip import is_ip_within | ||
|
@@ -16,8 +17,8 @@ | |
INPUT_SCHEMA = { | ||
"type": "object", | ||
"properties": { | ||
"name": {"type": "string"}, | ||
"src_zone": {"type": ["string", "array"]}, | ||
"name": {"type": ["string", "null"]}, | ||
"src_zone": {"type": ["string", "array", "null"]}, | ||
"src_ip": {"$ref": "#/definitions/arrayOrIP"}, | ||
"dst_ip": {"$ref": "#/definitions/arrayOrIP"}, | ||
"dst_port": { | ||
|
@@ -35,8 +36,9 @@ | |
}, | ||
], | ||
}, | ||
"dst_zone": {"type": ["string", "array"]}, | ||
"dst_zone": {"type": ["string", "array", "null"]}, | ||
"action": {"type": "string"}, | ||
"protocol": {"type": ["string", "null"]}, | ||
}, | ||
"definitions": { | ||
"ipv4": {"type": "string", "pattern": "^(?:\\d{1,3}\\.){3}\\d{1,3}$"}, | ||
|
@@ -130,6 +132,8 @@ def _cartesian_product(data: t.Dict[str, str]) -> t.List[t.Dict[str, t.Any]]: | |
keys.append(key) | ||
if isinstance(value, (str, int)): | ||
values.append([value]) | ||
elif value is None: | ||
values.append([None]) | ||
else: | ||
values.append(value) | ||
product = list(itertools.product(*values)) | ||
|
@@ -147,30 +151,54 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: | |
raise ValueError() | ||
|
||
|
||
def get_attributes(obj): | ||
"""Function that describes class attributes.""" | ||
result = { | ||
# name for name in dir(cls) | ||
attr: getattr(obj, attr) | ||
for attr in dir(obj) | ||
if not attr.startswith("_") | ||
and not callable(getattr(obj, attr)) | ||
and not isinstance(getattr(obj, attr), types.FunctionType) | ||
} | ||
return result | ||
|
||
|
||
class ACLRule: | ||
"""A class that helps you imagine an acl rule via methodologies.""" | ||
|
||
attrs: t.List[str] = ["name", "src_ip", "src_zone", "dst_ip", "dst_port", "dst_zone", "action"] | ||
permit: str = "permit" | ||
deny: str = "deny" | ||
name: t.Any = None | ||
src_ip: t.Any = None | ||
src_zone: t.Any = None | ||
dst_ip: t.Any = None | ||
dst_port: t.Any = None | ||
dst_zone: t.Any = None | ||
protocol: t.Any = None | ||
action: t.Any = None | ||
|
||
input_data_verify: bool = False | ||
input_data_schema: t.Any = INPUT_SCHEMA | ||
class Meta: # pylint: disable=too-few-public-methods | ||
"""Default meta class.""" | ||
|
||
result_data_verify: bool = False | ||
result_data_schema: t.Any = RESULT_SCHEMA | ||
permit: str = "permit" | ||
deny: str = "deny" | ||
|
||
matrix: t.Any = {} | ||
matrix_enforced: bool = False | ||
matrix_definition: t.Any = {} | ||
input_data_verify: bool = False | ||
input_data_schema: t.Any = INPUT_SCHEMA | ||
|
||
dst_port_process: bool = True | ||
result_data_verify: bool = False | ||
result_data_schema: t.Any = RESULT_SCHEMA | ||
|
||
order_validate: t.List[str] = [] | ||
order_enforce: t.List[str] = [] | ||
filter_same_ip: bool = True | ||
matrix: t.Any = {} | ||
matrix_enforced: bool = False | ||
matrix_definition: t.Any = {} | ||
|
||
def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument | ||
dst_port_process: bool = True | ||
|
||
order_validate: t.List[str] = [] | ||
order_enforce: t.List[str] = [] | ||
filter_same_ip: bool = True | ||
|
||
def __init__(self, **kwargs): # pylint: disable=unused-argument | ||
"""Initialize and load data. | ||
|
||
Args: | ||
|
@@ -179,55 +207,80 @@ def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disab | |
Examples: | ||
>>> from netutils.acl import ACLRule | ||
>>> | ||
>>> acl_data = dict( | ||
>>> rule = ACLRule( | ||
itdependsnetworks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
... name="Check no match", | ||
... src_ip=["10.1.1.1"], | ||
... dst_ip="172.16.0.10", | ||
... dst_port="tcp/www-http", | ||
... action="permit", | ||
... ) | ||
>>> | ||
>>> rule = ACLRule(acl_data) | ||
>>> | ||
>>> rule.expanded_rules | ||
[{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] | ||
>>> | ||
""" | ||
self.processed: t.Dict[str, str] = {} | ||
self.data = data | ||
self.load_data() | ||
self.__load_data(kwargs=kwargs) | ||
|
||
def load_data(self) -> None: | ||
def __load_data(self, kwargs) -> None: | ||
"""Load the data into the rule while verifying input data, result data, and processing data.""" | ||
# Remaining kwargs stored under ACLRule.Meta | ||
pop_kwargs = [] | ||
for key, val in kwargs.items(): | ||
if key not in get_attributes(self): | ||
setattr(self.Meta, key, val) | ||
pop_kwargs.append(key) | ||
|
||
# Pop unneeded keys | ||
for key in pop_kwargs: | ||
kwargs.pop(key) | ||
|
||
# Ensure each class attr is in init kwargs. | ||
for attr in get_attributes(self): | ||
if attr not in kwargs: | ||
kwargs[attr] = getattr(self, attr) | ||
|
||
# Store the init input | ||
self._preprocessed_data = copy.deepcopy(kwargs) | ||
self._processed_data = copy.deepcopy(self._preprocessed_data) | ||
|
||
# Input check | ||
self.input_data_check() | ||
for attr in self.attrs: | ||
if not self.data.get(attr): | ||
continue | ||
if hasattr(self, f"process_{attr}"): | ||
proccessor = getattr(self, f"process_{attr}") | ||
_attr_data = proccessor(self.data[attr]) | ||
|
||
for attr in get_attributes(self): | ||
processor_func = getattr(self, f"process_{attr}", None) | ||
if processor_func: | ||
_attr_data = processor_func(self._processed_data[attr]) | ||
else: | ||
_attr_data = self.data[attr] | ||
self.processed[attr] = _attr_data | ||
_attr_data = self._processed_data[attr] | ||
|
||
self._processed_data[attr] = _attr_data | ||
setattr(self, attr, _attr_data) | ||
|
||
self.result_data_check() | ||
self.validate() | ||
self.expanded_rules = _cartesian_product(self.processed) | ||
if self.filter_same_ip: | ||
self.expanded_rules = [item for item in self.expanded_rules if item["dst_ip"] != item["src_ip"]] | ||
|
||
@property | ||
def expanded_rules(self): | ||
"""Expanded rule property.""" | ||
_expanded_rules = _cartesian_product(self._processed_data) | ||
if self.Meta.filter_same_ip: | ||
_expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]] | ||
|
||
return _expanded_rules | ||
|
||
def input_data_check(self) -> None: | ||
"""Verify the input data against the specified JSONSchema or using a simple dictionary check.""" | ||
return _check_schema(self.data, self.input_data_schema, self.input_data_verify) | ||
return _check_schema(self._preprocessed_data, self.Meta.input_data_schema, self.Meta.input_data_verify) | ||
|
||
def result_data_check(self) -> None: | ||
"""Verify the result data against the specified JSONSchema or using a simple dictionary check.""" | ||
return _check_schema(self.processed, self.result_data_schema, self.result_data_verify) | ||
return _check_schema(self._processed_data, self.Meta.result_data_schema, self.Meta.result_data_verify) | ||
|
||
def validate(self) -> t.Any: | ||
"""Run through any method that startswith('validate_') and run that method.""" | ||
if self.order_validate: | ||
method_order = self.order_validate | ||
if self.Meta.order_validate: | ||
method_order = self.Meta.order_validate | ||
else: | ||
method_order = dir(self) | ||
results = [] | ||
|
@@ -255,8 +308,8 @@ def process_dst_port( | |
https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. | ||
""" | ||
output = [] | ||
if not self.dst_port_process: | ||
return None | ||
if not self.Meta.dst_port_process: | ||
return self.dst_port | ||
if not isinstance(dst_port, list): | ||
dst_port = [dst_port] | ||
for item in dst_port: | ||
|
@@ -286,8 +339,8 @@ def enforce(self) -> t.List[t.Dict[str, t.Any]]: | |
Returns: | ||
A list of dictionaries that explains the results of the enforcement. | ||
""" | ||
if self.order_enforce: | ||
method_order = self.order_enforce | ||
if self.Meta.order_enforce: | ||
method_order = self.Meta.order_enforce | ||
else: | ||
method_order = dir(self) | ||
results = [] | ||
|
@@ -308,11 +361,11 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: | |
Returns: | ||
A list of dictionaries that explains the results of the matrix being enforced. | ||
""" | ||
if not self.matrix_enforced: | ||
if not self.Meta.matrix_enforced: | ||
return None | ||
if not self.matrix: | ||
if not self.Meta.matrix: | ||
raise ValueError("You must set a matrix dictionary to use the matrix feature.") | ||
if not self.matrix_definition: | ||
if not self.Meta.matrix_definition: | ||
raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") | ||
actions = [] | ||
for rule in self.expanded_rules: | ||
|
@@ -322,14 +375,14 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: | |
src_zone = "" | ||
dst_zone = "" | ||
as_tuple = (source, destination, port) | ||
for zone, ips in self.matrix_definition.items(): | ||
for zone, ips in self.Meta.matrix_definition.items(): | ||
if is_ip_within(source, ips): | ||
src_zone = zone | ||
if is_ip_within(destination, ips): | ||
dst_zone = zone | ||
if port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("allow", []): | ||
if port in self.Meta.matrix.get(src_zone, {}).get(dst_zone, {}).get("allow", []): | ||
actions.append({"obj": as_tuple, "action": "allow"}) | ||
elif port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("notify", []): | ||
elif port in self.Meta.matrix.get(src_zone, {}).get(dst_zone, {}).get("notify", []): | ||
actions.append({"obj": as_tuple, "action": "notify"}) | ||
else: | ||
actions.append({"obj": as_tuple, "action": "deny"}) | ||
|
@@ -429,9 +482,9 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: | |
rules_unmatched: t.List[t.Dict[str, t.Any]] = [] | ||
rules_matched: t.List[t.Dict[str, t.Any]] = [] | ||
|
||
if not match_rule.expanded_rules: | ||
if not match_rule.expanded_rules: # pylint: disable=protected-access | ||
raise ValueError("There is no expanded rules to test against.") | ||
for rule in match_rule.expanded_rules: | ||
for rule in match_rule.expanded_rules: # pylint: disable=protected-access | ||
rules_found.append(False) | ||
for existing_rule in self.expanded_rules: | ||
missing = False | ||
|
@@ -452,8 +505,8 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: | |
break | ||
detailed_info = { | ||
"existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable | ||
"match_rule": match_rule.processed, | ||
"existing_rule": self.processed, | ||
"match_rule": match_rule._processed_data, # pylint: disable=protected-access | ||
"existing_rule": self._processed_data, | ||
} | ||
if rules_found[-1]: | ||
detailed_info["match_rule_product"] = rule | ||
|
@@ -474,34 +527,38 @@ def match(self, match_rule: "ACLRule") -> bool: | |
details = self.match_details(match_rule) | ||
return not bool(details["rules_unmatched"]) | ||
|
||
def __repr__(self) -> str: | ||
"""Set repr of the object to be sane.""" | ||
output = [] | ||
for attr in self.attrs: | ||
if self.processed.get(attr): | ||
output.append(f"{attr}: {self.processed[attr]}") | ||
return ", ".join(output) | ||
# def __repr__(self) -> str: | ||
# """Set repr of the object to be sane.""" | ||
# output = [] | ||
# for attr in self.attrs: | ||
# if self._processed.get(attr): | ||
# output.append(f"{attr}: {self._processed[attr]}") | ||
# return ", ".join(output) | ||
|
||
|
||
class ACLRules: | ||
"""Class to help match multiple ACLRule objects.""" | ||
|
||
class_obj = ACLRule | ||
rules: t.List[t.Any] = [] | ||
|
||
class Meta: # pylint: disable=too-few-public-methods | ||
"""Default meta class.""" | ||
|
||
class_obj = ACLRule | ||
|
||
def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument | ||
"""Class to help match multiple ACLRule. | ||
|
||
Args: | ||
data: A list of `ACLRule` rules. | ||
""" | ||
self.data: t.Any = data | ||
self.rules: t.List[t.Any] = [] | ||
self.load_data() | ||
self.load_data(data=data) | ||
|
||
def load_data(self) -> None: | ||
def load_data(self, data) -> None: | ||
"""Load the data for multiple rules.""" | ||
for item in self.data: | ||
self.rules.append(self.class_obj(item)) | ||
for item in data: | ||
self.rules.append(self.Meta.class_obj(**item)) | ||
|
||
def match(self, rule: ACLRule) -> str: | ||
"""Check the rules loaded in `load_data` match against a new `rule`. | ||
|
@@ -513,9 +570,9 @@ def match(self, rule: ACLRule) -> str: | |
The response from the rule that matched, or `deny` by default. | ||
""" | ||
for item in self.rules: | ||
if item.match(self.class_obj(rule)): | ||
return str(item.action) | ||
return str(item.deny) # pylint: disable=undefined-loop-variable | ||
if item.match(rule): # mzb: bugfix | ||
return True # mzb: change to bool | ||
return False # pylint: disable=undefined-loop-variable | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had this as string as most vendors have more than just permit or deny, this is Palo's
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is reasonable to have different levels. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think, I see this in a little different way :
I would really like to avoid putting opinionated logic into generic |
||
|
||
def match_details(self, rule: ACLRule) -> t.Any: | ||
"""Verbosely check the rules loaded in `load_data` match against a new `rule`. | ||
|
@@ -528,5 +585,5 @@ def match_details(self, rule: ACLRule) -> t.Any: | |
""" | ||
output = [] | ||
for item in self.rules: | ||
output.append(item.match_details(self.class_obj(rule))) | ||
output.append(item.match_details(rule)) # mzb: bugfix | ||
return output |
Uh oh!
There was an error while loading. Please reload this page.