Convert timestamp fields to ISO 8601 when pushing data to a topic


I have a stream that pulls data from a table in Postgres with the following definition:

CREATE TABLE "user" (
  "_uid" UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY,
  "_created" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  "_updated" TIMESTAMP(3) NULL,
  "_disabled" TIMESTAMP(3) NULL,
  "display_name" VARCHAR(100) NOT NULL,
  "email" VARCHAR(100) NOT NULL UNIQUE,
  "password" TEXT NOT NULL
);

And inside ksqlDB I created a SOURCE CONNECTOR like this:

CREATE SOURCE CONNECTOR "source-postgres-api_auth" WITH (
  "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
  "connection.url"='jdbc:postgresql://postgres:5432/api_auth',
  "connection.user"='postgres',
  "connection.password"='postgres',
  "mode"='bulk',
  "topic.prefix"='source-postgres-api_auth-',
  "table.blacklist"='_changelog, _changelog_lock'
);

So that I can detect changes and also keep a history, I have a STREAM like this:

CREATE STREAM "stream-api_auth-user" (
  "_uid" STRING,
  "_created" TIMESTAMP,
  "_updated" TIMESTAMP,
  "_disabled" TIMESTAMP,
  "display_name" STRING,
  "email" STRING,
  "password" STRING
) WITH (
  KAFKA_TOPIC = 'source-postgres-api_auth-user',
  VALUE_FORMAT = 'AVRO'
);

And I created a TABLE from this STREAM:

CREATE TABLE "table-api_auth-user" WITH (
  KAFKA_TOPIC = 'table-api_auth-user',
  VALUE_FORMAT = 'AVRO'
) AS SELECT 
  "_uid",
  LATEST_BY_OFFSET("_created") AS "_created",
  LATEST_BY_OFFSET("_updated") AS "_updated",
  LATEST_BY_OFFSET("_disabled") AS "_disabled",
  LATEST_BY_OFFSET("display_name") AS "display_name",
  LATEST_BY_OFFSET("email") AS "email",
  LATEST_BY_OFFSET("password") AS "password"
FROM "stream-api_auth-user"
GROUP BY "_uid"
EMIT CHANGES;

Lastly, I have a SINK CONNECTOR to Elasticsearch like this:

CREATE SINK CONNECTOR "sync-elasticsearch-user" WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = 'http://elasticsearch:9200',
  'type.name' = 'kafka-connect',
  'topics' = 'table-api_auth-user'
);

My problem is that when I look in Elasticsearch, the TIMESTAMP fields come through as numbers. Printing the topic that the TABLE writes to, I can see the timestamps are being serialized as epoch milliseconds, not ISO 8601:

ksql> print "table-api_auth-user";
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2022/12/01 21:13:36.844 Z, key: [a2d9ff97-2c95-4da0-98e0-5492@7293921773168638261/-], value: {"_created":1669926069726,"_updated":null,"_disabled":null,"display_name":"Super User","email":"[email protected]","password":"4072d7365233d8ede7ca8548543222dfb96b17780aa8d6ff93ab69c0985ef21fc8105d03590a61b9"}, partition: 0
rowtime: 2022/12/01 21:13:36.847 Z, key: [b60448d2-e518-4479-9aff-2734@3631370472181359666/-], value: {"_created":1669916433173,"_updated":1669916803008,"_disabled":1669916803008,"display_name":"Cremin 7a8c281c4bed","email":"[email protected]","password":"e89af05eae87f0667eba762fdd382ce942bb76b796b8fe20d9e71f142bac9f7a6fbbfc6b51d4527e"}, partition: 0

Is there anything I can do so that when the table writes its data to the topic, these timestamp fields are converted to ISO 8601?

Can someone help me please?

CodePudding user response:

You can convert the fields on the Elasticsearch side via an ingest pipeline:

https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

I read that there is no option to specify an ingest pipeline on the sink connector:

https://github.com/confluentinc/kafka-connect-elasticsearch/issues/72

So you have to create an index template that matches the index name and applies the pipeline as a default (step 2 below).

Step 1: Create the ingest pipeline

I will use the date processor to convert your format (UNIX_MS) to ISO 8601:

https://www.elastic.co/guide/en/elasticsearch/reference/current/date-processor.html

PUT _ingest/pipeline/parsedate
{
  "processors": [
    {
      "date": {
        "field": "date",
        "formats": [
          "UNIX_MS"
        ],
        "target_field": "date_converted",
        "ignore_failure": true
      }
    }
  ]
}

You can test the pipeline with the simulate API. For example, feeding it a sample document (the value is taken from your topic output):
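
POST _ingest/pipeline/parsedate/_simulate
{
  "docs": [
    {
      "_source": {
        "date": 1669916803008
      }
    }
  ]
}

The output shows the original date field next to the converted date_converted: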

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "date": 1669916803008,
          "date_converted": "2022-12-01T17:46:43.008Z"
        },
        "_ingest": {
          "timestamp": "2022-12-02T07:54:02.731666786Z"
        }
      }
    }
  ]
}
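
Note that this example writes the converted value to a separate field (date_converted) so the before/after is easy to compare. For your documents, where the timestamps live in _created, _updated and _disabled, you would want one date processor per field, overwriting each field in place. A sketch (the pipeline name user-timestamps is made up, and ignore_failure lets documents through where _updated or _disabled is null):

PUT _ingest/pipeline/user-timestamps
{
  "processors": [
    {
      "date": {
        "field": "_created",
        "formats": ["UNIX_MS"],
        "target_field": "_created",
        "ignore_failure": true
      }
    },
    {
      "date": {
        "field": "_updated",
        "formats": ["UNIX_MS"],
        "target_field": "_updated",
        "ignore_failure": true
      }
    },
    {
      "date": {
        "field": "_disabled",
        "formats": ["UNIX_MS"],
        "target_field": "_disabled",
        "ignore_failure": true
      }
    }
  ]
}

If you go this route, reference user-timestamps instead of parsedate in step 2 below.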

Step 2: Create the index template

Assuming your index name matches table-api_auth-user*:

PUT _index_template/template_1
{
  "index_patterns": ["table-api_auth-user*"],
  "template": {
    "settings": {
      "index.default_pipeline": "parsedate"
    }
  }
}
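
Note that an index template only takes effect when a matching index is created, so it will not touch an index that already exists. For an existing index you can attach the pipeline directly instead (a sketch using the update index settings API):

PUT table-api_auth-user/_settings
{
  "index.default_pipeline": "parsedate"
}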

From now on, every time you send a document to an index matching table-api_auth-user*, Elasticsearch will apply the ingest pipeline you created at the beginning.
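
To verify end to end, you can index a raw epoch-millisecond value into an index matching the pattern and read it back (the index name table-api_auth-user-check is made up for the test):

PUT table-api_auth-user-check/_doc/1
{
  "date": 1669916803008
}

GET table-api_auth-user-check/_doc/1

The document should come back with date_converted set to "2022-12-01T17:46:43.008Z", matching the simulate output above.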
