Loading many JSON files with nested data structures to form latitude and longitude plot using Dask


I stumbled onto this video showing how to load many JSON files and extract the data into a Dask dataframe. It looked ideal for something I've been wanting to do. I have a lot of JSON files in sub-directories (one for each day). Each JSON file holds trip data for a vehicle, or rather sub-trips, which consist of multiple start and end positions. A file can contain a JSON array with multiple SubTrips, and a SubTrips array can hold multiple StartPosition entries. StartPosition is optional within SubTrips. Each StartPosition contains a Latitude and Longitude. The data (all on one line in the file) looks something like this (also see this example file):

[{"SubTrips":[{"StartPosition":{"IsAvl":false,"Source":"Gps","OdometerKilometres":147918,"DistanceSinceReadingKilometres":0,"AgeOfReadingSeconds":9528,"Pdop":0,"Vdop":0,"Hdop":1,"NumberOfSatellites":10,"Heading":0,"AltitudeMetres":38,"SpeedKilometresPerHour":0,"Longitude":18.4726696014404,"Latitude":-33.8071098327637,"Timestamp":"2019-01-01T06:31:51Z","DriverId":-9119609258885412605,"AssetId":-5651694037363791566,"PositionId":816711615055360000},"EndPosition":{

I was able to read the JSON files using the following:

import json

import dask.bag as db

lines = db.read_text("C:/Trips/2019*/*.json")
records = lines.map(json.loads)

However, I am having trouble extracting the nested JSON structure into a Dask dataframe with each row being a StartPosition record from the JSON files. The YouTube video only shows one level of JSON, whereas I have multiple levels and multiple arrays of data. The multiple levels seem doable, but I am failing on the arrays. There is also the issue of StartPosition being optional, which was sorted out below with a filter. I can do something like the following, but that only takes the first subtrip in each file's array, as well as the first start position of a subtrip consisting of multiple start positions:

(records
 .filter(lambda d: "StartPosition" in d[0]["SubTrips"][0])
 .map(lambda d: d[0]["SubTrips"][0]["StartPosition"]["NumberOfSatellites"])
 .frequencies(sort=True)
 .compute())

It could be that the first array can be sorted out using this, but I am not sure:

subtrips = records.flatten()
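
For reference, Bag.flatten concatenates nested lists into one long sequence, so after records.flatten() each bag element is a single trip object. A minimal sketch with made-up data to illustrate:

import dask.bag as db

# two "files", each parsed to a list of trip objects
toy = db.from_sequence([
    [{"SubTrips": []}],                    # file 1: one trip
    [{"SubTrips": []}, {"SubTrips": []}],  # file 2: two trips
])
toy.flatten().count().compute()  # 3 trip objects in total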

How can I extract all the StartPosition records into a Dask dataframe? Basically one row per StartPosition with its attributes, mainly the latitude and longitude.

In the end I would like to plot the locations from the Dask dataframe onto a Mapbox map using Datashader.

CodePudding user response:

It looks like you are very close to the result. In addition to the video, another helpful resource is this tutorial.

To handle the custom logic (like the optionality of the StartPosition), it's best to define custom functions:

from json import loads

from dask.bag import read_text


def process(record):
    # custom logic: walk the SubTrips array of one trip record and
    # keep only sub-trips that actually have the optional StartPosition
    for entry in record.get("SubTrips", []):
        if data_of_interest := entry.get("StartPosition"):
            # keep only the fields needed downstream
            yield {
                k: v
                for k, v in data_of_interest.items()
                if k in ["DriverId", "Longitude", "Latitude"]
            }


def process_json(json_data):
    # each file parses to a list of records, so this produces
    # a list of StartPosition dicts - make sure to flatten later
    return [result for record in json_data for result in process(record)]


bag = (
    # adjust the glob to your own layout, e.g. "C:/Trips/2019*/*.json"
    read_text("7044247166111462221*.json")
    .map(loads)
    .map(process_json)
    .flatten()
    .to_dataframe()
)

Applying json.loads to the text of each file gives plain Python objects (here, a list of trip records). The key function is process, which defines the logic for converting a single record; process_json is a wrapper that handles the general case of multiple records per file (if there is only one record per file, the wrapper is not needed). Before converting the data to a dataframe, we need to flatten it so that each element of the bag is a dict.
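
Since the stated end goal is a Datashader plot, here is a minimal sketch of how the resulting dataframe could be rendered. It assumes the datashader package is installed and that the pipeline above produced the dask dataframe bag with Longitude/Latitude columns; the Canvas size, the "log" shading, and the Web Mercator projection step are illustrative choices, not part of the original answer:

import datashader as ds
import datashader.transfer_functions as tf
from datashader.utils import export_image, lnglat_to_meters

# project lon/lat to Web Mercator so the image lines up with map tiles
bag["x"], bag["y"] = lnglat_to_meters(bag["Longitude"], bag["Latitude"])

# rasterize the points onto a 900x600 grid, shading by point density
canvas = ds.Canvas(plot_width=900, plot_height=600)
agg = canvas.points(bag, "x", "y")
img = tf.shade(agg, how="log")

export_image(img, "trip_start_positions")  # writes export/trip_start_positions.png

Because the x/y coordinates are in Web Mercator, the exported image lines up with standard map tiles and can be overlaid on a Mapbox basemap with whatever plotting library you prefer.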
