IoTのためのクラウドシステム構築⑤

query_builder 2024/08/23
技術関連
IoTのためのクラウドシステム構築⑤

ご無沙汰しております!園部です。

「IoTのためのクラウドシステム構築」の続きです。今回はセンサデータに基づいたメール通知をやっていきます!

◇ 全体の流れ

  1. SNS(Simple Notification Service)のトピック、サブスクリプションを設定をする
  2. Lambda関数を作成する
  3. IoTCoreのルールを設定する
  4. Lambda関数でSNSへデータを連携するプログラムを実装する
  5. センサデータをAWSに送るプログラムを作成し、実行する

※ 前提としてAWSアカウントは持っているものとします

 

SNSはシンプルなメッセージングサービスです!

Lambdaはサーバーレスコンピューティングにより手軽にコード実行ができるサービスです!

詳しくはAWSのページをご覧ください。

SNS:https://aws.amazon.com/jp/sns/

Lambda:https://aws.amazon.com/jp/lambda/

 

1.SNS(Simple Notification Service)のトピック、サブスクリプションを設定をする

1-1.トピックの作成

 

  1. SNSの画面に行き、左メニューからトピックを選ぶ
  2. トピックの作成ボタンを押下
  3. トピックのタイプは”スタンダード”を選択し適当な名前を付ける
  4. “トピックの作成”ボタンを押下
  5. 記入したアドレスに確認メールが届くので、確認する

 



1-2.サブスクリプションの作成
    1. 作成したトピックのページを開き、”サブスクリプションの作成”ボタンを押下
    2. プロトコルの選択リストからEメールを選択
    3. エンドポイントのテキストボックスに、メールの送り先アドレス(自分が確認できるアドレス)を入力
    4. サブスクリプションの作成ボタンを押下

 

サブスクリプションの作成

 

 

 

2.Lambda関数の作成

  1. Lambda画面を開き、左メニューから関数を選択
  2. 関数の作成ボタンを押下
  3. 一から作成を選択した状態で、関数名を入力。ランタイムのリストからpythonを選択(今回は3.9)し、アクセス権限の”デフォルトの実行ロールの変更”で既存のロールを使用するを選ぶ。既存のロールリストから適当なロールを選択(SNSとIoTCoreへのアクセス権限があるもの)
  4. 関数の作成ボタンを押下

 

 

3.IoTCoreのルールを設定する

  1. 左のメニューを開いて”メッセージのルーティング”>”ルール”を押下
  2. ”ルールを作成”ボタンを押下
  3. ”ルール名”を適当に入力し、”次へ”を押下
  4. ”SQLステートメント”に「SELECT * FROM ’トピック名’」を記入
  5. ”ルールアクション”で”Lambda”を選択
  6. 関数名リストから自分が作成したLambda関数を選択し、”次へ”を押下
  7. “ルールの作成”ボタンを押下

 

Lambda関数プログラム実装

 

 

4.Lambda関数でSNSへデータを連携するプログラムを実装する

 

  1. Lambda画面でコードを選択し、コードソースから”lambda_function.py”を選択する。(lambda_handlerメソッド内が実際に実行される)
  2. 実行プログラムを作成する(下記参考)
  3. FileからSaveを選び保存。Deployボタンで実装
  4. 作成ボタンを押しルール作成完了
  5. 設定タブを選択し、環境変数を開く
  6. 環境変数の追加ボタンを押下し、キーに”Alert_Trigger”、値に”True”を入力し保存ボタンを押下(下記プログラムを使った場合)

 

 

 

 

■参考コード

 

import boto3
import os
from botocore.exceptions import ClientError

TOPIC_ARN = u'arn:aws:sns:us-west-2:261265052204:IoT_train_topic'#自分のSNSTopicを代入する
sns = boto3.client('sns')
iot = boto3.client('iot-data')
temp_threshold =29

# センサデータがしきい値を超え続けた時の、メール連投防止トリガー
# Lambdaコンソール画面の設定から環境変数でトリガーを設定しておく。Alert_Triggerはstring
alert_trigger = os.environ["Alert_Trigger"]

# メールを設定し、送信する
def send_mail(arn,temp,time):
# 本文
msg = (u'室温が'+str(temp_threshold)+u'℃を超えています。\n'+str(time) +'現在の室温は'+str(temp)+u'℃です。')
# 件名
subject = u'暑い!!!'

request = {
'TopicArn': arn,
'Message': msg,
'Subject': subject
}
sns.publish(**request)

def lambda_handler(event, context):
global alert_trigger
print(alert_trigger)
print(event)
sensor = event['sensor']
temp = sensor['temp']
time_stamp = event['timestamp']

if temp >= temp_threshold:
# トリガーがTrueの場合のみ、メールを送る
if alert_trigger == "True":
try:
send_mail(TOPIC_ARN,temp,time_stamp)
alert_trigger = "False"
os.environ["Alert_Trigger"] = alert_trigger
except ClientError as e:
print(e.response['Error']['Message'])
# しきい値を下回った時、トリガーを戻す
else:
alert_trigger = "True"
os.environ["Alert_Trigger"] = alert_trigger

 

 

 

5.センサデータをAWSに送るプログラムを作成し、実行する

■ 実行プログラムの作成

#coding: utf-8

from bme280_get_data import BME280
import clientConfig as client
import time
from awsiot import mqtt5_client_builder
from awscrt import mqtt5, http
from concurrent.futures import Future

sensor = BME280()

# MQTT5 クライアントの設定
TIMEOUT = 100
clientName = client.clientName
endpoint_path = client.endpoint_path
amazonroot_path = client.amazonroot_path
privatekey_path = client.privatekey_path
certificate_path = client.certificate_path

# サブスクライブしたメッセージを受信したときのコールバック
def on_publish_received(publish_packet_data):
publish_packet = publish_packet_data.publish_packet
assert isinstance(publish_packet, mqtt5.PublishPacket)
print("Received message from topic '{}': {}".format(publish_packet.topic, publish_packet.payload))

# ライフサイクルイベント: 停止
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
print("Lifecycle Stopped")

# ライフサイクルイベント: 接続成功
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
print("Lifecycle Connection Success")

# ライフサイクルイベント: 接続失敗
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
print("Lifecycle Connection Failure")
print("Connection failed with exception: {}".format(lifecycle_connection_failure.exception))

#クライアント情報を設定する
def set_mqttclient(name=clientName,endpoint=endpoint_path,root=amazonroot_path,key=privatekey_path,certificate=certificate_path):
mqtt5_client = mqtt5_client_builder.mtls_from_path(
endpoint=endpoint_path,
cert_filepath=certificate_path,
pri_key_filepath=privatekey_path,
ca_filepath=amazonroot_path,
on_publish_received=on_publish_received,
on_lifecycle_stopped=on_lifecycle_stopped,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
client_id=clientName,
clean_start=True,
keep_alive_secs=30
)
return mqtt5_client

if __name__ == '__main__':
mqtt5_client = set_mqttclient()
try:
print("Connecting to endpoint...")
mqtt5_client.start()

try:
while True:
messageJson = sensor.getMessage()
print(f"Publishing message: {messageJson}")

publish_future = mqtt5_client.publish(mqtt5.PublishPacket(
topic="your topic",
payload=messageJson.encode(),
qos=mqtt5.QoS.AT_LEAST_ONCE
))
publish_completion_data = publish_future.result(TIMEOUT)
print("Publish successful. PUBACK: {}".format(publish_completion_data.puback.reason_code)) #pubbackは0が返れば成功
time.sleep(5)
except Exception as e:
print(f"Publish failed: {e}")
except KeyboardInterrupt:
pass






※ インポートされている”clientConfig”は前々回記事の物と同じです。


 

このようなメールが届くはずです!!

 

 

 

以上、AWS LambdaとAmazon Simple Notification Serviceを組み合わせて、センサデータに基づくメール通知アプリケーションを作ってみました!!
センサーデータからメールで遠隔地にアラート、最近浸透してきているシステムっぽいですよね!

次回はいよいよ、センサデータを使ってアクチュエータ制御をしていきます!

----------------------------------------------------------------------
株式会社IIU AIMOSスタッフ
〒110-0008
東京都台東区池之端2-7-17 井門池之端ビル7F
Tel: 03-5814-5350  Fax: 03-3827-0682
Email: info@aimos-promotion.com
----------------------------------------------------------------------
modal_banner