ある案件でRaspberry Pi Pico 2 Wを使うことになりました.前々から気になっていたのですが,なかなか手を出すまでには至らずでしたが,ある案件でいよいよ使うことになりました.開発環境はCでもPythonでもよかったのですが,勉強を兼ねてPythonを使うことにしました.通常のPythonとは異なり,MicroPythonで開発ということの様です.

Raspberry Pi PicoをBOOTモードで立ち上げるには,基板に備わるプッシュスイッチを押しながらパソコンとUSBケーブルでつなぐとよいようです.そうするとパソコン側からはRaspberry Pi Picoが外付けデバイスのように見えるので,そこへこちらのサイトから入手可能なuf2という拡張子のファイルをダウンロードし,そのファイルを外付けデバイスのように見えているRaspberry Pi Picoにコピーするだけでよいようです.あとはいったんUSBケーブルを取り外し,プッシュスイッチを押さずにUSBケーブルを接続するとPythonが動作させられる状況になります.

今回の目標は,Raspberry Pi Pico でMQTTをサブスクライブし,そのデータ内にある情報をもとにフルカラーLEDを制御するというものです.フルカラーLEDにはR,G,Bそれぞれの電圧を変更できるような可変レギュレータがあり,その可変レギュレータの電圧を決定するのに用いる抵抗として,ディジタルポテンショメータを使うというものです.このディジタルポテンショメータはI2Cで抵抗値を変えられることから,Raspberry Pi Picoを使ってディジタルポテンショメータをI2Cで制御することで,結果としてフルカラーLEDを制御することを実現します.

まずはMQTTのブローカです.今回,ブローカとしてRaspberry Pi 5を用いるため,こちらのサイトを参考にしました.

つぎにRaspberry Pi Pico 2 WにMQTTクライアントを作成します.まずはMQTTを使えなければならないため,こちらのサイトを参考にMQTTのライブラリを設定しました.行ったこととしては,Raspberry Pi Pico にmqttというフォルダを作成し,その中にsimple.pyとrobust.pyという2つのPythonプログラムを入れることです.実際,robust.pyの方は使わなくても動作しているようでした.そして,WiFiのアクセスポイントへの接続,MQTT Clientとしてブローカに接続,そしてサブスクライブする,というようなプログラムを書きました.下のプログラムはクラス化したものです.このクラスを使う時,サブスクライブした時点で呼び出される関数をコールバック関数として設定しておきます.あとはサブスクライブしたデータに基づき,I2Cで接続されたディジタルポテンショメータを制御すればよいはずです.

import network
from time import sleep
from umqtt.simple import MQTTClient

class Subscriber:
    def __init__(self, ssid, ssid_password, client_id, server, port, mqtt_user, mqtt_password, keepalive, ssl, ssl_params, callback):
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)

        # Connect to the network
        wlan.connect(ssid, ssid_password)

        # Wait for Wi-Fi connection
        connection_timeout = 10
        while connection_timeout > 0:
            if wlan.status() >= 3:
                break
            connection_timeout -= 1
            print('Waiting for Wi-Fi connection...')
            sleep(1)

        # Check if connection is successful
        if wlan.status() != 3:
            self.is_error = True
            return
        else:
            print('Connection successful!')
            network_info = wlan.ifconfig()
            print('IP address:', network_info[0])
            self.is_error = False
        
        print(mqtt_user)
        print(mqtt_password)
        try:
            self.client = MQTTClient(client_id,
                                server,
                                port,
                                mqtt_user,
                                mqtt_password,
                                keepalive,
                                ssl,
                                ssl_params)
            self.client.connect()
            self.client.set_callback(callback)
            
        except Exception as e:
            print('Error connecting to MQTT:', e)
            raise  # Re-raise the exception to see the full traceback
    def subscribe(self, topics):
        while True:
            self.client.subscribe(topics, 1);    



I2Cについてはまた今度.