Passing pandas dataframe to fastapi

Time:12-24

I want to create an API that accepts a pandas DataFrame as input and stores it in my database.

I can already do this with a CSV file. The problem is that the datatype information is lost (column datatypes such as int, array, float, and so on), and that information is important for what I am trying to do.

I have already read this: Passing a pandas dataframe to FastAPI for NLP ML

I cannot create a class like this:

class Data(BaseModel):
    # id: str
    project: str
    messages: str

The reason is that I don't have a fixed schema. The dataframe can be of any shape, with varying data types. I have written a dynamic query that creates a table matching the incoming dataframe and inserts the dataframe's rows into that table.

However, being new to FastAPI, I can't figure out whether there is an efficient way to send this changing (dynamic) dataframe and store it using the queries I have created.

If the information is not sufficient, I can try to provide more examples.

Is there a way to send a pandas dataframe directly from my Jupyter notebook?

Any guidance on this would be greatly appreciated.

@router.post("/send-df")
async def push_df_funct(
        target_name: Optional[str] = Form(...),
        join_key: str = Form(...),
        local_csv_file: UploadFile = File(None),
        db: Session = Depends(pg.get_db)
):
    """
    API  to upload dataframe to database
    """
    # target_name is the form field declared above (the original call used an undefined featureset_name)
    return upload_dataframe(db, target_name, local_csv_file, join_key)

def registration_cassandra(self, feature_registation_dict):
        '''
         # Table creation in cassandra as per the given feature registration JSON
         Takes:
         1. feature_registration_dict: Feature registration JSON
         Returns: 
         - Response stating that the table has been created in cassandra
         '''
        logging.info(feature_registation_dict)
        target_table_name = feature_registation_dict.get('featureset_name')
        join_key = feature_registation_dict.get('join_key')
        metadata_list = feature_registation_dict.get('metadata_list')
        
        table_name_delimiter = "__"

        logging.info(metadata_list)

        column_names = [ sub['name'] for sub in metadata_list ]
        data_types = [ DataType.to_cass_datatype(eval(sub['data_type']).value) for sub in metadata_list ]
        
        logging.info(f"Column names: {column_names}")
        logging.info(f"Data types: {data_types}")
        
        ls = list(zip(column_names, data_types))

        target_table_name = target_table_name + table_name_delimiter + join_key

        base_query = f"CREATE TABLE {self.keyspace}.{target_table_name} ("
        
        # CREATE TABLE images_by_month5(tid object PRIMARY KEY , cc_num object,amount object,fraud_label object,activity_time object,month object);

        # create_query_new = "CREATE TABLE vpinference_dev.images_by_month4 (month int,activity_time timestamp,amount double,cc_num varint,fraud_label varint,
        # tid text,PRIMARY KEY (month, activity_time, tid)) WITH CLUSTERING ORDER BY (activity_time DESC, tid ASC)"

        #CREATE TABLE group_join_dates ( groupname text, joined timeuuid, username text, email text, age int, PRIMARY KEY (groupname, joined) )
        flag = True
        for name, data_type in ls:
            base_query += " " + name
            base_query += " " + data_type
            #if flag :
            #    base_query += " PRIMARY KEY "
            #    flag = False
            base_query += ','
        
        create_query = base_query.strip(',').rstrip(' ') + ', month varchar, activity_time timestamp,' + ' PRIMARY KEY (' + f'month, activity_time, {join_key}) )' + f' WITH CLUSTERING ORDER BY (activity_time DESC, {join_key} ASC' + ');'
        logging.info(f"Query to create table in cassandra: {create_query}")
        try: 
            session = self.get_session()
            session.execute((create_query))
        except Exception as e:
            logging.exception(f"Some error occurred while doing the registration in cassandra. Details :: {str(e)}")
            raise AppException(f"Some error occurred while doing the registration in cassandra. Details :: {str(e)}")

        # target_table_name already ends with "__<join_key>", so it is not appended again here
        response = f"Table created successfully in cassandra at: {self.keyspace}.{target_table_name};"
        return response

This is the dictionary that I am passing:

feature_registation_dict = {
 'featureSetName': 'data_type_testing_29',
 'teamName': 'Harsh',
 'frequency': 'DAILY',
 'joinKey': 'tid',
 'model_version': 'v1',
 'model_name': 'data type testing',
  'metadata_list': [{'name': 'tid',
   'data_type': 'text',
   'definition': 'Credit Card Number (Unique)'},
  {'name': 'cc_num',
   'data_type': 'bigint',
   'definition': 'Aggregated Metric: Average number of transactions for the card aggregated by past 10 minutes'},
  {'name': 'amount',
   'data_type': 'double',
   'definition': 'Aggregated Metric: Average transaction amount for the card aggregated by past 10 minutes'},
  {'name': 'datetime',
   'data_type': 'text',
   'definition': 'Required feature for event timestamp'}]}
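
As a side note on the table-creation step: the column clause can be assembled from metadata_list without eval by checking each type against an allow-list. The following is only a minimal sketch under some assumptions: the data_type values are plain CQL type names as in the dictionary above, and build_create_query, ALLOWED_CQL_TYPES and IDENTIFIER are hypothetical names that are not part of the code shown earlier.

```python
import re

# Assumption: the set of CQL types this service is willing to create columns for.
ALLOWED_CQL_TYPES = {
    "text", "varchar", "int", "bigint", "varint",
    "double", "float", "boolean", "timestamp",
}
# Only plain identifiers are accepted as column names, since they are
# interpolated into the query string.
IDENTIFIER = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")


def build_create_query(keyspace, table_name, join_key, metadata_list):
    """Build a CREATE TABLE statement from a list of {'name', 'data_type'} dicts."""
    columns = []
    for entry in metadata_list:
        name = entry["name"]
        data_type = entry["data_type"].lower()
        if not IDENTIFIER.match(name):
            raise ValueError(f"Invalid column name: {name!r}")
        if data_type not in ALLOWED_CQL_TYPES:
            raise ValueError(f"Unsupported data type: {data_type!r}")
        columns.append(f"{name} {data_type}")

    if join_key not in [entry["name"] for entry in metadata_list]:
        raise ValueError(f"Join key {join_key!r} is not one of the columns")

    # Same fixed partition/clustering columns as in the method above.
    columns += ["month varchar", "activity_time timestamp"]
    column_clause = ", ".join(columns)
    return (
        f"CREATE TABLE {keyspace}.{table_name} ({column_clause}, "
        f"PRIMARY KEY (month, activity_time, {join_key})) "
        f"WITH CLUSTERING ORDER BY (activity_time DESC, {join_key} ASC);"
    )
```

Joining a list of "name type" pairs avoids the trailing-comma cleanup, and rejecting unknown names or types keeps arbitrary strings out of the generated query.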

CodePudding user response:

Not sure I understood exactly what you need, but I'll give it a try. To send any dataframe to FastAPI, you could do something like:

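#fastapi
from io import StringIO
import pandas as pd
from fastapi import FastAPI, Form

app = FastAPI()

@app.post("/receive_df")
def receive_df(df_in: str = Form(...)):  # form field, matches data=payload below (needs python-multipart)
    df = pd.read_json(StringIO(df_in))

#jupyter
import requests
payload = {"df_in": df.to_json()}
requests.post("http://localhost:8000/receive_df", data=payload)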

I can't really test this right now, so there are probably some mistakes in there, but the gist is to serialize the DataFrame to JSON on the client and then deserialize it in the endpoint. If you need (JSON) validation, you can also use the pydantic.Json data type. If there is no fixed schema, you can't use BaseModel in any useful way. But sending a plain JSON string should be all you need, provided your data comes only from reliable sources (your Jupyter notebook).
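
Since the question is mostly about keeping column datatypes, here is a small sketch of one way the JSON string could carry them: pandas can embed a schema in the JSON via orient="table" and use it again when reading. The helper names df_to_payload and payload_to_df and the sample data are made up for illustration. This has only been checked on a simple frame like this one; nested or array-valued columns may need extra handling.

```python
from io import StringIO

import pandas as pd


def df_to_payload(df: pd.DataFrame) -> str:
    """Serialize a DataFrame together with a schema describing its columns."""
    # orient="table" writes a "schema" block next to the "data" block.
    return df.to_json(orient="table")


def payload_to_df(payload: str) -> pd.DataFrame:
    """Rebuild the DataFrame, using the embedded schema to restore dtypes."""
    return pd.read_json(StringIO(payload), orient="table")


# Hypothetical sample data, loosely modelled on the columns in the question.
original = pd.DataFrame(
    {
        "tid": ["a1", "b2"],
        "cc_num": [1001, 1002],
        "amount": [10.0, 20.0],
        "activity_time": pd.to_datetime(["2021-12-24 10:00:00", "2021-12-24 10:05:00"]),
    }
)

# JSON round trip with the schema embedded in the payload.
restored = payload_to_df(df_to_payload(original))

# CSV round trip for comparison: the timestamp column comes back as plain strings.
from_csv = pd.read_csv(StringIO(original.to_csv(index=False)))

print(restored.dtypes)
print(from_csv.dtypes)
```

On the notebook side the string returned by df_to_payload would be what goes into the request, and the endpoint would call payload_to_df on it before running the insert queries.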
