본문 바로가기

spring

spring boot 에 rabbitmq 적용하기

RabbitMQ 란


open source Message broker 로 알려져 있다


즉 mircro service에서 Producer-Consumer pattern을 이용할 시에 


producer 가 message 를 send 하면 consumer가 소비하는 구조이다. 


예를 들자면 어느 shop 에서 고객이 주문을 요청하면 producer 가 주문flow를 요청하고


microservice 에서 주문 요청을 처리하는 flow를 소비한다는 개념이다


이때 중간에서 rabbitMQ가


비동기로 이 주문요청의 event message를 queue 형식으로 받고 microservice에 전달하는 역할을 한다고 보면 된다.


RabbitMQ 기본개념

AMQP(Advanced Message Queuing Protocol) : 시스템 간 메시지를 교환하기 위해 공개 표준으로 정의한 프로토콜


Broker : 발행자가 만든 메시지를 저장


Virtual host : Broker 내의 가상 영역


Connection : 발생자와 소비자, Broker 사이의 물리적인 연결


Channel : 발행자와 소비자, Broker 사이의 논리적인 연결, 하나의 Connection 내에 다수의 Channel 설정 가능


Exchange : 발행한 모든 메시지가 처음 도달하는 지점으로 메시지가 목적지에 도달할 수 있도록 라우팅 규칙 적용, 라우팅 규칙에는 direct, topic, fanout


Queue : 메시지가 소비되기 전 대기하고 있는 최종 지점으로 Exchange 라우팅 규칙에 의해 단일 메시지가 복사되거나 다수의 큐에 도달할 수 있다


Binding : Exchange 와 Queue 간의 가상 연결

위에서 rabbitmq통신 방식은 3가지 direct / topic / fanout 에 대해서 알아봤는데


우리가 사용한 rabbitmq 통신방식은 direct를 사용했다. 다음은 direct의 queue 통신방식이다.





docker에 rabbitmq 설치

> docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped -e RABBITMQ_DEFAULT_USER=username -e RABBITMQ_DEFAULT_PASS=password rabbitmq:management

docker management 를 설치하면 port 15672 에서 rabbitmq 페이지를 확인할 수 있다. 

5672는 rabbitmq message 통신할때 사용되는 port이다.

실제로 이렇게 설치 후 


localhost:15672 에 접속했을때 보이는 화면은 다음과 같다.



RabbitMQ spring boot 에 설치

compile('org.springframework.boot:spring-boot-starter-amqp')


Producer 세팅

spring:
rabbitmq:
host: 192.168.10.11
port: 5672
username: username
password: password


Producer config 


rabbitmq manage에 접속해서 직접 direct exchange와 queue를 만들어도 되지만 다음과 같이 직접 @bean으로 등록하여 생성해주어도 된다.

@Configuration
public class RabbitMQConfiguration {

private String exchange = "direct.exchange";
private String chargeOrderRoutingKey = "charge.order.route";
private String chargeCompleteRoutingKey = "charge.complete.route";

private static final String CHARGE_ORDER_QUEUE = "charge.order.queue";
private static final String CHARGE_COMPLETE_QUEUE = "charge.complete.queue";

@Bean
public List<Declarable> directBindings() {
Queue chargeOrderQueue = new Queue(CHARGE_ORDER_QUEUE,true);
Queue chargeCompleteQueue = new Queue(CHARGE_COMPLETE_QUEUE, true);

DirectExchange directExchange = new DirectExchange(exchange);

return Arrays.asList(
chargeOrderQueue,
chargeCompleteQueue,
directExchange,
BindingBuilder.bind(chargeOrderQueue).to(directExchange).with(chargeOrderRoutingKey),
BindingBuilder.bind(chargeCompleteQueue).to(directExchange).with(chargeCompleteRoutingKey)
);
}

}


Producer broadcast


@Component
public class BroadcastMessageProducer {

private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

private String exchange = "direct.exchange";
private String chargeOrderRoutingKey = "charge.order.route";
private String chargeCompleteRoutingKey = "charge.complete.route";


@Autowired
private RabbitTemplate rabbitTemplate;

public void produceChargeOrder(Entity chargeOrder){
String chargeOrderString = MapperUtil.writeObjectAsJsonString(chargeOrder);
rabbitTemplate.convertAndSend(exchange,chargeOrderRoutingKey,chargeOrderString);
}

public void produceChargeComplete(Entity chargeOrder){
String chargeCompleteString = MapperUtil.writeObjectAsJsonString(chargeOrder);
rabbitTemplate.convertAndSend(exchange,chargeCompleteRoutingKey,chargeCompleteString);
}

}

여기서 MapperUtil 은 우리가 만든 Overnodes Util 이고 gson 객체를 다루는 부분이다.


우리는 charge 객체를 message에 string json으로 담아서 보낼예정이다.


Consumer 세팅

compile('org.springframework.boot:spring-boot-starter-amqp')

rabbitmq:
host: 192.168.10.11
port: 5672
username: username
password: password


BroadcastMessageConsumer

여기서는 굳이 route키를 받을 필요가 없다 queue만 입력해주면 된다. 

우리는 producer 에서 message를 받고 이를 다시 mapperutil 로 json을 entity 객체로 만들어서 실제 서비스에 사용하였다.

여기서 alarmTalkservice는 우리 서비스중에 message를 받았을때 실행되어야 할 flow이다.

@Component
public class BroadcastMessageConsumers {

private static final String CHARGE_ORDER_QUEUE = "charge.order.queue";
private static final String CHARGE_COMPLETE_QUEUE = "charge.complete.queue";

@Autowired
AlarmTalkService alarmTalkService;

@RabbitListener(queues = {CHARGE_ORDER_QUEUE})
public void receiveMessageFromDirectExchangeWithOrderQueue(String message) {

Entity chargeOrder = MapperUtil.writeStringAsObject(message, Entity.class);
alarmTalkService.sendAlarmTalkChargeOrderConfirm(chargeOrder);

}

@RabbitListener(queues = {CHARGE_COMPLETE_QUEUE})
public void receiveMessageFromDirectExchangeWithCompleteQueue(String message) {

Entity chargeOrder = MapperUtil.writeStringAsObject(message, Entity.class);
alarmTalkService.sendAlarmTalkChargeOrderComplete(chargeOrder);
}

}