Multiprocessing pipeline with Kafka Airflow Kubernetes

I have a simple Python pipeline that crawls a property site for data; the listings on the site are divided by state and by property type. The steps of the pipeline are as follows (a simplified sketch of the current implementation is included after the list):

  • A loop over all combinations of state and property type
  • For each combination, a crawler goes through the corresponding page and collects the URLs of all properties
  • For each property, data is crawled, cleaned and enriched before being stored in a SQLite DB
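
Simplified, the current single-threaded pipeline looks roughly like this (the helper bodies are stubbed out; the real crawling, cleaning and table layout are omitted):

import sqlite3

STATES = ["state_a", "state_b"]            # placeholder values
PROPERTY_TYPES = ["house", "apartment"]    # placeholder values

def collect_listing_urls(state, property_type):
    return []  # crawl the overview page for this state/type combination

def crawl_listing(url):
    return {}  # fetch the ~20 fields for one listing

def clean_and_enrich(raw):
    return raw  # cleaning and enrichment omitted

def store(conn, record):
    pass  # INSERT into the SQLite table

def run_pipeline(db_path="listings.db"):
    conn = sqlite3.connect(db_path)
    for state in STATES:
        for property_type in PROPERTY_TYPES:
            for url in collect_listing_urls(state, property_type):
                store(conn, clean_and_enrich(crawl_listing(url)))
    conn.close()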

Currently this is a very simple, single-threaded process. I want to improve it and am looking for modern tools for the new pipeline, both to visualize the status of the processing and to run it as a multiprocessing pipeline.

My first idea is to use Kafka and Airflow. One process crawls a page for property URLs and creates a Kafka message for each URL. A second process then takes a single Kafka message and processes it: crawl, clean, enrich, store. Meanwhile, Airflow gives me a nice overview of the status of the processes and lets me retry failed ones, with each combination of state and property type split into a separate DAG (see the sketch below).
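
A rough sketch of that idea, generating one DAG per state/property-type combination with standard Airflow operators (the DAG ids, schedule and the crawl function are just placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def crawl_combination(state, property_type):
    pass  # collect the listing URLs for this combination and publish them to Kafka

for state in ["state_a", "state_b"]:                # placeholder states
    for property_type in ["house", "apartment"]:    # placeholder property types
        dag_id = f"crawl_{state}_{property_type}"
        dag = DAG(dag_id, start_date=datetime(2022, 1, 1),
                  schedule_interval="@daily", catchup=False)
        PythonOperator(
            task_id="crawl_urls",
            python_callable=crawl_combination,
            op_kwargs={"state": state, "property_type": property_type},
            dag=dag,
        )
        globals()[dag_id] = dag  # register each generated DAG with Airflow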

The issue, however, is that crawling is not something I can do with multiprocessing, as that would send too many requests to the target page and the calls would eventually be blocked, so the pipeline would fail.

My new idea is to also include Kubernetes. One pod would crawl the property URLs, a second pod would crawl one property URL at a time, and a final pod would be responsible for processing the property data (clean, enrich, store). I want X instances of this last pod, because crawling the data will be faster than processing it.

Because a lot of data is crawled for each property (around 20 fields, at least one of which contains a long description of the property), I do not think Kafka is a good option for transferring the data between the pods, but I see no other way to include a work queue. The only alternative I could think of is that messages only ever contain the URL of a listing: after crawling, the data is stored in SQLite, and the final pod that cleans and enriches the data pulls it from the SQLite DB instead (roughly as sketched below). Is that a reasonable idea, or are there better options?
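
A sketch of that hand-off as I imagine it (the table and column names are made up), where the processing pod polls SQLite for listings the crawler pod has already stored:

import sqlite3

def claim_unprocessed(db_path, batch_size=20):
    # fetch listings that the crawler pod stored but that are not yet cleaned/enriched
    with sqlite3.connect(db_path) as conn:
        return conn.execute(
            "SELECT id, url FROM listings WHERE processed = 0 LIMIT ?",
            (batch_size,),
        ).fetchall()

def mark_processed(db_path, listing_id):
    with sqlite3.connect(db_path) as conn:
        conn.execute("UPDATE listings SET processed = 1 WHERE id = ?", (listing_id,))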

I have tried to google for tutorials and suggestions on how to set up a system with Kubernetes, Airflow and Kafka, but I find nothing. Some pages are specifically about running Airflow within Kubernetes, but there is never any information about Kafka. Does this mean the combination is not possible, and if so, why not? Also, do you have suggestions for better tools or complete systems that I should look into instead?

Apologies if my question is too vague or open; I could not find another place to ask for suggestions on building this pipeline in the best way possible and gaining skills that will help me find a job.

CodePudding user response:

If you really want to deploy Kafka on K8s, you can use https://strimzi.io/, a mature open-source operator that makes it quite easy to deploy Kafka in Kubernetes.

However, I think you can implement what you want to achieve with some simple serverless cloud features; in GCP you can use Pub/Sub and Cloud Functions (a rough sketch follows).
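
For example, a sketch using the google-cloud-pubsub and functions-framework packages could look like this (the project, topic and function names are only placeholders):

import base64

import functions_framework
from google.cloud import pubsub_v1

def publish_urls(urls):
    # the crawler publishes one message per listing URL
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "listing-urls")  # placeholder names
    for url in urls:
        publisher.publish(topic_path, data=url.encode("utf-8"))

@functions_framework.cloud_event
def handle_listing(cloud_event):
    # Pub/Sub delivers the payload base64-encoded inside the CloudEvent
    url = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")  # crawl, clean, enrich, store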

There are equivalent services in other public clouds.

CodePudding user response:

If you want to know why the runtime of a program is so long, you need to profile it.

Start with profiling the current single-threaded implementation using the cProfile module from the standard library. Try:

python -m cProfile -s cumtime yourscript.py <arguments>

This will give you an idea of where your program is spending its time.

I cannot stress enough how important this is! Sometimes the bottlenecks are totally not what you think they are.

For example, one of my programs turned out to spend most of its time in string formatting! And by experimenting I found out that I could halve that time by using an appropriate type specifier:

In [1]: f = 139.3592529296875;

In [3]: %timeit str(f)
724 ns ± 0.31 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [4]: %timeit "{0}".format(f)
734 ns ± 1.21 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [5]: %timeit "{0:.5f}".format(f)
314 ns ± 0.0365 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [7]: %timeit "{0:f}".format(f)
313 ns ± 7.35 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [8]: %timeit "{0:e}".format(f)
382 ns ± 0.171 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

Only after you have profiled your script can you think about improvements. Generally, I would advise starting with whatever code takes longest (but not built-ins; there is not much you can do about those).

Keep in mind that the largest improvement often comes from a better algorithm.

Later in the development of the same program, I converted the numerical data to strings once, before assembling the output strings, which saved a lot of duplicated effort because every number was used multiple times.
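
A toy illustration of that idea (the data and layout are made up): format each value a single time and reuse the cached string everywhere it appears.

values = [139.3592529296875, 2.718281828, 0.0001]

# format every value exactly once ...
formatted = ["{0:f}".format(v) for v in values]

# ... and reuse the cached strings wherever the numbers appear again
header = " | ".join(formatted)
rows = [f"{s} -> {s}" for s in formatted]  # each number is used more than once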

CodePudding user response:

Airflow is for scheduling programs, possibly inside Kubernetes pods, but Kubernetes is not required; you could run a standalone Airflow worker cluster.

Regarding Kafka, Airflow isn't really needed for consuming, since Kafka topics are endless and continuous. You could publish each URL to a multi-partitioned Kafka topic and then run as many consumers (threads or processes) as there are partitions, and you have your parallel processing (see the sketch below). Since processing is in parallel, don't use SQLite, as that would effectively require a single instance consuming all of the data.
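
A rough sketch with the kafka-python client (the topic name, broker address and group id are placeholders): all consumers share a consumer group, so the partitions are spread across them automatically.

from kafka import KafkaConsumer, KafkaProducer

def publish_urls(urls):
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    for url in urls:
        producer.send("listing-urls", value=url.encode("utf-8"))
    producer.flush()

def consume_urls():
    # run one of these per partition (as a thread, process or pod)
    consumer = KafkaConsumer(
        "listing-urls",
        bootstrap_servers="localhost:9092",
        group_id="listing-workers",
    )
    for message in consumer:
        url = message.value.decode("utf-8")  # crawl, clean, enrich, store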

You can still use Kubernetes for that processing, for example with Knative or OpenFaaS instead.

You could also use NATS or RabbitMQ, since you just need a queue. Celery with Redis is also commonly used with Python (a minimal sketch is below).
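
A minimal Celery + Redis sketch (the broker URL and task body are placeholders): the crawler enqueues one task per listing URL and any number of workers process them.

from celery import Celery

app = Celery("listings", broker="redis://localhost:6379/0")

@app.task
def process_listing(url):
    pass  # crawl, clean, enrich and store the listing here

# the crawler enqueues work with process_listing.delay(url);
# workers are started with something like: celery -A <your_module> worker --concurrency=4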
