inenvmon_server/inenvmon/inenvmon_collector.py
2022-05-11 10:28:13 +02:00

106 lines
4.5 KiB
Python
Executable File

#!/usr/bin/pyhton -u
import sqlite3 as sql3db
import paho.mqtt.client as mqtt
import time
import datetime
import requests
MQTT_Broker = "127.0.0.1" # usually on the same machine, if not, adjust accordingly
MQTT_Port = 9888 # mqtt port for internal use, can be whatever, determined by the broker config
Keep_Alive_Interval = 60 # keep alive interval for broker connection
MQTT_Topic = "inenvmon" # mqtt topic, set to the same value as the monitoring device
MQTT_Auth = {'username':"inenvmon", 'password':"mqttsecret"} # mqtt authentication, determined by the broker config
DB = "/inenvmon/data/inenvmon_data.db" # path to database file
DB_TABLE = "envdata0" # name of the database table (mqtt topic is fitting here)
API_PORT = 8080
API_SECRET = "itsasecret" # set to same value as in the api
def write_db(sql, vals=None): # function for writing to the database
dbconn = sql3db.connect(DB)
cursor = dbconn.cursor()
try:
if vals is not None:
cursor.execute(sql, vals)
else:
cursor.execute(sql)
except sql3db.Error as e:
print(e)
return False
dbconn.commit()
dbconn.close()
return True
def send_heartbeat(): # function for heartbeat reporting, adjust according to server setup
return requests.post("http://localhost:%d/api/heartbeat" % API_PORT, data={'message':time.time(),'key': API_SECRET})
# function to check character at the start of payload, the corrupted messages usually have the start mangled
# so a missing first char is a good indicator of corruption
# the previous value is stored and returned in case of corruption in order to minimize missing data
# this behavior can be changed if desired by removing the retention
def check_msg(message):
if "prev_val" not in check_msg.__dict__:
check_msg.prev_val = None
if check_msg.prev_val is not None:
if message[0] != "=": # the checked character as per the monitoring device firmware
return check_msg.prev_val
else:
check_msg.prev_val = message
return message
else:
check_msg.prev_val = message
return message
# on connection callback, used to subscirbe to the set up topic after connecting to the broker
def on_connect(client, userdata, flags, rc):
print("connected: " + str(rc))
mqttc.subscribe(MQTT_Topic, 0)
print("subscribed!")
# on message callback, executes every time a message is published to the topic
def on_message(mosq, obj, msg):
payload = msg.payload # save the message payload
try:
dec_msg = payload.decode("unicode_escape").split(",") # decode the payload and split at commas
# out of range value check, replaces erroneous values with no value
# included as a precaution, the sensors could malfunction and the communication between the devices is not always perfect
# the ranges are set to what can be reasonably expected in an interior environment, can be tweaked if desired
if float(dec_msg[1]) > 45 or float(dec_msg[1]) < 0: # temperature
dec_msg[1] = "NULL"
if float(dec_msg[2]) < 0 or float(dec_msg[2]) > 100: # relative humidity
dec_msg[2] = "NULL"
if int(dec_msg[3]) < 900 or int(dec_msg[3]) > 1100: # pressure
dec_msg[3] = "NULL"
if dec_msg[4] == "-1": # co2, value of -1 indicates a measurement error
dec_msg[4] = "NULL"
except Exception as ex:
print("Excp: %s" % ex) # print exceptions
chkd_msg = check_msg(dec_msg)
f = '%Y-%m-%d %H:%M:%S' # timestamp format string
# create table
sql0 = "CREATE TABLE IF NOT EXISTS %s \
(\"timestamp\" datetime DEFAULT NULL, \
\"temp\" decimal(5 , 2) DEFAULT NULL, \
\"hum\" decimal(5 , 2) DEFAULT NULL, \
\"pres\" int(4) DEFAULT NULL, \
\"co2\" int(5) DEFAULT NULL)" % DB_TABLE
write_db(sql0)
# formulate sql string with values
sql = "INSERT INTO %s (timestamp, temp, hum, pres, co2) VALUES (?,?,?,?,?)" % DB_TABLE
write_db(sql, (datetime.datetime.now().strftime(f), chkd_msg[1], chkd_msg[2], chkd_msg[3], chkd_msg[4])) # execute the sql string
send_heartbeat() # send heartbeat
mqttc = mqtt.Client() # instantiate the client class
mqttc.on_message = on_message
mqttc.on_connect = on_connect # set up callbacks
mqttc.username_pw_set(MQTT_Auth['username'], MQTT_Auth['password']) # set mqtt credentials
mqttc.connect(MQTT_Broker, int(MQTT_Port), int(Keep_Alive_Interval)) # connect to the broker using provided settings
mqttc.loop_forever() # keep the connection running as long as the script is running