Skip to content

Commit 01888e0

Browse files
committed
Add command and event payloads to activity
By exposing these as custom properties, we allow listeners to perform additional operations on them without having to extend the message bus itself.
1 parent 0595fcc commit 01888e0

4 files changed

Lines changed: 84 additions & 13 deletions

File tree

src/Merq.Core/MessageBus.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public bool CanHandle(IExecutable command) => canHandleMap.GetOrAdd(GetCommandTy
190190
public void Execute(ICommand command)
191191
{
192192
var type = GetCommandType(command);
193-
using var activity = StartActivity(type, Telemetry.Process);
193+
using var activity = StartCommandActivity(type, command);
194194

195195
try
196196
{
@@ -222,7 +222,7 @@ public void Execute(ICommand command)
222222
public TResult Execute<TResult>(ICommand<TResult> command)
223223
{
224224
var type = GetCommandType(command);
225-
using var activity = StartActivity(type, Telemetry.Process);
225+
using var activity = StartCommandActivity(type, command);
226226

227227
try
228228
{
@@ -253,7 +253,7 @@ public TResult Execute<TResult>(ICommand<TResult> command)
253253
public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation = default)
254254
{
255255
var type = GetCommandType(command);
256-
using var activity = StartActivity(type, Telemetry.Process);
256+
using var activity = StartCommandActivity(type, command);
257257

258258
try
259259
{
@@ -286,7 +286,7 @@ public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation =
286286
public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, CancellationToken cancellation = default)
287287
{
288288
var type = GetCommandType(command);
289-
using var activity = StartActivity(type, Telemetry.Process);
289+
using var activity = StartCommandActivity(type, command);
290290

291291
try
292292
{
@@ -315,7 +315,7 @@ public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, Cance
315315
public void Notify<TEvent>(TEvent e)
316316
{
317317
var type = (e ?? throw new ArgumentNullException(nameof(e))).GetType();
318-
using var activity = StartActivity(type, Publish);
318+
using var activity = StartEventActivity(type, e);
319319
var watch = Stopwatch.StartNew();
320320

321321
try

src/Merq.Core/Telemetry.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@ static Telemetry()
4444
// NOTE: this is not an entirely satisfactory way to tell events from commands apart.
4545
public const string Process = nameof(Process);
4646

47-
public static Activity? StartActivity(Type type, string operation, [CallerMemberName] string? member = default, [CallerFilePath] string? file = default, [CallerLineNumber] int? line = default)
47+
public static Activity? StartCommandActivity(Type type, object command) => StartActivity(type, Process, "Command", command);
48+
49+
public static Activity? StartEventActivity(Type type, object @event) => StartActivity(type, Publish, "Event", @event);
50+
51+
public static Activity? StartActivity(Type type, string operation, string? property = default, object? value = default,
52+
[CallerMemberName] string? member = default, [CallerFilePath] string? file = default, [CallerLineNumber] int? line = default)
4853
{
4954
if (operation == Publish)
5055
events.Add(1, new KeyValuePair<string, object?>("Name", type.FullName));
@@ -55,7 +60,7 @@ static Telemetry()
5560
// Requirement is that the destination has low cardinality. In our case, the destination is
5661
// the logical operation being performed, such as "Execute", "Notify" or "Deliver". The
5762
// operation is actually the type being acted on (such as CreateUser -a command- or UserCreated -event).
58-
return tracer.StartActivity(ActivityKind.Producer, name: $"{operation}/{type.FullName}")
63+
var activity = tracer.CreateActivity($"{operation}/{type.FullName}", ActivityKind.Producer)
5964
?.SetTag("code.function", member)
6065
?.SetTag("code.filepath", file)
6166
?.SetTag("code.lineno", line)
@@ -65,6 +70,13 @@ static Telemetry()
6570
?.SetTag("messaging.operation", operation.ToLowerInvariant())
6671
?.SetTag("messaging.protocol.name", type.Assembly.GetName().Name)
6772
?.SetTag("messaging.protocol.version", type.Assembly.GetName().Version?.ToString() ?? "unknown");
73+
74+
if (property != null && value != null)
75+
activity?.SetCustomProperty(property, value);
76+
77+
activity?.Start();
78+
79+
return activity;
6880
}
6981

7082
public static void RecordException(this Activity? activity, Exception e)

src/Merq.Tests/MessageBusSpec.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using System.Reactive.Subjects;
45
using System.Threading;
56
using System.Threading.Tasks;
@@ -443,5 +444,61 @@ public async Task when_executing_non_public_àsynccommand_result_then_invokes_ha
443444
Assert.Equal(42, await bus.ExecuteAsync(new NonPublicAsyncCommandResult(), CancellationToken.None));
444445
}
445446

447+
[Fact]
448+
public void when_notifying_can_access_event_from_activity_stop()
449+
{
450+
object? e = default;
451+
using var listener = new ActivityListener
452+
{
453+
ActivityStopped = activity => e = activity.GetCustomProperty("Event"),
454+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
455+
ShouldListenTo = source => source.Name == "Merq",
456+
};
457+
458+
ActivitySource.AddActivityListener(listener);
459+
460+
bus.Notify(new ConcreteEvent());
461+
462+
Assert.NotNull(e);
463+
Assert.IsType<ConcreteEvent>(e);
464+
}
465+
466+
[Fact]
467+
public void when_executing_can_access_event_from_activity_stop()
468+
{
469+
object? c = default;
470+
using var listener = new ActivityListener
471+
{
472+
ActivityStopped = activity => c = activity.GetCustomProperty("Command"),
473+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
474+
ShouldListenTo = source => source.Name == "Merq",
475+
};
476+
477+
ActivitySource.AddActivityListener(listener);
478+
479+
Assert.Throws<InvalidOperationException>(() => bus.Execute(new Command()));
480+
481+
Assert.NotNull(c);
482+
}
483+
484+
[Fact]
485+
public void when_execute_throws_activity_has_error_status()
486+
{
487+
Activity? activity = default;
488+
using var listener = new ActivityListener
489+
{
490+
ActivityStarted = x => activity = x,
491+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
492+
ShouldListenTo = source => source.Name == "Merq",
493+
};
494+
495+
ActivitySource.AddActivityListener(listener);
496+
497+
Assert.Throws<InvalidOperationException>(() => bus.Execute(new Command()));
498+
499+
Assert.NotNull(activity);
500+
Assert.Equal(ActivityStatusCode.Error, activity.Status);
501+
}
502+
446503
class NestedEvent { }
447504
}

src/Samples/ConsoleApp/Program.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@
3636
var bus = services.GetRequiredService<IMessageBus>();
3737

3838
// .NET-style activity listening
39-
//using var listener = new ActivityListener
40-
//{
41-
// ActivityStarted = activity => MarkupLine($"[red]Activity started: {activity.OperationName}[/]"),
42-
// ActivityStopped = activity => MarkupLine($"[red]Activity stopped: {activity.OperationName}[/]"),
43-
// ShouldListenTo = source => source.Name == "Merq.Core",
44-
//};
39+
using var listener = new ActivityListener
40+
{
41+
ActivityStarted = activity => MarkupLine($"[grey]Activity started: {activity.OperationName}[/]"),
42+
ActivityStopped = activity => MarkupLine($"[grey]Activity stopped: {activity.OperationName}[/]"),
43+
ShouldListenTo = source => source.Name == "Merq",
44+
};
45+
46+
ActivitySource.AddActivityListener(listener);
4547

4648
// Setup OpenTelemetry: https://learn.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs
4749
using var tracer = Sdk

0 commit comments

Comments
 (0)