Module plugins.rfcomm_server.Sensor

Expand source code
from enum import auto
from modules.base.Configuration import *
from modules.base.Instances import *
import bluetooth

@configuration
class RfCommSensorConfiguration(SensorConfiguration):

    @validator('platform')
    def check_platform(cls, v):
        platform_name = "rfcomm_server"
        if v != platform_name:
            raise ValueError("wrong script platform: " + platform_name + ", is: " + v)
        return v   

    on_message: Optional[list[AutomationConfiguration]] = [] 
    '''List of Automations to execute when a message is received, see `modules.base.Configuration.AutomationConfiguration`'''


class RfCommSensorState(BaseState):
    '''Represents the last received payload'''

    payload: Union[str, dict] = ""
    '''Last received payload'''


class Sensor(BaseSensor, Debuggable):
    '''Allows to listen to a given MQTT topic.'''
    from plugins.rfcomm_server.Platform import Platform

    def __init__(self, parent: Platform, config: RfCommSensorConfiguration) -> None:
        super().__init__(parent, config)
        self.configuration = config
        self.state = RfCommSensorState()

        self.on_message_automations = Automation.create_automations(self, config.on_message)


    def start(self, call_stack: CallStack):
        topic = call_stack.get(self.configuration.topic)
        self.parent.subscribe(topic, self.on_message)


    def on_message(self, call_stack: CallStack):
        self.state.payload = call_stack.get("{{{payload}}}")
        self.log_debug("Got payload: "+ str(self.state.payload))

        for automation in self.on_message_automations:
            automation.invoke(call_stack)

        self.on_state_changed(call_stack)

Classes

class RfCommSensorConfiguration (**data: Any)

Base clas for all sensor configuration classes

YAML configuration

Expand source code
@configuration
class RfCommSensorConfiguration(SensorConfiguration):

    @validator('platform')
    def check_platform(cls, v):
        platform_name = "rfcomm_server"
        if v != platform_name:
            raise ValueError("wrong script platform: " + platform_name + ", is: " + v)
        return v   

    on_message: Optional[list[AutomationConfiguration]] = [] 
    '''List of Automations to execute when a message is received, see `modules.base.Configuration.AutomationConfiguration`'''

Ancestors

Class variables

var on_message : Optional[list]

List of Automations to execute when a message is received, see AutomationConfiguration

Static methods

def check_platform(v)
Expand source code
@validator('platform')
def check_platform(cls, v):
    platform_name = "rfcomm_server"
    if v != platform_name:
        raise ValueError("wrong script platform: " + platform_name + ", is: " + v)
    return v   

Inherited members

class RfCommSensorState

Represents the last received payload

Expand source code
class RfCommSensorState(BaseState):
    '''Represents the last received payload'''

    payload: Union[str, dict] = ""
    '''Last received payload'''

Ancestors

Class variables

var payload : Union[str, dict]

Last received payload

class Sensor (parent: Platform, config: RfCommSensorConfiguration)

Allows to listen to a given MQTT topic.

Expand source code
class Sensor(BaseSensor, Debuggable):
    '''Allows to listen to a given MQTT topic.'''
    from plugins.rfcomm_server.Platform import Platform

    def __init__(self, parent: Platform, config: RfCommSensorConfiguration) -> None:
        super().__init__(parent, config)
        self.configuration = config
        self.state = RfCommSensorState()

        self.on_message_automations = Automation.create_automations(self, config.on_message)


    def start(self, call_stack: CallStack):
        topic = call_stack.get(self.configuration.topic)
        self.parent.subscribe(topic, self.on_message)


    def on_message(self, call_stack: CallStack):
        self.state.payload = call_stack.get("{{{payload}}}")
        self.log_debug("Got payload: "+ str(self.state.payload))

        for automation in self.on_message_automations:
            automation.invoke(call_stack)

        self.on_state_changed(call_stack)

Ancestors

Class variables

var Platform

RFComm Server Platform

Methods

def on_message(self, call_stack: CallStack)
Expand source code
def on_message(self, call_stack: CallStack):
    self.state.payload = call_stack.get("{{{payload}}}")
    self.log_debug("Got payload: "+ str(self.state.payload))

    for automation in self.on_message_automations:
        automation.invoke(call_stack)

    self.on_state_changed(call_stack)

Inherited members