AWS IoT Core 入門1 ~AWS Lambdaとの連携~

AWS IoT Core 入門1 ~AWS Lambdaとの連携~

最近、AWSを使用する増えてきました。
数多あるAWSサービス、その中の一つ「AWS IoT Core」というものについて気になったので調べ、実際に他のAWSサービスと組み合わせて使ってみました。

AWS IoT Core とは?

AWS のページ ( https://aws.amazon.com/jp/iot-core/ ) によると、
「AWS IoT Core では、何十億もの IoT デバイスを接続し、何兆ものメッセージをインフラストラクチャを管理することなく、AWS のサービスにルーティングすることができます。 」
と書いてあります。……通信サービスなことは分かりますが、肝心なところはよくわからないですね。

使ってみた感触としての技術のキモは「(AWSとして) 安全な通信ができる!!」こと。

通常、デバイスからAWSサービスにアクセスするには boto3 (Pythonの場合) 等ミドルウェアを利用します。
その際、アクセス情報としてクライアントIDが必要となり、デバイス内で保持する必要がありますが、このクライアントIDが漏れると不正にAWSサービスにアクセスされたり大変なことになります。
1つのデバイス紛失が大問題に。

対して、AWS IoT Core による通信は pub/sub 形式のテキストメッセージのみですが、

  • ネットワーク証明書をデバイス内に持たせておいて通信 (クライアントIDほどセキュアな情報じゃない)
  • 接続権限情報は AWS 側で管理 (不正利用があったらクラウド側から切断できる)

ことでデバイスとの安全な通信が可能となります。
テキストメッセージは JSON を利用すれば自由な設計が可能です。

また、「IoT」とあるので、Raspberry Pi 等小型PCとの通信用途か?と思いましたが、実態は pub/sub 形式のテキストメッセージ通信なので、ネットワークつながって MQTT 等 IoT Core 対応プロトコルが利用できればデバイスはどんなものでも通信可能です。

利用シーン

上記の通り、pub/sub 形式の通信なので、簡単なメッセージの送受信に向いています。
仕組みの整備が必要になりますが、以下のようなことに活用できそうです。

  • 定期的にデバイスからログを受け付け、統計情報をクラウド側でまとめて通知する。
  • ハートビート通信による死活監視。
  • 複数台デバイスに対して定時処理を実行させる。etc.

今回実施すること

上記の通り、通信サービスであることを意識し、他のAWSサービス、AWS Lambdaと組み合わせた実行サンプルを作ってみます。
( AWS Lambda は簡単にスクリプトを実行できる機能ですね。各種イベントと紐づけて機能を呼び出すことができます。)

デバイス側でトピック ( sample/message01 ) を送信し、それを IoT Core を介して Lambda が受信。
Lambda は反応して別のトピック (sample/message02) を送信し、デバイスが受け付けます。

AWS IoT と AWS Lambda を組み合わせた実行イメージ

下準備

「デバイスとの通信」なので、何かしらのデバイス (PC) を用意する必要があります。外部に接続できるネットワークがあれば何でもOK。
ここでは、社内の PC 上に構築した docker container ( Ubuntu 20.04 ) を使用します。接続プロトコルは MQTT (& Python) を利用。

AWS IoT Core 用のMQTTが使いやすいモジュールが提供されているのでインストールしておきます。

pip install awscrt
pip install awsiotsdk

AWS IoT Core 設定

個別に設定することも可能なようですが、対話式で設定できるため、初めての場合は対話式で設定するとよさそうです。

AWS IoT Core設定画面サンプル

「mono_k_nagasawa_001」と名前を付けて進めると、接続キットがダウンロードできます。接続キットには

  • 証明書 (mono_k_nagasawa_001.cert.pem)
  • ルート証明書 (root-CA.crt)
  • プライベートキー (mono_k_nagasawa_001.private.key)

が含まれていて、これらを使って接続できます。( デバイス側に渡しておきます。)

接続設定が終わったら、次にポリシーの設定を行います。ここが通信における権限設定になっていて、デバイス名やトピック名に対して許可を与えることができます。

初期ではサンプル用のトピックが許可されていますが、今回用の message01 および message02 のみに変更しておきます。

ポリシーアクション内容
iot:Publish
iot:Receive
送受信可能トピック
iot:Subscribe受け取り可能トピック
iot:Connect接続可能デバイス名

ポイントは iot:Subscribe で、送受信の設定だけでは許可されず、別途 Subscribe (受信) 用フィルタ設定が必要でした。

また、今回は使用していませんが、それぞれ * のワイルドカードが使用可能とのこと。デバイス名に対して区別して送信を行うといった使い方ができそうです。

デバイス側設定

取得した証明書を使って通信できるよう、デバイス側のスクリプトを構築します。
Python の場合、

from awsiot import mqtt_connection_builder

として MQTT モジュールのビルダーを取得し、

mqtt_connection = mqtt_connection_builder.mtls_from_path(
    endpoint = endpoint,    # 接続エンドポイント
    cert_filepath = cert,    # 証明書
    pri_key_filepath = key,    # プライベートキー
    ca_filepath = root_ca,    # ルート証明書
    client_bootstrap = client_bootstrap,
    on_connection_interrupted = on_connection_interrupted,
    on_connection_resumed = on_connection_resumed,
    client_id = client_id,
    clean_session = False,
    keep_alive_secs = 30,
    http_proxy_options = proxy_options)

のように接続用インスタンスを得ることができます。

Publish (送信) は送信トピック名 (宛先) とペイロード (送信内容) を指定すれば OK。

mqtt_connection.publish(
    topic = topic,    # 送信先
    payload = payload,    # 送信内容
    qos = mqtt.QoS.AT_MOST_ONCE)

Subscribe (受信) は、受信後処理 (callback) を設定して呼び出しておくことで、受信タイミングで callback が呼び出されます。

def callback(topic, payload, dup, qos, retain, **kwargs):
    print(Received message from topic '{}': {}".format(topic, payload))
mqtt_connection.subscribe(
    topic = topic,    # 受信元
    callback = callback,    # 受信したとき呼び出される処理
    qos = mqtt.QoS.AT_MOST_ONCE)

送信機能の確認は AWS IoT Core のコンソールにて MQTT のテスト機能があるので、こちらで動作確認しながら進めると分かりやすいですね。

AWS IoP Core コンソール画面サンプル

AWS Lambda 側設定

AWS Lambda 側は Python + boto3 にて、イベントとして呼び出されたらトピック sample/message02 を返すように実装します。
boto3 から AWS IoT Core を呼び出すと、デバイス名を設定することができないのですが、簡単に呼び出せてトピックを投げられるので簡単ですね。

(コード一部)

import boto3

### IoT Core Topic send settings
topic = 'sample/message02'
endpoint = 'https://<endpoint>'    # NOTE: boto3 で使用する場合、endpoint は URL 形式で指定

### IoT Core
iot = boto3.client('iot-data', endpoint_url = endpoint)

iot.publish(
    topic = topic,    # 送り先
    qos = 1,
    payload = payload,    # 本文
)

受信トピックと AWS Lambda との紐づけ

AWS Lambda を作成したら、AWS IoT Core の「メッセージのルーティング」機能で紐づけを行います。
ルーティングの際、SQL の形でどのメッセージのフィルタリングを設定します。

例えば、メッセージが JSON 形式で

{
    "counter" : <番号>,
    "message": <テキストメッセージ>
}

という形の場合、

SELECT message FROM 'sample/message01' WHERE counter % 5 = 0

としたとき、counter が 5 の倍数の時だけメッセージを先のアクション (今回は AWS Lambda) に伝えます。

WHERE を外して全文を対象にしたり、SELECT * FROM ~としてすべてのデータを対象とすることも可能です。

送信対象トピックを決めたら、アクションとして作成したAWS Lambdaを設定すれば準備完了です。

AWS Lambda 設定サンプル

実行

すべての準備ができたら実行してみます。

デバイス側、コンソールログを確認すると、送信状態が確認できました。
更に、5回に1回、トピックがAWS Lambdaに届き、その返信の受信が確認できました。
( 0, 5, 10, … の  message01  送信タイミングの後に  message02  を受信。非同期なので間隔がきれいではないですが…。)

コンソールログ

AWS Lambda の方は、CloudWatch logs を確認し、同様に正常に動いていることが確認できました。

ログイベント画面サンプル

まとめ

AWS IoT Core の使い方を覚えて、AWS Lambda と組み合わせてみることができました。
簡単な通信の仕組みは構築することができたので、次回はこれをベースにデータをやり取りする仕組みを作ってみようと思います。

コード

最後に、今回使用した Python コードを張り付けておきます。

デバイス側

デバイス側のソースを表示
import datetime
import json
import os
import time

from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

BASE_DIR = "connect_device_package"

ENDPOINT = "<AWS IoT Core endpoint name>"

# Public
CERT = os.path.join(BASE_DIR, '<cert file path>')

# Private
KEY = os.path.join(BASE_DIR, '<private key file path>')

# Root CA
ROOT_CA = os.path.join(BASE_DIR, 'root-CA.crt')

# 送信 Topic
p_topic = "sample/message01"

# 受信 Topic
s_topic = "sample/message02"

# JST
JST = datetime.timezone(datetime.timedelta(hours = +9), 'JST')

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))

# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))


### MQTT
class Mqtt() :
    def __init__(self, endpoint, client_id, root_ca, cert, key) :
        event_loop_group = io.EventLoopGroup(2)
        host_resolver = io.DefaultHostResolver(event_loop_group)
        client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

        proxy_options = None
      
        self.__mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint = endpoint,
            cert_filepath = cert,
            pri_key_filepath = key,
            ca_filepath = root_ca,
            client_bootstrap = client_bootstrap,
            on_connection_interrupted = on_connection_interrupted,
            on_connection_resumed = on_connection_resumed,
            client_id = client_id,
            clean_session = False,
            keep_alive_secs = 30,
            http_proxy_options = proxy_options)
        connect_future = self.__mqtt_connection.connect()
        connect_future.result()
        print(f"Connected! {client_id}")

    def __del__(self) :
        disconnect_future = self.__mqtt_connection.disconnect()
        disconnect_future.result()

     def publish(self, topic, payload) :
        self.__mqtt_connection.publish(
            topic = topic,
            payload = json.dumps(payload),
            qos = mqtt.QoS.AT_LEAST_ONCE)

    def subscribe(self, topic, callback) :
        sub_feature, packet_id = self.__mqtt_connection.subscribe(
            topic = topic,
            callback = callback,
            qos = mqtt.QoS.AT_LEAST_ONCE)
        return sub_feature, packet_id

##################################################
### Main                                       ###
##################################################

if __name__ == '__main__' :

    client_id = "lambda"
    client = Mqtt(ENDPOINT, client_id, ROOT_CA, CERT, KEY)

    # Subscribe
    def on_message_received(topic, payload, dup, qos, retain, **kwargs):
        print("Received message from topic '{}': {}".format(topic, payload))
    client.subscribe(s_topic, on_message_received)

    # 送信カウンタ
    counter = 0

    ### While Ctrl-C pressed
    try :

        # 1秒おきに data を publish  
        while True :
            print(f"Send data ({counter})...")
            payload = {
                'timestamp': datetime.datetime.now(JST).strftime('%Y/%m/%d %H:%M:%S'),
                'counter': counter,
                'message': f'Hello {counter}',
            }
            client.publish(p_topic, payload)
            counter += 1
            time.sleep(1)

    except KeyboardInterrupt :
        pass

    # Clean
    del client
    print("finish")


AWS Lambda側

AWS Lambda側のソースを表示
import datetime
import json
import time

import boto3

### IoT Core Topic send settings
topic = 'sample/message02'
endpoint = '<AWS IoT Core ARN Endpoint URL>'

### IoT Core
iot = boto3.client('iot-data', endpoint_url = endpoint)

### JST
JST = datetime.timezone(datetime.timedelta(hours = +9), 'JST')

def lambda_handler(event, context):
    
    ### Print receive event
    print(event)  

    ### Send IoT Core Topic
    # NOTE: あくまで動作確認なので、カウンタおよびメッセージは固定。
    payload = {
        "timestamp": datetime.datetime.now(JST).strftime('%Y/%m/%d %H:%M:%S'),
        "counter": 0,
        "data": "Return message"
    }

        iot.publish(
        topic = topic,
        qos = 1,
        payload = json.dumps(payload, ensure_ascii = False)
    )
    time.sleep(1)
    
    return "OK"