-
Notifications
You must be signed in to change notification settings - Fork 19
feat(cleanup): Smart Cleanup and Disk Space Optimizer (#125) #298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
hyaku0121
wants to merge
3
commits into
cortexlinux:main
Choose a base branch
from
hyaku0121:feature/disk-cleanup-optimization
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| """ | ||
| Cleanup module for Cortex. | ||
|
|
||
| This module provides disk cleanup functionality including: | ||
| - Scanning for cleanup opportunities (package cache, orphaned packages, temp files, logs) | ||
| - Executing cleanup operations with undo capability | ||
| - Managing quarantined files for safe recovery | ||
| - Scheduling automatic cleanup tasks | ||
| """ | ||
|
|
||
| from cortex.cleanup.scanner import CleanupScanner, ScanResult | ||
| from cortex.cleanup.cleaner import DiskCleaner | ||
| from cortex.cleanup.manager import CleanupManager, QuarantineItem | ||
|
|
||
| __all__ = [ | ||
| "CleanupScanner", | ||
| "ScanResult", | ||
| "DiskCleaner", | ||
| "CleanupManager", | ||
| "QuarantineItem", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,222 @@ | ||
| import shutil | ||
| import gzip | ||
| import logging | ||
| from typing import List, Dict | ||
| from pathlib import Path | ||
| from cortex.utils.commands import run_command | ||
| from cortex.cleanup.scanner import CleanupScanner, ScanResult | ||
| from cortex.cleanup.manager import CleanupManager | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| class DiskCleaner: | ||
| """ | ||
| Handles the actual cleanup operations including package cleaning, | ||
| orphaned package removal, temp file deletion, and log compression. | ||
| """ | ||
| def __init__(self, dry_run: bool = False): | ||
| """ | ||
| Initialize the DiskCleaner. | ||
|
|
||
| Args: | ||
| dry_run (bool): If True, simulate actions without modifying the filesystem. | ||
| """ | ||
| self.dry_run = dry_run | ||
| self.scanner = CleanupScanner() | ||
| self.manager = CleanupManager() | ||
|
|
||
| def clean_package_cache(self) -> int: | ||
| """ | ||
| Clean apt package cache using 'apt-get clean'. | ||
|
|
||
| Returns: | ||
| int: Number of bytes freed (estimated). | ||
| """ | ||
| # Get size before cleaning for reporting | ||
| scan_result = self.scanner.scan_package_cache() | ||
| size_freed = scan_result.size_bytes | ||
|
|
||
| if self.dry_run: | ||
| return size_freed | ||
|
|
||
| # Run apt-get clean (use -n for non-interactive mode) | ||
| cmd = "sudo -n apt-get clean" | ||
| result = run_command(cmd, validate=True) | ||
|
|
||
| if result.success: | ||
| return size_freed | ||
| else: | ||
| logger.error(f"Failed to clean package cache: {result.stderr}") | ||
| return 0 | ||
|
|
||
| def remove_orphaned_packages(self, packages: List[str]) -> int: | ||
| """ | ||
| Remove orphaned packages using 'apt-get autoremove'. | ||
|
|
||
| Args: | ||
| packages (List[str]): List of package names to remove. | ||
|
|
||
| Returns: | ||
| int: Number of bytes freed (estimated). | ||
| """ | ||
| if not packages: | ||
| return 0 | ||
|
|
||
| if self.dry_run: | ||
| return 0 # Size is estimated in scanner | ||
|
|
||
| # Use -n for non-interactive mode | ||
| cmd = "sudo -n apt-get autoremove -y" | ||
| result = run_command(cmd, validate=True) | ||
|
|
||
| freed_bytes = 0 | ||
| if result.success: | ||
| freed_bytes = self._parse_freed_space(result.stdout) | ||
| return freed_bytes | ||
| else: | ||
| logger.error(f"Failed to remove orphaned packages: {result.stderr}") | ||
| return 0 | ||
|
|
||
| def _parse_freed_space(self, stdout: str) -> int: | ||
| """ | ||
| Helper to parse freed space from apt output. | ||
|
|
||
| Args: | ||
| stdout (str): Output from apt command. | ||
|
|
||
| Returns: | ||
| int: Bytes freed. | ||
| """ | ||
| freed_bytes = 0 | ||
| for line in stdout.splitlines(): | ||
| if "disk space will be freed" in line: | ||
| parts = line.split() | ||
| try: | ||
| for i, part in enumerate(parts): | ||
| if part.isdigit() or part.replace('.', '', 1).isdigit(): | ||
| val = float(part) | ||
| unit = parts[i+1] | ||
| if unit.upper().startswith('KB'): | ||
| freed_bytes = int(val * 1024) | ||
| elif unit.upper().startswith('MB'): | ||
| freed_bytes = int(val * 1024 * 1024) | ||
| elif unit.upper().startswith('GB'): | ||
| freed_bytes = int(val * 1024 * 1024 * 1024) | ||
| break | ||
| except Exception: | ||
| pass | ||
| return freed_bytes | ||
|
|
||
| def clean_temp_files(self, files: List[str]) -> int: | ||
| """ | ||
| Remove temporary files by moving them to quarantine. | ||
|
|
||
| Args: | ||
| files (List[str]): List of file paths to remove. | ||
|
|
||
| Returns: | ||
| int: Number of bytes freed (estimated). | ||
| """ | ||
| freed_bytes = 0 | ||
|
|
||
| for filepath_str in files: | ||
| filepath = Path(filepath_str) | ||
| if not filepath.exists(): | ||
| continue | ||
|
|
||
| # Get size before any operation | ||
| try: | ||
| size = filepath.stat().st_size | ||
| except OSError: | ||
| size = 0 | ||
|
|
||
| if self.dry_run: | ||
| freed_bytes += size | ||
| continue | ||
|
|
||
| # Move to quarantine | ||
| item_id = self.manager.quarantine_file(str(filepath)) | ||
| if item_id: | ||
| freed_bytes += size | ||
| else: | ||
| logger.warning(f"Failed to quarantine temp file: {filepath}") | ||
|
|
||
| return freed_bytes | ||
|
|
||
| def compress_logs(self, files: List[str]) -> int: | ||
| """ | ||
| Compress log files using gzip. | ||
|
|
||
| Args: | ||
| files (List[str]): List of log file paths to compress. | ||
|
|
||
| Returns: | ||
| int: Number of bytes freed. | ||
| """ | ||
| freed_bytes = 0 | ||
|
|
||
| for filepath_str in files: | ||
| filepath = Path(filepath_str) | ||
| if not filepath.exists(): | ||
| continue | ||
|
|
||
| try: | ||
| original_size = filepath.stat().st_size | ||
|
|
||
| if self.dry_run: | ||
| # Estimate compression ratio (e.g. 90% reduction) | ||
| freed_bytes += int(original_size * 0.9) | ||
| continue | ||
|
|
||
| # Compress | ||
| gz_path = filepath.with_suffix(filepath.suffix + '.gz') | ||
| with open(filepath, 'rb') as f_in: | ||
| with gzip.open(gz_path, 'wb') as f_out: | ||
| shutil.copyfileobj(f_in, f_out) | ||
|
|
||
| # Verify compressed file exists and has size | ||
| if gz_path.exists(): | ||
| compressed_size = gz_path.stat().st_size | ||
| # Remove original | ||
| filepath.unlink() | ||
| freed_bytes += (original_size - compressed_size) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to compress {filepath}: {e}") | ||
|
|
||
| return freed_bytes | ||
|
|
||
| def run_cleanup(self, scan_results: List[ScanResult], safe: bool = True) -> Dict[str, int]: | ||
| """ | ||
| Run cleanup based on scan results. | ||
|
|
||
| Args: | ||
| scan_results (List[ScanResult]): Results from scanner. | ||
| safe (bool): If True, perform safe cleanup (default). | ||
|
|
||
| Returns: | ||
| Dict[str, int]: Summary of bytes freed per category. | ||
| """ | ||
| summary = { | ||
| "Package Cache": 0, | ||
| "Orphaned Packages": 0, | ||
| "Temporary Files": 0, | ||
| "Old Logs": 0 | ||
| } | ||
|
|
||
| for result in scan_results: | ||
| if result.category == "Package Cache": | ||
| summary["Package Cache"] = self.clean_package_cache() | ||
|
|
||
| elif result.category == "Orphaned Packages": | ||
| # Only remove orphaned packages in non-safe mode | ||
| if not safe: | ||
| summary["Orphaned Packages"] = self.remove_orphaned_packages(result.items) | ||
|
|
||
| elif result.category == "Temporary Files": | ||
| summary["Temporary Files"] = self.clean_temp_files(result.items) | ||
|
|
||
| elif result.category == "Old Logs": | ||
| summary["Old Logs"] = self.compress_logs(result.items) | ||
|
|
||
| return summary | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential IndexError when parsing freed space.
At line 98,
parts[i+1]could raise anIndexErrorif the numeric value is the last element inparts. The broadexcept Exceptionmasks this silently.try: for i, part in enumerate(parts): if part.isdigit() or part.replace('.', '', 1).isdigit(): + if i + 1 >= len(parts): + continue val = float(part) unit = parts[i+1]🤖 Prompt for AI Agents