I'm a beginner in python, I have an Azure function that runs with a time trigger. This function reads a batch of raw JSON data from an Azure service bus with string format.
This is a two-row of data. In reality, I received about 50 like this message is continuous. Now I want to split this message row by row and then archive it to Azure Storage.
The message is like the below sample ( concat of row1 and row2 ) :
{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.0254566,\"10\":-0.054562772}},\"NUMBER_TAG\":\"2145600\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":1,"id":"074222a38-2816-42c7-b95c-6644448ba9d","t":-2}
Row 1 is:
{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}
Row 2 is:
{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTTAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-2}
The structure of a row is like the below image:
In my opinion, First I should split each row and then create a data frame and insert each value in the related column. After that, I append to a blob. Is it right?
How can I do? What is your suggested solution?
Edited: My code for reading from service bus:
from azure.servicebus import ServiceBusClient, ServiceBusMessage
connection_str = "**"
topic_name = "***"
subscription_name = "***"
servicebus_client = ServiceBusClient.from_connection_string(
conn_str=connection_str, logging_enable=True)
with servicebus_client:
# get the Subscription Receiver object for the subscription
receiver = servicebus_client.get_subscription_receiver(
topic_name=topic_name, subscription_name=subscription_name, )
with receiver:
for msg in receiver:
print("Received: " str(msg))
# complete the message so that the message is removed from the subscription
receiver.complete_message(msg)
CodePudding user response:
Consider sample data with three rows:
data = '{"Name": "Hassan", "code":"12"}{"Name": "Jack", "code":"345"}{"Name": "Jack", "code":"345"}'
Here is how you can get dataframe from this data:
from ast import literal_eval
data = [literal_eval(d '}')for d in data.split('}')[0:-1]]
df = pd.DataFrame.from_records(data)
Output:
Name code
0 Hassan 12
1 Jack 345
2 Jack 345
CodePudding user response:
Since the messages are sent individually, you can process them individually. There is no need to concatenate into a string. Just keep appending them into a data frame. Below sample is for a queue but you can extend to a topic/subscription. I've also attached the results to show you what the output looks like.
from azure.servicebus import ServiceBusClient
import pandas as pd
import json
from pandas import json_normalize
CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
QUEUE_NAME = 'xxxxxxxxxxx'
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
# create an Empty DataFrame object
df = pd.DataFrame()
msg_concat = ""
dfs = []
with receiver:
received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
for msg in received_msgs:
msg_dict = json.loads(str(msg))
df2 = json_normalize(msg_dict)
df = df.append(df2, ignore_index = True)
receiver.complete_message(msg)
print(df)
print("Receive is done.")
Name Seri SiName As ... Id Asse id t
0 21000000 ... 0 None 075f0a38-2816-42c7-b95c-66c425b8ba9d -1
1 4560000 ... 0 None 075f0a38-2816-42c7-b95c-66c425b8ba9d -2
[2 rows x 21 columns]
Receive is done.