Build a Real Time Data Streaming System with Amazon Kinesis Data Streams

Kevin Kiruri
8 min read · Dec 28, 2023


Let’s talk streaming data. The idea is to read data from a source and stream it into other programs and uses, where it suddenly comes to life and can do amazing things. You could even launch a rocket from a text file if you wanted; here, your imagination is your limit. Let’s dive into a step-by-step journey on how to stream data on AWS.

Prerequisites

  1. Have an AWS account. If you don’t have one, sign up here and enjoy the benefits of the Free-Tier Account
  2. View project files

Creating a Kinesis data stream

  1. Search for kinesis on the Services search bar and select the Kinesis service

2. Click on Create data stream

3. Set the following configurations:

  • Data stream name: learn-data-stream
  • Capacity mode: Provisioned
  • Provisioned shards: 1

4. Scroll down and click on Create data stream

5. Once on the data stream’s page, select the Configuration tab. Scroll down to Encryption and click on Edit

6. Check Enable server-side encryption and select Use AWS managed CMK. Click on Save changes
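
If you prefer to script this step, the same stream can be created with the AWS SDK for JavaScript (v2), the same SDK the Lambda functions below use. This is a minimal sketch and assumes your AWS credentials and region are already configured locally; server-side encryption is still enabled from the console as described above.

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({ region: 'us-east-1' });

async function createStream() {
  // Provisioned capacity mode with a single shard, matching the console settings above
  await kinesis.createStream({
    StreamName: 'learn-data-stream',
    ShardCount: 1
  }).promise();
  console.log('Stream creation started');
}

createStream().catch(console.error);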

Create an S3 Bucket

  1. Search for S3 in the Services search box
  2. Click on Create bucket

3. Give the bucket a unique name. (Remember the bucket name should be globally unique). I have named mine learning-datasource-12222023

4. Scroll down to Bucket Versioning and select Enable

5. Scroll down and click on Create bucket
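
For reference, here is a minimal sketch of the same bucket setup with the AWS SDK v2. Replace the bucket name with your own globally unique name.

const AWS = require('aws-sdk');
const s3 = new AWS.S3({ region: 'us-east-1' });

async function createBucket() {
  const Bucket = 'learning-datasource-12222023'; // must be globally unique

  // Create the bucket, then turn on versioning as done in the console above
  await s3.createBucket({ Bucket }).promise();
  await s3.putBucketVersioning({
    Bucket,
    VersioningConfiguration: { Status: 'Enabled' }
  }).promise();
}

createBucket().catch(console.error);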

Create Lambda role

  1. Navigate to the IAM service console

2. Select Policies on the left-hand menu then click on Create policy

3. Under Specify permissions, select JSON, then enter the following JSON policy document

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "kinesisAccess",
      "Effect": "Allow",
      "Action": [
        "kinesis:*"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "aws:RequestedRegion": "us-east-1"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:*"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:*"
      ],
      "Resource": "arn:aws:s3:::*"
    }
  ]
}

4. Scroll to the bottom and click Next

5. Give the policy a name (I named mine lambda_datastream)

6. Scroll down and click on Create policy

7. Select Roles on the left-hand menu and click on Create role

8. Set the following configurations then click on Next at the bottom.

  • Trusted entity type: AWS Service
  • Use case: Lambda

9. Under Add permissions search for the policy you created and check it when it appears, then click Next

10. Give the role a name. (I named mine lambda_datastream)

11. Scroll down and click on Create role
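
As a rough equivalent, the policy and role can also be created with the SDK. This sketch assumes the permissions policy is the JSON document shown above, passed in as a string; the trust policy simply lets Lambda assume the role.

const AWS = require('aws-sdk');
const iam = new AWS.IAM();

// Trust policy that allows the Lambda service to assume the role
const trustPolicy = {
  Version: '2012-10-17',
  Statement: [{
    Effect: 'Allow',
    Principal: { Service: 'lambda.amazonaws.com' },
    Action: 'sts:AssumeRole'
  }]
};

async function createLambdaRole(permissionsPolicyJson) {
  // Create the customer-managed policy from the JSON document shown earlier
  const { Policy } = await iam.createPolicy({
    PolicyName: 'lambda_datastream',
    PolicyDocument: permissionsPolicyJson
  }).promise();

  // Create the role and attach the policy to it
  await iam.createRole({
    RoleName: 'lambda_datastream',
    AssumeRolePolicyDocument: JSON.stringify(trustPolicy)
  }).promise();
  await iam.attachRolePolicy({
    RoleName: 'lambda_datastream',
    PolicyArn: Policy.Arn
  }).promise();
}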

Creating a Producer Lambda Function

  1. Search for Lambda in the Services search box and select the Lambda service

2. Click on Create function

3. Set the following configuration:

  • Create function: Author from scratch
  • Name: Producer
  • Runtime: Node.js 14.x
  • Execution role: Use an existing role, then select lambda_datastream

4. Scroll to the bottom and click on Create function

5. On the Code tab, replace the existing code in the index.js file with the following:

// The AWS SDK v2 is available by default in the Node.js 14.x Lambda runtime
const AWS = require('aws-sdk');
AWS.config.update({
  region: 'us-east-1'
});

const s3 = new AWS.S3();
const kinesis = new AWS.Kinesis();

exports.handler = async (event) => {
  console.log(JSON.stringify(event));

  // The S3 event notification tells us which bucket and object key triggered us
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;
  const params = {
    Bucket: bucketName,
    Key: keyName
  };

  // Read the uploaded object and forward its contents to Kinesis
  await s3.getObject(params).promise().then(async (data) => {
    const dataString = data.Body.toString();
    const payload = {
      data: dataString
    };
    await sendToKinesis(payload, keyName);
  }, error => {
    console.error(error);
  });
};

// Write a single record to learn-data-stream, using the object key as the partition key
async function sendToKinesis(payload, partitionKey) {
  const params = {
    Data: JSON.stringify(payload),
    PartitionKey: partitionKey,
    StreamName: 'learn-data-stream'
  };
  await kinesis.putRecord(params).promise().then(response => {
    console.log(response);
  }, error => {
    console.error(error);
  });
}

6. Click on Deploy to save the function
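
To sanity-check the handler without waiting for a real upload, you can invoke it with a minimal S3 test event (for example from the Test tab in the Lambda console). The bucket and key below are illustrative; substitute an object that actually exists in your bucket.

// Minimal S3 "object created" test event; the handler only reads the bucket name and object key
const testEvent = {
  Records: [
    {
      s3: {
        bucket: { name: 'learning-datasource-12222023' }, // your bucket name
        object: { key: 'sample.txt' }                     // an existing .txt object (illustrative)
      }
    }
  ]
};

If the configuration is correct, the invocation logs the event, reads the object from S3, and writes a record to the stream.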

Create an event notification

  1. Navigate to S3
  2. Click on the bucket we created and select the Properties tab

3. Scroll down to the Event notifications section and click on Create event notification.

4. Under Create an event notification set the following:

  • Event name: upload-event
  • Prefix: default (blank)
  • Suffix: .txt

5. Under Event types select All object create events

6. Under Destination, select Lambda function, choose the Producer function from your list of functions, and click on Save changes

7. This will trigger the Producer function each time a .txt object is uploaded to the bucket
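
If you script this step instead, the same notification can be attached with the SDK's putBucketNotificationConfiguration call. The sketch below assumes you already have the Producer function's ARN; note that going through the SDK (rather than the console) also requires granting S3 permission to invoke the function, e.g. with lambda.addPermission, which the console otherwise handles for you.

const AWS = require('aws-sdk');
const s3 = new AWS.S3({ region: 'us-east-1' });

async function addUploadNotification(bucketName, producerArn) {
  // Overwrites the bucket's notification configuration with a single Lambda rule
  await s3.putBucketNotificationConfiguration({
    Bucket: bucketName,
    NotificationConfiguration: {
      LambdaFunctionConfigurations: [{
        Id: 'upload-event',
        LambdaFunctionArn: producerArn,          // placeholder: your Producer function ARN
        Events: ['s3:ObjectCreated:*'],          // all object create events
        Filter: {
          Key: { FilterRules: [{ Name: 'suffix', Value: '.txt' }] }
        }
      }]
    }
  }).promise();
}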

Create the consumer lambda functions

  1. Create another Lambda function the same way we created the Producer: author from scratch, runtime Node.js 14.x, name it Consumer1, and use the existing lambda_datastream role

2. Under the code section, replace the existing code with the following code and Deploy to save the changes

exports.handler = async (event) => {
  console.log(JSON.stringify(event));

  // Kinesis delivers record payloads base64-encoded; decode, parse and log each one
  for (const record of event.Records) {
    const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
    console.log('consumer #1', data);
  }
};

3. Move to the Configuration tab, select Triggers, and click on Add trigger

4. Under Trigger configuration

  • Source: Kinesis
  • Kinesis stream: Kinesis/learn-data-stream
  • Check Activate trigger

5. Scroll to the bottom and click on Add

6. Create a second consumer function. Name it Consumer2 and use the following code with it. Remember to use the lambda_datastream role we created.

exports.handler = async (event) => {
  console.log(JSON.stringify(event));

  // Same as Consumer1: decode the base64 Kinesis payload, parse it, and log it
  for (const record of event.Records) {
    const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
    console.log('consumer #2', data);
  }
};

7. Move to the Configuration tab, select Triggers, and click on Add trigger just like we did for Consumer1 (an equivalent SDK call is sketched after this list)

8. Under Trigger configuration

  • Source: Kinesis
  • Kinesis stream: Kinesis/learn-data-stream
  • Check Activate trigger
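
Behind the scenes, each trigger you just added is a Kinesis event source mapping on the function. For reference, here is a minimal sketch of the equivalent SDK call, assuming you have the ARN of learn-data-stream at hand:

const AWS = require('aws-sdk');
const lambda = new AWS.Lambda({ region: 'us-east-1' });

async function addKinesisTrigger(functionName, streamArn) {
  await lambda.createEventSourceMapping({
    FunctionName: functionName,     // 'Consumer1' or 'Consumer2'
    EventSourceArn: streamArn,      // placeholder: the learn-data-stream ARN
    StartingPosition: 'LATEST',     // only read records written after the mapping is created
    Enabled: true                   // equivalent to checking "Activate trigger"
  }).promise();
}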

Uploading a .txt file to S3

  1. Using a text editor on your computer, create a .txt file. I created one with the following content:
Data Streaming exercise
We are learning how to stream data
This is a lot of fun!!!

2. Navigate to S3 and select the bucket we created earlier

3. Click on Upload

4. On the Upload page, click on Add files and select the txt file we created. Then click on Upload at the bottom of the page.

5. Click on Close to close the upload status page
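
Equivalently, the upload can be done with the SDK; any object whose key ends in .txt will fire the notification and invoke the Producer function. The key below is illustrative.

const AWS = require('aws-sdk');
const s3 = new AWS.S3({ region: 'us-east-1' });

s3.putObject({
  Bucket: 'learning-datasource-12222023',  // your bucket name
  Key: 'stream-test.txt',                  // must end in .txt to match the suffix filter
  Body: 'Data Streaming exercise\nWe are learning how to stream data\nThis is a lot of fun!!!\n'
}).promise()
  .then(() => console.log('Uploaded'))
  .catch(console.error);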

Testing the configuration

Producer Function

  1. The Producer function should be triggered by the upload of the .txt file
  2. We verify this by checking the Lambda functions’ logs
  3. Navigate to CloudWatch on the Services menu
  4. Under Logs, click on Log groups and search for /aws/lambda/. You should see log groups for the 3 Lambda functions

5. Click on the /aws/lambda/Producer log group

6. You should see the Log stream at the bottom of the page. Click on it

7. On expanding the logs, we can see that the function was able to pick the object from the bucket

8. Then the data is sent to Kinesis.
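
Besides the CloudWatch logs, you can also confirm the record actually landed in the stream by reading it back directly. A minimal verification sketch, which works here because the stream has a single shard:

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({ region: 'us-east-1' });

async function readBack() {
  // Look up the single shard of the stream
  const { StreamDescription } = await kinesis.describeStream({
    StreamName: 'learn-data-stream'
  }).promise();

  // Start reading from the oldest record still retained
  const { ShardIterator } = await kinesis.getShardIterator({
    StreamName: 'learn-data-stream',
    ShardId: StreamDescription.Shards[0].ShardId,
    ShardIteratorType: 'TRIM_HORIZON'
  }).promise();

  const { Records } = await kinesis.getRecords({ ShardIterator }).promise();
  for (const record of Records) {
    console.log(record.Data.toString());  // the JSON payload sent by the Producer
  }
}

readBack().catch(console.error);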

Consumer1 Function

  1. Under the /aws/lambda/Consumer1 log streams, we can see that the function was able to receive and decode the data from Kinesis

2. The function then takes the data and reads it out

Consumer2 Function

  1. Viewing the /aws/lambda/Consumer2 log stream, we can see that the function is also able to read and decode data from Kinesis

Clean Up

  1. Delete the Kinesis Data stream
  2. Delete the .txt file uploaded to S3
  3. Delete the created S3 bucket
  4. Delete the Producer, Consumer1 and Consumer2 functions
  5. Delete the lambda role created
  6. Delete the lambda policy that we had created

Conclusion

This blog walked through a simple, hands-on guide to streaming data with S3, Lambda, and Kinesis Data Streams. May it serve as a guide to help you proceed in your learning journey. Thanks, and see you in the next blog.
