Issues publishing to device shadow using the aws-iot-device-sdk-python-v2

Time:10-25

In a Python application that uses the AWS IoT Device SDK for Python v2 (v1.7.1), I am running into an issue where I cannot update the device shadow.

After starting the program, the DeviceShadowManager attempts to get the latest shadow state and set it locally. If a delta state is present, the DeviceShadowManager merges the last reported state with the delta state and publishes the result. That works. However, once the manager has subscribed to updates after the initial setup, any change to the desired state fails: the manager cannot update the reported state. Here is the error:

Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
    callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'

I looked at the source but do not understand why a TypeError is raised, especially because this exact scenario seems to be handled by the try/except block. Or am I getting it all wrong?

The source of the error:

if callback:
    def callback_wrapper(topic, payload, dup, qos, retain):
        try:
            callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
        except TypeError:
            # This callback used to have fewer args.
            # Try again, passing only those those args, to cover case where
            # user function failed to take forward-compatibility **kwargs.
            callback(topic=topic, payload=payload) # this is line 506

Below you can find my code and the log of the program.

This dataclass represents the shadow:

from dataclasses import dataclass

@dataclass
class DeviceShadow:
    score_threshold: float = 0.6
    minimum_distance: int = 150

The shadow is managed by the DeviceShadowManager. Most of this is based on the shadow sample from the aforementioned repository.

from dataclasses import asdict
from queue import Queue
from threading import Lock

from awscrt import mqtt
from awsiot import iotshadow
from awsiot.iotshadow import IotShadowClient

from app.device_shadow.device_shadow import DeviceShadow, from_json as device_shadow_from_json
from app.models import log

SHADOW_VALUE_DEFAULT = DeviceShadow()


class DeviceShadowManager:
    _shadow_client: IotShadowClient
    shadow_value: DeviceShadow = DeviceShadow()

    _lock = Lock()
    _thing_name: str

    def __init__(self, thing_name: str, mqtt_connection: mqtt.Connection):
        self._thing_name = thing_name
        self._shadow_client = iotshadow.IotShadowClient(mqtt_connection)

        update_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_accepted(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_update_shadow_accepted  # omitted
        )

        update_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_rejected(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_update_shadow_rejected  # omitted
        )

        # Wait for subscriptions to succeed
        update_accepted_subscribed_future.result(60)
        update_rejected_subscribed_future.result(60)

        log.info("Subscribing to Get responses...")
        get_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_accepted(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_get_shadow_accepted)

        get_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_rejected(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_get_shadow_rejected)  # omitted

        # Wait for subscriptions to succeed
        get_accepted_subscribed_future.result()
        get_rejected_subscribed_future.result()

        log.info("Subscribing to Delta events...")
        delta_subscribed_future, _ = self._shadow_client.subscribe_to_shadow_delta_updated_events(
            request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(
                thing_name=self._thing_name
            ),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_shadow_delta_updated)

        # Wait for subscription to succeed
        delta_subscribed_future.result()

        # From here on out the rest runs asynchronously.

        # Issue request for shadow's current value.
        # The response will be received by the on_get_accepted() callback
        with self._lock:
            publish_get_future = self._shadow_client.publish_get_shadow(
                request=iotshadow.GetShadowRequest(
                    thing_name=self._thing_name,
                ),
                qos=mqtt.QoS.AT_LEAST_ONCE
            )

        # Ensure that publish succeeds
        publish_get_future.result()

    def on_get_shadow_accepted(self, response: iotshadow.GetShadowResponse) -> None:
        log.info("Finished getting initial shadow value.")

        if response.state and response.state.delta:
            if not response.state.reported:
                response.state.reported = {}
            merged_state = self.merge_states(response.state.delta, response.state.desired)
            return self.set_desired(device_shadow_from_json(merged_state))

        if response.state and response.state.reported:
            return self.set_local(device_shadow_from_json(response.state.reported))

        self.set_desired(SHADOW_VALUE_DEFAULT)
        return

    def on_shadow_delta_updated(self, delta: iotshadow.ShadowDeltaUpdatedEvent) -> None:
        if delta.state:
            if delta.state is None:
                log.info("Delta reports that nothing is set. Setting defaults...")
                self.set_desired(SHADOW_VALUE_DEFAULT)
                return

            log.info("Delta reports that desired shadow is '{}'. Changing local shadow...".format(delta.state))
            self.set_desired(self.merge_states(delta.state, self.shadow_value))

        else:
            log.info("Delta did not report a change")

    @staticmethod
    def merge_states(delta: dict, reported: DeviceShadow):
        for key, value in delta.items():
            reported[key] = value

        return reported

    def set_local(self, value: DeviceShadow) -> None:
        with self._lock:
            self.shadow_value = value

    def set_desired(self, new_value: DeviceShadow) -> None:
        with self._lock:
            if self.shadow_value == new_value:
                log.debug("Local shadow is already '{}'.".format(new_value))
                return

            log.debug("Changing local shadow to '{}'.".format(new_value))
            self.shadow_value = new_value

            log.debug("Updating reported shadow  to '{}'...".format(new_value))
            request = iotshadow.UpdateShadowRequest(
                thing_name=self._thing_name,
                state=iotshadow.ShadowState(
                    desired=asdict(new_value),
                    reported=asdict(new_value),
                ),
            )

            self._shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)

Below you will find the log:

DEBUG:app.mqtt:Connecting to xxxxxxxxxxxxxx-ats.iot.eu-central-1.amazonaws.com with client ID '80d8bc54-971e-0e65-a537-37d14a3cb630'...
INFO:app.models:Subscribing to Get responses...
INFO:app.models:Subscribing to Delta events...
INFO:app.models:Finished getting initial shadow value.
DEBUG:app.models:Changed local shadow to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'.
DEBUG:app.models:Updating reported shadow  to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'...
INFO:app.models:Update request published.
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.
INFO:app.models:Delta reports that desired shadow is '{'minimum_distance': 15035}'. Changing local shadow...
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
    callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.

As you can see, the stack trace is pretty short. Is there a way to debug this better? Any ideas as to why it gives me this particular error, and how to solve it? All help is appreciated!

Answer:

I am pretty sure the problem lies within merge_states:

@staticmethod
def merge_states(delta: dict, reported: DeviceShadow):
    for key, value in delta.items():
        reported[key] = value
    return reported

Here, reported[key] = value calls __setitem__ on the reported argument, which raises a TypeError: reported is a DeviceShadow dataclass instance, and dataclass instances do not support item assignment.
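This would also explain why the traceback points at awscrt rather than at merge_states. An except TypeError branch like the one quoted in the question cannot tell a signature mismatch apart from a TypeError raised inside the callback body, so it retries with fewer arguments, and that retry fails with a second, unrelated TypeError that hides the first. Below is a minimal sketch of that pattern. The names sdk_wrapper, inner_wrapper, and user_callback are hypothetical stand-ins, not the real awscrt or awsiot code.

```python
from dataclasses import dataclass


@dataclass
class DeviceShadow:
    score_threshold: float = 0.6
    minimum_distance: int = 150


def user_callback(event):
    # Same bug as merge_states: dataclass instances reject item assignment.
    shadow = DeviceShadow()
    shadow["minimum_distance"] = event["minimum_distance"]


def inner_wrapper(topic, payload, dup, qos, retain):
    # Hypothetical stand-in for the layer that hands the payload to user code.
    user_callback(payload)


def sdk_wrapper(callback, topic, payload, dup, qos, retain):
    # Hypothetical stand-in for the retry logic quoted in the question.
    try:
        callback(topic=topic, payload=payload, dup=dup, qos=qos, retain=retain)
    except TypeError:
        # Assumes a signature mismatch and retries with fewer arguments,
        # even though the TypeError came from inside the callback body.
        callback(topic=topic, payload=payload)


def run_demo():
    try:
        sdk_wrapper(inner_wrapper, "delta", {"minimum_distance": 15035}, False, 1, False)
    except TypeError as error:
        # error is the misleading "missing arguments" failure from the retry;
        # error.__context__ is the TypeError that triggered the retry.
        return str(error), str(error.__context__)


surfaced, original = run_demo()
print("surfaced:", surfaced)
print("original:", original)
```

In this sketch the surfaced message complains about missing positional arguments, while the item-assignment error is only reachable through __context__. In the log from the question only the second error is printed, so one way to see the real failure is to wrap the body of your own callback in a try/except that logs the exception (for example with logging.exception) before it propagates into the SDK.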

To set a dataclass field when you have the field name as a string, use setattr(reported, key, value).
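A minimal sketch of merge_states rewritten along those lines, under two assumptions that are not in the original code: unknown delta keys are silently skipped, and the merge works on a copy instead of mutating reported. The copy matters for the call in on_shadow_delta_updated, which passes self.shadow_value: if that object were changed in place, the equality check in set_desired would always succeed and no update would be published. This version only handles a DeviceShadow as the second argument, not the dict passed in on_get_shadow_accepted.

```python
from dataclasses import dataclass, fields, replace


@dataclass
class DeviceShadow:
    score_threshold: float = 0.6
    minimum_distance: int = 150


def merge_states(delta: dict, reported: DeviceShadow) -> DeviceShadow:
    # replace() with no changes builds a new instance with the same field
    # values, so the caller's object is left untouched.
    merged = replace(reported)
    # Assumption: keys that are not declared fields are ignored, so a stray
    # delta key cannot add an undeclared attribute to the instance.
    known_fields = {field.name for field in fields(merged)}
    for key, value in delta.items():
        if key in known_fields:
            setattr(merged, key, value)
    return merged


current = DeviceShadow(score_threshold=0.7, minimum_distance=1503)
updated = merge_states({"minimum_distance": 15035}, current)
print(current)
print(updated)
```

Because current is left unchanged, comparing it with updated still detects the difference, which is what set_desired relies on to decide whether to publish.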
