Browse Source

Merge branch 'dev' of sixfab-power/agent into master

master
bisguzar 1 month ago
parent
commit
5b2f95c4a5

+ 10
- 15
agent.py View File

@@ -1,33 +1,28 @@
import os
import logging

from core import Agent
from core.modules import fixer
from configparser import ConfigParser

__version__ = "0.1.5"
__version__ = "0.2.1"


# is_debugger_true = os.getenv('ENABLE_PMS_AGENT_DEBUG')
# is_debugger_true = True if is_debugger_true == "True" else False
is_debugger_true = True # debug is always enabled for now.

logging.basicConfig(level=logging.DEBUG if is_debugger_true else logging.CRITICAL)

environments = ConfigParser()
environments.read("/opt/sixfab/.env")
environments = environments["pms"]
environments_object = ConfigParser()
environments_object.read("/opt/sixfab/.env")
pms_environments = environments_object["pms"]

configs = {
"version": __version__,
"feeder_interval": int(environments.get("INTERVAL", 10)),
"firmware_update_repository": "https://git.sixfab.com/sixfab-power/firmwares.git",
"feeder_interval": int(pms_environments.get("INTERVAL", 10)),
"experimental_enabled": True if pms_environments.get("EXPERIMENTAL", False) == "True" else False,
"environments": pms_environments,
"environments_object": environments_object,
"firmware_update_repository": "https://git.sixfab.com/sixfab-power/firmwares.git"
}

if __name__ == "__main__":
agent = Agent(
environments["TOKEN"],
pms_environments["TOKEN"],
configs=configs
)
agent.loop()

+ 158
- 43
core/__init__.py View File

@@ -1,29 +1,39 @@
import os
import time
import json
import logging
from threading import Thread, Lock
import logging.handlers
import subprocess
import paho.mqtt.client as mqtt

from uuid import uuid4
from pms_api import SixfabPMS
from typing import List
from threading import Thread, Lock

from .modules import *
from .modules.set_configurations import update_timezone

from .helpers.configs import config_object_to_string
from .helpers import network
from .helpers.logger import initialize_logger

MQTT_HOST = "power.sixfab.com"
MQTT_PORT = 1883

COMMANDS = {"healthcheck": health_check, "configurations": set_configurations}

logger = initialize_logger()

class Agent(object):
def __init__(
self, token: str, configs: dict, lwt: bool = True, enable_feeder: bool = True,
):
client = mqtt.Client()
client = mqtt.Client(protocol=mqtt.MQTTv31,
client_id=f"device/{uuid4().hex}")
self.client = client
self.token = token
self.configs = configs
self.PMSAPI = SixfabPMS()
self.is_connected = False

self.lock_thread = Lock()

@@ -37,7 +47,6 @@ class Agent(object):
retain=True,
)

client.connect(MQTT_HOST, MQTT_PORT, keepalive=120)
client.on_connect = self.__on_connect
client.on_message = self.__on_message
client.on_disconnect = self.__on_disconnect
@@ -47,29 +56,115 @@ class Agent(object):
self.PMSAPI = SixfabPMS()

def loop(self):
listener = Thread(target=self.client.loop_forever)
feeder = Thread(target=self.feeder)
ping_addr = "power.sixfab.com"
ping_host = None

Thread(target=self.routine_worker).start()
Thread(target=self.feeder_worker).start()

while True:
if network.is_network_available(ping_host or ping_addr):

if not ping_host:
ping_host = network.get_host_by_addr(ping_addr)

if not self.is_connected:
logger.debug("[LOOP] Network online, starting mqtt agent")
self.client.connect(
self.configs["environments"].get("MQTT_HOST", MQTT_HOST),
MQTT_PORT,
keepalive=30
)
self.client.loop_start()
self.is_connected = True

time.sleep(30)
else:
if ping_host:
ping_host = None
continue

if self.is_connected:
logger.debug("[LOOP] Network ofline, blocking mqtt agent")
self.is_connected = False
self.client.loop_stop()
self.client.disconnect()

listener.start()
feeder.start()
time.sleep(10)

def feeder(self):
def feeder_worker(self):
""" Feeds cloud with sensor datas """
while True:
if not self.is_connected:
time.sleep(1)
continue

try:
logging.debug("[FEEDER] Starting, locking")
logger.debug("[FEEDER] Starting, locking")
with self.lock_thread:
self.client.publish(
"/device/{token}/feed".format(token=self.token),
json.dumps(
read_data(self.PMSAPI, agent_version=self.configs["version"])
read_data(
self.PMSAPI, agent_version=self.configs["version"]
)
),
)
logging.debug("[FEEDER] Done, releasing setters")
logger.debug("[FEEDER] Done, releasing setters")

time.sleep(self.configs["feeder_interval"])
except:
time.sleep(1)

def routine_worker(self):
while True:
with self.lock_thread:
try:
self.PMSAPI.softPowerOff()
self.PMSAPI.softReboot()
self.PMSAPI.sendSystemTemp()
except Exception as e:
logger.debug("[ROUTINE WORKER] Error occured, trying again in 15secs")
else:
logger.debug("[ROUTINE WORKER] Metrics sent to hat")

time.sleep(15)

def _upsert_environments(self, items: List[tuple]):
"""
Update environments for Power Management System
Params:
items: list, contains (key, value) pairs for every configurations to upsert
"""
environments = self.configs["environments_object"]

for key, value in items:
environments.set("pms", str(key), str(value))

subprocess.call(f"echo \"{config_object_to_string(environments)}\" | sudo tee /opt/sixfab/.env",
shell=True, stdout=subprocess.DEVNULL)
return True

def _wait_ntp_and_update_rtc(self, timezone):
while True:
is_ntp_synchronized = subprocess.check_output(
["timedatectl"]).decode()
is_ntp_synchronized = is_ntp_synchronized[is_ntp_synchronized.find(
"synchronized: ")+14:]
is_ntp_synchronized = is_ntp_synchronized[:is_ntp_synchronized.find(
"\n")]

if is_ntp_synchronized == 'yes':
logger.debug("NTP synchronized, updating timezone")

with self.lock_thread:
logger.debug("Setting RTC timezone to " + timezone)
update_timezone(self.PMSAPI, timezone)

return True
logger.debug("Waiting for NTP synchronization")
time.sleep(15)

def _lock_feeder_for_firmware_update(self):
with self.lock_thread:
update_firmware(
@@ -77,6 +172,7 @@ class Agent(object):
repository=self.configs["firmware_update_repository"],
mqtt_client=self.client,
token=self.token,
experimental_enabled=self.configs["experimental_enabled"],
)

time.sleep(15)
@@ -87,34 +183,51 @@ class Agent(object):
commandID = message.get("commandID", None)
command_data = message.get("data", {})

if COMMANDS.get(command, False):
if "connected" in message:
logger.info(
"[CONNECTION] status message recieved from broker")
if not message["connected"]:
logger.warning(
"[CONNECTION] looks like broker thinks we are disconnected, sending status message again")
self.client.publish(
"/device/{}/status".format(self.token),
json.dumps({"connected": True}),
retain=True,
)
logger.info(
"[CONNECTION] status changed to true")

return

if command == "configurations":
def _lock_and_execute_command():
if "interval" in command_data:
new_feeder_interval = command_data["interval"]
self.configs["feeder_interval"] = new_feeder_interval
self._upsert_environments(
[('interval', new_feeder_interval)])

with self.lock_thread:
executed_command_output = COMMANDS[command](
self.PMSAPI, command_data
is_configured = set_configurations(
self.PMSAPI, command_data, configs=self.configs
)

if command == "configurations":
response = json.dumps({
"command": "update_status_configurations",
"commandID": commandID,
"response": {"updated": True},
})
else:
if is_configured:
response = json.dumps({
"command": command,
"commandID": commandID,
"response": executed_command_output,
"command": "update_status_configurations",
"commandID": commandID,
"response": {"updated": True},
})

self.client.publish(
"/device/{userdata}/hive".format(userdata=userdata), response
)
self.client.publish(
"/device/{userdata}/hive".format(
userdata=userdata), response
)

Thread(target=_lock_and_execute_command).start()
return

if command.startswith("update_"):
elif command and command.startswith("update_"):
update_type = command.split("_")[1]

if update_type == "firmware":
@@ -128,21 +241,19 @@ class Agent(object):

def _lock_and_update_agent(**kwargs):
with self.lock_thread:
update_agent(mqtt_client=self.client, token=self.token)
update_agent(
mqtt_client=self.client,
token=self.token,
experimental_enabled=self.configs["experimental_enabled"]
)

agent_update_thread = Thread(target=_lock_and_update_agent,)
agent_update_thread.start()
return

elif update_type == "rtc":

def update_timezone_thread():
with self.lock_thread:
logging.debug("Setting RTC time to " + command_data["timezone"])
update_timezone(self.PMSAPI, command_data["timezone"])


Thread(target=update_timezone_thread).start()
Thread(target=self._wait_ntp_and_update_rtc,
args=(command_data["timezone"],)).start()

else:
response = json.dumps(
@@ -158,15 +269,19 @@ class Agent(object):

def __on_connect(self, client, userdata, flags, rc):
print("Connected to the server")
self.client.subscribe("/device/{userdata}/directives".format(userdata=userdata))
self.is_connected = True

self.client.subscribe(f"/device/{self.token}/directives")
self.client.subscribe(f"/device/{self.token}/status")
self.client.publish(
"/device/{userdata}/status".format(userdata=userdata),
f"/device/{self.token}/status",
json.dumps({"connected": True}),
retain=True,
)

def __on_disconnect(self, client, userdata, rc):
print("Disconnected. Result Code: {rc}".format(rc=rc))
self.is_connected = False

def __on_log(self, mqttc, obj, level, string):
print(string)
print(string.replace(obj, "...censored_uuid..."))

+ 0
- 0
core/helpers/__init__.py View File


+ 23
- 0
core/helpers/configs.py View File

@@ -0,0 +1,23 @@
from configparser import ConfigParser

environments_object = ConfigParser()
environments_object.read("/opt/sixfab/.env")

def config_object_to_string(config_object):
cache_string = ""
def _section_parser(section_name):
nonlocal cache_string
nonlocal config_object

cache_string += f"[{section_name}]\n"

for key, value in config_object[section_name].items():
cache_string += f"{key.upper()}={value}\n"

cache_string += "\n"

for section in config_object.sections():
_section_parser(section)

return cache_string

+ 21
- 0
core/helpers/logger.py View File

@@ -0,0 +1,21 @@
import os
import logging
import logging.handlers


def initialize_logger():
logging_file_path = os.path.expanduser("~")+"/.sixfab/"
if not os.path.exists(logging_file_path):
os.mkdir(logging_file_path)

logger = logging.getLogger("agent")
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s")
log_file_handler = logging.handlers.TimedRotatingFileHandler(filename=logging_file_path+"agent-log", when="midnight", backupCount=3)
log_file_handler.setFormatter(formatter)

logger.addHandler(log_file_handler)

return logger

+ 21
- 0
core/helpers/network.py View File

@@ -0,0 +1,21 @@
import socket
import subprocess

def is_network_available(host: str, size: int=0) -> bool:
try:
output = subprocess.check_output(["ping", "-c1", f"-s{size}", host]).decode()
except Exception as e:
return False
else:
if "1 received" in output:
return True

return False

def get_host_by_addr(addr: str) -> str:
try:
host = socket.gethostbyaddr(addr)[2][0]
except Exception as e:
host = None
return host

+ 52
- 50
core/modules/read_data.py View File

@@ -1,34 +1,20 @@
import logging
import time
import os
import re
import time
import logging

from subprocess import check_output
from pms_api import SixfabPMS

from .recovery import try_until_get

def read_data(api, **kwargs):

api.softPowerOff()
api.softReboot()
api.sendSystemTemp()

def fan_health():
response = try_until_get(api, "getFanHealth")
responses = {0: None, 1: True, 2: False}

return responses[response]

def working_mode():
response = try_until_get(api, "getWorkingMode")
responses = {
0: "n/a",
1: "Charging",
2: "Fully Charged - Adapter Powered",
3: "Battery Powered",
}

return responses[response]

def watchdog_signal():
response = try_until_get(api, "askWatchdogAlarm")
responses = {0: None, 1: True, 2: False}
@@ -45,36 +31,52 @@ def read_data(api, **kwargs):
else:
return '0.0.0'

def get_api_version():
api_version_file_path = "/opt/sixfab/pms/api/setup.py"

if not os.path.exists(api_version_file_path):
return "0.0.0"

file_content = check_output(["sudo", "cat", api_version_file_path]).decode()

for line in file_content.split("\n"):
if "version" in line:
return (
line.split("=")[1]
.replace(",", "")
.replace("'", "")
)

return '0.0.0'

return {
"timestamp": time.time(),
"charge_status": try_until_get(api, "getBatteryLevel"),
"battery_healt": try_until_get(api, "getBatteryHealth"),
"fanspeed": try_until_get(api, "getFanSpeed"),
"fan_health": fan_health(),
"working_status": working_mode(),
"watchdog_signal": watchdog_signal(),
"stats": {
"input": {
"temperature": try_until_get(api, "getInputTemp"),
"voltage": try_until_get(api, "getInputVoltage"),
"current": try_until_get(api, "getInputCurrent"),
"power": try_until_get(api, "getInputPower"),
},
"system": {
"temperature": try_until_get(api, "getSystemTemp"),
"voltage": try_until_get(api, "getSystemVoltage"),
"current": try_until_get(api, "getSystemCurrent"),
"power": try_until_get(api, "getSystemPower"),
},
"battery": {
"temperature": try_until_get(api, "getBatteryTemp"),
"voltage": try_until_get(api, "getBatteryVoltage"),
"current": try_until_get(api, "getBatteryCurrent"),
"power": try_until_get(api, "getBatteryPower"),
},
},
"versions": {
"firmware": firmware_version(),
"agent": kwargs.get("agent_version", "0.0.0")
}
}
"ts": time.time(),
"data": "{firmware_version},{agent_version},{api_version}|{fan_health},{watchdog}|{working_status},{charge_status},{battery_health},{fanspeed},{input_temperature},{input_voltage},{input_current},{input_power},{system_temperature},{system_voltage},{system_current},{system_power},{battery_temperature},{battery_voltage},{battery_current},{battery_power}".format(
firmware_version=firmware_version(),
agent_version=kwargs.get("agent_version", "0.0.0"),
api_version=get_api_version(),

fan_health="T" if fan_health() else "F",
watchdog="T" if watchdog_signal() else "F",

working_status=try_until_get(api, "getWorkingMode"),
charge_status=try_until_get(api, "getBatteryLevel"),
battery_health=try_until_get(api, "getBatteryHealth"),
fanspeed=try_until_get(api, "getFanSpeed"),

input_temperature=try_until_get(api, "getInputTemp"),
input_voltage=try_until_get(api, "getInputVoltage"),
input_current=try_until_get(api, "getInputCurrent"),
input_power=try_until_get(api, "getInputPower"),

system_temperature=try_until_get(api, "getSystemTemp"),
system_voltage=try_until_get(api, "getSystemVoltage"),
system_current=try_until_get(api, "getSystemCurrent"),
system_power=try_until_get(api, "getSystemPower"),

battery_temperature=try_until_get(api, "getBatteryTemp"),
battery_voltage=try_until_get(api, "getBatteryVoltage"),
battery_current=try_until_get(api, "getBatteryCurrent"),
battery_power=try_until_get(api, "getBatteryPower"),
)
}

+ 12
- 12
core/modules/recovery.py View File

@@ -3,31 +3,32 @@ import logging
from pms_api import SixfabPMS
from pms_api.exceptions import CRCCheckFailed

logger = logging.getLogger("agent")

def try_until_get(api, function):
try_count = 1
while True:
if try_count > 5:
logging.error("\033[33m[{}] \033[0m tried for 3 times and couldn't get response".format(function))
logger.error("[{}] tried for 3 times and couldn't get response".format(function))
raise OverflowError("")

try:
resp = getattr(api, function)()
except CRCCheckFailed:
logging.error("\033[33m[{}] \033[0m crc check failed, reinitializing api".format(function))
logger.error("[{}] crc check failed, reinitializing api".format(function))
del api
api = SixfabPMS()
except TypeError:
logging.error("\033[33m[{}] \033[0m TypeError raised, clearing pipe".format(function))
logger.error("[{}] TypeError raised, clearing pipe".format(function))
api.clearPipe()
except Exception as e:
logging.error("\033[33m[{}] \033[0m unknown exception raised".format(function))
logger.error("[{}] unknown exception raised".format(function))
else:
logging.debug("\033[94m[{}] \033[0m done".format(function))
return resp
finally:
try_count += 1
logging.error("[{}] trying again".format(function))
logger.error("[{}] trying again".format(function))
time.sleep(0.5)


@@ -36,26 +37,25 @@ def try_until_done(api, function, *args, **kwargs):

while True:
if try_count > 5:
logging.error("\033[33m[{}] \033[0m tried for 3 times and couldn't get response".format(function))
logger.error("[{}] tried for 3 times and couldn't get response".format(function))
raise OverflowError("")
try:
resp = getattr(api, function)(*args, **kwargs)
except CRCCheckFailed:
logging.error("\033[33m[{}] \033[0m crc check failed, reinitializing api".format(function))
logger.error("[{}] crc check failed, reinitializing api".format(function))
del api
api = SixfabPMS()
except TypeError:
logging.error("\033[33m[{}] \033[0m TypeError raised, clearing pipe".format(function))
logger.error("[{}] TypeError raised, clearing pipe".format(function))
api.clearPipe()
except Exception as e:
logging.error("\033[33m[{}] \033[0m unknown exception raised".format(function))
logger.error("[{}] unknown exception raised".format(function))
else:
logging.debug("\033[94m[{}] \033[0m Function executed success".format(function))
return resp
finally:
try_count += 1


logging.error("[{}] trying again".format(function))
logger.error("[{}] trying again".format(function))
time.sleep(0.5)

+ 60
- 1
core/modules/set_configurations.py View File

@@ -1,5 +1,7 @@
import os
import time
import logging
from subprocess import Popen
from pms_api.definitions import Definition
from pms_api.event import Event
from pms_api.exceptions import CRCCheckFailed
@@ -44,6 +46,57 @@ MAP_ACTIONS = {

MAP_INTERVAL_TYPE = {"seconds": 1, "minutes": 2, "hours": 3}

def update_experimental_status(**kwargs):
ENVIRONMENT_FILE="/opt/sixfab/.env"
REPOSITORIES = ("/opt/sixfab/pms/api", "/opt/sixfab/pms/agent", "/opt/sixfab/pms/firmwares")

if kwargs["current_status"] == kwargs["to_set"]:
return

if kwargs["to_set"] == True: # enable experimental version using
os.system(
"""
if grep -q "EXPERIMENTAL" {ENVIRONMENT_FILE};
then
sudo sed -i 's/EXPERIMENTAL=False/EXPERIMENTAL=True/' {ENVIRONMENT_FILE}
else
echo 'EXPERIMENTAL=True' | sudo tee -a {ENVIRONMENT_FILE}
fi
""".format(ENVIRONMENT_FILE=ENVIRONMENT_FILE))

for repo in REPOSITORIES:
os.system(
"""
cd {repo} &&
sudo git reset --hard &&
sudo git fetch &&
sudo git checkout dev &&
sudo git pull
""".format(repo=repo)
)
else:
os.system("sudo sed -i 's/EXPERIMENTAL=True/EXPERIMENTAL=False/' /opt/sixfab/.env")
os.system(
"""
if grep -q "EXPERIMENTAL" {ENVIRONMENT_FILE};
then
sudo sed -i 's/EXPERIMENTAL=True/EXPERIMENTAL=False/' {ENVIRONMENT_FILE}
else
echo 'EXPERIMENTAL=True' | sudo tee -a {ENVIRONMENT_FILE}
fi
""".format(ENVIRONMENT_FILE=ENVIRONMENT_FILE))

for repo in REPOSITORIES:
os.system(
"""
cd {repo} &&
sudo git reset --hard &&
sudo git checkout master &&
sudo git pull
""".format(repo=repo)
)

Popen("sleep 2 && sudo systemctl restart pms_agent", shell=True)

def update_timezone(api, timezone):
"""
@@ -82,7 +135,7 @@ def update_timezone(api, timezone):
try_until_done(api, "setRtcTime", epoch_to_set)


def set_configurations(api, data):
def set_configurations(api, data, **kwargs):
update_timezone(api, data["timezone"])

try_until_done(api, "setBatteryDesignCapacity", data["battery_capacity"])
@@ -170,6 +223,12 @@ def set_configurations(api, data):
)

try_until_done(api, "createScheduledEventWithEvent", event_to_save)
if "experimental" in data:
update_experimental_status(
to_set=data.get("experimental"),
current_status=kwargs["configs"].get("experimental_enabled")
)

return True


+ 21
- 4
core/modules/updater.py View File

@@ -11,6 +11,7 @@ def update_firmware(**kwargs):
token = kwargs.get("token")
remote_repository = kwargs.get("repository", None)
mqtt_client = kwargs.get("mqtt_client", None)
experimental_enabled = kwargs.get("experimental_enabled", False)

def send_status(status):
mqtt_client.publish(
@@ -23,9 +24,9 @@ def update_firmware(**kwargs):

send_status("git")
if os.path.exists(LOCAL_FIRMWARE_FOLDER):
os.system(f"cd {LOCAL_FIRMWARE_FOLDER} && sudo git pull")
os.system(f"cd {LOCAL_FIRMWARE_FOLDER} {'&& sudo git fetch && sudo git checkout dev' if experimental_enabled else ''} && sudo git pull")
else:
os.system(f"sudo git clone {remote_repository} {LOCAL_FIRMWARE_FOLDER}")
os.system(f"sudo git clone {remote_repository} {LOCAL_FIRMWARE_FOLDER} {'&& sudo git fetch && sudo git checkout dev' if experimental_enabled else ''}")

latest_version = open(f"{LOCAL_FIRMWARE_FOLDER}/latest_version").read().strip()
latest_firmware = f"{LOCAL_FIRMWARE_FOLDER}/sixfab_pms_firmware_{latest_version}.bin"
@@ -67,6 +68,7 @@ def update_firmware(**kwargs):
def update_agent(**kwargs):
mqtt_client = kwargs.get("mqtt_client")
token = kwargs.get("token")
experimental_enabled = kwargs.get("experimental_enabled", False)

def send_status(status):
mqtt_client.publish(
@@ -79,8 +81,23 @@ def update_agent(**kwargs):
send_status("git")

os.system("cd /opt/sixfab/pms/agent && sudo git reset --hard HEAD && sudo git pull")
os.system("cd /opt/sixfab/pms/api && sudo git reset --hard HEAD && sudo git pull && sudo pip3 install -r requirements.txt && pip3 install .")
os.system(f"""
cd /opt/sixfab/pms/agent
&& sudo git reset --hard HEAD
{'&& sudo git fetch && sudo git checkout dev' if experimental_enabled else ''}
&& sudo git pull
&& sudo pip3 install -r requirements.txt
""".replace("\n", ""))

os.system(f"""
cd /opt/sixfab/pms/api
&& sudo git reset --hard HEAD
{'&& sudo git fetch && sudo git checkout dev' if experimental_enabled else ''}
&& sudo git pull
&& sudo pip3 install -r requirements.txt
&& pip3 install .
""".replace("\n", ""))
send_status("restart")

os.system("sudo systemctl restart pms_agent")

Loading…
Cancel
Save