Testing AWS Lambda with Kinesis stream in local IDE

Recently worked on a projected where I migrated event publishing from RabbitMQ to Kinesis stream.

In this post I will discuss how we can create AWS Lambda project and test the Lambda logic in local IDE without deploying to the project in AWS cloud. In this post I will focus on developing and testing lambda logic in local IDE. For step-by-step process to set up the lambda and to consume Kinesis stream in AWS cloud, please visit the AWS developer page

Required software

  • Java 8+
  • Junit 5
  • Maven 3.6+
  • Eclipse

Create a Maven project

Let’s create Lambda project using Maven.To create a Maven project from the command line, open a terminal or command prompt window, enter or paste the following command, and then press Enter or Return.

mvn -B archetype:generate  
-DarchetypeGroupId=software.amazon.awssdk   -DarchetypeArtifactId=archetype-lambda  -Dregion=US_WEST_2 -Dservice=s3 -DgroupId=com.sureshtech.aws   -DartifactId=aws-kinesis-lambdaCode language: Python (python)

The above maven command creates skeleton project for AWS Lambda. Let’s import the created project into eclipse IDE using below steps.

Once we import project into Eclipse IDE , we make changes to source code as per our needs.

First we include service jars in the pom.xml file we are going to use. Since we are going to use Kinesis event steam as source of data I have included ‘aws-java-sdk-kinesis’ and ‘aws-lambda-java-events’ jars.

<dependencies>
   <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-lambda-java-events</artifactId>
      <version>2.2.5</version>
   </dependency>
   <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-kinesis</artifactId>
      <version>1.11.959</version>
   </dependency>
   <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-lambda-java-core</artifactId>
      <version>${aws.lambda.java.version}</version>
   </dependency>
   <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.11.1</version>
   </dependency>
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4jVersion}</version>
   </dependency>
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4jVersion}</version>
   </dependency>
   <!-- Test Dependencies -->
   <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <version>${junit5.version}</version>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>net.andreinc</groupId>
      <artifactId>mockneat</artifactId>
      <version>0.4.6</version>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>3.7.7</version>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-junit-jupiter</artifactId>
      <version>3.8.0</version>
      <scope>test</scope>
   </dependency>
</dependencies>Code language: Java (java)
https://gist.github.com/sureshgadupu/a06d88fed943a7670e6a13c18ce10d2f

.The structure of the projects looks like below.

Now we will rename the App.java class to AwsKinesisLambda.java and modify the handleRequest( ) method to take the KinesisEvent object.

package com.sureshtech.aws;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Lambda function entry point. You can change to use other pojo type or implement
 * a different RequestHandler.
 *
 * @see <a href=https://docs.aws.amazon.com/lambda/latest/dg/java-handler.html>Lambda Java Handler</a> for more information
 */

public class AwsKinesisLambda  {
	
	private static final Logger logger =  LoggerFactory.getLogger(AwsKinesisLambda.class);
  
	private ObjectMapper objectMapper = new ObjectMapper();
	
	private EmployeeEventProcessor empEventProcessor;
	

    public void setEmpEventProcessor(EmployeeEventProcessor empEventProcessor) {
		this.empEventProcessor = empEventProcessor;
	}

	public AwsKinesisLambda() {
    	
    }

//    @Override
    public void  handleRequest(KinesisEvent event, final Context context) {
    	
        List<KinesisEvent.KinesisEventRecord> records = Optional.ofNullable(event.getRecords()).orElse(emptyList());
        
       List<EmployeeEvent> empEvents =  records.stream().flatMap(this::convertKinesisRecordToEmployeeRecord).collect(toList());
      
       int i = empEventProcessor.processEmployeeEvents(empEvents);
       
       logger.info("No. of records processed :"+ i);
    }

	private Stream<EmployeeEvent> convertKinesisRecordToEmployeeRecord(KinesisEvent.KinesisEventRecord record){
		try {
			byte[] data = record.getKinesis().getData().array();	
			
			
			return 	Stream.of(objectMapper.readValue(data,EmployeeEvent[].class));	
			
		}catch(Exception e) {
			throw new RuntimeException(e);
		}
    	
    	
    	
    }
	
	
}Code language: Java (java)
https://gist.github.com/sureshgadupu/f9d110e9f765ee45b415b4106f07a87b

Testing AWS Lambda in local IDE

We have developed lambda which takes the Kinesis Stream as input . Now it is time for testing the logic in local IDE.

If you look at the handleRequest() method it takes 2 parameters “KinesisEvent” and “context”.

“context” is runtime object provided by AWS Lambda runtime. We simulate “Context” object by implementing the “com.amazonaws.services.lambda.runtime.Context” interface

package com.sureshtech.aws;

import com.amazonaws.services.lambda.runtime.ClientContext;
import com.amazonaws.services.lambda.runtime.CognitoIdentity;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;

public class TestContext implements Context{
	
	private String awsRequestId = "EXAMPLE";
    private ClientContext clientContext;
    private String functionName = "EXAMPLE";
    private CognitoIdentity identity;
    private String logGroupName = "EXAMPLE";
    private String logStreamName = "EXAMPLE";
    private LambdaLogger logger = new TestLogger();
    private int memoryLimitInMB = 128;
    private int remainingTimeInMillis = 15000;

    public String getAwsRequestId() {
        return awsRequestId;
    }

    public void setAwsRequestId(String value) {
        awsRequestId = value;
    }

    
    public ClientContext getClientContext() {
        return clientContext;
    }

    public void setClientContext(ClientContext value) {
        clientContext = value;
    }

   
    public String getFunctionName() {
        return functionName;
    }

    public void setFunctionName(String value) {
        functionName = value;
    }

    public CognitoIdentity getIdentity() {
        return identity;
    }

    public void setIdentity(CognitoIdentity value) {
        identity = value;
    }

    public String getLogGroupName() {
        return logGroupName;
    }

    public void setLogGroupName(String value) {
        logGroupName = value;
    }

    public String getLogStreamName() {
        return logStreamName;
    }

    public void setLogStreamName(String value) {
        logStreamName = value;
    }

    public LambdaLogger getLogger() {
        return logger;
    }

    public void setLogger(LambdaLogger value) {
        logger = value;
    }

    public int getMemoryLimitInMB() {
        return memoryLimitInMB;
    }

    public void setMemoryLimitInMB(int value) {
        memoryLimitInMB = value;
    }

    public int getRemainingTimeInMillis() {
        return remainingTimeInMillis;
    }

    public void setRemainingTimeInMillis(int value) {
        remainingTimeInMillis = value;
    }

	@Override
	public String getFunctionVersion() {
		// TODO Auto-generated method stub
		return new String("$LATEST");
	}

	@Override
	public String getInvokedFunctionArn() {
		// TODO Auto-generated method stub
		return new String("arn:aws:lambda:us-east-2:12345678:function:AwsKinesisLambda");
	}

}
Code language: Java (java)
https://gist.github.com/sureshgadupu/5f782f547f5a85f198740b1022e58ec6

Now let’s develop a AWSLambdaTest class to invoke the Lambda.In below code testLambdaLocaltest( ) method invokes actual logic of the lambda as we are not using the any mock objects.

package com.sureshtech.aws;

import static java.nio.ByteBuffer.wrap;
import static java.util.Collections.singletonList;
import static net.andreinc.mockneat.unit.objects.Reflect.reflect;
import static net.andreinc.mockneat.unit.types.Ints.ints;
import static org.mockito.Mockito.mock;

import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.zip.GZIPOutputStream;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.databind.ObjectMapper;

import net.andreinc.mockneat.MockNeat;
import net.andreinc.mockneat.unit.objects.Reflect;

@ExtendWith(MockitoExtension.class)
public class AwsKinesisLambdaTest {
	
	private ObjectMapper objectMapper = new ObjectMapper();
	

	private AwsKinesisLambda lambda;

	
	private EmployeeEventProcessor employeeEventProcessor;
	
	 

    @Test
    public void testLambdaHandleRequest() throws Exception {
    	
    	lambda = new AwsKinesisLambda();
    	
    	Context testContext = new TestContext();
    	
    	employeeEventProcessor = mock(EmployeeEventProcessor.class);
    	
    	lambda.setEmpEventProcessor(employeeEventProcessor);

    	Mockito.when(employeeEventProcessor.processEmployeeEvents(ArgumentMatchers.anyList())).thenReturn(5);
    	
        lambda.handleRequest(createKinesisEvent(false), testContext);

        Mockito.verify(employeeEventProcessor, Mockito.times(1)).processEmployeeEvents(ArgumentMatchers.anyList());
    }
    
    
    @Test
    public void testLambdaLocaltest() throws Exception {
    	
    	lambda = new AwsKinesisLambda();
    	
    	Context testContext = new TestContext();
    	
    	employeeEventProcessor = new EmployeeEventProcessor();
    	
    	lambda.setEmpEventProcessor(employeeEventProcessor);

    	    	
      lambda.handleRequest(createKinesisEvent(false), testContext);

    }
    
    
}Code language: Java (java)
https://gist.github.com/sureshgadupu/d7d11b469f003dc078e323abf991b371

We have context object , we need KinesisEvent object to invoke the handleRequest(KinesisEvent event, final Context context) method.

KinesisEvent object internally contains EmployeeEvent object, which contains event data. EmployeeEvent object can be created in 2 ways

Using MockNeat

We can create EmployeeEvent object with new keyword.Creating multiple objects with new keyword and setting data for each object is a tedious process.

mockneat is an arbitrary data-generator open-source library written in Java.

It provides a simple but powerful (fluent) API that enables developers to create json, xml, csv and sql data programatically. It can also act as a powerful Random substitute or a mocking library.

Below method shows usage of mockneat library to create test data and set that data into Kinesis reord.

private KinesisEvent createKinesisEvent(boolean gzip) throws Exception {
    	
    	MockNeat m = MockNeat.threadLocal();
    	Reflect<EmployeeEvent> empEventBuilder = reflect(EmployeeEvent.class)    											
    											 .field("name", m.names()  )
    											 .field("address", m.strings().size(10).get())
    											 .useDefaults(true);
    	
    	List<EmployeeEvent>  empEventList = empEventBuilder.list(4).get();
    	
    	byte[] data = objectMapper.writeValueAsBytes(empEventList);
    	
    	if(gzip) {
    		
    		ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
    		
    		GZIPOutputStream gzipOut = new GZIPOutputStream(byteOut);
    		gzipOut.write(data);
    		gzipOut.close();
    		
    		data = byteOut.toByteArray();
    	}
    	
    	KinesisEvent.Record record =  new KinesisEvent.Record();
    	record.setData(wrap(data));
    	
    	KinesisEvent.KinesisEventRecord kinesisEventRecord = new KinesisEvent.KinesisEventRecord();
    	kinesisEventRecord.setEventID("shardId-1234567");
    	kinesisEventRecord.setKinesis(record);
    	
    	KinesisEvent kinesisEvent = new KinesisEvent();
    	kinesisEvent.setRecords(singletonList(kinesisEventRecord));
    	
    	return kinesisEvent;
    }Code language: Java (java)
https://gist.github.com/sureshgadupu/0fd496f43da4b7c2831648a90f89e7ce

Using Json String

If you have access to event data (i.e json data) , you can convert that data into corresponding event object.

Below method shows usage of mockneat library to create test data and set that data into Kinesis reord.

private KinesisEvent createKinesisEvent(boolean gzip) throws Exception {
    	
    	KinesisEvent kinesisEvent = new KinesisEvent();
    	
    	String str = "["+
    	  "{"+
    	       "\"id\": 1234,"+
    	       "\"name\": \"Suresh\","+
    	       "\"address\": \"Hyderabad\"," +
    	       "\"salary\": 523.45" +
    	  "},"+
	  "{"+
	  	"\"id\": 6567,"+
	  	"\"name\": \"Naresh\","+
	  	"\"address\": \"Delhi\"," +
	  	"\"salary\": 893.45" +
	   "},"+
	   "{"+
		 "\"id\": 9367,"+
		 "\"name\": \"Mahesh\","+
		 "\"address\": \"Hyderabad\"," +
		 "\"salary\": 456.90" +
	    "}"+
    	  
    	"]";
    	
    	
    	
    	final ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
    	
    	KinesisEvent.Record record =  new KinesisEvent.Record();
    	record.setData(buffer);
    	record.setKinesisSchemaVersion("1.0");
    	record.setPartitionKey("partition-1");
    	record.setSequenceNumber("1234");
    	
    	
    	KinesisEvent.KinesisEventRecord kinesisEventRecord = new KinesisEvent.KinesisEventRecord();
    	kinesisEventRecord.setEventID("shardId-1234567");
    	kinesisEventRecord.setEventSourceARN("arn:aws:kinesis:example");
    	kinesisEventRecord.setEventName("aws:kinesis:record");
    	kinesisEventRecord.setKinesis(record);
    	
    	
    	kinesisEvent.setRecords(singletonList(kinesisEventRecord));
    	
    	return kinesisEvent;
    }Code language: Java (java)
https://gist.github.com/sureshgadupu/7e4d5db56be8e194bf7dacb8963f12a9

You can download the source code for this blog post from GitHub

Similar Posts