Skip to content

Commit 212515e

Browse files
Add feature: effective grant management for users
1 parent 26cf8d9 commit 212515e

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

soauth/database/user.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,33 @@ def remove_grant(self, grant: str):
8787

8888
self.grants = " ".join([x for x in self.grants.split(" ") if x != grant])
8989

90+
def get_effective_grants(self) -> set[str]:
91+
"""Get all grants from user + all groups they're members of."""
92+
all_grants = set()
93+
94+
# Add user's individual grants
95+
if self.grants:
96+
all_grants.update(self.grants.split())
97+
98+
# Add grants from all groups
99+
for group in self.groups:
100+
if group.grants:
101+
all_grants.update(group.grants.split())
102+
103+
return set(all_grants)
104+
105+
def has_effective_grant(self, grant: str) -> bool:
106+
"""Check if user has grant either individually or through groups."""
107+
return grant in self.get_effective_grants()
108+
90109
def to_core(self, include_groups=True) -> UserData:
110+
effective_grants = self.get_effective_grants()
91111
return UserData(
92112
user_id=self.user_id,
93113
user_name=self.user_name,
94114
full_name=self.full_name,
95115
email=self.email,
96-
grants=set(self.grants.split(" ")),
116+
grants=effective_grants,
97117
group_names=[x.group_name for x in self.groups] if include_groups else None,
98118
group_ids=[str(x.group_id) for x in self.groups]
99119
if include_groups

tests/test_service/test_user.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def test_create_user(server_settings, session_manager, logger):
3535
user = await user_service.read_by_id(user_id=USER_ID, conn=conn)
3636

3737
assert len(user.groups) > 0
38-
assert user.has_grant("test_grant")
38+
assert user.has_effective_grant("test_grant")
3939

4040
async with session_manager.session() as conn:
4141
async with conn.begin():
@@ -47,7 +47,7 @@ async def test_create_user(server_settings, session_manager, logger):
4747
async with conn.begin():
4848
user = await user_service.read_by_id(user_id=USER_ID, conn=conn)
4949

50-
assert not user.has_grant("test_grant")
50+
assert not user.has_effective_grant("test_grant")
5151

5252
async with session_manager.session() as conn:
5353
async with conn.begin():
@@ -64,6 +64,52 @@ async def test_create_user(server_settings, session_manager, logger):
6464
user = await user_service.read_by_name(user_name=USER_NAME, conn=conn)
6565

6666

67+
@pytest.mark.asyncio(loop_scope="session")
68+
async def test_user_effective_grants_with_group(server_settings, session_manager, logger):
69+
async with session_manager.session() as conn:
70+
async with conn.begin():
71+
# Create a new user
72+
user = await user_service.create(
73+
user_name="test_user",
74+
email="test_user@email.com",
75+
full_name="Test User Group Grants",
76+
grants="user_grant",
77+
conn=conn,
78+
log=logger,
79+
)
80+
USER_ID = user.user_id
81+
82+
# Create a new group with grants and add the user
83+
group = await group_service.create(
84+
group_name="test_group_with_grants",
85+
created_by_user_id=USER_ID,
86+
member_ids=[USER_ID],
87+
grants="group_grant another_group_grant",
88+
conn=conn,
89+
log=logger,
90+
)
91+
GROUP_ID = group.group_id
92+
93+
async with session_manager.session() as conn:
94+
async with conn.begin():
95+
# Read the user and check effective grants
96+
user = await user_service.read_by_id(user_id=USER_ID, conn=conn)
97+
effective_grants = user.get_effective_grants()
98+
99+
assert "user_grant" in effective_grants
100+
assert "group_grant" in effective_grants
101+
assert "another_group_grant" in effective_grants
102+
assert user.has_effective_grant("user_grant")
103+
assert user.has_effective_grant("group_grant")
104+
assert user.has_effective_grant("another_group_grant")
105+
assert not user.has_effective_grant("non_existent_grant")
106+
107+
async with session_manager.session() as conn:
108+
async with conn.begin():
109+
# Delete the group
110+
await group_service.delete_group(group_id=GROUP_ID, conn=conn, log=logger)
111+
112+
67113
@pytest.mark.asyncio(loop_scope="session")
68114
async def test_user_with_no_groups(server_settings, session_manager, logger):
69115
async with session_manager.session() as conn:

0 commit comments

Comments
 (0)