Skip to content

support download option for upload command #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 66 additions & 10 deletions lynkctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,38 @@
projectId: $projectId
}
) {
id
errors
}
}
"""

QUERY_SBOM_DOWNLOAD = """
query downloadSbom($envId: Uuid!, $sbomId: Uuid!, $includeVulns: Boolean) {
sbom(projectId: $envId, sbomId: $sbomId) {
query downloadSbom(
$projectId: Uuid!,
$sbomId: Uuid!,
$includeVulns: Boolean,
$spec: SbomSpec,
$original: Boolean,
$package: Boolean,
$lite: Boolean,
$excludeParts: Boolean
) {
sbom(projectId: $projectId, sbomId: $sbomId) {
download(
sbomId: $sbomId
includeVulns: $includeVulns
)
spec: $spec
original: $original
dontPackageSbom: $package
lite: $lite
excludeParts: $excludeParts
) {
content
contentType
filename
__typename
}
__typename
}
}
Expand Down Expand Up @@ -226,6 +246,7 @@ def resolve_env(self):

def resolve_ver(self):
env = self.env or 'default'

if not self.ver_id:
for product in self.data.get('data', {}).get('organization', {}).get('productNodes', {}).get('products', []):
if product['id'] == self.prod_id:
Expand All @@ -236,6 +257,7 @@ def resolve_ver(self):
self.ver_id = ver['id']
self.ver_status = self.vuln_status_to_status(
ver['vulnRunStatus'])

empty_ver = False
if not self.ver:
for product in self.data.get('data', {}).get('organization', {}).get('productNodes', {}).get('products', []):
Expand All @@ -244,11 +266,25 @@ def resolve_ver(self):
if env['id'] == self.env_id:
for ver in env['versions']:
if ver['id'] == self.ver_id:
self.ver = ver['primaryComponent']['version']
if ver.get('primaryComponent'):
self.ver = ver['primaryComponent']['version']
if not self.ver:
empty_ver = True
self.ver_status = self.vuln_status_to_status(
ver['vulnRunStatus'])


# if ver is not empty
if not empty_ver:
for product in self.data.get('data', {}).get('organization', {}).get('productNodes', {}).get('products', []):
if product['id'] == self.prod_id:
for env in product['environments']:
if env['id'] == self.env_id:
for ver in env['versions']:
if ver['id'] == self.ver_id:
if ver.get('primaryComponent'):
self.ver = ver['primaryComponent']['version']
self.ver_status = self.vuln_status_to_status(ver['vulnRunStatus'])

return (empty_ver or self.ver) and self.ver_id

Expand Down Expand Up @@ -285,6 +321,7 @@ def versions(self):
return versions_node

def status(self):
self.data = self._fetch_context()
self.resolve_ver()
return self.ver_status

Expand All @@ -293,11 +330,16 @@ def download(self):
self.env_id, self.ver_id)

variables = {
"envId": self.env_id,
"projectId": self.env_id,
"sbomId": self.ver_id,
"includeVulns": False
"includeVulns": False,
"spec": "CycloneDX",
"original": False,
"package": False,
"lite": False,
"excludeParts": True
}

logging.debug("Variables for request: %s", variables)
request_data = {
"query": QUERY_SBOM_DOWNLOAD,
"variables": variables,
Expand All @@ -323,10 +365,14 @@ def download(self):
print('No SBOM matched with the given ID')
logging.debug(data)
return None
b64data = sbom.get('download')
decoded_content = base64.b64decode(b64data)
download_data = sbom.get('download', {})
b64data = download_data.get('content')
if not b64data:
logging.error("No content found in the download response.")
return None
decoded_content = base64.b64decode(b64data).decode('utf-8')
logging.debug('Completed download and decoding')
return decoded_content.decode('utf-8')
return decoded_content
except json.JSONDecodeError:
logging.error("Failed to parse JSON response.")
else:
Expand Down Expand Up @@ -373,14 +419,24 @@ def upload(self, sbom_file):
timeout=INTERLYNK_API_TIMEOUT)
if response.status_code == 200:
resp_json = response.json()
version_id = resp_json.get('data', {}).get('sbomUpload', {}).get('id')
logging.debug("version_id or sbom_id: %s", version_id)
errors = resp_json.get('data', {}).get(
'sbomUpload', {}).get('errors')
if errors:
print(f"Error uploading sbom: {errors}")
return 1
if version_id:
self.ver_id = version_id
logging.debug("SBOM ID successfully returned in the response: %s", self.ver_id)
logging.debug("SBOM upload response: %s", response.text)
else:
print("Error: SBOM ID not returned in the response.")
return 1
print('Uploaded successfully')
logging.debug("SBOM Uploading response: %s", response.text)
return 0
print("Error uploading sbom")
logging.error("Error uploading sbom: %d", response.status_code)
except requests.exceptions.RequestException as ex:
logging.error("RequestException: %s", ex)
Expand Down
38 changes: 35 additions & 3 deletions pylynk.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,45 @@ def download_sbom(lynk_ctx):
return 0


def upload_sbom(lynk_ctx, sbom_file):
def upload_sbom(lynk_ctx, sbom_file, download):
"""
Upload SBOM to the lynk_ctx.

Args:
lynk_ctx: The lynk context object.
sbom_file: The path to the SBOM file.
download: download file after the automationStatus is completed

Returns:
The result of the upload operation.
If download is true, then along with upload operation it also performs download operation
"""
return lynk_ctx.upload(sbom_file)
upload_result = lynk_ctx.upload(sbom_file)

if upload_result != 0:
return 1

if download:
max_retries = 5
retries = 0
while retries < max_retries:
status = lynk_ctx.status()
if status is None:
print('Failed to fetch status for the version')
return 1

if status.get('automationStatus') == "COMPLETED":
download_sbom(lynk_ctx)
break
else:
time.sleep(5)
retries += 1

if retries == max_retries:
print("Error: automationStatus could not be completed within the maximum retry limit.")
return 1

return 0


def add_output_format_group(parser):
Expand Down Expand Up @@ -290,6 +317,10 @@ def setup_args():
upload_parser.add_argument("--token",
required=False,
help="Security token")
upload_parser.add_argument("--download", action="store_true",
help="Download SBOM after upload (default: False)")
upload_parser.add_argument(
"--output", help="Output file", required=False)

download_parser = subparsers.add_parser("download", help="Download SBOM")
download_group = download_parser.add_mutually_exclusive_group(
Expand Down Expand Up @@ -374,7 +405,8 @@ def main() -> int:
elif args.subcommand == "status":
print_status(lynk_ctx, fmt_json)
elif args.subcommand == "upload":
upload_sbom(lynk_ctx, args.sbom)
download_flag = getattr(args, 'download', False)
upload_sbom(lynk_ctx, args.sbom, download_flag)
elif args.subcommand == "download":
download_sbom(lynk_ctx)
else:
Expand Down