I get the following error when running scrapeBackend.py
AttributeError: 'NoneType' object has no attribute 'send'
I am trying to use the send_topic method that scrapeBackend inherits from the ProducerConsumer class in customKafka.py.
import time
import pymongo
from pymongo import MongoClient
from customKafka import ProducerConsumer
class scrapeBackend(ProducerConsumer):
    """Publish placeholder articles through the inherited ``send_topic``."""

    def run(self):
        """Send one dummy article every 3 seconds, then return.

        Article ids run from 1 up to (but not including) ``ARTICLE_LIMIT``.
        Each message is a dict with the keys ``article_id``, ``source`` and
        ``article``.
        """
        ARTICLE_SCRAPED_KAFKA_TOPIC = "raw_text"
        ARTICLE_LIMIT = 20
        print("Generating raw article")
        print("generating every 3 seconds")
        for i in range(1, ARTICLE_LIMIT):
            data = {
                "article_id": i,
                "source": "bbc",
                "article": "this is an article",
            }
            # send_topic is inherited, so it is reachable through self; the
            # topic comes from the constant instead of a repeated literal.
            self.send_topic(ARTICLE_SCRAPED_KAFKA_TOPIC, data)
            # NOTE(review): persisting to MongoDB appears to be disabled here.
            # collection.insert_one(data)
            print(f"done sending {i}")
            time.sleep(3)
# Build a producer-only instance (no consumer) for the "raw_text" topic on
# the broker at localhost:29092, then start publishing.
scrap1 = scrapeBackend(
    isProducer=True,
    isConsumer=False,
    topic="raw_text",
    bootstrap_servers="localhost:29092",
)
scrap1.run()
customKafka.py
from kafka import KafkaConsumer
from kafka import KafkaProducer
import json
class ProducerConsumer:
    """Wrapper that can hold a KafkaProducer and/or a KafkaConsumer."""

    # Class-level defaults: all four name-mangled attributes start as None.
    __isProducer = __isConsumer = None
    __producer = __consumer = None

    def __init__(self, isProducer, isConsumer, topic, bootstrap_servers):
        """Set up the Kafka clients.

        NOTE: both conditions below read the class attributes
        ``__isProducer`` / ``__isConsumer`` rather than the ``isProducer`` /
        ``isConsumer`` arguments. Those attributes are never assigned and
        stay None, so neither client is created and ``get_producer()``
        keeps returning None.
        """
        if self.__isProducer:
            self.__producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        if self.__isConsumer:
            consumer_settings = {
                "bootstrap_servers": bootstrap_servers,
                "auto_offset_reset": "earliest",
                "enable_auto_commit": True,
                "consumer_timeout_ms": 10000,
            }
            self.__consumer = KafkaConsumer(topic, **consumer_settings)

    def get_producer(self):
        """Return the stored producer, or None if none was created."""
        return self.__producer

    def get_consumer(self):
        """Return the stored consumer, or None if none was created."""
        return self.__consumer

    def send_topic(self, topic, data):
        """Serialize ``data`` to UTF-8 encoded JSON and send it to ``topic``.

        Raises AttributeError when no producer exists, because ``send`` is
        then looked up on None.
        """
        payload = json.dumps(data).encode("utf-8")
        self.get_producer().send(topic, payload)
It looks like get_producer() is returning None; however, it should return a producer, since I initialized the object before calling run().
CodePudding user response:
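The issue is in the __init__ method of the parent class: its if statements test self.__isProducer and self.__isConsumer, which are class attributes that are always None, instead of the isProducer and isConsumer arguments, so the producer is never created.
It should be implemented like this: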
def __init__(self, isProducer, isConsumer, topic, bootstrap_servers):
    """Create the Kafka clients requested by the constructor flags.

    The conditions test the ``isProducer`` / ``isConsumer`` arguments
    directly, so a client is built whenever its flag is truthy.
    """
    if isProducer:
        self.__producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    if isConsumer:
        # Gather the consumer options first, then subscribe to the topic.
        consumer_settings = {
            "bootstrap_servers": bootstrap_servers,
            "auto_offset_reset": "earliest",
            "enable_auto_commit": True,
            "consumer_timeout_ms": 10000,
        }
        self.__consumer = KafkaConsumer(topic, **consumer_settings)
Notice the difference in the if statements. Hope this helps!!
CodePudding user response:
You should call super().__init__() in the __init__ of the subclass; then you should be able to call the send_topic function on the scrap1 object:
class scrapeBackend(ProducerConsumer):
    """Publish placeholder articles through the inherited ``send_topic``."""

    def __init__(self, *args, **kwargs):
        """Forward every constructor argument unchanged to ``ProducerConsumer``."""
        super().__init__(*args, **kwargs)

    def run(self):
        """Send one dummy article every 3 seconds, then return.

        Article ids run from 1 up to (but not including) ``ARTICLE_LIMIT``.
        Each message is a dict with the keys ``article_id``, ``source`` and
        ``article``.
        """
        ARTICLE_SCRAPED_KAFKA_TOPIC = "raw_text"
        ARTICLE_LIMIT = 20
        print("Generating raw article")
        print("generating every 3 seconds")
        for i in range(1, ARTICLE_LIMIT):
            data = {
                "article_id": i,
                "source": "bbc",
                "article": "this is an article",
            }
            # The topic comes from the constant instead of a repeated literal.
            self.send_topic(ARTICLE_SCRAPED_KAFKA_TOPIC, data)
            # NOTE(review): persisting to MongoDB appears to be disabled here.
            # collection.insert_one(data)
            print(f"done sending {i}")
            time.sleep(3)
Also, the __init__ of ProducerConsumer does not use its input parameters (see the two if-conditions), so it should be implemented like this:
class ProducerConsumer:
    """Wrapper that can hold a KafkaProducer and/or a KafkaConsumer."""

    def __init__(self, isProducer, isConsumer, topic, bootstrap_servers):
        """Create the Kafka clients requested by the flags.

        Args:
            isProducer: when truthy, build a ``KafkaProducer``.
            isConsumer: when truthy, build a ``KafkaConsumer`` for ``topic``.
            topic: topic name handed to the consumer.
            bootstrap_servers: broker address(es) passed to both clients.
        """
        # Define both attributes up front so the getters return None rather
        # than raising AttributeError when a role is not enabled.
        self.__producer = None
        self.__consumer = None
        if isProducer:
            self.__producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        if isConsumer:
            self.__consumer = KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                auto_offset_reset="earliest",
                enable_auto_commit=True,
                consumer_timeout_ms=10000,
            )

    def get_producer(self):
        """Return the producer, or None if the instance is not a producer."""
        return self.__producer

    def get_consumer(self):
        """Return the consumer, or None if the instance is not a consumer."""
        return self.__consumer

    def send_topic(self, topic, data):
        """Serialize ``data`` to UTF-8 encoded JSON and send it to ``topic``.

        Raises AttributeError when the instance has no producer, because
        ``send`` is then looked up on None.
        """
        self.get_producer().send(topic, json.dumps(data).encode("utf-8"))