ご無沙汰しております!園部です。
「IoTのためのクラウドシステム構築」の続きです。今回はセンサデータに基づいたメール通知をやっていきます!
◇ 全体の流れ
- SNS(Simple Notification Service)のトピック、サブスクリプションを設定をする
- Lambda関数を作成する
- IoTCoreのルールを設定する
- Lambda関数でSNSへデータを連携するプログラムを実装する
- センサデータをAWSに送るプログラムを作成し、実行する
※ 前提としてAWSアカウントは持っているものとします
SNSはシンプルなメッセージングサービスです!
Lambdaはサーバーレスコンピューティングにより手軽にコード実行ができるサービスです!
詳しくはAWSのページをご覧ください。
SNS:https://aws.amazon.com/jp/sns/
Lambda:https://aws.amazon.com/jp/lambda/
1.SNS(Simple Notification Service)のトピック、サブスクリプションを設定をする
1-1.トピックの作成
- SNSの画面に行き、左メニューからトピックを選ぶ
- トピックの作成ボタンを押下
- トピックのタイプは”スタンダード”を選択し適当な名前を付ける
- “トピックの作成”ボタンを押下
- 記入したアドレスに確認メールが届くので、確認する
1-2.サブスクリプションの作成
-
- 作成したトピックのページを開き、”サブスクリプションの作成”ボタンを押下
- プロトコルの選択リストからEメールを選択
- エンドポイントのテキストボックスに、メールの送り先アドレス(自分が確認できるアドレス)を入力
- サブスクリプションの作成ボタンを押下
サブスクリプションの作成
2.Lambda関数の作成
- Lambda画面を開き、左メニューから関数を選択
- 関数の作成ボタンを押下
- 一から作成を選択した状態で、関数名を入力。ランタイムのリストからpythonを選択(今回は3.9)し、アクセス権限の”デフォルトの実行ロールの変更”で既存のロールを使用するを選ぶ。既存のロールリストから適当なロールを選択(SNSとIoTCoreへのアクセス権限があるもの)
- 関数の作成ボタンを押下
3.IoTCoreのルールを設定する
- 左のメニューを開いて”メッセージのルーティング”>”ルール”を押下
- ”ルールを作成”ボタンを押下
- ”ルール名”を適当に入力し、”次へ”を押下
- ”SQLステートメント”に「SELECT * FROM ’トピック名’」を記入
- ”ルールアクション”で”Lambda”を選択
- 関数名リストから自分が作成したLambda関数を選択し、”次へ”を押下
- “ルールの作成”ボタンを押下
Lambda関数プログラム実装
4.Lambda関数でSNSへデータを連携するプログラムを実装する
- Lambda画面でコードを選択し、コードソースから”lambda_function.py”を選択する。(lambda_handlerメソッド内が実際に実行される)
- 実行プログラムを作成する(下記参考)
- FileからSaveを選び保存。Deployボタンで実装
- 作成ボタンを押しルール作成完了
- 設定タブを選択し、環境変数を開く
- 環境変数の追加ボタンを押下し、キーに”Alert_Trigger”、値に”True”を入力し保存ボタンを押下(下記プログラムを使った場合)
■参考コード
import boto3
import os
from botocore.exceptions import ClientError
TOPIC_ARN = u'arn:aws:sns:us-west-2:261265052204:IoT_train_topic'#自分のSNSTopicを代入する
sns = boto3.client('sns')
iot = boto3.client('iot-data')
temp_threshold =29
# センサデータがしきい値を超え続けた時の、メール連投防止トリガー
# Lambdaコンソール画面の設定から環境変数でトリガーを設定しておく。Alert_Triggerはstring
alert_trigger = os.environ["Alert_Trigger"]
# メールを設定し、送信する
def send_mail(arn,temp,time):
# 本文
msg = (u'室温が'+str(temp_threshold)+u'℃を超えています。\n'+str(time) +'現在の室温は'+str(temp)+u'℃です。')
# 件名
subject = u'暑い!!!'
request = {
'TopicArn': arn,
'Message': msg,
'Subject': subject
}
sns.publish(**request)
def lambda_handler(event, context):
global alert_trigger
print(alert_trigger)
print(event)
sensor = event['sensor']
temp = sensor['temp']
time_stamp = event['timestamp']
if temp >= temp_threshold:
# トリガーがTrueの場合のみ、メールを送る
if alert_trigger == "True":
try:
send_mail(TOPIC_ARN,temp,time_stamp)
alert_trigger = "False"
os.environ["Alert_Trigger"] = alert_trigger
except ClientError as e:
print(e.response['Error']['Message'])
# しきい値を下回った時、トリガーを戻す
else:
alert_trigger = "True"
os.environ["Alert_Trigger"] = alert_trigger
5.センサデータをAWSに送るプログラムを作成し、実行する
■ 実行プログラムの作成
#coding: utf-8
from bme280_get_data import BME280
import clientConfig as client
import time
from awsiot import mqtt5_client_builder
from awscrt import mqtt5, http
from concurrent.futures import Future
sensor = BME280()
# MQTT5 クライアントの設定
TIMEOUT = 100
clientName = client.clientName
endpoint_path = client.endpoint_path
amazonroot_path = client.amazonroot_path
privatekey_path = client.privatekey_path
certificate_path = client.certificate_path
# サブスクライブしたメッセージを受信したときのコールバック
def on_publish_received(publish_packet_data):
publish_packet = publish_packet_data.publish_packet
assert isinstance(publish_packet, mqtt5.PublishPacket)
print("Received message from topic '{}': {}".format(publish_packet.topic, publish_packet.payload))
# ライフサイクルイベント: 停止
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
print("Lifecycle Stopped")
# ライフサイクルイベント: 接続成功
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
print("Lifecycle Connection Success")
# ライフサイクルイベント: 接続失敗
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
print("Lifecycle Connection Failure")
print("Connection failed with exception: {}".format(lifecycle_connection_failure.exception))
#クライアント情報を設定する
def set_mqttclient(name=clientName,endpoint=endpoint_path,root=amazonroot_path,key=privatekey_path,certificate=certificate_path):
mqtt5_client = mqtt5_client_builder.mtls_from_path(
endpoint=endpoint_path,
cert_filepath=certificate_path,
pri_key_filepath=privatekey_path,
ca_filepath=amazonroot_path,
on_publish_received=on_publish_received,
on_lifecycle_stopped=on_lifecycle_stopped,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
client_id=clientName,
clean_start=True,
keep_alive_secs=30
)
return mqtt5_client
if __name__ == '__main__':
mqtt5_client = set_mqttclient()
try:
print("Connecting to endpoint...")
mqtt5_client.start()
try:
while True:
messageJson = sensor.getMessage()
print(f"Publishing message: {messageJson}")
publish_future = mqtt5_client.publish(mqtt5.PublishPacket(
topic="your topic",
payload=messageJson.encode(),
qos=mqtt5.QoS.AT_LEAST_ONCE
))
publish_completion_data = publish_future.result(TIMEOUT)
print("Publish successful. PUBACK: {}".format(publish_completion_data.puback.reason_code)) #pubbackは0が返れば成功
time.sleep(5)
except Exception as e:
print(f"Publish failed: {e}")
except KeyboardInterrupt:
pass
※ インポートされている”clientConfig”は前々回記事の物と同じです。
このようなメールが届くはずです!!
以上、AWS LambdaとAmazon Simple Notification Serviceを組み合わせて、センサデータに基づくメール通知アプリケーションを作ってみました!!
センサーデータからメールで遠隔地にアラート、最近浸透してきているシステムっぽいですよね!
次回はいよいよ、センサデータを使ってアクチュエータ制御をしていきます!