Testing AWS Lambda with Kinesis stream in local IDE
I recently worked on a project where I migrated event publishing from RabbitMQ to a Kinesis stream.
In this post I will discuss how to create an AWS Lambda project and test the Lambda logic in a local IDE without deploying the project to the AWS cloud. For the step-by-step process of setting up the Lambda and consuming a Kinesis stream in the AWS cloud, please visit the AWS developer page.
Required software
- Java 8+
- JUnit 5
- Maven 3.6+
- Eclipse
Create a Maven project
Let’s create the Lambda project using Maven. To create a Maven project from the command line, open a terminal or command prompt window, enter or paste the following command, and then press Enter or Return.
mvn -B archetype:generate -DarchetypeGroupId=software.amazon.awssdk -DarchetypeArtifactId=archetype-lambda -Dregion=US_WEST_2 -Dservice=s3 -DgroupId=com.sureshtech.aws -DartifactId=aws-kinesis-lambda
The above Maven command creates a skeleton project for AWS Lambda. Let’s import the generated project into the Eclipse IDE.
Once the project is imported into Eclipse, we change the source code as needed.
First, we add the service JARs we are going to use to the pom.xml file. Since the Kinesis event stream is our source of data, I have included the ‘aws-java-sdk-kinesis’ and ‘aws-lambda-java-events’ JARs.
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>2.2.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.11.959</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>${aws.lambda.java.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4jVersion}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4jVersion}</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.andreinc</groupId>
<artifactId>mockneat</artifactId>
<version>0.4.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.7.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>
</dependencies>
The structure of the project looks like below.
Now we rename the App.java class to AwsKinesisLambda.java and modify the handleRequest() method to take a KinesisEvent object.
package com.sureshtech.aws;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Lambda function entry point. You can change to use other pojo type or implement
* a different RequestHandler.
*
* @see <a href=https://docs.aws.amazon.com/lambda/latest/dg/java-handler.html>Lambda Java Handler</a> for more information
*/
public class AwsKinesisLambda {
private static final Logger logger = LoggerFactory.getLogger(AwsKinesisLambda.class);
private ObjectMapper objectMapper = new ObjectMapper();
private EmployeeEventProcessor empEventProcessor;
public void setEmpEventProcessor(EmployeeEventProcessor empEventProcessor) {
this.empEventProcessor = empEventProcessor;
}
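public AwsKinesisLambda() {
// The no-arg constructor sets a default processor so the field is never null;
// tests can still replace it through setEmpEventProcessor().
this.empEventProcessor = new EmployeeEventProcessor();
}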
// @Override
public void handleRequest(KinesisEvent event, final Context context) {
List<KinesisEvent.KinesisEventRecord> records = Optional.ofNullable(event.getRecords()).orElse(emptyList());
List<EmployeeEvent> empEvents = records.stream().flatMap(this::convertKinesisRecordToEmployeeRecord).collect(toList());
int i = empEventProcessor.processEmployeeEvents(empEvents);
logger.info("No. of records processed: {}", i);
}
private Stream<EmployeeEvent> convertKinesisRecordToEmployeeRecord(KinesisEvent.KinesisEventRecord record){
try {
byte[] data = record.getKinesis().getData().array();
return Stream.of(objectMapper.readValue(data,EmployeeEvent[].class));
}catch(Exception e) {
throw new RuntimeException(e);
}
}
}
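One detail in convertKinesisRecordToEmployeeRecord deserves care: ByteBuffer.array() returns the entire backing array and ignores the buffer's position and limit, and it throws for read-only buffers. That is fine for buffers created with ByteBuffer.wrap(bytes), as in the tests in this post, but a more defensive variant copies only the readable bytes. Below is a minimal sketch using only the JDK. The class name ByteBuffers and the method toBytes are hypothetical helpers, not part of the AWS libraries.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of the AWS libraries): copies the readable bytes of a buffer.
final class ByteBuffers {

    private ByteBuffers() {
    }

    // Returns the bytes between position and limit without moving the caller's position.
    static byte[] toBytes(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position and limit
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }
}
```

In the handler, the line that reads the payload could then become byte[] data = ByteBuffers.toBytes(record.getKinesis().getData()); if this helper were added to the project.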
Testing AWS Lambda in local IDE
We have developed a Lambda that takes a Kinesis stream event as input. Now it is time to test the logic in the local IDE.
If you look at the handleRequest() method, it takes two parameters: a “KinesisEvent” and a “Context”.
“Context” is a runtime object provided by the AWS Lambda runtime. We simulate it by implementing the “com.amazonaws.services.lambda.runtime.Context” interface. The TestLogger class used in the code is a LambdaLogger implementation that is not listed in this post.
package com.sureshtech.aws;
import com.amazonaws.services.lambda.runtime.ClientContext;
import com.amazonaws.services.lambda.runtime.CognitoIdentity;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
public class TestContext implements Context{
private String awsRequestId = "EXAMPLE";
private ClientContext clientContext;
private String functionName = "EXAMPLE";
private CognitoIdentity identity;
private String logGroupName = "EXAMPLE";
private String logStreamName = "EXAMPLE";
private LambdaLogger logger = new TestLogger();
private int memoryLimitInMB = 128;
private int remainingTimeInMillis = 15000;
public String getAwsRequestId() {
return awsRequestId;
}
public void setAwsRequestId(String value) {
awsRequestId = value;
}
public ClientContext getClientContext() {
return clientContext;
}
public void setClientContext(ClientContext value) {
clientContext = value;
}
public String getFunctionName() {
return functionName;
}
public void setFunctionName(String value) {
functionName = value;
}
public CognitoIdentity getIdentity() {
return identity;
}
public void setIdentity(CognitoIdentity value) {
identity = value;
}
public String getLogGroupName() {
return logGroupName;
}
public void setLogGroupName(String value) {
logGroupName = value;
}
public String getLogStreamName() {
return logStreamName;
}
public void setLogStreamName(String value) {
logStreamName = value;
}
public LambdaLogger getLogger() {
return logger;
}
public void setLogger(LambdaLogger value) {
logger = value;
}
public int getMemoryLimitInMB() {
return memoryLimitInMB;
}
public void setMemoryLimitInMB(int value) {
memoryLimitInMB = value;
}
public int getRemainingTimeInMillis() {
return remainingTimeInMillis;
}
public void setRemainingTimeInMillis(int value) {
remainingTimeInMillis = value;
}
@Override
public String getFunctionVersion() {
return "$LATEST";
}
@Override
public String getInvokedFunctionArn() {
// Placeholder ARN, used for local tests only
return "arn:aws:lambda:us-east-2:12345678:function:AwsKinesisLambda";
}
}
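Now let’s develop an AwsKinesisLambdaTest class to invoke the Lambda. In the code below, the testLambdaHandleRequest() method mocks EmployeeEventProcessor with Mockito, while the testLambdaLocaltest() method invokes the actual logic of the Lambda, as it does not use any mock objects.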
package com.sureshtech.aws;
import static java.nio.ByteBuffer.wrap;
import static java.util.Collections.singletonList;
import static net.andreinc.mockneat.unit.objects.Reflect.reflect;
import static net.andreinc.mockneat.unit.types.Ints.ints;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.andreinc.mockneat.MockNeat;
import net.andreinc.mockneat.unit.objects.Reflect;
@ExtendWith(MockitoExtension.class)
public class AwsKinesisLambdaTest {
private ObjectMapper objectMapper = new ObjectMapper();
private AwsKinesisLambda lambda;
private EmployeeEventProcessor employeeEventProcessor;
@Test
public void testLambdaHandleRequest() throws Exception {
lambda = new AwsKinesisLambda();
Context testContext = new TestContext();
employeeEventProcessor = mock(EmployeeEventProcessor.class);
lambda.setEmpEventProcessor(employeeEventProcessor);
Mockito.when(employeeEventProcessor.processEmployeeEvents(ArgumentMatchers.anyList())).thenReturn(5);
lambda.handleRequest(createKinesisEvent(false), testContext);
Mockito.verify(employeeEventProcessor, Mockito.times(1)).processEmployeeEvents(ArgumentMatchers.anyList());
}
@Test
public void testLambdaLocaltest() throws Exception {
lambda = new AwsKinesisLambda();
Context testContext = new TestContext();
employeeEventProcessor = new EmployeeEventProcessor();
lambda.setEmpEventProcessor(employeeEventProcessor);
lambda.handleRequest(createKinesisEvent(false), testContext);
}
}
We have the context object; we still need a KinesisEvent object to invoke the handleRequest(KinesisEvent event, final Context context) method.
The KinesisEvent object carries the serialized EmployeeEvent objects, which contain the event data. The EmployeeEvent objects can be created in two ways.
Using MockNeat
We can create EmployeeEvent objects with the new keyword, but creating multiple objects that way and setting data on each one is a tedious process.
mockneat is an arbitrary data-generator open-source library written in Java.
It provides a simple but powerful (fluent) API that enables developers to create JSON, XML, CSV and SQL data programmatically. It can also act as a powerful Random substitute or a mocking library.
The method below uses the mockneat library to create test data and set that data into a Kinesis record.
private KinesisEvent createKinesisEvent(boolean gzip) throws Exception {
MockNeat m = MockNeat.threadLocal();
Reflect<EmployeeEvent> empEventBuilder = reflect(EmployeeEvent.class)
.field("name", m.names() )
.field("address", m.strings().size(10).get())
.useDefaults(true);
List<EmployeeEvent> empEventList = empEventBuilder.list(4).get();
byte[] data = objectMapper.writeValueAsBytes(empEventList);
if(gzip) {
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
GZIPOutputStream gzipOut = new GZIPOutputStream(byteOut);
gzipOut.write(data);
gzipOut.close();
data = byteOut.toByteArray();
}
KinesisEvent.Record record = new KinesisEvent.Record();
record.setData(wrap(data));
KinesisEvent.KinesisEventRecord kinesisEventRecord = new KinesisEvent.KinesisEventRecord();
kinesisEventRecord.setEventID("shardId-1234567");
kinesisEventRecord.setKinesis(record);
KinesisEvent kinesisEvent = new KinesisEvent();
kinesisEvent.setRecords(singletonList(kinesisEventRecord));
return kinesisEvent;
}
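Note that createKinesisEvent(true) produces a GZIP-compressed payload, while the handleRequest method shown earlier passes the record bytes straight to Jackson, so a handler would need to decompress such data before parsing it. Below is a minimal sketch of that step using the JDK's java.util.zip classes. GzipSupport, isGzipped, gzip and gunzip are hypothetical names, and checking the two GZIP magic bytes is just one possible way to detect compression, not something the original project is known to do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: GZIP round trip for record payloads.
final class GzipSupport {

    private GzipSupport() {
    }

    // A GZIP stream starts with the magic bytes 0x1F 0x8B.
    static boolean isGzipped(byte[] data) {
        return data != null && data.length >= 2
                && (data[0] & 0xFF) == 0x1F
                && (data[1] & 0xFF) == 0x8B;
    }

    // Compresses the given bytes, mirroring the gzip branch of createKinesisEvent.
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decompresses GZIP data; input that is not GZIP is returned unchanged.
    static byte[] gunzip(byte[] data) {
        if (!isGzipped(data)) {
            return data;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gz.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

With such a helper, the handler could call GzipSupport.gunzip(data) on the record bytes before handing them to the ObjectMapper, which would let the same code path accept both plain and compressed test events.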
Using Json String
If you have access to event data (i.e. JSON data), you can convert that data into the corresponding event object.
The method below builds the payload from a JSON string and sets that data into a Kinesis record.
private KinesisEvent createKinesisEvent(boolean gzip) throws Exception {
KinesisEvent kinesisEvent = new KinesisEvent();
String str = "["+
"{"+
"\"id\": 1234,"+
"\"name\": \"Suresh\","+
"\"address\": \"Hyderabad\"," +
"\"salary\": 523.45" +
"},"+
"{"+
"\"id\": 6567,"+
"\"name\": \"Naresh\","+
"\"address\": \"Delhi\"," +
"\"salary\": 893.45" +
"},"+
"{"+
"\"id\": 9367,"+
"\"name\": \"Mahesh\","+
"\"address\": \"Hyderabad\"," +
"\"salary\": 456.90" +
"}"+
"]";
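// Needs java.nio.ByteBuffer and java.nio.charset.StandardCharsets imported in the test class.
// An explicit charset keeps the bytes independent of the platform default encoding.
final ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));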
KinesisEvent.Record record = new KinesisEvent.Record();
record.setData(buffer);
record.setKinesisSchemaVersion("1.0");
record.setPartitionKey("partition-1");
record.setSequenceNumber("1234");
KinesisEvent.KinesisEventRecord kinesisEventRecord = new KinesisEvent.KinesisEventRecord();
kinesisEventRecord.setEventID("shardId-1234567");
kinesisEventRecord.setEventSourceARN("arn:aws:kinesis:example");
kinesisEventRecord.setEventName("aws:kinesis:record");
kinesisEventRecord.setKinesis(record);
kinesisEvent.setRecords(singletonList(kinesisEventRecord));
return kinesisEvent;
}
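Building JSON by string concatenation gets hard to read as payloads grow. One alternative is to keep the sample payload in a file and read it into the ByteBuffer that KinesisEvent.Record.setData expects. Below is a minimal sketch using JDK classes only. JsonFixtures, fromString and fromFile are hypothetical names, and the file location is up to you.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical test helper: turns JSON text or a JSON file into a record payload.
final class JsonFixtures {

    private JsonFixtures() {
    }

    // Encodes explicitly as UTF-8 so the bytes do not depend on the platform default charset.
    static ByteBuffer fromString(String json) {
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    // Reads the file as raw bytes; the file is assumed to be stored as UTF-8 JSON.
    static ByteBuffer fromFile(Path path) {
        try {
            return ByteBuffer.wrap(Files.readAllBytes(path));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A test could then call record.setData(JsonFixtures.fromFile(somePath)), where somePath points at a sample file kept with the test sources. This keeps the JSON readable and easy to replace with real event samples.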
You can download the source code for this blog post from GitHub