I am currently in the middle of porting a project to .NET 5, and I have been struggling with the MassTransit and Quartz scheduler configuration.
Non-scheduled messages are working fine but scheduled ones are not.
I am able to see the scheduled jobs in the Quartz DB but I get the following error when a job is automatically triggered:
Quartz.SchedulerException: Problem instantiating class 'MassTransit.QuartzIntegration.ScheduledMessageJob: Cannot instantiate type which has no empty constructor (Parameter 'ScheduledMessageJob')'
In this project there are
- a web application, supposed to send messages and scheduled messages.
- a windows service (BackgroundService), supposed to consume messages and also send messages (non scheduled ones).
The website
In startup.cs I have:
// Quartz settings: a named scheduler instance, a 10-thread SimpleThreadPool,
// and an ADO.NET job store (JobStoreTX) on the SQL Server "default" data source
// with JSON serialization.
NameValueCollection quartzSettings = new()
{
    { "quartz.scheduler.instanceName", "QuartzSchedulerInstance1" },
    { "quartz.scheduler.instanceId", "Quartz_instance_1" },
    { "quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz" },
    { "quartz.threadPool.threadCount", "10" },
    { "quartz.threadPool.threadPriority", "2" },
    { "quartz.jobStore.misfireThreshold", "60000" },
    { "quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz" },
    { "quartz.jobStore.useProperties", "true" },
    { "quartz.jobStore.dataSource", "default" },
    { "quartz.jobStore.tablePrefix", "QRTZ_" },
    { "quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz" },
    { "quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;" },
    { "quartz.dataSource.default.provider", "SqlServer" },
    { "quartz.serializer.type", "json" },
};

// Create the scheduler from those settings (blocking on the asynchronous
// factory call) and register the instance as a singleton. The scheduler is
// not started in this snippet.
var quartzFactory = new StdSchedulerFactory(quartzSettings);
var quartzScheduler = quartzFactory.GetScheduler().Result;
services.AddSingleton(quartzScheduler);

// RabbitMQ bus whose message scheduler points at the Quartz queue address.
services.AddMassTransit(conf =>
{
    conf.AddMessageScheduler(busSettings.QuartzQueueAddress);

    conf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(busSettings.RabbitMQAddress, credentials =>
        {
            credentials.Username(busSettings.RabbitMQLogin);
            credentials.Password(busSettings.RabbitMQPassword);
        });

        cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
        cfg.ConfigureEndpoints(context);
    });
});

services.AddMassTransitHostedService();
A non-scheduled message is sent this way:
// Build the request, look up the send endpoint for the reminder request queue
// address on the injected bus, and send the message to that endpoint.
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
var target = await _bus.GetSendEndpoint(_busSettings.ReminderRequestQueueAddress);
await target.Send(message);
While a scheduled one is sent like that:
// Build the same request, but hand it to the injected message scheduler together
// with the reminder request queue address and the reminderDate value.
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
await _scheduler.ScheduleSend(_busSettings.ReminderRequestQueueAddress, reminderDate, message);
Respectively, the following were injected through DI: IBusControl _bus for non-scheduled messages, and IMessageScheduler _scheduler for scheduled messages.
The Windows service
In Program.cs I have:
// Consumer used by the reminder request receive endpoint configured below.
services.AddTransient<ProcessBuildReminderDeploySetConsumer>();

// Quartz settings: a named scheduler instance, a 10-thread SimpleThreadPool,
// and an ADO.NET job store (JobStoreTX) on the SQL Server "default" data source
// with JSON serialization.
NameValueCollection quartzSettings = new()
{
    { "quartz.scheduler.instanceName", "QuartzSchedulerInstance1" },
    { "quartz.scheduler.instanceId", "Quartz_instance_1" },
    { "quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz" },
    { "quartz.threadPool.threadCount", "10" },
    { "quartz.threadPool.threadPriority", "2" },
    { "quartz.jobStore.misfireThreshold", "60000" },
    { "quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz" },
    { "quartz.jobStore.useProperties", "true" },
    { "quartz.jobStore.dataSource", "default" },
    { "quartz.jobStore.tablePrefix", "QRTZ_" },
    { "quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz" },
    { "quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;" },
    { "quartz.dataSource.default.provider", "SqlServer" },
    { "quartz.serializer.type", "json" },
};

// Create the scheduler (blocking on the asynchronous factory call), start it
// without awaiting the call, and register the instance as a singleton.
var quartzFactory = new StdSchedulerFactory(quartzSettings);
var scheduler = quartzFactory.GetScheduler().Result;
scheduler.Start();
services.AddSingleton(scheduler);

services.AddMassTransit(conf =>
{
    conf.AddMessageScheduler(busSettings.QuartzQueueAddress);

    conf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(busSettings.RabbitMQAddress, credentials =>
        {
            credentials.Username(busSettings.RabbitMQLogin);
            credentials.Password(busSettings.RabbitMQPassword);
        });

        cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);

        // Quartz queue: the schedule and cancel consumers are constructed by hand
        // around the scheduler instance created above, sharing one 16-slot partitioner.
        cfg.ReceiveEndpoint(busSettings.QuartzQueueName, epc =>
        {
            var partitioner = cfg.CreatePartitioner(16);

            epc.Consumer(
                () => new ScheduleMessageConsumer(scheduler),
                consumerCfg => consumerCfg.Message<ScheduleMessage>(
                    messageCfg => messageCfg.UsePartitioner(
                        partitioner,
                        ctx => ctx.Message.CorrelationId)));

            epc.Consumer(
                () => new CancelScheduledMessageConsumer(scheduler),
                consumerCfg => consumerCfg.Message<CancelScheduledMessage>(
                    messageCfg => messageCfg.UsePartitioner(
                        partitioner,
                        ctx => ctx.Message.TokenId)));

            cfg.UseMessageScheduler(epc.InputAddress);
            cfg.Durable = true;

            // THIS USED TO BE IN THE OLD .NET 4.6 PROJECT
            // BUT THOSE CLASSES DO NOT EXIST ANYMORE
            //var specification =
            //new SchedulerBusFactorySpecification(scheduler, epc.InputAddress);
            //cfg.AddBusFactorySpecification(specification);
        });

        // Reminder request queue: consumed by ProcessBuildReminderDeploySetConsumer.
        cfg.ReceiveEndpoint(busSettings.ReminderRequestQueueName, reminderEndpoint =>
        {
            // Enforce loading only 1 message at a time for now
            reminderEndpoint.PrefetchCount = 1;

            reminderEndpoint.UseMessageRetry(retry =>
            {
                retry.Interval(retryCount: 5, interval: TimeSpan.FromMilliseconds(500));
            });

            reminderEndpoint.Consumer<ProcessBuildReminderDeploySetConsumer>(context);
        });

        cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
        cfg.ConfigureEndpoints(context);
    });
});

services.AddMassTransitHostedService();
The service has no issue consuming the messages sent directly to the queues. Scheduled messages, however, are never consumed, because Quartz fails to trigger them properly.
I'm sure a few configuration issues led to this error, but between the various ways of configuring this and the changes between versions, I can't figure it out.
CodePudding user response:
With current versions of Quartz, there are extension methods for configuring the service collection. Those methods add ISchedulerFactory to the container.
Then, in the process you want to host quartz, you can configure it in MassTransit using the non-obvious extension method in your configuration:
// MassTransit registration for the process that hosts Quartz.
services.AddMassTransit(conf =>
{
    conf.AddMessageScheduler(busSettings.QuartzQueueAddress);

    conf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(busSettings.RabbitMQAddress, host =>
        {
            host.Username(busSettings.RabbitMQLogin);
            host.Password(busSettings.RabbitMQPassword);
        });

        // Despite its name, this uses the scheduler factory that Quartz's own
        // service-collection extensions added to the container, so the job store
        // is whatever that factory was configured with. It also configures a bus
        // observer that starts and stops Quartz as the bus starts and stops.
        cfg.UseInMemoryScheduler(context, busSettings.QuartzQueueName);

        // Any remaining receive endpoint configuration goes here.
    });
});
This will use the Quartz configured scheduler factory (which you'd configure to not be in-memory, obviously). It also configures a bus observer to start/stop Quartz automatically as the bus starts/stops.