Build a Real Time Data Streaming System with Amazon Kinesis Data Streams
Let’s talk streaming data. You read data as it is produced and stream it into other programs and services, and suddenly that data comes to life and can do amazing things. You could even launch a rocket from a text file if you wanted; your imagination is the limit. Let’s take a step-by-step journey through streaming data on AWS. Let’s dig in.
Prerequisites
- Have an AWS account. If you don’t have one, sign up here and enjoy the benefits of the Free Tier account
- View project files
Creating a Kinesis data stream
1. Search for kinesis in the Services search bar and select the Kinesis service
2. Click on Create data stream
3. Set the following configurations:
- Data stream name: learn-data-stream
- Capacity mode: Provisioned
- Provisioned shards: 1
4. Scroll down and click on Create data stream
5. Once on the data stream’s page, select the Configuration tab. Scroll down to Encryption and click on Edit
6. Check Enable server-side encryption, select Use AWS managed CMK, and click on Save changes
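If you would rather script this step, here is a minimal sketch of the same setup using the AWS SDK for JavaScript (v2). It assumes your credentials and region are already configured locally; the stream name and shard count match the values above.

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({ region: 'us-east-1' });

async function createStream() {
    // Create the stream with a single provisioned shard, as configured in the console
    await kinesis.createStream({ StreamName: 'learn-data-stream', ShardCount: 1 }).promise();

    // Wait until the stream becomes ACTIVE before touching it further
    await kinesis.waitFor('streamExists', { StreamName: 'learn-data-stream' }).promise();

    // Turn on server-side encryption with the AWS managed key for Kinesis
    await kinesis.startStreamEncryption({
        StreamName: 'learn-data-stream',
        EncryptionType: 'KMS',
        KeyId: 'alias/aws/kinesis'
    }).promise();
}

createStream().catch(console.error);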
Create an S3 Bucket
1. Search for S3 in the Services search box
2. Click on Create bucket
3. Give the bucket a unique name. (Remember the bucket name should be globally unique). I have named mine learning-datasource-12222023
4. Scroll down and enable Bucket versioning
5. Scroll down and click on Create bucket
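As a script, the bucket setup looks roughly like this (again aws-sdk v2; swap in your own globally unique bucket name):

const AWS = require('aws-sdk');
const s3 = new AWS.S3({ region: 'us-east-1' });

const bucketName = 'learning-datasource-12222023'; // replace with your own unique name

async function createBucket() {
    // Create the bucket (in us-east-1 no CreateBucketConfiguration is needed)
    await s3.createBucket({ Bucket: bucketName }).promise();

    // Enable bucket versioning, mirroring the console step above
    await s3.putBucketVersioning({
        Bucket: bucketName,
        VersioningConfiguration: { Status: 'Enabled' }
    }).promise();
}

createBucket().catch(console.error);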
Create Lambda role
1. Navigate to the IAM service console
2. Select Policies in the left-hand menu, then click on Create policy
3. Under Specify permissions, select JSON and enter the following JSON template
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "kinesisAccess",
"Effect": "Allow",
"Action": [
"kinesis:*"
],
"Resource": "*",
"Condition": {
"StringEquals": {
"aws:RequestedRegion": "us-east-1"
}
}
},
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"s3:*"
],
"Resource": "arn:aws:s3:::*"
}
]
}
4. Scroll to the bottom and click Next
5. Give the policy a name (I named mine lambda_datastream)
6. Scroll down and click on Create policy
7. Select Roles in the left-hand menu and click on Create role
8. Set the following configurations, then click on Next at the bottom:
- Trusted entity type: AWS Service
- Use case: Lambda
9. Under Add permissions, search for the policy you created, check it when it appears, then click Next
10. Give the role a name. (I named mine lambda_datastream)
11. Scroll down and click on Create role
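If you prefer the SDK route, a rough equivalent of steps 1–11 looks like the sketch below. It assumes the JSON policy above has been saved to a local file named policy.json (a made-up file name); the role and policy names match the ones used in the console.

const AWS = require('aws-sdk');
const fs = require('fs');
const iam = new AWS.IAM();

// Lambda needs a trust policy that lets the service assume the role
const trustPolicy = {
    Version: '2012-10-17',
    Statement: [{
        Effect: 'Allow',
        Principal: { Service: 'lambda.amazonaws.com' },
        Action: 'sts:AssumeRole'
    }]
};

async function createLambdaRole() {
    // Create the customer managed policy from the JSON document above (assumed saved as policy.json)
    const { Policy } = await iam.createPolicy({
        PolicyName: 'lambda_datastream',
        PolicyDocument: fs.readFileSync('policy.json', 'utf8')
    }).promise();

    // Create the role with the Lambda trust policy, then attach our policy to it
    await iam.createRole({
        RoleName: 'lambda_datastream',
        AssumeRolePolicyDocument: JSON.stringify(trustPolicy)
    }).promise();

    await iam.attachRolePolicy({
        RoleName: 'lambda_datastream',
        PolicyArn: Policy.Arn
    }).promise();
}

createLambdaRole().catch(console.error);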
Creating a Producer Lambda Function
1. Search for Lambda in the Services search box and select the Lambda service
2. Click on Create function
3. Set the following configuration:
- Create function: Author from scratch
- Name: Producer
- Runtime: Node.js 14.x
- Execution role: Use an existing role, then select lambda_datastream
4. Scroll to the bottom and click on Create function
5. On the Code tab, replace the existing code in the index.js file with the following:
const AWS = require('aws-sdk');

AWS.config.update({
    region: 'us-east-1'
});

const s3 = new AWS.S3();
const kinesis = new AWS.Kinesis();

exports.handler = async (event) => {
    console.log(JSON.stringify(event));

    // The S3 event notification tells us which bucket and object triggered the function.
    // Object keys arrive URL-encoded, so decode them in case the file name contains spaces.
    const bucketName = event.Records[0].s3.bucket.name;
    const keyName = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));

    const params = {
        Bucket: bucketName,
        Key: keyName
    };

    // Read the uploaded object and forward its contents to the Kinesis data stream
    await s3.getObject(params).promise().then(async (data) => {
        const dataString = data.Body.toString();
        const payload = {
            data: dataString
        };
        await sendToKinesis(payload, keyName);
    }, error => {
        console.error(error);
    });
};

async function sendToKinesis(payload, partitionKey) {
    const params = {
        Data: JSON.stringify(payload),
        PartitionKey: partitionKey, // the object key doubles as the partition key
        StreamName: 'learn-data-stream'
    };

    await kinesis.putRecord(params).promise().then(response => {
        console.log(response);
    }, error => {
        console.error(error);
    });
}
6. Click on Deploy to save the function
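Before wiring up S3, you can sanity-check the handler with a mocked S3 event from your own machine. This is only a sketch: the file name test-producer.js is made up, and it assumes the handler code is saved as index.js next to it, your local AWS credentials can reach the bucket and stream, and the referenced object already exists in the bucket.

// test-producer.js -- hypothetical local harness for the Producer handler
const { handler } = require('./index');

// Minimal mock of the S3 event shape the handler reads (bucket name and key are placeholders)
const mockEvent = {
    Records: [{
        s3: {
            bucket: { name: 'learning-datasource-12222023' },
            object: { key: 'sample.txt' }
        }
    }]
};

handler(mockEvent)
    .then(() => console.log('handler finished'))
    .catch(console.error);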
Create an event notification
1. Navigate to S3
2. Click on the bucket we created and select the Properties tab
3. Scroll down to the Event notifications section and click on Create event notification
4. Under Create an event notification, set the following:
- Event name: upload-event
- Prefix: default (blank)
- Suffix: .txt
5. Under Event types, select All object create events
6. Under Destination, select Lambda function, choose the Producer function from your list of functions, and click on Save changes
7. This will trigger the Producer function each time a .txt object is uploaded to the bucket
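The same wiring can be scripted. The sketch below assumes a function named Producer in us-east-1; the function ARN and account ID are placeholders you would replace with your own. Note that the console adds the invoke permission for S3 automatically, but from the SDK you have to grant it yourself.

const AWS = require('aws-sdk');
const s3 = new AWS.S3({ region: 'us-east-1' });
const lambda = new AWS.Lambda({ region: 'us-east-1' });

const bucketName = 'learning-datasource-12222023'; // your bucket
const producerArn = 'arn:aws:lambda:us-east-1:123456789012:function:Producer'; // placeholder ARN

async function wireUpNotification() {
    // Let S3 invoke the Producer function (the console does this behind the scenes)
    await lambda.addPermission({
        FunctionName: 'Producer',
        StatementId: 's3-invoke-producer',
        Action: 'lambda:InvokeFunction',
        Principal: 's3.amazonaws.com',
        SourceArn: `arn:aws:s3:::${bucketName}`
    }).promise();

    // Fire the Producer for every object created with a .txt suffix
    await s3.putBucketNotificationConfiguration({
        Bucket: bucketName,
        NotificationConfiguration: {
            LambdaFunctionConfigurations: [{
                Id: 'upload-event',
                LambdaFunctionArn: producerArn,
                Events: ['s3:ObjectCreated:*'],
                Filter: { Key: { FilterRules: [{ Name: 'suffix', Value: '.txt' }] } }
            }]
        }
    }).promise();
}

wireUpNotification().catch(console.error);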
Create the consumer lambda functions
1. Set up a Lambda function with the Node.js 14.x runtime, name it Consumer1, and assign it the lambda_datastream role
2. Under the Code section, replace the existing code with the following code and Deploy to save the changes
exports.handler = async (event) => {
    console.log(JSON.stringify(event));

    // Kinesis delivers records base64-encoded; decode each one back into JSON
    for (const record of event.Records) {
        const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'));
        console.log('consumer #1', data);
    }
};
3. Move to the Configuration tab, select Triggers, and click on Add trigger
4. Under Trigger configuration, set:
- Source: Kinesis
- Kinesis stream: Kinesis/learn-data-stream
- Check Activate trigger
5. Scroll to the bottom and click on Add
6. Create a second consumer function. Name it Consumer2 and use the following code with it. Remember to use the lambda_datastream role we created.
exports.handler = async (event) => {
    console.log(JSON.stringify(event));

    // Same decoding as Consumer1: base64 -> string -> JSON
    for (const record of event.Records) {
        const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'));
        console.log('consumer #2', data);
    }
};
7. Move to the Configuration tab, select Triggers, and click on Add trigger, just like we did for Consumer1
8. Under Trigger configuration, set:
- Source: Kinesis
- Kinesis stream: Kinesis/learn-data-stream
- Check Activate trigger
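Adding a Kinesis trigger in the console creates an event source mapping under the hood. If you want to script it instead, something like the following should work with aws-sdk v2; the stream ARN is a placeholder for your own account.

const AWS = require('aws-sdk');
const lambda = new AWS.Lambda({ region: 'us-east-1' });

const streamArn = 'arn:aws:kinesis:us-east-1:123456789012:stream/learn-data-stream'; // placeholder ARN

async function addKinesisTriggers() {
    // One event source mapping per consumer; both read new records from the same stream
    for (const functionName of ['Consumer1', 'Consumer2']) {
        await lambda.createEventSourceMapping({
            EventSourceArn: streamArn,
            FunctionName: functionName,
            StartingPosition: 'LATEST', // only read records written after the mapping is created
            BatchSize: 100,
            Enabled: true
        }).promise();
    }
}

addKinesisTriggers().catch(console.error);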
Uploading a txt file to S3
1. Using a text editor on your computer, create a txt file. I created one with the following content:
Data Streaming exercise
We are learning how to stream data
This is a lot of fun!!!
2. Navigate to S3 and select the bucket we created earlier
3. Click on Upload
4. On the Upload page, click on Add files and select the txt file we created. Then click on Upload at the bottom of the page
5. Click on Close to close the upload status page
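You can also push the file from a script instead of the console. A minimal sketch, assuming the local file is named streaming-notes.txt (a made-up name) and the bucket name is your own:

const AWS = require('aws-sdk');
const fs = require('fs');
const s3 = new AWS.S3({ region: 'us-east-1' });

// Upload the local txt file; the .txt suffix is what matches the event notification filter
s3.putObject({
    Bucket: 'learning-datasource-12222023', // replace with your bucket name
    Key: 'streaming-notes.txt',
    Body: fs.readFileSync('streaming-notes.txt'),
    ContentType: 'text/plain'
}).promise()
    .then(() => console.log('uploaded'))
    .catch(console.error);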
Testing the configuration
Producer Function
1. The Producer function should get triggered upon upload of the txt file
2. We test this by checking the Lambda function's logs
3. Navigate to CloudWatch on the Services menu
4. Under Logs, click on Log groups and search for /aws/lambda/. You should see log groups for the 3 Lambda functions
5. Click on the /aws/lambda/Producer log group
6. You should see the log stream at the bottom of the page. Click on it
7. On expanding the logs, we can see that the function was able to pick the object from the bucket
8. Then the data is sent to Kinesis
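If you prefer pulling the logs from a script rather than clicking through CloudWatch, here is a quick sketch with aws-sdk v2; the log group name matches the Producer function above.

const AWS = require('aws-sdk');
const logs = new AWS.CloudWatchLogs({ region: 'us-east-1' });

// Fetch the most recent events from the Producer's log group
logs.filterLogEvents({
    logGroupName: '/aws/lambda/Producer',
    limit: 20
}).promise()
    .then(data => data.events.forEach(e => console.log(e.message.trim())))
    .catch(console.error);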
Consumer1 Function
1. Under the /aws/lambda/Consumer1 log streams, we can see that the function received the data from Kinesis and decoded it
2. The function then takes the data and logs it out
Consumer2 Function
1. Viewing the /aws/lambda/Consumer2 log stream, we can see that this function is also able to read and decode the data from Kinesis
Clean Up
- Delete the Kinesis data stream
- Delete the txt file uploaded to S3
- Delete the S3 bucket we created
- Delete the Producer, Consumer1, and Consumer2 functions
- Delete the Lambda role we created
- Delete the Lambda policy we created
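Most of the clean up can also be scripted. This is only a sketch: the policy ARN is a placeholder, and because versioning is enabled, every object version and delete marker has to go before the bucket itself can be deleted.

const AWS = require('aws-sdk');
const region = 'us-east-1';
const s3 = new AWS.S3({ region });
const kinesis = new AWS.Kinesis({ region });
const lambda = new AWS.Lambda({ region });
const iam = new AWS.IAM();

const bucketName = 'learning-datasource-12222023'; // your bucket
const policyArn = 'arn:aws:iam::123456789012:policy/lambda_datastream'; // placeholder ARN

async function cleanUp() {
    // Delete the data stream
    await kinesis.deleteStream({ StreamName: 'learn-data-stream' }).promise();

    // Empty the versioned bucket (object versions and delete markers), then delete it
    const versions = await s3.listObjectVersions({ Bucket: bucketName }).promise();
    const objects = [...(versions.Versions || []), ...(versions.DeleteMarkers || [])]
        .map(v => ({ Key: v.Key, VersionId: v.VersionId }));
    if (objects.length) {
        await s3.deleteObjects({ Bucket: bucketName, Delete: { Objects: objects } }).promise();
    }
    await s3.deleteBucket({ Bucket: bucketName }).promise();

    // Delete the three functions
    for (const name of ['Producer', 'Consumer1', 'Consumer2']) {
        await lambda.deleteFunction({ FunctionName: name }).promise();
    }

    // Detach and delete the policy, then delete the role
    await iam.detachRolePolicy({ RoleName: 'lambda_datastream', PolicyArn: policyArn }).promise();
    await iam.deletePolicy({ PolicyArn: policyArn }).promise();
    await iam.deleteRole({ RoleName: 'lambda_datastream' }).promise();
}

cleanUp().catch(console.error);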
Conclusion
This blog walked us through a simple guide to streaming data on AWS with S3, Lambda, and Kinesis Data Streams. May it serve as a guide to help you proceed in your learning journey. Thanks, and see you in the next blog.