Integration testing AWS Lambda with LocalStack and Testcontainers

In my previous blog post, I covered how to mock AWS services with LocalStack and test AWS Lambda from the terminal. In this post, I will cover how we can do integration testing of AWS Lambda using LocalStack and Testcontainers with Java.

In this post, I will demonstrate how an event published to a Kinesis stream is processed by an AWS Lambda function, which stores the event details in S3 buckets.

Create a Maven project

mvn -B archetype:generate -DarchetypeGroupId=software.amazon.awssdk -DarchetypeArtifactId=archetype-lambda -Dregion=US_EAST_1 -Dservice=s3 -DgroupId=com.fullstackdev.aws -DartifactId=localstack-kinesis-s3-demo

Let’s add the required dependencies to the pom.xml file. I have added the Kinesis, Testcontainers, and SLF4J related jars.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.fullstackdev.aws</groupId>
	<artifactId>localstack-kinesis-s3-demo</artifactId>
	<version>1.0-SNAPSHOT</version>
	<packaging>jar</packaging>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<maven.shade.plugin.version>3.2.1</maven.shade.plugin.version>
		<maven.compiler.plugin.version>3.6.1</maven.compiler.plugin.version>
		<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
		<aws.java.sdk.version>2.15.79</aws.java.sdk.version>
		<aws.lambda.java.version>1.2.0</aws.lambda.java.version>
		<junit5.version>5.4.2</junit5.version>
		<slf4jVersion>1.7.30</slf4jVersion>
		<testcontainers.version>1.15.3</testcontainers.version>
	</properties>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>software.amazon.awssdk</groupId>
				<artifactId>bom</artifactId>
				<version>${aws.java.sdk.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<dependency>
			<groupId>com.amazonaws</groupId>
			<artifactId>aws-lambda-java-events</artifactId>
			<version>3.9.0</version>
		</dependency>

		<dependency>
			<groupId>software.amazon.awssdk</groupId>
			<artifactId>kinesis</artifactId>
			<exclusions>
				<exclusion>
					<groupId>software.amazon.awssdk</groupId>
					<artifactId>netty-nio-client</artifactId>
				</exclusion>
				<exclusion>
					<groupId>software.amazon.awssdk</groupId>
					<artifactId>apache-client</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		
		<dependency>
			<groupId>software.amazon.awssdk</groupId>
			<artifactId>s3</artifactId>
			<exclusions>
				<exclusion>
					<groupId>software.amazon.awssdk</groupId>
					<artifactId>netty-nio-client</artifactId>
				</exclusion>
				<exclusion>
					<groupId>software.amazon.awssdk</groupId>
					<artifactId>apache-client</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>software.amazon.awssdk</groupId>
			<artifactId>url-connection-client</artifactId>
		</dependency>

		<dependency>
			<groupId>com.amazonaws</groupId>
			<artifactId>aws-lambda-java-core</artifactId>
			<version>${aws.lambda.java.version}</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>${slf4jVersion}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>${slf4jVersion}</version>
		</dependency>
		
		<!-- Test Dependencies -->

		<dependency>
			<groupId>com.amazonaws</groupId>
			<artifactId>aws-java-sdk-s3</artifactId>
			<version>1.11.914</version>
			<scope>test</scope>
		</dependency>

		
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter</artifactId>
			<version>${junit5.version}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>junit-jupiter</artifactId>
			<version>${testcontainers.version}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>localstack</artifactId>
			<version>${testcontainers.version}</version>
			<scope>test</scope>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>${maven.compiler.plugin.version}</version>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>${maven.shade.plugin.version}</version>
				<configuration>
					<createDependencyReducedPom>false</createDependencyReducedPom>
					<finalName>localstack-kinesis-s3-demo</finalName>
					<filters>
						<filter>
							<artifact>*:*</artifact>
							<excludes>
								<!-- Suppress module-info.class warning -->
								<exclude>module-info.class</exclude>
							</excludes>
						</filter>
					</filters>
				</configuration>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Note

Even though we created the project with AWS Java SDK v2, we have to include the S3 jar dependency from Java SDK v1 for testing purposes.
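This is because the Testcontainers LocalStack module at this version exposes client-building helpers that are typed against SDK v1 classes. Below is a minimal sketch of building a v1 S3 client in a test with those helpers; the TestClients class and method name are mine, for illustration.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;

class TestClients {

	// These LocalStackContainer helpers return SDK v1 types, which is why
	// aws-java-sdk-s3 must be on the test classpath.
	static AmazonS3 v1S3Client(LocalStackContainer localStack) {
		return AmazonS3ClientBuilder.standard()
				.withEndpointConfiguration(localStack.getEndpointConfiguration(Service.S3))
				.withCredentials(localStack.getDefaultCredentialsProvider())
				.build();
	}
}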

Here is the AWS Lambda code that processes events from the Kinesis stream.

package com.fullstackdev.aws;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import software.amazon.awssdk.services.s3.S3Client;

/**
 * Lambda function entry point. You can change to use other pojo type or
 * implement a different RequestHandler.
 *
 * @see <a
 *      href="https://docs.aws.amazon.com/lambda/latest/dg/java-handler.html">Lambda
 *      Java Handler</a> for more information
 */

public class AwsKinesisLambda {

	private static final Logger logger = LoggerFactory.getLogger(AwsKinesisLambda.class);

	private ObjectMapper objectMapper = new ObjectMapper();

	private EmployeeEventProcessor empEventProcessor;

	private final S3Client s3Client;

	public void setEmpEventProcessor(EmployeeEventProcessor empEventProcessor) {
		this.empEventProcessor = empEventProcessor;
	}

	public AwsKinesisLambda() {
		this.empEventProcessor = new EmployeeEventProcessor();
		objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
		s3Client = DependencyFactory.s3Client();
	}

//    @Override
	public void handleRequest(KinesisEvent event, final Context context) {

		List<KinesisEvent.KinesisEventRecord> records = Optional.ofNullable(event.getRecords()).orElse(emptyList());

		List<EmployeeEvent> empEvents = records.stream().flatMap(this::convertKinesisRecordToEmployeeRecord)
				.collect(toList());

		int i = empEventProcessor.processEmployeeEvents(empEvents, s3Client);

		logger.info("No. of records processed :" + i);
	}

	private Stream<EmployeeEvent> convertKinesisRecordToEmployeeRecord(KinesisEvent.KinesisEventRecord record) {
		try {
			byte[] data = record.getKinesis().getData().array();

			return Stream.of(objectMapper.readValue(data, EmployeeEvent[].class));

		} catch (Exception e) {
			throw new RuntimeException(e);
		}

	}

}

The EmployeeEventProcessor class processes the event data and stores it in S3 buckets: for each employee event, it creates a bucket (if one does not already exist) and writes the event as a CSV object.

package com.fullstackdev.aws;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.waiters.S3Waiter;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;

public class EmployeeEventProcessor {

	private static final Logger logger = LoggerFactory.getLogger(EmployeeEventProcessor.class);

	public int processEmployeeEvents(List<EmployeeEvent> empEvents, S3Client s3Client) {

		for (EmployeeEvent employeeEvent : empEvents) {

			logger.info("Emp event -> Id :  " + employeeEvent.getId() + " , Name :" + employeeEvent.getName()
					+ " , Address : " + employeeEvent.getAddress() + " , Salary :" + employeeEvent.getSalary());
			

			String bucket_name = employeeEvent.getId() + "-" + employeeEvent.getName().toLowerCase();

			if (s3Client.listBuckets().buckets().stream().anyMatch(bucket -> bucket.name().equals(bucket_name))) {

				logger.info("\nCannot create the bucket. \n" + "A bucket named " + bucket_name + " already exists.");

			} else {

				try {

					createBucket(s3Client, bucket_name, Region.US_EAST_1);

					createS3Object(s3Client, employeeEvent, bucket_name);

				} catch (S3Exception e) {
					logger.error("Failed to store event for bucket " + bucket_name, e);
				}
			}
		}
		return empEvents.size();

	}

	private void createS3Object(S3Client s3Client, EmployeeEvent employeeEvent, String bucket_name) {
		
		PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(bucket_name)
				.key(bucket_name + ".csv").build();
		
		String csv = employeeEvent.getId() + "," + employeeEvent.getName() + ","
				+ employeeEvent.getAddress() + "," + employeeEvent.getSalary();

		s3Client.putObject(putObjectRequest, RequestBody.fromString(csv));
		
		logger.info("S3 object created successfully for ", employeeEvent.getName());
	}

	private void createBucket(S3Client s3Client, String bucketName, Region region) throws S3Exception {

		logger.info("\nCreating a new bucket named '%s'...\n\n", bucketName);

		S3Waiter s3Waiter = s3Client.waiter();

		CreateBucketRequest bucketRequest = CreateBucketRequest.builder().bucket(bucketName).build();

		s3Client.createBucket(bucketRequest);

		HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder().bucket(bucketName).build();

		// Wait until the bucket is created and print out the response
		WaiterResponse<HeadBucketResponse> waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait);

		waiterResponse.matched().response().ifPresent(System.out::println);

		logger.info(bucketName + " is ready");

	}

}
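
The EmployeeEvent class is a plain POJO that Jackson deserializes the Kinesis payload into. Below is a minimal sketch inferred from its usage above; the field types are assumptions.

package com.fullstackdev.aws;

public class EmployeeEvent {

	private int id;
	private String name;
	private String address;
	private int salary;

	public int getId() { return id; }
	public void setId(int id) { this.id = id; }
	public String getName() { return name; }
	public void setName(String name) { this.name = name; }
	public String getAddress() { return address; }
	public void setAddress(String address) { this.address = address; }
	public int getSalary() { return salary; }
	public void setSalary(int salary) { this.salary = salary; }
}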
The DependencyFactory class creates the S3Client instance used by the Lambda function.
package com.fullstackdev.aws;

import java.net.URI;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

/**
 * The module containing all dependencies required by {@link AwsKinesisLambda}.
 */
public class DependencyFactory {

	private static final Logger logger = LoggerFactory.getLogger(DependencyFactory.class);	

	private DependencyFactory() {
	}

	/**
	 * @return an instance of S3Client. When the code runs inside LocalStack's
	 *         Lambda container, the LOCALSTACK_HOSTNAME environment variable is
	 *         set and the client is pointed at the LocalStack edge port (4566);
	 *         otherwise a regular client against AWS is built.
	 */
	public static S3Client s3Client() {

		String localstackHost = System.getenv("LOCALSTACK_HOSTNAME");

		if (localstackHost != null) {

			URI endpointOverride = URI.create("http://" + localstackHost + ":4566");

			return S3Client.builder()
					.endpointOverride(endpointOverride)
					.credentialsProvider(EnvironmentVariableCredentialsProvider.create())
					.region(Region.US_EAST_1)
					.httpClientBuilder(UrlConnectionHttpClient.builder())
					.build();
		}

		return S3Client.builder()
				.credentialsProvider(EnvironmentVariableCredentialsProvider.create())
				.region(Region.US_EAST_1)
				.httpClientBuilder(UrlConnectionHttpClient.builder())
				.build();
	}

}

In the DependencyFactory class, we override the endpoint so that in the test environment the client points to LocalStack for S3 operations. LocalStack sets the LOCALSTACK_HOSTNAME environment variable inside the Lambda container, which is how the code detects that it is running against LocalStack.

Before looking at the integration test class, we need to understand how the Lambdas are executed in LocalStack.

Just like AWS, LocalStack spawns a separate container for Lambda execution. For Java-based Lambdas, LocalStack uses the lambci/lambda (java8) image. Since the Lambda runs in a separate container, if it needs to interact with other AWS services inside LocalStack, it must be able to reach the LocalStack container over the network. The LAMBDA_DOCKER_NETWORK environment variable is used to pass the name of the Docker network that connects the two containers.
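
A minimal sketch of just that wiring, using Testcontainers' shared network; the full test class below does the same (the NetworkWiring class is mine, for illustration):

import org.testcontainers.containers.Network;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;

class NetworkWiring {

	// LocalStack and the Lambda container it spawns must share a Docker
	// network; the network name is passed via LAMBDA_DOCKER_NETWORK.
	static LocalStackContainer localStackOnSharedNetwork() {
		Network shared = Network.SHARED;
		String networkName = ((Network.NetworkImpl) shared).getName();

		return new LocalStackContainer(DockerImageName.parse("localstack/localstack"))
				.withNetwork(shared)
				.withEnv("LAMBDA_DOCKER_NETWORK", networkName);
	}
}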

[Figure: LocalStack container]

Here is the full integration test class; I will walk through the important pieces after the code.
package com.fullstackdev.aws;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.Network.NetworkImpl;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

@Testcontainers
public class AwsKinesisLambdaIntegrationTest {
	
	private static final Logger logger =  LoggerFactory.getLogger(AwsKinesisLambdaIntegrationTest.class);
	
	static Network shared = Network.SHARED;
	
	public static String networkName = ((NetworkImpl)shared).getName();
	
	static ObjectMapper objectMapper = new ObjectMapper();
	
	@Container
	static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse("localstack/localstack"))
			.withServices(Service.S3, Service.LAMBDA, Service.KINESIS, Service.CLOUDWATCHLOGS,
					LocalStackContainer.EnabledService.named("events"))
			.withEnv("DEFAULT_REGION", "us-east-1")
			.withNetwork(shared)
			.withEnv("LAMBDA_DOCKER_NETWORK", networkName)
			.withCopyFileToContainer(
					MountableFile.forHostPath(new File("<jar file path>/localstack-kinesis-s3-demo.jar").getPath()),
					"/tmp/localstack/localstack-kinesis-s3-demo.jar");
	
	
	@BeforeAll
	static void beforeAll() throws IOException, InterruptedException {

		// Register the Lambda function, pointing at the jar copied into the container
		ExecResult lambdaCreation = localStack.execInContainer(
				"awslocal", "lambda", "create-function",
				"--function-name", "localstack-kinesis-s3",
				"--runtime", "java8",
				"--region", "us-east-1",
				"--handler", "com.fullstackdev.aws.AwsKinesisLambda::handleRequest",
				"--role", "arn:aws:iam::123456:role/test",
				"--zip-file", "fileb:///tmp/localstack/localstack-kinesis-s3-demo.jar",
				"--environment", "Variables={AWS_ACCESS_KEY_ID=" + localStack.getAccessKey()
						+ ",AWS_SECRET_ACCESS_KEY=" + localStack.getSecretKey() + "}");

		logger.info("Lambda creation result: {}", lambdaCreation.getStdout());
		logger.info("Lambda creation error: {}", lambdaCreation.getStderr());
	}
	
	
	
	@Test
	public void testContainer() throws UnsupportedOperationException, IOException, InterruptedException, URISyntaxException {
		logger.info(localStack.getContainerName());

		
		ExecResult createStream = localStack.execInContainer(
				"awslocal", "kinesis", "create-stream",
				"--stream-name", "lambda-stream",
				"--shard-count", "3");

		logger.info("Create stream result: {}", createStream.getStdout());
		logger.info("Create stream error: {}", createStream.getStderr());
		
		
		ExecResult sourceMapping = localStack.execInContainer(
				"awslocal", "lambda", "create-event-source-mapping",
				"--function-name", "localstack-kinesis-s3",
				"--batch-size", "100",
				"--starting-position", "AT_TIMESTAMP",
				"--starting-position-timestamp", "1541139109",
				"--event-source-arn", "arn:aws:kinesis:us-east-1:000000000000:stream/lambda-stream");

		logger.info("Source mapping result: {}", sourceMapping.getStdout());

		logger.info("Source mapping error: {}", sourceMapping.getStderr());
		
		ExecResult event = localStack.execInContainer(
				"awslocal", "kinesis", "put-record",
				"--stream-name", "lambda-stream",
				"--partition-key", "000",
				"--data", "[{\"id\" : 1, \"name\" : \"Suresh\" , \"address\" : \"Hyderabad\" , \"salary\": 20} , {\"id\" : 2, \"name\" : \"Alex\" , \"address\" : \"Auckland\" , \"salary\": 40}]");

		logger.info("Event result: {}", event.getStdout());
		logger.info("Event error: {}", event.getStderr());
		
		
		ExecResult listFunctions = localStack.execInContainer(
				"awslocal", "lambda", "list-functions");

		logger.info("List functions result: {}", listFunctions.getStdout());

		
		ExecResult logGroups = localStack.execInContainer(
				"awslocal", "logs", "describe-log-groups");

		logger.info("Log groups: {}", logGroups.getStdout());
		logger.info("Log groups error: {}", logGroups.getStderr());
		
		
		ExecResult logStreams = localStack.execInContainer(
				"awslocal", "logs", "describe-log-streams",
				"--region", "us-east-1",
				"--log-group-name", "/aws/lambda/localstack-kinesis-s3");

		logger.info("Log streams: {}", logStreams.getStdout());

		logger.info("Log streams error: {}", logStreams.getStderr());
		
		JsonNode jsonNode = objectMapper.readTree(logStreams.getStdout()).get("logStreams");

		if (jsonNode.isArray()) {

			for (final JsonNode objNode : jsonNode) {

				String logStreamName = objNode.get("logStreamName").asText();
				logger.info(logStreamName);

				// Fetch the Lambda execution logs from each log stream
				ExecResult logs = localStack.execInContainer(
						"awslocal", "logs", "get-log-events",
						"--region", "us-east-1",
						"--log-group-name", "/aws/lambda/localstack-kinesis-s3",
						"--log-stream-name", logStreamName);

				logger.info(logs.getStdout());
			}
		}

		ExecResult listS3buckets = localStack.execInContainer(
				"awslocal", "s3api", "list-buckets");

		logger.info("Buckets: {}", listS3buckets.getStdout());
		logger.info("Buckets error: {}", listS3buckets.getStderr());
		
		List<String> expectedBucketNames = new ArrayList<>();
		expectedBucketNames.add("1-suresh");
		expectedBucketNames.add("2-alex");
		
		List<String> resultBucketNames = new ArrayList<>();
		JsonNode jsonNode2 = objectMapper.readTree(listS3buckets.getStdout()).get("Buckets");

		if (jsonNode2.isArray()) {

			for (final JsonNode objNode : jsonNode2) {

				String bucketName = objNode.get("Name").asText();
				logger.info("Bucket name: {}", bucketName);
				resultBucketNames.add(bucketName);

				ExecResult bucketContents = localStack.execInContainer(
						"awslocal", "s3api", "list-objects", "--bucket", bucketName);

				logger.info(bucketContents.getStdout());
				logger.info(bucketContents.getStderr());
			}
		}
		
		
		assertEquals(expectedBucketNames, resultBucketNames);
	}

}

Now let’s walk through the integration test class.

Testcontainers provides a pre-defined container class for LocalStack, which can be used to start the container.

static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse("localstack/localstack"))

withServices – pass the AWS services that should be started with the LocalStack container.

For our example, I am starting the S3, LAMBDA, KINESIS, and CLOUDWATCHLOGS services, plus the events service, as in the sketch below.
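A minimal sketch of such a declaration; the events service is enabled by name, as in the test above, presumably because this Testcontainers version lacks a Service constant for it:

LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse("localstack/localstack"))
		.withServices(Service.S3, Service.LAMBDA, Service.KINESIS, Service.CLOUDWATCHLOGS,
				LocalStackContainer.EnabledService.named("events"));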

.withNetwork – pass the network to which the container should be attached.

.withEnv – is used to pass environment variables to the LocalStack container.

Note: you can call withEnv multiple times to pass multiple environment variables, or pass a Map of variables in a single withEnv call, as sketched below.
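A minimal sketch of both styles, reusing the localStack container and networkName field from the test class above (Map and HashMap are from java.util):

// one variable per call...
localStack.withEnv("DEFAULT_REGION", "us-east-1")
		.withEnv("LAMBDA_DOCKER_NETWORK", networkName);

// ...or several at once with a Map
Map<String, String> env = new HashMap<>();
env.put("DEFAULT_REGION", "us-east-1");
env.put("LAMBDA_DOCKER_NETWORK", networkName);
localStack.withEnv(env);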

LAMBDA_DOCKER_NETWORK – env variable used to pass the name of the Docker network for the container running the Lambda function.

Before creating the Lambda function, we need to copy the corresponding jar to the container.

.withCopyFileToContainer( ) – method used to copy the jar file into the container, for example:
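A minimal sketch, assuming the shaded jar is in Maven's default target directory; adjust the host path to wherever your build places it:

localStack.withCopyFileToContainer(
		MountableFile.forHostPath("target/localstack-kinesis-s3-demo.jar"),
		"/tmp/localstack/localstack-kinesis-s3-demo.jar");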

.execInContainer( ) – method used to execute a command inside the LocalStack container, for example:
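A minimal sketch of running an awslocal command and inspecting its output (execInContainer declares IOException and InterruptedException, which the test method throws):

ExecResult result = localStack.execInContainer("awslocal", "s3api", "list-buckets");
logger.info("stdout: {}", result.getStdout());
logger.info("stderr: {}", result.getStderr());
logger.info("exit code: {}", result.getExitCode());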

You can download the code for this blog post from GitHub.
