Skip to content

Commit 20bb984

Browse files
authored
Bugfixes, high level configuration classes. (#2)
* bug fixes * remove unused import * add missing undocumented api * implement high level tenant config * more high level config impls
1 parent b48b50c commit 20bb984

14 files changed

+645
-6
lines changed

cxone_api/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def __init__(self):
130130

131131
class ApiDEU(CxOneApiEndpoint):
132132
def __init__(self):
133-
super().__init__("deu.checkmarx.net")
133+
super().__init__("deu.ast.checkmarx.net")
134134

135135
class ApiANZ(CxOneApiEndpoint):
136136
def __init__(self):

cxone_api/exceptions.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import re
2+
from typing import Type, Any, List
23

34
class EndpointException(BaseException):
45
pass
@@ -35,3 +36,17 @@ class ResponseException(BaseException):
3536

3637
class ScanException(BaseException):
3738
pass
39+
40+
class ConfigurationException(BaseException):
41+
42+
@staticmethod
43+
def wrong_type(value : Any, type : Type):
44+
return ConfigurationException(f"Value of [{value}] is not of type [{type}].")
45+
46+
@staticmethod
47+
def not_in_enum(value : Any, valid_values : List[str]):
48+
return ConfigurationException(f"Value of [{value}] is not one of {valid_values}.")
49+
50+
@staticmethod
51+
def read_only():
52+
return ConfigurationException("Configuration values are read-only.")
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from typing import List, Dict
2+
from .base_config import BaseScanConfiguration
3+
from ...client import CxOneClient
4+
from ..util import json_on_ok
5+
from ...exceptions import ResponseException, ConfigurationException
6+
from ...low.scan_configuration import \
7+
retrieve_tenant_configuration, update_tenant_configuration, delete_tenant_configuration, \
8+
retrieve_project_configuration, update_project_configuration, delete_project_configuration, \
9+
retrieve_scan_configuration
10+
11+
12+
class TenantScanConfiguration(BaseScanConfiguration):
13+
14+
def __init__(self, client : CxOneClient):
15+
super().__init__(client)
16+
17+
def _origin_level(self) -> str:
18+
return "Tenant"
19+
20+
async def _load_config(self) -> List[Dict]:
21+
return json_on_ok(await retrieve_tenant_configuration(self.client))
22+
23+
async def _write_config(self, items : List[Dict]) -> None:
24+
resp = await update_tenant_configuration(self.client, items)
25+
if not resp.ok:
26+
raise ResponseException(resp)
27+
28+
async def _write_deletes(self, keys : List[str]) -> None:
29+
resp = await delete_tenant_configuration(self.client, config_keys=','.join(keys))
30+
if not resp.ok:
31+
raise ResponseException(resp)
32+
33+
class ProjectScanConfiguration(BaseScanConfiguration):
34+
def __init__(self, client : CxOneClient, project_id : str):
35+
super().__init__(client)
36+
self.__project_id = project_id
37+
38+
@property
39+
def project_id(self) -> str:
40+
return self.__project_id
41+
42+
def _origin_level(self) -> str:
43+
return "Project"
44+
45+
async def _load_config(self) -> List[Dict]:
46+
return json_on_ok(await retrieve_project_configuration(self.client, project_id=self.project_id))
47+
48+
async def _write_config(self, items : List[Dict]) -> None:
49+
resp = await update_project_configuration(self.client, items, project_id=self.project_id)
50+
if not resp.ok:
51+
raise ResponseException(resp)
52+
53+
async def _write_deletes(self, keys : List[str]) -> None:
54+
resp = await delete_project_configuration(self.client, config_keys=','.join(keys), project_id=self.project_id)
55+
if not resp.ok:
56+
raise ResponseException(resp)
57+
58+
class ScanConfiguration(ProjectScanConfiguration):
59+
def __init__(self, client : CxOneClient, project_id : str, scan_id : str):
60+
super().__init__(client, project_id)
61+
self.__scan_id = scan_id
62+
63+
@property
64+
def scan_id(self) -> str:
65+
return self.__scan_id
66+
67+
def _origin_level(self) -> str:
68+
return "Scan"
69+
70+
async def _load_config(self) -> List[Dict]:
71+
return json_on_ok(await retrieve_scan_configuration(self.client, project_id=self.project_id, scan_id=self.scan_id))
72+
73+
async def _write_config(self, items : List[Dict]) -> None:
74+
raise ConfigurationException.read_only()
75+
76+
async def _write_deletes(self, keys : List[str]) -> None:
77+
raise ConfigurationException.read_only()
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from .categories import SastScanConfig, ScaScanConfig, ApiSecurityScanConfig, ContainerScanConfig
2+
from .element_handler import CxOneConfigElementHandler
3+
from ...client import CxOneClient
4+
5+
6+
class BaseScanConfiguration(CxOneConfigElementHandler):
7+
def __init__(self, client : CxOneClient):
8+
super().__init__(client)
9+
self.__sast = SastScanConfig(self)
10+
self.__sca = ScaScanConfig(self)
11+
self.__api = ApiSecurityScanConfig(self)
12+
self.__containers = ContainerScanConfig(self)
13+
14+
@property
15+
def SAST(self) -> SastScanConfig:
16+
return self.__sast
17+
18+
@property
19+
def SCA(self) -> ScaScanConfig:
20+
return self.__sca
21+
22+
@property
23+
def APISec(self) -> ApiSecurityScanConfig:
24+
return self.__api
25+
26+
@property
27+
def Containers(self) -> ContainerScanConfig:
28+
return self.__containers
29+
30+
def _origin_level(self) -> str:
31+
raise NotImplemented("_origin_level")
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
from .element_handler import CxOneConfigElementHandler
2+
from ...exceptions import ConfigurationException, CommunicationException
3+
from ...low.misc import retrieve_preset_list
4+
from ..util import json_on_ok
5+
from typing import Any, Type, List, Awaitable, Union
6+
from .value_types import ScanLanguageMode, RestListEnum
7+
import asyncio
8+
9+
class BaseConfigurationCategory:
10+
def __init__(self, container : CxOneConfigElementHandler):
11+
self.__config = container
12+
13+
@property
14+
def _config(self) -> CxOneConfigElementHandler:
15+
return self.__config
16+
17+
18+
class ConfigurationPropertyHandler:
19+
20+
def __init__(self, handler : CxOneConfigElementHandler, prop_key : str, value_type : Union[Type, RestListEnum]):
21+
self.__handler = handler
22+
self.__key = prop_key
23+
self.__value_type = value_type
24+
25+
def __convert_from(self, value : Any) -> Any:
26+
if isinstance(self.__value_type, RestListEnum):
27+
return str(value)
28+
29+
if self.__value_type is bool:
30+
if isinstance(value, str):
31+
return (not value == '') or bool(value)
32+
else:
33+
return bool(value)
34+
else:
35+
return value
36+
37+
def __convert_to(self, value : Any) -> Any:
38+
if self.__value_type is bool:
39+
return str(bool(value)).lower()
40+
41+
if self.__value_type is str and not isinstance(value, str):
42+
return str(value)
43+
44+
return value
45+
46+
47+
async def setValue(self, value : Any) -> None:
48+
if isinstance(self.__value_type, RestListEnum):
49+
if not await self.__value_type.is_valid(value):
50+
raise ConfigurationException.not_in_enum(value, await self.__value_type.get_enum())
51+
elif not isinstance(value, self.__value_type) and \
52+
not isinstance(self.__value_type(value), self.__value_type):
53+
raise ConfigurationException.wrong_type(value, self.__value_type)
54+
55+
await self.__handler._set_value(self.__key, self.__convert_to(value))
56+
57+
async def getValue(self) -> Any:
58+
return self.__convert_from(await self.__handler._get_value(self.__key))
59+
60+
async def setOverride(self, value : bool) -> None:
61+
await self.__handler._set_override(self.__key, value)
62+
63+
async def getOverride(self) -> bool:
64+
return await self.__handler._get_override(self.__key)
65+
66+
async def reset(self) -> None:
67+
await self.__handler._delete_config(self.__key)
68+
69+
70+
class SastPresetType(RestListEnum):
71+
72+
def __init__(self, retrieve_lambda : Awaitable[List[str]]):
73+
super().__init__()
74+
self.__get_lambda = retrieve_lambda
75+
self.__lock = asyncio.Lock()
76+
self.__enum = None
77+
78+
async def get_enum(self) -> List[str]:
79+
async with self.__lock:
80+
if self.__enum == None:
81+
self.__enum = await self.__get_lambda()
82+
83+
return self.__enum
84+
85+
86+
class SastScanConfig(BaseConfigurationCategory):
87+
def __init__(self, container : CxOneConfigElementHandler):
88+
super().__init__(container)
89+
self.__fastscan = ConfigurationPropertyHandler(self._config, "scan.config.sast.fastScanMode", bool)
90+
self.__exclusions = ConfigurationPropertyHandler(self._config, "scan.config.sast.filter", str)
91+
self.__incremental = ConfigurationPropertyHandler(self._config, "scan.config.sast.incremental", bool)
92+
self.__rec = ConfigurationPropertyHandler(self._config, "scan.config.sast.recommendedExclusions", bool)
93+
self.__verbose = ConfigurationPropertyHandler(self._config, "scan.config.sast.engineVerbose", bool)
94+
self.__mode = ConfigurationPropertyHandler(self._config, "scan.config.sast.languageMode", ScanLanguageMode)
95+
96+
async def get_presets() -> List[str]:
97+
pl_resp = await retrieve_preset_list(self._config.client)
98+
if not pl_resp.ok:
99+
raise CommunicationException(pl_resp)
100+
101+
pl = json_on_ok(pl_resp)
102+
103+
return [x['name'] for x in pl]
104+
105+
self.__preset = ConfigurationPropertyHandler(self._config, "scan.config.sast.presetName",
106+
SastPresetType(get_presets))
107+
108+
@property
109+
def FastScan(self) -> ConfigurationPropertyHandler:
110+
return self.__fastscan
111+
112+
@property
113+
def Exclusions(self) -> ConfigurationPropertyHandler:
114+
return self.__exclusions
115+
116+
@property
117+
def IncrementalScan(self) -> ConfigurationPropertyHandler:
118+
return self.__incremental
119+
120+
@property
121+
def Preset(self) -> ConfigurationPropertyHandler:
122+
return self.__preset
123+
124+
@property
125+
def RecommendedExclusions(self) -> ConfigurationPropertyHandler:
126+
return self.__rec
127+
128+
@property
129+
def EngineVerbose(self) -> ConfigurationPropertyHandler:
130+
return self.__verbose
131+
132+
@property
133+
def LanguageMode(self) -> ConfigurationPropertyHandler:
134+
return self.__mode
135+
136+
class ScaScanConfig(BaseConfigurationCategory):
137+
def __init__(self, container : CxOneConfigElementHandler):
138+
super().__init__(container)
139+
self.__exclusions = ConfigurationPropertyHandler(self._config, "scan.config.sca.filter", str)
140+
self.__obs_pattern = ConfigurationPropertyHandler(self._config, "scan.config.sca.obfuscatePackagesPattern", str)
141+
self.__exp_path = ConfigurationPropertyHandler(self._config, "scan.config.sca.ExploitablePath", bool)
142+
self.__exp_path_days = ConfigurationPropertyHandler(self._config, "scan.config.sca.LastSastScanTime", str)
143+
144+
@property
145+
def Exclusions(self) -> ConfigurationPropertyHandler:
146+
return self.__exclusions
147+
148+
@property
149+
def ObfuscatePackagesPattern(self) -> ConfigurationPropertyHandler:
150+
return self.__obs_pattern
151+
152+
@property
153+
def ExploitablePath(self) -> ConfigurationPropertyHandler:
154+
return self.__exp_path
155+
156+
@property
157+
def ExploitablePathLastScanDays(self) -> ConfigurationPropertyHandler:
158+
return self.__exp_path_days
159+
160+
class ApiSecurityScanConfig(BaseConfigurationCategory):
161+
def __init__(self, container : CxOneConfigElementHandler):
162+
super().__init__(container)
163+
self.__exclusions = ConfigurationPropertyHandler(self._config, "scan.config.apisec.swaggerFilter", str)
164+
165+
@property
166+
def SwaggerFilter(self) -> ConfigurationPropertyHandler:
167+
return self.__exclusions
168+
169+
class ContainerScanConfig(BaseConfigurationCategory):
170+
def __init__(self, container : CxOneConfigElementHandler):
171+
super().__init__(container)
172+
self.__exclusions = ConfigurationPropertyHandler(self._config, "scan.config.containers.filesFilter", str)
173+
self.__package_filter = ConfigurationPropertyHandler(self._config, "scan.config.containers.packagesFilter", str)
174+
self.__img_tag_filter = ConfigurationPropertyHandler(self._config, "scan.config.containers.imagesFilter", str)
175+
self.__exclude_non_final = ConfigurationPropertyHandler(self._config, "scan.config.containers.nonFinalStagesFilter", bool)
176+
177+
@property
178+
def Exclusions(self) -> ConfigurationPropertyHandler:
179+
return self.__exclusions
180+
181+
@property
182+
def ImageTagFilter(self) -> ConfigurationPropertyHandler:
183+
return self.__img_tag_filter
184+
185+
@property
186+
def PrivatePackageFilter(self) -> ConfigurationPropertyHandler:
187+
return self.__package_filter
188+
189+
@property
190+
def ExclusionNonFinalStagesFilter(self) -> ConfigurationPropertyHandler:
191+
return self.__exclude_non_final

0 commit comments

Comments
 (0)