I stumbled onto this video showing how you can load many JSON files and extract the data into a Dask dataframe. It looked ideal for something that I've been wanting to do. I have a lot of JSON files in sub-directories (one for each day). Each JSON file holds trip data for a vehicle, or rather sub-trips, each consisting of multiple start and end positions. A file can have multiple SubTrips in a JSON array, there can be multiple StartPosition entries in a SubTrips, and StartPosition is optional within a SubTrips. The StartPosition contains a Latitude and a Longitude. The data (all on one line in the file) looks something like this (also see this example file):
[{"SubTrips":[{"StartPosition":{"IsAvl":false,"Source":"Gps","OdometerKilometres":147918,"DistanceSinceReadingKilometres":0,"AgeOfReadingSeconds":9528,"Pdop":0,"Vdop":0,"Hdop":1,"NumberOfSatellites":10,"Heading":0,"AltitudeMetres":38,"SpeedKilometresPerHour":0,"Longitude":18.4726696014404,"Latitude":-33.8071098327637,"Timestamp":"2019-01-01T06:31:51Z","DriverId":-9119609258885412605,"AssetId":-5651694037363791566,"PositionId":816711615055360000},"EndPosition":{
I was able to read the JSON files using the following:
import json
import dask.bag as db
lines = db.read_text("C:/Trips/2019*/*.json")
records = lines.map(json.loads)
However, I am having trouble extracting the nested JSON data structure into a Dask dataframe with each row being a StartPosition record from the JSON files. The YouTube video only shows one level of JSON, whereas I have multiple levels and multiple arrays of data. The multiple levels seem doable, but I am failing on the arrays. There is also an issue with StartPosition being optional, which is sorted out below with a filter. I can do something like the following, but it only takes the first sub-trip in each file's array, as well as the first start position of a sub-trip that has multiple start positions:
records.filter(lambda d: "StartPosition" in d[0]["SubTrips"][0]).map(lambda d: d[0]["SubTrips"][0]["StartPosition"]["NumberOfSatellites"]).frequencies(sort=True).compute()
It could be that the first array can be sorted out using this, but I am not sure:
subtrips = records.flatten()
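For what it's worth, flatten should indeed take care of that outer array: each element of the bag then becomes a single trip dict instead of a list. A quick, purely illustrative way to check (assuming each file parses to a list of trip dicts) would be something like:
# After flatten, each bag element should be one trip dict rather than a list of them.
# Hypothetical check: count the SubTrips in the first few trips.
trips = records.flatten()
print(trips.map(lambda trip: len(trip.get("SubTrips", []))).take(5))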
How can I extract all the StartPosition record data into a Dask dataframe? Basically one row per StartPosition with its attributes, mainly the latitude and longitude. At the end I would like to plot the locations using datashader onto a mapbox map.
CodePudding user response:
It looks like you are very close to the result. In addition to the video, another helpful resource is this tutorial.
To handle the custom logic (like the optionality of StartPosition), it's best to define custom functions:
from json import loads
from dask.bag import read_text

def process(record):
    # implement custom logic here
    for entry in record.get("SubTrips"):
        if data_of_interest := entry.get("StartPosition"):
            yield {
                k: v
                for k, v in data_of_interest.items()
                if k in ["DriverId", "Longitude", "Latitude"]
            }

def process_json(json_data):
    # this will produce a list of records, so make sure to flatten
    return [result for record in json_data for result in process(record)]

bag = (
    read_text("7044247166111462221*.json")
    .map(loads)
    .map(process_json)
    .flatten()
    .to_dataframe()
)
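As a quick sanity check (the calls below are standard Dask dataframe methods, the exact inspection is just illustrative), you can look at the first few rows before running anything heavy:
# Peek at the first few rows of the resulting Dask dataframe
print(bag.head())

# Or, if the extracted data fits in memory, materialise it as a pandas dataframe
df = bag.compute()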
Applying json.loads to the text/string data gives Python objects (here, a list of trip dicts per file). The key function is process, which defines the logic for converting a specific record, and process_json is a wrapper to handle the general case (assuming multiple records per file; if there's only one record per file this wrapper is not needed). Before converting the data to a dataframe, we need to flatten it, so that each element of the bag is a dict.
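From there, one possible sketch of the datashader step mentioned in the question (the x/y column names, canvas size, and the use of the pandas df computed above are illustrative assumptions, not part of the original answer) is to project the coordinates to Web Mercator and aggregate the points:
import datashader as ds
import datashader.transfer_functions as tf
from datashader.utils import lnglat_to_meters

# Project lon/lat to Web Mercator so the image lines up with a web-map (mapbox) basemap
df["x"], df["y"] = lnglat_to_meters(df["Longitude"], df["Latitude"])

# Aggregate the start positions onto a canvas and shade the counts
canvas = ds.Canvas(plot_width=800, plot_height=600)
agg = canvas.points(df, "x", "y")
img = tf.shade(agg, how="log")
The shaded image can then be overlaid on a tiled basemap, for example via hvplot/holoviews, or exported with datashader.utils.export_image and added as an image layer on a mapbox plot.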