Chapter 08

后台服务与 Worker

掌握 BackgroundService、Quartz.NET 定时任务、MassTransit 消息总线与 gRPC 服务开发。

IHostedService 与 BackgroundService

IHostedService
后台任务的基础接口,只有两个方法:StartAsync(应用启动时调用)和 StopAsync(应用关闭时调用)。适合需要精确控制启动/停止行为的场景。
BackgroundService
IHostedService 的抽象基类,你只需重写 ExecuteAsync(CancellationToken)。当 stoppingToken 被取消时,你的服务应当优雅退出。适合持续运行的后台循环(消息消费、心跳检测等)。

基础 BackgroundService

public class HeartbeatService(ILogger<HeartbeatService> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("HeartbeatService 启动");

        while (!stoppingToken.IsCancellationRequested)
        {
            logger.LogInformation("心跳 {Time}", DateTimeOffset.UtcNow);

            try
            {
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;  // 优雅退出
            }
        }

        logger.LogInformation("HeartbeatService 停止");
    }
}

// 注册
builder.Services.AddHostedService<HeartbeatService>();

使用 Scoped 服务(在 BackgroundService 中注入 DbContext)

// BackgroundService 是 Singleton,不能直接注入 Scoped 服务
// 需要通过 IServiceScopeFactory 手动创建 Scope
public class OutboxProcessor(IServiceScopeFactory scopeFactory, ILogger<OutboxProcessor> logger)
    : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            await using var scope = scopeFactory.CreateAsyncScope();
            var db  = scope.ServiceProvider.GetRequiredService<AppDbContext>();
            var bus = scope.ServiceProvider.GetRequiredService<IBus>();

            var messages = await db.OutboxMessages
                .Where(m => !m.Processed)
                .Take(50)
                .ToListAsync(ct);

            foreach (var msg in messages)
            {
                await bus.PublishAsync(msg.Deserialize(), ct);
                msg.Processed   = true;
                msg.ProcessedAt = DateTimeOffset.UtcNow;
            }

            await db.SaveChangesAsync(ct);
            await Task.Delay(TimeSpan.FromSeconds(5), ct);
        }
    }
}

Quartz.NET:定时任务调度

Quartz.NET 是 .NET 最完善的定时任务库,支持 Cron 表达式、持久化存储、集群调度、任务链等高级特性。

// 安装: dotnet add package Quartz.Extensions.Hosting

builder.Services.AddQuartz(q =>
{
    // 使用 DI 创建 Job 实例(支持构造器注入)
    q.UseMicrosoftDependencyInjectionJobFactory();

    var jobKey = JobKey.Create("daily-email-report");

    q.AddJob<DailyEmailReportJob>(opts => opts.WithIdentity(jobKey));
    q.AddTrigger(opts => opts
        .ForJob(jobKey)
        .WithIdentity("daily-email-trigger")
        .WithCronSchedule("0 0 8 * * ?")  // 每天 08:00
        .WithDescription("每日早 8 点发送报告邮件")
    );
});

builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);

// Job 定义
public class DailyEmailReportJob(IEmailService email, IReportService report)
    : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        var ct       = context.CancellationToken;
        var summary  = await report.GenerateDailyAsync(DateOnly.FromDateTime(DateTime.Today), ct);
        var admins   = await email.GetAdminEmailsAsync(ct);

        foreach (var admin in admins)
            await email.SendAsync(admin, "每日数据报告", summary, ct);
    }
}

MassTransit:消息总线

MassTransit 是 .NET 最成熟的消息总线抽象库,支持 RabbitMQ、Kafka、Azure Service Bus、Amazon SQS 等多种传输后端,让业务代码与具体 MQ 解耦。

// 安装: dotnet add package MassTransit.RabbitMQ

builder.Services.AddMassTransit(x =>
{
    // 自动发现当前程序集中的所有 Consumer
    x.AddConsumers(typeof(Program).Assembly);

    x.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("rabbitmq://localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        cfg.ConfigureEndpoints(ctx);  // 自动创建队列
    });
});

// 消息定义(契约)
public record OrderCreatedEvent(
    Guid        OrderId,
    string      UserId,
    decimal     Total,
    DateTimeOffset CreatedAt
);

// 发布消息
public class OrderService(IBus bus)
{
    public async Task<Order> CreateAsync(CreateOrderRequest req, CancellationToken ct)
    {
        var order = /* 保存到数据库 */;

        await bus.Publish(new OrderCreatedEvent(
            order.Id, req.UserId, order.Total, order.CreatedAt), ct);

        return order;
    }
}

// 消费消息
public class OrderCreatedConsumer(IEmailService email) : IConsumer<OrderCreatedEvent>
{
    public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
    {
        var evt = context.Message;
        await email.SendOrderConfirmationAsync(evt.UserId, evt.OrderId);
    }
}

gRPC 服务

gRPC 使用 Protobuf 序列化和 HTTP/2 传输,比 JSON/HTTP 1.1 快 5-10 倍,非常适合内部微服务通信。

# proto 文件:定义服务契约
# Protos/greeter.proto
syntax = "proto3";
option csharp_namespace = "MyApp.Grpc";

package greeter;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
  // 服务端流式 RPC
  rpc StreamNumbers (NumberRequest) returns (stream NumberReply);
}

message HelloRequest { string name = 1; }
message HelloReply   { string message = 1; }
message NumberRequest { int32 count = 1; }
message NumberReply   { int32 value = 1; }
// 服务端实现
public class GreeterService : Greeter.GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest req, ServerCallContext ctx)
        => Task.FromResult(new HelloReply { Message = $"Hello, {req.Name}!" });

    public override async Task StreamNumbers(
        NumberRequest req,
        IServerStreamWriter<NumberReply> stream,
        ServerCallContext ctx)
    {
        for (int i = 0; i < req.Count; i++)
        {
            await stream.WriteAsync(new NumberReply { Value = i });
            await Task.Delay(100, ctx.CancellationToken);
        }
    }
}

// 注册
builder.Services.AddGrpc();
app.MapGrpcService<GreeterService>();

实战:定时发送报告的 Worker Service

// 独立 Worker 项目(dotnet new worker)
// Worker.cs
public class ReportWorker(
    IServiceScopeFactory scopeFactory,
    IOptions<WorkerOptions> options,
    ILogger<ReportWorker> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        // 等到下一个整点再开始
        var now = DateTime.UtcNow;
        var nextHour = now.AddHours(1).AddMinutes(-now.Minute).AddSeconds(-now.Second);
        await Task.Delay(nextHour - now, ct);

        using var timer = new PeriodicTimer(TimeSpan.FromHours(1));

        do
        {
            try
            {
                await using var scope = scopeFactory.CreateAsyncScope();
                var reportSvc = scope.ServiceProvider.GetRequiredService<IReportService>();
                var emailSvc  = scope.ServiceProvider.GetRequiredService<IEmailService>();

                logger.LogInformation("生成小时报告 {Time}", DateTime.UtcNow);
                var report = await reportSvc.GenerateHourlyAsync(ct);
                await emailSvc.SendToTeamAsync("小时报告", report, ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                logger.LogError(ex, "生成报告失败");
            }
        }
        while (await timer.WaitForNextTickAsync(ct));
    }
}
Tip — PeriodicTimer .NET 6 引入的 PeriodicTimerTask.Delay 循环更精准——它不会因任务执行时间而产生漂移(比如任务耗时 3 秒,下次触发仍在整点而不是整点 +3 秒)。优先使用 PeriodicTimer 替代手写的 delay 循环。
本章小结 BackgroundService 是 .NET 后台任务的标准基类,通过 IServiceScopeFactory 可以在 Singleton 的 Worker 中安全使用 Scoped 服务(DbContext 等)。Quartz.NET 提供企业级定时调度能力,支持持久化和集群。MassTransit 让消息队列的生产消费变得像 DI 一样简单,支持多种 MQ 后端切换。gRPC 是微服务内部通信的性能首选。