Home > Blockchain >  Why am I getting AttributeError: 'NoneType' object has no attribute 'send' and h
Why am I getting AttributeError: 'NoneType' object has no attribute 'send' and h

Time:07-18

I get the following error when running scrapeBackend.py

AttributeError: 'NoneType' object has no attribute 'send'

I am trying to use the send_topic method inherited from the ProducerConsumer class in customKafka.py

import time
import pymongo
from pymongo import MongoClient
from customKafka import ProducerConsumer


class scrapeBackend(ProducerConsumer):

    def run(self):
        ARTICLE_SCRAPED_KAFKA_TOPIC = "raw_text"
        ARTICLE_LIMIT = 20
        print("Generating raw article")
        print("generating every 3 seconds")

        for i in range(1,ARTICLE_LIMIT):
            data={
                "article_id": i,
                "source" : "bbc",
                "article" : "this is an article"
            }
            super().send_topic("raw_text",data)
            #collection.insert_one(data)
            print(f"done sending {i}")
            time.sleep(3)

scrap1=scrapeBackend(True,False,"raw_text","localhost:29092")

scrap1.run()

customKafka.py

from kafka import KafkaConsumer
from kafka import KafkaProducer
import json
class ProducerConsumer:
    __isProducer=None
    __isConsumer=None
    __producer=None
    __consumer=None

    def __init__(self,isProducer,isConsumer,topic,bootstrap_servers):
        if (self.__isProducer):
            self.__producer=KafkaProducer(bootstrap_servers=bootstrap_servers)
        if (self.__isConsumer):
            self.__consumer= KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                consumer_timeout_ms = 10000)

    def get_producer(self):
        return self.__producer

    def get_consumer(self):
        return self.__consumer

    def send_topic(self,topic,data):
        self.get_producer().send(topic, json.dumps(data).encode("utf-8"))

Looks like the getProducer() is returning None however it should return something since I initaliazed before running

CodePudding user response:

The issue is in your __init__ function of parent class.

It should be implemented like this:

def __init__(self,isProducer,isConsumer,topic,bootstrap_servers):
        if isProducer:
            self.__producer=KafkaProducer(bootstrap_servers=bootstrap_servers)
        if isConsumer:
            self.__consumer= KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                consumer_timeout_ms = 10000)

Notice the difference in the if statements. Hope this helps!!

CodePudding user response:

you should call super() in your __init__ of the subclass then, you should be able to call the function send_topic from the object scrap1

class scrapeBackend(ProducerConsumer):
    def __init__(self, *args, **kwargs):
        super(scrapeBackend, self).__init__(*args, **kwargs)

    def run(self):
        ARTICLE_SCRAPED_KAFKA_TOPIC = "raw_text"
        ARTICLE_LIMIT = 20
        print("Generating raw article")
        print("generating every 3 seconds")

        for i in range(1,ARTICLE_LIMIT):
            data={
                "article_id": i,
                "source" : "bbc",
                "article" : "this is an article"
            }
            self.send_topic("raw_text", data)
            #collection.insert_one(data)
            print(f"done sending {i}")
            time.sleep(3)

and your __init__ of ProductConsumer can't deal with input parameters (the 2 if-conditions), it should be implemented like this:

class ProducerConsumer:
    def __init__(self,isProducer,isConsumer,topic,bootstrap_servers):
        if isProducer:
            self.__producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        if isConsumer:
            self.__consumer = KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                consumer_timeout_ms = 10000)

    def get_producer(self):
        return self.__producer

    def get_consumer(self):
        return self.__consumer

    def send_topic(self,topic,data):
        self.get_producer().send(topic, json.dumps(data).encode("utf-8"))
  • Related