How to create a Kinesis Firehose delivery stream with dynamic partitioning enabled using the Python CDK?

Time:06-10

I am trying to create a Firehose delivery stream with dynamic partitioning enabled. Here is what I have so far.

analytics_delivery_stream = kinesisfirehose.CfnDeliveryStream(
    self, "AnalyticsDeliveryStream",
    delivery_stream_name='analytics',
    extended_s3_destination_configuration=kinesisfirehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
        bucket_arn=f'arn:aws:s3:::{analytic_bucket_name}',
        buffering_hints=kinesisfirehose.CfnDeliveryStream.BufferingHintsProperty(
            interval_in_seconds=60
        ),
        dynamic_partitioning_configuration=kinesisfirehose.CfnDeliveryStream.DynamicPartitioningConfigurationProperty(
            enabled=True,
            retry_options=kinesisfirehose.CfnDeliveryStream.RetryOptionsProperty(
                duration_in_seconds=123
            )
        ),
        compression_format="UNCOMPRESSED",
        role_arn=firehose_role.role_arn,
        prefix="!{partitionKeyFromQuery:log_type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/",
        error_output_prefix="errors/!{firehose:error-output-type}/!{timestamp:yyyy}/anyMonth/!{timestamp:dd}/",
    )
)

When I run this, I get the error below.

Processing Configuration is not enabled when DataPartitioning is enabled. 

I found the following reference to Processing Configuration in the docs.

processing_configuration=kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty(
    enabled=False,
    processors=[kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
        type="type",

        # the properties below are optional
        parameters=[kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
            parameter_name="parameterName",
            parameter_value="parameterValue"
        )]
    )]
),

I am not sure what values to put for parameters or type inside processing_configuration.

I have logs being put into Firehose with the structure below.

type A - {'log_type':'type_A_log',....other props....}

type B - {'log_type':'type_B_log',....other props....}

Using dynamic partitioning, I want all logs of type A to go into a type_A_log directory in S3 and all logs of type B to go into a type_B_log directory.

Can someone please help here? I am going down a rabbit hole.

CodePudding user response:

I am not sure what values to put for parameters or type inside processing_configuration.

Here's the documentation for CfnDeliveryStream.ProcessorProperty: https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesisfirehose/CfnDeliveryStream.html#processorproperty

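The linked v1 documentation lists only one valid value for type: Lambda. The CloudFormation reference for the Processor property also lists other types, including MetadataExtraction, which is the type used for inline parsing with partitionKeyFromQuery prefixes.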

Here's the documentation for CfnDeliveryStream.ProcessorParameterProperty: https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesisfirehose/CfnDeliveryStream.html#aws_cdk.aws_kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty

The parameters list describes the configuration for the Lambda function that is doing the data partitioning.
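To make the Lambda route concrete, here is a minimal sketch of what such a partitioning function could look like. It assumes each incoming record is valid JSON with a top-level log_type field (the single-quoted samples in the question would need to be real JSON), and the handler name lambda_handler is only a convention, not something from the question. Firehose passes records base64-encoded and expects each one back with a result and, for dynamic partitioning, a metadata.partitionKeys map.

```python
import base64
import json


def lambda_handler(event, context):
    """Tag each Firehose record with a log_type partition key (illustrative sketch)."""
    output = []
    for record in event["records"]:
        try:
            # Firehose delivers the record payload base64-encoded.
            payload = json.loads(base64.b64decode(record["data"]))
            log_type = payload["log_type"]
        except (ValueError, KeyError, TypeError):
            # Records that are not JSON objects or lack log_type are marked as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
            continue
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": record["data"],  # payload is passed through unchanged
            "metadata": {"partitionKeys": {"log_type": log_type}},
        })
    return {"records": output}
```

With keys supplied this way, the S3 prefix would reference them as !{partitionKeyFromLambda:log_type} rather than !{partitionKeyFromQuery:log_type}.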

Here's the documentation for the valid values for parameter_name: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-processorparameter.html

For a Lambda processor, you need to provide at least the LambdaArn parameter.

Example:

kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
    type="Lambda",
    parameters=[
        kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
            parameter_name="LambdaArn",
            parameter_value="<lambda_arn_value>"
        ),
        ...
    ]
)
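Since the prefix in the question uses partitionKeyFromQuery, a Lambda may not be needed at all. The sketch below shows a processing configuration based on the MetadataExtraction processor type, which the CloudFormation reference documents with the parameters MetadataExtractionQuery and JsonParsingEngine. The helper names metadata_extraction_query and build_processing_configuration are hypothetical, and the sketch assumes the records are valid JSON with a top-level log_type field.

```python
def metadata_extraction_query(keys):
    """Build a JQ object expression such as '{log_type:.log_type}'."""
    return "{" + ",".join(f"{key}:.{key}" for key in keys) + "}"


def build_processing_configuration(keys):
    """Return a ProcessingConfigurationProperty that extracts the given keys inline."""
    # Imported here so the helper above stays usable without the CDK installed.
    from aws_cdk import aws_kinesisfirehose as kinesisfirehose

    param = kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty
    return kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty(
        enabled=True,  # must be enabled when dynamic partitioning is on
        processors=[
            kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
                type="MetadataExtraction",
                parameters=[
                    param(
                        parameter_name="MetadataExtractionQuery",
                        parameter_value=metadata_extraction_query(keys),
                    ),
                    param(
                        parameter_name="JsonParsingEngine",
                        parameter_value="JQ-1.6",
                    ),
                ],
            )
        ],
    )
```

Under these assumptions, passing processing_configuration=build_processing_configuration(["log_type"]) next to dynamic_partitioning_configuration in the ExtendedS3DestinationConfigurationProperty from the question should let the existing !{partitionKeyFromQuery:log_type} prefix resolve.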