PymoTube

A Python module for logging data from an AtmoTube via Bluetooth. Very much in development.

PymoTube is currently just a set of helper functions and classes that take the bytearrays returned by the AtmoTube and turn them into basic structs. The actual connection to the AtmoTube is managed by Bleak. A minimal example of connecting to an AtmoTube and logging results into an asynchronous queue is shown below.
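Before that example, the bytearray-to-struct step can be illustrated in isolation. The sketch below is only an illustration: `ExamplePacket`, `decode_example`, the byte layout, and the pressure scaling are all hypothetical and are not taken from PymoTube or from the AtmoTube protocol.

```python
import struct
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ExamplePacket:
    """Hypothetical packet: NOT the real AtmoTube byte layout."""
    date_time: datetime
    humidity: int       # percent
    temperature: int    # degrees Celsius
    pressure: float     # mbar


def decode_example(data: bytearray, date_time: datetime) -> ExamplePacket:
    # "<BbI" = little-endian: unsigned byte, signed byte, unsigned 32-bit int.
    humidity, temperature, pressure_raw = struct.unpack("<BbI", data)
    # Assumption for this sketch: pressure arrives in hundredths of a mbar.
    return ExamplePacket(date_time, humidity, temperature, pressure_raw / 100)


# Build six made-up bytes and decode them.
raw = bytearray([45, 21]) + (101325).to_bytes(4, "little")
packet = decode_example(raw, datetime(2024, 1, 1))
```

The timestamp is passed in rather than decoded, mirroring the idea that a packet is stamped with the time it was received.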

A Logging Example

As an example of how to use this, consider the case where you want to connect to your AtmoTube from a PC, log data from it over a pre-defined period, then exit. This will be done asynchronously and the retrieved data put into an asynchronous queue for processing.

The first step is to create a function which connects to the AtmoTube, collects data, and then puts that data into a queue.

from bleak import BleakClient, BleakScanner
from atmotube import start_gatt_notifications, get_available_services
import asyncio


async def collect_data(mac, queue, collection_time):
    async def callback_queue(packet):  # (1)
        await queue.put(packet)

    device = await BleakScanner.find_device_by_address(mac)  # (2)
    if not device:
        raise Exception("Device not found")

    async with BleakClient(device) as client:  # (3)
        if not client.is_connected:
            raise Exception("Failed to connect to device")
        packet_list = get_available_services(client)  # (4)
        await start_gatt_notifications(client, callback_queue,  # (5)
                                       packet_list=packet_list)
        await asyncio.sleep(collection_time)  # (6)
        await queue.put(None)  # (7)
1. Define a callback function which takes a packet of data and does something with it. In this case, it puts the packet on an asynchronous queue.
2. Find the device using Bleak, in this case by MAC address.
3. Connect to the device using Bleak.
4. Call the get_available_services function with the connected device. This generates a list of GATT services that the atmotube library knows about and that the AtmoTube device supports.
5. Start the GATT notifications for the list of available services. If no list is provided, it will attempt to start GATT notifications for all services supported by the atmotube library.
6. Wait while data is collected, for the pre-defined collection time (in seconds).
7. Put a None on the queue to indicate that the collection has ended.
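The None-as-sentinel pattern from the last step can be tried without any Bluetooth hardware. The following is a minimal sketch in which `fake_collect` and `drain` are hypothetical stand-ins (they are not part of PymoTube); the producer pushes canned values instead of real packets.

```python
import asyncio


async def fake_collect(queue, readings):
    # Stand-in for collect_data: no Bluetooth, just canned values.
    for reading in readings:
        await queue.put(reading)
    await queue.put(None)  # sentinel: nothing more will arrive


async def drain(queue):
    # Pull items off the queue until the None sentinel shows up.
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    return received


async def demo():
    queue = asyncio.Queue()
    producer = asyncio.create_task(fake_collect(queue, ["a", "b", "c"]))
    received = await drain(queue)
    await producer
    return received


result = asyncio.run(demo())
```

Using None as the sentinel works here because a real packet is never None; if None were a valid item, a dedicated marker object would be needed instead.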

What ends up on the queue is a series of AtmoTubePacket objects representing the different types of data packets the AtmoTube returns. Each packet has an associated datetime object representing the time when the packet was received. As a somewhat silly example, the function below takes those packets and writes them to the logger; a more realistic approach might be to store the data in a database or append it to a CSV file.

from atmotube import SPS30Packet, StatusPacket, BME280Packet, SGPC3Packet

import logging


def log_packet(packet):
    match packet:  # (1)
        case StatusPacket():
            # (2)
            logging.info(f"{str(packet.date_time)} - Status Packet - "
                         f"Battery: {packet.battery_level}%, "
                         f"PM Sensor: {packet.pm_sensor_status}, "
                         f"Pre-heating: {packet.pre_heating}, "
                         f"Error: {packet.error_flag}")
        case SPS30Packet():
            logging.info(f"{str(packet.date_time)} - SPS30 Packet - "
                         f"PM1: {packet.pm1} µg/m³, "
                         f"PM2.5: {packet.pm2_5} µg/m³, "
                         f"PM4: {packet.pm4} µg/m³, "
                         f"PM10: {packet.pm10} µg/m³")
        case BME280Packet():
            logging.info(f"{str(packet.date_time)} - BME280 Packet - "
                         f"Humidity: {packet.humidity}%, "
                         f"Temperature: {packet.temperature}°C, "
                         f"Pressure: {packet.pressure} mbar")
        case SGPC3Packet():
            logging.info(f"{str(packet.date_time)} - SGPC3 Packet - "
                         f"TVOC: {packet.tvoc} ppb")
        case _:
            logging.info("Unknown packet type")
1. Use structural pattern matching to identify which data has been returned.
2. Send some information about it to the logger.
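For the CSV alternative mentioned earlier, a rough sketch might look like the following. `FakeSPS30Packet` and `append_pm_row` are hypothetical names introduced for this illustration; the stand-in class only borrows the attribute names used in log_packet so that the sketch runs without the atmotube library.

```python
import csv
import io
from dataclasses import dataclass
from datetime import datetime


@dataclass
class FakeSPS30Packet:
    """Hypothetical stand-in with the attribute names used in log_packet."""
    date_time: datetime
    pm1: float
    pm2_5: float
    pm4: float
    pm10: float


def append_pm_row(stream, packet):
    # Write one row per packet; the caller decides where the stream points.
    writer = csv.writer(stream)
    writer.writerow([packet.date_time.isoformat(), packet.pm1,
                     packet.pm2_5, packet.pm4, packet.pm10])


# An in-memory buffer keeps the sketch self-contained; on disk this could be
# open("pm.csv", "a", newline="") instead.
buffer = io.StringIO()
append_pm_row(buffer,
              FakeSPS30Packet(datetime(2024, 1, 1, 12, 0), 1.0, 2.5, 3.0, 4.0))
csv_text = buffer.getvalue()
```

Passing the stream in, rather than opening the file inside the function, keeps the row-writing logic easy to test and lets the caller open the file once for the whole collection period.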

Finally, an asynchronous event loop is created which runs the collector and logs the data as it arrives.

ATMOTUBE = "00:00:00:00:00:00"  # the MAC address of the AtmoTube


def main():
    mac = ATMOTUBE
    collection_time = 60  # seconds
    queue = asyncio.Queue()  # (1)

    async def runner():  # (2)
        collector = asyncio.create_task(  # (3)
            collect_data(mac, queue, collection_time)
        )
        while True:  # (4)
            item = await queue.get()
            if item is None:
                break
            log_packet(item)
        await collector

    asyncio.run(runner())


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()
1. Initialize an asynchronous queue. This will be used to pass the data between the two workers.
2. Create a runner function to handle the main sequence of events.
3. Start the collector.
4. Wait for data to appear on the queue, and then pass the data to log_packet.
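One thing to watch for with this structure: judging from the code above, if the collector raises before it reaches its final queue.put(None) (for example, when the device is not found), the loop in runner would keep waiting on queue.get(). A possible way around this, sketched here with a hypothetical `failing_collect` in place of the real collector, is to put the sentinel on the queue in a finally block.

```python
import asyncio


async def failing_collect(queue):
    # Hypothetical collector that fails before any data arrives.
    try:
        raise RuntimeError("Device not found")
    finally:
        await queue.put(None)  # always signal the consumer to stop


async def runner():
    queue = asyncio.Queue()
    collector = asyncio.create_task(failing_collect(queue))
    seen = []
    while (item := await queue.get()) is not None:
        seen.append(item)
    try:
        await collector  # the collector's exception is re-raised here
    except RuntimeError as exc:
        return seen, str(exc)
    return seen, None


outcome = asyncio.run(runner())
```

With the sentinel sent from finally, the consumer loop ends either way, and the error surfaces when the collector task is awaited.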