# Jobly — Complete AI Reference

> Distributed job processing, message queue, and in-memory mediator library for .NET 10.

Jobly provides three patterns — Messages (pub/sub), Jobs (orchestrated background work), and Requests (in-memory mediator) — through a unified pipeline. It uses the transactional outbox pattern: jobs are committed atomically with business data in the same DbContext. No separate message broker required.

Ships as NuGet packages: Moberg.Jobly.Core, Moberg.Jobly.Worker, Moberg.Jobly.UI. Supports PostgreSQL and SQL Server.

---

## Installation

```bash
dotnet add package Moberg.Jobly.Core    # Publishing (required)
dotnet add package Moberg.Jobly.Worker  # Worker service (if processing jobs)
dotnet add package Moberg.Jobly.UI      # Dashboard (optional)
```

---

## Setup

### 1. Register DbContext

Register your DbContext normally. Jobly hooks into it automatically:

```csharp
builder.Services.AddDbContext<AppDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("Default")));
```

### 2. Register Jobly

For publisher-only apps (create jobs but don't process them):

```csharp
builder.Services.AddJobly();
builder.Services.AddJobHandlers(typeof(Program).Assembly);
```

For worker apps (create AND process jobs):

```csharp
builder.Services.AddJoblyWorker(options =>
{
    options.WorkerCount = 10;
    options.Queues = ["default"];
    options.PollingInterval = TimeSpan.FromSeconds(1);
});
builder.Services.AddJobHandlers(typeof(Program).Assembly);
```

`AddJoblyWorker` includes `AddJobly` internally — don't call both.

### 3. Create Database Schema

Jobly entities are added to your DbContext model automatically. Use standard EF Core migrations:

```bash
dotnet ef migrations add AddJobly
dotnet ef database update
```

When upgrading the Jobly NuGet package:

```bash
dotnet ef migrations add UpgradeJobly
dotnet ef database update
```

For quick start/development only (does not support upgrades):

```csharp
await context.Database.EnsureCreatedAsync();
```

### 4. Add Dashboard (Optional)

```csharp
app.UseJoblyUI(); // Serves at /jobly
```

### 5. Complete Minimal Example

```csharp
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDbContext<AppDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("Default")));

builder.Services.AddJoblyWorker(options =>
{
    options.WorkerCount = 5;
    options.Queues = ["default"];
});
builder.Services.AddJobHandlers(typeof(Program).Assembly);

var app = builder.Build();
app.UseJoblyUI();
app.Run();
```

---

## Three Patterns

### Type Hierarchy

All types implement `IRequest`:

```csharp
public interface IRequest;            // Base — all types implement this
public interface IJob : IRequest;     // Persistent, single handler
public interface IMessage : IRequest; // Persistent, multiple handlers
// IRequest<TResponse> is used directly — in-memory, returns TResponse
```

| Pattern  | Interface     | Persistence | Handlers | Response  | Dispatch |
|----------|---------------|-------------|----------|-----------|----------|
| Messages | `IMessage`    | Database    | Multiple | `Unit`    | `publisher.Publish()` |
| Jobs     | `IJob`        | Database    | Single   | `Unit`    | `publisher.Enqueue()` / `Schedule()` |
| Requests | `IRequest<T>` | In-memory   | Single   | Typed `T` | `mediator.Send()` |

---

## Pattern 1: Messages (Pub/Sub)

Messages implement `IMessage` and have multiple handlers. Each handler becomes an independent, retryable job.
### Define a message and handlers

```csharp
public class OrderPlaced : IMessage
{
    public int OrderId { get; set; }
}

public class SendConfirmationEmail : IMessageHandler<OrderPlaced>
{
    private readonly ILogger<SendConfirmationEmail> _logger;
    public SendConfirmationEmail(ILogger<SendConfirmationEmail> logger) => _logger = logger;

    public async Task HandleAsync(OrderPlaced message, CancellationToken ct)
    {
        _logger.LogInformation("Sending email for order {OrderId}", message.OrderId);
        // ILogger output is captured and viewable in the dashboard
    }
}

public class NotifyWarehouse : IMessageHandler<OrderPlaced>
{
    public async Task HandleAsync(OrderPlaced message, CancellationToken ct)
    {
        // Runs independently — if the email handler fails, this still succeeds
    }
}
```

### Publish

```csharp
await publisher.Publish(new OrderPlaced { OrderId = 123 });
await context.SaveChangesAsync(); // Committed atomically with business data
```

### How message routing works

1. `Publish()` creates a `Job` entity with `Kind = Message` in the database
2. `MessageRouter` (background task, polls every 1s) discovers all registered `IMessageHandler<T>` implementations
3. For each handler, a child `Job` is created with `Kind = Job` and the handler pre-assigned
4. Workers pick up each child job independently
5. When all children complete, the parent message transitions to `Completed` or `Failed`

---

## Pattern 2: Jobs (Background Work)

Jobs implement `IJob` and have a single handler. They support scheduling, retries, continuations, batches, named queues, and mutex.
### Define a job and handler

```csharp
public class GenerateReport : IJob
{
    public int ReportId { get; set; }
}

public class GenerateReportHandler : IJobHandler<GenerateReport>
{
    private readonly ILogger<GenerateReportHandler> _logger;
    public GenerateReportHandler(ILogger<GenerateReportHandler> logger) => _logger = logger;

    public async Task HandleAsync(GenerateReport message, CancellationToken ct)
    {
        _logger.LogInformation("Generating report {ReportId}", message.ReportId);
        // Handler logs appear in the job's Activity Log on the dashboard
    }
}
```

### Enqueue

```csharp
await publisher.Enqueue(new GenerateReport { ReportId = 1 });
await publisher.SaveChangesAsync();
```

### Schedule for later

```csharp
await publisher.Schedule(new GenerateReport { ReportId = 1 }, DateTime.UtcNow.AddHours(1));
await publisher.SaveChangesAsync();
```

### Retries

Declare a retry policy on the handler or job class:

```csharp
[Retry(3, Delays = [15, 60, 300])]
public class GenerateReportHandler : IJobHandler<GenerateReport> { ... }
```

Or configure global defaults:

```csharp
services.AddJoblyRetry(o =>
{
    o.MaxRetries = 3;
    o.Delays = [15, 60, 300];
    o.JitterFactor = 0.2;
});
```

Priority: per-enqueue metadata > handler attribute > job attribute > global RetryOptions. Failed jobs are retried automatically. Crash requeues do NOT count against the retry limit.

`JitterFactor` (global only, default `0.0`, clamped to `[0, 1]`) applies multiplicative random jitter to each computed delay: `delay * (1 + JitterFactor * rand(-1, 1))`. This spreads retries to avoid thundering herds when many jobs fail at the same time.

### Named queues

```csharp
await publisher.Enqueue(new GenerateReport { ReportId = 1 }, queue: "reports");
```

Queues are processed in alphabetical order. Use prefixes to control priority:

```csharp
options.Queues = ["a-critical", "b-default", "c-low"];
// Workers always pick from a-critical before b-default, etc.
```

### Continuations

```csharp
var parentId = await publisher.Enqueue(new ProcessPayment { OrderId = 1 });
await publisher.Enqueue(new SendReceipt { OrderId = 1 }, parentId); // Runs after parent completes
await publisher.SaveChangesAsync();
```

### Batches

```csharp
var batchPublisher = serviceProvider.GetRequiredService<IBatchPublisher>();

var jobs = orders.Select(o => new ProcessOrder { OrderId = o.Id }).ToList();
var batchId = await batchPublisher.StartNew(jobs);
await batchPublisher.SaveChangesAsync();

// Continuation after the batch completes
var followUps = new List<SendBatchSummary> { new() }; // SendBatchSummary: illustrative continuation job type
await batchPublisher.ContinueBatchWith(followUps, batchId);
await batchPublisher.SaveChangesAsync();
```

#### ContinuationOptions

```csharp
// Default: the continuation fires only when ALL jobs succeed
await batchPublisher.StartNew(jobs, options: ContinuationOptions.OnlyOnSucceeded);

// Fire when all jobs finish, regardless of success/failure
await batchPublisher.StartNew(jobs, options: ContinuationOptions.OnAnyFinishedState);
```

With `OnlyOnSucceeded` (default): if any job fails, the batch transitions to `Failed` and continuations stay in the `Awaiting` state. You can requeue failed jobs from the dashboard. With `OnAnyFinishedState`: continuations fire as soon as all jobs reach a terminal state.

### Mutex (concurrency control)

Only one job per mutex key can be processing at a time:

```csharp
await publisher.Enqueue(
    new ProcessPayment { CustomerId = 123 },
    new JobParameters { Mutex = "payment:123" });
```

If a worker picks up a job whose mutex is already held by another processing job, the new job is cancelled automatically. Jobs without a mutex skip the check entirely (zero overhead).

### NoRestart (stale-recovery opt-out)

Opt-in addon: `services.AddJoblyNoRestart()`. Lets specific job types stay `Failed` on worker crash instead of being auto-requeued by `StaleJobRecovery`.
```csharp
[NoRestart]
public class ChargeCard : IJob { }
```

Override chain (first non-null wins): per-publish `.WithRestart(bool)` > `[NoRestart]`/`[Restart]` attribute (inherits through base classes, with direct-declaration precedence) > global `RestartStaleJobsByDefault` (default `true`).

Use for non-idempotent work: payments, webhooks, outbound notifications.

### Circuit Breaker

Opt-in addon: `services.AddJoblyCircuitBreaker()`. Adds a `CircuitBreakerState` entity (migration required). Opens after `Threshold` consecutive failures, stays open for `Duration`, then transitions to **HalfOpen**, where an atomic CAS guarantees exactly one worker probes while others reschedule.

```csharp
services.AddJoblyCircuitBreaker(o =>
{
    o.Threshold = 5;
    o.Duration = TimeSpan.FromMinutes(1);
    o.ResetJitter = TimeSpan.FromSeconds(10);
});

[CircuitBreaker(Group = "payments-gateway")]
public class ChargeCard : IJob { }
```

Jobs rescheduled by an open circuit do NOT increment Retry's `RetriedTimes` — the retry budget is preserved for when the circuit closes.

### Batched Completions (Dispatcher Mode)

When `UseDispatcher = true`, each worker buffers job completions and commits them as a single multi-row transaction. Tune via `CompletionBatchSize` (default `50`) and `CompletionFlushInterval` (default `100ms`). Set `CompletionBatchSize = 1` to opt out.

Poison-entry isolation: a single bad row in a batch of 50 is dropped via recursive split; the other 49 still commit. SIGTERM mid-flush is safe — `FlushAsync()` commits to completion using `CancellationToken.None` internally.

Trade-off: at-least-once semantics. Pair with `[NoRestart]` for non-idempotent handlers.
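The recursive-split isolation described above can be sketched language-neutrally. This is an illustrative sketch, not Jobly's actual code; `commit` is a hypothetical callback standing in for the multi-row completion transaction:

```python
def flush(entries, commit):
    """Commit a batch; on failure, split recursively to isolate poison entries.

    `commit(batch)` atomically writes a list of completion rows and raises on failure.
    Returns the entries that were dropped as poison.
    """
    if not entries:
        return []
    try:
        commit(entries)          # happy path: one multi-row transaction
        return []
    except Exception:
        if len(entries) == 1:    # isolated the bad row — drop it
            return entries
        mid = len(entries) // 2  # retry each half independently
        return flush(entries[:mid], commit) + flush(entries[mid:], commit)
```

With one poison entry in a batch of 50, the good rows still commit in sub-batches and only O(log n) extra commit attempts are needed to isolate the bad row.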
### JobParameters

For full control, pass a `JobParameters` object:

```csharp
await publisher.Enqueue(new MyJob(), new JobParameters
{
    MaxRetries = 3,
    Queue = "critical",
    Mutex = "my-resource-key",
    ScheduleTime = DateTime.UtcNow.AddMinutes(30),
    ParentId = parentJobId, // continuation
});
```

---

## Pattern 3: Requests (In-Memory Mediator)

Requests implement `IRequest<TResponse>` and execute immediately in-process. No database, no worker, no retries.

### Define a request and handler

```csharp
public class GetUser : IRequest<UserDto>
{
    public int UserId { get; set; }
}

public class GetUserHandler : IRequestHandler<GetUser, UserDto>
{
    private readonly AppDbContext _db;
    public GetUserHandler(AppDbContext db) => _db = db;

    public async Task<UserDto> HandleAsync(GetUser request, CancellationToken ct)
    {
        var user = await _db.Users.FindAsync([request.UserId], ct);
        return new UserDto { Id = user.Id, Name = user.Name };
    }
}
```

### Send via IMediator

```csharp
public class UserController : ControllerBase
{
    private readonly IMediator _mediator;
    public UserController(IMediator mediator) => _mediator = mediator;

    [HttpGet("{id}")]
    public async Task<IActionResult> GetUser(int id)
    {
        var user = await _mediator.Send(new GetUser { UserId = id });
        return Ok(user);
    }
}
```

### Key differences from Jobs/Messages

- Requests are NOT persisted to the database
- They execute immediately in-process, returning `TResponse` directly
- Errors bubble up to the caller (no automatic retries)
- Not visible in the dashboard
- Use `IMediator.Send()`, not `IPublisher.Enqueue()`

---

## Transactional Outbox Pattern

Jobs are created inside the same database transaction as your business data. A single `SaveChangesAsync()` commits everything atomically.
```csharp
public async Task<IActionResult> PlaceOrder(CreateOrderRequest request)
{
    var order = new Order { CustomerId = request.CustomerId };
    _context.Orders.Add(order);

    // These jobs are written to the same DbContext — not yet committed
    await _publisher.Enqueue(new SendConfirmationEmail { OrderId = order.Id });
    await _publisher.Publish(new OrderPlaced { OrderId = order.Id });

    // A single SaveChangesAsync commits the order AND all jobs atomically
    await _context.SaveChangesAsync();
    return Ok(order.Id);
}
```

If `SaveChangesAsync()` fails, both the order and the jobs roll back. No orphaned jobs.

### DbContext must be Scoped

The publisher and your application code must share the same DbContext instance. Register your DbContext as Scoped (the EF Core default). Do NOT use a Transient lifetime.

### With explicit transactions

```csharp
await using var transaction = await context.Database.BeginTransactionAsync();

context.Orders.Add(order);
await publisher.Enqueue(new ProcessOrder { OrderId = order.Id });
await context.SaveChangesAsync();

await transaction.CommitAsync(); // Jobs visible to workers only after commit
```

---

## Pipeline Behaviors

A unified pipeline wraps all handler invocations — jobs, messages, and requests.
Implement `IPipelineBehavior<TRequest, TResponse>` for cross-cutting concerns:

```csharp
public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest
{
    private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
    public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger) => _logger = logger;

    public async Task<TResponse> HandleAsync(
        TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
    {
        _logger.LogInformation("Handling {RequestType}", typeof(TRequest).Name);
        var response = await next();
        _logger.LogInformation("Handled {RequestType}", typeof(TRequest).Name);
        return response;
    }
}
```

### Registration

Auto-scan an assembly:

```csharp
builder.Services.AddPipelineBehaviors(typeof(Program).Assembly);
```

Or register manually:

```csharp
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
```

### Targeting specific types

```csharp
// Only for GetUser requests
public class CacheBehavior : IPipelineBehavior<GetUser, UserDto> { ... }

// Only for jobs (any IJob)
public class RetryBehavior<T> : IPipelineBehavior<T, Unit> where T : IJob { ... }
```

For jobs and messages, `TResponse` is `Unit`. Behaviors execute in registration order, outermost first.

---

## Recurring Jobs

Register a cron-based recurring job:

```csharp
var recurringPublisher = serviceProvider.GetRequiredService<IRecurringJobPublisher>();
await recurringPublisher.AddOrUpdateRecurringJob(
    new CleanupSessions(),
    name: "session-cleanup",
    cron: "0 * * * *"); // Every hour
```

`AddOrUpdateRecurringJob` only registers the definition — it does NOT create a job. The `RecurringJobScheduler` background task creates jobs when the cron time arrives.
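For intuition, here is a simplified sketch of how a 5-field cron expression can be matched against a point in time. This is illustrative Python, not Jobly's actual parser, and it supports only `*`, `*/n`, `a-b`, comma lists, and single values:

```python
def cron_matches(expr, minute, hour, day, month, weekday):
    """Check a 5-field cron expression (minute hour day month weekday)
    against a moment in time. Weekday: 0 = Sunday ... 6 = Saturday."""
    def field_matches(field, value):
        for part in field.split(","):
            if part == "*":
                return True
            if part.startswith("*/"):           # step values, e.g. */5
                if value % int(part[2:]) == 0:
                    return True
            elif "-" in part:                   # ranges, e.g. 1-5
                lo, hi = map(int, part.split("-"))
                if lo <= value <= hi:
                    return True
            elif int(part) == value:            # single value
                return True
        return False

    fields = expr.split()
    return all(field_matches(f, v)
               for f, v in zip(fields, (minute, hour, day, month, weekday)))
```

For example, `cron_matches("0 9 * * 1-5", minute=0, hour=9, day=15, month=6, weekday=3)` is true (a Wednesday at 9 AM), while the same moment on a Saturday is not. A real scheduler additionally computes the *next* matching occurrence rather than testing a single instant.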
### Cron expressions

Standard 5-part (minute, hour, day, month, weekday) and 6-part with seconds:

```
* * * * *      Every minute
0 * * * *      Every hour
0 9 * * *      Daily at 9 AM
0 0 * * 1      Every Monday at midnight
*/5 * * * *    Every 5 minutes
0 9 * * 1-5    Weekdays at 9 AM
```

### Update

Call `AddOrUpdateRecurringJob` again with the same name:

```csharp
await recurringPublisher.AddOrUpdateRecurringJob(
    new CleanupSessions(),
    name: "session-cleanup",
    cron: "*/30 * * * *"); // Changed to every 30 minutes
```

### Manual trigger

```csharp
var svc = serviceProvider.GetRequiredService<IRecurringJobService>();
await svc.TriggerRecurringJob(id);
```

### Delete

```csharp
await recurringJobService.DeleteRecurringJob(id);
```

### Deduplication

Before creating a new job, the scheduler checks whether the most recent execution is still `Enqueued` or `Processing`. If so, it skips — no duplicate jobs.

---

## Job Cancellation

Jobly uses graceful cancellation. When you cancel a processing job, the handler gets a chance to stop cleanly.

### Cancel a job

```csharp
await jobCommandService.DeleteJob(jobId);
```

### Flow

1. `DeleteJob` sets `CancellationMode = Graceful` (state stays `Processing`)
2. `RunJobMonitor` detects it and cancels the handler's `CancellationToken`
3. If the handler respects the token: state becomes `Deleted`
4. If the handler completes despite cancellation: state becomes `Completed` (the work happened)

### Writing cancellable handlers

```csharp
public class LongRunningHandler : IJobHandler<LongRunningJob>
{
    public async Task HandleAsync(LongRunningJob message, CancellationToken ct)
    {
        foreach (var item in items)
        {
            ct.ThrowIfCancellationRequested();
            await ProcessItem(item);
        }
    }
}
```

---

## Crash Recovery

Jobly uses a sliding invisibility timeout to detect and recover from worker crashes.

1. When a worker picks up a job, it sets `LastKeepAlive = now`
2. During execution, the keep-alive is refreshed every `InvisibilityTimeout / 5` (default: every 60s with a 5-minute timeout)
3. If the worker crashes, the keep-alive stops
4. After `InvisibilityTimeout` (default 5 minutes), the job is requeued automatically
5. Crash requeues do NOT count against `MaxRetries`

---

## Job Tracing

Every job gets two trace fields:

- `TraceId` — all related jobs share this ID. The first job creates it.
- `SpawnedByJobId` — direct "who created me" link.

When a handler creates new jobs, they automatically inherit the trace via `AsyncLocal` context:

```csharp
public class ProcessOrderHandler : IJobHandler<ProcessOrder>
{
    private readonly IBatchPublisher _batchPublisher;

    public async Task HandleAsync(ProcessOrder message, CancellationToken ct)
    {
        // These jobs automatically inherit the trace from ProcessOrder
        var items = orders.Select(i => new ShipItem { ItemId = i.Id }).ToList();
        await _batchPublisher.StartNew(items);
    }
}
```

---

## Job Metadata

Attach key-value metadata to jobs at publish time. Metadata is inherited by child jobs, accessible in handlers via `IJobContext`, and visible in the dashboard.

### Ad-hoc metadata

```csharp
await publisher.Enqueue(new ProcessOrder { OrderId = 123 }, new JobParameters
{
    Metadata = new Dictionary<string, string>
    {
        ["tenant"] = "acme-corp",
        ["priority"] = "high",
    },
});
```

### Reading metadata in handlers

```csharp
public class ProcessOrderHandler : IJobHandler<ProcessOrder>
{
    private readonly IJobContext _jobContext;
    public ProcessOrderHandler(IJobContext jobContext) => _jobContext = jobContext;

    public async Task HandleAsync(ProcessOrder message, CancellationToken ct)
    {
        var tenant = _jobContext.Metadata["tenant"];
        var jobId = _jobContext.JobId;
        var traceId = _jobContext.TraceId;
    }
}
```

### Publish pipeline behaviors

For cross-cutting metadata (e.g., a tenant ID on every job), implement `IPublishPipelineBehavior<T>`:

```csharp
public class TenantMetadataBehavior<T> : IPublishPipelineBehavior<T>
{
    private readonly ITenantProvider _tenantProvider;
    public TenantMetadataBehavior(ITenantProvider tenantProvider) => _tenantProvider = tenantProvider;

    public async Task PublishAsync(PublishContext<T> context, PublishDelegate next,
        CancellationToken ct)
    {
        context.Metadata["tenant"] = _tenantProvider.CurrentTenantId;
        await next();
    }
}

// Register as an open generic
builder.Services.AddTransient(typeof(IPublishPipelineBehavior<>), typeof(TenantMetadataBehavior<>));
```

### Metadata inheritance

When a handler creates child jobs, metadata from the parent is automatically inherited. Ad-hoc metadata from `JobParameters` merges with inherited metadata (ad-hoc wins on key conflict).

### Metadata sources (priority order)

1. Inherited metadata — from the parent job's execution context
2. Ad-hoc metadata — from `JobParameters.Metadata`
3. Pipeline metadata — from `IPublishPipelineBehavior<T>` implementations

Later sources override earlier ones for the same key.

---

## Pause / Resume

Pause and resume job processing at the server or worker group level.

### API

```
POST /api/servers/{serverId}/pause
POST /api/servers/{serverId}/resume
POST /api/groups/{groupId}/pause
POST /api/groups/{groupId}/resume
```

### How it works

1. A pause request sets a `PausedAt` timestamp on the server or worker group
2. The `Heartbeat` task (every ~3s) reads pause state from the database into `PauseStateHolder`
3. Workers check `PauseStateHolder` before each poll — if paused, they skip
4. Resume clears `PausedAt`; the next heartbeat propagates the change

Paused workers stop picking up new jobs. Jobs already in progress continue to completion.

---

## Real-time Handler Logs

Handler `ILogger` output is flushed to the database every ~1 second during execution (not just after completion). Logs are visible in the dashboard while the job is still processing.

The `RunJobMonitor` task runs alongside each handler execution, periodically draining the log collector and persisting entries. This also handles keep-alive updates and cancellation detection.
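The metadata source priority described under "Metadata sources (priority order)" above amounts to a simple ordered merge. A minimal illustrative sketch (Python, not Jobly's code):

```python
def effective_metadata(inherited, adhoc, pipeline):
    """Merge metadata sources in priority order: inherited < ad-hoc < pipeline.
    Later sources override earlier ones for the same key."""
    merged = {}
    for source in (inherited, adhoc, pipeline):
        merged.update(source)
    return merged
```

So a `tenant` key set by a publish pipeline behavior wins over an ad-hoc `tenant` from `JobParameters`, which in turn wins over the value inherited from the parent job.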
---

## Configuration Reference

### Core Configuration (AddJobly)

```csharp
builder.Services.AddJobly(options =>
{
    options.Schema = "jobly";          // Database schema (default: "jobly", null for default)
    options.DefaultQueue = "default";  // Default queue name (default: "default")
    options.JobExpirationTimeout = TimeSpan.FromDays(1); // Completed/deleted job retention (default: 1 day)
});
```

### Worker Configuration (AddJoblyWorker)

```csharp
builder.Services.AddJoblyWorker(options =>
{
    // Worker
    options.WorkerCount = 10;                              // Concurrent workers (default: min(CPU*5, 20))
    options.PollingInterval = TimeSpan.FromSeconds(1);     // Poll delay when idle; also the floor for backoff (default: 1s)
    options.MaxPollingInterval = TimeSpan.FromSeconds(30); // Ceiling for exponential backoff (default: 30s)
    options.PollingIntervalFactor = 2.0;                   // Multiplier per empty poll; 1.0 disables backoff (default: 2.0)
    options.Queues = ["a-critical", "b-default", "c-low"]; // Subscribed queues (default: ["default"])

    // Handler logging
    options.EnableHandlerLogging = true;                   // false to suppress ILogger capture (default: true)
    options.LogFlushInterval = TimeSpan.FromSeconds(1);    // How often handler logs drain to the DB (default: 1s)

    // Dispatcher mode (batch-fetch instead of per-worker polling)
    options.UseDispatcher = false;                         // default: false
    options.CompletionBatchSize = 50;                      // Dispatcher only. Max completions per flush (default: 50)
    options.CompletionFlushInterval = TimeSpan.FromMilliseconds(100); // Dispatcher only (default: 100ms)

    // Stale recovery opt-out default — with the NoRestart addon and [NoRestart]/[Restart] attributes, flip per type
    options.RestartStaleJobsByDefault = true;              // false makes stale jobs Failed by default (default: true)

    // Cancellation
    options.CancellationCheckInterval = TimeSpan.FromSeconds(5); // default: 5s

    // Server identity
    options.ServerName = "my-api-server";                  // Dashboard display name
    options.ServerId = Guid.NewGuid();                     // Auto-generated

    // Health & crash recovery
    options.HealthCheckInterval = TimeSpan.FromSeconds(10); // default: 10s
    options.HealthCheckTimeout = TimeSpan.FromMinutes(5);   // Server considered dead after this (default: 5m)
    options.InvisibilityTimeout = TimeSpan.FromMinutes(5);  // Stale job requeue threshold (default: 5m)

    // Job retention
    options.JobExpirationTimeout = TimeSpan.FromDays(1);    // default: 1 day
    options.ExpirationBatchSize = 1000;                     // default: 1000
    options.MaxExpirableJobCount = null;                    // null = unlimited (default: null)

    // Background task intervals
    options.OrchestrationInterval = TimeSpan.FromSeconds(10);
    options.MessageRoutingInterval = TimeSpan.FromSeconds(1);
    options.CounterAggregationInterval = TimeSpan.FromSeconds(5);
    options.ServerCleanupInterval = TimeSpan.FromSeconds(30);
    options.StaleJobRecoveryInterval = TimeSpan.FromSeconds(30);
    options.ExpirationCleanupInterval = TimeSpan.FromSeconds(60);

    // Inherited from JoblyConfiguration
    options.DefaultQueue = "default";
});
```

### Worker Groups

Split workers into groups with independent queues and polling intervals:

```csharp
builder.Services.AddJoblyWorker(options =>
{
    // Top-level settings become the first worker group
    options.WorkerCount = 5;
    options.Queues = ["critical"];
    options.PollingInterval = TimeSpan.FromMilliseconds(100);

    // Additional group
    options.AddWorkerGroup(group =>
    {
        group.WorkerCount = 2;
        group.Queues = ["reports", "default"];
        group.PollingInterval = TimeSpan.FromSeconds(5);
        group.MaxPollingInterval = TimeSpan.FromSeconds(60); // per-group ceiling
        group.PollingIntervalFactor = 2.0;                   // per-group multiplier
    });
});
// Creates 7 workers total: 5 on "critical" @ 100ms, 2 on "reports"/"default" @ 5s
// On idle queues, each group backs off geometrically: floor → floor*factor → … → MaxPollingInterval.
// Any processed job resets the delay to the floor instantly.
```

### Dashboard Configuration

```csharp
app.UseJoblyUI(options =>
{
    options.Authorization = new MyAuthFilter();
    options.UnauthorizedRedirectUrl = "/login"; // optional redirect for browsers
});
```

---

## Dashboard Authorization

By default, the dashboard is open. There are two approaches to restricting access:

### Custom auth filter

Use your existing authentication:

```csharp
public class MyAuthFilter : IJoblyAuthorizationFilter
{
    public bool Authorize(HttpContext httpContext)
    {
        return httpContext.User.Identity?.IsAuthenticated == true
            && httpContext.User.IsInRole("Admin");
    }
}

app.UseJoblyUI(options =>
{
    options.Authorization = new MyAuthFilter();
    options.UnauthorizedRedirectUrl = "/login";
});
```

`UseJoblyUI()` must come AFTER `UseAuthentication()` and `UseAuthorization()` in the pipeline.
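The idle polling backoff described in the Worker Groups section above (floor → floor×factor → … → ceiling, reset on any processed job) reduces to a few lines. An illustrative sketch (Python, not Jobly's implementation):

```python
def next_poll_delay(current, floor, ceiling, factor, processed_job):
    """Geometric polling backoff: multiply the delay after each empty poll,
    clamp at the ceiling, and reset to the floor as soon as a job is processed."""
    if processed_job:
        return floor
    return min(current * factor, ceiling)
```

With `floor=1s`, `ceiling=30s`, `factor=2.0`, an idle worker polls at 1, 2, 4, 8, 16, then 30 seconds, and snaps back to 1 second the moment it processes a job. Setting `factor=1.0` keeps the delay at the floor, which matches "1.0 disables backoff."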
### Built-in login

Jobly serves its own login page and manages the session via an HTTP-only signed cookie:

```csharp
builder.Services.AddDataProtection();
builder.Services.AddScoped<IJoblyCredentialValidator, MyCredentialValidator>();

app.UseJoblyUI(options =>
{
    options.UseBuiltInLogin();
});
```

```csharp
public class MyCredentialValidator : IJoblyCredentialValidator
{
    private readonly AppDbContext _db;
    public MyCredentialValidator(AppDbContext db) => _db = db;

    public async Task<bool> ValidateAsync(string username, string password)
    {
        var user = await _db.Users.FirstOrDefaultAsync(u => u.Username == username);
        return user != null && BCrypt.Verify(password, user.PasswordHash);
    }
}
```

### Localhost-only filter (built-in)

```csharp
options.Authorization = new LocalRequestsOnlyAuthorizationFilter();
```

---

## Job Retention

- Completed/Deleted jobs: auto-expire after `JobExpirationTimeout` (default 1 day)
- Failed jobs: NEVER auto-expire (manual intervention required)
- Optional count-based cleanup: `MaxExpirableJobCount` deletes the oldest jobs by `ExpireAt` when the threshold is exceeded
- Statistics survive job deletion (persistent counters)
- Recurring job logs: the last 100 per recurring job are retained

---

## DI Services Available

After calling `AddJobly()` or `AddJoblyWorker()`, these services are available via dependency injection:

| Service | Purpose |
|---------|---------|
| `IPublisher` | Enqueue jobs, publish messages, schedule jobs |
| `IMediator` | Send in-memory requests |
| `IBatchPublisher` | Create batches with continuations |
| `IRecurringJobPublisher` | Register/update recurring job definitions |
| `IJobQueryService` | Query jobs, job details, traces |
| `IJobCommandService` | Delete, requeue, bulk operations on jobs |
| `IJobGroupQueryService` | Query messages and batches |
| `IRecurringJobService` | Trigger, delete recurring jobs, query history |
| `IDashboardStatsService` | Dashboard statistics, server/worker info |
| `IJobContext` | Current job's ID, trace ID, and metadata (available inside handlers) |
| `IServerCommandService` | Pause/resume servers and worker groups |
| `TimeProvider` | Injectable time (registered as `TimeProvider.System` by default; override in tests) |

---

## Public API Reference

### Handler Interfaces

```csharp
// Base request — all types implement this
public interface IRequest;

// Job marker — persistent, single handler
public interface IJob : IRequest;

// Message marker — persistent, multiple handlers
public interface IMessage : IRequest;

// Handler for jobs
public interface IJobHandler<T> where T : IJob
{
    Task HandleAsync(T message, CancellationToken cancellationToken);
}

// Handler for messages
public interface IMessageHandler<T> where T : IMessage
{
    Task HandleAsync(T message, CancellationToken cancellationToken);
}

// Handler for requests
public interface IRequestHandler<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
    Task<TResponse> HandleAsync(TRequest request, CancellationToken cancellationToken);
}

// Pipeline behavior — wraps all handler types
public interface IPipelineBehavior<TRequest, TResponse> where TRequest : IRequest
{
    Task<TResponse> HandleAsync(TRequest request, RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken);
}
```

### IJobContext & IJobMetadata

```csharp
public interface IJobMetadata
{
    IReadOnlyDictionary<string, string> Metadata { get; }
}

public interface IJobContext : IJobMetadata
{
    Guid JobId { get; }
    Guid TraceId { get; }
}
```

### IPublishPipelineBehavior

```csharp
public class PublishContext<T> : IJobMetadata
{
    public required T Job { get; init; }
    public Dictionary<string, string> Metadata { get; init; } = new();
}

public interface IPublishPipelineBehavior<T>
{
    Task PublishAsync(PublishContext<T> context, PublishDelegate next, CancellationToken ct);
}
```

### IPublisher

The most common overloads are shown below. For full control, use the `JobParameters` overload, which accepts all options in a single call.

```csharp
public interface IPublisher
{
    // Messages (pub/sub)
    Task Publish<T>(T message) where T : class, IMessage;
    Task Publish<T>(T message, string? queue) where T : class, IMessage;

    // Jobs — common overloads (additional combinations of maxRetries, queue, parentJobId exist)
    Task<Guid> Enqueue<T>(T job) where T : class, IJob;
    Task<Guid> Enqueue<T>(T job, int maxRetries) where T : class, IJob;
    Task<Guid> Enqueue<T>(T job, Guid parentJobId) where T : class, IJob;
    Task<Guid> Enqueue<T>(T job, string queue) where T : class, IJob;
    Task<Guid> Enqueue<T>(T job, JobParameters parameters) where T : class, IJob; // Full control

    Task<Guid> Schedule<T>(T job, DateTime scheduleTime) where T : class, IJob;

    Task SaveChangesAsync(CancellationToken cancellationToken = default);
}
```

Note: `publisher.SaveChangesAsync()` and `context.SaveChangesAsync()` flush the same underlying DbContext — they are interchangeable. Use whichever is available. The outbox pattern examples use `context.SaveChangesAsync()` to emphasize that jobs and business data are committed together.

### IMediator

```csharp
public interface IMediator
{
    Task<TResponse> Send<TResponse>(IRequest<TResponse> request, CancellationToken cancellationToken = default);
}
```

### IBatchPublisher

```csharp
public interface IBatchPublisher
{
    Task<Guid> StartNew<T>(List<T> jobs, string? name = null,
        ContinuationOptions options = ContinuationOptions.OnlyOnSucceeded) where T : class, IJob;

    Task<Guid> ContinueBatchWith<T>(List<T> jobs, Guid parentId, string? name = null,
        ContinuationOptions options = ContinuationOptions.OnlyOnSucceeded) where T : class, IJob;

    Task SaveChangesAsync(CancellationToken cancellationToken = default);
}
```

### IRecurringJobPublisher

```csharp
public interface IRecurringJobPublisher
{
    Task AddOrUpdateRecurringJob<T>(T message, string name, string cron) where T : class, IJob;
}
```

### IJoblyAuthorizationFilter

```csharp
public interface IJoblyAuthorizationFilter
{
    bool Authorize(HttpContext httpContext);
}
```

### IJoblyCredentialValidator

```csharp
public interface IJoblyCredentialValidator
{
    Task<bool> ValidateAsync(string username, string password);
}
```

### JobParameters

```csharp
public class JobParameters
{
    public int? MaxRetries { get; set; }
    public string? Queue { get; set; }
    public string? Mutex { get; set; }
    public DateTime? ScheduleTime { get; set; }
    public Guid? ParentId { get; set; }
    public Dictionary<string, string>? Metadata { get; set; }
}
```

### Enums

```csharp
public enum State { Enqueued = 1, Awaiting = 2, Processing = 3, Completed = 4, Failed = 5, Deleted = 6 }
public enum ContinuationOptions { OnlyOnSucceeded = 1, OnAnyFinishedState = 2 }
public enum JobKind { Job = 1, Message = 2, Batch = 3 }
public enum CancellationMode { None = 0, Graceful = 1 }
```

### Extension Methods

```csharp
// Core registration
IServiceCollection.AddJobly();
IServiceCollection.AddJobly(Action<JoblyConfiguration> options);

// Worker registration (includes AddJobly internally)
IServiceCollection.AddJoblyWorker();
IServiceCollection.AddJoblyWorker(Action options);

// Handler scanning
IServiceCollection.AddJobHandlers(Assembly assembly);       // IJobHandler<>, IMessageHandler<>, IRequestHandler<,>
IServiceCollection.AddPipelineBehaviors(Assembly assembly); // IPipelineBehavior<,>

// Dashboard
WebApplication.UseJoblyUI();
WebApplication.UseJoblyUI(Action setupAction);
```

---

## Common Patterns

### Controller with all three patterns

```csharp
public class OrderController : ControllerBase
{
    private readonly IPublisher _publisher;
    private readonly IMediator _mediator;
    private readonly AppDbContext _context;

    public OrderController(IPublisher publisher, IMediator mediator, AppDbContext context)
    {
        _publisher = publisher;
        _mediator = mediator;
        _context = context;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
    {
        // In-memory request for validation
        var isValid = await _mediator.Send(new ValidateOrder { Data = request });
        if (!isValid) return BadRequest();

        var order = new Order { CustomerId = request.CustomerId };
        _context.Orders.Add(order);

        // Background job
        await _publisher.Enqueue(new SendConfirmationEmail { OrderId = order.Id });

        // Pub/sub message — multiple handlers
        await _publisher.Publish(new OrderPlaced { OrderId = order.Id });

        // All committed atomically
        await _context.SaveChangesAsync();
        return Ok(order.Id);
    }
}
```

### Validation pipeline behavior

```csharp
public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;
    public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators) => _validators = validators;

    public async Task<TResponse> HandleAsync(
        TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
    {
        var failures = _validators
            .Select(v => v.Validate(request))
            .SelectMany(r => r.Errors)
            .Where(f => f != null)
            .ToList();

        if (failures.Count > 0)
            throw new ValidationException(failures);

        return await next();
    }
}
```

### Batch with continuation

```csharp
public async Task ProcessMonthlyReports(List<DateOnly> months) // DateOnly assumed for illustration
{
    var jobs = months.Select(m => new GenerateReport { Month = m }).ToList();
    var batchId = await _batchPublisher.StartNew(jobs, options: ContinuationOptions.OnAnyFinishedState);

    var summary = new List<SendReportSummary> { new() };
    await _batchPublisher.ContinueBatchWith(summary, batchId);
    await _batchPublisher.SaveChangesAsync();

    // All individual reports run in parallel, then SendReportSummary fires when all finish
}
```

### Separate publisher and worker apps

Publisher app (web API):

```csharp
// Only creates jobs, doesn't process them
builder.Services.AddDbContext<AppDbContext>(o => o.UseNpgsql(connStr));
builder.Services.AddJobly();
builder.Services.AddJobHandlers(typeof(Program).Assembly);
```

Worker app (background service):

```csharp
// Only processes jobs
builder.Services.AddDbContext<AppDbContext>(o => o.UseNpgsql(connStr));
builder.Services.AddJoblyWorker(options =>
{
    options.WorkerCount = 20;
    options.Queues = ["default"];
});
builder.Services.AddJobHandlers(typeof(SharedHandlers).Assembly);
```

Both apps share the same database. The worker polls for and executes jobs created by the publisher.

---

## Key Design Rules

1. **DbContext must be Scoped** (not Transient) — required for the outbox pattern to work
2. **Call SaveChangesAsync() after publishing** — jobs are not committed until you save
3. **Failed jobs never auto-delete** — they persist until manually handled
4. **Queue order is alphabetical** — use prefixes like "a-", "b-", "c-" for priority
5. **AddJoblyWorker includes AddJobly** — don't call both
6. **UseJoblyUI after auth middleware** — so `HttpContext.User` is populated
7. **TimeProvider is injectable** — use it for testability; Jobly registers `TimeProvider.System` by default
8. **Handler ILogger output is captured** — logs are flushed every ~1s and appear in the dashboard in real time
9. **Job metadata is inherited** — child jobs automatically inherit parent metadata via ambient context
10. **Pause is propagated via heartbeat** — there may be a ~3s delay between the pause command and workers stopping
11. **Jobly tables use the `jobly` schema by default** — configurable via `JoblyConfiguration.Schema`; set to `null` for the default schema
12. **Naming conventions are respected** — entity configs don't hardcode table/column names, so `UseSnakeCaseNamingConvention()` works
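As a worked example of rule 4, alphabetical queue priority is just a sort over the subscribed queue names. An illustrative sketch (Python; `pending` is a hypothetical queue → job-count map, not Jobly's data model):

```python
def pick_next_queue(subscribed, pending):
    """Return the first subscribed queue in alphabetical order that has
    pending jobs, or None if every subscribed queue is empty."""
    for queue in sorted(subscribed):
        if pending.get(queue):
            return queue
    return None
```

So with queues named `a-critical`, `b-default`, `c-low`, a worker always drains `a-critical` before touching the others, which is why the prefix convention works as a priority scheme.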