I have a timer-triggered function that sends objects to a Service Bus topic like this:
/// <summary>
/// Timer-triggered function that runs the <c>hp_Cdi_Get_Termed_Employees_Recent</c> stored
/// procedure and publishes every returned row, serialized as JSON, to the Service Bus topic
/// bound to <paramref name="termedEmployeeCollector"/>.
/// </summary>
/// <param name="myTimer">Timer trigger information; the schedule is read from the <c>TimerInterval</c> setting.</param>
/// <param name="termedEmployeeCollector">Output collector bound to the topic named by the <c>TermedEmployeeTopicName</c> setting.</param>
/// <param name="log">Logger used for start/finish and failure messages.</param>
/// <returns>A task that completes once every row has been handed to the collector.</returns>
/// <remarks>
/// Any exception is logged once at Critical level and then rethrown with its original stack trace.
/// </remarks>
[FunctionName("ProcessTermedEmployee")]
public static async Task Run(
    [TimerTrigger("%TimerInterval%"
#if DEBUG
    , RunOnStartup=true
#endif
    )] TimerInfo myTimer,
    [ServiceBus("%TermedEmployeeTopicName%", Connection = "ServiceBusConnectionString")] IAsyncCollector<string> termedEmployeeCollector,
    ILogger log)
{
    // Size of the look-back window, in days, passed to the stored procedure as @TermedAfterDate.
    const int LookbackDays = 10;

    log.LogInformation("ProcessTermedEmployee Timer function invoked.");
    try
    {
        using (SqlConnection conn = new SqlConnection(Environment.GetEnvironmentVariable("HRSQLConNString")))
        using (SqlCommand cmd = new SqlCommand("hp_Cdi_Get_Termed_Employees_Recent", conn))
        {
            cmd.CommandType = CommandType.StoredProcedure;
            // NOTE(review): local time is kept to match the original behavior; confirm the
            // stored procedure does not expect UTC.
            cmd.Parameters.Add(new SqlParameter("@TermedAfterDate", DateTime.Now.AddDays(-LookbackDays)));

            // Use the async ADO.NET APIs so the function does not block a thread on SQL I/O.
            await conn.OpenAsync();
            using (SqlDataReader rdr = await cmd.ExecuteReaderAsync())
            {
                while (await rdr.ReadAsync())
                {
                    TermedEmployee termedEmployee = ConvertTermedEmployee(rdr);
                    await termedEmployeeCollector.AddAsync(JsonConvert.SerializeObject(termedEmployee));
                }
            }
        }

        log.LogInformation("ProcessTermedEmployee Timer function finished.");
    }
    catch (Exception ex)
    {
        log.LogCritical(ex, ex.Message);
        // "throw;" (not "throw ex;") preserves the original stack trace.
        throw;
    }
}
Then I have a Service Bus trigger function that is supposed to receive the message and add it to Cosmos DB:
/// <summary>
/// Hosts the Service Bus topic-triggered function that logs termed employees to Cosmos DB.
/// </summary>
public static class LogTermedEmployee
{
    /// <summary>
    /// Receives a message from the termed-employee topic subscription, deserializes its JSON
    /// body into a <c>TermedEmployee</c>, and adds a <c>TermedEmployeeLog</c> document to the
    /// bound Cosmos DB collection.
    /// </summary>
    /// <param name="message">The incoming Service Bus message.</param>
    /// <param name="termedEmployeesLogCollection">Output collector bound to the Cosmos DB log collection.</param>
    /// <param name="log">Logger used for progress and failure messages.</param>
    /// <returns>A task that completes once the log document has been handed to the collector.</returns>
    /// <remarks>
    /// Any exception is logged at Critical level and rethrown with its original stack trace.
    /// NOTE(review): the project references Microsoft.Azure.WebJobs.Extensions.ServiceBus 5.2.0;
    /// confirm that this extension version can bind a trigger to <c>BrokeredMessage</c>.
    /// </remarks>
    [FunctionName("LogTermedEmployee")]
    public static async Task Run(
        [ServiceBusTrigger("%TermedEmployeeTopicName%", "%ServiceBusSubscriptionName%", Connection = "ServiceBusConnectionString")] BrokeredMessage message,
        [CosmosDB(
            databaseName: "%CosmosDbName%",
            collectionName: "%TermedEmployeesLogCollection%",
            ConnectionStringSetting = "CosmosConnection")]
        IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
        ILogger log)
    {
        log.LogInformation($"C# ServiceBus topic trigger function LogTermedEmployee, processed at {DateTime.Now}");
        try
        {
            // A BrokeredMessage body can be read only once, so read it a single time as the
            // JSON string the sender wrote and deserialize from that.
            string body = message.GetBody<string>();
            if (string.IsNullOrEmpty(body))
            {
                log.LogWarning("LogTermedEmployee received a message with an empty body; nothing was logged.");
                return;
            }

            var termedEmp = JsonConvert.DeserializeObject<TermedEmployee>(body);
            if (termedEmp == null)
            {
                log.LogWarning("LogTermedEmployee could not deserialize the message body; nothing was logged.");
                return;
            }

            // Sample the clock once so DateOfProcess and ModifiedOn carry the same timestamp.
            DateTime processedAt = DateTime.Now;
            await termedEmployeesLogCollection.AddAsync(new TermedEmployeeLog()
            {
                TermedEmployee = termedEmp,
                DateOfProcess = processedAt,
                Id = Guid.NewGuid(),
                PartitionKey = "TermedEmployee",
                ModifiedOn = processedAt,
                ModifiedBy = "TermedEployeeLogService",
                // The original left MessageId empty rather than using message.MessageId; kept as-is.
                MessageId = ""
            });
        }
        catch (Exception ex)
        {
            log.LogCritical(ex, ex.Message);
            // "throw;" (not "throw ex;") preserves the original stack trace.
            throw;
        }
    }
}
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.6.0" />
<PackageReference Include="AzureFunctions.Extensions.DependencyInjection" Version="1.1.3" />
<PackageReference Include="CDI.Utilities.LogHelpers" Version="0.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.10" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.12" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="4.0.1" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.3" />
<PackageReference Include="WindowsAzure.ServiceBus" Version="6.2.2" />
Problem
In the Service Bus trigger's signature:
If I use BrokeredMessage message, then message.GetBody() is null.
If I use the obsolete Message type, then Message.Body is null.
But if I use a plain string, I get the proper JSON.
Any suggestions, please?
CodePudding user response:
I'd suggest changing BrokeredMessage to ServiceBusReceivedMessage in your trigger:
// Signature only (body omitted): the same LogTermedEmployee trigger, with the message
// parameter re-typed for the Azure.Messaging.ServiceBus-based extension.
public static async Task Run(
[ServiceBusTrigger(
"%TermedEmployeeTopicName%",
"%ServiceBusSubscriptionName%",
Connection = "ServiceBusConnectionString")]
// Incoming messages are typed as ServiceBusReceivedMessage instead of BrokeredMessage.
ServiceBusReceivedMessage message,
[CosmosDB(
databaseName: "%CosmosDbName%",
collectionName: "%TermedEmployeesLogCollection%",
ConnectionStringSetting = "CosmosConnection")]
IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
ILogger log)
I believe that will resolve the binding issue.
Additional Context
Starting in v5.0.0, the Microsoft.Azure.WebJobs.Extensions.ServiceBus package began using Azure.Messaging.ServiceBus internally. There is no BrokeredMessage type in the new package. Incoming messages are typed as ServiceBusReceivedMessage, and outgoing messages as ServiceBusMessage.
More information can be found in the Microsoft.Azure.WebJobs.Extensions.ServiceBus docs.