ブログ

  • Raspberry Pi Pico 2 W で水切れ検知

    植木への水やり自体はスマートプラグで別途タイマー制御して、貯水タンクからポンプで吸い上げるようにしています。しかし、このタンクが見えない場所にあるので、タンク自体の水が枯渇しそうな場合に、タンクへの水の補充を LINE 通知する機能が欲しくなりました。

    ちょうど良い製品が無かったので Pico 2 W で作成することにしました。はじめての Pico なので sozorablog さんにご教示いただきながら、試行錯誤で以下の配線にしました。

    ご参考として main.py の Python コードも載せておきます。5 秒おきにセンサーの値を読み出して、変化があったら LINE Messaging API で通知しているだけです。2025/3/31 に LINE Notify API がサービス終了してしまったので Messaging API を使用します。

    同じ通知でも対処せずに放置したら再送したい (水不足のメッセージが来ないから大丈夫と思っていたら監視システムが止まっていたとかは困るな) とか考えて、少し余計な制御を入れています。

    消費電力の節約のため、メッセージ送信する時だけ WiFi に接続して、送信後は切断します。Deep Sleep が Pico 2 では、まだ使用できないようなので Deep Sleep 対応はファームウェアの更新を待ちたいと思います。

    WiFi アクセスポイントが停止しているような場合は例外がスローされるので再試行されます。しかし、動作テスト中に ENOMEM (メモリ不足) の OS エラーが出ることがありました。このような場合は、再試行では回復できないケースが多いです。このため、例外を捕捉した回数を累積して、上限に到達したら Pico 自体を強制リセットするようにしています。

    また、短時間で再試行が連続して Pico や WiFi アクセスポイントに過度な負担を掛けないように、例外発生時の待機時間は Exponential Backoff で 5 秒, 10 秒, 20 秒 … (最大で約 21 分) のように 2 倍ずつ延ばすようにしています。

    尚、センサーの設置条件に依ると思いますが、水面がセンサーの高さとスレスレの場合や、水面が揺れると、短時間で連続して通知が来てしまうので、HISTORY_LENGTH で指定した回数だけ同じ結果が連続で得られた場合だけ、安定したセンサーの結果として採用するようにしています。

    main.py
    import machine
    import network
    import ujson
    import urequests
    import utime
    
    import config
    
    # Seconds in a day
    SECONDS_IN_A_DAY = 60 * 60 * 24
    
    # Detect interval (sec)
    INTERVAL_SEC = 5
    
    # Number of times to check if the same result is obtained consecutively
    HISTORY_LENGTH = 5
    
    # Number of times an exception occurs before reset
    MAX_EXCEPTIONS = 9
    
    # Embeded LED
    led = machine.Pin("LED", machine.Pin.OUT)
    
    # Wireless LAN
    wlan = network.WLAN(network.STA_IF)
    
    # Liquid level sensor
    sensor = machine.Pin(28, machine.Pin.IN, machine.Pin.PULL_UP)
    
    
    def console_print(text):
        """Print to console.
    
        Args:
            text (str): message.
        """
        print(f"{utime.time()}: {text}")
    
    
    def connect_wifi():
        """Connect to wifi network.
    
        Raises:
            RuntimeError: Network connect failure.
        """
        if wlan.status() == 3:
            return
        console_print("Connecting ...")
        led.on()
        wlan.active(True)
        wlan.connect(config.WLAN_SSID, config.WLAN_PASSWORD)
        count = 30
        while 0 < count:
            led.toggle()
            if wlan.status() < 0 or 3 <= wlan.status():
                led.off()
                wlan_status = wlan.ifconfig()
                console_print(f"Connected {wlan_status[0]}.")
                return
            count -= 1
            utime.sleep(0.5)
        led.off()
        raise RuntimeError("Connect failed.")
    
    
    def disconnect_wifi():
        """Disconnect from wifi network."""
        wlan.disconnect()
        wlan.active(False)
        console_print("Disconnected.")
    
    
    def send_message(message):
        """Send message using LINE Messaging API
    
        Args:
            message (str): message
        """
        connect_wifi()
        console_print(message)
        response = urequests.post(
            config.CHANNEL_ENDPOINT_URL,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {config.CHANNEL_ACCESS_TOKEN}",
            },
            data=ujson.dumps(
                {
                    "to": config.CHANNEL_USER_ID,
                    "messages": [{"type": "text", "text": message}],
                }
            ).encode("utf-8"),
        )
        response.close()
        disconnect_wifi
    
    
    def get_liquid_level():
        """Detect liquid level
    
        Returns:
            bool: Liquid detect status
        """
        return sensor.value() == 0
    
    
    # Initialize
    led.off()
    results = []
    last_message = (None, 0)
    exceptions = 0
    
    # Loop
    while True:
        try:
            # Get current result
            current_result = get_liquid_level()
            led.value(current_result)
    
            # Update results
            results.append(current_result)
            results = results[-HISTORY_LENGTH:]
    
            # Check continuity
            if len(set(results)) == 1:
    
                # Select message
                if current_result:
                    message = (
                        "タンクは水が入っています",
                        utime.time() + 7 * SECONDS_IN_A_DAY,
                    )
                else:
                    message = (
                        "タンクの水が不足しています!!",
                        utime.time() + 1 * SECONDS_IN_A_DAY,
                    )
    
                # Send message
                if last_message[0] != message[0] or last_message[1] < utime.time():
                    send_message(message[0])
                    last_message = message
    
            # Continue
            exceptions = 0
            utime.sleep(INTERVAL_SEC)
    
        except KeyboardInterrupt:
    
            # Break
            break
    
        except Exception as e:
    
            # Error
            console_print(e)
    
            # Check force reset
            exceptions += 1
            if MAX_EXCEPTIONS < exceptions:
                console_print("Reset.")
                machine.reset()
    
            # Continue
            retry_factor = 2 ** min(exceptions - 1, 8)
            retry_duration = INTERVAL_SEC * retry_factor
            console_print(f"Continue after {retry_duration} seconds.")
            utime.sleep(retry_duration)
    
    # Break
    console_print("Break.")

    別途 config.py というファイルに、以下のように環境固有の情報を定義して main.py と並べて Pico に保存します。

    config.py
    WLAN_SSID = "WiFi の SSID"
    WLAN_PASSWORD = "WiFi のパスワード"
    
    CHANNEL_ENDPOINT_URL = "https://api.line.me/v2/bot/message/push"
    CHANNEL_ACCESS_TOKEN = "自分のチャネルアクセストークン"
    CHANNEL_USER_ID = "自分の LINE のユーザ ID"

    Pico 面白い。

  • Raspberry Pi Pico ケース

    ラズパイピコ2で久しぶりの電子工作を楽しみ中です。ハンダごてを持ったのは何年ぶりだろう。

    折角、薄くて小さいピコなので、悩ましいのはケースです。ダイソーでこのケースを発見して即購入しました。旅行グッズのコーナーにありました。2個入りで100円です。

    USB コネクタがケースに収まらないので Micro USB からは給電出来ませんが、もともと 5V の AC アダプタを使用する計画だったのでシンデレラフィットです。

    ケースに穴を開けてケーブルを通す必要がありますが、良い感じです。結束バンドでケーブルの引っ張り力が端子に掛からないようにしました。

    ケースのヒンジの方に USB コネクタを向けてしまいました。この向きだと、USB コネクタにケーブルを挿すときは、蓋を外さないといけません。反対に入れていたら蓋を開けた隙間から USB 接続できたのに。

    私はケーブルをハンダ付けした後にボードの向きの失敗に気付いたので諦めました。メンテナンス時は蓋を外す事にします。同じケースを使う方はお気を付けください。

    Amazon で買った非接触の液面センサーが明日届くはずなので楽しみです。