ある案件でRaspberry Pi Pico 2 Wを使うことになりました.前々から気になっていたのですが,なかなか手を出すまでには至らずでしたが,ある案件でいよいよ使うことになりました.開発環境はCでもPythonでもよかったのですが,勉強を兼ねてPythonを使うことにしました.通常のPythonとは異なり,MicroPythonで開発ということの様です.
Raspberry Pi PicoをBOOTモードで立ち上げるには,基板に備わるプッシュスイッチを押しながらパソコンとUSBケーブルでつなぐとよいようです.そうするとパソコン側からはRaspberry Pi Picoが外付けデバイスのように見えるので,そこへこちらのサイトから入手可能なuf2という拡張子のファイルをダウンロードし,そのファイルを外付けデバイスのように見えているRaspberry Pi Picoにコピーするだけでよいようです.あとはいったんUSBケーブルを取り外し,プッシュスイッチを押さずにUSBケーブルを接続するとPythonが動作させられる状況になります.
今回の目標は,Raspberry Pi Pico でMQTTをサブスクライブし,そのデータ内にある情報をもとにフルカラーLEDを制御するというものです.フルカラーLEDにはR,G,Bそれぞれの電圧を変更できるような可変レギュレータがあり,その可変レギュレータの電圧を決定するのに用いる抵抗として,ディジタルポテンショメータを使うというものです.このディジタルポテンショメータはI2Cで抵抗値を変えられることから,Raspberry Pi Picoを使ってディジタルポテンショメータをI2Cで制御することで,結果としてフルカラーLEDを制御することを実現します.
まずはMQTTのブローカです.今回,ブローカとしてRaspberry Pi 5を用いるため,こちらのサイトを参考にしました.
つぎにRaspberry Pi Pico 2 WにMQTTクライアントを作成します.まずはMQTTを使えなければならないため,こちらのサイトを参考にMQTTのライブラリを設定しました.行ったこととしては,Raspberry Pi Pico にmqttというフォルダを作成し,その中にsimple.pyとrobust.pyという2つのPythonプログラムを入れることです.実際,robust.pyの方は使わなくても動作しているようでした.そして,WiFiのアクセスポイントへの接続,MQTT Clientとしてブローカに接続,そしてサブスクライブする,というようなプログラムを書きました.下のプログラムはクラス化したものです.このクラスを使う時,サブスクライブした時点で呼び出される関数をコールバック関数として設定しておきます.あとはサブスクライブしたデータに基づき,I2Cで接続されたディジタルポテンショメータを制御すればよいはずです.
import network from time import sleep from umqtt.simple import MQTTClient class Subscriber: def __init__(self, ssid, ssid_password, client_id, server, port, mqtt_user, mqtt_password, keepalive, ssl, ssl_params, callback): wlan = network.WLAN(network.STA_IF) wlan.active(True) # Connect to the network wlan.connect(ssid, ssid_password) # Wait for Wi-Fi connection connection_timeout = 10 while connection_timeout > 0: if wlan.status() >= 3: break connection_timeout -= 1 print('Waiting for Wi-Fi connection...') sleep(1) # Check if connection is successful if wlan.status() != 3: self.is_error = True return else: print('Connection successful!') network_info = wlan.ifconfig() print('IP address:', network_info[0]) self.is_error = False print(mqtt_user) print(mqtt_password) try: self.client = MQTTClient(client_id, server, port, mqtt_user, mqtt_password, keepalive, ssl, ssl_params) self.client.connect() self.client.set_callback(callback) except Exception as e: print('Error connecting to MQTT:', e) raise # Re-raise the exception to see the full traceback def subscribe(self, topics): while True: self.client.subscribe(topics, 1);
I2Cについてはまた今度.