diff --git a/roles/database/files/sql/creation/fworch-create-constraints.sql b/roles/database/files/sql/creation/fworch-create-constraints.sql index 74fffb3ba..e693ce372 100755 --- a/roles/database/files/sql/creation/fworch-create-constraints.sql +++ b/roles/database/files/sql/creation/fworch-create-constraints.sql @@ -17,8 +17,8 @@ Alter Table "import_control" add Constraint "control_id_stop_time_unique" UNIQUE Alter Table "object" add Constraint "obj_altkey" UNIQUE ("mgm_id","zone_id","obj_uid","obj_create"); ALTER TABLE object ADD CONSTRAINT object_obj_ip_is_host CHECK (is_single_ip(obj_ip)); ALTER TABLE object ADD CONSTRAINT object_obj_ip_end_is_host CHECK (is_single_ip(obj_ip_end)); -ALTER TABLE object ADD CONSTRAINT object_obj_ip_not_null CHECK (obj_ip IS NOT NULL OR obj_typ_id=2); -ALTER TABLE object ADD CONSTRAINT object_obj_ip_end_not_null CHECK (obj_ip_end IS NOT NULL OR obj_typ_id=2); +ALTER TABLE "object" ADD CONSTRAINT object_obj_ip_not_null CHECK NOT (obj_ip IS NULL AND obj_typ_id IN (1, 3, 4)); +ALTER TABLE "object" ADD CONSTRAINT object_obj_ip_end_not_null CHECK NOT (obj_ip_end IS NULL AND obj_typ_id IN (1, 3, 4)); ALTER TABLE owner ADD CONSTRAINT owner_name_unique_in_tenant UNIQUE ("name","tenant_id"); ALTER TABLE owner_network ADD CONSTRAINT port_in_valid_range CHECK (port > 0 and port <= 65535); ALTER TABLE owner_network ADD CONSTRAINT owner_network_ip_is_host CHECK (is_single_ip(ip)); diff --git a/roles/database/files/upgrade/9.0.sql b/roles/database/files/upgrade/9.0.sql index 9b0e58739..e443e091e 100644 --- a/roles/database/files/upgrade/9.0.sql +++ b/roles/database/files/upgrade/9.0.sql @@ -1061,6 +1061,12 @@ ALTER TABLE "rulebase_link" ADD COLUMN IF NOT EXISTS "removed" BIGINT; -- add obj type access-role for cp import INSERT INTO stm_obj_typ (obj_typ_id,obj_typ_name) VALUES (21,'access-role') ON CONFLICT DO NOTHING; +-- change ip not Null constraint, only applicable if obj_typ_name is host, network or machines_range +ALTER TABLE "object" DROP CONSTRAINT IF EXISTS "object_obj_ip_not_null" CASCADE; +ALTER TABLE "object" DROP CONSTRAINT IF EXISTS "object_obj_ip_end_not_null" CASCADE; +ALTER TABLE "object" ADD CONSTRAINT object_obj_ip_not_null CHECK NOT (obj_ip IS NULL AND obj_typ_id IN (1, 3, 4)); +ALTER TABLE "object" ADD CONSTRAINT object_obj_ip_end_not_null CHECK NOT (obj_ip_end IS NULL AND obj_typ_id IN (1, 3, 4)); + -- remove dev_id fk and set nullable if column exists ALTER TABLE changelog_rule DROP CONSTRAINT IF EXISTS changelog_rule_dev_id_fkey; diff --git a/roles/importer/files/importer/checkpointR8x/cp_rule.py b/roles/importer/files/importer/checkpointR8x/cp_rule.py index 32c33ea06..63e90ad42 100644 --- a/roles/importer/files/importer/checkpointR8x/cp_rule.py +++ b/roles/importer/files/importer/checkpointR8x/cp_rule.py @@ -24,7 +24,7 @@ - migrate section headers from rule to ordering element ... """ -def normalize_rulebases (nativeConfig, native_config_global, importState, normalized_config_dict, +def normalize_rulebases(nativeConfig, native_config_global, importState, normalized_config_dict, normalized_config_global, is_global_loop_iteration): normalized_config_dict['policies'] = [] diff --git a/roles/importer/files/importer/common.py b/roles/importer/files/importer/common.py index 3c4b2c5c8..557b9a34d 100644 --- a/roles/importer/files/importer/common.py +++ b/roles/importer/files/importer/common.py @@ -92,8 +92,9 @@ def import_management(mgmId=None, ssl_verification=None, debug_level_in=0, except (FwoApiWriteError, FwoImporterError) as e: importState.addError(f"FwoApiWriteError or FwoImporterError: {str(e.args)} - aborting import") rollBackExceptionHandler(importState, configImporter=config_importer, exc=e, errorText="") - except FwoImporterErrorInconsistencies: + except FwoImporterErrorInconsistencies as e: importState.delete_import() # delete whole import + importState.addError(str(e.args)) except ValueError: importState.addError("ValueError - aborting import") raise diff --git a/roles/importer/files/importer/fortiadom5ff/fmgr_consts.py b/roles/importer/files/importer/fortiadom5ff/fmgr_consts.py index ba810971a..44127b2e6 100644 --- a/roles/importer/files/importer/fortiadom5ff/fmgr_consts.py +++ b/roles/importer/files/importer/fortiadom5ff/fmgr_consts.py @@ -2,10 +2,12 @@ nw_obj_types = ['firewall/address', 'firewall/address6', 'firewall/addrgrp', 'firewall/addrgrp6', 'firewall/ippool', 'firewall/vip', 'system/external-resource', 'firewall/wildcard-fqdn/custom', 'firewall/wildcard-fqdn/group'] -# TODO add + svc_obj_types = ['application/list', 'application/group', 'application/categories', 'application/custom', 'firewall/service/custom', 'firewall/service/group'] +nat_types = ['central/dnat', 'central/dnat6', 'firewall/central-snat-map'] + # delte_v: beide typen können weg # v4_object_types = ['nw_obj_global_firewall/address', 'nw_obj_global_firewall/addrgrp'] # v6_object_types = ['nw_obj_adom_firewall/address', 'nw_obj_adom_firewall/addrgrp','nw_obj_global_firewall/address', \ diff --git a/roles/importer/files/importer/fortiadom5ff/fmgr_getter.py b/roles/importer/files/importer/fortiadom5ff/fmgr_getter.py index 8ad342dcc..8adc75e89 100644 --- a/roles/importer/files/importer/fortiadom5ff/fmgr_getter.py +++ b/roles/importer/files/importer/fortiadom5ff/fmgr_getter.py @@ -98,7 +98,6 @@ def update_config_with_fortinet_api_call(config_json, sid, api_base_url, api_pat # adding options if len(options)>0: payload['params'][0].update({'option': options}) - # payload['params'][0].update({'filter': options}) result = fortinet_api_call(sid, api_base_url, api_path, payload=payload, method=method) full_result.extend(result) @@ -159,12 +158,3 @@ def parse_device_and_vdom(fmgr_device, mgm_details_device, device_vdom_dict, fou else: device_vdom_dict.update({fmgr_device['name']: {fmgr_vdom['name']: ''}}) return found_fmgr_device - -def get_policy_packages_from_manager(sid, fm_api_url, adom=''): - if adom == '': - url = '/pm/pkg/global' - else: - url = '/pm/pkg/adom/' + adom - policy_packages_result = fortinet_api_call(sid, fm_api_url, url) - - return policy_packages_result diff --git a/roles/importer/files/importer/fortiadom5ff/fmgr_network.py b/roles/importer/files/importer/fortiadom5ff/fmgr_network.py index c5727b499..cdf8d5fea 100644 --- a/roles/importer/files/importer/fortiadom5ff/fmgr_network.py +++ b/roles/importer/files/importer/fortiadom5ff/fmgr_network.py @@ -2,14 +2,14 @@ import ipaddress from fwo_log import getFwoLogger from fwo_const import list_delimiter, nat_postfix -from fmgr_zone import add_zone_if_missing +from fmgr_zone import find_zones_in_normalized_config from fwo_config import readConfig from model_controllers.import_state_controller import ImportStateController from copy import deepcopy from fwo_exceptions import FwoImporterErrorInconsistencies -def normalize_network_objects(import_state: ImportStateController, native_config, native_config_global, normalized_config, normalized_config_global, nw_obj_types): +def normalize_network_objects(native_config, normalized_config_adom, normalized_config_global, nw_obj_types): nw_objects = [] if 'objects' not in native_config: @@ -19,7 +19,7 @@ def normalize_network_objects(import_state: ImportStateController, native_config if not(current_obj_type in nw_obj_types and 'data' in native_config['objects'][current_obj_type]): continue for obj_orig in native_config['objects'][current_obj_type]['data']: - normalize_network_object(obj_orig, nw_objects, normalized_config, native_config['objects'], current_obj_type) + normalize_network_object(obj_orig, nw_objects, normalized_config_adom, normalized_config_global, native_config['objects'], current_obj_type) if native_config.get('is-super-manager',False): # finally add "Original" network object for natting (only in global domain) @@ -28,7 +28,7 @@ def normalize_network_objects(import_state: ImportStateController, native_config nw_objects.append(create_network_object(name=original_obj_name, type='network', ip='0.0.0.0', ip_end='255.255.255.255',\ uid=original_obj_uid, zone='global', color='black', comment='"original" network object created by FWO importer for NAT purposes')) - normalized_config.update({'network_objects': nw_objects}) + normalized_config_adom.update({'network_objects': nw_objects}) def get_obj_member_refs_list(obj_orig, native_config_objects, current_obj_type): obj_member_refs_list = [] @@ -52,8 +52,7 @@ def exclude_object_types_in_member_ref_search(obj_type, current_obj_type): skip_member_ref_loop = True return skip_member_ref_loop -def normalize_network_object(obj_orig, nw_objects, normalized_config, native_config_objects, current_obj_type): - obj_zone = 'global' +def normalize_network_object(obj_orig, nw_objects, normalized_config_adom, normalized_config_global, native_config_objects, current_obj_type): obj = {} obj.update({'obj_name': obj_orig['name']}) if 'subnet' in obj_orig: # ipv4 object @@ -98,17 +97,12 @@ def normalize_network_object(obj_orig, nw_objects, normalized_config, native_con obj.update({'obj_uid': obj_orig.get('uuid', obj_orig['name'])}) # using name as fallback, but this should not happen - # here only picking first associated interface as zone: - if 'associated-interface' in obj_orig and len(obj_orig['associated-interface'])>0: # and obj_orig['associated-interface'][0] != 'any': - obj_zone = deepcopy(obj_orig['associated-interface'][0]) - # adding zone if it not yet exists - obj_zone = add_zone_if_missing (normalized_config, obj_zone) - obj.update({'obj_zone': obj_zone }) + associated_interfaces = find_zones_in_normalized_config( + obj_orig.get('associated-interface', []), normalized_config_adom, normalized_config_global) + obj.update({'obj_zone': list_delimiter.join(associated_interfaces)}) - #obj.update({'control_id': import_state.ImportId}) nw_objects.append(obj) - def _parse_subnet (obj, obj_orig): ipa = ipaddress.ip_network(str(obj_orig['subnet'][0]) + '/' + str(obj_orig['subnet'][1])) if ipa.num_addresses > 1: diff --git a/roles/importer/files/importer/fortiadom5ff/fmgr_rule.py b/roles/importer/files/importer/fortiadom5ff/fmgr_rule.py index f973accd8..44e6be11e 100644 --- a/roles/importer/files/importer/fortiadom5ff/fmgr_rule.py +++ b/roles/importer/files/importer/fortiadom5ff/fmgr_rule.py @@ -1,26 +1,23 @@ import copy -import jsonpickle import ipaddress -import time +from time import strftime, localtime from fwo_const import list_delimiter, nat_postfix, dummy_ip -from fwo_base import extend_string_list, sanitize +from fwo_base import extend_string_list from fmgr_service import create_svc_object from fmgr_network import create_network_object, get_first_ip_of_destination -from fmgr_zone import add_zone_if_missing +from fmgr_zone import find_zones_in_normalized_config +from fmgr_consts import nat_types import fmgr_getter -from fmgr_gw_networking import get_device_from_package from fwo_log import getFwoLogger from model_controllers.route_controller import get_matching_route_obj, get_ip_of_interface_obj from fwo_exceptions import FwoDeviceWithoutLocalPackage, FwoImporterErrorInconsistencies -#from fmgr_base import resolve_raw_objects, resolve_objects -from models.rule import Rule, RuleNormalized, RuleAction, RuleTrack, RuleType +from models.rule import RuleNormalized, RuleAction, RuleTrack, RuleType from models.rulebase import Rulebase -import fwo_globals -from models.import_state import ImportState - NETWORK_OBJECT='network_object' STRING_PKG = '/pkg/' +STRING_PM_CONFIG_GLOBAL_PKG = '/pm/config/global/pkg/' +STRING_PM_CONFIG_ADOM = '/pm/config/adom/' rule_access_scope_v4 = ['rules_global_header_v4', 'rules_adom_v4', 'rules_global_footer_v4'] rule_access_scope_v6 = ['rules_global_header_v6', 'rules_adom_v6', 'rules_global_footer_v6'] rule_access_scope = rule_access_scope_v6 + rule_access_scope_v4 @@ -29,37 +26,34 @@ def normalize_rulebases( - import_state: ImportState, mgm_uid: str, - nativeConfig: dict, + native_config: dict, native_config_global: dict, - normalized_config_dict: dict, + normalized_config_adom: dict, normalized_config_global: dict, is_global_loop_iteration: bool ) -> None: - normalized_config_dict['policies'] = [] - + + normalized_config_adom['policies'] = [] fetched_rulebase_uids: list = [] - if normalized_config_global is not None: + if normalized_config_global != {}: for normalized_rulebase_global in normalized_config_global.get('policies', []): fetched_rulebase_uids.append(normalized_rulebase_global.uid) - for gateway in nativeConfig['gateways']: + for gateway in native_config['gateways']: normalize_rulebases_for_each_link_destination( - gateway, mgm_uid, fetched_rulebase_uids, nativeConfig, native_config_global, - is_global_loop_iteration, normalized_config_dict, + gateway, mgm_uid, fetched_rulebase_uids, native_config, native_config_global, + is_global_loop_iteration, normalized_config_adom, normalized_config_global) - # todo: parse nat rulebase here - - -def normalize_rulebases_for_each_link_destination(gateway, mgm_uid, fetched_rulebase_uids, nativeConfig, native_config_global, is_global_loop_iteration, normalized_config_dict, normalized_config_global): +def normalize_rulebases_for_each_link_destination(gateway, mgm_uid, fetched_rulebase_uids, native_config, + native_config_global, is_global_loop_iteration, normalized_config_adom, normalized_config_global): logger = getFwoLogger() for rulebase_link in gateway['rulebase_links']: if rulebase_link['to_rulebase_uid'] not in fetched_rulebase_uids and rulebase_link['to_rulebase_uid'] != '': - rulebase_to_parse = find_rulebase_to_parse(nativeConfig['rulebases'], rulebase_link['to_rulebase_uid']) + rulebase_to_parse = find_rulebase_to_parse(native_config['rulebases'], rulebase_link['to_rulebase_uid']) # search in global rulebase found_rulebase_in_global = False - if rulebase_to_parse == {} and not is_global_loop_iteration and native_config_global is not None: + if rulebase_to_parse == {} and not is_global_loop_iteration and native_config_global != {}: rulebase_to_parse = find_rulebase_to_parse( native_config_global['rulebases'], rulebase_link['to_rulebase_uid'] ) @@ -69,13 +63,31 @@ def normalize_rulebases_for_each_link_destination(gateway, mgm_uid, fetched_rule continue normalized_rulebase = initialize_normalized_rulebase(rulebase_to_parse, mgm_uid) - parse_rulebase(normalized_config_dict, normalized_config_global, rulebase_to_parse, normalized_rulebase) + parse_rulebase(normalized_config_adom, normalized_config_global, rulebase_to_parse, normalized_rulebase, found_rulebase_in_global) fetched_rulebase_uids.append(rulebase_link['to_rulebase_uid']) if found_rulebase_in_global: normalized_config_global['policies'].append(normalized_rulebase) else: - normalized_config_dict['policies'].append(normalized_rulebase) + normalized_config_adom['policies'].append(normalized_rulebase) + + # normalizing nat rulebases is work in progress + #normalize_nat_rulebase(rulebase_link, native_config, normalized_config_adom, normalized_config_global) + +def normalize_nat_rulebase(rulebase_link, native_config, normalized_config_adom, normalized_config_global): + if not rulebase_link['is_section']: + for nat_type in nat_types: + nat_type_string = nat_type + '_' + rulebase_link['to_rulebase_uid'] + nat_rulebase = get_native_nat_rulebase(native_config, nat_type_string) + parse_nat_rulebase(nat_rulebase, nat_type_string, normalized_config_adom, normalized_config_global) + +def get_native_nat_rulebase(native_config, nat_type_string): + logger = getFwoLogger() + for nat_rulebase in native_config['nat_rulebases']: + if nat_type_string == nat_rulebase['type']: + return nat_rulebase['data'] + logger.warning('no nat data for '+ nat_type_string) + return [] def find_rulebase_to_parse(rulebase_list, rulebase_uid): for rulebase in rulebase_list: @@ -92,13 +104,62 @@ def initialize_normalized_rulebase(rulebase_to_parse, mgm_uid): normalized_rulebase = Rulebase(uid=rulebaseUid, name=rulebaseName, mgm_uid=mgm_uid, Rules={}) return normalized_rulebase -def parse_rulebase(normalized_config_dict, normalized_config_global, rulebase_to_parse, normalized_rulebase): +def parse_rulebase(normalized_config_adom, normalized_config_global, rulebase_to_parse, normalized_rulebase, found_rulebase_in_global): rule_num = 1 for native_rule in rulebase_to_parse['data']: - rule_num = parse_single_rule(normalized_config_dict, normalized_config_global, native_rule, normalized_rulebase, rule_num) + rule_num = parse_single_rule(normalized_config_adom, normalized_config_global, native_rule, normalized_rulebase, rule_num) + if not found_rulebase_in_global: + add_implicit_deny_rule(rule_num, normalized_config_adom, normalized_config_global, normalized_rulebase) + +def add_implicit_deny_rule(rule_num, normalized_config_adom, normalized_config_global, rulebase: Rulebase): + + deny_rule = {'srcaddr': ['all'], 'srcaddr6': ['all'], + 'dstaddr': ['all'], 'dstaddr6': ['all'], + 'service': ['ALL'], + 'srcintf': ['any'], 'dstintf': ['any']} + + rule_src_list, rule_src_refs_list = rule_parse_addresses(deny_rule, 'src', normalized_config_adom, normalized_config_global, False) + rule_dst_list, rule_dst_refs_list = rule_parse_addresses(deny_rule, 'dst', normalized_config_adom, normalized_config_global, False) + rule_svc_list, rule_svc_refs_list = rule_parse_service(deny_rule) + rule_src_zones = find_zones_in_normalized_config( + deny_rule.get('srcintf', []), normalized_config_adom, normalized_config_global) + rule_dst_zones = find_zones_in_normalized_config( + deny_rule.get('dstintf', []), normalized_config_adom, normalized_config_global) + + rule_normalized = RuleNormalized( + rule_num=rule_num, + rule_num_numeric=0, + rule_disabled=False, + rule_src_neg=False, + rule_src=list_delimiter.join(rule_src_list), + rule_src_refs=list_delimiter.join(rule_src_refs_list), + rule_dst_neg=False, + rule_dst=list_delimiter.join(rule_dst_list), + rule_dst_refs=list_delimiter.join(rule_dst_refs_list), + rule_svc_neg=False, + rule_svc=list_delimiter.join(rule_svc_list), + rule_svc_refs=list_delimiter.join(rule_svc_refs_list), + rule_action=RuleAction.DROP, + rule_track=RuleTrack.NONE, + rule_installon='', + rule_time='', # Time-based rules not commonly used in basic Fortinet configs + rule_name='Implicit Deny', + rule_uid='', + rule_custom_fields=str({}), + rule_implied=False, + rule_type=RuleType.ACCESS, + last_change_admin='', + parent_rule_uid=None, + last_hit=None, + rule_comment='', + rule_src_zone=list_delimiter.join(rule_src_zones), + rule_dst_zone=list_delimiter.join(rule_dst_zones), + rule_head_text=None + ) + rulebase.Rules[rule_normalized.rule_uid] = rule_normalized -def parse_single_rule(normalized_config_dict, normalized_config_global, native_rule, rulebase: Rulebase, rule_num): +def parse_single_rule(normalized_config_adom, normalized_config_global, native_rule, rulebase: Rulebase, rule_num): # Extract basic rule information rule_disabled = True # Default to disabled if 'status' in native_rule and (native_rule['status'] == 1 or native_rule['status'] == 'enable'): @@ -108,16 +169,21 @@ def parse_single_rule(normalized_config_dict, normalized_config_global, native_r rule_track = rule_parse_tracking_info(native_rule) - rule_src_list, rule_src_refs_list = rule_parse_addresses(native_rule, 'src', normalized_config_dict, normalized_config_global) - rule_dst_list, rule_dst_refs_list = rule_parse_addresses(native_rule, 'dst', normalized_config_dict, normalized_config_global) + rule_src_list, rule_src_refs_list = rule_parse_addresses(native_rule, 'src', normalized_config_adom, normalized_config_global, False) + rule_dst_list, rule_dst_refs_list = rule_parse_addresses(native_rule, 'dst', normalized_config_adom, normalized_config_global, False) rule_svc_list, rule_svc_refs_list = rule_parse_service(native_rule) - rule_src_zone, rule_dst_zone = rule_parse_zone(native_rule, normalized_config_dict) + rule_src_zones = find_zones_in_normalized_config( + native_rule.get('srcintf', []), normalized_config_adom, normalized_config_global) + rule_dst_zones = find_zones_in_normalized_config( + native_rule.get('dstintf', []), normalized_config_adom, normalized_config_global) rule_src_neg, rule_dst_neg, rule_svc_neg = rule_parse_negation_flags(native_rule) rule_installon = rule_parse_installon(native_rule, rulebase.name) + last_hit = rule_parse_last_hit(native_rule) + # Create the normalized rule rule_normalized = RuleNormalized( rule_num=rule_num, @@ -141,19 +207,19 @@ def parse_single_rule(normalized_config_dict, normalized_config_global, native_r rule_custom_fields=str(native_rule.get('meta fields', {})), rule_implied=False, rule_type=RuleType.ACCESS, - last_change_admin=None, #TODO: native_rule.get('_last-modified-by'), - see #3589 + last_change_admin=native_rule.get('_last-modified-by', ''), parent_rule_uid=None, - last_hit=None, # TODO: get last hit + last_hit=last_hit, rule_comment=native_rule.get('comments'), - rule_src_zone=rule_src_zone, - rule_dst_zone=rule_dst_zone, + rule_src_zone=list_delimiter.join(rule_src_zones), + rule_dst_zone=list_delimiter.join(rule_dst_zones), rule_head_text=None ) # Add the rule to the rulebase rulebase.Rules[rule_normalized.rule_uid] = rule_normalized - # TODO: handle NAT + # TODO: handle combined NAT, see handle_combined_nat_rule return rule_num + 1 @@ -187,46 +253,49 @@ def rule_parse_service(native_rule): return rule_svc_list, rule_svc_refs_list -def rule_parse_zone(native_rule, normalized_config_dict): - # TODO: only using the first zone for now - rule_src_zone = None - if len(native_rule.get('srcintf', [])) > 0: - rule_src_zone = add_zone_if_missing(normalized_config_dict, native_rule['srcintf'][0]) - - rule_dst_zone = None - if len(native_rule.get('dstintf', [])) > 0: - rule_dst_zone = add_zone_if_missing(normalized_config_dict, native_rule['dstintf'][0]) - return rule_src_zone, rule_dst_zone - -def rule_parse_addresses(native_rule, target, normalized_config_dict, normalized_config_global): +def rule_parse_addresses(native_rule, target, normalized_config_adom, normalized_config_global, is_nat): if target not in ['src', 'dst']: raise FwoImporterErrorInconsistencies(f"target '{target}' must either be src or dst.") addr_list = [] addr_ref_list = [] - build_addr_list(native_rule, True, target, normalized_config_dict, normalized_config_global, addr_list, addr_ref_list) - build_addr_list(native_rule, False, target, normalized_config_dict, normalized_config_global, addr_list, addr_ref_list) + if not is_nat: + build_addr_list(native_rule, True, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list) + build_addr_list(native_rule, False, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list) + else: + build_nat_addr_list(native_rule, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list) return addr_list, addr_ref_list -def build_addr_list(native_rule, is_v4, target, normalized_config_dict, normalized_config_global, addr_list, addr_ref_list): +def build_addr_list(native_rule, is_v4, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list): if is_v4 and target == 'src': for addr in native_rule.get('srcaddr', []) + native_rule.get('internet-service-src-name', []): addr_list.append(addr) - addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_dict, normalized_config_global)) + addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_adom, normalized_config_global)) elif not is_v4 and target == 'src': for addr in native_rule.get('srcaddr6', []): addr_list.append(addr) - addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_dict, normalized_config_global)) + addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_adom, normalized_config_global)) elif is_v4 and target == 'dst': for addr in native_rule.get('dstaddr', []) + native_rule.get('internet-service-name', []): addr_list.append(addr) - addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_dict, normalized_config_global)) + addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_adom, normalized_config_global)) else: for addr in native_rule.get('dstaddr6', []): addr_list.append(addr) - addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_dict, normalized_config_global)) + addr_ref_list.append(find_addr_ref(addr, is_v4, normalized_config_adom, normalized_config_global)) + +def build_nat_addr_list(native_rule, target, normalized_config_adom, normalized_config_global, addr_list, addr_ref_list): + # so far only ip v4 expected + if target == 'src': + for addr in native_rule.get('orig-addr', []): + addr_list.append(addr) + addr_ref_list.append(find_addr_ref(addr, True, normalized_config_adom, normalized_config_global)) + if target == 'dst': + for addr in native_rule.get('dst-addr', []): + addr_list.append(addr) + addr_ref_list.append(find_addr_ref(addr, True, normalized_config_adom, normalized_config_global)) -def find_addr_ref(addr, is_v4, normalized_config_dict, normalized_config_global): - for nw_obj in normalized_config_dict['network_objects'] + normalized_config_global.get('network_objects', []): +def find_addr_ref(addr, is_v4, normalized_config_adom, normalized_config_global): + for nw_obj in normalized_config_adom['network_objects'] + normalized_config_global.get('network_objects', []): if addr == nw_obj['obj_name']: if (is_v4 and ip_type(nw_obj) == 4) or (not is_v4 and ip_type(nw_obj) == 6): return nw_obj['obj_uid'] @@ -259,98 +328,91 @@ def rule_parse_installon(native_rule, rulebase_name): rule_installon = rulebase_name return rule_installon -def get_access_policy(sid, fm_api_url, native_config_domain, adom_device_vdom_policy_package_structure, adom_name, mgm_details_device, device_config, limit): - consolidated = '' # '/consolidated' - logger = getFwoLogger() +def rule_parse_last_hit(native_rule): + last_hit = native_rule.get('_last_hit', None) + if last_hit != None: + last_hit = strftime("%Y-%m-%d %H:%M:%S", localtime(last_hit)) + return last_hit + +def get_access_policy(sid, fm_api_url, native_config_adom, native_config_global, adom_device_vdom_policy_package_structure, adom_name, mgm_details_device, device_config, limit): previous_rulebase = None - local_pkg_name = find_local_pkg(adom_device_vdom_policy_package_structure, adom_name, mgm_details_device) - # delete_v: hier global_pkg_name später - #global_pkg_name = device['global_rulebase_name'] + link_list = [] + local_pkg_name, global_pkg_name = find_packages(adom_device_vdom_policy_package_structure, adom_name, mgm_details_device) options = ['extra info', 'scope member', 'get meta'] - # pkg_name = device['package_name'] pkg_name is not used at all - - # delete_v: hier initial link wenn global header existiert - # get global header rulebase: - # if device['global_rulebase_name'] is None or device['global_rulebase_name'] == '': - # logger.debug('no global rulebase name defined in fortimanager, ADOM=' + adom_name + ', local_package=' + local_pkg_name) - # else: - # fmgr_getter.update_config_with_fortinet_api_call( - # nativeConfig['rules_global_header_v4'], sid, fm_api_url, "/pm/config/global/pkg/" + global_pkg_name + "/global/header" + consolidated + "/policy", local_pkg_name, limit=limit) - # fmgr_getter.update_config_with_fortinet_api_call( - # nativeConfig['rules_global_header_v6'], sid, fm_api_url, "/pm/config/global/pkg/" + global_pkg_name + "/global/header" + consolidated + "/policy6", local_pkg_name, limit=limit) - - is_global = False - # get local rulebase - fmgr_getter.update_config_with_fortinet_api_call( - native_config_domain['rulebases'], sid, fm_api_url, "/pm/config/adom/" + adom_name + STRING_PKG + local_pkg_name + "/firewall" + consolidated + "/policy", 'rules_adom_v4_' + local_pkg_name, options=options, limit=limit) - fmgr_getter.update_config_with_fortinet_api_call( - native_config_domain['rulebases'], sid, fm_api_url, "/pm/config/adom/" + adom_name + STRING_PKG + local_pkg_name + "/firewall" + consolidated + "/policy6", 'rules_adom_v6_' + local_pkg_name, limit=limit) - # delete_v: hier initial link immer lokal, erweitern wenn wir global header/footer holen - link_list, previous_rulebase = link_v4_and_v6_rulebase(native_config_domain['rulebases'], local_pkg_name, previous_rulebase, is_global) - device_config['rulebase_links'].extend(link_list) - # get global footer rulebase: - # if device['global_rulebase_name'] != None and device['global_rulebase_name'] != '': - # fmgr_getter.update_config_with_fortinet_api_call( - # nativeConfig['rules_global_footer_v4'], sid, fm_api_url, "/pm/config/global/pkg/" + global_pkg_name + "/global/footer" + consolidated + "/policy", local_pkg_name, limit=limit) - # fmgr_getter.update_config_with_fortinet_api_call( - # nativeConfig['rules_global_footer_v6'], sid, fm_api_url, "/pm/config/global/pkg/" + global_pkg_name + "/global/footer" + consolidated + "/policy6", local_pkg_name, limit=limit) - - ### now dealing with hitcounts - - # get hitcount task number - hitcount_payload = { - "params": [ - { - "data": { - "adom": adom_name, - "pkg": local_pkg_name - } - } - ] - } - hitcount_task = fmgr_getter.fortinet_api_call( - sid, fm_api_url, "/sys/hitcount", payload=hitcount_payload, method="get") - time.sleep(2) - - if len(hitcount_task) == 0 or 'task' not in hitcount_task[0]: - logger.warning(f"did not get hitcount task for adom {adom_name} and package {local_pkg_name} - skipping hitcount") - return - - # execute hitcount task - hitcount_payload = { - "params": [ - { - "data": { - "taskid": hitcount_task[0]['task'] - } - } - ] - } - fmgr_getter.update_config_with_fortinet_api_call( - native_config_domain['rules_hitcount'], sid, fm_api_url, "/sys/task/result", local_pkg_name, payload=hitcount_payload, limit=limit) + previous_rulebase = get_and_link_global_rulebase( + 'header', previous_rulebase, global_pkg_name, native_config_global, sid, fm_api_url, options, limit, link_list) + + previous_rulebase = get_and_link_local_rulebase( + 'rules_adom', previous_rulebase, adom_name, local_pkg_name, native_config_adom, sid, fm_api_url, options, limit, link_list) + + previous_rulebase = get_and_link_global_rulebase( + 'footer', previous_rulebase, global_pkg_name, native_config_global, sid, fm_api_url, options, limit, link_list) + + device_config['rulebase_links'].extend(link_list) -def find_local_pkg(adom_device_vdom_policy_package_structure, adom_name, mgm_details_device): +def get_and_link_global_rulebase(header_or_footer, previous_rulebase, global_pkg_name, native_config_global, sid, fm_api_url, options, limit, link_list): + rulebase_type_prefix = 'rules_global_' + header_or_footer + if global_pkg_name != '': + if not is_rulebase_already_fetched(native_config_global['rulebases'], rulebase_type_prefix + '_v4_' + global_pkg_name): + fmgr_getter.update_config_with_fortinet_api_call( + native_config_global['rulebases'], + sid, fm_api_url, + STRING_PM_CONFIG_GLOBAL_PKG + global_pkg_name + '/global/' + header_or_footer + '/policy', + rulebase_type_prefix + '_v4_' + global_pkg_name, + options=options, limit=limit) + if not is_rulebase_already_fetched(native_config_global['rulebases'], rulebase_type_prefix + '_v6_' + global_pkg_name): + # delete_v: hier auch options=options? + fmgr_getter.update_config_with_fortinet_api_call( + native_config_global['rulebases'], + sid, fm_api_url, + STRING_PM_CONFIG_GLOBAL_PKG + global_pkg_name + '/global/' + header_or_footer + '/policy6', + rulebase_type_prefix + '_v6_' + global_pkg_name, + limit=limit) + previous_rulebase = link_rulebase(link_list, native_config_global['rulebases'], global_pkg_name, rulebase_type_prefix, previous_rulebase, True) + return previous_rulebase + +def get_and_link_local_rulebase(rulebase_type_prefix, previous_rulebase, adom_name, local_pkg_name, native_config_adom, sid, fm_api_url, options, limit, link_list): + if not is_rulebase_already_fetched(native_config_adom['rulebases'], rulebase_type_prefix + '_v4_' + local_pkg_name): + fmgr_getter.update_config_with_fortinet_api_call( + native_config_adom['rulebases'], + sid, fm_api_url, + STRING_PM_CONFIG_ADOM + adom_name + STRING_PKG + local_pkg_name + '/firewall/policy', + rulebase_type_prefix + '_v4_' + local_pkg_name, + options=options, limit=limit) + if not is_rulebase_already_fetched(native_config_adom['rulebases'], rulebase_type_prefix + '_v6_' + local_pkg_name): + fmgr_getter.update_config_with_fortinet_api_call( + native_config_adom['rulebases'], + sid, fm_api_url, + STRING_PM_CONFIG_ADOM + adom_name + STRING_PKG + local_pkg_name + '/firewall/policy6', + rulebase_type_prefix + '_v6_' + local_pkg_name, + limit=limit) + previous_rulebase = link_rulebase(link_list, native_config_adom['rulebases'], local_pkg_name, rulebase_type_prefix, previous_rulebase, False) + return previous_rulebase + +def find_packages(adom_device_vdom_policy_package_structure, adom_name, mgm_details_device): for device in adom_device_vdom_policy_package_structure[adom_name]: for vdom in adom_device_vdom_policy_package_structure[adom_name][device]: if mgm_details_device['name'] == device + '_' + vdom: - return adom_device_vdom_policy_package_structure[adom_name][device][vdom] + return adom_device_vdom_policy_package_structure[adom_name][device][vdom]['local'], adom_device_vdom_policy_package_structure[adom_name][device][vdom]['global'] raise FwoDeviceWithoutLocalPackage('Could not find local package for ' + mgm_details_device['name'] + ' in Fortimanager Config') from None -def link_v4_and_v6_rulebase(rulebases, pkg_name, previous_rulebase, is_global): - link_list = [] - has_v4_data = has_rulebase_data(rulebases, 'rules_adom_v4', pkg_name) - has_v6_data = has_rulebase_data(rulebases, 'rules_adom_v6', pkg_name) - - if has_v4_data: - link_list.append(build_link(previous_rulebase, 'rules_adom_v4' + '_' + pkg_name, is_global)) - previous_rulebase = 'rules_adom_v4' + '_' + pkg_name - if has_v6_data: - link_list.append(build_link(previous_rulebase, 'rules_adom_v6' + '_' + pkg_name, is_global)) - previous_rulebase = 'rules_adom_v6' + '_' + pkg_name +def is_rulebase_already_fetched(rulebases, type): + for rulebase in rulebases: + if rulebase['type'] == type: + return True + return False + +def link_rulebase(link_list, rulebases, pkg_name, rulebase_type_prefix, previous_rulebase, is_global): + for version in ['v4', 'v6']: + full_pkg_name = rulebase_type_prefix + '_' + version + '_' + pkg_name + has_data = has_rulebase_data(rulebases, full_pkg_name, is_global, version, pkg_name) + if has_data: + link_list.append(build_link(previous_rulebase, full_pkg_name, is_global)) + previous_rulebase = full_pkg_name - return link_list, previous_rulebase + return previous_rulebase def build_link(previous_rulebase, full_pkg_name, is_global): if previous_rulebase is None: @@ -367,243 +429,173 @@ def build_link(previous_rulebase, full_pkg_name, is_global): 'is_section': False } -def has_rulebase_data(rulebases, type_prefix, pkg_name): +def has_rulebase_data(rulebases, full_pkg_name, is_global, version, pkg_name): + """adds name and uid to rulebase and removes empty global rulebases""" has_data = False + if version == 'v4': + is_v4 = True + else: + is_v4 = False for rulebase in rulebases: - if rulebase['type'] == type_prefix + '_' + pkg_name: - rulebase.update({'name': type_prefix + '_' + pkg_name, - 'uid': type_prefix + '_' + pkg_name}) + if rulebase['type'] == full_pkg_name: + rulebase.update({'name': full_pkg_name, + 'uid': full_pkg_name, + 'is_global': is_global, + 'is_v4': is_v4, + 'package': pkg_name}) if len(rulebase['data']) > 0: has_data = True + elif is_global: + rulebases.remove(rulebase) return has_data -def getNatPolicy(sid, fm_api_url, nativeConfig, adom_name, device, limit): - scope = 'global' - pkg = device['global_rulebase_name'] - if pkg is not None and pkg != '': # only read global rulebase if it exists - for nat_type in ['central/dnat', 'central/dnat6', 'firewall/central-snat-map']: +def get_nat_policy(sid, fm_api_url, native_config, adom_device_vdom_policy_package_structure, adom_name, mgm_details_device, limit): + local_pkg_name, global_pkg_name = find_packages(adom_device_vdom_policy_package_structure, adom_name, mgm_details_device) + if adom_name == '': + for nat_type in nat_types: fmgr_getter.update_config_with_fortinet_api_call( - nativeConfig['rules_global_nat'], sid, fm_api_url, "/pm/config/" + scope + STRING_PKG + pkg + '/' + nat_type, device['local_rulebase_name'], limit=limit) - - scope = 'adom/'+adom_name - pkg = device['local_rulebase_name'] - for nat_type in ['central/dnat', 'central/dnat6', 'firewall/central-snat-map']: - fmgr_getter.update_config_with_fortinet_api_call( - nativeConfig['rules_adom_nat'], sid, fm_api_url, "/pm/config/" + scope + STRING_PKG + pkg + '/' + nat_type, device['local_rulebase_name'], limit=limit) - - -# delete_v: ab hier kann sehr viel weg, ich lasses vorerst zB für die hitcounter -def normalize_rule(rule_orig, rules, native_config, rule_table, localPkgName, rule_number, src_ref_all, dst_ref_all, normalized_config_dict): - rule: dict = {'rule_src': '', 'rule_dst': '', 'rule_svc': ''} - xlate_rule = None - # rule.update({ 'control_id': import_id}) - rule.update({ 'rulebase_name': localPkgName}) # the rulebase_name will be set to the pkg_name as there is no rulebase_name in FortiMangaer - rule.update({ 'rule_ruleid': rule_orig['policyid']}) - rule.update({ 'rule_uid': rule_orig['uuid']}) - rule.update({ 'rule_num': rule_number}) - rule.update({ 'rule_name': rule_orig.get('name', None)}) - - # parse_target_gateways(rule_orig, rule, localPkgName) - - rule.update({ 'rule_implied': False }) - rule.update({ 'rule_time': None }) - rule.update({ 'rule_type': 'access' }) - rule.update({ 'parent_rule_id': None }) - - rule.update({ 'rule_comment': rule_orig.get('comments', None)}) - if rule_orig['action']==0: - rule.update({ 'rule_action': 'Drop' }) + native_config['nat_rulebases'], sid, fm_api_url, + STRING_PM_CONFIG_GLOBAL_PKG + global_pkg_name + '/' + nat_type, + nat_type + '_global_' + global_pkg_name, limit=limit) else: - rule.update({ 'rule_action': 'Accept' }) - if 'status' in rule_orig and (rule_orig['status']=='enable' or rule_orig['status']==1): - rule.update({ 'rule_disabled': False }) - else: - rule.update({ 'rule_disabled': True }) - if rule_orig['logtraffic'] == 'disable': - rule.update({ 'rule_track': 'None'}) - else: - rule.update({ 'rule_track': 'Log'}) - - rule['rule_src'] = extend_string_list(rule['rule_src'], rule_orig, 'srcaddr', list_delimiter) - rule['rule_dst'] = extend_string_list(rule['rule_dst'], rule_orig, 'dstaddr', list_delimiter) - rule['rule_svc'] = extend_string_list(rule['rule_svc'], rule_orig, 'service', list_delimiter) - rule['rule_src'] = extend_string_list(rule['rule_src'], rule_orig, 'srcaddr6', list_delimiter) - rule['rule_dst'] = extend_string_list(rule['rule_dst'], rule_orig, 'dstaddr6', list_delimiter) - rule['rule_src'] = extend_string_list(rule['rule_src'], rule_orig, 'internet-service-src-name', list_delimiter) - - if len(rule_orig['srcintf'])>0: - src_obj_zone = add_zone_if_missing (normalized_config_dict, rule_orig['srcintf'][0]) - rule.update({ 'rule_from_zone': src_obj_zone }) # todo: currently only using the first zone - if len(rule_orig['dstintf'])>0: - dst_obj_zone = add_zone_if_missing (normalized_config_dict, rule_orig['dstintf'][0]) - rule.update({ 'rule_to_zone': dst_obj_zone }) # todo: currently only using the first zone - - if 'srcaddr-negate' in rule_orig: - rule.update({ 'rule_src_neg': rule_orig['srcaddr-negate']=='disable'}) - elif 'internet-service-src-negate' in rule_orig: - rule.update({ 'rule_src_neg': rule_orig['internet-service-src-negate']=='disable'}) - if 'dstaddr-negate' in rule_orig: - rule.update({ 'rule_dst_neg': rule_orig['dstaddr-negate']=='disable'}) - if 'service-negate' in rule_orig: - rule.update({ 'rule_svc_neg': rule_orig['service-negate']=='disable'}) - - rule.update({ 'rule_src_refs': resolve_raw_objects(rule['rule_src'], list_delimiter, native_config, 'name', 'uuid', \ - rule_type=rule_table, jwt=None, import_id=None, rule_uid=rule_orig['uuid'], object_type=NETWORK_OBJECT, mgm_id=mgm_details['id']) }) - rule.update({ 'rule_dst_refs': resolve_raw_objects(rule['rule_dst'], list_delimiter, native_config, 'name', 'uuid', \ - rule_type=rule_table, jwt=None, import_id=None, rule_uid=rule_orig['uuid'], object_type=NETWORK_OBJECT, mgm_id=mgm_details['id']) }) - rule.update({ 'rule_svc_refs': rule['rule_svc'] }) # services do not have uids, so using name instead - add_users_to_rule(rule_orig, rule) - - # new in v8.0.3: - rule.update({ 'rule_custom_fields': rule_orig.get('meta fields', None) }) - rule.update({ 'last_change_admin': rule_orig.get('_last-modified-by',None) }) - - update_hit_counters(native_config, rule_table, rule_orig, rule, localPkgName, rule_access_scope_v4, rule_access_scope_v6) - - xlate_rule = handle_combined_nat_rule(rule, rule_orig, normalized_config_dict, nat_rule_number, import_id, localPkgName, dev_id) - rules.append(rule) - if xlate_rule is not None: - rules.append(xlate_rule) - rule_number += 1 # nat rules have their own numbering - - -def update_hit_counters(native_config, rule_table, rule_orig, rule, localPkgName, rule_access_scope_v4, rule_access_scope_v6): - if rule_table in rule_access_scope_v4 and len(native_config['rules_hitcount'][localPkgName])>0: - for hitcount_config in native_config['rules_hitcount'][localPkgName][0]['firewall policy']: - if rule_orig['policyid'] == hitcount_config['last_hit']: - rule.update({ 'last_hit': time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(hitcount_config['policyid']))}) - elif rule_table in rule_access_scope_v6 and len(native_config['rules_hitcount'][localPkgName])>0: - for hitcount_config in native_config['rules_hitcount'][localPkgName][0]['firewall policy6']: - if rule_orig['policyid'] == hitcount_config['last_hit']: - rule.update({ 'last_hit': time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(hitcount_config['policyid']))}) - else: - rule.update({ 'last_hit': None}) + for nat_type in nat_types: + fmgr_getter.update_config_with_fortinet_api_call( + native_config['nat_rulebases'], sid, fm_api_url, + STRING_PM_CONFIG_ADOM + adom_name + STRING_PKG + local_pkg_name + '/' + nat_type, + nat_type + '_adom_' + adom_name + '_' + local_pkg_name, limit=limit) +# delete_v: ab hier kann sehr viel weg, ich lasses vorerst zB für die nat # pure nat rules -def normalize_nat_rules(full_config, config2import, import_id, jwt=None): + +def parse_nat_rulebase(nat_rulebase, nat_type_string, normalized_config_adom, normalized_config_global): + # this function is not called until it is ready + return nat_rules = [] rule_number = 0 + for rule_orig in nat_rulebase: + + rule_src_list, rule_src_refs_list = rule_parse_addresses(rule_orig, 'src', normalized_config_adom, normalized_config_global, True) + rule_dst_list, rule_dst_refs_list = rule_parse_addresses(rule_orig, 'dst', normalized_config_adom, normalized_config_global, True) + + rule_normalized = RuleNormalized( + rule_num=rule_number, + rule_num_numeric=0, + rule_disabled=False, + rule_src_neg=False, + rule_src=list_delimiter.join(rule_src_list), + rule_src_refs=list_delimiter.join(rule_src_refs_list), + rule_dst_neg=False, + rule_dst=list_delimiter.join(rule_dst_list), + rule_dst_refs=list_delimiter.join(rule_dst_refs_list), + rule_svc_neg=False, + rule_svc=list_delimiter.join(rule_svc_list), + rule_svc_refs=list_delimiter.join(rule_svc_refs_list), + rule_action=RuleAction.DROP, + rule_track=RuleTrack.NONE, + rule_installon=nat_type_string, + rule_time='', # Time-based rules not commonly used in basic Fortinet configs + rule_name=rule_orig.get('name', ''), + rule_uid=rule_orig.get('uuid'), + rule_custom_fields=str({}), + rule_implied=False, + rule_type=RuleType.NAT, + last_change_admin=native_rule.get('_last-modified-by', ''), + parent_rule_uid=None, + last_hit=last_hit, + rule_comment=native_rule.get('comments'), + rule_src_zone=list_delimiter.join(rule_src_zones), + rule_dst_zone=list_delimiter.join(rule_dst_zones), + rule_head_text=None + ) + - for rule_table in rule_nat_scope: - for localPkgName in full_config['rules_global_nat']: - for rule_orig in full_config[rule_table][localPkgName]: - rule = {'rule_src': '', 'rule_dst': '', 'rule_svc': ''} - if rule_orig['nat'] == 1: # assuming source nat - rule.update({ 'control_id': import_id}) - rule.update({ 'rulebase_name': localPkgName}) # the rulebase_name just has to be a unique string among devices - rule.update({ 'rule_ruleid': rule_orig['policyid']}) - rule.update({ 'rule_uid': rule_orig['uuid']}) - # rule.update({ 'rule_num': rule_orig['obj seq']}) - rule.update({ 'rule_num': rule_number }) - if 'comments' in rule_orig: - rule.update({ 'rule_comment': rule_orig['comments']}) - rule.update({ 'rule_action': 'Drop' }) # not used for nat rules - rule.update({ 'rule_track': 'None'}) # not used for nat rules - - rule['rule_src'] = extend_string_list(rule['rule_src'], rule_orig, 'orig-addr', list_delimiter, jwt=jwt, import_id=import_id) - rule['rule_dst'] = extend_string_list(rule['rule_dst'], rule_orig, 'dst-addr', list_delimiter, jwt=jwt, import_id=import_id) - - if rule_orig['protocol']==17: - svc_name = 'udp_' + str(rule_orig['orig-port']) - elif rule_orig['protocol']==6: - svc_name = 'tcp_' + str(rule_orig['orig-port']) - else: - svc_name = 'svc_' + str(rule_orig['orig-port']) - # need to create a helper service object and add it to the nat rule, also needs to be added to service list - - if not 'service_objects' in config2import: # is normally defined - config2import['service_objects'] = [] - config2import['service_objects'].append(create_svc_object( \ - import_id=import_id, name=svc_name, proto=rule_orig['protocol'], port=rule_orig['orig-port'], comment='service created by FWO importer for NAT purposes')) - rule['rule_svc'] = svc_name - - #rule['rule_src'] = extend_string_list(rule['rule_src'], rule_orig, 'srcaddr6', list_delimiter, jwt=jwt, import_id=import_id) - #rule['rule_dst'] = extend_string_list(rule['rule_dst'], rule_orig, 'dstaddr6', list_delimiter, jwt=jwt, import_id=import_id) - - if len(rule_orig['srcintf'])>0: - rule.update({ 'rule_from_zone': rule_orig['srcintf'][0] }) # todo: currently only using the first zone - if len(rule_orig['dstintf'])>0: - rule.update({ 'rule_to_zone': rule_orig['dstintf'][0] }) # todo: currently only using the first zone - - rule.update({ 'rule_src_neg': False}) - rule.update({ 'rule_dst_neg': False}) - rule.update({ 'rule_svc_neg': False}) - rule.update({ 'rule_src_refs': resolve_raw_objects(rule['rule_src'], list_delimiter, full_config, 'name', 'uuid', rule_type=rule_table) }, \ - jwt=jwt, import_id=import_id, rule_uid=rule_orig['uuid'], object_type=NETWORK_OBJECT) - rule.update({ 'rule_dst_refs': resolve_raw_objects(rule['rule_dst'], list_delimiter, full_config, 'name', 'uuid', rule_type=rule_table) }, \ - jwt=jwt, import_id=import_id, rule_uid=rule_orig['uuid'], object_type=NETWORK_OBJECT) - # services do not have uids, so using name instead - rule.update({ 'rule_svc_refs': rule['rule_svc'] }) - rule.update({ 'rule_type': 'original' }) - rule.update({ 'rule_installon': localPkgName }) - if 'status' in rule_orig and (rule_orig['status']=='enable' or rule_orig['status']==1): - rule.update({ 'rule_disabled': False }) - else: - rule.update({ 'rule_disabled': True }) - rule.update({ 'rule_implied': False }) - rule.update({ 'rule_time': None }) - rule.update({ 'parent_rule_id': None }) - - nat_rules.append(rule) - add_users_to_rule(rule_orig, rule) - - ############## now adding the xlate rule part ########################## - xlate_rule = dict(rule) # copy the original (match) rule - xlate_rule.update({'rule_src': '', 'rule_dst': '', 'rule_svc': ''}) - xlate_rule['rule_src'] = extend_string_list(xlate_rule['rule_src'], rule_orig, 'orig-addr', list_delimiter, jwt=jwt, import_id=import_id) - xlate_rule['rule_dst'] = 'Original' - - if rule_orig['protocol']==17: - svc_name = 'udp_' + str(rule_orig['nat-port']) - elif rule_orig['protocol']==6: - svc_name = 'tcp_' + str(rule_orig['nat-port']) - else: - svc_name = 'svc_' + str(rule_orig['nat-port']) - # need to create a helper service object and add it to the nat rule, also needs to be added to service list! - # fmgr_service.create_svc_object(name=svc_name, proto=rule_orig['protocol'], port=rule_orig['orig-port'], comment='service created by FWO importer for NAT purposes') - config2import['service_objects'].append(create_svc_object(import_id=import_id, name=svc_name, proto=rule_orig['protocol'], port=rule_orig['nat-port'], comment='service created by FWO importer for NAT purposes')) - xlate_rule['rule_svc'] = svc_name - - xlate_rule.update({ 'rule_src_refs': resolve_objects(xlate_rule['rule_src'], list_delimiter, full_config, 'name', 'uuid', rule_type=rule_table, jwt=jwt, import_id=import_id ) }) - xlate_rule.update({ 'rule_dst_refs': resolve_objects(xlate_rule['rule_dst'], list_delimiter, full_config, 'name', 'uuid', rule_type=rule_table, jwt=jwt, import_id=import_id ) }) - xlate_rule.update({ 'rule_svc_refs': xlate_rule['rule_svc'] }) # services do not have uids, so using name instead - - xlate_rule.update({ 'rule_type': 'xlate' }) - - nat_rules.append(xlate_rule) - rule_number += 1 - config2import['rules'].extend(nat_rules) - - -def insert_header(rules, import_id, header_text, rulebase_name, rule_uid, rule_number, src_refs, dst_refs): - rule = { - "control_id": import_id, - "rule_head_text": header_text, - "rulebase_name": rulebase_name, - "rule_ruleid": None, - "rule_uid": rule_uid + rulebase_name, - "rule_num": rule_number, - "rule_disabled": False, - "rule_src": "all", - "rule_dst": "all", - "rule_svc": "ALL", - "rule_src_neg": False, - "rule_dst_neg": False, - "rule_svc_neg": False, - "rule_src_refs": src_refs, - "rule_dst_refs": dst_refs, - "rule_svc_refs": "ALL", - "rule_action": "Accept", - "rule_track": "None", - "rule_installon": None, - "rule_time": None, - "rule_type": "access", - "parent_rule_id": None, - "rule_implied": False, - "rule_comment": None - } - rules.append(rule) + # for rule_table in rule_nat_scope: + # for localPkgName in native_config['rules_global_nat']: + for rule_orig in nat_rulebase: + rule = {'rule_src': '', 'rule_dst': '', 'rule_svc': ''} + if rule_orig['nat'] == 1: # assuming source nat + rule.update({ 'rule_ruleid': rule_orig['policyid']}) + rule.update({ 'rule_uid': rule_orig['uuid']}) + # rule.update({ 'rule_num': rule_orig['obj seq']}) + rule.update({ 'rule_num': rule_number }) + if 'comments' in rule_orig: + rule.update({ 'rule_comment': rule_orig['comments']}) + rule.update({ 'rule_action': 'Drop' }) # not used for nat rules + rule.update({ 'rule_track': 'None'}) # not used for nat rules + + rule['rule_src'] = extend_string_list(rule['rule_src'], rule_orig, 'orig-addr', list_delimiter, jwt=jwt, import_id=import_id) + rule['rule_dst'] = extend_string_list(rule['rule_dst'], rule_orig, 'dst-addr', list_delimiter, jwt=jwt, import_id=import_id) + + if rule_orig['protocol']==17: + svc_name = 'udp_' + str(rule_orig['orig-port']) + elif rule_orig['protocol']==6: + svc_name = 'tcp_' + str(rule_orig['orig-port']) + else: + svc_name = 'svc_' + str(rule_orig['orig-port']) + # need to create a helper service object and add it to the nat rule, also needs to be added to service list + + if not 'service_objects' in normalized_config_adom: # is normally defined + normalized_config_adom['service_objects'] = [] + normalized_config_adom['service_objects'].append(create_svc_object( \ + import_id=import_id, name=svc_name, proto=rule_orig['protocol'], port=rule_orig['orig-port'], comment='service created by FWO importer for NAT purposes')) + rule['rule_svc'] = svc_name + + #rule['rule_src'] = extend_string_list(rule['rule_src'], rule_orig, 'srcaddr6', list_delimiter, jwt=jwt, import_id=import_id) + #rule['rule_dst'] = extend_string_list(rule['rule_dst'], rule_orig, 'dstaddr6', list_delimiter, jwt=jwt, import_id=import_id) + + if len(rule_orig['srcintf'])>0: + rule.update({ 'rule_from_zone': rule_orig['srcintf'][0] }) # todo: currently only using the first zone + if len(rule_orig['dstintf'])>0: + rule.update({ 'rule_to_zone': rule_orig['dstintf'][0] }) # todo: currently only using the first zone + + rule.update({ 'rule_src_neg': False}) + rule.update({ 'rule_dst_neg': False}) + rule.update({ 'rule_svc_neg': False}) + rule.update({ 'rule_src_refs': resolve_raw_objects(rule['rule_src'], list_delimiter, native_config, 'name', 'uuid', rule_type=rule_table) }, \ + jwt=jwt, import_id=import_id, rule_uid=rule_orig['uuid'], object_type=NETWORK_OBJECT) + rule.update({ 'rule_dst_refs': resolve_raw_objects(rule['rule_dst'], list_delimiter, native_config, 'name', 'uuid', rule_type=rule_table) }, \ + jwt=jwt, import_id=import_id, rule_uid=rule_orig['uuid'], object_type=NETWORK_OBJECT) + # services do not have uids, so using name instead + rule.update({ 'rule_svc_refs': rule['rule_svc'] }) + rule.update({ 'rule_type': 'original' }) + rule.update({ 'rule_installon': localPkgName }) + if 'status' in rule_orig and (rule_orig['status']=='enable' or rule_orig['status']==1): + rule.update({ 'rule_disabled': False }) + else: + rule.update({ 'rule_disabled': True }) + rule.update({ 'rule_implied': False }) + rule.update({ 'rule_time': None }) + rule.update({ 'parent_rule_id': None }) + + nat_rules.append(rule) + add_users_to_rule(rule_orig, rule) + + ############## now adding the xlate rule part ########################## + xlate_rule = dict(rule) # copy the original (match) rule + xlate_rule.update({'rule_src': '', 'rule_dst': '', 'rule_svc': ''}) + xlate_rule['rule_src'] = extend_string_list(xlate_rule['rule_src'], rule_orig, 'orig-addr', list_delimiter, jwt=jwt, import_id=import_id) + xlate_rule['rule_dst'] = 'Original' + + if rule_orig['protocol']==17: + svc_name = 'udp_' + str(rule_orig['nat-port']) + elif rule_orig['protocol']==6: + svc_name = 'tcp_' + str(rule_orig['nat-port']) + else: + svc_name = 'svc_' + str(rule_orig['nat-port']) + # need to create a helper service object and add it to the nat rule, also needs to be added to service list! + # fmgr_service.create_svc_object(name=svc_name, proto=rule_orig['protocol'], port=rule_orig['orig-port'], comment='service created by FWO importer for NAT purposes') + normalized_config_adom['service_objects'].append(create_svc_object(import_id=import_id, name=svc_name, proto=rule_orig['protocol'], port=rule_orig['nat-port'], comment='service created by FWO importer for NAT purposes')) + xlate_rule['rule_svc'] = svc_name + xlate_rule.update({ 'rule_src_refs': resolve_objects(xlate_rule['rule_src'], list_delimiter, native_config, 'name', 'uuid', rule_type=rule_table, jwt=jwt, import_id=import_id ) }) + xlate_rule.update({ 'rule_dst_refs': resolve_objects(xlate_rule['rule_dst'], list_delimiter, native_config, 'name', 'uuid', rule_type=rule_table, jwt=jwt, import_id=import_id ) }) + xlate_rule.update({ 'rule_svc_refs': xlate_rule['rule_svc'] }) # services do not have uids, so using name instead + + xlate_rule.update({ 'rule_type': 'xlate' }) + + nat_rules.append(xlate_rule) + rule_number += 1 + normalized_config_adom['rules'].extend(nat_rules) def create_xlate_rule(rule): xlate_rule = copy.deepcopy(rule) @@ -649,7 +641,7 @@ def handle_combined_nat_rule(rule, rule_orig, config2import, nat_rule_number, im interface_name = matching_route.interface hideInterface=interface_name if hideInterface is None: - logger.warning('src nat behind interface: found route with undefined interface ' + str(jsonpickle.dumps(matching_route, unpicklable=True))) + logger.warning('src nat behind interface: found route with undefined interface ') #+ str(jsonpickle.dumps(matching_route, unpicklable=True))) if destination_interface_ip is None: logger.warning('src nat behind interface: found no matching interface IP in rule with UID ' + rule['rule_uid'] + ', dest_ip: ' + destination_ip) @@ -718,37 +710,6 @@ def handle_combined_nat_rule(rule, rule_orig, config2import, nat_rule_number, im return xlate_rule - -def insert_headers(rule_table, first_v6, first_v4, full_config, rules, import_id, localPkgName,src_ref_all,dst_ref_all,rule_number): - if rule_table in rule_access_scope_v6 and first_v6: - insert_header(rules, import_id, "IPv6 rules", localPkgName, "IPv6HeaderText", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - first_v6 = False - elif rule_table in rule_access_scope_v4 and first_v4: - insert_header(rules, import_id, "IPv4 rules", localPkgName, "IPv4HeaderText", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - first_v4 = False - if rule_table == 'rules_adom_v4' and len(full_config['rules_adom_v4'][localPkgName])>0: - insert_header(rules, import_id, "Adom Rules IPv4", localPkgName, "IPv4AdomRules", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - elif rule_table == 'rules_adom_v6' and len(full_config['rules_adom_v6'][localPkgName])>0: - insert_header(rules, import_id, "Adom Rules IPv6", localPkgName, "IPv6AdomRules", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - elif rule_table == 'rules_global_header_v4' and len(full_config['rules_global_header_v4'][localPkgName])>0: - insert_header(rules, import_id, "Global Header Rules IPv4", localPkgName, "IPv4GlobalHeaderRules", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - elif rule_table == 'rules_global_header_v6' and len(full_config['rules_global_header_v6'][localPkgName])>0: - insert_header(rules, import_id, "Global Header Rules IPv6", localPkgName, "IPv6GlobalHeaderRules", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - elif rule_table == 'rules_global_footer_v4' and len(full_config['rules_global_footer_v4'][localPkgName])>0: - insert_header(rules, import_id, "Global Footer Rules IPv4", localPkgName, "IPv4GlobalFooterRules", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - elif rule_table == 'rules_global_footer_v6' and len(full_config['rules_global_footer_v6'][localPkgName])>0: - insert_header(rules, import_id, "Global Footer Rules IPv6", localPkgName, "IPv6GlobalFooterRules", rule_number, src_ref_all, dst_ref_all) - rule_number += 1 - return rule_number, first_v4, first_v6 - - def extract_nat_objects(nwobj_list, all_nwobjects): nat_obj_list = [] for obj in nwobj_list: diff --git a/roles/importer/files/importer/fortiadom5ff/fmgr_service.py b/roles/importer/files/importer/fortiadom5ff/fmgr_service.py index 8d806765c..7a6fcf567 100644 --- a/roles/importer/files/importer/fortiadom5ff/fmgr_service.py +++ b/roles/importer/files/importer/fortiadom5ff/fmgr_service.py @@ -4,10 +4,8 @@ from fwo_log import getFwoLogger from typing import Any -def normalize_service_objects(import_state: ImportStateController, native_config, native_config_global, normalized_config, - normalized_config_global, svc_obj_types): +def normalize_service_objects(native_config, normalized_config_adom, svc_obj_types): svc_objects = [] - logger = getFwoLogger() if 'objects' not in native_config: return # no objects to normalize @@ -23,7 +21,7 @@ def normalize_service_objects(import_state: ImportStateController, native_config svc_objects.append(create_svc_object(name=original_obj_name, proto=0, color='foreground', port=None,\ comment='"original" service object created by FWO importer for NAT purposes')) - normalized_config.update({'service_objects': svc_objects}) + normalized_config_adom.update({'service_objects': svc_objects}) def normalize_service_object(obj_orig, svc_objects): member_names = '' @@ -119,7 +117,7 @@ def check_split(obj_orig) -> bool: return (count > 1) -def extractPorts(port_ranges) -> tuple[list[Any], list[Any]]: +def extract_ports(port_ranges) -> 'tuple[list[Any], list[Any]]': ports = [] port_ends = [] if port_ranges is not None and len(port_ranges) > 0: @@ -150,7 +148,7 @@ def extractPorts(port_ranges) -> tuple[list[Any], list[Any]]: return ports, port_ends -def create_svc_object(name, proto, color, port, comment) -> dict[str, Any]: +def create_svc_object(name, proto, color, port, comment) -> 'dict[str, Any]': return { 'svc_name': name, 'svc_typ': 'simple', @@ -179,7 +177,7 @@ def add_object(svc_objects, type, name, color, proto, port_ranges, member_names, }]) else: range_names = '' - ports, port_ends = extractPorts(port_ranges) + ports, port_ends = extract_ports(port_ranges) split = (len(ports) > 1) for index, port in enumerate(ports): port_end = port_ends[index] diff --git a/roles/importer/files/importer/fortiadom5ff/fmgr_zone.py b/roles/importer/files/importer/fortiadom5ff/fmgr_zone.py index 1335efcc2..08fed159d 100644 --- a/roles/importer/files/importer/fortiadom5ff/fmgr_zone.py +++ b/roles/importer/files/importer/fortiadom5ff/fmgr_zone.py @@ -1,28 +1,58 @@ +import fmgr_getter +from fwo_exceptions import FwoNormalizedConfigParseError -def normalize_zones(full_config, config2import, import_id): +def get_zones(sid, fm_api_url, native_config, adom_name, limit): + + if adom_name == '': + fmgr_getter.update_config_with_fortinet_api_call( + native_config['zones'], sid, fm_api_url, '/pm/config/global/obj/dynamic/interface', 'interface_global', limit=limit) + else: + fmgr_getter.update_config_with_fortinet_api_call( + native_config['zones'], sid, fm_api_url, '/pm/config/adom/' + adom_name + '/obj/dynamic/interface', 'interface_' + adom_name, limit=limit) + +def normalize_zones(native_config, normalized_config_adom, is_global_loop_iteration): zones = [] - for orig_zone in full_config['zone_objects']['zone_list']: - zone = {} - zone.update({'zone_name': orig_zone}) - zones.append(zone) - - config2import.update({'zone_objects': zones}) + fetched_zones = [] + if is_global_loop_iteration: # can not find the following zones in api return + fetched_zones.append('any') + fetched_zones.append('sslvpn_tun_intf') + for zone_type in native_config['zones']: + for mapping in zone_type.get('data', []): + if 'defmap-intf' in mapping and not mapping['defmap-intf'] in fetched_zones: + fetched_zones.append(mapping['defmap-intf']) + if not mapping['dynamic_mapping'] is None: + fetch_dynamic_mapping(mapping, fetched_zones) + if not mapping['platform_mapping'] is None: + fetch_platform_mapping(mapping, fetched_zones) + + for zone in fetched_zones: + zones.append({'zone_name': zone}) + normalized_config_adom.update({'zone_objects': zones}) +def fetch_dynamic_mapping(mapping, fetched_zones): + for dyn_mapping in mapping['dynamic_mapping']: + if 'name' in dyn_mapping and not dyn_mapping['name'] in fetched_zones: + fetched_zones.append(dyn_mapping['name']) + if 'local-intf' in dyn_mapping: + for local_interface in dyn_mapping['local-intf']: + if local_interface not in fetched_zones: + fetched_zones.append(local_interface) -def add_zone_if_missing(normalized_config_dict: dict, zone_string): - # adding zone if it not yet exists +def fetch_platform_mapping(mapping, fetched_zones): + for dyn_mapping in mapping['platform_mapping']: + if 'intf-zone' in dyn_mapping and not dyn_mapping['intf-zone'] in fetched_zones: + fetched_zones.append(dyn_mapping['intf-zone']) - # also transforming any into global (normalized global zone) - if zone_string == 'any': - zone_string = 'global' - if zone_string is not None: - if 'zone_objects' not in normalized_config_dict: # no zones yet? add empty zone_objects array - normalized_config_dict.update({'zone_objects': []}) - zone_exists = False - for zone in normalized_config_dict['zone_objects']: - if zone_string == zone['zone_name']: - zone_exists = True - if not zone_exists: - normalized_config_dict['zone_objects'].append({'zone_name': zone_string}) - return zone_string - \ No newline at end of file +def find_zones_in_normalized_config(native_zone_list : list, normalized_config_adom, normalized_config_global): + """Verifies that input zones exist in normalized config""" + zone_out_list = [] + for nativ_zone in native_zone_list: + was_zone_found = False + for normalized_zone in normalized_config_adom['zone_objects'] + normalized_config_global['zone_objects']: + if nativ_zone == normalized_zone['zone_name']: + zone_out_list.append(normalized_zone['zone_name']) + was_zone_found = True + break + if not was_zone_found: + raise FwoNormalizedConfigParseError('Could not find zone ' + nativ_zone + ' in normalized config.') + return zone_out_list diff --git a/roles/importer/files/importer/fortiadom5ff/fwcommon.py b/roles/importer/files/importer/fortiadom5ff/fwcommon.py index 22e664d04..e642fd62b 100644 --- a/roles/importer/files/importer/fortiadom5ff/fwcommon.py +++ b/roles/importer/files/importer/fortiadom5ff/fwcommon.py @@ -1,8 +1,5 @@ -from curses import raw from typing import Any - -import json import fwo_const from copy import deepcopy from model_controllers.import_state_controller import ImportStateController @@ -11,16 +8,16 @@ import fmgr_getter from fwo_log import getFwoLogger from fmgr_gw_networking import getInterfacesAndRouting, normalize_network_data -from model_controllers.route_controller import get_ip_of_interface_obj from model_controllers.fwconfigmanagerlist_controller import FwConfigManagerListController from model_controllers.fwconfig_normalized_controller import FwConfigNormalizedController from models.fwconfigmanager import FwConfigManager from model_controllers.management_controller import ManagementController from fmgr_network import normalize_network_objects from fmgr_service import normalize_service_objects -from fmgr_rule import normalize_rulebases, get_access_policy +from fmgr_rule import normalize_rulebases, get_access_policy, get_nat_policy from fmgr_consts import nw_obj_types, svc_obj_types, user_obj_types from fwo_base import ConfigAction +from fmgr_zone import get_zones, normalize_zones from models.fwconfig_normalized import FwConfigNormalized @@ -48,11 +45,10 @@ def get_config(config_in: FwConfigManagerListController, importState: ImportStat # delete_v: das geht schief für unschöne adoms arbitrary_vdom_for_updateable_objects = get_arbitrary_vdom(adom_device_vdom_structure) adom_device_vdom_policy_package_structure = add_policy_package_to_vdoms(adom_device_vdom_structure, sid, fm_api_url) - # adom_device_vdom_policy_package_structure = {adom: {device: {vdom1: pol_pkg1}, {vdom2: pol_pkg2}}} - global_packages = get_global_packages(sid, fm_api_url, adom_device_vdom_policy_package_structure) # get global get_objects(sid, fm_api_url, native_config_global, native_config_global, '', limit, nw_obj_types, svc_obj_types, 'global', arbitrary_vdom_for_updateable_objects) + get_zones(sid, fm_api_url, native_config_global, '', limit) for adom in adom_list: adom_name = adom.DomainName @@ -62,7 +58,7 @@ def get_config(config_in: FwConfigManagerListController, importState: ImportStat adom_scope = 'adom/'+adom_name get_objects(sid, fm_api_url, native_config_adom, native_config_global, adom_name, limit, nw_obj_types, svc_obj_types, adom_scope, arbitrary_vdom_for_updateable_objects) # currently reading zone from objects/rules for backward compat with FortiManager 6.x - # getZones(sid, fm_api_url, full_config, adom_name, limit, debug_level) + get_zones(sid, fm_api_url, native_config_adom, adom_name, limit) # todo: bring interfaces and routing in new domain native config format #getInterfacesAndRouting( @@ -72,10 +68,9 @@ def get_config(config_in: FwConfigManagerListController, importState: ImportStat device_config = initialize_device_config(mgm_details_device) native_config_adom['gateways'].append(device_config) get_access_policy( - sid, fm_api_url, native_config_adom, adom_device_vdom_policy_package_structure, adom_name, mgm_details_device, device_config, limit) - # delete_v: nat später - #fmgr_rule.getNatPolicy( - # sid, fm_api_url, nativeConfig, adom_name, mgm_details_device, limit) + sid, fm_api_url, native_config_adom, native_config_global, adom_device_vdom_policy_package_structure, adom_name, mgm_details_device, device_config, limit) + get_nat_policy( + sid, fm_api_url, native_config_adom, adom_device_vdom_policy_package_structure, adom_name, mgm_details_device, limit) try: # logout of fortimanager API fmgr_getter.logout( @@ -100,7 +95,7 @@ def initialize_native_config_domain(mgm_details : ManagementController): 'objects': [], 'rulebases': [], 'nat_rulebases': [], - 'rules_hitcount': [], + 'zones': [], 'gateways': []} def get_arbitrary_vdom(adom_device_vdom_structure): @@ -110,37 +105,37 @@ def get_arbitrary_vdom(adom_device_vdom_structure): return {'adom': adom, 'device': device, 'vdom': vdom} -def normalize_config(import_state, native_config: dict[str,Any]) -> FwConfigManagerListController: +def normalize_config(import_state, native_config: 'dict[str,Any]') -> FwConfigManagerListController: manager_list = FwConfigManagerListController() - native_config_global = {} # TODO: implement reading for FortiManager 5ff - normalized_config_global = {} # TODO: implement reading for FortiManager 5ff - if 'domains' not in native_config: - logger = getFwoLogger() - logger.error("No domains found in native config. Cannot normalize config.") raise ImportInterruption("No domains found in native config. Cannot normalize config.") rewrite_native_config_obj_type_as_key(native_config) # for easier accessability of objects in normalization process + native_config_global = {} + normalized_config_global = {} + for native_conf in native_config['domains']: - normalized_config_dict = deepcopy(fwo_const.emptyNormalizedFwConfigJsonDict) + normalized_config_adom = deepcopy(fwo_const.emptyNormalizedFwConfigJsonDict) + is_global_loop_iteration = False - # delete_v: is_global_loop_iteration scheint immer False zu sein, kann dann weg - normalize_single_manager_config(native_conf, native_config_global, normalized_config_dict, normalized_config_global, - import_state, is_global_loop_iteration=False) - if native_conf['is-super-manager']: - normalized_config_global = deepcopy(normalized_config_dict) + native_config_global = native_conf + normalized_config_global = normalized_config_adom + is_global_loop_iteration = True + + normalize_single_manager_config(native_conf, native_config_global, normalized_config_adom, normalized_config_global, + import_state, is_global_loop_iteration) normalized_config = FwConfigNormalized( action=ConfigAction.INSERT, - network_objects=FwConfigNormalizedController.convertListToDict(normalized_config_dict.get('network_objects', []), 'obj_uid'), - service_objects=FwConfigNormalizedController.convertListToDict(normalized_config_dict.get('service_objects', []), 'svc_uid'), - zone_objects=FwConfigNormalizedController.convertListToDict(normalized_config_dict.get('zone_objects', []), 'zone_name'), - rulebases=normalized_config_dict.get('policies', []), - gateways=normalized_config_dict.get('gateways', []) + network_objects=FwConfigNormalizedController.convertListToDict(normalized_config_adom.get('network_objects', []), 'obj_uid'), + service_objects=FwConfigNormalizedController.convertListToDict(normalized_config_adom.get('service_objects', []), 'svc_uid'), + zone_objects=FwConfigNormalizedController.convertListToDict(normalized_config_adom.get('zone_objects', []), 'zone_name'), + rulebases=normalized_config_adom.get('policies', []), + gateways=normalized_config_adom.get('gateways', []) ) # TODO: identify the correct manager @@ -174,7 +169,7 @@ def rewrite_native_config_obj_type_as_key(native_config): domain['objects'] = obj_dict -def normalize_single_manager_config(native_config: dict[str, Any], native_config_global: dict[str, Any], normalized_config_dict: dict, +def normalize_single_manager_config(native_config: 'dict[str, Any]', native_config_global: 'dict[str, Any]', normalized_config_adom: dict, normalized_config_global: dict, import_state: ImportStateController, is_global_loop_iteration: bool): @@ -189,20 +184,19 @@ def normalize_single_manager_config(native_config: dict[str, Any], native_config current_svc_obj_types = [f"svc_obj_adom/{native_config.get('domain_name','')}_{t}" for t in current_svc_obj_types] logger = getFwoLogger() - normalize_network_objects(import_state, native_config, native_config_global, normalized_config_dict, normalized_config_global, + normalize_zones(native_config, normalized_config_adom, is_global_loop_iteration) + logger.info("completed normalizing zones for manager: " + native_config.get('domain_name','')) + normalize_network_objects(native_config, normalized_config_adom, normalized_config_global, current_nw_obj_types) logger.info("completed normalizing network objects for manager: " + native_config.get('domain_name','')) - normalize_service_objects(import_state, native_config, native_config_global, normalized_config_dict, normalized_config_global, - current_svc_obj_types) + normalize_service_objects(native_config, normalized_config_adom, current_svc_obj_types) logger.info("completed normalizing service objects for manager: " + native_config.get('domain_name','')) - #fmgr_gateway.normalizeGateways(native_conf, import_state, normalized_config_dict) - mgm_uid = native_config["management_uid"] - normalize_rulebases(import_state, mgm_uid, native_config, native_config_global, normalized_config_dict, normalized_config_global, + normalize_rulebases(mgm_uid, native_config, native_config_global, normalized_config_adom, normalized_config_global, is_global_loop_iteration) logger.info("completed normalizing rulebases for manager: " + native_config.get('domain_name','')) - normalize_gateways(native_config, normalized_config_dict) + normalize_gateways(native_config, normalized_config_adom) def build_adom_list(importState : ImportStateController): @@ -224,10 +218,11 @@ def build_adom_device_vdom_structure(adom_list, sid, fm_api_url) -> dict: def add_policy_package_to_vdoms(adom_device_vdom_structure, sid, fm_api_url): adom_device_vdom_policy_package_structure = deepcopy(adom_device_vdom_structure) for adom in adom_device_vdom_policy_package_structure: - policy_packages_result = fmgr_getter.get_policy_packages_from_manager(sid, fm_api_url, adom=adom) + policy_packages_result = fmgr_getter.fortinet_api_call(sid, fm_api_url, '/pm/pkg/adom/' + adom) for policy_package in policy_packages_result: if 'scope member' in policy_package: parse_policy_package(policy_package, adom_device_vdom_policy_package_structure, adom) + add_global_policy_package_to_vdom(adom_device_vdom_policy_package_structure, sid, fm_api_url, adom) return adom_device_vdom_policy_package_structure def parse_policy_package(policy_package, adom_device_vdom_policy_package_structure, adom): @@ -236,17 +231,43 @@ def parse_policy_package(policy_package, adom_device_vdom_policy_package_structu if device == scope_member['name']: for vdom in adom_device_vdom_policy_package_structure[adom][device]: if vdom == scope_member['vdom']: - adom_device_vdom_policy_package_structure[adom][device].update({vdom: policy_package['name']}) - -def get_global_packages(sid, fm_api_url, adom_device_vdom_policy_package_structure) -> list: - global_packages = [] - global_packages_results = fmgr_getter.get_policy_packages_from_manager(sid, fm_api_url, adom='') - for policy_package in global_packages_results: - if 'scope member' in policy_package: - for adom in adom_device_vdom_policy_package_structure: - if policy_package['scope member'] == adom: - global_packages.append(policy_package['name']) - return global_packages + adom_device_vdom_policy_package_structure[adom][device].update({vdom: {'local': policy_package['name'], 'global': ''}}) + +def add_global_policy_package_to_vdom(adom_device_vdom_policy_package_structure, sid, fm_api_url, adom): + global_assignment_result = fmgr_getter.fortinet_api_call(sid, fm_api_url, '/pm/config/adom/' + adom + '/_adom/options') + for global_assignment in global_assignment_result: + if global_assignment['assign_excluded'] == 0 and global_assignment['specify_assign_pkg_list'] == 0: + assign_case_all(adom_device_vdom_policy_package_structure, adom, global_assignment) + elif global_assignment['assign_excluded'] == 0 and global_assignment['specify_assign_pkg_list'] == 1: + assign_case_include(adom_device_vdom_policy_package_structure, adom, global_assignment) + elif global_assignment['assign_excluded'] == 1 and global_assignment['specify_assign_pkg_list'] == 1: + assign_case_exclude(adom_device_vdom_policy_package_structure, adom, global_assignment) + else: + raise ImportInterruption('Broken global assign format.') + +def assign_case_all(adom_device_vdom_policy_package_structure, adom, global_assignment): + for device in adom_device_vdom_policy_package_structure[adom]: + for vdom in adom_device_vdom_policy_package_structure[adom][device]: + adom_device_vdom_policy_package_structure[adom][device][vdom]['global'] = global_assignment['assign_name'] + +def assign_case_include(adom_device_vdom_policy_package_structure, adom, global_assignment): + for device in adom_device_vdom_policy_package_structure[adom]: + for vdom in adom_device_vdom_policy_package_structure[adom][device]: + match_assign_and_vdom_policy_package(global_assignment, adom_device_vdom_policy_package_structure[adom][device][vdom], True) + +def assign_case_exclude(adom_device_vdom_policy_package_structure, adom, global_assignment): + for device in adom_device_vdom_policy_package_structure[adom]: + for vdom in adom_device_vdom_policy_package_structure[adom][device]: + match_assign_and_vdom_policy_package(global_assignment, adom_device_vdom_policy_package_structure[adom][device][vdom], False) + +def match_assign_and_vdom_policy_package(global_assignment, vdom_structure, is_include): + for package in global_assignment['pkg list']: + if is_include: + if package['name'] == vdom_structure['local']: + vdom_structure['global'] = global_assignment['assign_name'] + else: + if package['name'] != vdom_structure['local']: + vdom_structure['global'] = global_assignment['assign_name'] def initialize_device_config(mgm_details_device): device_config = {'name': mgm_details_device['name'], @@ -309,7 +330,7 @@ def get_objects(sid, fm_api_url, native_config_domain, native_config_global, ado native_config_global['objects'], sid, fm_api_url, "sys/proxy/json", "nw_obj_global_firewall/internet-service-basic", limit=limit, payload=payload, method='exec') -def normalize_gateways(native_config, normalized_config_dict): +def normalize_gateways(native_config, normalized_config_adom): for gateway in native_config['gateways']: normalized_gateway = {} normalized_gateway['Uid'] = gateway['uid'] @@ -317,7 +338,7 @@ def normalize_gateways(native_config, normalized_config_dict): normalized_gateway['Interfaces'] = normalize_interfaces() normalized_gateway['Routing'] = normalize_routing() normalized_gateway['RulebaseLinks'] = normalize_links(gateway['rulebase_links']) - normalized_config_dict['gateways'].append(normalized_gateway) + normalized_config_adom['gateways'].append(normalized_gateway) def normalize_interfaces(): # TODO @@ -338,37 +359,3 @@ def normalize_links(rulebase_links : list): if link['from_rule_uid'] != None: link['from_rule_uid'] = None return rulebase_links - -# def getZones(sid, fm_api_url, nativeConfig, adom_name, limit, debug_level): -# nativeConfig.update({"zones": {}}) - -# # get global zones? - -# # get local zones -# for device in nativeConfig['devices']: -# local_pkg_name = device['package'] -# for adom in nativeConfig['adoms']: -# if adom['name']==adom_name: -# if local_pkg_name not in adom['package_names']: -# logger.error('local rulebase/package ' + local_pkg_name + ' not found in management ' + adom_name) -# return 1 -# else: -# fmgr_getter.update_config_with_fortinet_api_call( -# nativeConfig['zones'], sid, fm_api_url, "/pm/config/adom/" + adom_name + "/obj/dynamic/interface", device['id'], debug=debug_level, limit=limit) - -# nativeConfig['zones']['zone_list'] = [] -# for device in nativeConfig['zones']: -# for mapping in nativeConfig['zones'][device]: -# if not isinstance(mapping, str): -# if not mapping['dynamic_mapping'] is None: -# for dyn_mapping in mapping['dynamic_mapping']: -# if 'name' in dyn_mapping and not dyn_mapping['name'] in nativeConfig['zones']['zone_list']: -# nativeConfig['zones']['zone_list'].append(dyn_mapping['name']) -# if 'local-intf' in dyn_mapping and not dyn_mapping['local-intf'][0] in nativeConfig['zones']['zone_list']: -# nativeConfig['zones']['zone_list'].append(dyn_mapping['local-intf'][0]) -# if not mapping['platform_mapping'] is None: -# for dyn_mapping in mapping['platform_mapping']: -# if 'intf-zone' in dyn_mapping and not dyn_mapping['intf-zone'] in nativeConfig['zones']['zone_list']: -# nativeConfig['zones']['zone_list'].append(dyn_mapping['intf-zone']) - - diff --git a/roles/importer/files/importer/fwo_const.py b/roles/importer/files/importer/fwo_const.py index d2d657fdb..8bd2536bb 100644 --- a/roles/importer/files/importer/fwo_const.py +++ b/roles/importer/files/importer/fwo_const.py @@ -39,6 +39,7 @@ api_call_chunk_size = 1000 rule_num_numeric_steps = 1024.0 +# TODO replace rules with policies, breaks all importers except forti and cp emptyNormalizedFwConfigJsonDict = { 'network_objects': [], 'service_objects': [], diff --git a/roles/importer/files/importer/fwo_exceptions.py b/roles/importer/files/importer/fwo_exceptions.py index af180129f..625f24ec0 100644 --- a/roles/importer/files/importer/fwo_exceptions.py +++ b/roles/importer/files/importer/fwo_exceptions.py @@ -29,6 +29,13 @@ def __init__(self, message="Login to FW management failed"): self.message = message super().__init__(self.message) +class FwoNormalizedConfigParseError(Exception): + """Raised while parsing normalized config""" + + def __init__(self, message="Parsing normalized config failed"): + self.message = message + super().__init__(self.message) + class SecretDecryptionFailed(Exception): """Raised when the attempt to decrypt a secret with the given key fails"""