PymoTube
A Python module for logging data from an AtmoTube via Bluetooth. Very much in development.
PymoTube is currently just a set of helper functions and classes for taking the bytearrays returned by the AtmoTube and turning them into basic structs. The actual connection to the AtmoTube is managed by Bleak. A minimal example of connecting to an AtmoTube and logging results into an asynchronous queue is shown below.
A Logging Example
As an example of how to use this, consider the case where you want to connect to your AtmoTube from a PC, log data from it over a pre-defined period, then exit. This will be done asynchronously and the retrieved data put into an asynchronous queue for processing.
The first step is to create a function which connects to the AtmoTube, collects data, and then puts that data into a queue.
from bleak import BleakClient, BleakScanner
from atmotube import start_gatt_notifications, get_available_services
import asyncio

async def collect_data(mac, queue, collection_time):
    async def callback_queue(packet):  # (1)
        await queue.put(packet)

    device = await BleakScanner.find_device_by_address(mac)  # (2)
    if not device:
        raise Exception("Device not found")

    async with BleakClient(device) as client:  # (3)
        if not client.is_connected:
            raise Exception("Failed to connect to device")

        packet_list = get_available_services(client)  # (4)
        await start_gatt_notifications(client, callback_queue,  # (5)
                                       packet_list=packet_list)
        await asyncio.sleep(collection_time)  # (6)
        await queue.put(None)  # (7)

1. Define a callback function which takes a packet of data and does something with it. In this case it puts it in an asynchronous queue.
2. Find the device using Bleak, in this case by MAC address.
3. Connect to the device using Bleak.
4. Call the get_available_services function with the connected device. This generates a list of GATT services that both the atmotube library knows about and the AtmoTube device supports.
5. Start the GATT notifications for the list of available services. If no list is provided, it will attempt to start GATT notifications for all services supported by the atmotube library.
6. Wait while data is collected, for the pre-defined collection time (in seconds).
7. Put a None on the queue to indicate that the collection has ended.
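The queue-and-sentinel pattern used by the collector can be tried without an AtmoTube. The following is a minimal sketch and not part of PymoTube: `fake_collect_data` and `drain` are hypothetical names, and plain dictionaries stand in for the packets a real device would send.

```python
import asyncio


async def fake_collect_data(queue, n_packets, interval=0.01):
    """Hypothetical stand-in for collect_data; no Bluetooth is involved."""
    for i in range(n_packets):
        # A dict stands in for a real packet object (assumption for this sketch)
        await queue.put({"sequence": i})
        await asyncio.sleep(interval)
    # Same end-of-collection signal as the real collector
    await queue.put(None)


async def drain(queue):
    """Read items from the queue until the None sentinel arrives."""
    items = []
    while (item := await queue.get()) is not None:
        items.append(item)
    return items


async def demo():
    queue = asyncio.Queue()
    producer = asyncio.create_task(fake_collect_data(queue, n_packets=3))
    items = await drain(queue)
    await producer
    return items


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the consumer only depends on the queue, the simulated collector can be swapped for the real one without changing the reading loop.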
What ends up on the queue is a series of AtmoTubePacket objects representing the different types of data packets the AtmoTube returns. Each packet has an associated datetime object representing the time when the packet was received. As a somewhat silly example, the following function takes those packets and writes them to the log; a more realistic thing to do might be to put the data in a database or append it to a CSV file.
from atmotube import SPS30Packet, StatusPacket, BME280Packet, SGPC3Packet
import logging

def log_packet(packet):
    match packet:  # (1)
        case StatusPacket():
            logging.info(f"{str(packet.date_time)} - Status Packet - "  # (2)
                         f"Battery: {packet.battery_level}%, "
                         f"PM Sensor: {packet.pm_sensor_status}, "
                         f"Pre-heating: {packet.pre_heating}, "
                         f"Error: {packet.error_flag}")
        case SPS30Packet():
            logging.info(f"{str(packet.date_time)} - SPS30 Packet - "
                         f"PM1: {packet.pm1} µg/m³, "
                         f"PM2.5: {packet.pm2_5} µg/m³, "
                         f"PM4: {packet.pm4} µg/m³, "
                         f"PM10: {packet.pm10} µg/m³")
        case BME280Packet():
            logging.info(f"{str(packet.date_time)} - BME280 Packet - "
                         f"Humidity: {packet.humidity}%, "
                         f"Temperature: {packet.temperature}°C, "
                         f"Pressure: {packet.pressure} mbar")
        case SGPC3Packet():
            logging.info(f"{str(packet.date_time)} - SGPC3 Packet - "
                         f"TVOC: {packet.tvoc} ppb")
        case _:
            logging.info("Unknown packet type")

1. Use structural pattern matching to identify which data has been returned.
2. Send some information about it to the logger.
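Appending to a CSV, mentioned earlier as a more realistic alternative to logging, could look roughly like the sketch below. It makes several assumptions: `FakeBME280` is a hypothetical stand-in class whose attribute names mirror those read from `BME280Packet` in `log_packet`, and `append_packet` and `FIELDS` are illustrative names rather than part of PymoTube.

```python
import csv
import io
from dataclasses import dataclass
from datetime import datetime


@dataclass
class FakeBME280:
    """Hypothetical stand-in for BME280Packet, for illustration only."""
    date_time: datetime
    humidity: float
    temperature: float
    pressure: float


FIELDS = ["date_time", "humidity", "temperature", "pressure"]


def append_packet(csv_file, packet, fields=FIELDS):
    """Write one packet as a CSV row, reading each field by attribute name."""
    row = []
    for name in fields:
        value = getattr(packet, name)
        # Store timestamps as ISO 8601 text so they can be parsed back later
        row.append(value.isoformat() if isinstance(value, datetime) else value)
    csv.writer(csv_file).writerow(row)


if __name__ == "__main__":
    # A real log would use open("bme280.csv", "a", newline="") instead
    buffer = io.StringIO()
    packet = FakeBME280(datetime(2024, 1, 1, 12, 0, 0), 45.0, 21.5, 1013.2)
    append_packet(buffer, packet)
    print(buffer.getvalue(), end="")
```

Since each packet type carries different fields, one file per packet type (each with its own field list) is probably the simplest layout.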
Finally, an asynchronous event loop is created which runs the collector and then logs the data.
ATMOTUBE = "00:00:00:00:00:00"  # the MAC address of the AtmoTube

def main():
    mac = ATMOTUBE
    collection_time = 60  # seconds
    queue = asyncio.Queue()  # (1)

    async def runner():  # (2)
        collector = asyncio.create_task(  # (3)
            collect_data(mac, queue, collection_time)
        )
        while True:  # (4)
            item = await queue.get()
            if item is None:
                break
            log_packet(item)
        await collector

    asyncio.run(runner())

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()

1. Initialize an asynchronous queue. This will be used to pass the data between the two workers.
2. Create a runner function to handle the main sequence of events.
3. Start the collector.
4. Wait for data to appear on the queue, and then pass the data to log_packet.
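One caveat with this pattern: the reading loop only ends when a None arrives on the queue, so if the collector raises before putting it there (for example when the device is not found), the loop waits indefinitely. A possible guard is sketched below. The names `with_sentinel` and `failing_collector` are hypothetical and not part of PymoTube; the failing collector simply simulates a device error.

```python
import asyncio


async def with_sentinel(collector_coro, queue):
    """Run a collector and always put None on the queue afterwards.

    Hypothetical helper for this sketch; not part of PymoTube.
    """
    try:
        await collector_coro
    finally:
        await queue.put(None)


async def failing_collector(queue):
    """Stand-in for a collector that fails part-way through."""
    await queue.put("first packet")
    raise RuntimeError("Device not found")


async def runner():
    queue = asyncio.Queue()
    collector = asyncio.create_task(
        with_sentinel(failing_collector(queue), queue)
    )
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    try:
        await collector  # re-raises the collector's exception, if any
    except RuntimeError as exc:
        return received, str(exc)
    return received, None


if __name__ == "__main__":
    print(asyncio.run(runner()))
```

If a collector that already puts its own None is wrapped this way, a second None is left unread on the queue after a successful run. That is harmless here, but the collector's own sentinel could be dropped to avoid it.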