6
6
from guardian .shortcuts import get_perms
7
7
from rest_framework import permissions
8
8
9
- from ami .jobs .models import Job
10
- from ami .main .models import (
11
- BaseModel ,
12
- Deployment ,
13
- Device ,
14
- Project ,
15
- S3StorageSource ,
16
- Site ,
17
- SourceImage ,
18
- SourceImageCollection ,
19
- SourceImageUpload ,
20
- )
21
- from ami .users .roles import ProjectManager
9
+ from ami .main .models import BaseModel
22
10
23
11
logger = logging .getLogger (__name__ )
24
12
@@ -64,18 +52,8 @@ def add_object_level_permissions(
64
52
"""
65
53
66
54
permissions = response_data .get ("user_permissions" , set ())
67
- project = instance .get_project () if hasattr (instance , "get_project" ) else None
68
- model_name = instance ._meta .model_name # Get model name
69
- if user and user .is_superuser :
70
- permissions .update (["update" , "delete" ])
71
-
72
- if project :
73
- user_permissions = get_perms (user , project )
74
- # Filter and extract only the action part of "action_modelname" based on instance type
75
- filtered_permissions = filter_permissions (permissions = user_permissions , model_name = model_name )
76
- # Do not return create, view permissions at object-level
77
- filtered_permissions -= {"create" , "view" }
78
- permissions .update (filtered_permissions )
55
+ if isinstance (instance , BaseModel ):
56
+ permissions .update (instance .get_user_object_permissions (user ))
79
57
response_data ["user_permissions" ] = list (permissions )
80
58
return response_data
81
59
@@ -99,165 +77,13 @@ def add_collection_level_permissions(user: User | None, response_data: dict, mod
99
77
return response_data
100
78
101
79
102
- class CRUDPermission (permissions .BasePermission ):
80
+ class ObjectPermission (permissions .BasePermission ):
103
81
"""
104
- Generic CRUD permission class that dynamically checks user permissions on an object.
105
- Permission names follow the convention: `create_<model>`, `update_<model>`, `delete_<model>`.
82
+ Generic permission class that delegates to the model's `check_permission(user, action)` method.
106
83
"""
107
84
108
- model = None
109
-
110
85
def has_permission (self , request , view ):
111
- """Handles general permission checks"""
112
-
113
- return True # Fallback to object level permissions
114
-
115
- def has_object_permission (self , request , view , obj ):
116
- """Handles object-level permission checks."""
117
- model_name = self .model ._meta .model_name
118
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
119
- # check for create action
120
- if view .action == "create" :
121
- return request .user .is_superuser or request .user .has_perm (f"create_{ model_name } " , project )
122
-
123
- if view .action == "retrieve" :
124
- return True # Allow all users to view objects
125
-
126
- # Map ViewSet actions to permission names
127
- action_perms = {
128
- "retrieve" : f"view_{ model_name } " ,
129
- "update" : f"update_{ model_name } " ,
130
- "partial_update" : f"update_{ model_name } " ,
131
- "destroy" : f"delete_{ model_name } " ,
132
- }
133
-
134
- required_perm = action_perms .get (view .action )
135
- if not required_perm :
136
- return True
137
- return request .user .has_perm (required_perm , project )
138
-
139
-
140
- class ProjectCRUDPermission (CRUDPermission ):
141
- model = Project
142
-
143
-
144
- class JobCRUDPermission (CRUDPermission ):
145
- model = Job
146
-
147
-
148
- class DeploymentCRUDPermission (CRUDPermission ):
149
- model = Deployment
150
-
151
-
152
- class SourceImageCollectionCRUDPermission (CRUDPermission ):
153
- model = SourceImageCollection
154
-
155
-
156
- class SourceImageUploadCRUDPermission (CRUDPermission ):
157
- model = SourceImageUpload
158
-
159
-
160
- class SourceImageCRUDPermission (CRUDPermission ):
161
- model = SourceImage
162
-
163
-
164
- class CanStarSourceImage (permissions .BasePermission ):
165
- """Custom permission to check if the user can star a Source image."""
166
-
167
- permission = Project .Permissions .STAR_SOURCE_IMAGE
168
-
169
- def has_object_permission (self , request , view , obj ):
170
- if view .action in ["unstar" , "star" ]:
171
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
172
- return request .user .has_perm (self .permission , project )
173
- return True
174
-
175
-
176
- class S3StorageSourceCRUDPermission (CRUDPermission ):
177
- model = S3StorageSource
178
-
179
-
180
- class SiteCRUDPermission (CRUDPermission ):
181
- model = Site
182
-
183
-
184
- class DeviceCRUDPermission (CRUDPermission ):
185
- model = Device
186
-
187
-
188
- # Identification permission checks
189
- class CanUpdateIdentification (permissions .BasePermission ):
190
- """Custom permission to check if the user can update/create an identification."""
191
-
192
- permission = Project .Permissions .UPDATE_IDENTIFICATION
193
-
194
- def has_object_permission (self , request , view , obj ):
195
- if view .action in ["create" , "update" , "partial_update" ]:
196
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
197
- return request .user .has_perm (self .permission , project )
198
- return True
199
-
200
-
201
- class CanDeleteIdentification (permissions .BasePermission ):
202
- """Custom permission to check if the user can delete an identification."""
203
-
204
- permission = Project .Permissions .DELETE_IDENTIFICATION
205
-
206
- def has_object_permission (self , request , view , obj ):
207
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
208
- # Check if user is superuser or staff or project manager
209
- if view .action == "destroy" :
210
- if request .user .is_superuser or ProjectManager .has_role (request .user , project ):
211
- return True
212
- # Check if the user is the owner of the object
213
- return obj .user == request .user
214
- return True
215
-
216
-
217
- # Job run permission check
218
- class CanRunJob (permissions .BasePermission ):
219
- """Custom permission to check if the user can run a job."""
220
-
221
- permission = Project .Permissions .RUN_JOB
222
-
223
- def has_object_permission (self , request , view , obj ):
224
- if view .action == "run" :
225
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
226
- return request .user .has_perm (self .permission , project )
227
- return True
228
-
229
-
230
- class CanRetryJob (permissions .BasePermission ):
231
- """Custom permission to check if the user can retry a job."""
232
-
233
- permission = Project .Permissions .RETRY_JOB
234
-
235
- def has_object_permission (self , request , view , obj ):
236
- if view .action == "retry" :
237
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
238
- return request .user .has_perm (self .permission , project )
239
- return True
240
-
241
-
242
- class CanCancelJob (permissions .BasePermission ):
243
- """Custom permission to check if the user can cancel a job."""
244
-
245
- permission = Project .Permissions .CANCEL_JOB
246
-
247
- def has_object_permission (self , request , view , obj ):
248
- if view .action == "cancel" :
249
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
250
- return request .user .has_perm (self .permission , project )
251
- return True
252
-
253
-
254
- class CanPopulateSourceImageCollection (permissions .BasePermission ):
255
- """Custom permission to check if the user can populate a collection."""
256
-
257
- permission = Project .Permissions .POPULATE_COLLECTION
86
+ return True # Always allow — object-level handles actual checks
258
87
259
- def has_object_permission (self , request , view , obj ):
260
- if view .action == "populate" :
261
- project = obj .get_project () if hasattr (obj , "get_project" ) else None
262
- return request .user .has_perm (self .permission , project )
263
- return True
88
+ def has_object_permission (self , request , view , obj : BaseModel ):
89
+ return obj .check_permission (request .user , view .action )
0 commit comments