Home > Mobile >  Why is the data streamed using the Java client appears as encoded on Apache Pulsar?
Why is the data streamed using the Java client appears as encoded on Apache Pulsar?

Time:10-05

I'm trying to stream some data from a Java app into an Apache-Pulsar cluster. The issue that I'm facing is that the data appears to be encoded. .e.g. "\u0000\u0000\u0000\u0004\u0018l@\u0000\u0000\u0000�@�\u000fV�\u0001\u0000\u00006B\u0000\u0000�@\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000�\t���AUU�A\u0002(2021-10-04T14:00:00Z\u0002H88c8dc24-233c-45f5-b366-85382d7d52c6\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000"
just the parameters that I send as strings are correct apparently.

My producer code is build this way:

PulsarClient client = PulsarClient.builder()
                .serviceUrl(service_url)
                .tlsTrustCertsFilePath("/etc/ssl/certs/ca-certificates.crt")
                .authentication(
                        AuthenticationFactory.token(token)
                )
                .build();

Producer<DavisMessage> producer = client.newProducer(Schema.AVRO(DavisMessage.class))
                .topic(topic)
                .create();

Timestamp timestamp = new Timestamp(rec.getTimestamp().getTime());
final String formattedtimestamp = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
                .format(timestamp);

float lon = Float.parseFloat((prop.getProperty("sensor.longitude")));
float lat = Float.parseFloat((prop.getProperty("sensor.latitude")));
float alt = Float.parseFloat((prop.getProperty("sensor.altitude")));


log.fine("Sending message to Pulsar.");
producer.newMessage().value(DavisMessage.builder()
                .uuid(prop.getProperty("sensor.uuid"))
                .latitude(lat)
                .longitude(lon)
                .altitude(alt)
                .ts(formattedtimestamp)
                .temp_out((float) rec.getOutsideTemperature())
                .temp_in((float) rec.getInsideTemperature())
                .hum_out((short) rec.getOutsideHumidity())
                .hum_in((short) rec.getInsideHumidity())
                .barometer(rec.getBarometer())
                .rain((float) rec.getRainFall())
                .rain_rate((float) rec.getRainRateHigh())
                .wind_avg((float) rec.getWindSpeedAvg())
                .wind_dir((short) rec.getWindDirection())
                .wind_high((float) rec.getWindSpeedHigh())
                .solar((short) rec.getSolarRadiation())
                .uv((float) rec.getUvIndex())
                .build()).send();

CodePudding user response:

It is encoded with Avro serialization as requested:

Schema.AVRO(DavisMessage.class)

You can specify the same schema on the consumer to have it automatically deserialized for you.

If you want to use human readable payload, you can use Schema.JSON(DavisMessage.class) instead.

  • Related