Module plugins.mqtt.Platform
Expand source code
import json
from sys import platform
import threading
from typing import Any, Optional
import paho.mqtt.client as mqtt
from pydantic.class_validators import validator
from modules.base.Configuration import *
from modules.base.Instances import *
class MqttAvailabilityConfiguration(Configuration):
'''Availability topic and last will.'''
topic: str
'''configured topic for the mqtt client's last will and we also send a message on connect'''
payload_on: str
'''payload to send when connected succsessfully'''
payload_off: str
'''payload to send when the connection dissapered (last will)'''
@configuration
class MqttPlatformConfiguration(PlatformConfiguration):
'''Configuration settings for the MQTT platform.'''
@validator('platform')
def check_platform(cls, v):
if "plugins.mqtt" not in v:
raise ValueError("wrong platform: plugins.mqtt, is: " + v)
return v
host: str
'''MQTT server address'''
port: int
'''MQTT server port'''
keep_alive: Optional[int] = 60
'''seconds to keep the server connection'''
availability: Optional[MqttAvailabilityConfiguration]
'''Availability topic and last will'''
on_connected: Optional[list[AutomationConfiguration]] = []
'''List of Automations to execute when the connection to the host is established, see `modules.base.Configuration.AutomationConfiguration`'''
on_disconnected: Optional[list[AutomationConfiguration]] = []
'''List of Automations to execute when the connection to the host is lost, see `modules.base.Configuration.AutomationConfiguration`'''
on_message: Optional[list[AutomationConfiguration]] = []
'''List of Automations to execute when a MQTT message is received, see `modules.base.Configuration.AutomationConfiguration`'''
class Platform(BasePlatform):
'''MQTT Platform'''
def __init__(self, parent: Stackable, config: MqttPlatformConfiguration) -> None:
super().__init__(parent, config)
self.app = parent.get_app()
self.configuration = config
self.initialized = False
self.callbacks = []
def publish_available(self, call_stack):
def render(var):
'''this is only to avoid typing errors'''
return str(call_stack.get(var))
if self.configuration.availability:
av = self.configuration.availability
av_topic = render(av.topic)
av_payload_on = render(av.payload_on)
av_payload_off = render(av.payload_off)
self.client.will_set(av_topic, av_payload_off)
self.client.subscribe(av_topic)
av = self.configuration.availability
self.publish(av_topic, av_payload_on, retain = True)
def connect(self, call_stack):
if not self.initialized:
print("MQTT Client not initialized, cancel 'connect'")
return
def render(var):
'''this is only to avoid typing errors'''
return str(call_stack.get(var))
self.client.connect(self.configuration.host, self.configuration.port, self.configuration.keep_alive)
self.publish_available(call_stack)
def start(self, call_stack: CallStack):
def render(var):
'''this is only to avoid typing errors'''
return str(call_stack.get(var))
app_id = "PiTomation_" + str(self.app.get_variable_value("id"))
client_name = app_id + "_" + render(self.app.get_id("device").configuration.name)
print("MQTT Client Name: " + app_id)
self.client = mqtt.Client(client_name, clean_session = True) #type: ignore
self.client.on_connect = self.__init_on_connect()
self.client.on_disconnect = self.__init_on_disconnect()
self.client.on_message = self.__init_on_message()
self.initialized = True
self.connect(call_stack)
def loop():
self.client.loop_start()
loop_thread = threading.Thread(target=loop)
loop_thread.start()
super().start(call_stack)
def dispose(self):
self.client.loop_stop()
return super().dispose()
def __init_on_message(self):
self.on_message_automations = []
if self.configuration.on_message:
for automation in self.configuration.on_message:
self.on_message_automations.append(Automation(self, automation))
def method(client, userdata, msg):
payload = msg.payload.decode("utf-8")
try:
payload = json.loads(payload)
except:
payload = str(payload)
call_stack = CallStack()\
.with_stack(self.get_full_stack()) \
.with_keys({
"payload": payload,
"topic": msg.topic
})
for callback in self.callbacks:
if callback["topic"] == msg.topic:
callback["callback"](call_stack)
elif str(callback["topic"]).endswith("+") or str(callback["topic"]).endswith("#"):
if str(msg.topic).startswith(str(callback["topic"])[0:-2]):
callback["callback"](call_stack)
for automation in self.on_message_automations:
automation.invoke(call_stack)
return method
def __init_on_disconnect(self):
self.on_disconnect_actions = []
if self.configuration.on_disconnected:
for automation in self.configuration.on_disconnected:
self.on_disconnect_actions.append(Automation(self, automation))
def method(client, userdata, flags):
print("MQTT disconnected!")
call_stack = CallStack()\
.with_stack(self.get_full_stack()) \
.with_key("flags", flags)
for automation in self.on_disconnect_actions:
automation.invoke(call_stack)
return method
def __init_on_connect(self):
self.on_connected_actions = []
if self.configuration.on_connected:
for automationConfig in self.configuration.on_connected:
self.on_connected_actions.append(Automation(self, automationConfig))
def method(client, userdata, flags, rc):
print("MQTT connection state: " + str(rc))
print(" if rc=0, the client is connected.")
#TODO:TEST if (rc is not 0):
#TODO:TEST sys.exit(0)
call_stack = CallStack()\
.with_stack(self.get_full_stack()) \
.with_key("return_code", rc)
self.publish_available(call_stack)
for callback in self.callbacks:
self.client.subscribe(callback["topic"])
for automation in self.on_connected_actions:
automation.invoke(call_stack)
return method
def subscribe(self, topic: str, callback=None):
if callback is not None:
self.callbacks.append({"topic": topic, "callback": callback})
self.client.subscribe(topic)
def publish(self, topic: str, payload: Any, retain: bool = False):
if type(payload) is dict:
payload = json.dumps(payload)
self.client.publish(topic, payload, qos = 1, retain = retain)
Classes
class MqttAvailabilityConfiguration (**data: Any)-
Availability topic and last will.
YAML configuration
Expand source code
class MqttAvailabilityConfiguration(Configuration): '''Availability topic and last will.''' topic: str '''configured topic for the mqtt client's last will and we also send a message on connect''' payload_on: str '''payload to send when connected succsessfully''' payload_off: str '''payload to send when the connection dissapered (last will)'''Ancestors
- Configuration
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var payload_off : str-
payload to send when the connection dissapered (last will)
var payload_on : str-
payload to send when connected succsessfully
var topic : str-
configured topic for the mqtt client's last will and we also send a message on connect
Inherited members
class MqttPlatformConfiguration (**data: Any)-
Configuration settings for the MQTT platform.
YAML configuration
Expand source code
@configuration class MqttPlatformConfiguration(PlatformConfiguration): '''Configuration settings for the MQTT platform.''' @validator('platform') def check_platform(cls, v): if "plugins.mqtt" not in v: raise ValueError("wrong platform: plugins.mqtt, is: " + v) return v host: str '''MQTT server address''' port: int '''MQTT server port''' keep_alive: Optional[int] = 60 '''seconds to keep the server connection''' availability: Optional[MqttAvailabilityConfiguration] '''Availability topic and last will''' on_connected: Optional[list[AutomationConfiguration]] = [] '''List of Automations to execute when the connection to the host is established, see `modules.base.Configuration.AutomationConfiguration`''' on_disconnected: Optional[list[AutomationConfiguration]] = [] '''List of Automations to execute when the connection to the host is lost, see `modules.base.Configuration.AutomationConfiguration`''' on_message: Optional[list[AutomationConfiguration]] = [] '''List of Automations to execute when a MQTT message is received, see `modules.base.Configuration.AutomationConfiguration`'''Ancestors
- PlatformConfiguration
- StackableConfiguration
- IdConfiguration
- VariablesConfiguration
- Configuration
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var availability : Optional[MqttAvailabilityConfiguration]-
Availability topic and last will
var host : str-
MQTT server address
var keep_alive : Optional[int]-
seconds to keep the server connection
var on_connected : Optional[list]-
List of Automations to execute when the connection to the host is established, see
AutomationConfiguration var on_disconnected : Optional[list]-
List of Automations to execute when the connection to the host is lost, see
AutomationConfiguration var on_message : Optional[list]-
List of Automations to execute when a MQTT message is received, see
AutomationConfiguration var port : int-
MQTT server port
Static methods
def check_platform(v)-
Expand source code
@validator('platform') def check_platform(cls, v): if "plugins.mqtt" not in v: raise ValueError("wrong platform: plugins.mqtt, is: " + v) return v
Inherited members
class Platform (parent: Stackable, config: MqttPlatformConfiguration)-
MQTT Platform
Expand source code
class Platform(BasePlatform): '''MQTT Platform''' def __init__(self, parent: Stackable, config: MqttPlatformConfiguration) -> None: super().__init__(parent, config) self.app = parent.get_app() self.configuration = config self.initialized = False self.callbacks = [] def publish_available(self, call_stack): def render(var): '''this is only to avoid typing errors''' return str(call_stack.get(var)) if self.configuration.availability: av = self.configuration.availability av_topic = render(av.topic) av_payload_on = render(av.payload_on) av_payload_off = render(av.payload_off) self.client.will_set(av_topic, av_payload_off) self.client.subscribe(av_topic) av = self.configuration.availability self.publish(av_topic, av_payload_on, retain = True) def connect(self, call_stack): if not self.initialized: print("MQTT Client not initialized, cancel 'connect'") return def render(var): '''this is only to avoid typing errors''' return str(call_stack.get(var)) self.client.connect(self.configuration.host, self.configuration.port, self.configuration.keep_alive) self.publish_available(call_stack) def start(self, call_stack: CallStack): def render(var): '''this is only to avoid typing errors''' return str(call_stack.get(var)) app_id = "PiTomation_" + str(self.app.get_variable_value("id")) client_name = app_id + "_" + render(self.app.get_id("device").configuration.name) print("MQTT Client Name: " + app_id) self.client = mqtt.Client(client_name, clean_session = True) #type: ignore self.client.on_connect = self.__init_on_connect() self.client.on_disconnect = self.__init_on_disconnect() self.client.on_message = self.__init_on_message() self.initialized = True self.connect(call_stack) def loop(): self.client.loop_start() loop_thread = threading.Thread(target=loop) loop_thread.start() super().start(call_stack) def dispose(self): self.client.loop_stop() return super().dispose() def __init_on_message(self): self.on_message_automations = [] if self.configuration.on_message: for automation in self.configuration.on_message: self.on_message_automations.append(Automation(self, automation)) def method(client, userdata, msg): payload = msg.payload.decode("utf-8") try: payload = json.loads(payload) except: payload = str(payload) call_stack = CallStack()\ .with_stack(self.get_full_stack()) \ .with_keys({ "payload": payload, "topic": msg.topic }) for callback in self.callbacks: if callback["topic"] == msg.topic: callback["callback"](call_stack) elif str(callback["topic"]).endswith("+") or str(callback["topic"]).endswith("#"): if str(msg.topic).startswith(str(callback["topic"])[0:-2]): callback["callback"](call_stack) for automation in self.on_message_automations: automation.invoke(call_stack) return method def __init_on_disconnect(self): self.on_disconnect_actions = [] if self.configuration.on_disconnected: for automation in self.configuration.on_disconnected: self.on_disconnect_actions.append(Automation(self, automation)) def method(client, userdata, flags): print("MQTT disconnected!") call_stack = CallStack()\ .with_stack(self.get_full_stack()) \ .with_key("flags", flags) for automation in self.on_disconnect_actions: automation.invoke(call_stack) return method def __init_on_connect(self): self.on_connected_actions = [] if self.configuration.on_connected: for automationConfig in self.configuration.on_connected: self.on_connected_actions.append(Automation(self, automationConfig)) def method(client, userdata, flags, rc): print("MQTT connection state: " + str(rc)) print(" if rc=0, the client is connected.") #TODO:TEST if (rc is not 0): #TODO:TEST sys.exit(0) call_stack = CallStack()\ .with_stack(self.get_full_stack()) \ .with_key("return_code", rc) self.publish_available(call_stack) for callback in self.callbacks: self.client.subscribe(callback["topic"]) for automation in self.on_connected_actions: automation.invoke(call_stack) return method def subscribe(self, topic: str, callback=None): if callback is not None: self.callbacks.append({"topic": topic, "callback": callback}) self.client.subscribe(topic) def publish(self, topic: str, payload: Any, retain: bool = False): if type(payload) is dict: payload = json.dumps(payload) self.client.publish(topic, payload, qos = 1, retain = retain)Ancestors
Methods
def connect(self, call_stack)-
Expand source code
def connect(self, call_stack): if not self.initialized: print("MQTT Client not initialized, cancel 'connect'") return def render(var): '''this is only to avoid typing errors''' return str(call_stack.get(var)) self.client.connect(self.configuration.host, self.configuration.port, self.configuration.keep_alive) self.publish_available(call_stack) def dispose(self)-
Expand source code
def dispose(self): self.client.loop_stop() return super().dispose() def publish(self, topic: str, payload: Any, retain: bool = False)-
Expand source code
def publish(self, topic: str, payload: Any, retain: bool = False): if type(payload) is dict: payload = json.dumps(payload) self.client.publish(topic, payload, qos = 1, retain = retain) def publish_available(self, call_stack)-
Expand source code
def publish_available(self, call_stack): def render(var): '''this is only to avoid typing errors''' return str(call_stack.get(var)) if self.configuration.availability: av = self.configuration.availability av_topic = render(av.topic) av_payload_on = render(av.payload_on) av_payload_off = render(av.payload_off) self.client.will_set(av_topic, av_payload_off) self.client.subscribe(av_topic) av = self.configuration.availability self.publish(av_topic, av_payload_on, retain = True) def subscribe(self, topic: str, callback=None)-
Expand source code
def subscribe(self, topic: str, callback=None): if callback is not None: self.callbacks.append({"topic": topic, "callback": callback}) self.client.subscribe(topic)
Inherited members