In a python application that uses the aws iot device sdk for python v2 (v1.7.1) I am running into an issue where I cannot update the device shadow.
After starting the program, the DeviceShadowManager
will attempt to get the latest shadow state and set it locally.
If a delta
state is present the DeviceShadowManager
will merge the last reported
state and delta
state and publish it.
That works. However, when the manager subscribes for updates, after the initial setup, I am running into an error,
where when the desired
state changes, the manager cannot update the reported
state. Here is the error:
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'
I looked at the source, but just do not understand why a TypeError
is raised,
especially because this exact scenario seems to be handled by the try
and except
block or am I getting it all wrong?
The source of the error:
if callback:
def callback_wrapper(topic, payload, dup, qos, retain):
try:
callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
except TypeError:
# This callback used to have fewer args.
# Try again, passing only those those args, to cover case where
# user function failed to take forward-compatibility **kwargs.
callback(topic=topic, payload=payload) # this is line 506
Below you can find my code and the log of the program.
This dataclass represents the shadow:
from dataclasses import dataclass
@dataclass
class DeviceShadow:
score_threshold: float = 0.6
minimum_distance: int = 150
The shadow is managed by the DeviceShadowManager
. Most of this is based on the shadow sample from the aforementioned repository.
from dataclasses import asdict
from queue import Queue
from threading import Lock
from awscrt import mqtt
from awsiot import iotshadow
from awsiot.iotshadow import IotShadowClient
from app.device_shadow.device_shadow import DeviceShadow, from_json as device_shadow_from_json
from app.models import log
SHADOW_VALUE_DEFAULT = DeviceShadow()
class DeviceShadowManager:
_shadow_client: IotShadowClient
shadow_value: DeviceShadow = DeviceShadow()
_lock = Lock()
_thing_name: str
def __init__(self, thing_name: str, mqtt_connection: mqtt.Connection):
self._thing_name = thing_name
self._shadow_client = iotshadow.IotShadowClient(mqtt_connection)
update_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_accepted(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_update_shadow_accepted # omitted
)
update_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_rejected(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_update_shadow_rejected # omitted
)
# Wait for subscriptions to succeed
update_accepted_subscribed_future.result(60)
update_rejected_subscribed_future.result(60)
log.info("Subscribing to Get responses...")
get_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_accepted(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_get_shadow_accepted)
get_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_rejected(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_get_shadow_rejected) # omitted
# Wait for subscriptions to succeed
get_accepted_subscribed_future.result()
get_rejected_subscribed_future.result()
log.info("Subscribing to Delta events...")
delta_subscribed_future, _ = self._shadow_client.subscribe_to_shadow_delta_updated_events(
request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(
thing_name=self._thing_name
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_shadow_delta_updated)
# Wait for subscription to succeed
delta_subscribed_future.result()
# From here on out the rest runs asynchronously.
# Issue request for shadow's current value.
# The response will be received by the on_get_accepted() callback
with self._lock:
publish_get_future = self._shadow_client.publish_get_shadow(
request=iotshadow.GetShadowRequest(
thing_name=self._thing_name,
),
qos=mqtt.QoS.AT_LEAST_ONCE
)
# Ensure that publish succeeds
publish_get_future.result()
def on_get_shadow_accepted(self, response: iotshadow.GetShadowResponse) -> None:
log.info("Finished getting initial shadow value.")
if response.state and response.state.delta:
if not response.state.reported:
response.state.reported = {}
merged_state = self.merge_states(response.state.delta, response.state.desired)
return self.set_desired(device_shadow_from_json(merged_state))
if response.state and response.state.reported:
return self.set_local(device_shadow_from_json(response.state.reported))
self.set_desired(SHADOW_VALUE_DEFAULT)
return
def on_shadow_delta_updated(self, delta: iotshadow.ShadowDeltaUpdatedEvent) -> None:
if delta.state:
if delta.state is None:
log.info("Delta reports that nothing is set. Setting defaults...")
self.set_desired(SHADOW_VALUE_DEFAULT)
return
log.info("Delta reports that desired shadow is '{}'. Changing local shadow...".format(delta.state))
self.set_desired(self.merge_states(delta.state, self.shadow_value))
else:
log.info("Delta did not report a change")
@staticmethod
def merge_states(delta: dict, reported: DeviceShadow):
for key, value in delta.items():
reported[key] = value
return reported
def set_local(self, value: DeviceShadow) -> None:
with self._lock:
self.shadow_value = value
def set_desired(self, new_value: DeviceShadow) -> None:
with self._lock:
if self.shadow_value == new_value:
log.debug("Local shadow is already '{}'.".format(new_value))
return
log.debug("Changing local shadow to '{}'.".format(new_value))
self.shadow_value = new_value
log.debug("Updating reported shadow to '{}'...".format(new_value))
request = iotshadow.UpdateShadowRequest(
thing_name=self._thing_name,
state=iotshadow.ShadowState(
desired=asdict(new_value),
reported=asdict(new_value),
),
)
self._shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)
Below you will find the log:
DEBUG:app.mqtt:Connecting to xxxxxxxxxxxxxx-ats.iot.eu-central-1.amazonaws.com with client ID '80d8bc54-971e-0e65-a537-37d14a3cb630'...
INFO:app.models:Subscribing to Get responses...
INFO:app.models:Subscribing to Delta events...
INFO:app.models:Finished getting initial shadow value.
DEBUG:app.models:Changed local shadow to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'.
DEBUG:app.models:Updating reported shadow to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'...
INFO:app.models:Update request published.
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.
INFO:app.models:Delta reports that desired shadow is '{'minimum_distance': 15035}'. Changing local shadow...
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.
As you can see the stacktrace is pretty short, is there a way to debug this better? Any ideas to why it is giving me this particular error and maybe how to solve it? All help is appreciated!
CodePudding user response:
I am pretty sure the problem lies within
@staticmethod
def merge_states(delta: dict, reported: DeviceShadow):
for key, value in delta.items():
reported[key] = value
return reported
where the __setitem__
call on the reported
argument raises a TypeError
because the reported argument is a DeviceShadow
dataclass object that doesn't support item assignment.
If you want to set fields of a dataclass where you have a string of the field name, you can use setattr(reported, key, value)
.