diff --git a/lynkctx.py b/lynkctx.py index fe8eb0d..14b2d02 100644 --- a/lynkctx.py +++ b/lynkctx.py @@ -77,18 +77,38 @@ projectId: $projectId } ) { + id errors } } """ QUERY_SBOM_DOWNLOAD = """ -query downloadSbom($envId: Uuid!, $sbomId: Uuid!, $includeVulns: Boolean) { - sbom(projectId: $envId, sbomId: $sbomId) { +query downloadSbom( + $projectId: Uuid!, + $sbomId: Uuid!, + $includeVulns: Boolean, + $spec: SbomSpec, + $original: Boolean, + $package: Boolean, + $lite: Boolean, + $excludeParts: Boolean +) { + sbom(projectId: $projectId, sbomId: $sbomId) { download( sbomId: $sbomId includeVulns: $includeVulns - ) + spec: $spec + original: $original + dontPackageSbom: $package + lite: $lite + excludeParts: $excludeParts + ) { + content + contentType + filename + __typename + } __typename } } @@ -226,6 +246,7 @@ def resolve_env(self): def resolve_ver(self): env = self.env or 'default' + if not self.ver_id: for product in self.data.get('data', {}).get('organization', {}).get('productNodes', {}).get('products', []): if product['id'] == self.prod_id: @@ -236,6 +257,7 @@ def resolve_ver(self): self.ver_id = ver['id'] self.ver_status = self.vuln_status_to_status( ver['vulnRunStatus']) + empty_ver = False if not self.ver: for product in self.data.get('data', {}).get('organization', {}).get('productNodes', {}).get('products', []): @@ -244,11 +266,25 @@ def resolve_ver(self): if env['id'] == self.env_id: for ver in env['versions']: if ver['id'] == self.ver_id: - self.ver = ver['primaryComponent']['version'] + if ver.get('primaryComponent'): + self.ver = ver['primaryComponent']['version'] if not self.ver: empty_ver = True self.ver_status = self.vuln_status_to_status( ver['vulnRunStatus']) + + + # if ver is not empty + if not empty_ver: + for product in self.data.get('data', {}).get('organization', {}).get('productNodes', {}).get('products', []): + if product['id'] == self.prod_id: + for env in product['environments']: + if env['id'] == self.env_id: + for ver in env['versions']: + if ver['id'] == self.ver_id: + if ver.get('primaryComponent'): + self.ver = ver['primaryComponent']['version'] + self.ver_status = self.vuln_status_to_status(ver['vulnRunStatus']) return (empty_ver or self.ver) and self.ver_id @@ -285,6 +321,7 @@ def versions(self): return versions_node def status(self): + self.data = self._fetch_context() self.resolve_ver() return self.ver_status @@ -293,11 +330,16 @@ def download(self): self.env_id, self.ver_id) variables = { - "envId": self.env_id, + "projectId": self.env_id, "sbomId": self.ver_id, - "includeVulns": False + "includeVulns": False, + "spec": "CycloneDX", + "original": False, + "package": False, + "lite": False, + "excludeParts": True } - + logging.debug("Variables for request: %s", variables) request_data = { "query": QUERY_SBOM_DOWNLOAD, "variables": variables, @@ -323,10 +365,14 @@ def download(self): print('No SBOM matched with the given ID') logging.debug(data) return None - b64data = sbom.get('download') - decoded_content = base64.b64decode(b64data) + download_data = sbom.get('download', {}) + b64data = download_data.get('content') + if not b64data: + logging.error("No content found in the download response.") + return None + decoded_content = base64.b64decode(b64data).decode('utf-8') logging.debug('Completed download and decoding') - return decoded_content.decode('utf-8') + return decoded_content except json.JSONDecodeError: logging.error("Failed to parse JSON response.") else: @@ -373,14 +419,24 @@ def upload(self, sbom_file): timeout=INTERLYNK_API_TIMEOUT) if response.status_code == 200: resp_json = response.json() + version_id = resp_json.get('data', {}).get('sbomUpload', {}).get('id') + logging.debug("version_id or sbom_id: %s", version_id) errors = resp_json.get('data', {}).get( 'sbomUpload', {}).get('errors') if errors: print(f"Error uploading sbom: {errors}") return 1 + if version_id: + self.ver_id = version_id + logging.debug("SBOM ID successfully returned in the response: %s", self.ver_id) + logging.debug("SBOM upload response: %s", response.text) + else: + print("Error: SBOM ID not returned in the response.") + return 1 print('Uploaded successfully') logging.debug("SBOM Uploading response: %s", response.text) return 0 + print("Error uploading sbom") logging.error("Error uploading sbom: %d", response.status_code) except requests.exceptions.RequestException as ex: logging.error("RequestException: %s", ex) diff --git a/pylynk.py b/pylynk.py index 93b8212..cc2a051 100644 --- a/pylynk.py +++ b/pylynk.py @@ -213,18 +213,45 @@ def download_sbom(lynk_ctx): return 0 -def upload_sbom(lynk_ctx, sbom_file): +def upload_sbom(lynk_ctx, sbom_file, download): """ Upload SBOM to the lynk_ctx. Args: lynk_ctx: The lynk context object. sbom_file: The path to the SBOM file. + download: download file after the automationStatus is completed Returns: The result of the upload operation. + If download is true, then along with upload operation it also performs download operation """ - return lynk_ctx.upload(sbom_file) + upload_result = lynk_ctx.upload(sbom_file) + + if upload_result != 0: + return 1 + + if download: + max_retries = 5 + retries = 0 + while retries < max_retries: + status = lynk_ctx.status() + if status is None: + print('Failed to fetch status for the version') + return 1 + + if status.get('automationStatus') == "COMPLETED": + download_sbom(lynk_ctx) + break + else: + time.sleep(5) + retries += 1 + + if retries == max_retries: + print("Error: automationStatus could not be completed within the maximum retry limit.") + return 1 + + return 0 def add_output_format_group(parser): @@ -290,6 +317,10 @@ def setup_args(): upload_parser.add_argument("--token", required=False, help="Security token") + upload_parser.add_argument("--download", action="store_true", + help="Download SBOM after upload (default: False)") + upload_parser.add_argument( + "--output", help="Output file", required=False) download_parser = subparsers.add_parser("download", help="Download SBOM") download_group = download_parser.add_mutually_exclusive_group( @@ -374,7 +405,8 @@ def main() -> int: elif args.subcommand == "status": print_status(lynk_ctx, fmt_json) elif args.subcommand == "upload": - upload_sbom(lynk_ctx, args.sbom) + download_flag = getattr(args, 'download', False) + upload_sbom(lynk_ctx, args.sbom, download_flag) elif args.subcommand == "download": download_sbom(lynk_ctx) else: