I wrote the script in python that works perfectly fine if executed as-is. What I am trying to do is to break this script into meaningful functions and create main.py to execute this as a proper python application.
Here is my LiveStream.py code with which I am collecting data from the sensor at the beginning of every minute, and sending it to the MySQL database, and also posting it to the URL. As mentioned this works perfectly fine if I execute: python3 LiveStream.py
# Import Dependencies
import board
import pandas as pd
from busio import I2C
import adafruit_bme680
from datetime import datetime, timedelta
import time
import requests
import mysql.connector
import json
import sqlalchemy
# read database config file
with open("config.json") as config:
param = json.load(config)
# Create library object using Bus I2C port
i2c = I2C(board.SCL, board.SDA)
bme680 = adafruit_bme680.Adafruit_BME680_I2C(i2c, debug=False)
# change this to match the location's pressure (hPa) at sea level
bme680.sea_level_pressure = 1013.25
# Read data from sensors
while True:
# Create the now variable to capture the current moment
TimeStamp = datetime.now()
Temperature = round((bme680.temperature * 9/5) 32, 2)
Gas = round(bme680.gas, 2)
Humidity = round(bme680.humidity, 2)
Pressure = round(bme680.pressure, 2)
Altitude = round(bme680.altitude, 2)
now = datetime.strftime(TimeStamp,"%Y-%m-%dT%H:%M:%S")
# Adding collected measurements into dataframe
data = pd.DataFrame([
{
"TimeStamp": now,
"Temperature": Temperature,
"Gas": Gas,
"Humidity": Humidity,
"Pressure": Pressure,
"Altitude": Altitude
}
])
# Try establishing connection with database
try:
engine = sqlalchemy.create_engine('mysql mysqlconnector://{0}:{1}@{2}/{3}'.
format(param['MyDemoServer'][0]['user'],
param['MyDemoServer'][0]['password'],
param['MyDemoServer'][0]['host'],
param['MyDemoServer'][0]['database']), echo=False)
# Cleaning the data from existing tables MetricValues and Metrics
db_con = engine.connect()
if db_con.connect():
try:
data.to_sql('sensordata', con = db_con, if_exists = 'append', index = False)
db_con.close()
# Dispose the engine
engine.dispose()
except OSError as e:
print(e)
except OSError as e:
print(e)
# Power BI API
# BI Address to push the data to
url = 'https://api.powerbi.com/beta/94cd2fa9-eb6a-490b-af36-53bf7f5ef485/datasets/2a7a2529-dbfd-4c32-9513-7d5857b61137/rows?noSignUpCheck=1&key=nS3bP1Mo4qN9/p6XJcTBgHBUV/cOZb0edYrK+tVWDg6iWwzRtY16HWUGSqB9YsqF3GHMNO2fe3r5ltB7NhVIvw=='
# post/push data to the streaming API
headers = {
"Content-Type": "application/json"
}
response = requests.request(
method="POST",
url=url,
headers=headers,
data=json.dumps(data.to_json())
)
data = pd.DataFrame()
# Re-run the script at the beginning of every new minute.
dt = datetime.now() timedelta(minutes=1)
dt = dt.replace(second=1)
while datetime.now() < dt:
time.sleep(1)
Here is what I have tried so far... I created a lib folder where I have etl.py file. in this file I tried creating functions such us:
def sensorsreading():
# Create library object using Bus I2C port
i2c = I2C(board.SCL, board.SDA)
bme680 = adafruit_bme680.Adafruit_BME680_I2C(i2c, debug=False)
# change this to match the location's pressure (hPa) at sea level
bme680.sea_level_pressure = 1013.25
# Read data from sensors
while True:
# Create the now variable to capture the current moment
TimeStamp = datetime.now()
Temperature = round((bme680.temperature * 9 / 5) 32, 2)
Gas = round(bme680.gas, 2)
Humidity = round(bme680.humidity, 2)
Pressure = round(bme680.pressure, 2)
Altitude = round(bme680.altitude, 2)
now = datetime.strftime(TimeStamp, "%Y-%m-%dT%H:%M:%S")
# Adding collected measurements into dataframe
data = pd.DataFrame([
{
"TimeStamp": now,
"Temperature": Temperature,
"Gas": Gas,
"Humidity": Humidity,
"Pressure": Pressure,
"Altitude": Altitude
}
])
return data
And also function:
def dataload(data):
# Try establishing connection with database
try:
engine = sqlalchemy.create_engine('mysql mysqlconnector://{0}:{1}@{2}/{3}'.
format(param['MyDemoServer'][0]['user'],
param['MyDemoServer'][0]['password'],
param['MyDemoServer'][0]['host'],
param['MyDemoServer'][0]['database']), echo=False)
# Cleaning the data from existing tables MetricValues and Metrics
db_con = engine.connect()
if db_con.connect():
try:
data.to_sql('sensordata', con=db_con, if_exists='append', index=False)
db_con.close()
# Dispose the engine
engine.dispose()
except OSError as e:
print(e)
except OSError as e:
print(e)
And my main.py looks like this:
import pandas as pd
from datetime import datetime, timedelta
import time
from lib.etl import *
def etl(name):
data = sensorsreading()
dataload(data)
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
etl('PyCharm')
# Re-run the script at the beginning of every new minute.
dt = datetime.now() timedelta(minutes=1)
dt = dt.replace(second=1)
while datetime.now() < dt:
time.sleep(1)
When I run main.py it seems that I am not passing the data frame from sensorsreading() to dataload() function.
Any idea what am I doing wrong here?
CodePudding user response:
To address you original question, you were using yield instead of return. Yields is used in generators, as you can read more here: https://www.geeksforgeeks.org/use-yield-keyword-instead-return-keyword-python/
In the case you don't need a precise execution, this will call the function each 60 seconds. Anyways, I'll sugest using a scheduler like systemctl or cron.
import time
while True:
etl('PyCharm')
time.sleep(60)
If you want something more precise you could use:
import time
starttime = time.time()
while True:
etl('PyCharm')
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
as explained in What is the best way to repeatedly execute a function every x seconds?