I have a stream of messages produced by Kafka, and I need to create mini-batches based on the content of the messages.
To keep it simple, consider the empty dataframe below:
import pandas as pd
dict1 = {
"exchange": "binance",
"base": "kp3r",
"quote": "busd",
"resolution": 15,
"ohlcv": [
{
"source_id": 7905,
"timestamp": "2022-07-05T14:45:00.000Z",
"open": "131.5500000000000000",
"high": "131.8400000000000000",
"low": "130.3800000000000000",
"close": "130.9400000000000000",
"volume": "90.2900000000000000"
}]
}
dict2 = {
"exchange": "kucoin",
"base": "mln",
"quote": "usdt",
"resolution": 15,
"ohlcv": [
{
"source_id": 19252,
"timestamp": "2022-07-05T13:00:00.000Z",
"open": "24.5610000000000000",
"high": "24.5610000000000000",
"low": "24.5610000000000000",
"close": "24.5610000000000000",
"volume": "0.0000000000000000"
}]
}
df_cols = ["_".join(list(dict1.values())[1:-2]), "_".join(list(dict2.values())[1:-2])]
df_sub_cols = list(dict1['ohlcv'][0].keys())[:-1]
print(df_cols)
print(df_sub_cols)
mux = pd.MultiIndex.from_product([df_cols, df_sub_cols])
df = pd.DataFrame(columns=mux)
print(df)
How can we fill this dataframe with the corresponding values from the dictionaries above?
CodePudding user response:
Here is a solution using pandas explode, pop and set_index.
Each of your dictionaries has only one entry in its ohlcv list, which makes the result hard to visualize. I have added a few more entries to the ohlcv list so that the resulting dataframe is easier to read.
Let's say both of your records are held in a list.
data_dict = [{
"exchange": "binance",
"base": "kp3r",
"quote": "busd",
"resolution": 15,
"ohlcv": [
{
"source_id": 7905,
"timestamp": "2022-07-05T14:45:00.000Z",
"open": "131.55",
"high": "131.84",
"low": "130.38",
"close": "130.94",
"volume": "90.29"
},
{
"source_id": 7905,
"timestamp": "2021-07-05T08:35:00.000Z",
"open": "169.55",
"high": "125.84",
"low": "136.38",
"close": "126.94",
"volume": "900.29"
},
{
"source_id": 19252,
"timestamp": "2022-07-05T13:00:00.000Z",
"open": "24.5610000000000000",
"high": "24.5610000000000000",
"low": "24.5610000000000000",
"close": "24.5610000000000000",
"volume": "0.0000000000000000"
}]
},
{
"exchange": "kucoin",
"base": "mln",
"quote": "usdt",
"resolution": 15,
"ohlcv": [
{
"source_id": 19252,
"timestamp": "2022-07-05T13:00:00.000Z",
"open": "24.56",
"high": "24.56",
"low": "24.56",
"close": "24.56",
"volume": "0.0"
}]
}
]
pd.DataFrame reads the two entries of the data_dict list as 2 rows. explode then repeats the values of "exchange", "base", "quote" and "resolution" for each element of the ohlcv list:
df = pd.DataFrame(data_dict).explode("ohlcv")
This gives us one row per entry in ohlcv, with exchange, base, quote and resolution repeated on each row.
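To see the effect of explode in isolation, here is a minimal sketch on shortened records. The records list and its trimmed fields are made up for illustration and only mirror the shape of data_dict.

```python
import pandas as pd

# Hypothetical, shortened records with the same nesting as data_dict.
records = [
    {"exchange": "binance", "base": "kp3r",
     "ohlcv": [{"open": "131.55"}, {"open": "169.55"}]},
    {"exchange": "kucoin", "base": "mln",
     "ohlcv": [{"open": "24.56"}]},
]

exploded = pd.DataFrame(records).explode("ohlcv")

# The outer fields are repeated once per list element. The original row
# label is kept, so the index now contains duplicates.
print(exploded.index.tolist())        # [0, 0, 1]
print(exploded["exchange"].tolist())  # ['binance', 'binance', 'kucoin']
```

Each cell of the ohlcv column now holds a single dictionary instead of a list, which is what the next step relies on.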
Now we would like to read each dictionary in ohlcv and convert its keys to columns.
# Illustration only: the result is not assigned, so df keeps its ohlcv column for the next step.
df["ohlcv"].apply(pd.Series)
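This gives us a frame whose columns are the dictionary keys: source_id, timestamp, open, high, low, close and volume.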
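As a side note, apply(pd.Series) can be slow on large frames. One alternative, sketched below, builds the frame directly from the list of dictionaries and reuses the existing index. As far as I can tell it yields the same columns and values. The names cells, expanded and direct are made up for this example.

```python
import pandas as pd

# Hypothetical stand-in for the exploded ohlcv column: one dict per row,
# with a duplicated index like the one explode leaves behind.
cells = pd.Series(
    [
        {"source_id": 7905, "open": "131.55"},
        {"source_id": 7905, "open": "169.55"},
        {"source_id": 19252, "open": "24.56"},
    ],
    index=[0, 0, 1],
)

# Approach used in this answer: one pd.Series per dictionary.
expanded = cells.apply(pd.Series)

# Alternative: pass all the dictionaries to the DataFrame constructor at
# once, keeping the same index so a later concat still lines up.
direct = pd.DataFrame(cells.tolist(), index=cells.index)

print(expanded.columns.tolist())
print(direct.columns.tolist())
```

Either result can be passed to pd.concat along axis=1, since both carry the same index as the exploded frame.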
We then want to attach this expanded table to the main dataframe, matching rows by their shared index. pop removes the original ohlcv column at the same time.
df = pd.concat([df, df.pop("ohlcv").apply(pd.Series)], axis=1)
This gives us the original columns and the expanded ohlcv columns side by side.
Now we would like the duplicated values in "exchange", "base", "quote" and "resolution" to be shown only once, so we set them as index levels, together with "source_id". The values from "timestamp", "open", "high", "low", "close" and "volume" are retained as columns.
df = df.set_index(["exchange","base", "quote", "resolution", "source_id"])
This gives us the expected output: the repeated fields form the index, and the remaining ohlcv fields stay as columns.
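Once the repeated fields are index levels, rows can be selected by level value. The sketch below uses a small, made-up frame (flat) with fewer columns than the real one, and xs is just one of several ways to do the lookup.

```python
import pandas as pd

# Hypothetical flattened frame, shaped like the result of the concat step
# but trimmed to a few columns.
flat = pd.DataFrame({
    "exchange": ["binance", "binance", "kucoin"],
    "base": ["kp3r", "kp3r", "mln"],
    "source_id": [7905, 7905, 19252],
    "close": ["130.94", "126.94", "24.56"],
})

indexed = flat.set_index(["exchange", "base", "source_id"])

# Only the non-index fields remain as columns.
print(indexed.columns.tolist())  # ['close']

# Select every row for one exchange; xs drops the selected level by default.
binance_rows = indexed.xs("binance", level="exchange")
print(len(binance_rows))  # 2
```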
Coming to your code for the 2 dictionaries dict1 and dict2, all you need to do is:
df = pd.DataFrame([dict1, dict2]).explode("ohlcv")
df = pd.concat([df, df.pop("ohlcv").apply(pd.Series)], axis=1)
df = df.set_index(["exchange","base", "quote", "resolution", "source_id"])
This gives you one row per dictionary, indexed by exchange, base, quote, resolution and source_id.
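Note that the prices and volumes in your messages are strings. If numeric values are needed downstream (an assumption on my part), a conversion step can be added after the three lines above. The sketch below uses shortened copies of dict1 and dict2 with trailing zeros trimmed, and price_cols is a name made up for this example.

```python
import pandas as pd

# Shortened stand-ins for dict1 and dict2 from the question.
dict1 = {
    "exchange": "binance", "base": "kp3r", "quote": "busd", "resolution": 15,
    "ohlcv": [{"source_id": 7905, "timestamp": "2022-07-05T14:45:00.000Z",
               "open": "131.55", "high": "131.84", "low": "130.38",
               "close": "130.94", "volume": "90.29"}],
}
dict2 = {
    "exchange": "kucoin", "base": "mln", "quote": "usdt", "resolution": 15,
    "ohlcv": [{"source_id": 19252, "timestamp": "2022-07-05T13:00:00.000Z",
               "open": "24.561", "high": "24.561", "low": "24.561",
               "close": "24.561", "volume": "0.0"}],
}

# Same three steps as in the answer.
df = pd.DataFrame([dict1, dict2]).explode("ohlcv")
df = pd.concat([df, df.pop("ohlcv").apply(pd.Series)], axis=1)
df = df.set_index(["exchange", "base", "quote", "resolution", "source_id"])

# Optional extra step: parse the string fields into numeric and datetime types.
price_cols = ["open", "high", "low", "close", "volume"]
df[price_cols] = df[price_cols].apply(pd.to_numeric)
df["timestamp"] = pd.to_datetime(df["timestamp"])

print(df.dtypes)
```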