Browse Source

Update project structure

dev
Yasin Kaya 2 years ago
parent
commit
3905d02ed1
5 changed files with 5 additions and 1001 deletions
  1. 2
    0
      __init__.py
  2. 0
    32
      crc16.py
  3. 0
    758
      pms_api.py
  4. 0
    208
      pms_command.py
  5. 3
    3
      setup.py

+ 2
- 0
__init__.py View File

@@ -0,0 +1,2 @@
pms_command.py
crc16.py

+ 0
- 32
crc16.py View File

@@ -1,32 +0,0 @@

class CRC16:

CRC16_CCITT = 0x1021 #X.25, V.41, HDLC FCS, Bluetooth, ...

def CRC16(self):
return 0

# crc16 base function
def crc16(self, crcValue, newByte):

for i in range(8):
if ((((crcValue & 0x8000) >> 8) ^ (newByte & 0x80)) > 0):
crcValue = (crcValue << 1) ^ self.CRC16_CCITT
else:
crcValue = crcValue << 1
newByte <<= 1

return crcValue


# calculate crc function.
def exampleOfUseCRC16(self, Data, len):
aux = 0
crc = 0xFFFF # Initialization of crc to 0xFFFF for CCITT

while (aux < len):

crc = self.crc16(crc, Data[aux])
aux += 1

return crc

+ 0
- 758
pms_api.py View File

@@ -1,758 +0,0 @@
#!/usr/bin/python3

import RPi.GPIO as GPIO
import time
from pms_command import Command

command = Command()

#############################################################
### Communication Protocol ##################################
#############################################################
bufferSend = list()
bufferRecieve = list()
bufferRecieveIndex = 0

RESPONSE_DELAY = 100

START_BYTE_RECIEVED = 0xDC # Start Byte Recieved
START_BYTE_SENT = 0xCD # Start Byte Sent
PROTOCOL_HEADER_SIZE = 5
PROTOCOL_FRAME_SIZE = 7
COMMAND_SIZE_FOR_FLOAT = 11
COMMAND_SIZE_FOR_DOUBLE = 13
COMMAND_SIZE_FOR_INT16 = 9
COMMAND_SIZE_FOR_INT32 = 11
COMMAND_SIZE_FOR_UINT8 = 8
COMMAND_SIZE_FOR_INT64 = 15

COMMAND_TYPE_REQUEST = 0x01
COMMAND_TYPE_RESPONSE = 0x02

DEVICE_ADDRESS = 0x41 #7 bit address (will be left shifted to add the read write bit)



###########################################
### Private Methods #######################
###########################################

# Function for printing debug message
def debug_print(message):
print(message)

# Function for getting time as miliseconds
def millis():
return int(time.time())

# Function for delay as miliseconds
def delay_ms(ms):
time.sleep(float(ms/1000.0))

#############################################################
### PMS CLASS ###############################################
#############################################################
class SixfabPMS:
board = "Sixfab Raspberry Pi UPS HAT"
# Special Characters
CTRL_Z = '\x1A'
# Initializer function
def __init__(self):
debug_print(self.board + " Class initialized!")

def __del__(self):
#GPIO.cleanup()
print("Class Destructed")

#############################################################
### API Call Methods ########################################
#############################################################
# -----------------------------------------------------------
# Function for getting input temperature
# Parameter : None
# Return : float temp [Celcius]
# -----------------------------------------------------------
def getInputTemp(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_INPUT_TEMP)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

temp = int.from_bytes(raw[PROTOCOL_HEADER_SIZE:COMMAND_SIZE_FOR_INT32 - 2], "big")
return temp / 100


# -----------------------------------------------------------
# Function for getting input voltage
# Parameter : None
# Return : float voltage [Volt]
# -----------------------------------------------------------
def getInputVoltage(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_INPUT_VOLTAGE)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

voltage = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return voltage / 100


# -----------------------------------------------------------
# Function for getting input current
# Parameter : None
# Return : float current [Ampere]
# -----------------------------------------------------------
def getInputCurrent(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_INPUT_CURRENT)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

current = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return current / 100


# -----------------------------------------------------------
# Function for getting input power
# Parameter : None
# Return : float power [Watt]
# -----------------------------------------------------------
def getInputPower(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_INPUT_POWER)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

power = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return power / 100


# -----------------------------------------------------------
# Function for getting system temperature
# Parameter : None
# Return : float temp [Celcius]
# -----------------------------------------------------------
def getSystemTemp(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_SYSTEM_TEMP)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

temp = int.from_bytes(raw[PROTOCOL_HEADER_SIZE:COMMAND_SIZE_FOR_INT32 - 2], "big")
return temp / 100


# -----------------------------------------------------------
# Function for getting system voltage
# Parameter : None
# Return : float voltage [Volt]
# -----------------------------------------------------------
def getSystemVoltage(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_SYSTEM_VOLTAGE)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

voltage = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return voltage / 100


# -----------------------------------------------------------
# Function for getting system current
# Parameter : None
# Return : float current [Ampere]
# -----------------------------------------------------------
def getSystemCurrent(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_SYSTEM_CURRENT)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

current = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return current / 100


# -----------------------------------------------------------
# Function for getting system power
# Parameter : None
# Return : float power [Watt]
# -----------------------------------------------------------
def getSystemPower(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_SYSTEM_POWER)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

power = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return power / 100


# -----------------------------------------------------------
# Function for getting battery temperature
# Parameter : None
# Return : float temp [Celcius]
# -----------------------------------------------------------
def getBatteryTemp(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BATTERY_TEMP)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

temp = int.from_bytes(raw[PROTOCOL_HEADER_SIZE:COMMAND_SIZE_FOR_INT32 - 2], "big")
return temp / 100


# -----------------------------------------------------------
# Function for getting battery voltage
# Parameter : None
# Return : float voltage [Volt]
# -----------------------------------------------------------
def getBatteryVoltage(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BATTERY_VOLTAGE)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

voltage = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return voltage / 1000


# -----------------------------------------------------------
# Function for getting battery current
# Parameter : None
# Return : float current [Ampere]
# -----------------------------------------------------------
def getBatteryCurrent(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BATTERY_CURRENT)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

current = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return current / 1000


# -----------------------------------------------------------
# Function for getting battery power
# Parameter : None
# Return : float power [Watt]
# -----------------------------------------------------------
def getBatteryPower(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BATTERY_POWER)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT32)

power = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT32 - 2 ], "big")
return power / 1000


# -----------------------------------------------------------
# Function for getting battery level
# Parameter : None
# Return : int level [%]
# -----------------------------------------------------------
def getBatteryLevel(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BATTERY_LEVEL)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT16)

level = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT16 - 2 ], "big")
return level

# -----------------------------------------------------------
# Function for getting fan health
# Parameter : None
# Return : int level [HEALTY, BROKEN]
# -----------------------------------------------------------
def getFanHealth(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_FAN_HEALTH)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT16)

health = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT16 - 2 ], "big")
return health


# -----------------------------------------------------------
# Function for getting battery health
# Parameter : None
# Return : int health [%]
# -----------------------------------------------------------
def getBatteryHealth(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BATTERY_HEALTH)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT16)

health = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT16 - 2 ], "big")
return health


# -----------------------------------------------------------
# Function for getting fan speed
# Parameter : None
# Return : int speed [RPM]
# -----------------------------------------------------------
def getFanSpeed(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_FAN_SPEED)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT16)

rpm = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT16 - 2 ], "big")
return rpm


# -----------------------------------------------------------
# Function for setting fan speed
# Parameter : uint8 status [true, false]
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def setFanSpeed(self, status):
command.createSetCommand(command.PROTOCOL_COMMAND_SET_FAN_SPEED, status, 1)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result


# -----------------------------------------------------------
# Function for getting watchdog status
# Parameter : None
# Return : int8 status [true, false]
# -----------------------------------------------------------
def getWatchdogStatus(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_WATCHDOG_STATUS)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

status = raw[PROTOCOL_HEADER_SIZE]
return status


# -----------------------------------------------------------
# Function for setting watchdog status
# Parameter : uint8 status [true, false]
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def setWatchdogStatus(self, status):
command.createSetCommand(command.PROTOCOL_COMMAND_SET_WATCHDOG_STATUS, status, 1)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result


# -----------------------------------------------------------
# Function for setting RGB animation
# Parameter : uint8 type [DISABLED, HEARTBEAT, TEMP_MAP]
# Parameter : uint8 color [RED, GREEN, BLUE, YELLOW, CYAN, MAGENTA, WHITE]
# Parameter : uint8 speed [SLOW, NORMAL, FAST]
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def setRgbAnimation(self, animType, color, speed):
value = bytearray()
value.append(bytes(animType))
value.append(bytes(color))
value.append(bytes(speed))

command.createSetCommand(command.PROTOCOL_COMMAND_SET_RGB_ANIMATION, value, 3)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result



# -----------------------------------------------------------
# Function for getting RGB animation
# Parameter : None
# Return : byteArray(3) - ledAnimation[animType, color, speed]
# -----------------------------------------------------------
def getRgbAnimation(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_RGB_ANIMATION)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(10)

animation = bytearray()
for i in range(3):
animation.append(raw[PROTOCOL_HEADER_SIZE + i])
return animation


# -----------------------------------------------------------
# Function for setting fan automation
# Parameter : uint8 disable-treshold [celcius]
# Parameter : uint8 slow-treshold [celcius]
# Parameter : uint8 fast-treshold [celcius]
# Return : result
# -----------------------------------------------------------
def setFanAutomation(self, disableTreshold, slowTreshold, fastTreshold):

value = bytearray()
value.append(bytes(disableTreshold))
value.append(bytes(slowTreshold))
value.append(bytes(fastTreshold))

command.createSetCommand(command.PROTOCOL_COMMAND_SET_FAN_AUTOMATION, value, 3)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result



# -----------------------------------------------------------
# Function for getting fan automation
# Parameter : None
# Return : byteArray(3) - fanAutomation[disableTreshold, slowTreshold, fastTreshold]
# -----------------------------------------------------------
def getFanAutomation(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_FAN_AUTOMATION)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(10)

fanAutomation = bytearray()
for i in range(3):
fanAutomation.append(raw[PROTOCOL_HEADER_SIZE + i])
return fanAutomation


# -----------------------------------------------------------
# Function for setting battery max charge level
# Parameter : uint8 level [%]
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def setBatteryMaxChargeLevel(self, level):
command.createSetCommand(command.PROTOCOL_COMMAND_SET_BATTERY_MAX_CHARGE_LEVEL, level, 1)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

level = raw[PROTOCOL_HEADER_SIZE]
return level


# -----------------------------------------------------------
# Function for getting battery max charge level
# Parameter : None
# Return : int8 level [true, false]
# -----------------------------------------------------------
def getBatteryMaxChargeLevel(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BATTERY_MAX_CHARGE_LEVEL)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

level = raw[PROTOCOL_HEADER_SIZE + 1]
return level

# -----------------------------------------------------------
# Function for setting safe shutdown battery level
# Parameter : uint8 level [%]
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def setSafeShutdownBatteryLevel(self, level):
command.createSetCommand(command.PROTOCOL_COMMAND_SET_SAFE_SHUTDOWN_BATTERY_LEVEL, level, 1)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

level = raw[PROTOCOL_HEADER_SIZE]
return level


# -----------------------------------------------------------
# Function for getting safe shutdown battery level
# Parameter : None
# Return : int8 level [true, false]
# -----------------------------------------------------------
def getSafeShutdownBatteryLevel(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_SAFE_SHUTDOWN_BATTERY_LEVEL)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

level = raw[PROTOCOL_HEADER_SIZE + 1]
return level


# -----------------------------------------------------------
# Function for setting safe shutdown status
# Parameter : uint8 status [%]
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def setSafeShutdownBatteryStatus(self, status):
command.createSetCommand(command.PROTOCOL_COMMAND_SET_SAFE_SHUTDOWN_STATUS, status, 1)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

status = raw[PROTOCOL_HEADER_SIZE]
return status


# -----------------------------------------------------------
# Function for setting safe shutdown status
# Parameter : None
# Return : int8 status [true, false]
# -----------------------------------------------------------
def getSafeShutdownBatteryStatus(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_SAFE_SHUTDOWN_STATUS)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

status = raw[PROTOCOL_HEADER_SIZE]
return status


# -----------------------------------------------------------
# Function for getting working mode
# Parameter : None
# Return : int8 workingMode [charging, Fully Charged - Adapter Powered, Battery Powered ]
# -----------------------------------------------------------
def getWorkingMode(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_WORKING_MODE)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

mode = raw[PROTOCOL_HEADER_SIZE]
return mode


# -----------------------------------------------------------
# Function for getting button 1
# Parameter : None
# Return : int8 buttonStatus [pressed, released]
# -----------------------------------------------------------
def getButton1Status(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BUTTON1_STATUS)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

status = raw[PROTOCOL_HEADER_SIZE]
return status


# -----------------------------------------------------------
# Function for getting button 2
# Parameter : None
# Return : int8 buttonStatus [pressed, released]
# -----------------------------------------------------------
def getButton2Status(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_BUTTON2_STATUS)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

status = raw[PROTOCOL_HEADER_SIZE]
return status



# -----------------------------------------------------------
# Function for setting RTC Time
# Parameter : uint64 timestamp [timestamp]
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def setRtcTime(self, timestamp):
command.createSetCommand(command.PROTOCOL_COMMAND_SET_RTC_TIME, timestamp, 8)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result


# -----------------------------------------------------------
# Function for getting RTC Time
# Parameter : None
# Return : uint64 timestamp [timestamp]
# -----------------------------------------------------------
def getRtcTime(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_RTC_TIME)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT64)

timestamp = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT64 - 2], "big")
return timestamp



# -----------------------------------------------------------
# Function for hard powering off
# Parameter : None
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def hardPowerOff(self):
command.createCommand(command.PROTOCOL_COMMAND_HARD_POWER_OFF)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result

# -----------------------------------------------------------
# Function for soft powering off
# Parameter : None
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def softPowerOff(self):
command.createCommand(command.PROTOCOL_COMMAND_SOFT_POWER_OFF)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result

# -----------------------------------------------------------
# Function for hard rebooting
# Parameter : None
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def hardReboot(self):
command.createCommand(command.PROTOCOL_COMMAND_HARD_REBOOT)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result


# -----------------------------------------------------------
# Function for soft rebooting
# Parameter : None
# Return : uint8 result [true, false]
# -----------------------------------------------------------
def softReboot(self):
command.createCommand(command.PROTOCOL_COMMAND_SOFT_REBOOT)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result


# -----------------------------------------------------------
# Function for asking watchdog alarm exist
# Parameter : None
# Return : uint8 alarm [true, false]
# -----------------------------------------------------------
def askWatchdogAlarm(self):
command.createCommand(command.PROTOCOL_COMAMND_WATCHDOG_ALARM)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_INT16)

alarm_status = int.from_bytes(raw[PROTOCOL_HEADER_SIZE : COMMAND_SIZE_FOR_INT16 - 2 ], "big")
return alarm_status


# -----------------------------------------------------------
# Function for creating scheduling event
# Parameter : uint8 eventID [id]
# Parameter : uint8 scheduletype [time, interval]
# Parameter : uint8 timeOrInterval [exact time, interval]
# Parameter : uint8 repeat [once, repeated]
# Parameter : uint8 repeatPeriod [everyday, specific days]
# Parameter : uint8 action [start, hard shutdown, soft shutdown, hard reboot, soft reboot]
# Return : result
# -----------------------------------------------------------
def createScheduledEvent(self, eventID, scheduletype, timeOrInterval, repeat, repeatPeriod, action):

value = bytearray()
value.append(bytes(eventID))
value.append(bytes(scheduletype))
value.append(bytes(timeOrInterval))
value.append(bytes(repeat))
value.append(bytes(repeatPeriod))
value.append(bytes(action))

command.createSetCommand(command.PROTOCOL_COMMAND_SET_FAN_AUTOMATION, value, 6)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result


# -----------------------------------------------------------
# Function for removing scheduling event
# Parameter : uint8 eventID [celcius]
# Return : result
# -----------------------------------------------------------
def removeScheduledEvent(self, eventID):

command.createSetCommand(command.PROTOCOL_COMMAND_SET_FAN_AUTOMATION, eventID, 1)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result
# -----------------------------------------------------------
# Function for removing scheduling event
# Parameter : uint8 eventID [celcius]
# Return : result
# -----------------------------------------------------------
def removeAllScheduledEvents(self, eventID):

command.createCommand(command.PROTOCOL_COMMAND_REMOVE_ALL_SCHEDULED_EVENTS)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(COMMAND_SIZE_FOR_UINT8)

result = raw[PROTOCOL_HEADER_SIZE]
return result

# -----------------------------------------------------------
# Function for getting firmware ver
# Parameter : None
# Return : char[8] ver [Ex. v1.00.00]
# -----------------------------------------------------------
def getFirmwareVer(self):
command.createCommand(command.PROTOCOL_COMMAND_GET_FIRMWARE_VER)
command.sendCommand()
delay_ms(RESPONSE_DELAY)
raw = command.recieveCommand(15)

ver = str(raw[PROTOCOL_HEADER_SIZE : 8])
return ver


+ 0
- 208
pms_command.py View File

@@ -1,208 +0,0 @@
#!/usr/bin/python3

import smbus2
import time
from crc16 import CRC16
import struct


bus = smbus2.SMBus(1)
crc = CRC16()

#############################################################
### Communication Protocol ##################################
#############################################################
bufferSend = list()
bufferRecieve = list()
bufferRecieveIndex = 0

RESPONSE_DELAY = 100

START_BYTE_RECIEVED = 0xDC # Start Byte Recieved
START_BYTE_SENT = 0xCD # Start Byte Sent
PROTOCOL_HEADER_SIZE = 5
PROTOCOL_FRAME_SIZE = 7
COMMAND_SIZE_FOR_FLOAT = 11
COMMAND_SIZE_FOR_DOUBLE = 13
COMMAND_SIZE_FOR_INT16 = 9
COMMAND_SIZE_FOR_INT32 = 11
COMMAND_SIZE_FOR_UINT8 = 8
COMMAND_SIZE_FOR_INT64 = 15

COMMAND_TYPE_REQUEST = 0x01
COMMAND_TYPE_RESPONSE = 0x02

DEVICE_ADDRESS = 0x41 #7 bit address (will be left shifted to add the read write bit)

class Command:

PROTOCOL_COMMAND_GET_INPUT_TEMP = 1
PROTOCOL_COMMAND_GET_INPUT_VOLTAGE = 2
PROTOCOL_COMMAND_GET_INPUT_CURRENT = 3
PROTOCOL_COMMAND_GET_INPUT_POWER = 4
PROTOCOL_COMMAND_GET_SYSTEM_TEMP = 5
PROTOCOL_COMMAND_GET_SYSTEM_VOLTAGE = 6
PROTOCOL_COMMAND_GET_SYSTEM_CURRENT = 7
PROTOCOL_COMMAND_GET_SYSTEM_POWER = 8
PROTOCOL_COMMAND_GET_BATTERY_TEMP = 9
PROTOCOL_COMMAND_GET_BATTERY_VOLTAGE = 10
PROTOCOL_COMMAND_GET_BATTERY_CURRENT = 11
PROTOCOL_COMMAND_GET_BATTERY_POWER = 12
PROTOCOL_COMMAND_GET_BATTERY_LEVEL = 13
PROTOCOL_COMMAND_GET_BATTERY_HEALTH = 14
PROTOCOL_COMMAND_GET_FAN_SPEED = 15
PROTOCOL_COMMAND_GET_WATCHDOG_STATUS = 16
PROTOCOL_COMMAND_SET_WATCHDOG_STATUS = 17
PROTOCOL_COMMAND_SET_RGB_ANIMATION = 18
PROTOCOL_COMMAND_GET_RGB_ANIMATION = 19
PROTOCOL_COMMAND_SET_FAN_SPEED = 20
PROTOCOL_COMMAND_SET_FAN_AUTOMATION = 21
PROTOCOL_COMMAND_GET_FAN_AUTOMATION = 22
PROTOCOL_COMMAND_GET_FAN_HEALTH = 23
PROTOCOL_COMMAND_SET_BATTERY_MAX_CHARGE_LEVEL = 24
PROTOCOL_COMMAND_GET_BATTERY_MAX_CHARGE_LEVEL = 25
PROTOCOL_COMMAND_SET_SAFE_SHUTDOWN_BATTERY_LEVEL = 26
PROTOCOL_COMMAND_GET_SAFE_SHUTDOWN_BATTERY_LEVEL = 27
PROTOCOL_COMMAND_SET_SAFE_SHUTDOWN_STATUS = 28
PROTOCOL_COMMAND_GET_SAFE_SHUTDOWN_STATUS = 29
PROTOCOL_COMMAND_GET_WORKING_MODE = 30
PROTOCOL_COMMAND_GET_BUTTON1_STATUS = 31
PROTOCOL_COMMAND_GET_BUTTON2_STATUS = 32
PROTOCOL_COMMAND_SET_RTC_TIME = 33
PROTOCOL_COMMAND_GET_RTC_TIME = 34
PROTOCOL_COMMAND_HARD_POWER_OFF = 35
PROTOCOL_COMMAND_SOFT_POWER_OFF = 36
PROTOCOL_COMMAND_HARD_REBOOT = 37
PROTOCOL_COMMAND_SOFT_REBOOT = 38
PROTOCOL_COMAMND_WATCHDOG_ALARM = 39
# .
# .
# .
PROTOCOL_COMMAND_CREATE_SCHEDULED_EVENT = 100
PROTOCOL_COMMAND_REMOVE_SCHEDULED_EVENT = 101
PROTOCOL_COMMAND_REMOVE_ALL_SCHEDULED_EVENTS = 102
# .
# .
# .
PROTOCOL_COMMAND_GET_FIRMWARE_VER = 200

# Initializer function
def __init__(self):
print("Comamnd Class initialized!")


def __del__(self):
print("Command Class Destructed")

#############################################################
### I2C Protocol Functions ##################################
#############################################################
# Function for sending command
def sendCommand(self):
global bufferSend
#print("Sent Command:")
#print('[{}]'.format(', '.join(hex(x) for x in bufferSend)))
try:
bus.write_i2c_block_data(DEVICE_ADDRESS, 0x01, bufferSend)
except:
pass

# Function for checking command according to protocol
def checkCommand(self, recievedByte):
global bufferRecieve
global bufferRecieveIndex
datalen = 0

if(bufferRecieveIndex == 0 and recievedByte != START_BYTE_RECIEVED):
return -1
bufferRecieve.append(recievedByte)
bufferRecieveIndex += 1
if(bufferRecieveIndex < PROTOCOL_HEADER_SIZE):
return -1
datalen = (bufferRecieve[3] << 8) | bufferRecieve[4]
datalen = int(datalen / 2)

if(bufferRecieveIndex == (PROTOCOL_FRAME_SIZE + datalen)):
crcRecieved = (bufferRecieve[PROTOCOL_FRAME_SIZE + datalen -2] << 8) | bufferRecieve[PROTOCOL_FRAME_SIZE + datalen - 1]

#print("CRC Check ABORT!")
#print('[{}]'.format(', '.join(hex(x) for x in bufferRecieve)))
bufferRecieveIndex = 0
return bufferRecieve[0:PROTOCOL_FRAME_SIZE + datalen]


# Function for recieving command
def recieveCommand(self, lenOfResponse):
global bufferRecieve
for i in range(lenOfResponse):

c = bus.read_byte(DEVICE_ADDRESS)
#print("Recieved byte: " + str(hex(c)))
msg = self.checkCommand(c)
if(msg != None and msg != -1):
bufferRecieve.clear()
return msg


# Function for creating command according to protocol
def createCommand(self, command, command_type = COMMAND_TYPE_REQUEST):
global bufferSend
bufferSend.clear()
bufferSend.append(START_BYTE_SENT)
bufferSend.append(command)
bufferSend.append(command_type)
bufferSend.append(0x00)
bufferSend.append(0x00)
(crcHigh, crcLow) = self.calculateCRC16(bufferSend[0:PROTOCOL_HEADER_SIZE])
bufferSend.append(crcHigh)
bufferSend.append(crcLow)


# Function for creating set command according to protocol
def createSetCommand(self, command, value, lenByte, command_type = COMMAND_TYPE_REQUEST):
global bufferSend
bufferSend.clear()
bufferSend.append(START_BYTE_SENT)
bufferSend.append(command)
bufferSend.append(command_type)

lenLow = lenByte & 0xFF
lenHigh = (lenByte >> 8) & 0xFF

bufferSend.append(lenHigh)
bufferSend.append(lenLow)

byteArray = value.to_bytes(len(value),"big")
bufferSend.append(byteArray)
print(bufferSend)

(crcHigh, crcLow) = self.calculateCRC16(bufferSend[0:PROTOCOL_HEADER_SIZE+lenByte])
bufferSend.append(crcHigh)
bufferSend.append(crcLow)
print(bufferSend)


# Function for calculating CRC16
def calculateCRC16(self, command, returnType = 0):
datalen = (command[3] << 8) + (command[4] & 0xFF)
command = command[0 : PROTOCOL_HEADER_SIZE + datalen]
#print("calculateCRC Func: " + str(command))

calCRC = crc.exampleOfUseCRC16(bytes(command), PROTOCOL_HEADER_SIZE + datalen)
crcHigh = (calCRC >> 8) & 0xFF
crcLow = calCRC & 0xFF
#print("CRC16: " + str(calCRC) + "\t" + "CRC16 High: " + str(crcHigh) + "\t" + "CRC16 Low: " + str(crcLow))
if(returnType == 0):
return (crcHigh, crcLow)
else:
return calCRC

+ 3
- 3
setup.py View File

@@ -9,6 +9,6 @@ setup(
license='MIT',
url='https://github.com/sixfab/pms-python-api',
dependency_links = [],
install_requires = ['smbus2'],
packages=find_packages()
)
install_requires = ['smbus2'],
packages=find_packages()
)

Loading…
Cancel
Save