RabbitMQ 란
open source Message broker 로 알려져 있다
즉 mircro service에서 Producer-Consumer pattern을 이용할 시에
producer 가 message 를 send 하면 consumer가 소비하는 구조이다.
예를 들자면 어느 shop 에서 고객이 주문을 요청하면 producer 가 주문flow를 요청하고
microservice 에서 주문 요청을 처리하는 flow를 소비한다는 개념이다
이때 중간에서 rabbitMQ가
비동기로 이 주문요청의 event message를 queue 형식으로 받고 microservice에 전달하는 역할을 한다고 보면 된다.
RabbitMQ 기본개념
AMQP(Advanced Message Queuing Protocol) : 시스템 간 메시지를 교환하기 위해 공개 표준으로 정의한 프로토콜
Broker : 발행자가 만든 메시지를 저장
Virtual host : Broker 내의 가상 영역
Connection : 발생자와 소비자, Broker 사이의 물리적인 연결
Channel : 발행자와 소비자, Broker 사이의 논리적인 연결, 하나의 Connection 내에 다수의 Channel 설정 가능
Exchange : 발행한 모든 메시지가 처음 도달하는 지점으로 메시지가 목적지에 도달할 수 있도록 라우팅 규칙 적용, 라우팅 규칙에는 direct, topic, fanout
Queue : 메시지가 소비되기 전 대기하고 있는 최종 지점으로 Exchange 라우팅 규칙에 의해 단일 메시지가 복사되거나 다수의 큐에 도달할 수 있다
Binding : Exchange 와 Queue 간의 가상 연결
위에서 rabbitmq통신 방식은 3가지 direct / topic / fanout 에 대해서 알아봤는데
우리가 사용한 rabbitmq 통신방식은 direct를 사용했다. 다음은 direct의 queue 통신방식이다.
docker에 rabbitmq 설치
실제로 이렇게 설치 후
localhost:15672 에 접속했을때 보이는 화면은 다음과 같다.
RabbitMQ spring boot 에 설치
compile('org.springframework.boot:spring-boot-starter-amqp')
Producer 세팅
spring:
rabbitmq:
host: 192.168.10.11
port: 5672
username: username
password: password
Producer config
rabbitmq manage에 접속해서 직접 direct exchange와 queue를 만들어도 되지만 다음과 같이 직접 @bean으로 등록하여 생성해주어도 된다.
@Configuration
public class RabbitMQConfiguration {
private String exchange = "direct.exchange";
private String chargeOrderRoutingKey = "charge.order.route";
private String chargeCompleteRoutingKey = "charge.complete.route";
private static final String CHARGE_ORDER_QUEUE = "charge.order.queue";
private static final String CHARGE_COMPLETE_QUEUE = "charge.complete.queue";
@Bean
public List<Declarable> directBindings() {
Queue chargeOrderQueue = new Queue(CHARGE_ORDER_QUEUE,true);
Queue chargeCompleteQueue = new Queue(CHARGE_COMPLETE_QUEUE, true);
DirectExchange directExchange = new DirectExchange(exchange);
return Arrays.asList(
chargeOrderQueue,
chargeCompleteQueue,
directExchange,
BindingBuilder.bind(chargeOrderQueue).to(directExchange).with(chargeOrderRoutingKey),
BindingBuilder.bind(chargeCompleteQueue).to(directExchange).with(chargeCompleteRoutingKey)
);
}
}
Producer broadcast
@Component
public class BroadcastMessageProducer {
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private String exchange = "direct.exchange";
private String chargeOrderRoutingKey = "charge.order.route";
private String chargeCompleteRoutingKey = "charge.complete.route";
@Autowired
private RabbitTemplate rabbitTemplate;
public void produceChargeOrder(Entity chargeOrder){
String chargeOrderString = MapperUtil.writeObjectAsJsonString(chargeOrder);
rabbitTemplate.convertAndSend(exchange,chargeOrderRoutingKey,chargeOrderString);
}
public void produceChargeComplete(Entity chargeOrder){
String chargeCompleteString = MapperUtil.writeObjectAsJsonString(chargeOrder);
rabbitTemplate.convertAndSend(exchange,chargeCompleteRoutingKey,chargeCompleteString);
}
}
여기서 MapperUtil 은 우리가 만든 Overnodes Util 이고 gson 객체를 다루는 부분이다.
우리는 charge 객체를 message에 string json으로 담아서 보낼예정이다.
Consumer 세팅
compile('org.springframework.boot:spring-boot-starter-amqp')
rabbitmq:
host: 192.168.10.11
port: 5672
username: username
password: password
@Component
public class BroadcastMessageConsumers {
private static final String CHARGE_ORDER_QUEUE = "charge.order.queue";
private static final String CHARGE_COMPLETE_QUEUE = "charge.complete.queue";
@Autowired
AlarmTalkService alarmTalkService;
@RabbitListener(queues = {CHARGE_ORDER_QUEUE})
public void receiveMessageFromDirectExchangeWithOrderQueue(String message) {
Entity chargeOrder = MapperUtil.writeStringAsObject(message, Entity.class);
alarmTalkService.sendAlarmTalkChargeOrderConfirm(chargeOrder);
}
@RabbitListener(queues = {CHARGE_COMPLETE_QUEUE})
public void receiveMessageFromDirectExchangeWithCompleteQueue(String message) {
Entity chargeOrder = MapperUtil.writeStringAsObject(message, Entity.class);
alarmTalkService.sendAlarmTalkChargeOrderComplete(chargeOrder);
}
}
'spring' 카테고리의 다른 글
(Spring Boot) @Transactional 이란? / @Transactional에 대하여 (1) | 2018.11.28 |
---|---|
RxJava 적용하기 (0) - RxJava overview (0) | 2018.11.22 |
토비의 스프링 5-4장. 메일 서비스 추상화 (0) | 2018.11.09 |
(Spring Boot Bean) 사용자 클래스 Bean 객체 등록 / 설정 간단하게 하기! (0) | 2018.11.06 |