Using multiple consumers with rdkafka in Laravel

I'm using rdkafka in Laravel to insert records into MongoDB. With 2 brokers and 1 consumer I get about 121,000 records per minute. I want more consumers to consume messages from the producer, so I duplicated the consumer in code, but it doesn't seem to work. How can I add more consumers to handle more inserts?

Here is my consumer code:

    <?php
    
    namespace App\Console\Commands;        
    use App\Core\BillLadingReport\BillLadingReport;
    use Illuminate\Console\Command;
    use RdKafka\Conf;
    use RdKafka\KafkaConsumer;
    
    class SyncBillLading extends Command
    {
        /**
         * The name and signature of the console command.
         *
         * @var string
         */
        protected $signature = 'sync:bill';
    
        /**
         * The console command description.
         *
         * @var string
         */
        protected $description = 'Command description';
    
        /**
         * Create a new command instance.
         *
         * @return void
         */
        public function __construct()
        {
            parent::__construct();
        }
    
        /**
         * Execute the console command.
         *
         * @return int
         */
        public function handle()
        {
            $queue = env('KAFKA_QUEUE');
            $conf = new Conf();
            $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
                switch ($err) {
                    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                        echo "Assign: ";
                        $kafka->assign($partitions);
                        break;
    
                    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                        echo "Revoke: ";
                        $kafka->assign(NULL);
                        break;
    
                    default:
                        throw new \Exception($err);
                }
            });
            $conf->set('group.id', env('KAFKA_CONSUMER_GROUP_ID', 'laravel_queue'));
            $conf->set('metadata.broker.list', env('KAFKA_BROKERS', 'localhost:9092'));
            $conf->set('auto.offset.reset', 'largest');
            $conf->set('enable.auto.commit', 'true');
            $conf->set('auto.commit.interval.ms', '101');
    
            $consumer = new KafkaConsumer($conf);
            $consumer->subscribe([$queue]);

            // My attempt at a second consumer in the same process:
            $consumer2 = new KafkaConsumer($conf);
            $consumer2->subscribe([$queue]);
    
            while (true) {
                $message = $consumer->consume(120 * 1000);
                $message2 = $consumer2->consume(120 * 1000);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        $this->handleDataBillLading($message);
                        $this->handleDataBillLading($message2);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        $this->line("'[' . date('H:i:s') . \"][partition {$message->partition}] No more messages; will wait for more [key: '{$message->key}' offset: {$message->offset}]\n");
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        $this->line('[' . date('H:i:s') . "] Timed out \n");
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                }
            }
        }
    
        public function handleDataBillLading($data)
        {
            BillLadingReport::insert(json_decode($data->payload, true));
    //        foreach ($data as $bill) {
    //            dump($bill);
    //            BillLadingReport::insert($bill);
    //        }
        }
    }

And here is my producer code:

<?php

namespace App\Console\Commands;

use App\Components\MessageBroker\Services\KafkaService;
use App\Core\BillLading\BillLading;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

class SyncBillLadingToMongoDbReport extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'sync:bill-mongo-report';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'sync bill to mongodb report';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        DB::table('bill_lading')->orderBy('id', 'desc')->chunk(20, function ($bills) {
            $data = [];
            foreach ($bills as $bill) {
                $data[] = [
                    'id' => $bill->id,
                    'send_data' => $bill->send_data ?? '',
                ];
            }

            // Send each chunk once after it has been collected, instead of
            // re-sending the growing array on every iteration.
            $kafkaService = new KafkaService($data);
            $kafkaService->handle();
        });
        $this->info('DONE');
    }
}

And my docker-compose file:

version: "3.7"
services:
  app:
    build:
      args:
        user: www
        uid: 1000
      context: ./
      dockerfile: Dockerfile
    image: laravel-image
    container_name: shipping-report-app
    restart: unless-stopped
    working_dir: /var/www/
    privileged: true
    volumes:
      - ./:/var/www
      - ./php/local.ini:/usr/local/etc/php/conf.d/local.ini
      - ./docker/php/php.ini:/usr/local/etc/php/conf.d/php.ini
    networks:
      - app
    extra_hosts:
      - "host.docker.internal:host-gateway"

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    extra_hosts:
      - "moby:127.0.0.1"
    ports:
      - "12181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - app

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    extra_hosts:
      - "moby:127.0.0.1"
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    volumes:
      - ./docker/kafka-logs:/tmp/kafka-logs
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,HOST://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - app

  broker2:
    image: confluentinc/cp-kafka
    hostname: broker
    extra_hosts:
      - "moby:127.0.0.1"
    depends_on:
      - zookeeper
    ports:
      - '29092:29092'
    volumes:
      - ./docker/kafka-logs:/tmp/kafka-logs
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:39092,HOST://localhost:29092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - app

  nginx:
    image: nginx:alpine
    restart: unless-stopped
    container_name: shipping-report-nginx
    ports:
      - 8000:80
    volumes:
      - ./:/var/www
      - ./docker-compose/nginx:/etc/nginx/conf.d/
    networks:
      - app

  redis:
    image: 'bitnami/redis:5.0'
    hostname: redis-report
    environment:
      # ALLOW_EMPTY_PASSWORD is recommended only for development.
      - ALLOW_EMPTY_PASSWORD=yes
    ports:
      - '6382:6379'
    restart: unless-stopped
    volumes:
      - 'redis_data:/var/www/redis'
    networks:
      - app

  mongo:
    image: mongo
    hostname: mongo
    container_name: shipping-report-mongo
    ports:
      - "27021:27017"
    volumes:
      - mongo_data:/data/db
    restart: unless-stopped
    networks:
      - app

  mariadb5:
    image: 'bitnami/mariadb:latest'
    ports:
      - "3313:3306"
    hostname: mariadb
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - MARIADB_DATABASE=kvshipping-report
      - MARIADB_ROOT_USER=root
      - MARIADB_ROOT_PASSWORD=root
      - MARIADB_USER=quyetbq
      - MARIADB_PASSWORD=quyetbq
    restart: unless-stopped
    volumes:
      - mariadb5:/data/db
    networks:
      - app

  manager:
    image: sheepkiller/kafka-manager
    ports:
      - 9001:9000
    environment:
      - ZK_HOSTS=zookeeper:12181
    depends_on:
      - zookeeper
    networks:
      - app

networks:
  app:
    driver: bridge

volumes:
  redis_data:
    driver: local
  mariadb5:
    driver: local
  mongo_data:
    driver: local

And I get an error when I run the cron job: [error screenshot]

CodePudding user response:

Your compose file has two Kafka containers with hostname: broker. Your logs indicate the app is failing to connect to at least one of them; most likely clients are confused about which container is the correct one. If you do keep both brokers, the second container needs a unique hostname, as in the sketch below.
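
A minimal sketch against your compose file (only the changed line shown; everything else stays as you have it):

      broker2:
        image: confluentinc/cp-kafka
        hostname: broker2   # was "broker", which collides with the first Kafka container

Note that your KAFKA_ADVERTISED_LISTENERS for broker2 already advertises broker2:39092, so the hostname is the only mismatch.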

You don't really need two brokers, though, so I suggest simplifying your setup, especially since your replication-factor variables are all set to 1. Also, the broker only reads variables starting with KAFKA_ (but not KAFKA_CFG_, which isn't used by the Confluent images at all), so it's unclear why you've added the CONNECT_ ones there.
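
A trimmed single-broker service could look like this; it's a sketch assembled from your existing values, with the unused CONNECT_ and KAFKA_CFG_ variables dropped:

      broker:
        image: confluentinc/cp-kafka
        hostname: broker
        depends_on:
          - zookeeper
        ports:
          - '9092:9092'
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,HOST://0.0.0.0:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,HOST://localhost:9092
          KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        networks:
          - app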

Then, your PHP service should connect to broker:19092 through the KAFKA_BROKERS variable, not 29092 or 39092.
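
Concretely, something like this in the Laravel app's .env (variable names taken from your consumer code; the topic name is a placeholder):

    KAFKA_BROKERS=broker:19092
    KAFKA_CONSUMER_GROUP_ID=laravel_queue
    KAFKA_QUEUE=your-topic-name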

The above should fix your connection attempts, but consumption speed is limited by the number of partitions, not the number of brokers, and you haven't mentioned how many partitions you have. A consumer group assigns each partition to at most one consumer, so a single-partition topic can never be read by more than one consumer at a time, no matter how many you start; see the sketch below.
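
So the scaling pattern is: give the topic more partitions, then run several copies of your existing command as separate processes in the same group.id, rather than creating a second KafkaConsumer inside one process. A sketch, assuming a topic named bill_lading (substitute your KAFKA_QUEUE value):

    # inside the broker container: grow the topic to 4 partitions
    kafka-topics --bootstrap-server broker:19092 --alter \
      --topic bill_lading --partitions 4

    # in the app container: one consumer process per partition
    php artisan sync:bill &
    php artisan sync:bill &
    php artisan sync:bill &
    php artisan sync:bill &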
