Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9c8a432
Moved permission checks to the BaseModel class
mohamedelabbas1996 Jul 2, 2025
4fdae24
Refactor Job model permission checks to include job type for finer-gr…
mohamedelabbas1996 Jul 2, 2025
c43ef9c
Added fine-grained permissions for each job type
mohamedelabbas1996 Jul 2, 2025
4407af1
Modified roles to use the fine-grained job permissions
mohamedelabbas1996 Jul 2, 2025
1ec1595
Updated views to use the new permission checks
mohamedelabbas1996 Jul 2, 2025
c4de3de
Removed old job permissions from tests
mohamedelabbas1996 Jul 2, 2025
43d7bbf
Added migration file
mohamedelabbas1996 Jul 2, 2025
35a6ea8
Merge branch 'main' into feat/restrict-ml-processing-jobs
mohamedelabbas1996 Jul 2, 2025
b4ef1bc
Added permission for processing single source image
mohamedelabbas1996 Jul 3, 2025
a83e919
Merge branch 'feat/restrict-ml-processing-jobs' of https://github.com…
mohamedelabbas1996 Jul 3, 2025
095b899
Changed PermissionError to PermissionDenied
mohamedelabbas1996 Jul 3, 2025
c694706
Added migration file
mohamedelabbas1996 Jul 3, 2025
5515a87
Merge branch 'main' into feat/restrict-ml-processing-jobs
mohamedelabbas1996 Jul 3, 2025
3fc6906
Merge branch 'main' into feat/restrict-ml-processing-jobs
mohamedelabbas1996 Jul 3, 2025
5123aef
tests: Added tests for fine-grained job run permission
mohamedelabbas1996 Jul 3, 2025
805f87b
Move process single source image permission from source image to job …
mohamedelabbas1996 Jul 3, 2025
e995d9d
Added migration files
mohamedelabbas1996 Jul 3, 2025
cdbae94
Remove process_sourceimage permission from job details response
mohamedelabbas1996 Jul 3, 2025
22882a6
Added type hints
mohamedelabbas1996 Jul 4, 2025
a7eab7a
Modified process single image permission name
mohamedelabbas1996 Jul 4, 2025
4ee7e53
Added migration file
mohamedelabbas1996 Jul 4, 2025
7adf27d
Changed tests to check for run single image permission in the job det…
mohamedelabbas1996 Jul 4, 2025
9642ab7
chore: update FE permission handling for processing single captures
annavik Jul 4, 2025
b4e16ad
chore: update FE permission handling to use single run permission
annavik Jul 4, 2025
f385af3
Squashed migrations
mohamedelabbas1996 Jul 7, 2025
a9b39d7
Merge branch 'feat/restrict-ml-processing-jobs' of https://github.com…
mohamedelabbas1996 Jul 7, 2025
d166865
Merge branch 'main' into feat/restrict-ml-processing-jobs
mohamedelabbas1996 Jul 7, 2025
a15e9cf
Cleaned up permission check classes defined for each model and used a…
mohamedelabbas1996 Jul 7, 2025
19deb5f
Cleaned up BaseModel permission checks
mohamedelabbas1996 Jul 7, 2025
952e5ff
Returned run_single_image_ml_job permission with the source image object
mohamedelabbas1996 Jul 7, 2025
4824df3
Implemented custom permission checks for the identification model to …
mohamedelabbas1996 Jul 7, 2025
28c605d
Removed generic job run, cancel and retry permissions
mohamedelabbas1996 Jul 7, 2025
1f8b0e2
Revoked ml job run permission
mohamedelabbas1996 Jul 7, 2025
737da22
Merge branch 'feat/restrict-ml-processing-jobs' of https://github.com…
mohamedelabbas1996 Jul 7, 2025
81fc2a9
Added a migration to delete deprecated job permissions
mohamedelabbas1996 Jul 7, 2025
e215ac9
Added tests to make sure that job permissions are returned correctly …
mohamedelabbas1996 Jul 8, 2025
6a16ac0
Changed permission name for run single image
mohamedelabbas1996 Jul 8, 2025
4e045e5
Added tests for single image ml job
mohamedelabbas1996 Jul 8, 2025
38b7fdf
Delete migration file
mohamedelabbas1996 Jul 8, 2025
d7028e7
Merge branch 'main' into feat/restrict-ml-processing-jobs
mohamedelabbas1996 Jul 9, 2025
2f87531
Fixed conflicting migrations issue
mohamedelabbas1996 Jul 9, 2025
b37b43b
Merge branch 'main' of github.com:RolnickLab/antenna into feat/restri…
mihow Aug 15, 2025
dd35803
chore: resolve migration conflicts
mihow Aug 15, 2025
c956ffa
Use logger.info instead of print in migration for deleting deprecated…
mohamedelabbas1996 Aug 18, 2025
b388931
Merge branch 'main' into feat/restrict-ml-processing-jobs
mohamedelabbas1996 Aug 18, 2025
1d18e31
Merged migrations
mohamedelabbas1996 Aug 18, 2025
003e7e6
Granted basic members create_job permission
mohamedelabbas1996 Aug 18, 2025
1fd3f5d
Modified tests to allow basic members to create a job
mohamedelabbas1996 Aug 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions ami/base/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from django.contrib.auth.models import AbstractUser, AnonymousUser
from django.db import models
from guardian.shortcuts import get_perms

import ami.tasks

Expand Down Expand Up @@ -29,5 +31,94 @@ def update_calculated_fields(self, *args, **kwargs):
"""Update calculated fields specific to each model."""
pass

def _get_object_perms(self, user):
"""
Get the object-level permissions for the user on this instance.
This method retrieves permissions like `update_modelname`, `create_modelname`, etc.
"""
project = self.get_project()
if not project:
return []

model_name = self._meta.model_name
all_perms = get_perms(user, project)
object_perms = [perm for perm in all_perms if perm.endswith(f"_{model_name}")]
return object_perms

def check_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool:
"""
Check if the user has permission to perform the action
on this instance.
This method is used to determine if the user can perform
CRUD operations or custom actions on the model instance.
"""
project = self.get_project() if hasattr(self, "get_project") else None
if not project:
return False
if action == "retrieve":
# Allow view
return True

model = self._meta.model_name
crud_map = {
"create": f"create_{model}",
"update": f"update_{model}",
"partial_update": f"update_{model}",
"destroy": f"delete_{model}",
}

if action in crud_map:
return user.has_perm(crud_map[action], project)

# Delegate to model-specific logic
return self.check_custom_permission(user, action)

def check_custom_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool:
"""Check custom permissions for the user on this instance.
This is used for actions that are not standard CRUD operations.
"""
assert self._meta.model_name is not None, "Model must have a model_name defined in Meta class."
model_name = self._meta.model_name.lower()
permission_codename = f"{action}_{model_name}"
project = self.get_project() if hasattr(self, "get_project") else None

return user.has_perm(permission_codename, project)

def get_user_object_permissions(self, user) -> list[str]:
"""
Returns a list of object-level permissions the user has on this instance.
This is used by frontend to determine what actions the user can perform.
"""
# Return all permissions for superusers
if user.is_superuser:
allowed_custom_actions = self.get_custom_user_permissions(user)
return ["update", "delete"] + allowed_custom_actions

object_perms = self._get_object_perms(user)
# Check for update and delete permissions
allowed_actions = set()
for perm in object_perms:
action = perm.split("_", 1)[0]
if action in {"update", "delete"}:
allowed_actions.add(action)

allowed_custom_actions = self.get_custom_user_permissions(user)
allowed_actions.update(set(allowed_custom_actions))
return list(allowed_actions)

def get_custom_user_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]:
"""
Returns a list of custom permissions (not standard CRUD actions) that the user has on this instance.
"""
object_perms = self._get_object_perms(user)
custom_perms = set()
# Extract custom permissions that are not standard CRUD actions
for perm in object_perms:
action = perm.split("_", 1)[0]
# Make sure to exclude standard CRUD actions
if action not in ["view", "create", "update", "delete"]:
custom_perms.add(action)
return list(custom_perms)

class Meta:
abstract = True
190 changes: 8 additions & 182 deletions ami/base/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,7 @@
from guardian.shortcuts import get_perms
from rest_framework import permissions

from ami.jobs.models import Job
from ami.main.models import (
BaseModel,
Deployment,
Device,
Project,
S3StorageSource,
Site,
SourceImage,
SourceImageCollection,
SourceImageUpload,
)
from ami.users.roles import ProjectManager
from ami.main.models import BaseModel

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,18 +52,8 @@ def add_object_level_permissions(
"""

permissions = response_data.get("user_permissions", set())
project = instance.get_project() if hasattr(instance, "get_project") else None
model_name = instance._meta.model_name # Get model name
if user and user.is_superuser:
permissions.update(["update", "delete"])

if project:
user_permissions = get_perms(user, project)
# Filter and extract only the action part of "action_modelname" based on instance type
filtered_permissions = filter_permissions(permissions=user_permissions, model_name=model_name)
# Do not return create, view permissions at object-level
filtered_permissions -= {"create", "view"}
permissions.update(filtered_permissions)
if isinstance(instance, BaseModel):
permissions.update(instance.get_user_object_permissions(user))
response_data["user_permissions"] = list(permissions)
return response_data

Expand All @@ -99,165 +77,13 @@ def add_collection_level_permissions(user: User | None, response_data: dict, mod
return response_data


class CRUDPermission(permissions.BasePermission):
class ObjectPermission(permissions.BasePermission):
"""
Generic CRUD permission class that dynamically checks user permissions on an object.
Permission names follow the convention: `create_<model>`, `update_<model>`, `delete_<model>`.
Generic permission class that delegates to the model's `check_permission(user, action)` method.
"""

model = None

def has_permission(self, request, view):
"""Handles general permission checks"""

return True # Fallback to object level permissions

def has_object_permission(self, request, view, obj):
"""Handles object-level permission checks."""
model_name = self.model._meta.model_name
project = obj.get_project() if hasattr(obj, "get_project") else None
# check for create action
if view.action == "create":
return request.user.is_superuser or request.user.has_perm(f"create_{model_name}", project)

if view.action == "retrieve":
return True # Allow all users to view objects

# Map ViewSet actions to permission names
action_perms = {
"retrieve": f"view_{model_name}",
"update": f"update_{model_name}",
"partial_update": f"update_{model_name}",
"destroy": f"delete_{model_name}",
}

required_perm = action_perms.get(view.action)
if not required_perm:
return True
return request.user.has_perm(required_perm, project)


class ProjectCRUDPermission(CRUDPermission):
model = Project


class JobCRUDPermission(CRUDPermission):
model = Job


class DeploymentCRUDPermission(CRUDPermission):
model = Deployment


class SourceImageCollectionCRUDPermission(CRUDPermission):
model = SourceImageCollection


class SourceImageUploadCRUDPermission(CRUDPermission):
model = SourceImageUpload


class SourceImageCRUDPermission(CRUDPermission):
model = SourceImage


class CanStarSourceImage(permissions.BasePermission):
"""Custom permission to check if the user can star a Source image."""

permission = Project.Permissions.STAR_SOURCE_IMAGE

def has_object_permission(self, request, view, obj):
if view.action in ["unstar", "star"]:
project = obj.get_project() if hasattr(obj, "get_project") else None
return request.user.has_perm(self.permission, project)
return True


class S3StorageSourceCRUDPermission(CRUDPermission):
model = S3StorageSource


class SiteCRUDPermission(CRUDPermission):
model = Site


class DeviceCRUDPermission(CRUDPermission):
model = Device


# Identification permission checks
class CanUpdateIdentification(permissions.BasePermission):
"""Custom permission to check if the user can update/create an identification."""

permission = Project.Permissions.UPDATE_IDENTIFICATION

def has_object_permission(self, request, view, obj):
if view.action in ["create", "update", "partial_update"]:
project = obj.get_project() if hasattr(obj, "get_project") else None
return request.user.has_perm(self.permission, project)
return True


class CanDeleteIdentification(permissions.BasePermission):
"""Custom permission to check if the user can delete an identification."""

permission = Project.Permissions.DELETE_IDENTIFICATION

def has_object_permission(self, request, view, obj):
project = obj.get_project() if hasattr(obj, "get_project") else None
# Check if user is superuser or staff or project manager
if view.action == "destroy":
if request.user.is_superuser or ProjectManager.has_role(request.user, project):
return True
# Check if the user is the owner of the object
return obj.user == request.user
return True


# Job run permission check
class CanRunJob(permissions.BasePermission):
"""Custom permission to check if the user can run a job."""

permission = Project.Permissions.RUN_JOB

def has_object_permission(self, request, view, obj):
if view.action == "run":
project = obj.get_project() if hasattr(obj, "get_project") else None
return request.user.has_perm(self.permission, project)
return True


class CanRetryJob(permissions.BasePermission):
"""Custom permission to check if the user can retry a job."""

permission = Project.Permissions.RETRY_JOB

def has_object_permission(self, request, view, obj):
if view.action == "retry":
project = obj.get_project() if hasattr(obj, "get_project") else None
return request.user.has_perm(self.permission, project)
return True


class CanCancelJob(permissions.BasePermission):
"""Custom permission to check if the user can cancel a job."""

permission = Project.Permissions.CANCEL_JOB

def has_object_permission(self, request, view, obj):
if view.action == "cancel":
project = obj.get_project() if hasattr(obj, "get_project") else None
return request.user.has_perm(self.permission, project)
return True


class CanPopulateSourceImageCollection(permissions.BasePermission):
"""Custom permission to check if the user can populate a collection."""

permission = Project.Permissions.POPULATE_COLLECTION
return True # Always allow — object-level handles actual checks
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it work to default this to False?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here if has permission returns False it will just deny access and won't even check object level permissions.
This is because has_permission() is evaluated first for view-level access, and only if it passes does DRF proceed to check object-level permissions (if applicable). So even if has_object_permission() would allow the action, it won’t matter if has_permission() blocks the request upfront.


def has_object_permission(self, request, view, obj):
if view.action == "populate":
project = obj.get_project() if hasattr(obj, "get_project") else None
return request.user.has_perm(self.permission, project)
return True
def has_object_permission(self, request, view, obj: BaseModel):
return obj.check_permission(request.user, view.action)
32 changes: 32 additions & 0 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.db import models, transaction
from django.utils.text import slugify
from django_pydantic_field import SchemaField
from guardian.shortcuts import get_perms

from ami.base.models import BaseModel
from ami.base.schemas import ConfigurableStage, ConfigurableStageParam
Expand Down Expand Up @@ -907,6 +908,37 @@ def save(self, update_progress=True, *args, **kwargs):
if self.progress.summary.status != self.status:
logger.warning(f"Job {self} status mismatches progress: {self.progress.summary.status} != {self.status}")

def check_custom_permission(self, user, action: str) -> bool:
job_type = self.job_type_key.lower()
if self.source_image_single:
action = "run_single_image"
if action in ["run", "cancel", "retry"]:
permission_codename = f"run_{job_type}_job"
else:
permission_codename = f"{action}_{job_type}_job"

project = self.get_project() if hasattr(self, "get_project") else None
return user.has_perm(permission_codename, project)

def get_custom_user_permissions(self, user) -> list[str]:
project = self.get_project()
if not project:
return []

custom_perms = set()
model_name = "job"
perms = get_perms(user, project)
job_type = self.job_type_key.lower()
for perm in perms:
# permissions are in the format "action_modelname"
if perm.endswith(f"{job_type}_{model_name}"):
action = perm[: -len(f"_{job_type}_{model_name}")]
# make sure to exclude standard CRUD actions
if action not in ["view", "create", "update", "delete"]:
custom_perms.add(action)
logger.debug(f"Custom permissions for user {user} on project {self}, with jobtype {job_type}: {custom_perms}")
return list(custom_perms)

@classmethod
def default_progress(cls) -> JobProgress:
"""Return the progress of each stage of this job as a dictionary"""
Expand Down
2 changes: 1 addition & 1 deletion ami/jobs/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def test_run_job(self):
self.client.force_authenticate(user=self.user)
# give user run permission

assign_perm(Project.Permissions.RUN_JOB, self.user, self.project)
assign_perm(Project.Permissions.RUN_POPULATE_CAPTURES_COLLECTION_JOB, self.user, self.project)

resp = self.client.post(jobs_run_url)
self.assertEqual(resp.status_code, 200)
Expand Down
Loading