Create your Event-driven Application with Spring Cloud Stream

Soy Phea
3 min read · Aug 25, 2020

Introduction

An event-driven architecture helps an application achieve high performance, resilience, and responsiveness.

In a microservices architecture, the system is decoupled into small, independent services. This brings advantages such as independent deployment, small changes scoped to a specific feature, and autonomous teams.

Communication between the services

Services can communicate with each other using protocols such as:

  • HTTP RESTful
  • Asynchronous messaging like RabbitMQ, Kafka
  • gRPC
  • etc

An example event-driven workflow

Let’s take a use case with two services:

account-service is responsible for exposing the API that creates an account.

notification-service is responsible for sending notifications to the user.

When a user has created an account successfully, we would like to notify them via the mobile app or SMS. In this scenario, account-service sends an event, along with the account data, to notification-service. notification-service can then call a notification provider, such as an SMS operator or Firebase for mobile notifications, to reach the user.
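Before bringing in a broker, the flow can be sketched in plain Java. This is only an illustration: InMemoryChannel, AccountCreated and the sample values are hypothetical names invented for this sketch, and the in-memory channel stands in for RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: an in-memory channel stands in for the message broker.
class EventFlowSketch {

    // Runs the flow once and returns the notifications that were "sent".
    static List<String> run() {
        InMemoryChannel<AccountCreated> channel = new InMemoryChannel<>();
        List<String> sent = new ArrayList<>();

        // notification-service side: react to every account-created event.
        channel.subscribe(event ->
                sent.add("Welcome " + event.name + ", account " + event.accountNumber + " is ready"));

        // account-service side: publish an event after the account is created.
        channel.publish(new AccountCreated("001", "Dara"));
        return sent;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical event payload carrying the account data.
final class AccountCreated {
    final String accountNumber;
    final String name;

    AccountCreated(String accountNumber, String name) {
        this.accountNumber = accountNumber;
        this.name = name;
    }
}

// Hypothetical channel: delivers each published event to every subscriber.
class InMemoryChannel<E> {
    private final List<Consumer<E>> subscribers = new ArrayList<>();

    void subscribe(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}
```

The publisher never calls the subscriber directly; it only knows about the channel. That decoupling is what the broker provides between the two services.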

The diagram below shows the details:

Implementing the event-driven workflow with Spring Cloud Stream

Add the Maven dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

The account-service or sender side

Binding our application to RabbitMQ

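import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;

@SpringBootApplication
@EnableBinding(Processor.class)
public class AccountApplication {

    public static void main(String[] args) {
        SpringApplication.run(AccountApplication.class, args);
    }
}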

With @EnableBinding(Processor.class), our application is bound to RabbitMQ. Processor declares both an input and an output channel; account-service only uses the output channel.

Next, we configure the output channel:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: accounts_created

Sending the event to RabbitMQ

We create a service class, AccountBinder, that uses the Processor interface provided by Spring Cloud Stream.

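import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Must be registered as a Spring bean so that it can be injected into AccountService.
public class AccountBinder<T extends BaseDomain> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Processor processor;

    public AccountBinder(Processor processor) {
        this.processor = processor;
    }

    public void send(T data) {
        logger.info("Send message => {} to the channel", data);
        // Wrap the payload in a message and tag it with the operation that produced it.
        Message<T> message = MessageBuilder
                .withPayload(data)
                .setHeaderIfAbsent("operation", "created")
                .build();
        // Publish to the output channel, which is bound to the accounts_created destination.
        processor.output().send(message);
    }
}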
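To make the message structure concrete, here is a minimal plain-Java sketch of a payload plus headers, with a builder whose setHeaderIfAbsent only writes a header that is not already present. SimpleMessage and its Builder are hypothetical stand-ins written for this illustration; they are not Spring's Message or MessageBuilder, although they mimic the behavior used above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class MessageSketch {
    public static void main(String[] args) {
        SimpleMessage<String> message = SimpleMessage.withPayload("account-001")
                .setHeader("operation", "updated")
                .setHeaderIfAbsent("operation", "created") // ignored: header already set
                .build();
        System.out.println(message.payload + " " + message.headers);
    }
}

// Hypothetical immutable message: a payload plus a read-only header map.
final class SimpleMessage<T> {
    final T payload;
    final Map<String, Object> headers;

    private SimpleMessage(T payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }

    static <T> Builder<T> withPayload(T payload) {
        return new Builder<>(payload);
    }

    static final class Builder<T> {
        private final T payload;
        private final Map<String, Object> headers = new LinkedHashMap<>();

        private Builder(T payload) {
            this.payload = payload;
        }

        // Always overwrites the header.
        Builder<T> setHeader(String name, Object value) {
            headers.put(name, value);
            return this;
        }

        // Writes the header only when no value exists for that name yet.
        Builder<T> setHeaderIfAbsent(String name, Object value) {
            headers.putIfAbsent(name, value);
            return this;
        }

        SimpleMessage<T> build() {
            return new SimpleMessage<>(payload, headers);
        }
    }
}
```

Headers such as operation let a consumer decide how to handle an event without first inspecting the payload.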

Next, AccountService adds the new account and sends an event for every account created.

private List<Account> accounts = new ArrayList<>();

@Autowired
AccountBinder accountBinder;

public Account add(Account account) {
    // Reject the request if the account number is already registered.
    long count = accounts.stream()
            .filter(acc -> acc.getAccountNumber().equals(account.getAccountNumber()))
            .count();
    logger.info("Found [{}] with account number:{}", count, account.getAccountNumber());
    if (count > 0) {
        throw new AccountException("Account :" + account.getAccountNumber() + " already existed");
    }
    Account registerAccount = new Account(account.getName(), account.getAccountNumber());
    registerAccount.setCreateDate(LocalDate.now());
    accounts.add(registerAccount);
    // Publish the account-created event only after the account has been stored.
    accountBinder.send(registerAccount);
    return registerAccount;
}

The notification-service or receiver side

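import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class NotificationApplication {

    @Autowired
    private ObjectMapper objectMapper;

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class, args);
    }
}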

We use @EnableBinding(Sink.class) to bind our application to RabbitMQ with an input channel.

Next, we configure the input channel:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: accounts_created
          group: accounts_created_group

The group: accounts_created_group setting guarantees that each event is consumed by only one instance of notification-service, even when multiple instances are running at the same time.
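The consumer-group behavior can be illustrated with a small plain-Java simulation. It is a sketch under simplifying assumptions: the Destination class is hypothetical, and strict round-robin delivery within a group is chosen here only to keep the example deterministic; the real broker decides how messages are distributed among competing consumers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ConsumerGroupSketch {
    public static void main(String[] args) {
        Destination accountsCreated = new Destination();
        List<String> instanceA = new ArrayList<>();
        List<String> instanceB = new ArrayList<>();

        // Two notification-service instances join the same group.
        accountsCreated.subscribe("accounts_created_group", instanceA::add);
        accountsCreated.subscribe("accounts_created_group", instanceB::add);

        accountsCreated.publish("account-001");
        accountsCreated.publish("account-002");

        System.out.println("instance A: " + instanceA);
        System.out.println("instance B: " + instanceB);
    }
}

// Hypothetical destination: every group receives each message,
// but only one member of a group handles it.
class Destination {
    private final Map<String, List<Consumer<String>>> groups = new LinkedHashMap<>();
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    void subscribe(String group, Consumer<String> member) {
        groups.computeIfAbsent(group, name -> new ArrayList<>()).add(member);
    }

    void publish(String message) {
        for (Map.Entry<String, List<Consumer<String>>> group : groups.entrySet()) {
            List<Consumer<String>> members = group.getValue();
            // Count deliveries per group and pick the next member in turn.
            int index = delivered.merge(group.getKey(), 1, Integer::sum) - 1;
            members.get(index % members.size()).accept(message);
        }
    }
}
```

Each message ends up in exactly one of the two lists, which is the property that prevents a user from being notified twice when notification-service is scaled out.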

Receiving the event from account-service

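import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class AccountStreaming {

    private static final Logger logger = LoggerFactory.getLogger(AccountStreaming.class);

    // Invoked for every message that arrives on the input channel.
    @StreamListener(Sink.INPUT)
    public void receivedAccount(Account account) {
        logger.info("Received account :{} created.", account);
    }
}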

Source code: https://github.com/PheaSoy/spring-cloud-stream-example
