From b8ccb24ee083262503ac3e80e1ceaf2b6e126802 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:32:24 +0530 Subject: [PATCH 01/13] start with creating a migration config to be used for migration s3 between accounts --- hq_s3_migration/config.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 hq_s3_migration/config.py diff --git a/hq_s3_migration/config.py b/hq_s3_migration/config.py new file mode 100644 index 0000000000..eef14481a0 --- /dev/null +++ b/hq_s3_migration/config.py @@ -0,0 +1,26 @@ +from dataclasses import dataclass + +@dataclass +class MigrationConfig: + """Configuration for S3 cross-account migration.""" + source_profile: str + dest_profile: str + source_bucket: str + dest_bucket: str + source_account_id: str + dest_account_id: str + region: str + replication_role_name: str + datasync_role_name: str + enable_rtc: bool = True # S3 Replication Time Control (15-min SLA) + enable_delete_replication: bool = True + + +ACCOUNT_IDS = { + 'production': '051428382917', + 'staging': '737236193635', + 'backup-production': '213307118311', + 'dimagi':'437781348816' +} + +ACCOUNT_NAMES = {v: k for k, v in ACCOUNT_IDS.items()} From 25216ed9986ce681cf0a5d728bc6d5aaf70aa740 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:33:28 +0530 Subject: [PATCH 02/13] orchestrator to call aws services for both source and destination accounts --- hq_s3_migration/orchestrator.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 hq_s3_migration/orchestrator.py diff --git a/hq_s3_migration/orchestrator.py b/hq_s3_migration/orchestrator.py new file mode 100644 index 0000000000..729dc5f6b5 --- /dev/null +++ b/hq_s3_migration/orchestrator.py @@ -0,0 +1,29 @@ +import boto3 + +from .config import MigrationConfig + + +class S3MigrationContext: + """Thin container for boto3 sessions and clients used across migration phases.""" + + def __init__(self, config: MigrationConfig): + self.config = config + + # Initialize boto3 sessions + self.source_session = boto3.Session( + profile_name=config.source_profile, + region_name=config.region + ) + self.dest_session = boto3.Session( + profile_name=config.dest_profile, + region_name=config.region + ) + + # Initialize clients + self.source_s3 = self.source_session.client('s3') + self.dest_s3 = self.dest_session.client('s3') + self.source_iam = self.source_session.client('iam') + self.dest_iam = self.dest_session.client('iam') + self.source_datasync = self.source_session.client('datasync') + self.source_sts = self.source_session.client('sts') + self.dest_sts = self.dest_session.client('sts') From 80dc5376fd44006969f13950c33793efcdedaa7e Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:34:06 +0530 Subject: [PATCH 03/13] put __init__ --- hq_s3_migration/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 hq_s3_migration/__init__.py diff --git a/hq_s3_migration/__init__.py b/hq_s3_migration/__init__.py new file mode 100644 index 0000000000..e11d478ddf --- /dev/null +++ b/hq_s3_migration/__init__.py @@ -0,0 +1,5 @@ +"""S3 Cross-Account Migration Tool — hybrid DataSync + Live Replication.""" + +from .orchestrator import S3MigrationContext + +__all__ = ["S3MigrationContext"] From cbc5fca127b998589ab53b1ce07fd6029d2cb904 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:36:01 +0530 Subject: [PATCH 04/13] add different roles and policies that needs to be added to the source and destination envs --- hq_s3_migration/policies/__init__.py | 20 ++++++++ .../policies/datasync_bucket_stmt.json | 25 ++++++++++ hq_s3_migration/policies/datasync_role.json | 46 +++++++++++++++++++ hq_s3_migration/policies/datasync_trust.json | 16 +++++++ .../policies/destination_bucket.json | 33 +++++++++++++ .../policies/replication_role.json | 43 +++++++++++++++++ .../policies/replication_trust.json | 12 +++++ 7 files changed, 195 insertions(+) create mode 100644 hq_s3_migration/policies/__init__.py create mode 100644 hq_s3_migration/policies/datasync_bucket_stmt.json create mode 100644 hq_s3_migration/policies/datasync_role.json create mode 100644 hq_s3_migration/policies/datasync_trust.json create mode 100644 hq_s3_migration/policies/destination_bucket.json create mode 100644 hq_s3_migration/policies/replication_role.json create mode 100644 hq_s3_migration/policies/replication_trust.json diff --git a/hq_s3_migration/policies/__init__.py b/hq_s3_migration/policies/__init__.py new file mode 100644 index 0000000000..993e0444d0 --- /dev/null +++ b/hq_s3_migration/policies/__init__.py @@ -0,0 +1,20 @@ +import json +from pathlib import Path + +_POLICIES_DIR = Path(__file__).parent + + +def render_policy(template_name: str, **kwargs) -> dict: + """Load a JSON policy template and substitute {placeholder} variables. + + Args: + template_name: Filename (e.g. "replication_role.json") inside policies/. + **kwargs: Placeholder values to substitute. + + Returns: + Parsed dict with all placeholders replaced. + """ + text = (_POLICIES_DIR / template_name).read_text() + for key, value in kwargs.items(): + text = text.replace(f"{{{key}}}", value) + return json.loads(text) diff --git a/hq_s3_migration/policies/datasync_bucket_stmt.json b/hq_s3_migration/policies/datasync_bucket_stmt.json new file mode 100644 index 0000000000..9ac8f8282e --- /dev/null +++ b/hq_s3_migration/policies/datasync_bucket_stmt.json @@ -0,0 +1,25 @@ +{ + "Sid": "AllowDataSyncAccess", + "Effect": "Allow", + "Principal": { + "AWS": "{datasync_role_arn}" + }, + "Action": [ + "s3:GetBucketLocation", + "s3:ListBucket", + "s3:ListBucketMultipartUploads", + "s3:GetObject", + "s3:GetObjectTagging", + "s3:GetObjectVersion", + "s3:GetObjectVersionTagging", + "s3:PutObject", + "s3:PutObjectTagging", + "s3:DeleteObject", + "s3:AbortMultipartUpload", + "s3:ListMultipartUploadParts" + ], + "Resource": [ + "arn:aws:s3:::{dest_bucket}", + "arn:aws:s3:::{dest_bucket}/*" + ] +} diff --git a/hq_s3_migration/policies/datasync_role.json b/hq_s3_migration/policies/datasync_role.json new file mode 100644 index 0000000000..0a0e1b6cb9 --- /dev/null +++ b/hq_s3_migration/policies/datasync_role.json @@ -0,0 +1,46 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "ReadSource", + "Effect": "Allow", + "Action": [ + "s3:GetBucketLocation", + "s3:ListBucket", + "s3:ListBucketMultipartUploads", + "s3:GetObject", + "s3:GetObjectTagging", + "s3:GetObjectVersion", + "s3:GetObjectVersionTagging", + "s3:ListMultipartUploadParts", + "s3:AbortMultipartUpload" + ], + "Resource": [ + "arn:aws:s3:::{source_bucket}", + "arn:aws:s3:::{source_bucket}/*" + ] + }, + { + "Sid": "WriteDestination", + "Effect": "Allow", + "Action": [ + "s3:GetBucketLocation", + "s3:ListBucket", + "s3:ListBucketMultipartUploads", + "s3:GetObject", + "s3:GetObjectTagging", + "s3:GetObjectVersion", + "s3:GetObjectVersionTagging", + "s3:PutObject", + "s3:PutObjectTagging", + "s3:DeleteObject", + "s3:ListMultipartUploadParts", + "s3:AbortMultipartUpload" + ], + "Resource": [ + "arn:aws:s3:::{dest_bucket}", + "arn:aws:s3:::{dest_bucket}/*" + ] + } + ] +} diff --git a/hq_s3_migration/policies/datasync_trust.json b/hq_s3_migration/policies/datasync_trust.json new file mode 100644 index 0000000000..4594a9368d --- /dev/null +++ b/hq_s3_migration/policies/datasync_trust.json @@ -0,0 +1,16 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": [ + "datasync.amazonaws.com" + ] + }, + "Action": [ + "sts:AssumeRole" + ] + } + ] +} diff --git a/hq_s3_migration/policies/destination_bucket.json b/hq_s3_migration/policies/destination_bucket.json new file mode 100644 index 0000000000..dfc7649cb2 --- /dev/null +++ b/hq_s3_migration/policies/destination_bucket.json @@ -0,0 +1,33 @@ +{ + "Version": "2012-10-17", + "Id": "PolicyForCrossAccountReplication", + "Statement": [ + { + "Sid": "AllowReplicationFromSource", + "Effect": "Allow", + "Principal": { + "AWS": "{role_arn}" + }, + "Action": [ + "s3:List*", + "s3:GetBucketVersioning", + "s3:PutBucketVersioning", + "s3:ReplicateDelete", + "s3:ReplicateObject" + ], + "Resource": [ + "arn:aws:s3:::{dest_bucket}", + "arn:aws:s3:::{dest_bucket}/*" + ] + }, + { + "Sid": "AllowBucketOwnerOverride", + "Effect": "Allow", + "Principal": { + "AWS": "{role_arn}" + }, + "Action": "s3:ObjectOwnerOverrideToBucketOwner", + "Resource": "arn:aws:s3:::{dest_bucket}/*" + } + ] +} diff --git a/hq_s3_migration/policies/replication_role.json b/hq_s3_migration/policies/replication_role.json new file mode 100644 index 0000000000..3054dcb225 --- /dev/null +++ b/hq_s3_migration/policies/replication_role.json @@ -0,0 +1,43 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "GetSourceBucketConfiguration", + "Effect": "Allow", + "Action": [ + "s3:InitiateReplication", + "s3:ListBucket", + "s3:GetBucketLocation", + "s3:GetBucketAcl", + "s3:GetReplicationConfiguration", + "s3:GetObjectVersionForReplication", + "s3:GetObjectVersionAcl", + "s3:GetObjectVersionTagging", + "s3:PutInventoryConfiguration" + ], + "Resource": [ + "arn:aws:s3:::{source_bucket}", + "arn:aws:s3:::{source_bucket}/*" + ] + }, + { + "Sid": "ReplicateToDestinationBucket", + "Effect": "Allow", + "Action": [ + "s3:List*", + "s3:GetObject", + "s3:PutObject", + "s3:PutObjectAcl", + "s3:PutObjectTagging", + "s3:ReplicateObject", + "s3:ReplicateDelete", + "s3:ReplicateTags", + "s3:ObjectOwnerOverrideToBucketOwner" + ], + "Resource": [ + "arn:aws:s3:::{dest_bucket}", + "arn:aws:s3:::{dest_bucket}/*" + ] + } + ] +} diff --git a/hq_s3_migration/policies/replication_trust.json b/hq_s3_migration/policies/replication_trust.json new file mode 100644 index 0000000000..873e3abe14 --- /dev/null +++ b/hq_s3_migration/policies/replication_trust.json @@ -0,0 +1,12 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "s3.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} From 78301587b59db78c4cec0154b71a7ed194f3c9aa Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:36:45 +0530 Subject: [PATCH 05/13] add iam util to view and create policies on aws accounts --- hq_s3_migration/iam.py | 227 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 hq_s3_migration/iam.py diff --git a/hq_s3_migration/iam.py b/hq_s3_migration/iam.py new file mode 100644 index 0000000000..98d5d5f6ad --- /dev/null +++ b/hq_s3_migration/iam.py @@ -0,0 +1,227 @@ +import json +import time +from typing import Optional + +from botocore.exceptions import ClientError + +from .orchestrator import S3MigrationContext +from .policies import render_policy + + +def print_iam_policies(ctx: S3MigrationContext): + """Print all IAM policies for manual setup.""" + cfg = ctx.config + + role_arn = f"arn:aws:iam::{cfg.source_account_id}:role/{cfg.replication_role_name}" + datasync_role_arn = f"arn:aws:iam::{cfg.source_account_id}:role/{cfg.datasync_role_name}" + + print("\n" + "=" * 60) + print("IAM POLICIES FOR MANUAL SETUP") + print("=" * 60) + + print("\n" + "-" * 40) + print(f"1. REPLICATION ROLE TRUST POLICY (Source Account: {cfg.source_account_id})") + print(f" Role Name: {cfg.replication_role_name}") + print("-" * 40) + print(json.dumps(render_policy("replication_trust.json"), indent=2)) + + print("\n" + "-" * 40) + print(f"2. REPLICATION ROLE POLICY (Source Account: {cfg.source_account_id})") + print("-" * 40) + print(json.dumps( + render_policy("replication_role.json", + source_bucket=cfg.source_bucket, + dest_bucket=cfg.dest_bucket), + indent=2)) + + print("\n" + "-" * 40) + print(f"3. DATASYNC ROLE TRUST POLICY (Source Account: {cfg.source_account_id})") + print(f" Role Name: {cfg.datasync_role_name}") + print("-" * 40) + print(json.dumps(render_policy("datasync_trust.json"), indent=2)) + + print("\n" + "-" * 40) + print(f"4. DATASYNC ROLE POLICY (Source Account: {cfg.source_account_id})") + print("-" * 40) + print(json.dumps( + render_policy("datasync_role.json", + source_bucket=cfg.source_bucket, + dest_bucket=cfg.dest_bucket), + indent=2)) + + print("\n" + "-" * 40) + print(f"5. DESTINATION BUCKET POLICY (Dest Account: {cfg.dest_account_id})") + print(f" Bucket: {cfg.dest_bucket}") + print("-" * 40) + dest_policy = render_policy("destination_bucket.json", + role_arn=role_arn, + dest_bucket=cfg.dest_bucket) + datasync_stmt = render_policy("datasync_bucket_stmt.json", + datasync_role_arn=datasync_role_arn, + dest_bucket=cfg.dest_bucket) + dest_policy['Statement'].append(datasync_stmt) + print(json.dumps(dest_policy, indent=2)) + + +def create_replication_role(ctx: S3MigrationContext) -> Optional[str]: + """Create IAM role for S3 replication in source account.""" + cfg = ctx.config + print(f"\nCreating replication role '{cfg.replication_role_name}'...") + + trust_policy = render_policy("replication_trust.json") + role_policy = render_policy("replication_role.json", + source_bucket=cfg.source_bucket, + dest_bucket=cfg.dest_bucket) + + try: + response = ctx.source_iam.create_role( + RoleName=cfg.replication_role_name, + AssumeRolePolicyDocument=json.dumps(trust_policy), + Description="Role for S3 cross-account replication" + ) + role_arn = response['Role']['Arn'] + print(f" Created role: {role_arn}") + + ctx.source_iam.put_role_policy( + RoleName=cfg.replication_role_name, + PolicyName="S3ReplicationPolicy", + PolicyDocument=json.dumps(role_policy) + ) + print(f" Attached replication policy") + + print(f" Waiting for role propagation...") + time.sleep(10) + + return role_arn + except ClientError as e: + if e.response['Error']['Code'] == 'EntityAlreadyExists': + print(f" Role already exists") + response = ctx.source_iam.get_role(RoleName=cfg.replication_role_name) + return response['Role']['Arn'] + print(f" ERROR: {e}") + return None + + +def create_datasync_role(ctx: S3MigrationContext) -> Optional[str]: + """Create IAM role for DataSync in source account.""" + cfg = ctx.config + print(f"\nCreating DataSync role '{cfg.datasync_role_name}'...") + + trust_policy = render_policy("datasync_trust.json") + role_policy = render_policy("datasync_role.json", + source_bucket=cfg.source_bucket, + dest_bucket=cfg.dest_bucket) + + try: + response = ctx.source_iam.create_role( + RoleName=cfg.datasync_role_name, + AssumeRolePolicyDocument=json.dumps(trust_policy), + Description="Role for DataSync S3 to S3 transfer" + ) + role_arn = response['Role']['Arn'] + print(f" Created role: {role_arn}") + + ctx.source_iam.put_role_policy( + RoleName=cfg.datasync_role_name, + PolicyName="DataSyncS3Policy", + PolicyDocument=json.dumps(role_policy) + ) + print(f" Attached DataSync policy") + + print(f" Waiting for role propagation...") + time.sleep(10) + + return role_arn + except ClientError as e: + if e.response['Error']['Code'] == 'EntityAlreadyExists': + print(f" Role already exists, updating policy...") + ctx.source_iam.put_role_policy( + RoleName=cfg.datasync_role_name, + PolicyName="DataSyncS3Policy", + PolicyDocument=json.dumps(role_policy) + ) + response = ctx.source_iam.get_role(RoleName=cfg.datasync_role_name) + return response['Role']['Arn'] + print(f" ERROR: {e}") + return None + + + +def apply_destination_bucket_policy(ctx: S3MigrationContext) -> bool: + """Apply bucket policy to destination bucket, with option to merge or replace.""" + cfg = ctx.config + print(f"\nApplying bucket policy to destination bucket...") + + role_arn = f"arn:aws:iam::{cfg.source_account_id}:role/{cfg.replication_role_name}" + datasync_role_arn = f"arn:aws:iam::{cfg.source_account_id}:role/{cfg.datasync_role_name}" + + try: + # Check for existing policy + existing_policy = None + try: + existing = ctx.dest_s3.get_bucket_policy(Bucket=cfg.dest_bucket) + existing_policy = json.loads(existing['Policy']) + except ClientError as e: + if e.response['Error']['Code'] != 'NoSuchBucketPolicy': + raise + + # Build the statements we need + dest_policy = render_policy("destination_bucket.json", + role_arn=role_arn, + dest_bucket=cfg.dest_bucket) + datasync_stmt = render_policy("datasync_bucket_stmt.json", + datasync_role_arn=datasync_role_arn, + dest_bucket=cfg.dest_bucket) + new_statements = dest_policy['Statement'] + new_statements.append(datasync_stmt) + + if existing_policy: + existing_statements = existing_policy.get('Statement', []) + print(f"\n EXISTING bucket policy ({len(existing_statements)} statements):") + print(json.dumps(existing_policy, indent=2)) + + print(f"\n NEW statements to add ({len(new_statements)} statements):") + print(json.dumps(new_statements, indent=2)) + + print(f"\n Options:") + print(f" 1. APPEND - Add new statements to existing policy") + print(f" 2. REPLACE - Replace entire policy with new statements only") + print(f" 3. SKIP - Do not modify bucket policy") + choice = input("\n Choose (1/2/3): ").strip() + + if choice == '1': + new_sids = {s['Sid'] for s in new_statements if 'Sid' in s} + merged = [s for s in existing_statements if s.get('Sid') not in new_sids] + merged.extend(new_statements) + existing_policy['Statement'] = merged + final_policy = existing_policy + elif choice == '2': + final_policy = render_policy("destination_bucket.json", + role_arn=role_arn, + dest_bucket=cfg.dest_bucket) + final_policy['Statement'].append( + render_policy("datasync_bucket_stmt.json", + datasync_role_arn=datasync_role_arn, + dest_bucket=cfg.dest_bucket)) + else: + print(f" Skipped bucket policy update.") + return False + else: + print(f"\n No existing bucket policy found.") + final_policy = render_policy("destination_bucket.json", + role_arn=role_arn, + dest_bucket=cfg.dest_bucket) + final_policy['Statement'].append( + render_policy("datasync_bucket_stmt.json", + datasync_role_arn=datasync_role_arn, + dest_bucket=cfg.dest_bucket)) + + ctx.dest_s3.put_bucket_policy( + Bucket=cfg.dest_bucket, + Policy=json.dumps(final_policy) + ) + print(f" Applied bucket policy to '{cfg.dest_bucket}'") + return True + except ClientError as e: + print(f" ERROR: {e}") + return False From 832630da710cfa8100ce8967bd92bf54559765df Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:37:08 +0530 Subject: [PATCH 06/13] add utils to start replication --- hq_s3_migration/replication.py | 103 +++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 hq_s3_migration/replication.py diff --git a/hq_s3_migration/replication.py b/hq_s3_migration/replication.py new file mode 100644 index 0000000000..406f7f629f --- /dev/null +++ b/hq_s3_migration/replication.py @@ -0,0 +1,103 @@ +from botocore.exceptions import ClientError + +from .orchestrator import S3MigrationContext + + +def enable_live_replication(ctx: S3MigrationContext) -> bool: + """Enable S3 live replication from source to destination.""" + cfg = ctx.config + + print("\n" + "=" * 60) + print("PHASE 2: Enabling S3 Live Replication") + print("=" * 60) + + role_arn = f"arn:aws:iam::{cfg.source_account_id}:role/{cfg.replication_role_name}" + + rule = { + 'ID': 'CrossAccountReplication', + 'Status': 'Enabled', + 'Priority': 1, + 'Filter': {}, + 'Destination': { + 'Bucket': f'arn:aws:s3:::{cfg.dest_bucket}', + 'Account': cfg.dest_account_id, + 'AccessControlTranslation': { + 'Owner': 'Destination' + } + }, + 'DeleteMarkerReplication': { + 'Status': 'Enabled' if cfg.enable_delete_replication else 'Disabled' + } + } + + if cfg.enable_rtc: + rule['Destination']['ReplicationTime'] = { + 'Status': 'Enabled', + 'Time': {'Minutes': 15} + } + rule['Destination']['Metrics'] = { + 'Status': 'Enabled', + 'EventThreshold': {'Minutes': 15} + } + + replication_config = { + 'Role': role_arn, + 'Rules': [rule] + } + + print(f"\nConfiguring replication rule...") + print(f" Source: {cfg.source_bucket}") + print(f" Destination: {cfg.dest_bucket}") + print(f" Delete replication: {'Enabled' if cfg.enable_delete_replication else 'Disabled'}") + print(f" RTC (15-min SLA): {'Enabled' if cfg.enable_rtc else 'Disabled'}") + + try: + ctx.source_s3.put_bucket_replication( + Bucket=cfg.source_bucket, + ReplicationConfiguration=replication_config + ) + print(f"\n SUCCESS: Live replication enabled") + print(f" All NEW objects will now replicate automatically") + return True + except ClientError as e: + print(f"\n ERROR: {e}") + return False + + +def get_replication_status(ctx: S3MigrationContext) -> dict: + """Get current replication configuration and status.""" + print("\nChecking replication status...") + + result = { + 'configured': False, + 'rules': [], + 'metrics': None + } + + try: + config = ctx.source_s3.get_bucket_replication(Bucket=ctx.config.source_bucket) + result['configured'] = True + result['rules'] = config['ReplicationConfiguration']['Rules'] + print(f" Replication is configured") + for rule in result['rules']: + print(f" Rule '{rule['ID']}': {rule['Status']}") + except ClientError as e: + if 'ReplicationConfigurationNotFoundError' in str(e): + print(f" Replication is NOT configured") + else: + print(f" ERROR: {e}") + + return result + + +def disable_replication(ctx: S3MigrationContext) -> bool: + """Disable S3 replication rule.""" + print("\nDisabling replication...") + + try: + ctx.source_s3.delete_bucket_replication(Bucket=ctx.config.source_bucket) + print(f" Replication disabled") + return True + except ClientError as e: + print(f" ERROR: {e}") + return False From 06bc72927455ad41e2fd7c9b77ba79182f639bea Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:52:37 +0530 Subject: [PATCH 07/13] add util for datasync --- hq_s3_migration/datasync.py | 188 ++++++++++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 hq_s3_migration/datasync.py diff --git a/hq_s3_migration/datasync.py b/hq_s3_migration/datasync.py new file mode 100644 index 0000000000..c1c373cf31 --- /dev/null +++ b/hq_s3_migration/datasync.py @@ -0,0 +1,188 @@ +import time +from datetime import datetime, timezone +from typing import Optional + +from botocore.exceptions import ClientError + +from .orchestrator import S3MigrationContext + + +def create_datasync_source_location(ctx: S3MigrationContext) -> Optional[str]: + """Create DataSync source location (S3 bucket).""" + cfg = ctx.config + print(f"\nCreating DataSync source location...") + + datasync_role_arn = f"arn:aws:iam::{cfg.source_account_id}:role/{cfg.datasync_role_name}" + + try: + response = ctx.source_datasync.create_location_s3( + S3BucketArn=f"arn:aws:s3:::{cfg.source_bucket}", + S3Config={ + 'BucketAccessRoleArn': datasync_role_arn + } + ) + location_arn = response['LocationArn'] + print(f" Created source location: {location_arn}") + return location_arn + except ClientError as e: + print(f" ERROR: {e}") + return None + + +def create_datasync_destination_location(ctx: S3MigrationContext) -> Optional[str]: + """Create DataSync destination location (S3 bucket in different account).""" + cfg = ctx.config + print(f"\nCreating DataSync destination location...") + + datasync_role_arn = f"arn:aws:iam::{cfg.source_account_id}:role/{cfg.datasync_role_name}" + + try: + response = ctx.source_datasync.create_location_s3( + S3BucketArn=f"arn:aws:s3:::{cfg.dest_bucket}", + S3Config={ + 'BucketAccessRoleArn': datasync_role_arn + } + ) + location_arn = response['LocationArn'] + print(f" Created destination location: {location_arn}") + return location_arn + except ClientError as e: + print(f" ERROR: {e}") + return None + + +def create_datasync_task(ctx: S3MigrationContext, + source_location_arn: str, + dest_location_arn: str) -> Optional[str]: + """Create DataSync task for S3 to S3 transfer.""" + cfg = ctx.config + print(f"\nCreating DataSync task...") + + try: + response = ctx.source_datasync.create_task( + SourceLocationArn=source_location_arn, + DestinationLocationArn=dest_location_arn, + Name=f"s3-migration-{cfg.source_bucket}-to-{cfg.dest_bucket}", + TaskMode='ENHANCED', + Options={ + 'VerifyMode': 'ONLY_FILES_TRANSFERRED', + 'OverwriteMode': 'ALWAYS', + 'PreserveDeletedFiles': 'REMOVE', + 'PreserveDevices': 'NONE', + 'PosixPermissions': 'NONE', + 'Uid': 'NONE', + 'Gid': 'NONE', + 'TaskQueueing': 'ENABLED', + 'TransferMode': 'CHANGED', + 'ObjectTags': 'PRESERVE', + 'LogLevel': 'BASIC', + }, + ) + task_arn = response['TaskArn'] + print(f" Created task: {task_arn}") + return task_arn + except ClientError as e: + print(f" ERROR: {e}") + return None + + +def start_datasync_task(ctx: S3MigrationContext, task_arn: str) -> Optional[str]: + """Start a DataSync task execution.""" + print(f"\nStarting DataSync task execution...") + + try: + response = ctx.source_datasync.start_task_execution( + TaskArn=task_arn + ) + execution_arn = response['TaskExecutionArn'] + print(f" Started execution: {execution_arn}") + return execution_arn + except ClientError as e: + print(f" ERROR: {e}") + return None + + +def get_datasync_task_status(ctx: S3MigrationContext, execution_arn: str) -> dict: + """Get status of DataSync task execution.""" + try: + response = ctx.source_datasync.describe_task_execution( + TaskExecutionArn=execution_arn + ) + + status = { + 'status': response['Status'], + 'bytes_transferred': response.get('BytesTransferred', 0), + 'bytes_written': response.get('BytesWritten', 0), + 'files_transferred': response.get('FilesTransferred', 0), + 'estimated_bytes': response.get('EstimatedBytesToTransfer', 0), + 'estimated_files': response.get('EstimatedFilesToTransfer', 0), + } + + if status['estimated_bytes'] > 0: + status['progress_pct'] = (status['bytes_transferred'] / status['estimated_bytes']) * 100 + else: + status['progress_pct'] = 0 + + return status + except ClientError as e: + return {'status': 'ERROR', 'error': str(e)} + + +def monitor_datasync_task(ctx: S3MigrationContext, execution_arn: str, interval: int = 60): + """Monitor DataSync task until completion.""" + print(f"\nMonitoring DataSync task execution...") + print(f" Execution ARN: {execution_arn}") + print(f" Checking every {interval} seconds (Ctrl+C to stop monitoring)...") + print() + + status = None + try: + while True: + status = get_datasync_task_status(ctx, execution_arn) + + timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') + print(f"[{timestamp}] Status: {status['status']}") + + if status['status'] == 'ERROR' and 'error' in status: + print(f" API error: {status['error']}, will retry...") + time.sleep(interval) + continue + + if status['status'] not in ['QUEUED', 'LAUNCHING', 'PREPARING', 'TRANSFERRING', 'VERIFYING']: + print(f"\nTask completed with status: {status['status']}") + if status['status'] == 'SUCCESS': + print(f" Files transferred: {status['files_transferred']:,}") + print(f" Bytes transferred: {status['bytes_transferred']:,}") + break + + if status['estimated_bytes'] > 0: + print(f" Progress: {status['progress_pct']:.1f}%") + print(f" Transferred: {status['bytes_transferred']:,} / {status['estimated_bytes']:,} bytes") + print(f" Files: {status['files_transferred']:,} / {status['estimated_files']:,}") + + time.sleep(interval) + except KeyboardInterrupt: + print(f"\n\nMonitoring stopped by user. Task is still running.") + print(f" Resume monitoring with the same command and profile arguments.") + + return status + + +def list_datasync_tasks(ctx: S3MigrationContext) -> list: + """List all DataSync tasks.""" + print("\nListing DataSync tasks...") + + try: + tasks = [] + paginator = ctx.source_datasync.get_paginator('list_tasks') + for page in paginator.paginate(): + tasks.extend(page.get('Tasks', [])) + + for task in tasks: + print(f" {task['Name']}: {task['Status']}") + print(f" ARN: {task['TaskArn']}") + + return tasks + except ClientError as e: + print(f" ERROR: {e}") + return [] From 3a4b6b2143d0163e6a20243859029ac2b9fef410 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:53:13 +0530 Subject: [PATCH 08/13] add validation checks after sync --- hq_s3_migration/validation.py | 258 ++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100644 hq_s3_migration/validation.py diff --git a/hq_s3_migration/validation.py b/hq_s3_migration/validation.py new file mode 100644 index 0000000000..5463b3c120 --- /dev/null +++ b/hq_s3_migration/validation.py @@ -0,0 +1,258 @@ +import random +import string +from datetime import datetime, timedelta, timezone + +from botocore.exceptions import ClientError + +from .orchestrator import S3MigrationContext +from .replication import get_replication_status + + +def format_size(size_bytes: int) -> str: + """Format bytes to human readable string.""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: + if size_bytes < 1024.0: + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024.0 + return f"{size_bytes:.2f} EB" + + +def get_bucket_stats(ctx: S3MigrationContext, s3_client, bucket_name: str) -> dict: + """Get object count and total size using CloudWatch metrics (scales to 1B+ objects).""" + print(f"\nGetting stats for bucket '{bucket_name}'...") + + session = ctx.source_session if s3_client is ctx.source_s3 else ctx.dest_session + cloudwatch = session.client('cloudwatch') + + now = datetime.now(timezone.utc) + metrics_result = {} + + try: + for metric_name, stat_key in [('NumberOfObjects', 'object_count'), ('BucketSizeBytes', 'total_size')]: + response = cloudwatch.get_metric_statistics( + Namespace='AWS/S3', + MetricName=metric_name, + Dimensions=[ + {'Name': 'BucketName', 'Value': bucket_name}, + {'Name': 'StorageType', 'Value': 'AllStorageTypes' if metric_name == 'NumberOfObjects' else 'StandardStorage'} + ], + StartTime=now - timedelta(days=3), + EndTime=now, + Period=86400, + Statistics=['Average'] + ) + if response['Datapoints']: + latest = max(response['Datapoints'], key=lambda x: x['Timestamp']) + metrics_result[stat_key] = int(latest['Average']) + + if 'object_count' in metrics_result: + total_objects = metrics_result['object_count'] + total_size = metrics_result.get('total_size', 0) + print(f" Total objects: {total_objects:,} (from CloudWatch metrics)") + print(f" Total size: {format_size(total_size)}") + print(f" Note: CloudWatch S3 metrics may be up to 24h delayed") + return {'object_count': total_objects, 'total_size': total_size} + else: + print(f" WARNING: No CloudWatch metrics available for this bucket") + print(f" Ensure S3 request metrics are enabled or wait for daily storage metrics") + return {'object_count': 0, 'total_size': 0, 'error': 'no_metrics'} + except ClientError as e: + print(f" ERROR: {e}") + return {'object_count': 0, 'total_size': 0, 'error': str(e)} + + +def verify_sample(ctx: S3MigrationContext, sample_size: int, mismatches: list) -> bool: + """Verify a sample of objects between source and destination. + + Uses random prefixes to sample from across the entire key space, + avoiding bias toward lexicographically early keys. + """ + cfg = ctx.config + sample_keys = [] + prefixes_tried = set() + prefix_chars = string.ascii_lowercase + string.digits + + try: + while len(sample_keys) < sample_size * 3 and len(prefixes_tried) < len(prefix_chars): + prefix = random.choice(prefix_chars) + if prefix in prefixes_tried: + continue + prefixes_tried.add(prefix) + + response = ctx.source_s3.list_objects_v2( + Bucket=cfg.source_bucket, + Prefix=prefix, + MaxKeys=sample_size + ) + if 'Contents' in response: + for obj in response['Contents']: + sample_keys.append(obj['Key']) + except ClientError: + return False + + if not sample_keys: + print(" No objects to sample") + return True + + sample = random.sample(sample_keys, min(sample_size, len(sample_keys))) + + verified = 0 + for key in sample: + try: + source_obj = ctx.source_s3.head_object(Bucket=cfg.source_bucket, Key=key) + dest_obj = ctx.dest_s3.head_object(Bucket=cfg.dest_bucket, Key=key) + + if source_obj['ETag'] == dest_obj['ETag']: + verified += 1 + elif source_obj['ContentLength'] == dest_obj['ContentLength']: + verified += 1 + else: + mismatches.append({ + 'key': key, + 'reason': 'Size mismatch', + 'source_size': source_obj['ContentLength'], + 'dest_size': dest_obj['ContentLength'], + 'source_etag': source_obj['ETag'], + 'dest_etag': dest_obj['ETag'] + }) + except ClientError as e: + mismatches.append({ + 'key': key, + 'reason': str(e) + }) + + print(f" Verified: {verified}/{sample_size}") + if mismatches: + print(f" Mismatches: {len(mismatches)}") + for m in mismatches[:5]: + print(f" - {m['key']}: {m['reason']}") + + return len(mismatches) == 0 + + +def validate_migration(ctx: S3MigrationContext, sample_size: int = 100) -> dict: + """Validate migration by comparing buckets.""" + cfg = ctx.config + + print("\n" + "=" * 60) + print("PHASE 4: Validating Migration") + print("=" * 60) + + result = { + 'source_stats': None, + 'dest_stats': None, + 'sample_verified': False, + 'mismatches': [] + } + + print("\nComparing bucket statistics...") + result['source_stats'] = get_bucket_stats(ctx, ctx.source_s3, cfg.source_bucket) + result['dest_stats'] = get_bucket_stats(ctx, ctx.dest_s3, cfg.dest_bucket) + + source_count = result['source_stats']['object_count'] + dest_count = result['dest_stats']['object_count'] + + print(f"\nComparison:") + print(f" Source objects: {source_count:,}") + print(f" Destination objects: {dest_count:,}") + print(f" Difference: {abs(source_count - dest_count):,}") + + if sample_size > 0: + print(f"\nVerifying {sample_size} random objects...") + result['sample_verified'] = verify_sample(ctx, sample_size, result['mismatches']) + + return result + + +def check_replication_lag(ctx: S3MigrationContext) -> dict: + """Check replication lag using CloudWatch metrics.""" + cfg = ctx.config + print("\nChecking replication lag...") + + cloudwatch = ctx.source_session.client('cloudwatch') + + try: + response = cloudwatch.get_metric_statistics( + Namespace='AWS/S3', + MetricName='ReplicationLatency', + Dimensions=[ + {'Name': 'SourceBucket', 'Value': cfg.source_bucket}, + {'Name': 'DestinationBucket', 'Value': cfg.dest_bucket}, + {'Name': 'RuleId', 'Value': 'CrossAccountReplication'} + ], + StartTime=datetime.now(timezone.utc) - timedelta(hours=1), + EndTime=datetime.now(timezone.utc), + Period=300, + Statistics=['Average', 'Maximum'] + ) + + if response['Datapoints']: + latest = max(response['Datapoints'], key=lambda x: x['Timestamp']) + print(f" Average latency: {latest.get('Average', 'N/A')} seconds") + print(f" Maximum latency: {latest.get('Maximum', 'N/A')} seconds") + return { + 'avg_latency': latest.get('Average'), + 'max_latency': latest.get('Maximum'), + 'timestamp': latest['Timestamp'] + } + else: + print(" No replication metrics available") + return {'error': 'No metrics available'} + except ClientError as e: + print(f" ERROR: {e}") + return {'error': str(e)} + + +def cutover_checklist(ctx: S3MigrationContext) -> dict: + """Run pre-cutover checklist.""" + cfg = ctx.config + + print("\n" + "=" * 60) + print("PHASE 5: Pre-Cutover Checklist") + print("=" * 60) + + checks = { + 'replication_active': False, + 'replication_lag_ok': False, + 'object_counts_match': False, + 'sample_verified': False + } + + print("\n1. Checking replication is active...") + rep_status = get_replication_status(ctx) + checks['replication_active'] = rep_status['configured'] + + print("\n2. Checking replication lag...") + lag = check_replication_lag(ctx) + if 'avg_latency' in lag: + checks['replication_lag_ok'] = lag['avg_latency'] < 300 + else: + checks['replication_lag_ok'] = True + + print("\n3. Comparing object counts...") + source_stats = get_bucket_stats(ctx, ctx.source_s3, cfg.source_bucket) + dest_stats = get_bucket_stats(ctx, ctx.dest_s3, cfg.dest_bucket) + + diff = abs(source_stats['object_count'] - dest_stats['object_count']) + diff_pct = (diff / max(source_stats['object_count'], 1)) * 100 + checks['object_counts_match'] = diff_pct < 1 + + print("\n4. Verifying sample objects...") + mismatches = [] + checks['sample_verified'] = verify_sample(ctx, 50, mismatches) + + print("\n" + "-" * 40) + print("Cutover Checklist Results:") + all_passed = True + for check, passed in checks.items(): + status = "PASS" if passed else "FAIL" + print(f" {check}: {status}") + if not passed: + all_passed = False + + if all_passed: + print("\nAll checks passed. Ready for cutover.") + else: + print("\nSome checks failed. Review before proceeding with cutover.") + + return checks From 04540e9e22980430dea9cdcb80438a83a2438896 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:53:29 +0530 Subject: [PATCH 09/13] wrap everything up in cli --- hq_s3_migration/cli.py | 331 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 331 insertions(+) create mode 100644 hq_s3_migration/cli.py diff --git a/hq_s3_migration/cli.py b/hq_s3_migration/cli.py new file mode 100644 index 0000000000..f3b402d9c1 --- /dev/null +++ b/hq_s3_migration/cli.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python3 +""" +S3 Cross-Account Migration Tool + +Implements hybrid approach for large-scale S3 migrations: +- DataSync for bulk transfer of existing data +- S3 Live Replication for ongoing sync + +Designed for 204 TB / 1B+ objects with zero-downtime cutover. +""" + +import argparse +import sys + +from botocore.exceptions import ClientError + +from .config import ACCOUNT_IDS, ACCOUNT_NAMES, MigrationConfig +from .datasync import (create_datasync_destination_location, + create_datasync_source_location, create_datasync_task, + list_datasync_tasks, monitor_datasync_task, + start_datasync_task) +from .iam import (apply_destination_bucket_policy, create_datasync_role, + create_replication_role, print_iam_policies) +from .orchestrator import S3MigrationContext +from .replication import enable_live_replication, get_replication_status +from .validation import cutover_checklist, validate_migration + + +def _check_prerequisites(ctx: S3MigrationContext) -> dict: + """Check all prerequisites for migration.""" + cfg = ctx.config + + print("\n" + "=" * 60) + print("PHASE 1: Checking Prerequisites") + print("=" * 60) + + results = { + 'source_bucket_exists': False, + 'dest_bucket_exists': False, + 'source_versioning': False, + 'dest_versioning': False, + 'source_account_verified': False, + 'dest_account_verified': False, + } + + print("\nVerifying AWS account access...") + try: + source_identity = ctx.source_sts.get_caller_identity() + results['source_account_verified'] = True + actual_source_account = source_identity['Account'] + print(f" Source account: {actual_source_account}") + if actual_source_account != cfg.source_account_id: + print(f" WARNING: Configured source account ID ({cfg.source_account_id}) " + f"doesn't match actual ({actual_source_account})") + except ClientError as e: + print(f" ERROR: Cannot access source account: {e}") + + try: + dest_identity = ctx.dest_sts.get_caller_identity() + results['dest_account_verified'] = True + actual_dest_account = dest_identity['Account'] + print(f" Destination account: {actual_dest_account}") + if actual_dest_account != cfg.dest_account_id: + print(f" WARNING: Configured dest account ID ({cfg.dest_account_id}) " + f"doesn't match actual ({actual_dest_account})") + except ClientError as e: + print(f" ERROR: Cannot access destination account: {e}") + + print(f"\nChecking source bucket '{cfg.source_bucket}'...") + try: + ctx.source_s3.head_bucket(Bucket=cfg.source_bucket) + results['source_bucket_exists'] = True + print(f" Bucket exists") + + versioning = ctx.source_s3.get_bucket_versioning(Bucket=cfg.source_bucket) + status = versioning.get('Status', 'Disabled') + results['source_versioning'] = status == 'Enabled' + print(f" Versioning: {status}") + except ClientError as e: + print(f" ERROR: {e}") + + print(f"\nChecking destination bucket '{cfg.dest_bucket}'...") + try: + ctx.dest_s3.head_bucket(Bucket=cfg.dest_bucket) + results['dest_bucket_exists'] = True + print(f" Bucket exists") + + versioning = ctx.dest_s3.get_bucket_versioning(Bucket=cfg.dest_bucket) + status = versioning.get('Status', 'Disabled') + results['dest_versioning'] = status == 'Enabled' + print(f" Versioning: {status}") + except ClientError as e: + if e.response['Error']['Code'] == '404': + print(f" Bucket does not exist (will be created)") + else: + print(f" ERROR: {e}") + + print("\n" + "-" * 40) + print("Prerequisites Summary:") + for key, value in results.items(): + status = "OK" if value else "MISSING" + print(f" {key}: {status}") + + return results + + +def _create_destination_bucket(ctx: S3MigrationContext) -> bool: + """Create destination bucket with versioning enabled.""" + cfg = ctx.config + print(f"\nCreating destination bucket '{cfg.dest_bucket}'...") + + try: + if cfg.region == 'us-east-1': + ctx.dest_s3.create_bucket(Bucket=cfg.dest_bucket) + else: + print(f" This tool is designed to create buckets in the Staging and Production accounts.") + return False + print(f" Created bucket") + + ctx.dest_s3.put_bucket_versioning( + Bucket=cfg.dest_bucket, + VersioningConfiguration={'Status': 'Enabled'} + ) + print(f" Enabled versioning") + + return True + except ClientError as e: + if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou': + print(f" Bucket already exists") + ctx.dest_s3.put_bucket_versioning( + Bucket=cfg.dest_bucket, + VersioningConfiguration={'Status': 'Enabled'} + ) + print(f" Enabled versioning") + return True + print(f" ERROR: {e}") + return False + + +def _enable_source_versioning(ctx: S3MigrationContext) -> bool: + """Enable versioning on source bucket if not already enabled.""" + cfg = ctx.config + print(f"\nEnabling versioning on source bucket...") + + try: + ctx.source_s3.put_bucket_versioning( + Bucket=cfg.source_bucket, + VersioningConfiguration={'Status': 'Enabled'} + ) + print(f" Versioning enabled on '{cfg.source_bucket}'") + return True + except ClientError as e: + print(f" ERROR: {e}") + return False + + +def _resolve_account_id(value): + """Resolve account name alias or validate 12-digit account ID.""" + if value in ACCOUNT_IDS: + return ACCOUNT_IDS[value] + if value.isdigit() and len(value) == 12: + return value + aliases = ', '.join(ACCOUNT_IDS.keys()) + raise argparse.ArgumentTypeError( + f"Must be a 12-digit account ID or one of: {aliases}. Got: '{value}'" + ) + + +def main(): + parser = argparse.ArgumentParser( + description='S3 Cross-Account Migration Tool (Hybrid: DataSync + Live Replication)', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Commands: + prepare Check prerequisites and create destination bucket + setup-iam Print IAM policies for manual setup or create them + enable-replication Enable S3 live replication + create-datasync Create DataSync task + start-datasync Start DataSync task execution + monitor-datasync Monitor DataSync task execution + validate Validate migration status + cutover-check Run pre-cutover checklist + status Show current migration status + +Examples: + # Check prerequisites + python -m hq_s3_migration prepare --source-bucket my-source --dest-bucket my-dest \\ + --source-account 111111111111 --dest-account 222222222222 + + # Print IAM policies + python -m hq_s3_migration setup-iam --source-bucket my-source --dest-bucket my-dest \\ + --source-account 111111111111 --dest-account 222222222222 + + # Enable live replication + python -m hq_s3_migration enable-replication --source-bucket my-source --dest-bucket my-dest \\ + --source-account 111111111111 --dest-account 222222222222 + + # Create and start DataSync task + python -m hq_s3_migration create-datasync --source-bucket my-source --dest-bucket my-dest \\ + --source-account 111111111111 --dest-account 222222222222 + """ + ) + + parser.add_argument('command', choices=[ + 'prepare', 'setup-iam', 'enable-replication', 'create-datasync', + 'start-datasync', 'monitor-datasync', 'validate', 'cutover-check', + 'status' + ], help='Command to execute') + + parser.add_argument('--source-profile', default='StagingAdminAccess', + help='AWS profile for source account') + parser.add_argument('--dest-profile', default='BackupAdminAccess', + help='AWS profile for destination account') + parser.add_argument('--source-bucket', default='ap-source-for-replication', + help='Source bucket name') + parser.add_argument('--dest-bucket', default='ap-destination-for-replication', + help='Destination bucket name') + parser.add_argument('--source-account', required=True, type=_resolve_account_id, + help=f'Source AWS account ID or alias ({", ".join(ACCOUNT_IDS.keys())})') + parser.add_argument('--dest-account', required=True, type=_resolve_account_id, + help=f'Destination AWS account ID or alias ({", ".join(ACCOUNT_IDS.keys())})') + parser.add_argument('--region', default='us-east-1', + help='AWS region') + parser.add_argument('--disable-rtc', action='store_true', + help='Disable S3 Replication Time Control (enabled by default)') + parser.add_argument('--create-iam', action='store_true', + help='Create IAM roles (for setup-iam command)') + parser.add_argument('--task-arn', + help='DataSync task ARN (for start-datasync)') + parser.add_argument('--execution-arn', + help='DataSync execution ARN (for monitor-datasync)') + + args = parser.parse_args() + + # Append environment name to role names for uniqueness across accounts + env_name = ACCOUNT_NAMES.get(args.source_account, args.source_account) + replication_role = f"s3-cross-account-replication-role-{env_name}" + datasync_role = f"datasync-s3-access-role-{env_name}" + + config = MigrationConfig( + source_profile=args.source_profile, + dest_profile=args.dest_profile, + source_bucket=args.source_bucket, + dest_bucket=args.dest_bucket, + source_account_id=args.source_account, + dest_account_id=args.dest_account, + region=args.region, + enable_rtc=not args.disable_rtc, + replication_role_name=replication_role, + datasync_role_name=datasync_role, + ) + + ctx = S3MigrationContext(config) + + if args.command == 'prepare': + results = _check_prerequisites(ctx) + if not results['source_bucket_exists']: + print("\nERROR: Source bucket does not exist. Aborting.") + sys.exit(1) + if not results['source_account_verified'] or not results['dest_account_verified']: + print("\nERROR: Cannot verify AWS account access. Aborting.") + sys.exit(1) + if not results['dest_bucket_exists']: + _create_destination_bucket(ctx) + else: + if not results['dest_versioning']: + ctx.dest_s3.put_bucket_versioning( + Bucket=config.dest_bucket, + VersioningConfiguration={'Status': 'Enabled'} + ) + print(f"\n Enabled versioning on existing destination bucket") + _enable_source_versioning(ctx) + + elif args.command == 'setup-iam': + print_iam_policies(ctx) + if args.create_iam: + print("\n" + "=" * 60) + print("Creating IAM Resources") + print("=" * 60) + create_replication_role(ctx) + create_datasync_role(ctx) + apply_destination_bucket_policy(ctx) + + elif args.command == 'enable-replication': + enable_live_replication(ctx) + + elif args.command == 'create-datasync': + source_loc = create_datasync_source_location(ctx) + if source_loc: + dest_loc = create_datasync_destination_location(ctx) + if dest_loc: + task_arn = create_datasync_task(ctx, source_loc, dest_loc) + if task_arn: + print(f"\nDataSync task created successfully!") + print(f"Task ARN: {task_arn}") + print(f"\nTo start the task, run:") + print(f" python -m hq_s3_migration.cli start-datasync --task-arn '{task_arn}' \\") + print(f" --source-profile {args.source_profile} --dest-profile {args.dest_profile} \\") + print(f" --source-account {args.source_account} --dest-account {args.dest_account}") + + elif args.command == 'start-datasync': + if not args.task_arn: + print("ERROR: --task-arn is required for start-datasync") + sys.exit(1) + execution_arn = start_datasync_task(ctx, args.task_arn) + if execution_arn: + print(f"\nTo monitor the task, run:") + print(f" python -m hq_s3_migration.cli monitor-datasync --execution-arn '{execution_arn}' \\") + print(f" --source-profile {args.source_profile} --dest-profile {args.dest_profile} \\") + print(f" --source-account {args.source_account} --dest-account {args.dest_account}") + + elif args.command == 'monitor-datasync': + if not args.execution_arn: + print("ERROR: --execution-arn is required for monitor-datasync") + sys.exit(1) + monitor_datasync_task(ctx, args.execution_arn) + + elif args.command == 'validate': + validate_migration(ctx) + + elif args.command == 'cutover-check': + cutover_checklist(ctx) + + elif args.command == 'status': + _check_prerequisites(ctx) + get_replication_status(ctx) + list_datasync_tasks(ctx) + + +if __name__ == '__main__': + main() From 7e239a1a524701321975f2f136641553a83ed5da Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Wed, 4 Mar 2026 22:53:50 +0530 Subject: [PATCH 10/13] calling module calls cli --- hq_s3_migration/__main__.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 hq_s3_migration/__main__.py diff --git a/hq_s3_migration/__main__.py b/hq_s3_migration/__main__.py new file mode 100644 index 0000000000..4e28416e10 --- /dev/null +++ b/hq_s3_migration/__main__.py @@ -0,0 +1,3 @@ +from .cli import main + +main() From 4fa5a6f1c5b1d13749fade905a051451c455d736 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Fri, 6 Mar 2026 16:47:02 +0530 Subject: [PATCH 11/13] instead of calling head, rely on different apis which are not added to block list --- hq_s3_migration/cli.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/hq_s3_migration/cli.py b/hq_s3_migration/cli.py index f3b402d9c1..3644b8b03c 100644 --- a/hq_s3_migration/cli.py +++ b/hq_s3_migration/cli.py @@ -68,11 +68,10 @@ def _check_prerequisites(ctx: S3MigrationContext) -> dict: print(f"\nChecking source bucket '{cfg.source_bucket}'...") try: - ctx.source_s3.head_bucket(Bucket=cfg.source_bucket) + versioning = ctx.source_s3.get_bucket_versioning(Bucket=cfg.source_bucket) results['source_bucket_exists'] = True print(f" Bucket exists") - versioning = ctx.source_s3.get_bucket_versioning(Bucket=cfg.source_bucket) status = versioning.get('Status', 'Disabled') results['source_versioning'] = status == 'Enabled' print(f" Versioning: {status}") @@ -81,16 +80,15 @@ def _check_prerequisites(ctx: S3MigrationContext) -> dict: print(f"\nChecking destination bucket '{cfg.dest_bucket}'...") try: - ctx.dest_s3.head_bucket(Bucket=cfg.dest_bucket) + versioning = ctx.dest_s3.get_bucket_versioning(Bucket=cfg.dest_bucket) results['dest_bucket_exists'] = True print(f" Bucket exists") - versioning = ctx.dest_s3.get_bucket_versioning(Bucket=cfg.dest_bucket) status = versioning.get('Status', 'Disabled') results['dest_versioning'] = status == 'Enabled' print(f" Versioning: {status}") except ClientError as e: - if e.response['Error']['Code'] == '404': + if e.response['Error']['Code'] in ('404', 'NoSuchBucket'): print(f" Bucket does not exist (will be created)") else: print(f" ERROR: {e}") From cd0d812773794d36fc61ab7cfcc3a3c25aa9651c Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Fri, 6 Mar 2026 16:49:32 +0530 Subject: [PATCH 12/13] during datasync and live replication set the buckets to be of intelligent tiering --- hq_s3_migration/datasync.py | 1 + hq_s3_migration/replication.py | 1 + 2 files changed, 2 insertions(+) diff --git a/hq_s3_migration/datasync.py b/hq_s3_migration/datasync.py index c1c373cf31..5266cb064d 100644 --- a/hq_s3_migration/datasync.py +++ b/hq_s3_migration/datasync.py @@ -39,6 +39,7 @@ def create_datasync_destination_location(ctx: S3MigrationContext) -> Optional[st try: response = ctx.source_datasync.create_location_s3( S3BucketArn=f"arn:aws:s3:::{cfg.dest_bucket}", + S3StorageClass='INTELLIGENT_TIERING', S3Config={ 'BucketAccessRoleArn': datasync_role_arn } diff --git a/hq_s3_migration/replication.py b/hq_s3_migration/replication.py index 406f7f629f..94879c21ef 100644 --- a/hq_s3_migration/replication.py +++ b/hq_s3_migration/replication.py @@ -21,6 +21,7 @@ def enable_live_replication(ctx: S3MigrationContext) -> bool: 'Destination': { 'Bucket': f'arn:aws:s3:::{cfg.dest_bucket}', 'Account': cfg.dest_account_id, + 'StorageClass': 'INTELLIGENT_TIERING', 'AccessControlTranslation': { 'Owner': 'Destination' } From 681233ce1383eb369dbb3f489def704081c434a8 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Fri, 6 Mar 2026 17:23:09 +0530 Subject: [PATCH 13/13] add Environment tag in the destination s3 bucket --- hq_s3_migration/cli.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/hq_s3_migration/cli.py b/hq_s3_migration/cli.py index 3644b8b03c..d65afb9211 100644 --- a/hq_s3_migration/cli.py +++ b/hq_s3_migration/cli.py @@ -121,6 +121,17 @@ def _create_destination_bucket(ctx: S3MigrationContext) -> bool: ) print(f" Enabled versioning") + env_name = ACCOUNT_NAMES.get(cfg.dest_account_id, 'unknown') + ctx.dest_s3.put_bucket_tagging( + Bucket=cfg.dest_bucket, + Tagging={ + 'TagSet': [ + {'Key': 'Environment', 'Value': env_name}, + ] + } + ) + print(f" Tagged with Environment={env_name}") + return True except ClientError as e: if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':