diff --git a/elementary/clients/slack/client.py b/elementary/clients/slack/client.py index f66a9b969..0c74a28a3 100644 --- a/elementary/clients/slack/client.py +++ b/elementary/clients/slack/client.py @@ -1,4 +1,5 @@ import json +import ssl from abc import ABC, abstractmethod from typing import Dict, List, Optional, Tuple, Union @@ -13,6 +14,7 @@ from elementary.config.config import Config from elementary.tracking.tracking_interface import Tracking from elementary.utils.log import get_logger +from elementary.utils.ssl import create_ssl_context logger = get_logger(__name__) @@ -25,8 +27,9 @@ class SlackClient(ABC): def __init__( self, tracking: Optional[Tracking] = None, + ssl_context: Optional[ssl.SSLContext] = None, ): - self.client = self._initial_client() + self.client = self._initial_client(ssl_context) self.tracking = tracking self._initial_retry_handlers() self.email_to_user_id_cache: Dict[str, str] = {} @@ -37,20 +40,22 @@ def create_client( ) -> Optional["SlackClient"]: if not config.has_slack: return None + ssl_context = create_ssl_context(config.ssl_ca_bundle) if config.slack_token: - logger.debug("Creating Slack client with token.") - return SlackWebClient(token=config.slack_token, tracking=tracking) + return SlackWebClient( + token=config.slack_token, tracking=tracking, ssl_context=ssl_context + ) elif config.slack_webhook: - logger.debug("Creating Slack client with webhook.") return SlackWebhookClient( webhook=config.slack_webhook, is_workflow=config.is_slack_workflow, tracking=tracking, + ssl_context=ssl_context, ) return None @abstractmethod - def _initial_client(self): + def _initial_client(self, ssl_context: Optional[ssl.SSLContext]): raise NotImplementedError def _initial_retry_handlers(self): @@ -85,12 +90,13 @@ def __init__( self, token: str, tracking: Optional[Tracking] = None, + ssl_context: Optional[ssl.SSLContext] = None, ): self.token = token - super().__init__(tracking) + super().__init__(tracking, ssl_context) - def _initial_client(self): - return WebClient(token=self.token) + def _initial_client(self, ssl_context: Optional[ssl.SSLContext]): + return WebClient(token=self.token, ssl=ssl_context) @sleep_and_retry @limits(calls=1, period=ONE_SECOND) @@ -231,16 +237,22 @@ def __init__( webhook: str, is_workflow: bool, tracking: Optional[Tracking] = None, + ssl_context: Optional[ssl.SSLContext] = None, ): self.webhook = webhook self.is_workflow = is_workflow - super().__init__(tracking) + super().__init__(tracking, ssl_context) - def _initial_client(self): + def _initial_client(self, ssl_context: Optional[ssl.SSLContext]): if self.is_workflow: + # Workflow webhooks do not support the ssl_context parameter. + # requests.Session() uses the requests default CA bundle (certifi). return requests.Session() + return WebhookClient( - url=self.webhook, default_headers={"Content-type": "application/json"} + url=self.webhook, + default_headers={"Content-type": "application/json"}, + ssl=ssl_context, ) @sleep_and_retry diff --git a/elementary/config/config.py b/elementary/config/config.py index b20fa2309..fbce626c5 100644 --- a/elementary/config/config.py +++ b/elementary/config/config.py @@ -76,6 +76,7 @@ def __init__( run_dbt_deps_if_needed: Optional[bool] = None, project_name: Optional[str] = None, quiet_logs: Optional[bool] = None, + ssl_ca_bundle: Optional[str] = None, ): self.config_dir = config_dir self.profiles_dir = profiles_dir @@ -223,6 +224,11 @@ def __init__( quiet_logs, config.get("quiet_logs"), False ) + self.ssl_ca_bundle = self._first_not_none( + ssl_ca_bundle, + config.get("ssl_ca_bundle"), + ) + def _load_configuration(self) -> dict: config_file_path = os.path.join(self.config_dir, self._CONFIG_FILE_NAME) if not os.path.exists(config_file_path): diff --git a/elementary/messages/messaging_integrations/slack_web.py b/elementary/messages/messaging_integrations/slack_web.py index cc4834062..f83b2ae51 100644 --- a/elementary/messages/messaging_integrations/slack_web.py +++ b/elementary/messages/messaging_integrations/slack_web.py @@ -1,4 +1,5 @@ import json +import ssl import time from typing import Any, Dict, Iterator, Optional @@ -54,9 +55,13 @@ def __init__( @classmethod def from_token( - cls, token: str, tracking: Optional[Tracking] = None, **kwargs: Any + cls, + token: str, + tracking: Optional[Tracking] = None, + ssl_context: Optional[ssl.SSLContext] = None, + **kwargs: Any, ) -> "SlackWebMessagingIntegration": - client = WebClient(token=token) + client = WebClient(token=token, ssl=ssl_context) client.retry_handlers.append(RateLimitErrorRetryHandler(max_retry_count=5)) return cls(client, tracking, **kwargs) diff --git a/elementary/messages/messaging_integrations/slack_webhook.py b/elementary/messages/messaging_integrations/slack_webhook.py index 8e465b7a0..8c65230fe 100644 --- a/elementary/messages/messaging_integrations/slack_webhook.py +++ b/elementary/messages/messaging_integrations/slack_webhook.py @@ -1,3 +1,4 @@ +import ssl from datetime import datetime, timezone from http import HTTPStatus from typing import Any, Optional @@ -37,9 +38,12 @@ def __init__( @classmethod def from_url( - cls, url: str, tracking: Optional[Tracking] = None + cls, + url: str, + tracking: Optional[Tracking] = None, + ssl_context: Optional[ssl.SSLContext] = None, ) -> "SlackWebhookMessagingIntegration": - client = WebhookClient(url) + client = WebhookClient(url, ssl=ssl_context) client.retry_handlers.append(RateLimitErrorRetryHandler(max_retry_count=5)) return cls(client, tracking) diff --git a/elementary/monitor/cli.py b/elementary/monitor/cli.py index 9b47133ff..c86b4b992 100644 --- a/elementary/monitor/cli.py +++ b/elementary/monitor/cli.py @@ -75,6 +75,15 @@ def decorator(func): default=None, help="The Slack token for your workspace.", )(func) + func = click.option( + "--ssl-ca-bundle", + type=str, + default=None, + help="Override the CA bundle used for SSL connections. " + "Accepted values: 'certifi' (use the certifi package bundle), " + "'system' (use the OS CA store), or a file path to a custom CA bundle. " + "When omitted each underlying library uses its own default.", + )(func) if cmd in (Command.REPORT, Command.SEND_REPORT): func = click.option( "--exclude-elementary-models", @@ -331,6 +340,7 @@ def monitor( teams_webhook, maximum_columns_in_alert_samples, quiet_logs, + ssl_ca_bundle, ): """ Get alerts on failures in dbt jobs. @@ -365,6 +375,7 @@ def monitor( teams_webhook=teams_webhook, maximum_columns_in_alert_samples=maximum_columns_in_alert_samples, quiet_logs=quiet_logs, + ssl_ca_bundle=ssl_ca_bundle, ) anonymous_tracking = AnonymousCommandLineTracking(config) anonymous_tracking.set_env("use_select", bool(select)) @@ -692,6 +703,7 @@ def send_report( include, target_path, quiet_logs, + ssl_ca_bundle, ): """ Generate and send the report to an external platform. @@ -735,6 +747,7 @@ def send_report( env=env, project_name=project_name, quiet_logs=quiet_logs, + ssl_ca_bundle=ssl_ca_bundle, ) anonymous_tracking = AnonymousCommandLineTracking(config) anonymous_tracking.set_env("use_select", bool(select)) diff --git a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py index 7a8ed59db..aeba80dae 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py @@ -23,6 +23,7 @@ ) from elementary.tracking.tracking_interface import Tracking from elementary.utils.log import get_logger +from elementary.utils.ssl import create_ssl_context logger = get_logger(__name__) @@ -43,6 +44,7 @@ def get_integration( tracking: Optional[Tracking] = None, ) -> Union[BaseMessagingIntegration, BaseIntegration]: if config.has_slack: + ssl_context = create_ssl_context(config.ssl_ca_bundle) if config.is_slack_workflow: return SlackIntegration( config=config, @@ -50,11 +52,11 @@ def get_integration( ) if config.slack_token: return SlackWebMessagingIntegration.from_token( - config.slack_token, tracking + config.slack_token, tracking, ssl_context=ssl_context ) elif config.slack_webhook: return SlackWebhookMessagingIntegration.from_url( - config.slack_webhook, tracking + config.slack_webhook, tracking, ssl_context=ssl_context ) else: raise UnsupportedAlertIntegrationError diff --git a/elementary/utils/ssl.py b/elementary/utils/ssl.py new file mode 100644 index 000000000..ffd5ea257 --- /dev/null +++ b/elementary/utils/ssl.py @@ -0,0 +1,41 @@ +import os +import ssl +from typing import Optional + +import certifi + +from elementary.utils.log import get_logger + +logger = get_logger(__name__) + +CERTIFI = "certifi" +SYSTEM = "system" + + +def create_ssl_context(ssl_ca_bundle: Optional[str] = None) -> Optional[ssl.SSLContext]: + """Resolve an ssl_ca_bundle setting into an SSLContext. + + Returns ``None`` when *ssl_ca_bundle* is ``None`` so that each + library keeps its own default CA behaviour. + """ + if ssl_ca_bundle is None: + return None + + value = ssl_ca_bundle.strip() + if not value: + raise ValueError( + "ssl_ca_bundle cannot be empty. Use 'certifi', 'system', or a CA bundle file path." + ) + + if value.lower() == CERTIFI: + logger.debug("Using certifi CA bundle for SSL context.") + return ssl.create_default_context(cafile=certifi.where()) + + if value.lower() == SYSTEM: + logger.debug("Using system CA store for SSL context.") + return ssl.create_default_context() + + if not os.path.isfile(value): + raise ValueError(f"ssl_ca_bundle path does not exist or is not a file: {value}") + logger.debug("Using custom CA bundle for SSL context: %s", value) + return ssl.create_default_context(cafile=value)