I developed an iOS app and it now needs to communicate with a Raspberry Pi over BLE. I am able to write BLE data to a characteristic, and I can see the data in the bluetoothctl
terminal. I tried to write a Python program that reads the incoming BLE data, but I could not get it to work. I searched and found pexpect,
but I was not able to read the data from the terminal (it is hard to detect changes and read the last line). I also found python-dbus, but I have no experience with it. Can anyone help me?
The iOS application pairs automatically.
The iOS application enables notify on the Raspberry Pi characteristic.
Raspberry Pi
- Service: 0xffff
- Characteristic: 0xbbbb read,write,notify
CodePudding user response:
I am not sure that the functionality in bluetoothctl was ever intended to be driven from a programming language like Python.
The D-Bus APIs documented at: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc are how the developers expect people to access data.
I accept that using those D-Bus APIs has a learning curve attached to them and for creating a BLE Peripheral this is somewhat significant especially if this is your first time using D-Bus.
What bluetoothctl does behind the scenes is create an "application" on D-Bus and tell you what its path is. In the example you posted it is /org/bluez/app/service0/chrc0.
The problem is that there is no easy way to get hold of the data written to this characteristic as it is not published to D-Bus in a way that other clients can get hold of the data.
You can monitor all the BlueZ D-Bus activity on the command line with:
$ sudo busctl monitor org.bluez
Monitoring bus message stream.
‣ Type=method_call Endian=l Flags=0 Version=1 Priority=0 Cookie=104
Sender=:1.1742 Destination=:1.1744 Path=/org/bluez/app/service0/chrc0 Interface=org.bluez.GattCharacteristic1 Member=WriteValue
UniqueName=:1.1742
MESSAGE "aya{sv}" {
ARRAY "y" {
BYTE 68;
BYTE 105;
BYTE 114;
BYTE 101;
BYTE 107;
};
ARRAY "{sv}" {
DICT_ENTRY "sv" {
STRING "device";
VARIANT "o" {
OBJECT_PATH "/org/bluez/hci0/dev_6C_40_BE_46_E5_5A";
};
};
DICT_ENTRY "sv" {
STRING "link";
VARIANT "s" {
STRING "LE";
};
};
DICT_ENTRY "sv" {
STRING "mtu";
VARIANT "q" {
UINT16 517;
};
};
};
};
- -
If you extract the bytes that were sent, then that is your value:
>>> value = [68, 105, 114, 101, 107]
>>> bytes(value)
b'Direk'
However, that looks like a lot of work and is quite hacky.
The BlueZ project includes Python examples in its source repository. An example advertisement:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-advertisement
and an example BLE peripheral:
https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server
Below I have combined both of these examples into one file. I have also tried to separate the "boiler plate" code that you should not have to change from the code that is specific to your application.
The function my_write_callback(txt) will get called each time your iOS app writes data (a string) to the peripheral.
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
from gi.repository import GLib
# BlueZ D-Bus interfaces
# Bus name passed to bus.get_object() whenever a BlueZ object is looked up.
BLUEZ_SERVICE_NAME = "org.bluez"
# Interface on the adapter object used to register the GATT application.
GATT_MANAGER_IFACE = "org.bluez.GattManager1"
# Standard D-Bus interfaces: object enumeration and property access.
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties"
# Interfaces implemented by the exported service and characteristic objects.
GATT_SERVICE_IFACE = "org.bluez.GattService1"
GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1"
# NOTE(review): not referenced by any code visible in this file.
GATT_DESC_IFACE = "org.bluez.GattDescriptor1"
# Interfaces used to register and to describe the LE advertisement.
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
# Test UUIDs
# 128-bit UUIDs given to the example service and its single characteristic.
TEST_SERVICE = "0000ffff-beef-c0c0-c0de-c0ffeefacade"
TEST_CHARACTERISTIC = "0000bbbb-beef-c0c0-c0de-c0ffeefacade"
# Boiler plate start
class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the error name
    ``org.freedesktop.DBus.Error.InvalidArgs``."""

    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the error name
    ``org.bluez.Error.NotSupported``."""

    _dbus_error_name = 'org.bluez.Error.NotSupported'
class NotPermittedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the error name
    ``org.bluez.Error.NotPermitted``."""

    _dbus_error_name = 'org.bluez.Error.NotPermitted'
class InvalidValueLengthException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the error name
    ``org.bluez.Error.InvalidValueLength``."""

    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
class FailedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the error name ``org.bluez.Error.Failed``."""

    _dbus_error_name = 'org.bluez.Error.Failed'
def register_app_cb():
    """Reply handler for the GATT application registration.

    Only prints a confirmation line; returns ``None``.
    """
    print("GATT application registered")
def register_app_error_cb(error):
    """Error handler for the GATT application registration.

    Prints the failure reason and stops the main loop.

    Args:
        error: The error object passed to the handler; it is rendered
            with ``str()``.
    """
    print("Failed to register application: " + str(error))
    # ``mainloop`` is the module-level global assigned in main().
    mainloop.quit()
def register_ad_cb():
    """Reply handler for the advertisement registration.

    Only prints a confirmation line; returns ``None``.
    """
    print('Advertisement registered')
def register_ad_error_cb(error):
    """Error handler for the advertisement registration.

    Prints the failure reason and stops the main loop.

    Args:
        error: The error object passed to the handler; it is rendered
            with ``str()``.
    """
    print('Failed to register advertisement: ' + str(error))
    # ``mainloop`` is the module-level global assigned in main().
    mainloop.quit()
class Advertisement(dbus.service.Object):
    """
    org.bluez.LEAdvertisement1 interface implementation

    The optional advertising fields are stored as attributes; a field left
    at ``None`` (or ``include_tx_power`` left ``False``) is omitted from the
    property dictionary returned by ``get_properties``/``GetAll``.
    """

    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        """Export the advertisement on ``bus``.

        Args:
            bus: D-Bus connection the object is exported on.
            index: Suffix appended to ``PATH_BASE`` to form the object path.
            advertising_type: Value reported as the ``Type`` property.
        """
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = False
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return ``{LE_ADVERTISEMENT_IFACE: {property: value}}`` containing
        only the fields that have been set."""
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
                                                    signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
                                                    signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(
                self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data,
                                                        signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.include_tx_power:
            properties['Includes'] = dbus.Array(["tx-power"], signature='s')
        if self.data is not None:
            properties['Data'] = dbus.Dictionary(
                self.data, signature='yv')
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        """Return the object path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        """Append ``uuid`` to the advertised service UUID list."""
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        """Append ``uuid`` to the solicited service UUID list."""
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        """Store ``data`` as a byte array under key ``manuf_code``."""
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

    def add_service_data(self, uuid, data):
        """Store ``data`` as a byte array under key ``uuid``."""
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        self.service_data[uuid] = dbus.Array(data, signature='y')

    def add_local_name(self, name):
        """Set the advertised local name (replaces any previous name)."""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        """Store ``data`` as a byte array under key ``ad_type``."""
        if not self.data:
            self.data = dbus.Dictionary({}, signature='yv')
        self.data[ad_type] = dbus.Array(data, signature='y')

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all properties for ``LE_ADVERTISEMENT_IFACE``.

        Raises:
            InvalidArgsException: If any other interface name is requested.
        """
        print('GetAll')
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        print('returning props')
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        """Handle the ``Release`` call by printing a notice."""
        print('%s: Released!' % self.path)
class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation

    Holds the list of characteristics that belong to the service and is
    exported at ``PATH_BASE`` followed by the service index.
    """

    PATH_BASE = "/org/bluez/app/service"

    def __init__(self, bus, index, uuid, primary):
        """Export the service on ``bus``.

        Args:
            bus: D-Bus connection the object is exported on.
            index: Suffix appended to ``PATH_BASE`` to form the object path.
            uuid: Value reported as the ``UUID`` property.
            primary: Value reported as the ``Primary`` property.
        """
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return ``{GATT_SERVICE_IFACE: {property: value}}``."""
        return {
            GATT_SERVICE_IFACE: {
                "UUID": self.uuid,
                "Primary": self.primary,
                "Characteristics": dbus.Array(
                    self.get_characteristic_paths(), signature="o"
                ),
            }
        }

    def get_path(self):
        """Return the object path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Attach ``characteristic`` to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all attached characteristics."""
        return [chrc.get_path() for chrc in self.characteristics]

    def get_characteristics(self):
        """Return the list of attached characteristic objects."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")
    def GetAll(self, interface):
        """Return all properties for ``GATT_SERVICE_IFACE``.

        Raises:
            InvalidArgsException: If any other interface name is requested.
        """
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()
        return self.get_properties()[GATT_SERVICE_IFACE]
class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation

    Base class: the read/write/notify methods defined here all raise
    ``NotSupportedException``; subclasses override the ones they support.
    """

    def __init__(self, bus, index, uuid, flags, service):
        """Export the characteristic on ``bus`` beneath its service.

        Args:
            bus: D-Bus connection the object is exported on.
            index: Number appended after ``/chrc`` in the object path.
            uuid: Value reported as the ``UUID`` property.
            flags: Value reported as the ``Flags`` property.
            service: Owning service; its ``path`` is the path prefix.
        """
        self.path = service.path + "/chrc" + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return ``{GATT_CHRC_IFACE: {property: value}}``."""
        return {
            GATT_CHRC_IFACE: {
                "Service": self.service.get_path(),
                "UUID": self.uuid,
                "Flags": self.flags,
                "Descriptors": dbus.Array(self.get_descriptor_paths(), signature="o"),
            }
        }

    def get_path(self):
        """Return the object path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Attach ``descriptor`` to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all attached descriptors."""
        return [desc.get_path() for desc in self.descriptors]

    def get_descriptors(self):
        """Return the list of attached descriptor objects."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")
    def GetAll(self, interface):
        """Return all properties for ``GATT_CHRC_IFACE``.

        Raises:
            InvalidArgsException: If any other interface name is requested.
        """
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()
        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="ay")
    def ReadValue(self, options):
        """Default read handler: always raises ``NotSupportedException``."""
        print("Default ReadValue called, returning error")
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="aya{sv}")
    def WriteValue(self, value, options):
        """Default write handler: always raises ``NotSupportedException``."""
        print("Default WriteValue called, returning error")
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises ``NotSupportedException``."""
        print("Default StartNotify called, returning error")
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises ``NotSupportedException``."""
        print("Default StopNotify called, returning error")
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE, signature="sa{sv}as")
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal declaration; the body is intentionally empty."""
        pass
def find_adapter(bus, iface):
    """Return the path of the first BlueZ object that implements ``iface``.

    Args:
        bus: D-Bus connection used to query BlueZ.
        iface: Interface name to look for among each object's interfaces.

    Returns:
        The object path of the first match from ``GetManagedObjects()``,
        or ``None`` when no object exposes ``iface``.
    """
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        # ``props`` maps interface names to their property dictionaries.
        if iface in props:
            return o

    return None
# Boiler plate end
class TestService(Service):
    """
    Test service that provides a characteristic

    Registered as a primary service with UUID ``TEST_SERVICE`` and a single
    ``TestCharacteristic`` at index 0.
    """

    def __init__(self, bus, index):
        """Create the service on ``bus`` and attach its characteristic."""
        Service.__init__(self, bus, index, TEST_SERVICE, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))
class TestCharacteristic(Characteristic):
    """
    Test characteristic. Accepts writes, decodes the bytes as UTF-8 text
    and hands the text to ``my_write_callback``.
    """

    def __init__(self, bus, index, service):
        """Create a write-only characteristic with UUID
        ``TEST_CHARACTERISTIC`` under ``service``."""
        Characteristic.__init__(
            self, bus, index, TEST_CHARACTERISTIC, ["write"], service
        )
        # Last text written by a client; empty until the first write.
        self.value = ""

    def WriteValue(self, value, options):
        """Handle a write from a client.

        Args:
            value: Sequence of byte values written to the characteristic.
            options: Options dictionary supplied with the call (unused).

        Raises:
            UnicodeDecodeError: If ``value`` is not valid UTF-8.
        """
        print(f"TestCharacteristic Write: {value}")
        txt = bytes(value).decode('utf8')
        print(f"As text: {txt}")
        self.value = txt
        my_write_callback(txt)
class TestAdvertisement(Advertisement):
    """Advertisement of type ``'peripheral'`` with the local name
    ``'My Test Peripheral'`` and TX power included."""

    def __init__(self, bus, index):
        """Create the advertisement on ``bus`` at the given ``index``."""
        Advertisement.__init__(self, bus, index, 'peripheral')
        self.add_local_name('My Test Peripheral')
        self.include_tx_power = True
class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation

    Root object (path ``/``) that owns the services and reports every
    service, characteristic and descriptor through ``GetManagedObjects``.
    """

    def __init__(self, bus):
        """Export the application on ``bus`` and add one ``TestService``."""
        self.path = "/"
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(TestService(bus, 0))

    def get_path(self):
        """Return the object path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Attach ``service`` to the application."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature="a{oa{sa{sv}}}")
    def GetManagedObjects(self):
        """Return a mapping of object path to properties for every service,
        characteristic and descriptor owned by the application."""
        response = {}
        print("GetManagedObjects")

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            for chrc in service.get_characteristics():
                response[chrc.get_path()] = chrc.get_properties()
                for desc in chrc.get_descriptors():
                    response[desc.get_path()] = desc.get_properties()

        return response
def my_write_callback(txt):
    """Application hook that receives each text value written by a client.

    Args:
        txt: The text to handle. This example only prints it.
    """
    print(f"This is where I can use the <<{txt}>> value")
def main():
    """Register the advertisement and the GATT application with BlueZ,
    then run the GLib main loop.

    Prints "Adapter not found" and returns early when no BlueZ object
    exposes ``LE_ADVERTISING_MANAGER_IFACE``.
    """
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus, LE_ADVERTISING_MANAGER_IFACE)
    if not adapter:
        print("Adapter not found")
        return

    # Created before the asynchronous registrations below so that the error
    # handlers, which call mainloop.quit(), always find the global defined.
    mainloop = GLib.MainLoop()

    service_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter), GATT_MANAGER_IFACE
    )

    app = Application(bus)

    test_advertisement = TestAdvertisement(bus, 0)

    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    print("Registering GATT application...")

    service_manager.RegisterApplication(
        app.get_path(),
        {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb,
    )

    mainloop.run()
# Run the peripheral only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
I changed the UUIDs used in the peripheral because custom services and characteristics need to be outside the range reserved for the Bluetooth SIG. More information on this at: https://novelbits.io/uuid-for-custom-services-and-characteristics/
Hopefully this is enough to get you moving up the D-Bus learning curve for BlueZ.