Server dockerization and related restructuring
This commit is contained in:
parent
a152beb864
commit
d7c53601fa
|
@ -0,0 +1,13 @@
|
|||
FROM debian:stable
|
||||
|
||||
RUN mkdir -p /inenvmon/data && apt-get update && apt-get install python3 python3-pip mosquitto -y && python3 -m pip install poetry --quiet && python3 -m poetry config virtualenvs.create false
|
||||
COPY mosquitto_users /etc/mosquitto/passwd
|
||||
COPY mosquitto.conf /etc/mosquitto/conf.d/default.conf
|
||||
COPY inenvmon/pyproject.toml /inenvmon/pyproject.toml
|
||||
RUN mkdir /var/run/mosquitto && chown mosquitto: /var/run/mosquitto && mosquitto_passwd -U /etc/mosquitto/passwd && cd /inenvmon && python3 -m poetry install
|
||||
|
||||
EXPOSE 8080
|
||||
EXPOSE 9888
|
||||
WORKDIR /inenvmon
|
||||
CMD python3 /inenvmon/start.py
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
version: "3"
|
||||
services:
|
||||
inenvmon:
|
||||
container_name: inenvmon_docker
|
||||
build: .
|
||||
ports:
|
||||
- "8088:8080/tcp"
|
||||
- "9885:9888/tcp"
|
||||
volumes:
|
||||
- './inenvmon:/inenvmon'
|
|
@ -7,20 +7,21 @@ import datetime
|
|||
import requests
|
||||
|
||||
MQTT_Broker = "127.0.0.1" # usually on the same machine, if not, adjust accordingly
|
||||
MQTT_Port = 9883 # mqtt port for internal use, can be whatever, determined by the broker config
|
||||
MQTT_Port = 9888 # mqtt port for internal use, can be whatever, determined by the broker config
|
||||
Keep_Alive_Interval = 60 # keep alive interval for broker connection
|
||||
MQTT_Topic = "topic" # mqtt topic, set to the same value as the monitoring device
|
||||
MQTT_Auth = {'username':"", 'password':""} # mqtt authentication, determined by the broker config
|
||||
DB = "./inenvmon_data.db" # path to database file
|
||||
MQTT_Topic = "inenvmon" # mqtt topic, set to the same value as the monitoring device
|
||||
MQTT_Auth = {'username':"inenvmon", 'password':"mqttsecret"} # mqtt authentication, determined by the broker config
|
||||
DB = "/inenvmon/data/inenvmon_data.db" # path to database file
|
||||
DB_TABLE = "envdata0" # name of the database table (mqtt topic is fitting here)
|
||||
API_SECRET = "secret" # set to same value as in the api
|
||||
API_PORT = 8080
|
||||
API_SECRET = "itsasecret" # set to same value as in the api
|
||||
|
||||
def write_db(sql): # function for writing to the database
|
||||
def write_db(sql, vals1): # function for writing to the database
|
||||
dbconn = sql3db.connect(DB)
|
||||
cursor = dbconn.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute(sql)
|
||||
cursor.execute(sql, vals)
|
||||
except sql3db.Error as e:
|
||||
print(e)
|
||||
return False
|
||||
|
@ -30,9 +31,9 @@ def write_db(sql): # function for writing to the database
|
|||
return True
|
||||
|
||||
def send_heartbeat(): # function for heartbeat reporting, adjust according to server setup
|
||||
return requests.post("http://localhost:80/api/heartbeat", data={'message':time.time(),'key': API_SECRET})
|
||||
return requests.post("http://localhost:%d/api/heartbeat" % API_PORT, data={'message':time.time(),'key': API_SECRET})
|
||||
|
||||
# function to check character at the start of payload, the corrupted messages usually have the start mangled
|
||||
# function to check character at the start of payload, the corrupted messages usually have the start mangled
|
||||
# so a missing first char is a good indicator of corruption
|
||||
# the previous value is stored and returned in case of corruption in order to minimize missing data
|
||||
# this behavior can be changed if desired by removing the retention
|
||||
|
@ -89,8 +90,8 @@ def on_message(mosq, obj, msg):
|
|||
\"co2\" int(5) DEFAULT NULL)" % DB_TABLE
|
||||
write_db(sql0)
|
||||
# formulate sql string with values
|
||||
sql = "INSERT INTO %s (timestamp, temp, hum, pres, co2) VALUES (\"%s\", %s, %s, %s, %s)" % (DB_TABLE, datetime.datetime.now().strftime(f), chkd_msg[1], chkd_msg[2], chkd_msg[3], chkd_msg[4])
|
||||
write_db(sql) # execute the sql string
|
||||
sql = "INSERT INTO %s (timestamp, temp, hum, pres, co2) VALUES (?,?,?,?,?)" % DB_TABLE
|
||||
write_db(sql, (datetime.datetime.now().strftime(f), chkd_msg[1], chkd_msg[2], chkd_msg[3], chkd_msg[4])) # execute the sql string
|
||||
send_heartbeat() # send heartbeat
|
||||
|
||||
mqttc = mqtt.Client() # instantiate the client class
|
|
@ -0,0 +1,19 @@
|
|||
[tool.poetry]
|
||||
name = "inenvmon_docker"
|
||||
version = "1.0"
|
||||
description = ""
|
||||
authors = ["Andrej Pillar <192235@vutbr.cz>"]
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = ">=3.9"
|
||||
Flask = "^2.0.2"
|
||||
pandas = "^1.3.5"
|
||||
waitress = "^2.0.0"
|
||||
paho-mqtt = "^1.6.1"
|
||||
requests = "^2.27.1"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
|
@ -0,0 +1,49 @@
|
|||
import os
|
||||
import sys
|
||||
import threading
|
||||
import subprocess
|
||||
import signal
|
||||
|
||||
class A_process(object):
|
||||
def __init__(self, name, cmd):
|
||||
self.name = name
|
||||
self.cmd = cmd
|
||||
|
||||
def run(self):
|
||||
output = ""
|
||||
self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
while self.popen.poll() is None:
|
||||
line = self.popen.stdout.readline().rstrip()
|
||||
if line:
|
||||
output += "{}\n".format(line)
|
||||
print(line)
|
||||
sys.stdout.flush()
|
||||
self.popen.wait()
|
||||
|
||||
def signal_handler(self, signum, frame):
|
||||
print("INFO: {}: Received signal {}".format(self.name, signum))
|
||||
print("Terminating {} {}".format(self.name, self.cmd))
|
||||
self.popen.terminate()
|
||||
sys.exit(0)
|
||||
|
||||
def main():
|
||||
print("INFO: Starting Mosquitto MQTT broker")
|
||||
mosquitto_run = threading.Thread(target=A_process("Mosquitto", ["/usr/sbin/mosquitto", "-c", "/etc/mosquitto/mosquitto.conf"]).run,
|
||||
name="Mosquitto MQTT broker",
|
||||
args=(), daemon=True)
|
||||
mosquitto_run.start()
|
||||
|
||||
print("INFO: Starting MQTT data collector")
|
||||
collector_run = threading.Thread(target=A_process("Collector", ["python3", "/inenvmon/inenvmon_collector.py"]).run,
|
||||
name="MQTT listener and data collector",
|
||||
args=(), daemon=True)
|
||||
collector_run.start()
|
||||
|
||||
print("INFO: Starting inenvmon web app and API")
|
||||
inenvmon_proc = A_process("Inenvmon_web", ["python3", "/inenvmon/web/inenvmon_web.py"])
|
||||
signal.signal(signal.SIGINT, inenvmon_proc.signal_handler)
|
||||
signal.signal(signal.SIGTERM, inenvmon_proc.signal_handler)
|
||||
inenvmon_proc.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -1,4 +1,5 @@
|
|||
from flask import Flask, render_template, make_response, jsonify, request
|
||||
from flask import Flask, current_app, render_template, make_response, jsonify, request
|
||||
from waitress import serve
|
||||
import sqlite3 as sql3db
|
||||
import datetime
|
||||
import time
|
||||
|
@ -7,22 +8,25 @@ import json
|
|||
import pandas as pd
|
||||
from collections import OrderedDict
|
||||
|
||||
DB = "../inenvmon_data.db" # path to the database file as set up in inenvmon_collector.py
|
||||
PORT = 8080 # port to serve the api on
|
||||
DB = "/inenvmon/data/inenvmon_data.db" # path to the database file as set up in inenvmon_collector.py
|
||||
DB_TABLE = "envdata0" # name of database table, as set up in inenvmon_collector.py
|
||||
API_SECRET = "secret" # api secret for the heartbeat method, set the same in inenvmon_collector.py
|
||||
API_SECRET = "itsasecret" # api secret for the heartbeat method, set the same in inenvmon_collector.py
|
||||
|
||||
class Source():
|
||||
def __init__(self):
|
||||
self.status = "unknown" #used to persistently store status
|
||||
self.last_msg = 0.0
|
||||
|
||||
source = Source()
|
||||
app = Flask(__name__)
|
||||
|
||||
with app.app_context():
|
||||
current_app.source = Source()
|
||||
|
||||
def query_db(sql): #function for executing database queries
|
||||
dbconn = sql3db.connect(DB)
|
||||
cursor = dbconn.cursor()
|
||||
|
||||
|
||||
try:
|
||||
cursor.execute(sql)
|
||||
except sql3db.Error as e:
|
||||
|
@ -37,10 +41,10 @@ def query_db(sql): #function for executing database queries
|
|||
def index():
|
||||
# no actual template rendering is going on, this is used for convenience since the api and web app are on the same domain
|
||||
# the html is only a skeleton, all functionality is in the js
|
||||
return render_template('index.html')
|
||||
return render_template('index.html')
|
||||
|
||||
@app.route('/api/getdata', methods = ['GET','POST'])
|
||||
def process_data():
|
||||
@app.route('/api/getdata', methods = ['GET','POST'])
|
||||
def process_data():
|
||||
if request.method == 'GET':
|
||||
if request.args.get('samples') is not None:
|
||||
try:
|
||||
|
@ -92,34 +96,34 @@ def process_data():
|
|||
# numpy because that's what available without additional work
|
||||
darry = df.to_numpy()
|
||||
start = len(darry) # save the length for further slicing
|
||||
# array slicing and ordering
|
||||
# array slicing and ordering
|
||||
# because js graphing function expects the dataset with inverted timebase
|
||||
# different procedures for different scenarios
|
||||
# in case of missing values
|
||||
if len(darry) != samples:
|
||||
if samples > len(darry):
|
||||
labels=darry[samples-start:start:1,0]
|
||||
temps=darry[samples-start:start:1,1]
|
||||
temps=darry[samples-start:start:1,1]
|
||||
hums=darry[samples-start:start:1,2]
|
||||
press=darry[samples-start:start:1,3]
|
||||
concs=darry[samples-start:start:1,4]
|
||||
else:
|
||||
labels=darry[start-samples:start:1,0]
|
||||
temps=darry[start-samples:start:1,1]
|
||||
temps=darry[start-samples:start:1,1]
|
||||
hums=darry[start-samples:start:1,2]
|
||||
press=darry[start-samples:start:1,3]
|
||||
concs=darry[start-samples:start:1,4]
|
||||
# in case of a single value (when updating the plot)
|
||||
elif len(darry) == 1:
|
||||
labels=darry[0,0]
|
||||
temps=darry[0,1]
|
||||
temps=darry[0,1]
|
||||
hums=darry[0,2]
|
||||
press=darry[0,3]
|
||||
concs=darry[0,4]
|
||||
# in case of uninterrupted dataset
|
||||
else:
|
||||
labels=darry[0:start:1,0]
|
||||
temps=darry[0:start:1,1]
|
||||
temps=darry[0:start:1,1]
|
||||
hums=darry[0:start:1,2]
|
||||
press=darry[0:start:1,3]
|
||||
concs=darry[0:start:1,4]
|
||||
|
@ -128,7 +132,7 @@ def process_data():
|
|||
tempdict = OrderedDict()
|
||||
humdict = OrderedDict()
|
||||
presdict = OrderedDict()
|
||||
co2dict = OrderedDict()
|
||||
co2dict = OrderedDict()
|
||||
# fill the dict straight away with the single value
|
||||
if len(darry) == 1:
|
||||
tempdict[labels.strftime(f)] = str(temps)
|
||||
|
@ -136,7 +140,7 @@ def process_data():
|
|||
presdict[labels.strftime(f)] = str(press)
|
||||
co2dict[labels.strftime(f)] = str(concs)
|
||||
# iterate over the arrays and fill the dicts
|
||||
else:
|
||||
else:
|
||||
for i in range(len(labels)):
|
||||
tempdict[labels[i].strftime(f)] = str(temps[i])
|
||||
humdict[labels[i].strftime(f)] = str(hums[i])
|
||||
|
@ -144,7 +148,7 @@ def process_data():
|
|||
co2dict[labels[i].strftime(f)] = str(concs[i])
|
||||
|
||||
# check status
|
||||
if (time.time() - source.last_msg) > 180:
|
||||
if (time.time() - current_app.source.last_msg) > 180:
|
||||
status = "dead"
|
||||
else:
|
||||
status = "alive"
|
||||
|
@ -152,10 +156,10 @@ def process_data():
|
|||
respdict = OrderedDict()
|
||||
# ready data and corresponding key for json
|
||||
datas = ["status", "temp", "hum", "pres", "co2"]
|
||||
dicts = [status, tempdict, humdict, presdict, co2dict]
|
||||
dicts = [status, tempdict, humdict, presdict, co2dict]
|
||||
# iterate over the data and fill the response dict
|
||||
for i in range(len(datas)):
|
||||
respdict[datas[i]] = dicts[i]
|
||||
respdict[datas[i]] = dicts[i]
|
||||
# formulate json response from the prepared dict
|
||||
response = app.response_class(
|
||||
response=json.dumps(respdict),
|
||||
|
@ -165,16 +169,15 @@ def process_data():
|
|||
return response
|
||||
|
||||
@app.route('/api/heartbeat', methods = ['GET','POST'])
|
||||
def heartbeat(): # device status reporting
|
||||
global source
|
||||
if request.method == 'POST':
|
||||
def heartbeat(): # device status reporting
|
||||
if request.method == 'POST':
|
||||
key = request.form.get('key') # check the secret
|
||||
if key == API_SECRET:
|
||||
if request.form.get('message') is not None:
|
||||
source.last_msg = float(request.form.get('message')) # save the status if timestamp present
|
||||
current_app.source.last_msg = float(request.form.get('message')) # save the status if timestamp present
|
||||
else:
|
||||
source.last_msg = "nodata"
|
||||
source.status = "unknown" #unknown status if no timestamp
|
||||
current_app.source.last_msg = "nodata"
|
||||
current_app.source.status = "unknown" #unknown status if no timestamp
|
||||
#respond with 200 ok
|
||||
post_response = app.response_class(
|
||||
status=200)
|
||||
|
@ -186,19 +189,20 @@ def heartbeat(): # device status reporting
|
|||
return err_response
|
||||
|
||||
if request.method == 'GET':
|
||||
# if get request calculate time delta
|
||||
if (time.time() - source.last_msg) > 180:
|
||||
source.status = "dead"
|
||||
# if get request calculate time delta
|
||||
if (time.time() - current_app.source.last_msg) > 180:
|
||||
current_app.source.status = "dead"
|
||||
else:
|
||||
source.status = "alive"
|
||||
current_app.source.status = "alive"
|
||||
# formulate response json
|
||||
resp_data = {'status': source.status, 'last_msg': source.last_msg}
|
||||
resp_data = {'status': current_app.source.status, 'last_msg': current_app.source.last_msg}
|
||||
get_response=app.response_class(
|
||||
response=json.dumps(resp_data),
|
||||
status=200,
|
||||
mimetype='application/json'
|
||||
)
|
||||
return get_response
|
||||
return get_response
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run()
|
||||
print("INFO: Starting inenvmon REST API on port {}".format(PORT))
|
||||
serve(app, host="0.0.0.0", port=PORT)
|
Before Width: | Height: | Size: 15 KiB After Width: | Height: | Size: 15 KiB |
|
@ -0,0 +1,3 @@
|
|||
listener 9888 0.0.0.0
|
||||
allow_anonymous false
|
||||
password_file /etc/mosquitto/passwd
|
|
@ -0,0 +1 @@
|
|||
inenvmon:mqttsecret
|
|
@ -1,22 +0,0 @@
|
|||
# webapp
|
||||
|
||||
Flask web app for data visalization. Client side is built with [Chart.js](https://chartjs.org) and [justgage](https://github.com/toorshia/justgage) js libraries and jQuery.
|
||||
|
||||
The app exposes a simple api for querying the database and source sensor status available at /api/*function*.
|
||||
The functions are:
|
||||
|
||||
- /api/getdata
|
||||
- Method: GET, POST
|
||||
- Number of samples can be specified with argument samples=*desired number*, defaults to 120 i.e 2 hours.
|
||||
- Returns json with the datasets and information about sensor status
|
||||
- /api/heartbeat
|
||||
- Method: GET
|
||||
- Returns json with status info and last received message timestamp
|
||||
- Method: POST
|
||||
- Reserved for internal use in reporting sensor status. Requires a secret to be set.
|
||||
|
||||
The client side comprises of several parts:
|
||||
|
||||
- `templates/index.html` provides the basic html skeleton
|
||||
- `static/styles/style.css` takes care of some basic styling and responsive layout
|
||||
- `static/js/main.js` script taking care of creating the plots and gauges, requesting data from server and periodically updating the view
|
|
@ -1,11 +0,0 @@
|
|||
[uwsgi]
|
||||
module = wsgi
|
||||
|
||||
master = true
|
||||
processes = 2
|
||||
|
||||
socket = inenvmon_web.sock
|
||||
chmod-socket = 666
|
||||
vacuum = true
|
||||
|
||||
die-on-term = true
|
|
@ -1,4 +0,0 @@
|
|||
from inenvmon_web import app as application
|
||||
|
||||
if __name__ == "__main__":
|
||||
application.run()
|
Loading…
Reference in New Issue