Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add `minimum_severity_level` and `trace_based_sampling` logger parameters to filter logs
([#4765](https://github.com/open-telemetry/opentelemetry-python/pull/4765))
- `opentelemetry-sdk`: Fix the type hint of the `_metrics_data` property to allow `None`
([#4837](https://github.com/open-telemetry/opentelemetry-python/pull/4837)
- Regenerate opentelemetry-proto code with v1.9.0 release
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from opentelemetry.sdk._logs._internal import (
FilteringLogRecordProcessor,
LogDroppedAttributesWarning,
Logger,
LoggerProvider,
Expand All @@ -32,6 +33,7 @@
"LogLimits",
"LogRecordLimits",
"LogRecordProcessor",
"FilteringLogRecordProcessor",
"LogDroppedAttributesWarning",
"LogRecordDroppedAttributesWarning",
"ReadableLogRecord",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from opentelemetry.trace import (
format_span_id,
format_trace_id,
get_current_span,
)
from opentelemetry.util.types import AnyValue, _ExtendedAttributes

Expand Down Expand Up @@ -317,6 +318,39 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
"""


class FilteringLogRecordProcessor(LogRecordProcessor):
"""A processor that drops records based on minimum severity and/or trace based sampling parameters provided by the user."""

def __init__(
self,
log_record_processor: LogRecordProcessor,
*,
minimum_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED,
enable_trace_based_sampling: bool = False,
):
self._log_record_processor = log_record_processor
self._minimum_severity_level = minimum_severity_level
self._enable_trace_based_sampling = enable_trace_based_sampling

def on_emit(self, log_record: ReadWriteLogRecord):
record = log_record.log_record
if is_less_than_minimum_severity_level(
record, self._minimum_severity_level
):
return
if should_drop_logs_for_trace_based_sampling(
record, self._enable_trace_based_sampling
):
return
self._log_record_processor.on_emit(log_record)

def shutdown(self):
self._log_record_processor.shutdown()

def force_flush(self, timeout_millis: int = 30000) -> bool:
return self._log_record_processor.force_flush(timeout_millis)


# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
# pylint:disable=no-member
class SynchronousMultiLogRecordProcessor(LogRecordProcessor):
Expand Down Expand Up @@ -835,3 +869,41 @@ def std_to_otel(levelno: int) -> SeverityNumber:
if levelno > 53:
return SeverityNumber.FATAL4
return _STD_TO_OTEL[levelno]


def is_less_than_minimum_severity_level(
record: LogRecord, minimum_severity_level: SeverityNumber
) -> bool:
"""Checks if the log record's severity number is less than the minimum severity level.

:return: True if the log record's severity number is less than the minimum
severity level, False otherwise. Log records with an unspecified severity (i.e. `0`)
are not affected by this parameter and therefore bypass minimum severity filtering.
"""
if record.severity_number is not None:
if (
minimum_severity_level is not None
and minimum_severity_level != SeverityNumber.UNSPECIFIED
and record.severity_number.value < minimum_severity_level.value
):
return True
return False


def should_drop_logs_for_trace_based_sampling(
record: LogRecord, enable_trace_based_sampling: bool
) -> bool:
"""Determines whether the logger should drop log records associated with unsampled traces.

If `enable_trace_based_sampling` is `true`, log records associated with unsampled traces are dropped by the `Logger`.
A log record is considered associated with an unsampled trace if it has a valid `SpanId` and its
`TraceFlags` indicate that the trace is unsampled. A log record that isn't associated with a trace
context is not affected by this parameter and therefore bypasses trace-based filtering.
"""
if enable_trace_based_sampling:
if record.context is not None:
span = get_current_span(record.context)
span_context = span.get_span_context()
if span_context.is_valid and not span_context.trace_flags.sampled:
return True
return False
246 changes: 246 additions & 0 deletions opentelemetry-sdk/tests/logs/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ReadableLogRecord,
)
from opentelemetry.sdk._logs._internal import (
FilteringLogRecordProcessor,
NoOpLogger,
SynchronousMultiLogRecordProcessor,
)
Expand Down Expand Up @@ -214,3 +215,248 @@ def test_can_emit_with_keywords_arguments(self):
self.assertEqual(result_log_record.attributes, {"some": "attributes"})
self.assertEqual(result_log_record.event_name, "event_name")
self.assertEqual(log_data.resource, logger.resource)


class TestFilteringLogRecordProcessor(unittest.TestCase):
@staticmethod
def _build_logger(
minimum_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED,
enable_trace_based_sampling: bool = False,
):
processor_mock = Mock()
filtering_processor = FilteringLogRecordProcessor(
processor_mock,
minimum_severity_level=minimum_severity_level,
enable_trace_based_sampling=enable_trace_based_sampling,
)
provider = LoggerProvider(resource=Resource.create({}))
provider.add_log_record_processor(filtering_processor)
logger = provider.get_logger(
"name",
version="version",
schema_url="schema_url",
attributes={"an": "attribute"},
)
return logger, processor_mock

def test_emit_logrecord_with_minimum_severity_level_filtering(self):
logger, processor_mock = self._build_logger(
minimum_severity_level=SeverityNumber.DEBUG4
)

log_record_info = LogRecord(
observed_timestamp=0,
body="info log line",
severity_number=SeverityNumber.DEBUG,
severity_text="DEBUG",
)

logger.emit(log_record_info)
processor_mock.on_emit.assert_not_called()

processor_mock.reset_mock()

log_record_error = LogRecord(
observed_timestamp=0,
body="error log line",
severity_number=SeverityNumber.ERROR,
severity_text="ERROR",
)

logger.emit(log_record_error)

processor_mock.on_emit.assert_called_once()
log_data = processor_mock.on_emit.call_args.args[0]
self.assertTrue(isinstance(log_data.log_record, LogRecord))
self.assertEqual(
log_data.log_record.severity_number, SeverityNumber.ERROR
)

def test_emit_logrecord_with_minimum_severity_level_unspecified(self):
logger, processor_mock = self._build_logger()
log_record = LogRecord(
observed_timestamp=0,
body="debug log line",
severity_number=SeverityNumber.DEBUG,
severity_text="DEBUG",
)
logger.emit(log_record)
processor_mock.on_emit.assert_called_once()

def test_emit_logrecord_with_trace_based_filtering(self):
logger, processor_mock = self._build_logger(
enable_trace_based_sampling=True
)

mock_span_context = Mock()
mock_span_context.is_valid = True
mock_span_context.trace_flags.sampled = False

mock_span = Mock()
mock_span.get_span_context.return_value = mock_span_context

mock_context = Mock()

with patch(
"opentelemetry.sdk._logs._internal.get_current_span",
return_value=mock_span,
):
log_record = LogRecord(
observed_timestamp=0,
body="should be dropped",
severity_number=SeverityNumber.INFO,
severity_text="INFO",
context=mock_context,
)

logger.emit(log_record)
processor_mock.on_emit.assert_not_called()

processor_mock.reset_mock()

mock_span_context = Mock()
mock_span_context.is_valid = True
mock_span_context.trace_flags.sampled = True

mock_span = Mock()
mock_span.get_span_context.return_value = mock_span_context

with patch(
"opentelemetry.sdk._logs._internal.get_current_span",
return_value=mock_span,
):
log_record = LogRecord(
observed_timestamp=0,
body="should pass",
severity_number=SeverityNumber.INFO,
severity_text="INFO",
context=mock_context,
)

logger.emit(log_record)
processor_mock.on_emit.assert_called_once()

def test_emit_logrecord_trace_based_filtering_disabled(self):
logger, processor_mock = self._build_logger(
enable_trace_based_sampling=False
)

mock_span_context = Mock()
mock_span_context.is_valid = False
mock_span_context.trace_flags.sampled = False

mock_span = Mock()
mock_span.get_span_context.return_value = mock_span_context

mock_context = Mock()

with patch(
"opentelemetry.sdk._logs._internal.get_current_span",
return_value=mock_span,
):
log_record = LogRecord(
observed_timestamp=0,
body="should be emitted when filtering disabled",
severity_number=SeverityNumber.INFO,
severity_text="INFO",
context=mock_context,
)

logger.emit(log_record)
processor_mock.on_emit.assert_called_once()

def test_emit_logrecord_trace_based_filtering_edge_cases(self):
logger, processor_mock = self._build_logger(
enable_trace_based_sampling=True
)

mock_span_context = Mock()
mock_span_context.is_valid = False
mock_span_context.trace_flags.sampled = True

mock_span = Mock()
mock_span.get_span_context.return_value = mock_span_context

mock_context = Mock()

with patch(
"opentelemetry.sdk._logs._internal.get_current_span",
return_value=mock_span,
):
log_record = LogRecord(
observed_timestamp=0,
body="invalid but sampled",
severity_number=SeverityNumber.INFO,
severity_text="INFO",
context=mock_context,
)

logger.emit(log_record)
processor_mock.on_emit.assert_called_once()

processor_mock.reset_mock()

mock_span_context = Mock()
mock_span_context.is_valid = True
mock_span_context.trace_flags.sampled = False

mock_span = Mock()
mock_span.get_span_context.return_value = mock_span_context

with patch(
"opentelemetry.sdk._logs._internal.get_current_span",
return_value=mock_span,
):
log_record = LogRecord(
observed_timestamp=0,
body="valid but not sampled",
severity_number=SeverityNumber.INFO,
severity_text="INFO",
context=mock_context,
)

logger.emit(log_record)
processor_mock.on_emit.assert_not_called()

def test_emit_both_minimum_severity_level_and_trace_based_filtering(self):
logger, processor_mock = self._build_logger(
minimum_severity_level=SeverityNumber.WARN,
enable_trace_based_sampling=True,
)

mock_span_context = Mock()
mock_span_context.is_valid = True
mock_span_context.trace_flags.sampled = True

mock_span = Mock()
mock_span.get_span_context.return_value = mock_span_context

mock_context = Mock()

with patch(
"opentelemetry.sdk._logs._internal.get_current_span",
return_value=mock_span,
):
log_record_info = LogRecord(
observed_timestamp=0,
body="info log line",
severity_number=SeverityNumber.INFO,
severity_text="INFO",
context=mock_context,
)

logger.emit(log_record_info)
processor_mock.on_emit.assert_not_called()

processor_mock.reset_mock()

log_record_error = LogRecord(
observed_timestamp=0,
body="error log line",
severity_number=SeverityNumber.ERROR,
severity_text="ERROR",
context=mock_context,
)

logger.emit(log_record_error)
processor_mock.on_emit.assert_called_once()
Loading