-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat: support conditional AWS::NoValue in SAM Function's IAM Role #3842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
ef9cd22
6a37502
79e8680
b01c455
cf96ad9
83e1f03
615f202
026679c
31cec02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -293,7 +293,7 @@ def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]: | |
| raise InvalidResourceException(self.logical_id, e.message) from e | ||
|
|
||
| @cw_timer | ||
| def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: PLR0915 | ||
| def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: PLR0915, PLR0912 | ||
| """Returns the Lambda function, role, and event resources to which this SAM Function corresponds. | ||
|
|
||
| :param dict kwargs: already-converted resources that may need to be modified when converting this \ | ||
|
|
@@ -388,16 +388,30 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P | |
| managed_policy_map = kwargs.get("managed_policy_map", {}) | ||
| get_managed_policy_map = kwargs.get("get_managed_policy_map") | ||
|
|
||
| execution_role = None | ||
| execution_role = self._construct_role( | ||
| managed_policy_map, | ||
| event_invoke_policies, | ||
| intrinsics_resolver, | ||
| get_managed_policy_map, | ||
| ) | ||
|
|
||
| if lambda_function.Role is None: | ||
| execution_role = self._construct_role( | ||
| managed_policy_map, | ||
| event_invoke_policies, | ||
| intrinsics_resolver, | ||
| get_managed_policy_map, | ||
| ) | ||
| lambda_function.Role = execution_role.get_runtime_attr("arn") | ||
| resources.append(execution_role) | ||
| elif is_intrinsic_if(lambda_function.Role): | ||
| role_changes = self._make_lambda_role(lambda_function, intrinsics_resolver, execution_role) | ||
|
|
||
| if role_changes["should_append_role"]: | ||
| resources.append(execution_role) | ||
|
|
||
| if role_changes["lambda_role_value"] is not None: | ||
| lambda_function.Role = role_changes["lambda_role_value"] | ||
|
|
||
| if role_changes["execution_role_condition"] is not None: | ||
| execution_role.set_resource_attribute("Condition", role_changes["execution_role_condition"]) | ||
|
|
||
| if role_changes["new_condition"] is not None: | ||
| conditions.update(role_changes["new_condition"]) | ||
|
|
||
| try: | ||
| resources += self._generate_event_resources( | ||
|
|
@@ -415,6 +429,58 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P | |
|
|
||
| return resources | ||
|
|
||
| def _make_lambda_role( | ||
| self, | ||
| lambda_function: LambdaFunction, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this implementation has some unnecessary change IMO, we can keep the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One other concern is we are modifying values for lambda_function, execution_role, resources, conditions ( which are the function's input) inside this function which returns None, but all these values might have changed after running this function. This might make this part of the code harder to review in the future. If we need to change all these variable's value in place, I would perfer to implement this directly in |
||
| intrinsics_resolver: IntrinsicsResolver, | ||
| execution_role: IAMRole, | ||
| ) -> Dict[str, Any]: | ||
| """ | ||
| Analyzes lambda role requirements and returns the changes needed. | ||
|
|
||
| Returns: | ||
| Dict containing: | ||
| - 'should_append_role': bool - whether to append execution_role to resources | ||
| - 'lambda_role_value': Any - value to set for lambda_function.Role | ||
| - 'execution_role_condition': str|None - condition to set on execution_role | ||
| - 'new_condition': Dict|None - new condition to add to conditions dict | ||
| """ | ||
| lambda_role = lambda_function.Role | ||
| execution_role_arn = execution_role.get_runtime_attr("arn") | ||
|
|
||
| result: Dict[str, Any] = { | ||
| "should_append_role": False, | ||
| "lambda_role_value": None, | ||
| "execution_role_condition": None, | ||
| "new_condition": None, | ||
| } | ||
|
|
||
| # We need to create and if else condition here | ||
| role_resolved_value = intrinsics_resolver.resolve_parameter_refs(lambda_role) | ||
| role_condition, role_if, role_else = role_resolved_value.get("Fn::If") | ||
madsid marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if is_intrinsic_no_value(role_if) and is_intrinsic_no_value(role_else): | ||
| result["lambda_role_value"] = execution_role_arn | ||
| result["should_append_role"] = True | ||
|
|
||
| # first value is none so we should create condition ? create : [2] | ||
| # create a condition for IAM role to only create on if case | ||
| elif is_intrinsic_no_value(role_if): | ||
| result["lambda_role_value"] = make_conditional(role_condition, execution_role_arn, role_else) | ||
| result["execution_role_condition"] = f"{role_condition}" | ||
| result["should_append_role"] = True | ||
|
|
||
| # second value is none so we should create condition ? [1] : create | ||
| # create a condition for IAM role to only create on else case | ||
| # with top level condition that negates the condition passed | ||
| elif is_intrinsic_no_value(role_else): | ||
| result["lambda_role_value"] = make_conditional(role_condition, role_if, execution_role_arn) | ||
| result["execution_role_condition"] = f"NOT{role_condition}" | ||
| result["new_condition"] = {f"NOT{role_condition}": make_not_conditional(role_condition)} | ||
| result["should_append_role"] = True | ||
|
|
||
| return result | ||
|
|
||
| def _construct_event_invoke_config( # noqa: PLR0913 | ||
| self, | ||
| function_name: str, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -741,6 +741,113 @@ def test_function_datasource_set_with_none(): | |
| assert none_datasource | ||
|
|
||
|
|
||
| class TestSamFunctionRoleResolver(TestCase): | ||
| """ | ||
| Tests for resolving IAM role property values in SamFunction | ||
| """ | ||
|
|
||
| def setUp(self): | ||
| self.function = SamFunction("foo") | ||
| self.function.CodeUri = "s3://foobar/foo.zip" | ||
| self.function.Runtime = "foo" | ||
| self.function.Handler = "bar" | ||
| self.kwargs = { | ||
| "intrinsics_resolver": IntrinsicsResolver({}), | ||
| "event_resources": [], | ||
| "managed_policy_map": {}, | ||
| "resource_resolver": ResourceResolver({}), | ||
| "conditions": {"Conditions": {}}, | ||
| } | ||
|
|
||
| def test_role_none_creates_execution_role(self): | ||
| self.function.Role = None | ||
| cfn_resources = self.function.to_cloudformation(**self.kwargs) | ||
| generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)] | ||
|
|
||
| self.assertEqual(len(generated_roles), 1) # Should create execution role | ||
|
|
||
| def test_role_explicit_arn_no_execution_role(self): | ||
| test_role = "arn:aws:iam::123456789012:role/existing-role" | ||
| self.function.Role = test_role | ||
|
|
||
| cfn_resources = self.function.to_cloudformation(**self.kwargs) | ||
| generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)] | ||
| lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can do the same than for the roles with the functions: But it's not a big deal. We can probably change it in the future if needed. |
||
|
|
||
| self.assertEqual(len(generated_roles), 0) # Should not create execution role | ||
| self.assertEqual(lambda_function.Role, test_role) | ||
|
|
||
| def test_role_fn_if_no_aws_no_value_keeps_original(self): | ||
| role_conditional = { | ||
| "Fn::If": ["Condition", "arn:aws:iam::123456789012:role/existing-role", {"Ref": "iamRoleArn"}] | ||
| } | ||
| self.function.Role = role_conditional | ||
|
|
||
| kwargs = dict(self.kwargs) | ||
| kwargs["conditions"] = {"Condition": True} | ||
|
|
||
| cfn_resources = self.function.to_cloudformation(**self.kwargs) | ||
| generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)] | ||
| lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function") | ||
|
|
||
| # Should not create a role if a role is passed in for both cases | ||
| self.assertEqual(len(generated_roles), 0) | ||
| self.assertEqual(lambda_function.Role, role_conditional) | ||
|
|
||
| def test_role_fn_if_both_no_value_creates_execution_role(self): | ||
| role_conditional = {"Fn::If": ["Condition", {"Ref": "AWS::NoValue"}, {"Ref": "AWS::NoValue"}]} | ||
| self.function.Role = role_conditional | ||
|
|
||
| kwargs = dict(self.kwargs) | ||
| kwargs["conditions"] = {"Condition": True} | ||
|
|
||
| cfn_resources = self.function.to_cloudformation(**self.kwargs) | ||
| generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)] | ||
|
|
||
| self.assertEqual(len(generated_roles), 1) | ||
|
|
||
| def test_role_fn_if_first_no_value_creates_conditional_role(self): | ||
| role_conditional = {"Fn::If": ["Condition", {"Ref": "AWS::NoValue"}, {"Ref": "iamRoleArn"}]} | ||
| self.function.Role = role_conditional | ||
|
|
||
| kwargs = dict(self.kwargs) | ||
| kwargs["conditions"] = {"Condition": True} | ||
|
|
||
| cfn_resources = self.function.to_cloudformation(**self.kwargs) | ||
| generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)] | ||
| lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function") | ||
|
|
||
| self.assertEqual(len(generated_roles), 1) | ||
| self.assertEqual( | ||
| lambda_function.Role, {"Fn::If": ["Condition", {"Fn::GetAtt": ["fooRole", "Arn"]}, {"Ref": "iamRoleArn"}]} | ||
| ) | ||
|
|
||
| def test_role_fn_if_second_no_value_creates_conditional_role(self): | ||
| role_conditional = {"Fn::If": ["Condition", {"Ref": "iamRoleArn"}, {"Ref": "AWS::NoValue"}]} | ||
| self.function.Role = role_conditional | ||
|
|
||
| kwargs = dict(self.kwargs) | ||
| kwargs["conditions"] = {"Condition": True} | ||
|
|
||
| cfn_resources = self.function.to_cloudformation(**self.kwargs) | ||
| generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)] | ||
| lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function") | ||
|
|
||
| self.assertEqual(len(generated_roles), 1) | ||
| self.assertEqual( | ||
| lambda_function.Role, {"Fn::If": ["Condition", {"Ref": "iamRoleArn"}, {"Fn::GetAtt": ["fooRole", "Arn"]}]} | ||
| ) | ||
|
|
||
| def test_role_get_att_no_execution_role(self): | ||
| role_get_att = {"Fn::GetAtt": ["MyCustomRole", "Arn"]} | ||
| self.function.Role = role_get_att | ||
|
|
||
| cfn_resources = self.function.to_cloudformation(**self.kwargs) | ||
| lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function") | ||
|
|
||
| self.assertEqual(lambda_function.Role, role_get_att) | ||
|
|
||
|
|
||
| class TestSamCapacityProvider(TestCase): | ||
| """Tests for SamCapacityProvider""" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| Resources: | ||
| MinimalFunction: | ||
| Type: AWS::Serverless::Function | ||
| Properties: | ||
| CodeUri: s3://sam-demo-bucket/hello.zip | ||
| Handler: hello.handler | ||
| Runtime: python3.10 | ||
| Role: 2 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| Parameters: | ||
| iamRoleArn: | ||
| Type: String | ||
| Description: The ARN of an IAM role to use as this function's execution role. | ||
| If a role isn't specified, one is created for you with a logical ID of <function-logical-id>Role. | ||
|
|
||
| Conditions: | ||
| CreateRole: !Not [!Equals ['', !Ref iamRoleArn]] | ||
|
|
||
| Resources: | ||
| MinimalFunction: | ||
| Type: AWS::Serverless::Function | ||
| Properties: | ||
| CodeUri: s3://sam-demo-bucket/hello.zip | ||
| Handler: hello.handler | ||
| Runtime: python3.10 | ||
| Role: !If | ||
| - CreateRole | ||
| - !Ref "AWS::NoValue" | ||
| - !Ref "iamRoleArn" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| Parameters: | ||
| iamRoleArn: | ||
| Type: String | ||
| Description: The ARN of an IAM role to use as this function's execution role. | ||
| If a role isn't specified, one is created for you with a logical ID of <function-logical-id>Role. | ||
|
|
||
| Conditions: | ||
| RoleExists: !Not [!Equals ['', !Ref iamRoleArn]] | ||
|
|
||
| Resources: | ||
| MinimalFunction: | ||
| Type: AWS::Serverless::Function | ||
| Properties: | ||
| CodeUri: s3://sam-demo-bucket/hello.zip | ||
| Handler: hello.handler | ||
| Runtime: python3.10 | ||
| Role: !If | ||
| - RoleExists | ||
| - !Ref "iamRoleArn" | ||
| - !Ref "AWS::NoValue" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| { | ||
| "_autoGeneratedBreakdownErrorMessage": [ | ||
| "Invalid Serverless Application Specification document. ", | ||
| "Number of errors found: 1. ", | ||
| "Resource with id [MinimalFunction] is invalid. ", | ||
| "Property 'Role' should be a string." | ||
| ], | ||
| "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MinimalFunction] is invalid. Property 'Role' should be a string.", | ||
| "errors": [ | ||
| { | ||
| "errorMessage": "Resource with id [MinimalFunction] is invalid. Property 'Role' should be a string." | ||
| } | ||
| ] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| { | ||
| "Conditions": { | ||
| "CreateRole": { | ||
| "Fn::Not": [ | ||
| { | ||
| "Fn::Equals": [ | ||
| "", | ||
| { | ||
| "Ref": "iamRoleArn" | ||
| } | ||
| ] | ||
| } | ||
| ] | ||
| } | ||
| }, | ||
| "Parameters": { | ||
| "iamRoleArn": { | ||
| "Description": "The ARN of an IAM role to use as this function's execution role. If a role isn't specified, one is created for you with a logical ID of <function-logical-id>Role.", | ||
| "Type": "String" | ||
| } | ||
| }, | ||
| "Resources": { | ||
| "MinimalFunction": { | ||
| "Properties": { | ||
| "Code": { | ||
| "S3Bucket": "sam-demo-bucket", | ||
| "S3Key": "hello.zip" | ||
| }, | ||
| "Handler": "hello.handler", | ||
| "Role": { | ||
| "Fn::If": [ | ||
| "CreateRole", | ||
| { | ||
| "Fn::GetAtt": [ | ||
| "MinimalFunctionRole", | ||
| "Arn" | ||
| ] | ||
| }, | ||
| { | ||
| "Ref": "iamRoleArn" | ||
| } | ||
| ] | ||
| }, | ||
| "Runtime": "python3.10", | ||
| "Tags": [ | ||
| { | ||
| "Key": "lambda:createdBy", | ||
| "Value": "SAM" | ||
| } | ||
| ] | ||
| }, | ||
| "Type": "AWS::Lambda::Function" | ||
| }, | ||
| "MinimalFunctionRole": { | ||
| "Condition": "CreateRole", | ||
| "Properties": { | ||
| "AssumeRolePolicyDocument": { | ||
| "Statement": [ | ||
| { | ||
| "Action": [ | ||
| "sts:AssumeRole" | ||
| ], | ||
| "Effect": "Allow", | ||
| "Principal": { | ||
| "Service": [ | ||
| "lambda.amazonaws.com" | ||
| ] | ||
| } | ||
| } | ||
| ], | ||
| "Version": "2012-10-17" | ||
| }, | ||
| "ManagedPolicyArns": [ | ||
| "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" | ||
| ], | ||
| "Tags": [ | ||
| { | ||
| "Key": "lambda:createdBy", | ||
| "Value": "SAM" | ||
| } | ||
| ] | ||
| }, | ||
| "Type": "AWS::IAM::Role" | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can change this a little bit so you return a
role_changes["execution_role_resource"]that it's eitherexecution_roleorNone. And you set theConditionattribute inside the function not here outside.so then you have
and that role comes ready to be added, you can change its
resource_attribute("Condition,inside the function instead of here.