Pyspark streaming from Kafka to Hudi

Time:10-01

I'm new to Hudi and I have a problem. I'm working on an EMR cluster in AWS with PySpark and Kafka. What I want to do is read a topic from the Kafka cluster with PySpark streaming and then write it to S3 in Hudi format. To be honest, I've been trying for a few weeks and I don't know whether it is even possible. Can someone help me, please? The code I'm working with is:

    #Reading
    df_T = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .option("subscribe", topic) \
        .load() 
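
The Kafka source returns a fixed set of columns (key, value, topic, partition, offset, timestamp, timestampType), with key and value as binary. If the elided step does not already decode the payload, something along these lines may be needed before any table fields can be referenced. This is only a sketch that assumes JSON messages; `parse_kafka_json`, `schema_ddl` and the `kafka_timestamp` alias are hypothetical names, not part of the original code.

```python
def parse_kafka_json(df, schema_ddl):
    """Decode the binary Kafka `value` column as JSON and flatten it.

    df         -- DataFrame returned by spark.readStream.format("kafka")...load()
    schema_ddl -- assumed payload schema as a DDL string, e.g. "id STRING, ts TIMESTAMP"
    """
    parsed = df.selectExpr(
        # from_json needs a string, so cast the binary value first
        f"from_json(CAST(value AS STRING), '{schema_ddl}') AS data",
        # keep the broker timestamp in case the payload carries none
        "timestamp AS kafka_timestamp",
    )
    # Promote the struct fields to top-level columns
    return parsed.select("data.*", "kafka_timestamp")
```

It would be applied right after the read, for example `df_T = parse_kafka_json(df_T, "id STRING, ts TIMESTAMP")`, where the schema string is made up for illustration.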

....

    hudi_options = {
        'hoodie.table.name': MyTable,
        'hoodie.datasource.write.table.name': MyTable,
        'hoodie.datasource.write.recordkey.field': MyKeyInTable,
        'hoodie.datasource.write.partitionpath.field': MyPartitionKey,
        'hoodie.datasource.write.hive_style_partitioning': "true",
        'hoodie.datasource.write.row.writer.enable': "false",
        'hoodie.datasource.write.operation': 'bulk_insert',
        'hoodie.datasource.write.precombine.field': MyTimeStamp,
        'hoodie.insert.shuffle.parallelism': 1,
        'hoodie.consistency.check.enabled': "true",
        'hoodie.cleaner.policy': "KEEP_LATEST_COMMITS",
        'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
        'hoodie.compact.inline': "false",
        'hoodie.datasource.hive_sync.table': MyTable,
        'hoodie.datasource.hive_sync.partition_fields': MyPartitionKey,
        'hoodie.datasource.hive_sync.database' : Mydatabase,
        'hoodie.datasource.hive_sync.auto_create_database': "true",
        'hoodie.datasource.write.keygenerator.class': "org.apache.hudi.keygen.ComplexKeyGenerator",
        'hoodie.datasource.hive_sync.partition_extractor_class': "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        'hoodie.datasource.hive_sync.enable': 'true',
        'hoodie.datasource.hive_sync.skip_ro_suffix': 'true'
    }
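
Several of these options name columns (record key, partition path, precombine field), and those columns have to exist in the dataframe that is written. As a quick sanity check, the hypothetical helper below compares the option values against a list of column names such as `df_T.columns`. It is a sketch only, and it assumes plain comma-separated column names for multi-column keys.

```python
# Options whose values are column names (possibly comma-separated).
FIELD_OPTIONS = (
    "hoodie.datasource.write.recordkey.field",
    "hoodie.datasource.write.partitionpath.field",
    "hoodie.datasource.write.precombine.field",
)


def missing_hudi_fields(options, columns):
    """Return {option: [missing columns]} for field options naming absent columns."""
    available = set(columns)
    problems = {}
    for opt in FIELD_OPTIONS:
        value = options.get(opt)
        if value is None:
            continue  # option not set, nothing to check
        names = [n.strip() for n in str(value).split(",") if n.strip()]
        missing = [n for n in names if n not in available]
        if missing:
            problems[opt] = missing
    return problems
```

Calling `missing_hudi_fields(hudi_options, df_T.columns)` just before the write should return an empty dict when every referenced column is present.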

....

    ds = df_T \
        .writeStream \
        .outputMode('append') \
        .format("org.apache.hudi") \
        .options(**hudi_options)\
        .option('checkpointLocation', MyCheckpointLocation) \
        .start(MyPathLocation) \
        .awaitTermination(300)
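
Note that `awaitTermination(300)` returns a boolean, so `ds` above ends up holding that flag rather than the query itself. Keeping the query handle makes it possible to inspect what the stream actually did. The following is a minimal sketch that relies only on the `awaitTermination`, `exception` and `lastProgress` members of PySpark's `StreamingQuery`; the helper name and the layout of the returned dict are my own choices.

```python
def wait_and_report(query, timeout_s):
    """Wait up to timeout_s seconds on a StreamingQuery, then summarize its state."""
    # True if the query terminated within the timeout, False if it is still running
    finished = query.awaitTermination(timeout_s)
    # None unless the query stopped because of an error
    error = query.exception()
    # Metrics of the most recent micro-batch, or None if no batch has run yet
    progress = query.lastProgress
    rows = progress["numInputRows"] if progress else None
    return {
        "finished": finished,
        "error": str(error) if error is not None else None,
        "last_batch_rows": rows,
    }
```

It would be called on the object returned by `.start(MyPathLocation)`, for example `print(wait_and_report(query, 300))`. If `last_batch_rows` comes back as `None` or `0`, that may indicate that no data reached the sink during the wait.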

....

EMR reports that this code runs fine, but when I look for the Hudi files, none have been created. I know the Kafka configuration works, because when I write to 'console' instead, it works fine. Can someone help me?

CodePudding user response:

Hello everyone, I managed to fix this error. First of all, you have to clean the dataframe: not everything, but at least remove the rows where the fields used as primary keys in the table are null. As a second point, in the hoodie.datasource.write.precombine.field you can set the

...

    import datetime

    currentDate = datetime.datetime.now()

    # For example:

    hudi_options = {
...
        'hoodie.datasource.write.precombine.field': currentDate,
...
    }

Finally, if you don't have a timestamp in your dataframe, you can set this:

    # assumes: from pyspark.sql import functions as F
    .withColumn('Loaded_Date', F.lit(currentDate).cast('timestamp'))
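
Put together, the two steps from this answer might look like the sketch below. It departs from the snippets above in two ways, both of which are my assumptions: it uses the SQL `current_timestamp()` expression instead of a Python `datetime` literal, and it passes the column name `Loaded_Date` as the precombine field, since that option expects the name of a column rather than a value. `prepare_for_hudi` and its parameters are hypothetical.

```python
def prepare_for_hudi(df, key_cols, loaded_col="Loaded_Date"):
    """Drop rows with null keys and add a load-time column for precombine.

    Returns the prepared DataFrame and the Hudi option pointing at the new column.
    """
    # Remove rows where any of the record key columns is null
    cleaned = df.dropna(subset=list(key_cols))
    # Append a timestamp column; current_timestamp() is evaluated by Spark at run time
    stamped = cleaned.selectExpr("*", f"current_timestamp() AS {loaded_col}")
    precombine = {"hoodie.datasource.write.precombine.field": loaded_col}
    return stamped, precombine
```

A possible use would be `df_T, extra = prepare_for_hudi(df_T, [MyKeyInTable])` followed by `hudi_options.update(extra)` before starting the write stream.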
