How can I read incoming BLE data? [Raspberry Pi]

Time:08-02

I developed an iOS app that now needs to communicate with a Raspberry Pi over BLE. I am able to write BLE data to the characteristic, and I can see the data in the bluetoothctl terminal. I tried to write a Python program to read the incoming BLE data, but I could not get it to work. I searched and found pexpect, but I was not able to read the data from the terminal (it is hard to detect changes and read the last line). I also found python dbus, but I have no experience with it. Can anyone help?

  • iOS application pairs automatically.

  • iOS application sets notify on the Raspberry Pi.

  • Raspberry Pi

    • Service: 0xffff
    • Characteristic: 0xbbbb read,write,notify

[Image: incoming data]

CodePudding user response:

I am not sure that the functionality in bluetoothctl was ever intended to be driven from a programming language like Python.

The D-Bus APIs documented at: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc are how the developers expect people to access data.

I accept that those D-Bus APIs come with a learning curve, and for creating a BLE peripheral it is fairly steep, especially if this is your first time using D-Bus.

What bluetoothctl does behind the scenes is create an "application" on D-Bus and it tells you what the path is. In the example you posted it is /org/bluez/app/service0/chrc0.

The problem is that there is no easy way to get hold of the data written to this characteristic, because it is not published on D-Bus in a way that other clients can read.

You can monitor all the BlueZ D-Bus activity on the command line with:

$ sudo busctl monitor org.bluez
Monitoring bus message stream.
‣ Type=method_call  Endian=l  Flags=0  Version=1  Priority=0 Cookie=104
  Sender=:1.1742  Destination=:1.1744  Path=/org/bluez/app/service0/chrc0  Interface=org.bluez.GattCharacteristic1  Member=WriteValue
  UniqueName=:1.1742
  MESSAGE "aya{sv}" {
          ARRAY "y" {
                  BYTE 68;
                  BYTE 105;
                  BYTE 114;
                  BYTE 101;
                  BYTE 107;
          };
          ARRAY "{sv}" {
                  DICT_ENTRY "sv" {
                          STRING "device";
                          VARIANT "o" {
                                  OBJECT_PATH "/org/bluez/hci0/dev_6C_40_BE_46_E5_5A";
                          };
                  };
                  DICT_ENTRY "sv" {
                          STRING "link";
                          VARIANT "s" {
                                  STRING "LE";
                          };
                  };
                  DICT_ENTRY "sv" {
                          STRING "mtu";
                          VARIANT "q" {
                                  UINT16 517;
                          };
                  };
          };
  };

If you extract the bytes sent, then that is your value:

>>> value = [68, 105, 114, 101, 107]
>>> bytes(value)
b'Direk'
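
If you did want to script that extraction, it could be sketched roughly like this. This is only an illustration: extract_written_bytes is a name I made up (it is not part of BlueZ or busctl), and it assumes the text layout that busctl monitor printed above, where the only BYTE entries in the message are the written value.

```python
import re

# Hypothetical helper, not part of BlueZ or busctl.
# Assumes the human-readable layout shown above, where each byte of the
# written value is printed as "BYTE <decimal>;".
BYTE_ENTRY = re.compile(r"\bBYTE\s+(\d+);")


def extract_written_bytes(monitor_text):
    """Collect every 'BYTE n;' entry from one captured WriteValue message."""
    return bytes(int(n) for n in BYTE_ENTRY.findall(monitor_text))


sample = """
  MESSAGE "aya{sv}" {
          ARRAY "y" {
                  BYTE 68;
                  BYTE 105;
                  BYTE 114;
                  BYTE 101;
                  BYTE 107;
          };
  };
"""

print(extract_written_bytes(sample))  # b'Direk'
```

You would still have to capture the busctl output and split it into individual messages yourself.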

However, that looks like a lot of work and is quite hacky.

The BlueZ source repository includes a Python example of an advertisement:

https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-advertisement

and a Python example of a BLE peripheral (GATT server):

https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/example-gatt-server

Below is an example that combines both into one file. I've tried to separate the boilerplate code that you should not have to change from the code that is specific to your application.

The function my_write_callback(txt) will get called each time your iOS app writes data to the peripheral, with the data passed in as a string.

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

from gi.repository import GLib

# BlueZ D-Bus interfaces
BLUEZ_SERVICE_NAME = "org.bluez"
GATT_MANAGER_IFACE = "org.bluez.GattManager1"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties"

GATT_SERVICE_IFACE = "org.bluez.GattService1"
GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1"
GATT_DESC_IFACE = "org.bluez.GattDescriptor1"
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'

# Test UUIDs
TEST_SERVICE = "0000ffff-beef-c0c0-c0de-c0ffeefacade"
TEST_CHARACTERISTIC = "0000bbbb-beef-c0c0-c0de-c0ffeefacade"


# Boiler plate start
class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'


def register_app_cb():
    print("GATT application registered")


def register_app_error_cb(error):
    print("Failed to register application: " + str(error))
    mainloop.quit()


def register_ad_cb():
    print('Advertisement registered')


def register_ad_error_cb(error):
    print('Failed to register advertisement: ' + str(error))
    mainloop.quit()


class Advertisement(dbus.service.Object):
    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = False
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
                                                    signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
                                                    signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(
                self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data,
                                                        signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.include_tx_power:
            properties['Includes'] = dbus.Array(["tx-power"], signature='s')

        if self.data is not None:
            properties['Data'] = dbus.Dictionary(
                self.data, signature='yv')
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

    def add_service_data(self, uuid, data):
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        self.service_data[uuid] = dbus.Array(data, signature='y')

    def add_local_name(self, name):
        if not self.local_name:
            self.local_name = ""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        if not self.data:
            self.data = dbus.Dictionary({}, signature='yv')
        self.data[ad_type] = dbus.Array(data, signature='y')

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        print('GetAll')
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        print('returning props')
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        print('%s: Released!' % self.path)


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation
    """

    PATH_BASE = "/org/bluez/app/service"

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
            GATT_SERVICE_IFACE: {
                "UUID": self.uuid,
                "Primary": self.primary,
                "Characteristics": dbus.Array(
                    self.get_characteristic_paths(), signature="o"
                ),
            }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        result = []
        for chrc in self.characteristics:
            result.append(chrc.get_path())
        return result

    def get_characteristics(self):
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")
    def GetAll(self, interface):
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation
    """

    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path + "/chrc" + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
            GATT_CHRC_IFACE: {
                "Service": self.service.get_path(),
                "UUID": self.uuid,
                "Flags": self.flags,
                "Descriptors": dbus.Array(self.get_descriptor_paths(), signature="o"),
            }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        result = []
        for desc in self.descriptors:
            result.append(desc.get_path())
        return result

    def get_descriptors(self):
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")
    def GetAll(self, interface):
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="ay")
    def ReadValue(self, options):
        print("Default ReadValue called, returning error")
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="aya{sv}")
    def WriteValue(self, value, options):
        print("Default WriteValue called, returning error")
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        print("Default StartNotify called, returning error")
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        print("Default StopNotify called, returning error")
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE, signature="sa{sv}as")
    def PropertiesChanged(self, interface, changed, invalidated):
        pass


def find_adapter(bus, iface):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if iface in props:
            return o

    return None

# Boiler plate end


class TestService(Service):
    """
    Test service that provides a characteristic
    """

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, TEST_SERVICE, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))


class TestCharacteristic(Characteristic):
    """
    Test characteristic. Allows writing arbitrary bytes to its value
    """

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index, TEST_CHARACTERISTIC, ["write"], service
        )
        self.value = ""

    def WriteValue(self, value, options):
        print(f"TestCharacteristic Write: {value}")
        txt = bytes(value).decode('utf8')
        print(f"As text: {txt}")
        self.value = txt
        my_write_callback(txt)


class TestAdvertisement(Advertisement):

    def __init__(self, bus, index):
        Advertisement.__init__(self, bus, index, 'peripheral')
        self.add_local_name('My Test Peripheral')
        self.include_tx_power = True


class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation
    """

    def __init__(self, bus):
        self.path = "/"
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(TestService(bus, 0))

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature="a{oa{sa{sv}}}")
    def GetManagedObjects(self):
        response = {}
        print("GetManagedObjects")

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response


def my_write_callback(txt):
    print(f"This is where I can use the <<{txt}>> value")


def main():
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus, LE_ADVERTISING_MANAGER_IFACE)
    if not adapter:
        print("Adapter not found")
        return

    service_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter), GATT_MANAGER_IFACE
    )

    app = Application(bus)

    test_advertisement = TestAdvertisement(bus, 0)

    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)
    mainloop = GLib.MainLoop()

    print("Registering GATT application...")

    service_manager.RegisterApplication(
        app.get_path(),
        {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb,
    )

    mainloop.run()


if __name__ == "__main__":
    main()
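
One thing to watch in WriteValue: bytes(value).decode('utf8') raises UnicodeDecodeError if the iOS app ever sends bytes that are not valid UTF-8. Here is a minimal sketch of a more forgiving decode, along with pulling the sender's address out of the "device" option that appears in the busctl output above. Both helper names are hypothetical, and the address conversion assumes the /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX object path layout shown there.

```python
def decode_written_value(value):
    """Turn the byte array passed to WriteValue into text.

    `value` is any iterable of integers 0-255, which is what
    bytes(value) in the example above already relies on.
    Invalid UTF-8 sequences become U+FFFD instead of raising.
    """
    return bytes(value).decode("utf-8", errors="replace")


def device_address(options):
    """Best-effort MAC address from the 'device' entry of the options dict."""
    path = str(options.get("device", ""))
    name = path.rsplit("/", 1)[-1]  # e.g. "dev_6C_40_BE_46_E5_5A"
    if not name.startswith("dev_"):
        return None
    return name[len("dev_"):].replace("_", ":")


print(decode_written_value([68, 105, 114, 101, 107]))
print(device_address({"device": "/org/bluez/hci0/dev_6C_40_BE_46_E5_5A"}))
```

Inside TestCharacteristic.WriteValue you could then call decode_written_value(value) in place of the direct decode, and pass device_address(options) on to your callback if you need to know which phone wrote the data.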

I changed the UUIDs used in the peripheral because custom services and characteristics need to be outside the range reserved for the Bluetooth SIG. More information on this at: https://novelbits.io/uuid-for-custom-services-and-characteristics/
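
To make the reserved range concrete: a 16-bit value such as 0xffff is shorthand for a 128-bit UUID built on the Bluetooth Base UUID, 0000xxxx-0000-1000-8000-00805f9b34fb. Below is a small sketch that expands a short UUID and checks whether a full UUID is built on that base. The helper names are mine, not something from BlueZ.

```python
import uuid

# Suffix of the Bluetooth Base UUID; 16-bit UUIDs are shorthand for
# 128-bit values that end with it.
BASE_SUFFIX = "-0000-1000-8000-00805f9b34fb"


def expand_short_uuid(short):
    """Expand a 16-bit value (e.g. 0xffff) to its full 128-bit UUID."""
    return uuid.UUID(f"0000{short:04x}{BASE_SUFFIX}")


def uses_sig_base(full_uuid):
    """True if the UUID is built on the Bluetooth Base UUID."""
    return str(uuid.UUID(full_uuid)).endswith(BASE_SUFFIX)


print(expand_short_uuid(0xFFFF))
print(uses_sig_base("0000ffff-0000-1000-8000-00805f9b34fb"))  # True
print(uses_sig_base("0000ffff-beef-c0c0-c0de-c0ffeefacade"))  # False
```

The UUIDs in the example above keep the ffff and bbbb parts from your question but sit on a different base, so they do not collide with the reserved range. Your iOS app will need to use the same 128-bit values.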

Hopefully this is enough to get you moving up the D-Bus learning curve for BlueZ.
