The following code works well in my API:
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<RelationshipCreatedConsumer>();
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.AccessKey("your-iam-access-key");
            h.SecretKey("your-iam-secret-key");
            h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://host.docker.internal:4566" });
            h.Config(new AmazonSQSConfig { ServiceURL = "http://host.docker.internal:4566" });
        });
        cfg.ReceiveEndpoint("rc_queue", e =>
        {
            e.UseMessageRetry(r => r.Interval(2, TimeSpan.FromMilliseconds(5000)));
            e.ConfigureConsumeTopology = false;
            e.Subscribe(nameof(RelationshipCreated));
            e.ConfigureConsumer<RelationshipCreatedConsumer>(context);
        });
        cfg.Message<RelationshipCreated>(top =>
        {
            top.SetEntityName(nameof(RelationshipCreated));
        });
    });
});
In this setup, the consumer also lives in the API. I want to extract it into a Lambda function, so I removed the AddConsumer and ReceiveEndpoint calls from the code above. Now my API is able to publish a message and create the topic when needed. Great.
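For reference, a sketch of what the publish-only API configuration looks like once those two calls are removed. It reuses only the calls from the snippet above; the surrounding Program.cs scaffolding and the empty RelationshipCreated record are placeholders assumed for illustration, not the real application code.

```csharp
using Amazon.SimpleNotificationService;
using Amazon.SQS;
using MassTransit;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    // No AddConsumer and no ReceiveEndpoint: this process only publishes.
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.AccessKey("your-iam-access-key");
            h.SecretKey("your-iam-secret-key");
            h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://host.docker.internal:4566" });
            h.Config(new AmazonSQSConfig { ServiceURL = "http://host.docker.internal:4566" });
        });

        // Keep the topic name stable so subscribers can refer to it by name.
        cfg.Message<RelationshipCreated>(top =>
        {
            top.SetEntityName(nameof(RelationshipCreated));
        });
    });
});

var app = builder.Build();
app.Run();

// Placeholder message contract (assumption): the real type has its own properties.
public record RelationshipCreated();
```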
Following is the code I created for my Lambda function:
public class Function
{
    private ServiceProvider _provider;

    /// <summary>
    /// Default constructor. This constructor is used by Lambda to construct the instance. When invoked in a Lambda environment
    /// the AWS credentials will come from the IAM role associated with the function and the AWS region will be set to the
    /// region the Lambda function is executed in.
    /// </summary>
    public Function()
    {
        Console.WriteLine("ok");
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.AddConsumer<RelationshipCreatedConsumer>();
            x.UsingAmazonSqs((context, cfg) =>
            {
                cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
                {
                    h.AccessKey("your-iam-access-key");
                    h.SecretKey("your-iam-secret-key");
                    h.Config(new AmazonSQSConfig { ServiceURL = "http://host.docker.internal:4566" });
                    h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://host.docker.internal:4566" });
                });
                cfg.ReceiveEndpoint(queueName: "dummy", e =>
                {
                    e.Subscribe(nameof(RelationshipCreated));
                    e.ConfigureConsumer<RelationshipCreatedConsumer>(context);
                });
                cfg.ConfigureEndpoints(context);
            });
        });
        _provider = services.BuildServiceProvider(true);
    }

    public class RelationshipCreatedConsumer : IConsumer<RelationshipCreated>
    {
        public Task Consume(ConsumeContext<RelationshipCreated> context)
        {
            Console.WriteLine("YUS");
            // TODO: write something to S3
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// This method is called for every Lambda invocation. This method takes in an SNS event object and can be used
    /// to respond to SNS messages.
    /// </summary>
    /// <param name="evnt"></param>
    /// <param name="context"></param>
    /// <returns></returns>
    public async Task FunctionHandler(RelationshipCreated evnt, ILambdaContext context)
    {
        Console.WriteLine("YUS2");
        var consumer = _provider.GetRequiredService<IConsumer<RelationshipCreated>>();
        return;
    }
}
I expected the code above to create a queue called dummy and subscribe it to the topic created by the API. Neither happens, and there are no errors. I invoked the Lambda via an HTTP call, so the constructor did run.
What am I getting wrong here?
Answer:
There is a sample showing how to use MassTransit consumers within AWS Lambda functions (triggered by SQS). I'd suggest using it to build your consumer functions.
And, of course, MassTransit doesn't create the queue since it must exist before you can register the Lambda trigger. When used in functions (Lambda, or Azure Functions), MassTransit is not the transport, just the pipeline/dispatch engine.
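To illustrate the shape of an SQS-triggered function, here is a minimal sketch of a handler that receives the batch Lambda pulls from an already-existing queue. It uses only the standard Amazon.Lambda.Core, Amazon.Lambda.SQSEvents and Amazon.Lambda.Serialization.SystemTextJson packages. The queue, its SNS subscription, and the Lambda trigger are assumed to be created outside this code, and the hand-off of each message body to MassTransit is deliberately left as a comment; the sample mentioned above is the place to look for that part.

```csharp
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;

// Tells Lambda how to turn the incoming JSON event into an SQSEvent.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

public class Function
{
    // Lambda invokes this once per batch of messages read from the queue.
    // The handler takes the SQS event, not the message contract type.
    public Task FunctionHandler(SQSEvent evnt, ILambdaContext context)
    {
        foreach (SQSEvent.SQSMessage record in evnt.Records)
        {
            // record.Body is the raw message text as stored in the queue.
            context.Logger.LogLine($"Received {record.MessageId}: {record.Body}");

            // Hand record.Body to the MassTransit dispatch pipeline here,
            // as the sample does (not shown in this sketch).
        }

        return Task.CompletedTask;
    }
}
```

Because Lambda polls the queue itself, nothing in the function needs to connect to SQS or declare a receive endpoint.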