I implemented a connection to a Kafka stream as described here. Now I am trying to write the data into a PostgreSQL database using a JDBC sink.
However, the Kafka source seems to have no type, so when I write the statement setters for the SQL, everything looks like it has type Nothing.
How can I use fromSource so that I actually get a typed source for Kafka?
What I have tried so far is the following:
object Main {
  def main(args: Array[String]): Unit = {
    val builder = KafkaSource.builder
    builder.setBootstrapServers("localhost:29092")
    builder.setProperty("partition.discovery.interval.ms", "10000")
    builder.setTopics("created")
    builder.setBounded(OffsetsInitializer.latest)
    builder.setStartingOffsets(OffsetsInitializer.earliest)
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(new CreatedEventSchema))
    val source = builder.build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamSource = env
      .fromSource(source, WatermarkStrategy.noWatermarks, "Kafka Source")
    streamSource.addSink(JdbcSink.sink(
      "INSERT INTO conversations (timestamp, active_conversations, total_conversations) VALUES (?,?,?)",
      (statement, event) => {
        statement.setTime(1, event.date)
        statement.setInt(2, event.a)
        statement.setInt(3, event.b)
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/reporting")
        .withDriverName("org.postgresql.Driver")
        .withUsername("postgres")
        .withPassword("veryverysecret:-)")
        .build()
    ))
    env.execute()
  }
}
This does not compile because event is of type Nothing. I don't think it should be, though, because with CreatedEventSchema Flink should be able to deserialize the messages.
It may also be important to note that I only want to process the values of the Kafka messages.
Answer:
In Java you might do something like this:
KafkaSource<Event> source =
    KafkaSource.<Event>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics(TOPIC)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new EventDeserializationSchema())
        .build();
with a value deserializer along these lines:
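import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {
    private static final long serialVersionUID = 1L;

    // ObjectMapper is not serializable, so it is created in open() on the task side.
    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    // Only the record value is passed in; keys and headers are ignored.
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}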
Sorry I don't have a Scala example handy, but hopefully this will point you in the right direction.
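For what it's worth, a rough and untested Scala translation of the same idea might look like the sketch below. The point it illustrates is the explicit type argument on KafkaSource.builder: without it, and with nothing else to infer from, Scala picks Nothing for the element type, which is why event ends up as Nothing further down. Everything named CreatedEvent and CreatedEventValueSchema here is hypothetical (including the comma-separated message format), and the sketch assumes Flink's Scala DataStream API (flink-streaming-scala) is on the classpath.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

// Hypothetical event type; the field names mirror how `event` is used in the question.
case class CreatedEvent(date: java.sql.Time, a: Int, b: Int)

// Hypothetical value-only deserializer. It assumes every Kafka value is a
// UTF-8 string of the form "<epochMillis>,<a>,<b>"; swap in your real format.
class CreatedEventValueSchema
    extends AbstractDeserializationSchema[CreatedEvent](classOf[CreatedEvent]) {

  override def deserialize(message: Array[Byte]): CreatedEvent = {
    val Array(millis, a, b) =
      new String(message, StandardCharsets.UTF_8).trim.split(",")
    CreatedEvent(new java.sql.Time(millis.toLong), a.toInt, b.toInt)
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    // The explicit [CreatedEvent] keeps the element type from being inferred as Nothing.
    val source: KafkaSource[CreatedEvent] = KafkaSource.builder[CreatedEvent]()
      .setBootstrapServers("localhost:29092")
      .setTopics("created")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new CreatedEventValueSchema)
      .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The watermark strategy needs the same type argument as the source.
    val events: DataStream[CreatedEvent] =
      env.fromSource(source, WatermarkStrategy.noWatermarks[CreatedEvent](), "Kafka Source")

    // Stand-in for the JDBC sink; downstream lambdas now see CreatedEvent.
    events.print()
    env.execute()
  }
}
```

Since you only care about the message values, setValueOnlyDeserializer is probably a better fit than wrapping a schema with KafkaRecordDeserializationSchema.of. Once the stream is a DataStream[CreatedEvent], the statement builder passed to JdbcSink.sink should see event as a CreatedEvent as well, although you may still need to spell out the type argument there (JdbcSink.sink[CreatedEvent](...)).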