Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 74 additions & 8 deletions samtranslator/model/sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]:
raise InvalidResourceException(self.logical_id, e.message) from e

@cw_timer
def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: PLR0915
def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: PLR0915, PLR0912
"""Returns the Lambda function, role, and event resources to which this SAM Function corresponds.

:param dict kwargs: already-converted resources that may need to be modified when converting this \
Expand Down Expand Up @@ -388,16 +388,30 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
managed_policy_map = kwargs.get("managed_policy_map", {})
get_managed_policy_map = kwargs.get("get_managed_policy_map")

execution_role = None
execution_role = self._construct_role(
managed_policy_map,
event_invoke_policies,
intrinsics_resolver,
get_managed_policy_map,
)

if lambda_function.Role is None:
execution_role = self._construct_role(
managed_policy_map,
event_invoke_policies,
intrinsics_resolver,
get_managed_policy_map,
)
lambda_function.Role = execution_role.get_runtime_attr("arn")
resources.append(execution_role)
elif is_intrinsic_if(lambda_function.Role):
role_changes = self._make_lambda_role(lambda_function, intrinsics_resolver, execution_role)

if role_changes["should_append_role"]:
resources.append(execution_role)

if role_changes["lambda_role_value"] is not None:
lambda_function.Role = role_changes["lambda_role_value"]

if role_changes["execution_role_condition"] is not None:
execution_role.set_resource_attribute("Condition", role_changes["execution_role_condition"])
Comment on lines +404 to +411
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can change this a little bit so you return a role_changes["execution_role_resource"] that it's either execution_role or None. And you set the Condition attribute inside the function not here outside.

so then you have

if role_changes["execution_role_resource"] is not None:
    resources.append(role_changes["execution_role_resource"])

and that role comes ready to be added, you can change its resource_attribute("Condition, inside the function instead of here.


if role_changes["new_condition"] is not None:
conditions.update(role_changes["new_condition"])

try:
resources += self._generate_event_resources(
Expand All @@ -415,6 +429,58 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P

return resources

def _make_lambda_role(
self,
lambda_function: LambdaFunction,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this implementation has some unnecessary change IMO, we can keep the if lambda_function.Role is None part as is, then _make_lambda_role can return the created role, so we don't need to pass in lambda_function in this function.

Copy link
Member

@roger-zhangg roger-zhangg Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One other concern is we are modifying values for lambda_function, execution_role, resources, conditions ( which are the function's input) inside this function which returns None, but all these values might have changed after running this function. This might make this part of the code harder to review in the future. If we need to change all these variable's value in place, I would perfer to implement this directly in to_cloudformation instead of this subfunction. If you really prefer to have this subfunction. Maybe we can return the new value instead of modifying them inplace. So future reviewers could easily understand these variables are modified in this subfunction.

intrinsics_resolver: IntrinsicsResolver,
execution_role: IAMRole,
) -> Dict[str, Any]:
"""
Analyzes lambda role requirements and returns the changes needed.

Returns:
Dict containing:
- 'should_append_role': bool - whether to append execution_role to resources
- 'lambda_role_value': Any - value to set for lambda_function.Role
- 'execution_role_condition': str|None - condition to set on execution_role
- 'new_condition': Dict|None - new condition to add to conditions dict
"""
lambda_role = lambda_function.Role
execution_role_arn = execution_role.get_runtime_attr("arn")

result: Dict[str, Any] = {
"should_append_role": False,
"lambda_role_value": None,
"execution_role_condition": None,
"new_condition": None,
}

# We need to create and if else condition here
role_resolved_value = intrinsics_resolver.resolve_parameter_refs(lambda_role)
role_condition, role_if, role_else = role_resolved_value.get("Fn::If")

if is_intrinsic_no_value(role_if) and is_intrinsic_no_value(role_else):
result["lambda_role_value"] = execution_role_arn
result["should_append_role"] = True

# first value is none so we should create condition ? create : [2]
# create a condition for IAM role to only create on if case
elif is_intrinsic_no_value(role_if):
result["lambda_role_value"] = make_conditional(role_condition, execution_role_arn, role_else)
result["execution_role_condition"] = f"{role_condition}"
result["should_append_role"] = True

# second value is none so we should create condition ? [1] : create
# create a condition for IAM role to only create on else case
# with top level condition that negates the condition passed
elif is_intrinsic_no_value(role_else):
result["lambda_role_value"] = make_conditional(role_condition, role_if, execution_role_arn)
result["execution_role_condition"] = f"NOT{role_condition}"
result["new_condition"] = {f"NOT{role_condition}": make_not_conditional(role_condition)}
result["should_append_role"] = True

return result

def _construct_event_invoke_config( # noqa: PLR0913
self,
function_name: str,
Expand Down
107 changes: 107 additions & 0 deletions tests/model/test_sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,113 @@ def test_function_datasource_set_with_none():
assert none_datasource


class TestSamFunctionRoleResolver(TestCase):
"""
Tests for resolving IAM role property values in SamFunction
"""

def setUp(self):
self.function = SamFunction("foo")
self.function.CodeUri = "s3://foobar/foo.zip"
self.function.Runtime = "foo"
self.function.Handler = "bar"
self.kwargs = {
"intrinsics_resolver": IntrinsicsResolver({}),
"event_resources": [],
"managed_policy_map": {},
"resource_resolver": ResourceResolver({}),
"conditions": {"Conditions": {}},
}

def test_role_none_creates_execution_role(self):
self.function.Role = None
cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]

self.assertEqual(len(generated_roles), 1) # Should create execution role

def test_role_explicit_arn_no_execution_role(self):
test_role = "arn:aws:iam::123456789012:role/existing-role"
self.function.Role = test_role

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do the same than for the roles with the functions: generated_functions = [x for x in cfnResources if isinstance(x, LambdaFunction)]
and then you can also confirm that there's only one created.

But it's not a big deal. We can probably change it in the future if needed.


self.assertEqual(len(generated_roles), 0) # Should not create execution role
self.assertEqual(lambda_function.Role, test_role)

def test_role_fn_if_no_aws_no_value_keeps_original(self):
role_conditional = {
"Fn::If": ["Condition", "arn:aws:iam::123456789012:role/existing-role", {"Ref": "iamRoleArn"}]
}
self.function.Role = role_conditional

kwargs = dict(self.kwargs)
kwargs["conditions"] = {"Condition": True}

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

# Should not create a role if a role is passed in for both cases
self.assertEqual(len(generated_roles), 0)
self.assertEqual(lambda_function.Role, role_conditional)

def test_role_fn_if_both_no_value_creates_execution_role(self):
role_conditional = {"Fn::If": ["Condition", {"Ref": "AWS::NoValue"}, {"Ref": "AWS::NoValue"}]}
self.function.Role = role_conditional

kwargs = dict(self.kwargs)
kwargs["conditions"] = {"Condition": True}

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]

self.assertEqual(len(generated_roles), 1)

def test_role_fn_if_first_no_value_creates_conditional_role(self):
role_conditional = {"Fn::If": ["Condition", {"Ref": "AWS::NoValue"}, {"Ref": "iamRoleArn"}]}
self.function.Role = role_conditional

kwargs = dict(self.kwargs)
kwargs["conditions"] = {"Condition": True}

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(len(generated_roles), 1)
self.assertEqual(
lambda_function.Role, {"Fn::If": ["Condition", {"Fn::GetAtt": ["fooRole", "Arn"]}, {"Ref": "iamRoleArn"}]}
)

def test_role_fn_if_second_no_value_creates_conditional_role(self):
role_conditional = {"Fn::If": ["Condition", {"Ref": "iamRoleArn"}, {"Ref": "AWS::NoValue"}]}
self.function.Role = role_conditional

kwargs = dict(self.kwargs)
kwargs["conditions"] = {"Condition": True}

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(len(generated_roles), 1)
self.assertEqual(
lambda_function.Role, {"Fn::If": ["Condition", {"Ref": "iamRoleArn"}, {"Fn::GetAtt": ["fooRole", "Arn"]}]}
)

def test_role_get_att_no_execution_role(self):
role_get_att = {"Fn::GetAtt": ["MyCustomRole", "Arn"]}
self.function.Role = role_get_att

cfn_resources = self.function.to_cloudformation(**self.kwargs)
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(lambda_function.Role, role_get_att)


class TestSamCapacityProvider(TestCase):
"""Tests for SamCapacityProvider"""

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Resources:
MinimalFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/hello.zip
Handler: hello.handler
Runtime: python3.10
Role: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Parameters:
iamRoleArn:
Type: String
Description: The ARN of an IAM role to use as this function's execution role.
If a role isn't specified, one is created for you with a logical ID of <function-logical-id>Role.

Conditions:
CreateRole: !Not [!Equals ['', !Ref iamRoleArn]]

Resources:
MinimalFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/hello.zip
Handler: hello.handler
Runtime: python3.10
Role: !If
- CreateRole
- !Ref "AWS::NoValue"
- !Ref "iamRoleArn"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Parameters:
iamRoleArn:
Type: String
Description: The ARN of an IAM role to use as this function's execution role.
If a role isn't specified, one is created for you with a logical ID of <function-logical-id>Role.

Conditions:
RoleExists: !Not [!Equals ['', !Ref iamRoleArn]]

Resources:
MinimalFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: s3://sam-demo-bucket/hello.zip
Handler: hello.handler
Runtime: python3.10
Role: !If
- RoleExists
- !Ref "iamRoleArn"
- !Ref "AWS::NoValue"
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"_autoGeneratedBreakdownErrorMessage": [
"Invalid Serverless Application Specification document. ",
"Number of errors found: 1. ",
"Resource with id [MinimalFunction] is invalid. ",
"Property 'Role' should be a string."
],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MinimalFunction] is invalid. Property 'Role' should be a string.",
"errors": [
{
"errorMessage": "Resource with id [MinimalFunction] is invalid. Property 'Role' should be a string."
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{
"Conditions": {
"CreateRole": {
"Fn::Not": [
{
"Fn::Equals": [
"",
{
"Ref": "iamRoleArn"
}
]
}
]
}
},
"Parameters": {
"iamRoleArn": {
"Description": "The ARN of an IAM role to use as this function's execution role. If a role isn't specified, one is created for you with a logical ID of <function-logical-id>Role.",
"Type": "String"
}
},
"Resources": {
"MinimalFunction": {
"Properties": {
"Code": {
"S3Bucket": "sam-demo-bucket",
"S3Key": "hello.zip"
},
"Handler": "hello.handler",
"Role": {
"Fn::If": [
"CreateRole",
{
"Fn::GetAtt": [
"MinimalFunctionRole",
"Arn"
]
},
{
"Ref": "iamRoleArn"
}
]
},
"Runtime": "python3.10",
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
},
"Type": "AWS::Lambda::Function"
},
"MinimalFunctionRole": {
"Condition": "CreateRole",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
],
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
},
"Type": "AWS::IAM::Role"
}
}
}
Loading
Loading