User implemented functions

In order to enable some functionalities like actuators, configurations and firmware update, certain functions or classes must be implemented.

This page will explain the mechanisms behind each of these functionalities.

Device status provider

To know which devices are currently available to receive commands, the Platform and gateway need to be notified of the current status of each of the module’s devices: whether it is connected, offline, in sleep mode or in service mode. Once the connection to the gateway is terminated, the module will automatically publish offline states for all devices that have been added to it.

Note: This function is required in order to create a Wolk object.

device_status_provider.get_device_status(device_key: str) → wolk_gateway_module.model.device_status.DeviceStatus

Get current device status.

Parameters: device_key (str) – Device identifier
Returns: status
Return type: DeviceStatus

Available device states:

class wolk_gateway_module.model.device_status.DeviceStatus

Enumeration of available device statuses.

Variables:
  • CONNECTED (str) – Device currently connected
  • OFFLINE (str) – Device currently offline
  • SERVICE_MODE (str) – Device currently in service mode
  • SLEEP (str) – Device currently in sleep mode

A stub implementation would look something like this:

def get_device_status(device_key):
    if device_key == "DEVICE_KEY":
        # Handle getting current device status here
        return wolk_gateway_module.DeviceStatus.CONNECTED  # OFFLINE, SLEEP, SERVICE_MODE
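For a module that manages several devices, the stub above can be extended into a lookup table. The following is only a minimal sketch, not part of the SDK: the DeviceStatus enum is a local stand-in for wolk_gateway_module.DeviceStatus (its string values are assumed) so that the snippet runs on its own, and the device keys, the _devices registry and set_device_status are hypothetical. Reporting unknown keys as OFFLINE is a choice made for this sketch.

```python
import threading
from enum import Enum


class DeviceStatus(Enum):
    """Local stand-in for wolk_gateway_module.DeviceStatus (values assumed)."""

    CONNECTED = "CONNECTED"
    OFFLINE = "OFFLINE"
    SERVICE_MODE = "SERVICE_MODE"
    SLEEP = "SLEEP"


# Hypothetical registry of device keys and their last known status.
_devices = {
    "DEVICE_KEY_1": DeviceStatus.CONNECTED,
    "DEVICE_KEY_2": DeviceStatus.SLEEP,
}
_devices_lock = threading.Lock()


def set_device_status(device_key, status):
    """Record a status change reported by the device driver."""
    with _devices_lock:
        _devices[device_key] = status


def get_device_status(device_key):
    """Return the last known status, treating unknown devices as offline."""
    with _devices_lock:
        return _devices.get(device_key, DeviceStatus.OFFLINE)
```

In a real module, the stand-in enum would be replaced by the one from the SDK, and get_device_status would be passed to Wolk as device_status_provider.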

Actuator functions

Actuators enable remote control over a device peripheral that can change between predefined states, like turning a switch on or off, or setting a light dimmer to 20% intensity.

In order to enable remote control, the Platform first needs to be notified about the actuator’s current state: whether it is ready to receive a command, busy changing its position, or in an error state and unable to perform at that moment. This information about the actuator’s current state and value is obtained through an actuator status provider function.

actuator_status_provider.get_actuator_status(device_key: str, reference: str) → Tuple[wolk_gateway_module.model.actuator_state.ActuatorState, Union[bool, int, float, str]]

Get current actuator status identified by device key and reference.

Reads the status of actuator from the device and returns it as a tuple containing the actuator state and current value.

Must be implemented as non-blocking. Must be implemented as thread-safe.

Parameters:
  • device_key (str) – Device key to which the actuator belongs
  • reference (str) – Actuator reference
Returns: (state, value)
Return type: (ActuatorState, bool or int or float or str)

Available actuator states:

class wolk_gateway_module.model.actuator_state.ActuatorState

Enumeration of available actuator states.

Variables:
  • BUSY (str) – Actuator currently in busy state
  • ERROR (str) – Actuator currently in error state
  • READY (str) – Actuator currently in ready state

A stub implementation would look something like this:

def get_actuator_status(device_key, reference):
    if device_key == "DEVICE_KEY":
        if reference == "SW":
            # Handle getting current actuator value here
            return wolk_gateway_module.ActuatorState.READY, switch.value  # BUSY, ERROR
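One way to satisfy the non-blocking and thread-safe requirements is to answer from a cache that the device driver keeps up to date, instead of querying the hardware inside the provider. The sketch below illustrates that idea under stated assumptions: ActuatorState is a local stand-in for wolk_gateway_module.ActuatorState (values assumed), ActuatorCache and its update method are hypothetical names, and reporting unknown actuators as ERROR with a value of False is a choice made for this example.

```python
import threading
from enum import Enum


class ActuatorState(Enum):
    """Local stand-in for wolk_gateway_module.ActuatorState (values assumed)."""

    BUSY = "BUSY"
    ERROR = "ERROR"
    READY = "READY"


class ActuatorCache:
    """Holds the last known (state, value) pair per (device_key, reference)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._statuses = {}

    def update(self, device_key, reference, state, value):
        """Called by the (hypothetical) device driver when a reading changes."""
        with self._lock:
            self._statuses[(device_key, reference)] = (state, value)

    def get_actuator_status(self, device_key, reference):
        """Return the cached status; no hardware access happens here."""
        with self._lock:
            # Unknown actuators are reported as ERROR (an assumption).
            return self._statuses.get(
                (device_key, reference), (ActuatorState.ERROR, False)
            )


cache = ActuatorCache()
cache.update("DEVICE_KEY", "SW", ActuatorState.READY, True)
get_actuator_status = cache.get_actuator_status
```

The lock is held only for a dictionary read or write, so calls from the SDK's threads return quickly even while the driver is busy with the device.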

Now that the Platform and gateway are able to get information about the actuator’s current value and state, they should also be able to send commands to the actuator. This is achieved through another function called the actuation handler.

actuation_handler.handle_actuation(device_key: str, reference: str, value: Union[bool, int, float, str]) → None

Set device actuator identified by reference to value.

Must be implemented as non-blocking. Must be implemented as thread-safe.

Parameters:
  • device_key (str) – Device identifier
  • reference (str) – Reference of the actuator
  • value (bool, int, float or str) – Value to which to set the actuator

A stub implementation would look something like this:

def handle_actuation(device_key, reference, value):
    if device_key == "DEVICE_KEY":
        if reference == "SW":
            # Handle setting the actuator value here
            switch.value = value

Finally, these two functions are passed as arguments to the Wolk class as actuation_handler and actuator_status_provider:

wolk_module = wolk.Wolk(
    host=configuration["host"],
    port=configuration["port"],
    module_name=configuration["module_name"],
    device_status_provider=get_device_status,
    actuation_handler=handle_actuation,
    actuator_status_provider=get_actuator_status,
)

Once Wolk.connect() has been called, it will call actuator_status_provider to get the current actuator status for each actuator of every added device. Actuator statuses can also be published explicitly by calling:

wolk_module.publish_actuator_status("DEVICE_KEY", "ACTUATOR_REFERENCE")

Configuration option functions

Configuration options enable modification of device properties from WolkAbout IoT Platform in order to change device behavior, e.g. measurement heartbeat, enabling/disabling device interfaces, or increasing/decreasing the device logging level.

Configuration options require a similar way of handling messages as actuators. When a configuration command is issued from WolkAbout IoT Platform, it will be passed to a configuration_handler that will attempt to execute the command. Then the configuration_provider will report back to WolkAbout IoT Platform with the current values of the device’s configuration options.

Configuration options are always sent as a whole, even when only one value changes. They are sent as a dictionary, where the key represents the configuration’s reference and the value is the current value.

configuration_provider.get_configuration(device_key: str) → Dict[str, Union[int, float, bool, str, Tuple[int, int], Tuple[int, int, int], Tuple[float, float], Tuple[float, float, float], Tuple[str, str], Tuple[str, str, str]]]

Get current configuration options.

Reads device configuration and returns it as a dictionary with device configuration reference as key, and device configuration value as value. Must be implemented as non-blocking. Must be implemented as thread-safe.

Parameters: device_key (str) – Device identifier
Returns: configuration
Return type: dict

Stub implementation:

def get_configuration(device_key):
    if device_key == "DEVICE_KEY":
        # Handle getting configuration values here
        return {
            "configuration_1": configuration_1.value,
            "configuration_2": configuration_2.value,
        }

After implementing how to get current configuration option values, another function for setting new values is required:

configuration_handler.handle_configuration(device_key: str, configuration: Dict[str, Union[int, float, bool, str, Tuple[int, int], Tuple[int, int, int], Tuple[float, float], Tuple[float, float, float], Tuple[str, str], Tuple[str, str, str]]]) → None

Change device’s configuration options.

Must be implemented as non-blocking. Must be implemented as thread-safe.

Parameters:
  • device_key (str) – Device identifier
  • configuration (dict) – Configuration option reference:value pairs

Stub implementation:

def handle_configuration(device_key, configuration):
    if device_key == "DEVICE_KEY":
        # Handle setting configuration values here
        for reference, value in configuration.items():
            if reference == "configuration_1":
                configuration_1.value = value
            elif reference == "configuration_2":
                configuration_2.value = value
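Since configuration options are always reported as a whole, the provider and the handler can share a single per-device dictionary. The sketch below shows one possible arrangement; the _configurations store and the option references ("heartbeat", "log_level", "range") are hypothetical, and silently ignoring unknown devices and references is an assumption made for this example.

```python
import threading

# Hypothetical per-device configuration; values may be scalars or tuples.
_configurations = {
    "DEVICE_KEY": {"heartbeat": 60, "log_level": "INFO", "range": (0.0, 100.0)},
}
_configurations_lock = threading.Lock()


def get_configuration(device_key):
    """Return a copy of every configuration option for the device."""
    with _configurations_lock:
        return dict(_configurations.get(device_key, {}))


def handle_configuration(device_key, configuration):
    """Apply received options, ignoring references the device does not have."""
    with _configurations_lock:
        current = _configurations.get(device_key)
        if current is None:
            return
        for reference, value in configuration.items():
            if reference in current:
                current[reference] = value
```

Returning a copy from get_configuration keeps callers from modifying the stored options without going through handle_configuration.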

Finally, these two functions are passed as arguments to the Wolk class as configuration_handler and configuration_provider:

wolk_module = wolk.Wolk(
    host=configuration["host"],
    port=configuration["port"],
    module_name=configuration["module_name"],
    device_status_provider=get_device_status,
    configuration_handler=handle_configuration,
    configuration_provider=get_configuration,
)

Once Wolk.connect() has been called, it will call configuration_provider to get the current configuration options for each added device with configurations. However, publishing configurations can be done explicitly by calling:

wolk_module.publish_configuration("DEVICE_KEY")

Enabling firmware update

WolkAbout IoT Platform has the option of updating device software/firmware. In order to enable this functionality on a device, the user has to implement the FirmwareHandler abstract base class and pass it to Wolk.

class wolk_gateway_module.interface.firmware_handler.FirmwareHandler

Handle firmware installation and abort commands, and report version.

Once an object of this class is passed to a Wolk object, it will set callback methods on_install_success and on_install_fail used for reporting the result of the firmware update process. Use these callbacks in install_firmware and abort_installation methods.

Variables:
  • on_install_fail (Callable[[str, FirmwareUpdateStatus], None]) – Installation failure callback method
  • on_install_success (Callable[[str], None]) – Installation successful callback method

abort_installation(device_key: str) → None

Attempt to abort the firmware installation process for device.

If the installation process was aborted successfully, report it by calling self.on_install_fail(device_key, status) with status = FirmwareUpdateStatus(FirmwareUpdateState.ABORTED). If unable to stop the installation process, no action is required.

Parameters: device_key (str) – Device for which to abort installation

get_firmware_version(device_key: str) → str

Return device’s current firmware version.

Parameters: device_key (str) – Device identifier
Returns: version
Return type: str

install_firmware(device_key: str, firmware_file_path: str) → None

Handle the installation of the firmware file.

Call self.on_install_success(device_key) to report success. Reporting success will also cause the new firmware version to be retrieved.

If installation fails, call self.on_install_fail(device_key, status) where:

status = FirmwareUpdateStatus(
    FirmwareUpdateState.ERROR,
    FirmwareUpdateErrorCode.INSTALLATION_FAILED
)

or use other values from FirmwareUpdateErrorCode if they fit better.

Parameters:
  • device_key (str) – Device for which the firmware command is intended
  • firmware_file_path (str) – Path where the firmware file is located

The enumerations used to report current firmware update states are listed below:

Firmware update status model.

class wolk_gateway_module.model.firmware_update_status.FirmwareUpdateErrorCode

Enumeration of possible firmware update errors.

Variables:
  • DEVICE_NOT_PRESENT (int) – Unable to pass firmware install command to device
  • FILE_NOT_PRESENT (int) – Firmware file was not present at specified location
  • FILE_SYSTEM_ERROR (int) – File system error occurred
  • INSTALLATION_FAILED (int) – Firmware installation failed
  • UNSPECIFIED_ERROR (int) – Unspecified error occurred

class wolk_gateway_module.model.firmware_update_status.FirmwareUpdateState

Enumeration of available firmware update states.

Variables:
  • ABORTED (str) – Firmware installation aborted
  • COMPLETED (str) – Firmware installation completed
  • ERROR (str) – Firmware installation error
  • INSTALLATION (str) – Firmware installation in progress

class wolk_gateway_module.model.firmware_update_status.FirmwareUpdateStatus(status: wolk_gateway_module.model.firmware_update_status.FirmwareUpdateState, error_code: wolk_gateway_module.model.firmware_update_status.FirmwareUpdateErrorCode = None)

Holds information about current firmware update status.

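Variables:
  • status (FirmwareUpdateState) – Firmware update state
  • error_code (FirmwareUpdateErrorCode or None) – Firmware update error code, if any

A stub implementation would look something like this:

class FirmwareHandlerImplementation(wolk_gateway_module.FirmwareHandler):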

    def install_firmware(self, device_key, firmware_file_path):
        if device_key == "DEVICE_KEY":
            print(
                f"Installing firmware: '{firmware_file_path}' "
                f"on device '{device_key}'"
            )
            # Handle the actual installation here
            installation_successful = True  # Placeholder: replace with the real result

            if installation_successful:
                # If installation was successful
                self.on_install_success(device_key)
            else:
                # If installation failed
                status = wolk_gateway_module.FirmwareUpdateStatus(
                    wolk_gateway_module.FirmwareUpdateState.ERROR,
                    wolk_gateway_module.FirmwareUpdateErrorCode.INSTALLATION_FAILED,
                )
                self.on_install_fail(device_key, status)

    def abort_installation(self, device_key):
        if device_key == "DEVICE_KEY":
            # Manage to stop firmware installation
            status = wolk_gateway_module.FirmwareUpdateStatus(
                wolk_gateway_module.FirmwareUpdateState.ABORTED
            )
            self.on_install_fail(device_key, status)

    def get_firmware_version(self, device_key):
        if device_key == "DEVICE_KEY":
            return device.firmware_version

An object of this class needs to be passed to Wolk like so:

wolk_module = wolk.Wolk(
    host=configuration["host"],
    port=configuration["port"],
    module_name=configuration["module_name"],
    device_status_provider=get_device_status,
    firmware_handler=FirmwareHandlerImplementation(),
)

When Wolk.connect() is called, it will call firmware_handler.get_firmware_version() for each added device that supports firmware update and report the versions to WolkAbout IoT Platform.
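The error codes listed earlier can be used to give the Platform a more precise reason for a failed installation. The sketch below checks for the firmware file before attempting to install it. To keep it runnable on its own, the enumerations and FirmwareUpdateStatus are local stand-ins for the SDK classes (enum values assumed, only two error codes reproduced), the handler does not subclass wolk_gateway_module.FirmwareHandler as a real one would, and the installer callable and versions dictionary are hypothetical.

```python
import os
from enum import Enum, auto


class FirmwareUpdateState(Enum):
    """Local stand-in for the SDK enumeration (values assumed)."""

    ABORTED = auto()
    COMPLETED = auto()
    ERROR = auto()
    INSTALLATION = auto()


class FirmwareUpdateErrorCode(Enum):
    """Local stand-in; only the codes used in this sketch."""

    FILE_NOT_PRESENT = auto()
    INSTALLATION_FAILED = auto()


class FirmwareUpdateStatus:
    """Local stand-in mirroring the documented constructor arguments."""

    def __init__(self, status, error_code=None):
        self.status = status
        self.error_code = error_code


class FileCheckingFirmwareHandler:
    """Would subclass wolk_gateway_module.FirmwareHandler in a real module."""

    def __init__(self, installer):
        # installer: hypothetical callable (device_key, path) -> bool
        self.installer = installer
        self.versions = {}
        # In the SDK these callbacks are set by Wolk; no-op defaults here.
        self.on_install_success = lambda device_key: None
        self.on_install_fail = lambda device_key, status: None

    def install_firmware(self, device_key, firmware_file_path):
        if not os.path.isfile(firmware_file_path):
            error = FirmwareUpdateErrorCode.FILE_NOT_PRESENT
        elif not self.installer(device_key, firmware_file_path):
            error = FirmwareUpdateErrorCode.INSTALLATION_FAILED
        else:
            self.on_install_success(device_key)
            return
        status = FirmwareUpdateStatus(FirmwareUpdateState.ERROR, error)
        self.on_install_fail(device_key, status)

    def abort_installation(self, device_key):
        # This sketch installs synchronously, so there is nothing to abort.
        pass

    def get_firmware_version(self, device_key):
        return self.versions.get(device_key, "unknown")
```

Exactly one of the two callbacks is invoked per install_firmware call, so the Platform always receives a single, final result for each installation attempt.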