generated from aboutcode-org/skeleton
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtasks.py
63 lines (55 loc) · 2.09 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# FederatedCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/federatedcode for support or download.
# See https://aboutcode.org for more information about AboutCode.org OSS projects.
#
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from fedcode.importer import Importer
from fedcode.models import FederateRequest
from fedcode.models import SyncRequest
from fedcode.signatures import FEDERATEDCODE_PRIVATE_KEY
from fedcode.signatures import HttpSignature
def sync_task():
    """
    Run the Importer for every SyncRequest that is not done yet.

    Requests are visited in "created_at" order. For each pending request the
    repository's ``origin`` remote is pulled, an Importer is run for the repo
    with the repo's admin, and the request is flagged as done. Any exception
    raised along the way is stored on the request's ``error_message`` instead
    of propagating. The request is saved whether or not the attempt succeeded.
    """
    for request in SyncRequest.objects.all().order_by("created_at"):
        if request.done:
            continue

        try:
            repository = request.repo
            repository.git_repo_obj.remotes.origin.pull()
            Importer(repository, repository.admin).run()
            request.done = True
        except Exception as error:
            # Keep going with the remaining requests; record why this one failed.
            request.error_message = error
        finally:
            request.save()
def send_fed_req_task():
    """
    Send every FederateRequest that is not done yet as an HTTP signed request
    to its target and save the status of the request.

    Requests are visited in "created_at" order. Each pending request is sent
    with ``HttpSignature.signed_request`` using the request's target, body and
    key id together with FEDERATEDCODE_PRIVATE_KEY. On success the request is
    flagged as done; on failure the exception is stored on the request's
    ``error_message`` and processing continues with the next request.
    """
    for rq in FederateRequest.objects.all().order_by("created_at"):
        if rq.done:
            continue

        try:
            HttpSignature.signed_request(
                rq.target, rq.body, FEDERATEDCODE_PRIVATE_KEY, rq.key_id
            )
            rq.done = True
        except Exception as e:
            rq.error_message = e
        finally:
            # Persist exactly once for both outcomes. Saving inside the ``try``
            # as well would write the row twice on success and would record a
            # failed save as if sending the request had failed.
            rq.save()
class Command(BaseCommand):
    """Management command that runs one of this module's tasks, chosen by name."""

    def add_arguments(self, parser):
        """Register the positional ``task`` argument, restricted to the known task names."""
        parser.add_argument("task", choices=["sync", "federate"])

    def handle(self, *args, **options):
        """Run the task named by ``options["task"]``; any other value does nothing."""
        selected = options["task"]

        if selected == "sync":
            sync_task()
            return

        if selected == "federate":
            send_fed_req_task()