Apache Flink typed KafkaSource


I implemented a connection to a Kafka stream as described here. Now I am attempting to write the data into a Postgres database using a JDBC sink.

However, the Kafka source appears to have no type, so when I write the SQL statements everything shows up as type Nothing.

How can I use fromSource so that I actually get a typed source for Kafka?

What I have tried so far is the following:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object Main {
  def main(args: Array[String]): Unit = {
    val builder = KafkaSource.builder
    builder.setBootstrapServers("localhost:29092")
    builder.setProperty("partition.discovery.interval.ms", "10000")
    builder.setTopics("created")
    builder.setBounded(OffsetsInitializer.latest())
    builder.setStartingOffsets(OffsetsInitializer.earliest())
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(new CreatedEventSchema))
    val source = builder.build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamSource = env
      .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    streamSource.addSink(JdbcSink.sink(
      "INSERT INTO conversations (timestamp, active_conversations, total_conversations) VALUES (?,?,?)",
      (statement, event) => {
        statement.setTime(1, event.date)
        statement.setInt(2, event.a)
        statement.setInt(3, event.b)
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/reporting")
        .withDriverName("org.postgresql.Driver")
        .withUsername("postgres")
        .withPassword("veryverysecret:-)")
        .build()
    ))
    env.execute()
  }
}

This does not compile because event is of type Nothing. But I don't think it should be, since with CreatedEventSchema Flink ought to be able to deserialise the records. It may also be important to note that I really only want to process the values of the Kafka messages.

CodePudding user response:

In Java you might do something like this:

KafkaSource<Event> source =
    KafkaSource.<Event>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics(TOPIC)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new EventDeserializationSchema())
            .build();

with a value deserializer along these lines:

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {
    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

Sorry I don't have a Scala example handy, but hopefully this will point you in the right direction.
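
That said, here is a rough Scala sketch of the same approach (untested). It assumes the CreatedEvent type from the question and that CreatedEventSchema implements DeserializationSchema[CreatedEvent]; the key change is the explicit type parameter on the builder, so the stream is no longer inferred as Nothing:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object Main {
  def main(args: Array[String]): Unit = {
    // Parameterizing the builder is what gives the source its type;
    // a bare KafkaSource.builder lets Scala infer Nothing.
    val source: KafkaSource[CreatedEvent] =
      KafkaSource.builder[CreatedEvent]()
        .setBootstrapServers("localhost:29092")
        .setTopics("created")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // value-only: keys and other record metadata are ignored,
        // which matches the question's goal of processing values only
        .setValueOnlyDeserializer(new CreatedEventSchema)
        .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    // With a typed stream, the statement builder sees CreatedEvent instead of Nothing.
    stream.addSink(JdbcSink.sink[CreatedEvent](
      "INSERT INTO conversations (timestamp, active_conversations, total_conversations) VALUES (?,?,?)",
      (statement, event) => {
        statement.setTime(1, event.date)
        statement.setInt(2, event.a)
        statement.setInt(3, event.b)
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/reporting")
        .withDriverName("org.postgresql.Driver")
        .withUsername("postgres")
        .withPassword("veryverysecret:-)")
        .build()
    ))
    env.execute()
  }
}

If you later need the Kafka key or record metadata as well, setDeserializer with a KafkaRecordDeserializationSchema is the alternative to setValueOnlyDeserializer.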
