Patterns

Jobly provides four patterns for dispatching work. All four share one type hierarchy. Jobs, messages, and requests also share a unified pipeline, while streams use a separate stream pipeline.

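| Pattern  | Interface         | Persistence | Handlers | Response            |
|----------|-------------------|-------------|----------|---------------------|
| Messages | IMessage          | Database    | Multiple | None (Unit)         |
| Jobs     | IJob              | Database    | Single   | None (Unit)         |
| Requests | IRequest<T>       | In-memory   | Single   | Typed T             |
| Streams  | IStreamRequest<T> | In-memory   | Single   | IAsyncEnumerable<T> |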

Type Hierarchy

All types implement IRequest<TResponse>:

public interface IRequest<out TResponse>; // Base
public interface IJob : IRequest<Unit>; // Persistent, single handler
public interface IMessage : IRequest<Unit>; // Persistent, multiple handlers
// IRequest<TResponse> used directly // In-memory, returns TResponse
public interface IStreamRequest<out TResponse> : IRequest<IAsyncEnumerable<TResponse>>; // In-memory, streams TResponse
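To make the hierarchy concrete, here is a minimal sketch that declares one type per pattern. The interfaces and `Unit` are local stand-ins that mirror the declarations above so the snippet compiles without Jobly. `GetUser` and `UserDto` are the names used later on this page, while `UserRegistered`, `SendWelcomeEmail`, and `WatchPrices` are hypothetical names invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// One instance per pattern (all type names except GetUser/UserDto are hypothetical).
var samples = new object[]
{
    new UserRegistered(42),    // message
    new SendWelcomeEmail(42),  // job
    new GetUser(42),           // request
    new WatchPrices("ACME"),   // stream
};

foreach (var sample in samples)
{
    // Jobs and messages are IRequest<Unit>; requests and streams are not.
    Console.WriteLine($"{sample.GetType().Name}: IRequest<Unit> = {sample is IRequest<Unit>}");
}

// Stand-ins mirroring the declarations above (assumption: the real ones come from Jobly).
public readonly struct Unit { }
public interface IRequest<out TResponse> { }
public interface IJob : IRequest<Unit> { }
public interface IMessage : IRequest<Unit> { }
public interface IStreamRequest<out TResponse> : IRequest<IAsyncEnumerable<TResponse>> { }

// One type per pattern.
public record UserDto(int Id, string Name);
public record UserRegistered(int UserId) : IMessage;                 // persistent, multiple handlers
public record SendWelcomeEmail(int UserId) : IJob;                   // persistent, single handler
public record GetUser(int UserId) : IRequest<UserDto>;               // in-memory, returns UserDto
public record WatchPrices(string Symbol) : IStreamRequest<decimal>;  // in-memory, streams decimal
```

Because every pattern reduces to `IRequest<TResponse>`, a generic constraint such as `where T : IRequest<TResponse>` can match any of them, and narrower constraints such as `where T : IJob` can select a single pattern.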

Pipeline Behaviors

A unified pipeline wraps all handler invocations for jobs, messages, and requests:

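public class LoggingBehavior<T, TResponse> : IPipelineBehavior<T, TResponse>
    where T : IRequest<TResponse>
{
    // Logger injected by the DI container (Microsoft.Extensions.Logging).
    private readonly ILogger<LoggingBehavior<T, TResponse>> _logger;

    public LoggingBehavior(ILogger<LoggingBehavior<T, TResponse>> logger) => _logger = logger;

    public async Task<TResponse> HandleAsync(T request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
    {
        _logger.LogInformation("Starting {Type}", typeof(T).Name);
        var result = await next(); // Invoke the next behavior, or the handler itself
        _logger.LogInformation("Completed {Type}", typeof(T).Name);
        return result;
    }
}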

Register as an open generic:

builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
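The page does not show how Jobly chains registered behaviors internally, so the following is only a generic sketch of the wrapping pattern that `next` implies. The interfaces and delegate are local stand-ins with the same shape as the ones used above, and `Ping` and `TraceBehavior` are hypothetical names. It assumes the first behavior in the list ends up outermost, which may not match Jobly's actual ordering.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var log = new List<string>();
var behaviors = new IPipelineBehavior<Ping, string>[]
{
    new TraceBehavior<Ping, string>("outer", log),
    new TraceBehavior<Ping, string>("inner", log),
};

// The innermost delegate stands in for the handler itself.
RequestHandlerDelegate<string> handler = () =>
{
    log.Add("handler");
    return Task.FromResult("pong");
};

// Wrap from last to first so the first behavior ends up outermost.
var request = new Ping();
var pipeline = handler;
for (var i = behaviors.Length - 1; i >= 0; i--)
{
    var behavior = behaviors[i];
    var next = pipeline;
    pipeline = () => behavior.HandleAsync(request, next, CancellationToken.None);
}

var result = await pipeline();
Console.WriteLine(result);
Console.WriteLine(string.Join(" > ", log));

// Stand-ins with the same shape as the types used on this page (assumption).
public interface IRequest<out TResponse> { }
public delegate Task<TResponse> RequestHandlerDelegate<TResponse>();
public interface IPipelineBehavior<T, TResponse> where T : IRequest<TResponse>
{
    Task<TResponse> HandleAsync(T request, RequestHandlerDelegate<TResponse> next, CancellationToken ct);
}

public record Ping : IRequest<string>;

// Records when it runs relative to the rest of the pipeline.
public sealed class TraceBehavior<T, TResponse> : IPipelineBehavior<T, TResponse>
    where T : IRequest<TResponse>
{
    private readonly string _name;
    private readonly List<string> _log;

    public TraceBehavior(string name, List<string> log) { _name = name; _log = log; }

    public async Task<TResponse> HandleAsync(T request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
    {
        _log.Add($"{_name}:before");
        var result = await next();
        _log.Add($"{_name}:after");
        return result;
    }
}
```

Each behavior runs its "before" code, awaits `next()`, then runs its "after" code, so the log nests like an onion: outer before, inner before, handler, inner after, outer after.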

You can also target specific types:

// Only for GetUser requests
public class CacheBehavior : IPipelineBehavior<GetUser, UserDto> { ... }

// Only for jobs (any IJob)
public class RetryBehavior<T> : IPipelineBehavior<T, Unit> where T : IJob { ... }
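The bodies above are elided in the original. As one illustration of what a job-only behavior could look like, here is a hypothetical retry sketch. It is not Jobly's implementation: the interfaces, `Unit`, and the delegate are local stand-ins, and `CleanupJob` and the `maxAttempts` parameter are invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var attempts = 0;

// Handler stand-in that fails twice before succeeding.
RequestHandlerDelegate<Unit> flaky = () =>
{
    attempts++;
    if (attempts < 3) throw new InvalidOperationException($"attempt {attempts} failed");
    return Task.FromResult(default(Unit));
};

var behavior = new RetryBehavior<CleanupJob>(maxAttempts: 3);
await behavior.HandleAsync(new CleanupJob(), flaky, CancellationToken.None);
Console.WriteLine($"Succeeded after {attempts} attempts");

// Stand-ins with the same shape as the types used on this page (assumption).
public readonly struct Unit { }
public interface IRequest<out TResponse> { }
public interface IJob : IRequest<Unit> { }
public delegate Task<TResponse> RequestHandlerDelegate<TResponse>();
public interface IPipelineBehavior<T, TResponse> where T : IRequest<TResponse>
{
    Task<TResponse> HandleAsync(T request, RequestHandlerDelegate<TResponse> next, CancellationToken ct);
}

public record CleanupJob : IJob;

// Applies only to jobs: TResponse is fixed to Unit and T is constrained to IJob.
public sealed class RetryBehavior<T> : IPipelineBehavior<T, Unit> where T : IJob
{
    private readonly int _maxAttempts;

    public RetryBehavior(int maxAttempts) => _maxAttempts = maxAttempts;

    public async Task<Unit> HandleAsync(T request, RequestHandlerDelegate<Unit> next, CancellationToken ct)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await next();
            }
            catch (Exception) when (attempt < _maxAttempts)
            {
                // Swallow the failure and retry, unless cancellation was requested.
                ct.ThrowIfCancellationRequested();
            }
        }
    }
}
```

The exception filter lets the final failure propagate unchanged, so callers still see the original exception once the attempts are exhausted.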

For jobs and messages, TResponse is Unit. For requests, it's your custom response type. For streams, it's IAsyncEnumerable<T>. Logger output from pipeline behaviors appears in the job detail "Handler Output" section.

Stream requests use a separate pipeline, IStreamPipelineBehavior<TRequest, TResponse>:

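public class StreamLoggingBehavior<T, TResponse> : IStreamPipelineBehavior<T, TResponse>
    where T : IStreamRequest<TResponse>
{
    // Logger injected by the DI container (Microsoft.Extensions.Logging).
    private readonly ILogger<StreamLoggingBehavior<T, TResponse>> _logger;

    public StreamLoggingBehavior(ILogger<StreamLoggingBehavior<T, TResponse>> logger) => _logger = logger;

    public async IAsyncEnumerable<TResponse> HandleAsync(T request, StreamHandlerDelegate<T, TResponse> next, [EnumeratorCancellation] CancellationToken ct)
    {
        _logger.LogInformation("Starting stream {Type}", typeof(T).Name);
        // Forward each item as it arrives from the next behavior or the handler.
        await foreach (var item in next(request, ct).WithCancellation(ct))
        {
            yield return item;
        }

        // Runs only after the consumer has enumerated the stream to the end.
        _logger.LogInformation("Completed stream {Type}", typeof(T).Name);
    }
}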
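The following sketch shows a stream behavior wrapping a handler and being consumed with `await foreach`. It is a self-contained illustration under stated assumptions: the interfaces and `StreamHandlerDelegate` are local stand-ins shaped like the ones used above, and `Countdown`, `CountingBehavior`, and the `Handler` local function are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var behavior = new CountingBehavior<Countdown, int>();
var request = new Countdown(3);

// Items flow through the behavior one at a time as the consumer pulls them.
await foreach (var n in behavior.HandleAsync(request, Handler, CancellationToken.None))
{
    Console.WriteLine(n);
}
Console.WriteLine($"Items seen by behavior: {behavior.Count}");

// Handler stand-in: yields From, From - 1, ..., 1.
static async IAsyncEnumerable<int> Handler(Countdown request, [EnumeratorCancellation] CancellationToken ct)
{
    for (var i = request.From; i >= 1; i--)
    {
        ct.ThrowIfCancellationRequested();
        await Task.Yield();
        yield return i;
    }
}

// Stand-ins with the same shape as the types used on this page (assumption).
public interface IRequest<out TResponse> { }
public interface IStreamRequest<out TResponse> : IRequest<IAsyncEnumerable<TResponse>> { }
public delegate IAsyncEnumerable<TResponse> StreamHandlerDelegate<T, TResponse>(T request, CancellationToken ct);
public interface IStreamPipelineBehavior<T, TResponse> where T : IStreamRequest<TResponse>
{
    IAsyncEnumerable<TResponse> HandleAsync(T request, StreamHandlerDelegate<T, TResponse> next, CancellationToken ct);
}

public record Countdown(int From) : IStreamRequest<int>;

// Counts items as they pass through without buffering them.
public sealed class CountingBehavior<T, TResponse> : IStreamPipelineBehavior<T, TResponse>
    where T : IStreamRequest<TResponse>
{
    public int Count { get; private set; }

    public async IAsyncEnumerable<TResponse> HandleAsync(
        T request, StreamHandlerDelegate<T, TResponse> next, [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var item in next(request, ct).WithCancellation(ct))
        {
            Count++;
            yield return item;
        }
    }
}
```

Unlike the request pipeline, the behavior never holds a completed result. It re-yields items lazily, so any code placed after the loop runs only if the consumer enumerates the stream to completion.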