Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 55 additions & 16 deletions aws_lambda_script/app/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,66 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_user_attributes(args):
"""Get user attributes from Cognito token.

# Function to get the user email from the token
def get_user_email(args):
Args:
args (dict): Request arguments containing the Authorization header.

Returns:
dict: User attributes extracted from the token.

Raises:
Exception: If token verification fails.
"""
token = args["Authorization"]
try:
user_attributes = verify_cognito_token(token)
owner_email = user_attributes["email"]
if not owner_email:
logger.error("No email found in user attributes")
abort(401, description="Not authorized")
return owner_email
return user_attributes
except Exception as error:
logger.exception("Error verifying token: %s", error)
abort(401, description="Not authorized")

def get_user_information() -> dict:
"""A function to collect user information from a given Cognito Token.

Returns:
dict: A dictionary containing the user's email and groups.

Raises:
abort: If the email is not found in user attributes.
"""
user_attributes = get_user_attributes(parser.parse_args())

owner_email = user_attributes.get("email", "")
user_groups = user_attributes.get("cognito:groups", [])

if not owner_email:
logger.error("No email found in user attributes")
abort(401, description="Not authorized")

return {"email": owner_email, "groups": user_groups}

def is_auth_user_in_admin_group(user_groups: list) -> bool:
"""Check if the authenticated user is in the admin group.

Args:
user_groups (list): List of groups the user belongs to.

Returns:
bool: True if user is in admin group, False otherwise.
"""
return "Admin" in user_groups

# Route to return the user email from the token in authorization header
@ns.route("/user")
class User(Resource):
@ns.doc(responses={200: "Success", 401: "Authorization is required"})
def get(self):
owner_email = get_user_email(parser.parse_args())
return {"email": owner_email}, 200
user_info = get_user_information()
owner_email = user_info["email"]
user_groups = user_info["groups"]
return {"email": owner_email, "groups": user_groups}, 200


# Route to return all projects with optional filters
Expand Down Expand Up @@ -339,7 +376,6 @@ class Projects(Resource):
@ns.doc(responses={200: "Success", 401: "Authorization is required"})
# @ns.marshal_list_with(project_model)
def get(self):
owner_email = get_user_email(parser.parse_args())
data = read_data("new_project_data.json")
user_projects = [proj for proj in data["projects"]]
return user_projects, 200
Expand All @@ -358,7 +394,8 @@ def get(self):
},
)
def post(self):
owner_email = get_user_email(parser.parse_args())
user_info = get_user_information()
owner_email = user_info["email"]

# Check that required fields are present in the JSON payload
new_project = ns.payload
Expand Down Expand Up @@ -438,8 +475,6 @@ class ProjectDetail(Resource):
# the name and the user email in the first user item in the user list
@ns.marshal_with(project_model)
def get(self, project_name):
owner_email = get_user_email(parser.parse_args())

# Sanitize project_name by replacing '%20' with spaces
project_name = (
project_name.replace("%20", " ").replace("\r\n", "").replace("\n", "")
Expand Down Expand Up @@ -474,7 +509,10 @@ def get(self, project_name):
},
)
def put(self, project_name):
owner_email = get_user_email(parser.parse_args())
user_info = get_user_information()
owner_email = user_info["email"]
user_groups = user_info["groups"]

project_name = (
project_name.replace("%20", " ").replace("\r\n", "").replace("\n", "")
)
Expand All @@ -494,17 +532,18 @@ def put(self, project_name):
if key not in environments or not isinstance(environments[key], bool):
abort(400, description=f"Invalid environments data: '{key}' must be a boolean")


# Ensure the email is set to owner_email

data = read_data("new_project_data.json")

logger.info(f"Authenticated user is in admin group: {is_auth_user_in_admin_group(user_groups)}")

project = next(
(
proj
for proj in data["projects"]
if proj["details"][0]["name"] == project_name
and any(user["email"] == owner_email for user in proj["user"])
and (any(user["email"] == owner_email for user in proj["user"]) or is_auth_user_in_admin_group(user_groups))
),
None,
)
Expand Down
8 changes: 8 additions & 0 deletions terraform/api_gateway/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ resource "aws_api_gateway_deployment" "main" {
lifecycle {
create_before_destroy = true
}

// This block will cause a redeployment of the API Gateway whenever the main.tf file changes
// Since the API routes are defined in this file, any changes to the routes will trigger a redeployment
triggers = {
redeployment = sha1(jsonencode([
file("main.tf")
]))
}
}

# API Gateway Stage
Expand Down
1 change: 1 addition & 0 deletions terraform/authentication/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ module "cognito" {
token_validity_values = var.token_validity_values
token_validity_units = var.token_validity_units
callback_urls = var.callback_urls
user_groups = {"Admin": "Admin User Group. Adds an admin user group to the user pool so users can edit all projects."}
}
2 changes: 1 addition & 1 deletion terraform/lambda/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ resource "aws_lambda_function" "tech_audit_lambda" {
variables = {
TECH_AUDIT_DATA_BUCKET = data.terraform_remote_state.storage.outputs.tech_audit_data_bucket_name
TECH_AUDIT_SECRET_MANAGER = data.terraform_remote_state.secrets.outputs.secret_name
AWS_COGNITO_TOKEN_URL = "https://${var.service_subdomain}-${var.domain}.auth.eu-west-2.amazoncognito.com/oauth2/token"
AWS_COGNITO_TOKEN_URL = "https://${var.domain}-${var.service_subdomain}.auth.eu-west-2.amazoncognito.com/oauth2/token"
}
}

Expand Down