Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions elementary/clients/slack/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import ssl
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Union

Expand All @@ -13,6 +14,7 @@
from elementary.config.config import Config
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger
from elementary.utils.ssl import create_ssl_context

logger = get_logger(__name__)

Expand All @@ -25,8 +27,9 @@ class SlackClient(ABC):
def __init__(
self,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
self.client = self._initial_client()
self.client = self._initial_client(ssl_context)
self.tracking = tracking
self._initial_retry_handlers()
self.email_to_user_id_cache: Dict[str, str] = {}
Expand All @@ -37,20 +40,22 @@ def create_client(
) -> Optional["SlackClient"]:
if not config.has_slack:
return None
ssl_context = create_ssl_context(config.ssl_ca_bundle)
if config.slack_token:
logger.debug("Creating Slack client with token.")
return SlackWebClient(token=config.slack_token, tracking=tracking)
return SlackWebClient(
token=config.slack_token, tracking=tracking, ssl_context=ssl_context
)
elif config.slack_webhook:
logger.debug("Creating Slack client with webhook.")
return SlackWebhookClient(
webhook=config.slack_webhook,
is_workflow=config.is_slack_workflow,
tracking=tracking,
ssl_context=ssl_context,
)
return None

@abstractmethod
def _initial_client(self):
def _initial_client(self, ssl_context: Optional[ssl.SSLContext]):
raise NotImplementedError

def _initial_retry_handlers(self):
Expand Down Expand Up @@ -85,12 +90,13 @@ def __init__(
self,
token: str,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
self.token = token
super().__init__(tracking)
super().__init__(tracking, ssl_context)

def _initial_client(self):
return WebClient(token=self.token)
def _initial_client(self, ssl_context: Optional[ssl.SSLContext]):
return WebClient(token=self.token, ssl=ssl_context)

@sleep_and_retry
@limits(calls=1, period=ONE_SECOND)
Expand Down Expand Up @@ -231,16 +237,22 @@ def __init__(
webhook: str,
is_workflow: bool,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
self.webhook = webhook
self.is_workflow = is_workflow
super().__init__(tracking)
super().__init__(tracking, ssl_context)

def _initial_client(self):
def _initial_client(self, ssl_context: Optional[ssl.SSLContext]):
if self.is_workflow:
# Workflow webhooks do not support the ssl_context parameter.
# requests.Session() uses the requests default CA bundle (certifi).
return requests.Session()

return WebhookClient(
url=self.webhook, default_headers={"Content-type": "application/json"}
url=self.webhook,
default_headers={"Content-type": "application/json"},
ssl=ssl_context,
)

@sleep_and_retry
Expand Down
6 changes: 6 additions & 0 deletions elementary/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(
run_dbt_deps_if_needed: Optional[bool] = None,
project_name: Optional[str] = None,
quiet_logs: Optional[bool] = None,
ssl_ca_bundle: Optional[str] = None,
):
self.config_dir = config_dir
self.profiles_dir = profiles_dir
Expand Down Expand Up @@ -223,6 +224,11 @@ def __init__(
quiet_logs, config.get("quiet_logs"), False
)

self.ssl_ca_bundle = self._first_not_none(
ssl_ca_bundle,
config.get("ssl_ca_bundle"),
)

def _load_configuration(self) -> dict:
config_file_path = os.path.join(self.config_dir, self._CONFIG_FILE_NAME)
if not os.path.exists(config_file_path):
Expand Down
9 changes: 7 additions & 2 deletions elementary/messages/messaging_integrations/slack_web.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import ssl
import time
from typing import Any, Dict, Iterator, Optional

Expand Down Expand Up @@ -54,9 +55,13 @@ def __init__(

@classmethod
def from_token(
cls, token: str, tracking: Optional[Tracking] = None, **kwargs: Any
cls,
token: str,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
**kwargs: Any,
) -> "SlackWebMessagingIntegration":
client = WebClient(token=token)
client = WebClient(token=token, ssl=ssl_context)
client.retry_handlers.append(RateLimitErrorRetryHandler(max_retry_count=5))
return cls(client, tracking, **kwargs)

Expand Down
8 changes: 6 additions & 2 deletions elementary/messages/messaging_integrations/slack_webhook.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ssl
from datetime import datetime, timezone
from http import HTTPStatus
from typing import Any, Optional
Expand Down Expand Up @@ -37,9 +38,12 @@ def __init__(

@classmethod
def from_url(
cls, url: str, tracking: Optional[Tracking] = None
cls,
url: str,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
) -> "SlackWebhookMessagingIntegration":
client = WebhookClient(url)
client = WebhookClient(url, ssl=ssl_context)
client.retry_handlers.append(RateLimitErrorRetryHandler(max_retry_count=5))
return cls(client, tracking)

Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ def decorator(func):
default=None,
help="The Slack token for your workspace.",
)(func)
func = click.option(
"--ssl-ca-bundle",
type=str,
default=None,
help="Override the CA bundle used for SSL connections. "
"Accepted values: 'certifi' (use the certifi package bundle), "
"'system' (use the OS CA store), or a file path to a custom CA bundle. "
"When omitted each underlying library uses its own default.",
)(func)
if cmd in (Command.REPORT, Command.SEND_REPORT):
func = click.option(
"--exclude-elementary-models",
Expand Down Expand Up @@ -331,6 +340,7 @@ def monitor(
teams_webhook,
maximum_columns_in_alert_samples,
quiet_logs,
ssl_ca_bundle,
):
"""
Get alerts on failures in dbt jobs.
Expand Down Expand Up @@ -365,6 +375,7 @@ def monitor(
teams_webhook=teams_webhook,
maximum_columns_in_alert_samples=maximum_columns_in_alert_samples,
quiet_logs=quiet_logs,
ssl_ca_bundle=ssl_ca_bundle,
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
Expand Down Expand Up @@ -692,6 +703,7 @@ def send_report(
include,
target_path,
quiet_logs,
ssl_ca_bundle,
):
"""
Generate and send the report to an external platform.
Expand Down Expand Up @@ -735,6 +747,7 @@ def send_report(
env=env,
project_name=project_name,
quiet_logs=quiet_logs,
ssl_ca_bundle=ssl_ca_bundle,
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger
from elementary.utils.ssl import create_ssl_context

logger = get_logger(__name__)

Expand All @@ -43,18 +44,19 @@ def get_integration(
tracking: Optional[Tracking] = None,
) -> Union[BaseMessagingIntegration, BaseIntegration]:
if config.has_slack:
ssl_context = create_ssl_context(config.ssl_ca_bundle)
if config.is_slack_workflow:
return SlackIntegration(
config=config,
tracking=tracking,
)
if config.slack_token:
return SlackWebMessagingIntegration.from_token(
config.slack_token, tracking
config.slack_token, tracking, ssl_context=ssl_context
)
elif config.slack_webhook:
return SlackWebhookMessagingIntegration.from_url(
config.slack_webhook, tracking
config.slack_webhook, tracking, ssl_context=ssl_context
)
else:
raise UnsupportedAlertIntegrationError
Expand Down
41 changes: 41 additions & 0 deletions elementary/utils/ssl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import os
import ssl
from typing import Optional

import certifi

from elementary.utils.log import get_logger

logger = get_logger(__name__)

CERTIFI = "certifi"
SYSTEM = "system"


def create_ssl_context(ssl_ca_bundle: Optional[str] = None) -> Optional[ssl.SSLContext]:
"""Resolve an ssl_ca_bundle setting into an SSLContext.

Returns ``None`` when *ssl_ca_bundle* is ``None`` so that each
library keeps its own default CA behaviour.
"""
if ssl_ca_bundle is None:
return None

value = ssl_ca_bundle.strip()
if not value:
raise ValueError(
"ssl_ca_bundle cannot be empty. Use 'certifi', 'system', or a CA bundle file path."
)

if value.lower() == CERTIFI:
logger.debug("Using certifi CA bundle for SSL context.")
return ssl.create_default_context(cafile=certifi.where())

if value.lower() == SYSTEM:
logger.debug("Using system CA store for SSL context.")
return ssl.create_default_context()

if not os.path.isfile(value):
raise ValueError(f"ssl_ca_bundle path does not exist or is not a file: {value}")
logger.debug("Using custom CA bundle for SSL context: %s", value)
return ssl.create_default_context(cafile=value)
Loading