The Internet of Things on AWS – Official Blog
Connected vehicles telemetry – Processing Protobuf messages with AWS IoT Core
Introduction
In connected vehicle applications, telemetry data is usually very extensive, containing both structured and unstructured data. To send that data to the cloud you can use Protocol Buffers (Protobuf), a binary serialization format. Protobuf gives the application an efficient yet well-structured compression mechanism. Because the schema is part of the protocol definition, data serialization and deserialization are more manageable than with JavaScript Object Notation (JSON). However, the producer and the consumer must share the same schema to encode and decode the payload properly.
In this blog we will cover best practices for encoding and decoding with Protobuf. You will also learn, step by step, how to use AWS IoT Core and AWS Lambda to ingest and process Protobuf messages for consumption.
Solution Architecture
Figure 1. Architecture diagram
Solution overview
- You will simulate a connected car and authenticate it to AWS IoT Core. The device first encodes the payload and sends it over Message Queuing Telemetry Transport (MQTT) to AWS IoT Core.
- Once the message is received by AWS IoT Core, an AWS IoT rule invokes an AWS Lambda function to decode the payload.
- The rule then sends the decoded messages to Amazon Kinesis Data Firehose, which stores them in Amazon S3.
- Each time a new file is written to Amazon Simple Storage Service (Amazon S3), an AWS Glue crawler crawls the data to infer the schema and make it available in the AWS Glue Data Catalog.
- You will then use Amazon Athena for ad hoc querying and visualize the data in Amazon QuickSight.
AWS IoT Core
AWS IoT Core securely connects your simulated IoT device and routes the encoded messages to AWS services without managing the underlying infrastructure. You can then use rules for AWS IoT to decode your payload data and forward it to Amazon Kinesis Data Firehose.
Amazon Kinesis Data Firehose
Amazon Kinesis Data Firehose captures the incoming data from your rule for AWS IoT and loads it in batches, in Parquet format, into your Amazon S3 bucket.
Amazon S3
Amazon S3 serves as a data lake for your data that you can use for further analysis and visualization.
AWS Glue
The AWS Glue Data Catalog is your persistent store for the metadata (e.g., schema and location of the data). It is a managed service that lets you store, annotate, and share metadata in the AWS Cloud.
As files are written to Amazon S3, you can use an AWS Glue crawler to scan the data, classify it, extract the schema, and store the metadata automatically in the AWS Glue Data Catalog.
Amazon Athena
Amazon Athena uses the metadata from the AWS Glue Data Catalog to perform ad hoc queries on the data.
Amazon QuickSight
You can visualize your data and build a custom dashboard using Amazon QuickSight.
Solution Walkthrough
Prerequisites
- You need a PC with a web browser, preferably with the latest version of Chrome or Firefox
- You must have access to an AWS account with Administrator Access privileges
- If you don’t have an AWS account, follow the instructions to create one
- You will use an AWS CloudFormation template to create the setup environment, and you can delete the environment once you are done
- The following AWS services will be used:
- AWS IoT Core
- Amazon Kinesis Data Firehose
- Amazon S3
- AWS Glue
- Amazon Athena
- Amazon QuickSight
- AWS Cloud9
Setup solution
Create and set up the AWS Cloud9 environment
Use the following link to set up the test environment for this blog using AWS Cloud9: AWS IoT Device Client Workshop (IoT quickstart) (workshops.aws). You may pick any Region close to your location.
Setup AWS IoT Thing and SDK
Open the Cloud9 terminal to set up the Python SDK.
In the Cloud9 terminal window, create the folder you will use to connect the IoT thing:
mkdir -p /home/ubuntu/environment/protobuf-python-aws-iot-device/certs
cd /home/ubuntu/environment/protobuf-python-aws-iot-device/
Set up the dependencies. Copy and paste the following into requirements.txt:
AWSIoTPythonSDK==1.5.2
numpy==1.19.5
protobuf==3.19.4
and then run the following:
python3 -m venv venv
source ./venv/bin/activate
pip install -r requirements.txt
deactivate
Set up your AWS IoT thing by following the steps outlined here.
Once you have created the thing, upload its certificates to your Cloud9 instance so that you can connect from there.
Upload the newly created certificates and root CA into the ‘certs’ folder created earlier.
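If you prefer the command line, here is a minimal sketch of the same thing setup using the AWS CLI. The thing name, policy name, and output file names below are illustrative assumptions, not values from this walkthrough; the console steps linked above remain the reference.

# Create an IoT thing (name is an assumption)
aws iot create-thing --thing-name my-connected-ev

# Create an active certificate and key pair, saving them into the certs folder
aws iot create-keys-and-certificate --set-as-active \
  --certificate-pem-outfile certs/my-connected-ev-certificate.pem.crt \
  --public-key-outfile certs/my-connected-ev-public.pem.key \
  --private-key-outfile certs/my-connected-ev-private.pem.key

# Download the Amazon root CA used to validate the AWS IoT endpoint
curl -o certs/AmazonRootCA1.pem https://www.amazontrust.com/repository/AmazonRootCA1.pem

# Attach an existing IoT policy (assumed to allow connect/publish/subscribe) and the thing to the certificate
aws iot attach-policy --policy-name my-connected-ev-policy --target <certificateArn from previous output>
aws iot attach-thing-principal --thing-name my-connected-ev --principal <certificateArn from previous output>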
Device and Schema
Here is the Protobuf schema that we will use. Create a file named automotive.proto and copy and paste the following content:
syntax = "proto2";
package automotive;
message Automotive {
  required float battery_level = 1;
  required float battery_health = 2;
  required float battery_discharge_rate = 3;
  required float wheel_rpm = 4;
  required float mileage_left = 5;
}
You will need to compile the schema and generate the corresponding Python module. Here is the generated file; save it as automotive_pb2.py:
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: automotive.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x61utomotive.proto\x12\nautomotive\"\x84\x01\n\nAutomotive\x12\x15\n\rbattery_level\x18\x01 \x02(\x02\x12\x16\n\x0e\x62\x61ttery_health\x18\x02 \x02(\x02\x12\x1e\n\x16\x62\x61ttery_discharge_rate\x18\x03 \x02(\x02\x12\x11\n\twheel_rpm\x18\x04 \x02(\x02\x12\x14\n\x0cmileage_left\x18\x05 \x02(\x02')
_AUTOMOTIVE = DESCRIPTOR.message_types_by_name['Automotive']
Automotive = _reflection.GeneratedProtocolMessageType('Automotive', (_message.Message,), {
  'DESCRIPTOR' : _AUTOMOTIVE,
  '__module__' : 'automotive_pb2'
  # @@protoc_insertion_point(class_scope:automotive.Automotive)
  })
_sym_db.RegisterMessage(Automotive)
if _descriptor._USE_C_DESCRIPTORS == False:
  DESCRIPTOR._options = None
  _AUTOMOTIVE._serialized_start=33
  _AUTOMOTIVE._serialized_end=165
# @@protoc_insertion_point(module_scope)
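If you would rather generate this file yourself instead of copying it, you can compile the schema with the protoc compiler. This step is optional; the file above was produced for protobuf 3.19, and a newer protoc may emit slightly different but compatible code.

# Optional: regenerate automotive_pb2.py from the schema
protoc --python_out=. automotive.proto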
Now let’s create the file that will run the device simulation. Copy and paste the following content into a file named main.py:
'''
/*
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
'''
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json
import automotive_pb2
import numpy as np
AllowedActions = ['both', 'publish', 'subscribe']
# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")
# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic")
parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
help="Operation modes: %s"%str(AllowedActions))
parser.add_argument("-M", "--message", action="store", dest="message", default="Hello World!",
help="Message to publish")
args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
port = args.port
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic
if args.mode not in AllowedActions:
    parser.error("Unknown --mode option %s. Must be one of %s" % (args.mode, str(AllowedActions)))
    exit(2)
if args.useWebsocket and args.certificatePath and args.privateKeyPath:
    parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
    exit(2)
if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
    parser.error("Missing credentials for authentication.")
    exit(2)
# Port defaults
if args.useWebsocket and not args.port:  # When no port override for WebSocket, default to 443
    port = 443
if not args.useWebsocket and not args.port:  # When no port override for non-WebSocket, default to 8883
    port = 8883
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
if args.mode == 'both' or args.mode == 'subscribe':
    myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)
# Publish to the same topic in a loop forever
loopCount = 0
automotive = automotive_pb2.Automotive()
dataPointsSin = np.linspace(-np.pi, np.pi,100)
while True:
    if args.mode == 'both' or args.mode == 'publish':
        # 100 linearly spaced numbers
        automotive.battery_level = abs(dataPointsSin[loopCount % 100]) / np.pi
        automotive.battery_health = 100 - loopCount % 100
        automotive.battery_discharge_rate = 4.8
        automotive.wheel_rpm = 3000
        automotive.mileage_left = loopCount % 100
        message = bytearray(automotive.SerializeToString())
        myAWSIoTMQTTClient.publish(topic, message, 1)
        if args.mode == 'publish':
            print('Published topic %s: %s\n' % (topic, message))
        loopCount += 1
    time.sleep(1)
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
Lambda layer
Create a Lambda layer that will be used to store the Protobuf libraries by executing the following commands:
mkdir -p ./protobuf/python/custom
cd ./protobuf/python
pip3 install protobuf --target .
cd custom
cp ../../../automotive_pb2.py ./
echo 'custom' >> ../protobuf-*.dist-info/namespace_packages.txt
echo 'custom/automotive_pb2.py' >> ../protobuf-*.dist-info/RECORD
echo 'custom' >> ../protobuf-*.dist-info/top_level.txt
cd ../../
zip -r protobuf.zip .
aws lambda publish-layer-version --layer-name protobuf --zip-file fileb://protobuf.zip --compatible-runtimes python3.8
Setup S3 bucket
First, create an S3 bucket where you will store the data and from which you will query it. Create the bucket, fill out a name (here it will be “ACCOUNT-ID-connected-evs”), leave the rest as default, and click on Create bucket. Note your bucket name, as you will be reusing it throughout this blog.
Figure 2.
Setup Kinesis Data Firehose
Create a delivery stream; it will write the data received from the connected cars to Amazon S3. Select Direct PUT as the source and Amazon S3 as the destination, and fill out a name; here it will be “my-delivery-stream-connected-evs“.
Figure 3.
In the destination settings, select the S3 bucket that you previously created, fill out “raw“ as the bucket prefix and “errors“ as the error prefix. Leave the rest as default; it takes a few minutes for the creation to complete.
Figure 4.
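As an alternative to the console, here is a minimal sketch of the same delivery stream created with the AWS CLI. The IAM role ARN is an assumption that you would replace with a role allowed to write to your bucket, and the buffering values are illustrative only:

aws firehose create-delivery-stream \
  --delivery-stream-name my-delivery-stream-connected-evs \
  --delivery-stream-type DirectPut \
  --extended-s3-destination-configuration '{
    "RoleARN": "arn:aws:iam::ACCOUNT_ID:role/my-firehose-delivery-role",
    "BucketARN": "arn:aws:s3:::ACCOUNT-ID-connected-evs",
    "Prefix": "raw/",
    "ErrorOutputPrefix": "errors/",
    "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 64}
  }'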
Setup AWS IoT Rule
Create the AWS IoT rule; you will reference the rule name again when configuring the Lambda function’s permissions, so note it. The rule selects all data coming from the topic connected-cars/data and invokes an AWS Lambda function with the incoming data in order to decode the Protobuf-encoded payload. Because the payload is a binary string, it first needs to be encoded in base64. For the SQL statement, copy and paste the following, replacing ACCOUNT_ID with your account ID.
- Select Message Routing
- Select Rules
- Select Create rule
- Give a rule name (e.g., we are using “MyConnectedEVSRuleToFirehose”)
- Give a rule description
- Use the following query for the rule:
SELECT aws_lambda("arn:aws:lambda:us-east-1:ACCOUNT_ID:function:my-protobuf-decoder", {"data": encode(*, "base64"), "clientId": clientId()}) as payload, timestamp() as p_time FROM 'connected-cars/data'
- Select Next
- In Attach rule actions
- Select settings as per Figure 6
- Select Add action
- Select Next
- In Review and Create
- Select Create
Figure 5.
Figure 6.
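For reference, the same rule could also be created with the AWS CLI. This is a sketch only: the role ARN is an assumption for a role allowed to put records into the delivery stream, and the Region and account ID must match your setup. Save the payload as topic-rule.json and create the rule from it:

# topic-rule.json (SQL statement from the rule above; role ARN is an assumption)
{
  "sql": "SELECT aws_lambda(\"arn:aws:lambda:us-east-1:ACCOUNT_ID:function:my-protobuf-decoder\", {\"data\": encode(*, \"base64\"), \"clientId\": clientId()}) as payload, timestamp() as p_time FROM 'connected-cars/data'",
  "awsIotSqlVersion": "2016-03-23",
  "actions": [{
    "firehose": {
      "roleArn": "arn:aws:iam::ACCOUNT_ID:role/my-iot-firehose-role",
      "deliveryStreamName": "my-delivery-stream-connected-evs"
    }
  }]
}

# Create the rule from the payload file
aws iot create-topic-rule --rule-name MyConnectedEVSRuleToFirehose --topic-rule-payload file://topic-rule.json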
Setup Lambda
Create an AWS Lambda function and give it the same name as the function referenced in the AWS IoT rule’s SQL statement (my-protobuf-decoder). Pick Python 3.8 as the runtime.
Figure 7.
With the Protobuf layer created earlier in place, use the following code for the function:
import json
import base64
from custom import automotive_pb2
print('Loading function')
def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    ret = {}
    data = event["data"]
    payload_data_decoded = base64.b64decode(data)
    automotive = automotive_pb2.Automotive()
    automotive.ParseFromString(payload_data_decoded)
    elems = automotive.ListFields()
    for elem in elems:
        ret[elem[0].name] = elem[1]
    ret["clientId"] = event["clientId"]
    return ret
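If you prefer to deploy the function from the Cloud9 terminal instead of the console, a minimal sketch follows. It assumes the handler code above is saved as lambda_function.py, and the execution role ARN is a placeholder you would replace with your own:

# Package the handler
zip function.zip lambda_function.py

# Create the function (role ARN is an assumption)
aws lambda create-function \
  --function-name my-protobuf-decoder \
  --runtime python3.8 \
  --handler lambda_function.lambda_handler \
  --role arn:aws:iam::ACCOUNT_ID:role/my-lambda-execution-role \
  --zip-file fileb://function.zip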
In the Configuration tab, under Permissions, go to the resource-based policy and click on Add permission. You need to add a permission that allows the AWS IoT rule to invoke the function. When specifying the ARN, use the name of the rule you created earlier. Click on Save.
Figure 8.
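The same permission can be granted from the command line; this sketch assumes the rule and function names used above and the us-east-1 Region:

aws lambda add-permission \
  --function-name my-protobuf-decoder \
  --statement-id iot-rule-invoke \
  --action lambda:InvokeFunction \
  --principal iot.amazonaws.com \
  --source-arn arn:aws:iot:us-east-1:ACCOUNT_ID:rule/MyConnectedEVSRuleToFirehose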
Finally, attach the previously created layer: go to the Layers section of the function and select ‘Add a layer‘.
Figure 9.
Figure 10.
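Equivalently, the layer can be attached with the CLI; the layer version ARN below is a placeholder for the ARN returned by the publish-layer-version command earlier:

aws lambda update-function-configuration \
  --function-name my-protobuf-decoder \
  --layers arn:aws:lambda:us-east-1:ACCOUNT_ID:layer:protobuf:1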
Protobuf decode/encode
The following JSON payload will be encoded as a Protobuf binary message:
{
"battery_level": 100,
"battery_health" : 50,
"battery_discharge_rate" : 4.8,
"wheel_rpm" : 3000,
"mileage_left" : 88
}
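To see what the device-side encoding and the Lambda-side decoding look like end to end, here is a small local sketch you can run from the same folder as automotive_pb2.py. The use of google.protobuf.json_format here is purely for illustration; main.py sets the message fields directly.

import json
from google.protobuf.json_format import ParseDict
import automotive_pb2

sample = {
    "battery_level": 100,
    "battery_health": 50,
    "battery_discharge_rate": 4.8,
    "wheel_rpm": 3000,
    "mileage_left": 88
}

# Encode: fill the Protobuf message from the JSON dict and serialize it to bytes
msg = ParseDict(sample, automotive_pb2.Automotive())
encoded = msg.SerializeToString()
print("Protobuf size:", len(encoded), "bytes / JSON size:", len(json.dumps(sample)), "bytes")

# Decode: the same steps the Lambda function performs after base64-decoding the payload
decoded = automotive_pb2.Automotive()
decoded.ParseFromString(encoded)
print({field.name: value for field, value in decoded.ListFields()})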
Let’s publish our first message and check that everything is working.
Sample command:
Use the certificates and the AWS IoT thing created earlier; the certificates are passed as parameters to send the message (replace xxxx with the relevant values for your setup).
source ./venv/bin/activate
python3 main.py -e xxxx-ats.iot.us-east-1.amazonaws.com -c ./certs/xxxx-certificate.pem.crt -r ./certs/AmazonRootCA1.pem -t connected-cars/data -m eza -k ./certs/xxxx-private.pem.key --mode publish
Figure 11.
Go to your bucket after a few minutes; you should see files being written.
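You can also check from the terminal; this assumes the bucket name and prefix used earlier:

aws s3 ls s3://ACCOUNT-ID-connected-evs/raw/ --recursive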
Setup AWS Glue Crawler
You will now create the AWS Glue crawler that will be responsible for creating and updating the schema of your data. You can create a crawler at the following link. For the crawler name, mine will be ‘my_connected_evs_crawler’.
For the crawler source type, pick Data stores; for Repeat crawls of S3 data stores, pick Crawl all folders. In Add data store, leave everything as default, but for the include path select your S3 bucket and the raw folder; for me it will be s3://ACCOUNT_ID-connected-evs/raw. Click on Next. Do not add another data store. Give a name to your role. For the frequency, leave the default.
For Configure the crawler’s output, click on Add database, enter the database name my_connected_evs_db, and leave the rest blank. Leave the remaining settings as default and click Next.
Select your crawler and click Run crawler. The status of your crawler should show Starting. When the status changes to Stopping, check the table in your database. You should see the following for your raw table:
Figure 12.
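The crawler can also be created and started from the CLI; this is a sketch where the IAM role is an assumption (it needs the AWS Glue service permissions plus read access to the bucket):

aws glue create-crawler \
  --name my_connected_evs_crawler \
  --role arn:aws:iam::ACCOUNT_ID:role/my-glue-crawler-role \
  --database-name my_connected_evs_db \
  --targets '{"S3Targets": [{"Path": "s3://ACCOUNT-ID-connected-evs/raw/"}]}'

aws glue start-crawler --name my_connected_evs_crawler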
Setup Amazon Athena
Go to the Amazon Athena console; you can set up your Amazon Athena query results location by following this link.
Select the database and table created by the crawler, then run the following query:
SELECT * FROM raw;
Figure 13.
Visualize data using Amazon QuickSight
To set up QuickSight, please follow this link.
In QuickSight, let’s first create our dataset. Click on Datasets on the left. The source of our dataset will be Amazon Athena, which we used previously to preview our data. If you want to check the other supported sources, follow this link. Note that in our case we use Amazon Athena for its simplicity for ad hoc querying and rapid dashboarding.
Figure 14.
On the following screen, click on New dataset.
Figure 15.
Then click on Athena.
Figure 16.
Then give a name to your data source; for us, it will be ‘my-evs-athena-data-source’. Make sure to validate the connection. Then click on Create data source.
Figure 17.
Choose the AwsDataCatalog, our database my_connected_evs_db, and the raw table. Click on Use custom SQL.
Figure 18.
We will flatten the payload struct with the following query. Copy and paste the query, give the custom SQL query a name, and click on Confirm query.
SELECT payload.battery_level as battery_level, payload.battery_health as battery_health, payload.battery_discharge_rate as battery_discharge_rate, payload.wheel_rpm as wheel_rpm, payload.mileage_left as mileage_left, p_time, payload.clientId as client_id FROM "my_connected_evs_db"."raw";
Figure 19.
Leave the rest as default and click on Visualize.
Figure 20.
Here are some examples of visualizations.
Click on the table chart type in the lower left-hand side and then on each dimension. You should see your data in a table format.
Figure 21.
Conclusion
In this blog post we took a sample JSON payload, encoded it into the Protobuf binary format, and sent it to AWS IoT Core, where it was decoded and deserialized using AWS Lambda. The data was then delivered to our data lake in Amazon S3 using Amazon Kinesis Data Firehose, and finally we visualized the telemetry data of the connected vehicle. By following this blog, you learned how to compress, serialize, and deserialize a connected-vehicle JSON dataset. Using this methodology you can achieve lower latency along with compatibility between your end device and your consumer.
About the Authors
Syed Rehan is a Global Sr. Specialist Solutions Architect at Amazon Web Services, based in London. He covers a global span of customers and supports them as a lead IoT Solutions Architect. Syed has in-depth knowledge of IoT and the cloud, and works with global customers ranging from start-ups to enterprises to enable them to build IoT solutions with the AWS ecosystem.
Kevin Polossat is a Solutions Architect at AWS. He works with customers in France to help them embrace and adopt the cloud. Outside of work, he enjoys wine and cheese.