Developing GraphQL API with Netflix DGS Framework(Part-III)

This is the third post of the series. In this blog post I will explain how to develop GraphQL subscriptions API in using Netflix DGS framework.

In addition to queries and mutations, GraphQL supports a third operation type: subscriptions.

GraphQL Subscriptions

Like queries, subscriptions enable us to fetch data. Unlike queries, subscriptions are long-lasting operations that can change their result over time. They can maintain an active connection to your GraphQL server (most commonly via WebSocket). GraphQL Subscriptions enable a client to receive updates for a query from the server over time.

The Netflix DGS framework supports subscriptions out of the box.The GraphQL specification doesn’t specify a transport protocol. WebSockets are the most popular transport protocol however, and are supported by the Netflix DGS Framework

Developing GraphQL subscriptions operations

I will demonstrate the subscription operation with notifying the subscribers/clients when ever a new employee is created

Adding subscription operations to schema

type Subscription {
   notifyEmployeeCreation : Employee
}Code language: Java (java)
https://gist.github.com/sureshgadupu/06f326cafe63cc064f31a7100213151a

Implementing subscription methods

Adding maven dependencies

As Netflix DGS supports Subscription operation over WebSocket protocol, we need to add the corresponding dependency in our project.

<dependency>
	<groupId>com.netflix.graphql.dgs</groupId>
	<artifactId>graphql-dgs-subscriptions-websockets-autoconfigure</artifactId>
	<scope>runtime</scope>
</dependency>Code language: Java (java)
https://gist.github.com/sureshgadupu/f993efce5f25be4c135cac1f6a687b5a

Implementing Subscription operation

Lets implement the notifyEmployeeCreation method in EmployeeDataFetcher class.

In the Netflix DGS framework a Subscription is implemented as a data fetcher with the @DgsSubscription annotation.We have to use the reactive Java libraries to implement the Subscription methods in Java. The difference with a normal query is that a subscription must return a org.reactivestreams.Publisher.

When ever we create a new employee object (i.e. a client calling createEmployee mutation), we are publishing that object through reactive stream so that ,client which are calling subscription methods can receive the data.

@DgsComponent
public class EmployeeDataFetcher {

	private FluxSink<Employee> employeeStream;
	private ConnectableFlux<Employee> employeePublisher;


	@PostConstruct
	public void init() {		

		Flux<Employee> publisher = Flux.create(emitter -> {
			employeeStream = emitter;
		});

		employeePublisher = publisher.publish();
		employeePublisher.connect();
	}
	
	@DgsMutation
	public Employee createEmployee(@InputArgument SubmittedEmployee employee) {

		.....
		...
		
		employeeStream.next(emp);
		return emp;

	}

	@DgsSubscription
	public Publisher<Employee> notifyEmployeeCreation() {

		return employeePublisher;
	}

}Code language: Java (java)
https://gist.github.com/sureshgadupu/ad8c49904a5f8611964c68c9c1a2b119

Testing subscriptions

Since subscription uses WebSocket protocol we can not use the GraphiQL client for testing. So we will use other tools for testing subscriptions, and we have to use ws://<host>:<port>/subscriptions url to Subscribe.

Using Hasura graphqurl

graphqurl is a curl like CLI for GraphQL. This library can be used as CLI and node module.

First install the library (as CLI) using npm command

npm install -g graphqurl

Next, run the following command in terminal/cmd for Subscription

gq ws://localhost:8080/subscriptions -q "subscription { notifyEmployeeCreation { id first_name last_name } }"

The command will wait for the response like below

Now use the GraphiQL client (http://localhost:8080/graphiql) to run the mutation operation to create a new employee, and let’s see whether subscriber receives information about new employee.

In below image you can see that subscriber received the new employee information, Once I created a new employee with mutation operation

Using Altair GraphQL client (Browser extension)

Altair GraphQL client is available for Chrome , Edge and Firefox browser as plugin. Install the extension from webstore.

Altair GraphQL client also available as an app for Window, Linux and Mac.

Now open the extension in your favourite browser.

First set the graphql api url (i.e http://localhost:8080/graphql)

Next set the subscription url by clicking on the subscription icon available on the left side.

Now enter the subscription url and select WebSocket as subscription type

Now write the Subscription query in query window and run it by clicking on the Run button. On the right side, you can observe that client subscribed successfully and waiting for the events from Server.

Now let’s run the mutation query to add a new Employee and see whether the subscription client receives details.

I am using the grpahiql client run the createEmployee mutation query

Note

Altair GraphQL client can run Queries,Muations and Subscriptions. To show the results of Muation and Subscription side by side , I have used the GraphiQL client.

Unit Testing GraphQL Subscriptions

Similar to a “normal” GraphQL test, we use the DgsQueryExecutor to execute a query. Just like a normal query, this results in a ExecutionResult. Instead of returning a result directly in the getData() method, a subscription query returns a Publisher. From Publisher we can extract the response and convert it to Employee object and assert the results. Each onNext of the Publisher is another ExecutionResult. This ExecutionResult contains the actual data.

Then we run the mutation query to see that we receive the notification of created employee.

@Test
	public void test_notifyEmployeeCreation() throws JsonMappingException, JsonProcessingException {

		ExecutionResult notifyEmployeeCreationSubscription = dgsQueryExecutor.execute(
				"subscription  { notifyEmployeeCreation { id first_name last_name gender hire_date birth_date} }");

		Publisher<ExecutionResult> publisher = notifyEmployeeCreationSubscription.getData();

		publisher.subscribe(new Subscriber<ExecutionResult>() {
			@Override
			public void onSubscribe(Subscription s) {
				s.request(1);
				;
			}

			@Override
			public void onNext(ExecutionResult executionResult) {

				assertThat(executionResult.getErrors()).isEmpty();

				Map<String, Object> empData = executionResult.getData();

				try {

					ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
					Employee emp = objectMapper.convertValue(empData.get("notifyEmployeeCreation"), Employee.class);

					assertThat(emp.getFirst_name()).isEqualTo("Suresh");
					assertThat(emp.getLast_name()).isEqualTo("Gadupu");
					assertThat(emp.getGender()).isEqualTo(Gender.M);
				} catch (IllegalArgumentException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}

			}

			@Override
			public void onError(Throwable t) {
				System.out.println(t);
				assertThat(t).isNull();
			}

			@Override
			public void onComplete() {
			}
		});

		ExecutionResult createEmployeeResult = dgsQueryExecutor.execute(
				"mutation { createEmployee (employee : {first_name :\"Suresh\" , last_name : \"Gadupu\" , deptId : 1 ,gender : M , hire_date : \"2014-12-02\" , birth_date:\"1980-12-11\" }) {id first_name last_name gender hire_date birth_date } }");

	}Code language: Java (java)
https://gist.github.com/sureshgadupu/127ced9f1c046da1c35cdbe888b7f1b4

You can download source code for this blog post from GitHub

Other posts in this series

Similar Posts