Module plugins.mqtt.Sensor
Expand source code
from enum import auto
from modules.base.Configuration import *
from modules.base.Instances import *
@configuration
class MqttSensorConfiguration(SensorConfiguration):
'''Topic to listen to'''
topic: str
@validator('platform')
def check_platform(cls, v):
platform_name = "mqtt"
if v != platform_name:
raise ValueError("wrong script platform: " + platform_name + ", is: " + v)
return v
on_message: Optional[list[AutomationConfiguration]] = []
'''List of Automations to execute when a message is received, see `modules.base.Configuration.AutomationConfiguration`'''
class MqttSensorState(BaseState):
'''Represents the state of the GPIO pin'''
topic: str = ""
'''Last received topic'''
payload: Union[str, dict] = ""
'''Last received payload'''
class Sensor(BaseSensor, Debuggable):
'''Allows to listen to a given MQTT topic.'''
from plugins.mqtt.Platform import Platform
def __init__(self, parent: Platform, config: MqttSensorConfiguration) -> None:
super().__init__(parent, config)
self.configuration = config
self.state = MqttSensorState()
self.on_message_automations = Automation.create_automations(self, config.on_message)
def start(self, call_stack: CallStack):
topic = call_stack.get(self.configuration.topic)
self.parent.subscribe(topic, self.on_message)
def on_message(self, call_stack: CallStack):
self.state.topic = str(call_stack.get("{{topic}}"))
self.state.payload = call_stack.get("{{{payload}}}")
self.log_debug("Got message from " + self.state.topic + ", with payload: "+ str(self.state.payload))
for automation in self.on_message_automations:
automation.invoke(call_stack)
self.on_state_changed(call_stack)
Classes
class MqttSensorConfiguration (**data: Any)-
Topic to listen to
YAML configuration
Expand source code
@configuration class MqttSensorConfiguration(SensorConfiguration): '''Topic to listen to''' topic: str @validator('platform') def check_platform(cls, v): platform_name = "mqtt" if v != platform_name: raise ValueError("wrong script platform: " + platform_name + ", is: " + v) return v on_message: Optional[list[AutomationConfiguration]] = [] '''List of Automations to execute when a message is received, see `modules.base.Configuration.AutomationConfiguration`'''Ancestors
- SensorConfiguration
- ScriptConfiguration
- StackableConfiguration
- IdConfiguration
- VariablesConfiguration
- Configuration
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var on_message : Optional[list]-
List of Automations to execute when a message is received, see
AutomationConfiguration var topic : str
Static methods
def check_platform(v)-
Expand source code
@validator('platform') def check_platform(cls, v): platform_name = "mqtt" if v != platform_name: raise ValueError("wrong script platform: " + platform_name + ", is: " + v) return v
Inherited members
class MqttSensorState-
Represents the state of the GPIO pin
Expand source code
class MqttSensorState(BaseState): '''Represents the state of the GPIO pin''' topic: str = "" '''Last received topic''' payload: Union[str, dict] = "" '''Last received payload'''Ancestors
Class variables
var payload : Union[str, dict]-
Last received payload
var topic : str-
Last received topic
class Sensor (parent: Platform, config: MqttSensorConfiguration)-
Allows to listen to a given MQTT topic.
Expand source code
class Sensor(BaseSensor, Debuggable): '''Allows to listen to a given MQTT topic.''' from plugins.mqtt.Platform import Platform def __init__(self, parent: Platform, config: MqttSensorConfiguration) -> None: super().__init__(parent, config) self.configuration = config self.state = MqttSensorState() self.on_message_automations = Automation.create_automations(self, config.on_message) def start(self, call_stack: CallStack): topic = call_stack.get(self.configuration.topic) self.parent.subscribe(topic, self.on_message) def on_message(self, call_stack: CallStack): self.state.topic = str(call_stack.get("{{topic}}")) self.state.payload = call_stack.get("{{{payload}}}") self.log_debug("Got message from " + self.state.topic + ", with payload: "+ str(self.state.payload)) for automation in self.on_message_automations: automation.invoke(call_stack) self.on_state_changed(call_stack)Ancestors
Class variables
var Platform-
MQTT Platform
Methods
def on_message(self, call_stack: CallStack)-
Expand source code
def on_message(self, call_stack: CallStack): self.state.topic = str(call_stack.get("{{topic}}")) self.state.payload = call_stack.get("{{{payload}}}") self.log_debug("Got message from " + self.state.topic + ", with payload: "+ str(self.state.payload)) for automation in self.on_message_automations: automation.invoke(call_stack) self.on_state_changed(call_stack)
Inherited members