106 lines
4.5 KiB
Python
Executable File
106 lines
4.5 KiB
Python
Executable File
#!/usr/bin/pyhton -u
|
|
|
|
import sqlite3 as sql3db
|
|
import paho.mqtt.client as mqtt
|
|
import time
|
|
import datetime
|
|
import requests
|
|
|
|
MQTT_Broker = "0.0.0.0"
|
|
MQTT_Port = 9888 # mqtt port for internal use, can be whatever, determined by the broker config
|
|
Keep_Alive_Interval = 60 # keep alive interval for broker connection
|
|
MQTT_Topic = "inenvmon" # mqtt topic, set to the same value as the monitoring device
|
|
MQTT_Auth = {'username':"inenvmon", 'password':"mqttsecret"} # mqtt authentication, determined by the broker config
|
|
DB = "/inenvmon/data/inenvmon_data.db" # path to database file
|
|
DB_TABLE = "envdata0" # name of the database table (mqtt topic is fitting here)
|
|
API_PORT = 8080
|
|
API_SECRET = "itsasecret" # set to same value as in the api
|
|
|
|
def write_db(sql, vals=None): # function for writing to the database
|
|
dbconn = sql3db.connect(DB)
|
|
cursor = dbconn.cursor()
|
|
|
|
try:
|
|
if vals is not None:
|
|
cursor.execute(sql, vals)
|
|
else:
|
|
cursor.execute(sql)
|
|
except sql3db.Error as e:
|
|
print(e)
|
|
return False
|
|
|
|
dbconn.commit()
|
|
dbconn.close()
|
|
return True
|
|
|
|
def send_heartbeat(): # function for heartbeat reporting, adjust according to server setup
|
|
return requests.post("http://localhost:%d/api/heartbeat" % API_PORT, data={'message':time.time(),'key': API_SECRET})
|
|
|
|
# function to check character at the start of payload, the corrupted messages usually have the start mangled
|
|
# so a missing first char is a good indicator of corruption
|
|
# the previous value is stored and returned in case of corruption in order to minimize missing data
|
|
# this behavior can be changed if desired by removing the retention
|
|
|
|
def check_msg(message):
|
|
if "prev_val" not in check_msg.__dict__:
|
|
check_msg.prev_val = None
|
|
|
|
if check_msg.prev_val is not None:
|
|
if message[0] != "=": # the checked character as per the monitoring device firmware
|
|
return check_msg.prev_val
|
|
else:
|
|
check_msg.prev_val = message
|
|
return message
|
|
else:
|
|
check_msg.prev_val = message
|
|
return message
|
|
|
|
# on connection callback, used to subscirbe to the set up topic after connecting to the broker
|
|
def on_connect(client, userdata, flags, rc):
|
|
print("connected: " + str(rc))
|
|
mqttc.subscribe(MQTT_Topic, 0)
|
|
print("subscribed!")
|
|
|
|
# on message callback, executes every time a message is published to the topic
|
|
def on_message(mosq, obj, msg):
|
|
payload = msg.payload # save the message payload
|
|
try:
|
|
dec_msg = payload.decode("unicode_escape").split(",") # decode the payload and split at commas
|
|
# out of range value check, replaces erroneous values with no value
|
|
# included as a precaution, the sensors could malfunction and the communication between the devices is not always perfect
|
|
# the ranges are set to what can be reasonably expected in an interior environment, can be tweaked if desired
|
|
if float(dec_msg[1]) > 45 or float(dec_msg[1]) < 0: # temperature
|
|
dec_msg[1] = "NULL"
|
|
if float(dec_msg[2]) < 0 or float(dec_msg[2]) > 100: # relative humidity
|
|
dec_msg[2] = "NULL"
|
|
if int(dec_msg[3]) < 900 or int(dec_msg[3]) > 1100: # pressure
|
|
dec_msg[3] = "NULL"
|
|
if dec_msg[4] == "-1": # co2, value of -1 indicates a measurement error
|
|
dec_msg[4] = "NULL"
|
|
|
|
except Exception as ex:
|
|
print("Excp: %s" % ex) # print exceptions
|
|
|
|
chkd_msg = check_msg(dec_msg)
|
|
|
|
f = '%Y-%m-%d %H:%M:%S' # timestamp format string
|
|
# create table
|
|
sql0 = "CREATE TABLE IF NOT EXISTS %s \
|
|
(\"timestamp\" datetime DEFAULT NULL, \
|
|
\"temp\" decimal(5 , 2) DEFAULT NULL, \
|
|
\"hum\" decimal(5 , 2) DEFAULT NULL, \
|
|
\"pres\" int(4) DEFAULT NULL, \
|
|
\"co2\" int(5) DEFAULT NULL)" % DB_TABLE
|
|
write_db(sql0)
|
|
# formulate sql string with values
|
|
sql = "INSERT INTO %s (timestamp, temp, hum, pres, co2) VALUES (?,?,?,?,?)" % DB_TABLE
|
|
write_db(sql, (datetime.datetime.now().strftime(f), chkd_msg[1], chkd_msg[2], chkd_msg[3], chkd_msg[4])) # execute the sql string
|
|
send_heartbeat() # send heartbeat
|
|
|
|
mqttc = mqtt.Client() # instantiate the client class
|
|
mqttc.on_message = on_message
|
|
mqttc.on_connect = on_connect # set up callbacks
|
|
mqttc.username_pw_set(MQTT_Auth['username'], MQTT_Auth['password']) # set mqtt credentials
|
|
mqttc.connect(MQTT_Broker, int(MQTT_Port), int(Keep_Alive_Interval)) # connect to the broker using provided settings
|
|
mqttc.loop_forever() # keep the connection running as long as the script is running
|