Simple test
Ensure your device works with this simple test.
# SPDX-FileCopyrightText: Tony DiCola for Adafruit Industries
# SPDX-FileCopyrightText: 2019 Adafruit Industries for Adafruit Industries
# SPDX-License-Identifier: MIT

# Example of using the Adafruit IO CircuitPython MQTT client
# to subscribe to an Adafruit IO feed and publish random data
# to be received by the feed.
import time
from random import randint

import board
import busio
from digitalio import DigitalInOut
import adafruit_connection_manager
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import neopixel
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT

### WiFi ###

# Get wifi details and more from a secrets.py file.
# The `secrets` dict is handed to the WiFi manager below and is also read
# for the "aio_username" and "aio_key" entries when the MQTT client is built.
try:
    from secrets import secrets
except ImportError:
    # Tell the user what is missing, then re-raise so the script stops here
    # instead of failing later with a less obvious NameError.
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

# SPI bus shared with the ESP32 co-processor, and the ESP32 control object
# that the WiFi manager and the connection manager are built on.
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Status light handed to the WiFi manager.
# Use below for most boards (single on-board NeoPixel):
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
# Uncomment below for ItsyBitsy M4
# (NOTE: `dotstar` is not imported by this file; import the DotStar driver first.)
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# Uncomment below for an externally defined RGB LED
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)

# WiFi manager built from the ESP32 control object, the secrets dict and the
# status light; `wifi.connect()` is called on it later in the script.
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)


# Define callback functions which will be called when certain events happen.
# pylint: disable=unused-argument
def connected(client):
    """Handle a successful connection to Adafruit IO.

    Assigned to ``io.on_connect`` further down. This is a good place to
    subscribe to feed changes.

    :param client: The Adafruit IO MQTT client that connected; it is used
        here to subscribe to the ``DemoFeed`` feed.
    """
    print("Connected to Adafruit IO! Listening for DemoFeed changes...")
    # Subscribe to changes on a feed named DemoFeed.
    client.subscribe("DemoFeed")


def subscribe(client, userdata, topic, granted_qos):
    """Report a completed subscription.

    Assigned to ``io.on_subscribe`` further down.

    :param client: The MQTT client (unused).
    :param userdata: User data passed along by the client (unused).
    :param topic: The feed/topic that was subscribed to.
    :param granted_qos: The QOS level reported for the subscription.
    """
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))


def unsubscribe(client, userdata, topic, pid):
    """Report a completed unsubscription.

    Assigned to ``io.on_unsubscribe`` further down.

    :param client: The MQTT client (unused).
    :param userdata: User data passed along by the client (unused).
    :param topic: The feed/topic that was unsubscribed from.
    :param pid: The PID value passed to the callback; it is only printed.
    """
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))


# pylint: disable=unused-argument
def disconnected(client):
    """Report that the client has disconnected from Adafruit IO.

    Assigned to ``io.on_disconnect`` further down.

    :param client: The Adafruit IO MQTT client (unused).
    """
    print("Disconnected from Adafruit IO!")


# pylint: disable=unused-argument
def message(client, feed_id, payload):
    """Report a new value received on a subscribed feed.

    Assigned to ``io.on_message`` further down.

    :param client: The Adafruit IO MQTT client (unused).
    :param feed_id: Identifies the feed that received the value.
    :param payload: The new value.
    """
    print("Feed {0} received new value: {1}".format(feed_id, payload))


# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Socket pool and SSL context for the ESP32 radio, both passed to the MQTT
# client below.
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)

# Initialize a new MQTT Client object
# NOTE(review): 1883 is the standard unencrypted MQTT port, yet an ssl_context
# is supplied. Whether the Adafruit IO key travels encrypted depends on how
# MiniMQTT resolves `port`/`is_ssl` - confirm against the MiniMQTT docs and
# consider the TLS port if encryption is required.
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    port=1883,
    username=secrets["aio_username"],
    password=secrets["aio_key"],
    socket_pool=pool,
    ssl_context=ssl_context,
)


# Initialize an Adafruit IO MQTT Client
io = IO_MQTT(mqtt_client)

# Connect the callback methods defined above to Adafruit IO
io.on_connect = connected
io.on_disconnect = disconnected
io.on_subscribe = subscribe
io.on_unsubscribe = unsubscribe
io.on_message = message

# Connect to Adafruit IO
print("Connecting to Adafruit IO...")
io.connect()

# Below is an example of manually publishing a new value to Adafruit IO.
# Seconds to wait between publishes. The check used to compare against 5
# while the comment and the printed message both said 10; a single constant
# keeps the message and the behavior in agreement.
PUBLISH_INTERVAL = 10

# Timestamp (time.monotonic() seconds) taken after the most recent publish.
last = 0
print("Publishing a new message every {0} seconds...".format(PUBLISH_INTERVAL))
while True:
    # Explicitly pump the message loop.
    io.loop()
    # Send a new message every PUBLISH_INTERVAL seconds.
    if (time.monotonic() - last) >= PUBLISH_INTERVAL:
        value = randint(0, 100)
        print("Publishing {0} to DemoFeed.".format(value))
        io.publish("DemoFeed", value)
        last = time.monotonic()