2024.08.23
技術関連

IoTのためのクラウドシステム構築⑤

IoTのためのクラウドシステム構築⑤

ご無沙汰しております!園部です。

「IoTのためのクラウドシステム構築」の続きです。今回はセンサデータに基づいたメール通知をやっていきます!

◇ 全体の流れ

  1. SNS(Simple Notification Service)のトピック、サブスクリプションを設定をする
  2. Lambda関数を作成する
  3. IoTCoreのルールを設定する
  4. Lambda関数でSNSへデータを連携するプログラムを実装する
  5. センサデータをAWSに送るプログラムを作成し、実行する

※ 前提としてAWSアカウントは持っているものとします

 

SNSはシンプルなメッセージングサービスです!

Lambdaはサーバーレスコンピューティングにより手軽にコード実行ができるサービスです!

詳しくはAWSのページをご覧ください。

SNS:https://aws.amazon.com/jp/sns/

Lambda:https://aws.amazon.com/jp/lambda/

 

1.SNS(Simple Notification Service)のトピック、サブスクリプションを設定をする

1-1.トピックの作成

 

  1. SNSの画面に行き、左メニューからトピックを選ぶ
  2. トピックの作成ボタンを押下
  3. トピックのタイプは”スタンダード”を選択し適当な名前を付ける
  4. “トピックの作成”ボタンを押下
  5. 記入したアドレスに確認メールが届くので、確認する

 

1-2.サブスクリプションの作成
    1. 作成したトピックのページを開き、”サブスクリプションの作成”ボタンを押下
    2. プロトコルの選択リストからEメールを選択
    3. エンドポイントのテキストボックスに、メールの送り先アドレス(自分が確認できるアドレス)を入力
    4. サブスクリプションの作成ボタンを押下

 

サブスクリプションの作成

 

 

 

2.Lambda関数の作成

  1. Lambda画面を開き、左メニューから関数を選択
  2. 関数の作成ボタンを押下
  3. 一から作成を選択した状態で、関数名を入力。ランタイムのリストからpythonを選択(今回は3.9)し、アクセス権限の”デフォルトの実行ロールの変更”で既存のロールを使用するを選ぶ。既存のロールリストから適当なロールを選択(SNSとIoTCoreへのアクセス権限があるもの)
  4. 関数の作成ボタンを押下

 

 

3.IoTCoreのルールを設定する

  1. 左のメニューを開いて”メッセージのルーティング”>”ルール”を押下
  2. ”ルールを作成”ボタンを押下
  3. ”ルール名”を適当に入力し、”次へ”を押下
  4. ”SQLステートメント”に「SELECT * FROM ’トピック名’」を記入
  5. ”ルールアクション”で”Lambda”を選択
  6. 関数名リストから自分が作成したLambda関数を選択し、”次へ”を押下
  7. “ルールの作成”ボタンを押下

 

Lambda関数プログラム実装

 

 

4.Lambda関数でSNSへデータを連携するプログラムを実装する

 

  1. Lambda画面でコードを選択し、コードソースから”lambda_function.py”を選択する。(lambda_handlerメソッド内が実際に実行される)
  2. 実行プログラムを作成する(下記参考)
  3. FileからSaveを選び保存。Deployボタンで実装
  4. 作成ボタンを押しルール作成完了
  5. 設定タブを選択し、環境変数を開く
  6. 環境変数の追加ボタンを押下し、キーに”Alert_Trigger”、値に”True”を入力し保存ボタンを押下(下記プログラムを使った場合)

 

 

 

 

■参考コード

 

import boto3
import os
from botocore.exceptions import ClientError

TOPIC_ARN = u'arn:aws:sns:us-west-2:261265052204:IoT_train_topic'#自分のSNSTopicを代入する
sns = boto3.client('sns')
iot = boto3.client('iot-data')
temp_threshold =29

# センサデータがしきい値を超え続けた時の、メール連投防止トリガー
# Lambdaコンソール画面の設定から環境変数でトリガーを設定しておく。Alert_Triggerはstring
alert_trigger = os.environ["Alert_Trigger"]

# メールを設定し、送信する
def send_mail(arn,temp,time):
    # 本文
    msg = (u'室温が'+str(temp_threshold)+u'℃を超えています。\n'+str(time) +'現在の室温は'+str(temp)+u'℃です。')
    # 件名
    subject = u'暑い!!!'
    
    request = {
        'TopicArn': arn,
        'Message': msg,
        'Subject': subject
    }
    sns.publish(**request)

def lambda_handler(event, context):
    global alert_trigger
    print(alert_trigger)
    print(event)
    sensor = event['sensor']
    temp = sensor['temp']
    time_stamp = event['timestamp']
    
    if temp >= temp_threshold:
        # トリガーがTrueの場合のみ、メールを送る
        if alert_trigger == "True":
            try:
                send_mail(TOPIC_ARN,temp,time_stamp)
                alert_trigger = "False"
                os.environ["Alert_Trigger"] = alert_trigger
            except ClientError as e:
                print(e.response['Error']['Message'])
    # しきい値を下回った時、トリガーを戻す
    else:
        alert_trigger = "True"
        os.environ["Alert_Trigger"] = alert_trigger

 

 

 

5.センサデータをAWSに送るプログラムを作成し、実行する

■ 実行プログラムの作成

#coding: utf-8

from bme280_get_data import BME280
import clientConfig as client
import time
from awsiot import mqtt5_client_builder
from awscrt import mqtt5, http
from concurrent.futures import Future

sensor = BME280()

# MQTT5 クライアントの設定
TIMEOUT = 100
clientName = client.clientName
endpoint_path = client.endpoint_path
amazonroot_path = client.amazonroot_path
privatekey_path = client.privatekey_path
certificate_path = client.certificate_path

# サブスクライブしたメッセージを受信したときのコールバック
def on_publish_received(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    assert isinstance(publish_packet, mqtt5.PublishPacket)
    print("Received message from topic '{}': {}".format(publish_packet.topic, publish_packet.payload))

# ライフサイクルイベント: 停止
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    print("Lifecycle Stopped")

# ライフサイクルイベント: 接続成功
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
    print("Lifecycle Connection Success")

# ライフサイクルイベント: 接続失敗
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
    print("Lifecycle Connection Failure")
    print("Connection failed with exception: {}".format(lifecycle_connection_failure.exception))

#クライアント情報を設定する
def set_mqttclient(name=clientName,endpoint=endpoint_path,root=amazonroot_path,key=privatekey_path,certificate=certificate_path):
	mqtt5_client = mqtt5_client_builder.mtls_from_path(
    endpoint=endpoint_path,
    cert_filepath=certificate_path,
    pri_key_filepath=privatekey_path,
    ca_filepath=amazonroot_path,
    on_publish_received=on_publish_received,
    on_lifecycle_stopped=on_lifecycle_stopped,
    on_lifecycle_connection_success=on_lifecycle_connection_success,
    on_lifecycle_connection_failure=on_lifecycle_connection_failure,
    client_id=clientName,
    clean_start=True,
    keep_alive_secs=30
)
	return mqtt5_client

if __name__ == '__main__':
	mqtt5_client = set_mqttclient()
	try:
		print("Connecting to endpoint...")
		mqtt5_client.start()
		
		try:
			while True:
				messageJson = sensor.getMessage()
				print(f"Publishing message: {messageJson}")

				publish_future = mqtt5_client.publish(mqtt5.PublishPacket(
					topic="your topic",
					payload=messageJson.encode(),
					qos=mqtt5.QoS.AT_LEAST_ONCE
				))
				publish_completion_data = publish_future.result(TIMEOUT)
				print("Publish successful. PUBACK: {}".format(publish_completion_data.puback.reason_code)) #pubbackは0が返れば成功
				time.sleep(5)
		except Exception as e:
			print(f"Publish failed: {e}")
	except KeyboardInterrupt:
		pass






※ インポートされている”clientConfig”は前々回記事の物と同じです。

 

このようなメールが届くはずです!!

 

 

 

以上、AWS LambdaとAmazon Simple Notification Serviceを組み合わせて、センサデータに基づくメール通知アプリケーションを作ってみました!!
センサーデータからメールで遠隔地にアラート、最近浸透してきているシステムっぽいですよね!

次回はいよいよ、センサデータを使ってアクチュエータ制御をしていきます!

Contact

各サービスの詳しい資料のダウンロードや、
ご相談・お問い合わせ等お気軽にお問い合わせください。

資料ダウンロード一覧へ お問い合わせフォームへ