# Sam's Databricks Tooling Suite

A collection of Unix-philosophy inspired tools for demand planning and data processing workflows in Databricks/PySpark environments.

## Philosophy: Small, Composable Tools

This toolset follows the Unix philosophy of building small, focused tools that do one thing well and can be composed together to create powerful workflows. Each tool is designed to be:

- **Single-purpose**: Clear, focused responsibility
- **Composable**: Works seamlessly with other tools
- **Predictable**: Consistent interfaces and behavior
- **Extensible**: Easy to add new tools without changing existing ones

## Tools Overview

### 1. `tool__workstation` - Universal Spark Session Management

**Purpose**: Centralized Spark session orchestration with full Databricks compatibility

**Key Features**:

- **Universal compatibility**: Works seamlessly in local and Databricks environments
- **Auto-environment detection**: Automatically detects and configures for your environment
- **Databricks integration**: Uses existing Databricks sessions without interference
- **Java auto-configuration**: Automatically finds and sets up Java 17+ for local development
- **Multiple presets**: `local_delta`, `local_basic`, `local_performance`, `databricks`
- **Health monitoring**: Environment-aware diagnostics and session validation

```python
from tool__workstation import get_spark, spark_health_check

spark = get_spark("auto")      # Auto-detects local vs Databricks
health = spark_health_check()  # Environment-aware health check
```

### 2. `tool__dag_chainer` - DataFrame Workflow Management

**Purpose**: Chain and manage DataFrames in workflows with inspection capabilities

**Key Features**:

- Workflow container for DataFrame management
- Visual inspection and debugging capabilities
- Integration with workstation session management
- Direct writing to Delta tables

```python
from tool__dag_chainer import DagChain

chain = DagChain()
chain.dag__raw_data = df_source
chain.dag__clean_data = process(chain.dag__raw_data)
chain.trace(shape=True)  # View all DataFrames with counts
chain.look()             # Inspect latest DataFrame
```

### 3. `tool__table_polisher` - Data Standardization

**Purpose**: Consistent DataFrame standardization for medallion architecture

**Key Features**:

- Column name standardization (lowercase, special chars → underscores)
- Key column value cleaning (keyP__/keyF__ patterns)
- Intelligent column reordering
- Null value handling

```python
from tool__table_polisher import polish

df_standardized = polish(df_raw)  # Complete standardization
```

### 4. `tool__table_indexer` - Entity Indexing with Persistence

**Purpose**: Persistent, consistent indexing for entities with Delta table management

**Key Features**:

- Persistent index mapping using Delta tables
- Race-condition-safe MERGE operations
- Automatic catalog/schema creation
- Support for customers, plants, materials, and custom entities

```python
from tool__table_indexer import TableIndexer

indexer = TableIndexer(df_entities)
df_indexed = indexer.customer("customer_name")  # Persistent customer indices
```

## Quick Start

### Installation

```bash
pip install pyspark delta-spark

# Clone or download this repository
```

### Basic Workflow

```python
# Initialize session (works everywhere!)
from tool__workstation import get_spark
from tool__dag_chainer import DagChain
from tool__table_polisher import polish
from tool__table_indexer import TableIndexer

# Start workflow - auto-detects environment
spark = get_spark("auto")  # Local or Databricks
chain = DagChain()

# Load and process data
chain.dag__raw = spark.read.csv("data.csv", header=True)
chain.dag__clean = polish(chain.dag__raw)

# Add persistent entity indexing
indexer = TableIndexer(chain.dag__clean)
chain.dag__indexed = indexer.customer("customer_name")

# Inspect and save
chain.trace(shape=True)
chain.write("my_table", -1)  # Write latest to Delta table
```
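Before moving on to larger workflows, it can be worth confirming that the auto-detected session actually came up (local runs, for example, need Java 17+). Below is a minimal sketch using only the workstation helpers shown in this README; the structure of the health-check result isn't specified here, so it is simply printed:

```python
from tool__workstation import get_spark, is_spark_active, spark_health_check

spark = get_spark("auto")  # auto-detects local vs Databricks

# Fall back to an explicit local preset if no active session was established
if not is_spark_active():
    spark = get_spark("local_delta")

print(spark_health_check())  # environment-aware diagnostics
```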
## Tool Composition Patterns

### Pattern 1: Linear Pipeline

Sequential tool application for data processing:

```python
spark = get_spark("local_delta")
chain = DagChain()

# Compose: Import → Standardize → Index → Write
chain.dag__raw = spark.read.csv("demand.csv", header=True)
chain.dag__polished = polish(chain.dag__raw)

indexer = TableIndexer(chain.dag__polished)
chain.dag__indexed = indexer.material("material_code")

chain.write_to_path("gold/indexed_demand", -1)
```

### Pattern 2: Multi-Entity Processing

Applying indexing to multiple entity types:

```python
indexer = TableIndexer(df_clean)
df_with_customers = indexer.customer("customer_name")
df_with_plants = indexer.plant("plant_location")
df_final = indexer.material("material_code")  # Final result has all indices
```

### Pattern 3: Inspection and Debugging

Using chain visibility for troubleshooting:

```python
chain = DagChain()
chain.dag__step1 = initial_data
chain.dag__step2 = polish(chain.dag__step1)
chain.dag__step3 = complex_transformation(chain.dag__step2)

# Debug any step
chain.trace(shape=True)  # See all steps with row counts
chain.look(1)            # Inspect specific step
chain.pick(1).show()     # Get DataFrame for analysis
```

## Project Structure

```
Sams_Databricks_Tooling/
├── README.md                      # This file
├── requirements.txt               # Python dependencies
├── sample.ipynb                   # Complete workflow demonstration
├── tool__workstation.py           # Spark session management
├── tool__dag_chainer.py           # DataFrame workflow orchestration
├── tool__table_polisher.py        # Data standardization utility
├── tool__table_indexer.py         # Entity indexing with persistence
└── resources/
    ├── guide__tooling.md          # Comprehensive tooling guide
    ├── data__example_demand.csv   # Sample demand data
    └── data__realistic_demand.csv # Realistic demand scenario
```

## Use Cases

### Demand Planning Workflows

- **Data Ingestion**: Load messy CSV/Parquet files
- **Standardization**: Clean column names and values
- **Entity Management**: Create consistent entity mappings
- **Analytics Ready**: Prepare data for ML models and reporting

### Data Engineering Pipelines

- **Medallion Architecture**: Bronze → Silver → Gold layer processing
- **Master Data Management**: Consistent entity indexing across systems
- **Workflow Orchestration**: Visible, debuggable data transformations
- **Multi-Environment**: Local development to production Databricks

## Requirements

- **Python**: 3.8+
- **PySpark**: 3.4+
- **Delta Lake**: 2.4+
- **Environment**: Local Spark, Databricks, or any Spark cluster

## Configuration

### Workstation Presets

- **`auto`**: Auto-detects environment and selects the optimal preset (recommended)
- **`local_delta`**: Delta Lake enabled, 1GB memory (local)
- **`local_basic`**: Basic Spark, 1GB memory (local)
- **`local_performance`**: High memory, advanced optimizations (local)
- **`databricks`**: Uses existing Databricks session (auto-selected)

### Catalog Structure

**Auto-detected based on environment**:

- **Local**: `test_catalog.supply_chain` (automatically created)
- **Databricks**: Uses the current catalog or falls back to `spark_catalog.default`
- **Unity Catalog**: Detects and uses existing Unity Catalog setup

**Table Naming**: `{catalog}.{schema}.mapping__active_{entity}s`

**Databricks Setup**:

- Works out-of-the-box with the default workspace setup
- Automatically detects Unity Catalog if available
- Falls back gracefully to `spark_catalog.default` if needed
- No manual metastore configuration required
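To make the table-naming convention concrete: with the local defaults above (`test_catalog.supply_chain`), the customer mapping maintained by `TableIndexer` would land at a name like `test_catalog.supply_chain.mapping__active_customers`. A small sketch for inspecting such a mapping table follows; the exact column layout isn't documented here, so the query simply selects everything:

```python
from tool__workstation import get_spark

spark = get_spark("auto")

# Assumes the local defaults described above; adjust catalog/schema on Databricks.
catalog, schema, entity = "test_catalog", "supply_chain", "customer"
table = f"{catalog}.{schema}.mapping__active_{entity}s"

spark.sql("SHOW CATALOGS").show()                         # confirm the catalog exists
spark.sql(f"SELECT * FROM {table}").show(truncate=False)  # inspect the persisted index mapping
```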
## Contributing

### Adding New Tools

Follow the naming convention `tool__<name>.py` and ensure:

1. Single responsibility principle
2. Integration with workstation session management
3. Compatibility with dag chainer workflows
4. Consistent error handling and logging
5. Usage examples and documentation

### Tool Integration Checklist

- [ ] Uses workstation for session management (if needed)
- [ ] Returns DataFrame or compatible types for chaining
- [ ] Handles errors gracefully with meaningful messages
- [ ] Follows naming conventions (`tool__<name>.py`)
- [ ] Includes usage examples and docstrings
- [ ] Works in both local and Databricks environments

## Performance Considerations

### Session Management

- Reuse Spark sessions via the workstation singleton
- Leverage Spark's lazy evaluation through chains
- Clean up sessions when workflows complete

### Delta Lake Optimizations

- Auto-compaction enabled on mapping tables
- Write optimization for better performance
- Schema evolution support for changing data

### Memory Management

- Configurable memory settings per use case
- Adaptive query execution enabled by default
- Partition optimization for large datasets

## Troubleshooting

### Common Issues

**Session Not Active**

```python
from tool__workstation import is_spark_active, get_spark

if not is_spark_active():
    spark = get_spark("local_delta")
```

**Table Not Found**

- Ensure the catalog/schema exists: `spark.sql("SHOW CATALOGS")`
- Check workstation health: `spark_health_check()`

**Memory Issues**

- Use the `local_performance` preset for larger datasets
- Adjust partition settings in the workstation configuration

### Debug Mode

```python
# Enable debug logging
import logging
logging.getLogger("tool__table_indexer").setLevel(logging.DEBUG)

# Check session health
from tool__workstation import spark_health_check
print(spark_health_check())

# Inspect workflow steps
chain.trace(shape=True)
```

### Databricks Specific Issues

**Metastore Storage Error**: If you see "Metastore storage root URL does not exist":

- The TableIndexer now auto-detects and falls back to `spark_catalog.default`
- Update to the latest version: `TableIndexer(df)` (no catalog parameter needed)
- For Unity Catalog: ensure your workspace has Unity Catalog enabled

**Import Errors**:

- Ensure all tool files are uploaded to your Databricks workspace
- Run the import cell first: all PySpark imports are consolidated there
- Check that the cluster is running and has sufficient resources

## License

This project is released under the MIT License. See individual tool files for specific licensing information.

## Support

For questions, issues, or contributions:

1. Check the comprehensive tooling guide: `resources/guide__tooling.md`
2. Review the sample notebook: `sample.ipynb`
3. Examine tool source code for implementation details
4. Use workstation health checks for session troubleshooting

---

*Built with the Unix philosophy: Do one thing, do it well, and work together.*
