반응형
8. 비동기 메시지 전송하기
- REST를 사용한 동기화 통신이외에도 비동기 메시징을 통해 애플리케이션 간 통신할 수 있다.
- 비동기 메시징은 애플리케이션 간에 응답을 기다리지 않고 간접적으로 메시지를 전송하는 방법
- 비동기 메시징을 통해 통신하는 애플리케이션 간의 결합도를 낮추고 확장성을 높여준다.
비동기 메시징 종류
- JMS
- RabbitMQ
- AMQP
- 아파치 카프카
- EJB의 MDB(스프링의 메시지 기반 POJO 지원) 등이 있다.
메시지를 수신하는 방식
- 풀 모델
- 우리 코드에서 메시지를 요청하고 도착할 때까지 기다리는 방식
- 푸시 모델
- 메시지가 수신 가능하게 되면 우리 코드로 자동 전달하는 방식
- 두가지 모델 모두 용도에 맞게 사용할 수 있다. 그러나 스레드의 실행을 막지 않으므로 일반적으로 푸시 모델이 좋은 선택이다.
- 단 많은 메시지가 너무 빨리 도착한다면 리스너에 과부하가 걸리는 경우가 생길 수 있음.
- 메시지 리스너는 중단 없이 다수의 메시지를 빠르게 처리할 수 있어서 좋은 선택이 될 때가 있다.
- 그러나 메시지 처리기가 자신의 시간에 맞춰 더 많은 메시지를 요청할 수 있어야 한다면 JmsTemplate이 제공하는 풀 모델이 더 적합.
cf) JMS의 단점
- JMS가 자바 명세이므로 자바 애플리케이션에서만 사용할 수 있다는 것.
- RabbitMQ와 카프카 같은 더 새로운 메시징 시스템은 이러한 단점을 해결하여 다른 언어와 JVM 외의 다른 플랫폼에서 사용할 수 있다.
RabbitMQ로 메시지 전송하기
- AMQP의 가장 중요한 구현이라 할 수 있는 RabbitMQ는 JMS보다 더 진보된 메시지 라우팅 전략을 제공.
- JMS 메시지가 수신자가 가져갈 메시지 도착지의 이름을 주소로 사용하는 반면, AMQP 메시지는 수신자가 리스닝하는 큐와 분리된 거래소 이름과 라우팅 키를 주소로 사용 함.
- 메시지가 RabbitMQ 브로커에 도착하면 주소로 지정된 거래소로 들어간다.
- 거래소는 하나 이상의 큐에 메시지를 전달할 책임이 있다.
거래소 종류
- Default
- 브로커가 자동으로 생성하는 특별한 거래소. 해당 메시지의 라우팅 키와 이름이 같은 큐로 메시지를 전달. 모든 큐는 자동으로 기본 거래소와 연결
- Direct
- 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달.
- Topic
- 바인딩 키와 해당 메시지의 라우팅 키와 일치하는 하나 이상의 큐에 메시지를 전달
- Fanout
- 바인딩 키나 라우팅 키에 상관없이 모든 연결된 큐에 메시지를 전달
- Header
- 토픽 거래소와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다는 것만 다름
- Dead letter
- 전달 불가능 모든 메시지를 보관하는 거래소
RabbmitMQ를 스프링에 추가하기.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- AMQP 스타터를 빌드에 추가하면 다른 지원 컴포넌트는 물론이고 AMQP 연결 팩토리와 RabbitTemplate 빈을 생성하는 자동-구성이 수행됨.
- 따라서 스프링을 사용해서 RabbitMQ 브로커로부터 메시지를 전송 및 수신할 수 있다.
RabbitMQ 브로커의 위치와 인증 정보를 구성하는 속성
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: tacoweb
RabbitTemplate을 사용해서 메시지 전송하기
public interface OrderMessagingService {
void sendOrder(Order order);
}
방법1) send()
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private final RabbitTemplate rabbitTemplate;
public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void sendOrder(Order order) {
MessageConverter converter = rabbitTemplate.getMessageConverter();
MessageProperties properties = new MessageProperties();
Message message = converter.toMessage(order, properties);
rabbitTemplate.send("tacocloud.orders", message);
}
}
방법2) convertAndSend()
- 모든 변환 작업을 RabbitTemplate이 처리하도록 convertAndSend()를 사용할 수 있다.
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private final RabbitTemplate rabbitTemplate;
public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void sendOrder(Order order) {
rabbitTemplate.convertAndSend("tacocloud.orders", order);
}
}
메시지 변환기 구성하기
기본적인 메시지 변환은 SimpleMessageConverter로 수행되며, 이것은 String 같은 간단한 타입과 Serializable 객체를 Message 객체로 변환할 수 있다. 그러나 스프링은
RabbitTemplate에서 사용할 수 있는 다양한 메시지 변환기를 제공함.
- Jackson2JsonMessageConverter 객체를 JSON으로 상호 변환
- MarshallingMessageConverter
- 스프링 Marshaller와 Unmarshaller를 사용해서 변환
- SerializerMessageConverter
- 스프링의 Serializer, Deserializer를 사용해서 String 객체를 변환.
- SimpleMessageConverter
- String, byte 배열, Serializable 타입을 변환.
- ContentTypeDelegatingMessageConverter
- contentType 헤더를 기반으로 다른 메시지 변환기에 변환을 위임한다.
@Configuration
public class MessagingConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- JSON 기반 메시지 변환.
- 스프링 부트 자동-구성에서 이 빈을 찾아서 기본 메시지 변환기 대신 이 빈을 RabbitTemplate으로 주입한다.
메시지 속성 설정하기
- 메시지의 일부 헤더를 설정해야 하는 경우.
- Message 객체를 생성할 때 메시지 변환기에서 제공하는 MessageProperties 인스턴스를 통해 헤더를 설정할 수 있다.
방법1) send()
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private final RabbitTemplate rabbitTemplate;
public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void sendOrder(Order order) {
MessageConverter converter = rabbitTemplate.getMessageConverter();
MessageProperties properties = new MessageProperties();
properties.setHeader("X_ORDER_SOURCE", "WEB"); // 헤더를 설정
Message message = converter.toMessage(order, properties);
rabbitTemplate.send("tacocloud.orders", message);
}
}
방법2) convertAndSend()
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private final RabbitTemplate rabbitTemplate;
public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void sendOrder(Order order) {
// 모든 변환 작업을 RabbitTemplate이 처리하도록 convertAndSend()를 사용할 수 있다.
rabbitTemplate.convertAndSend("tacocloud.orders", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties properties = message.getMessageProperties();
properties.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
}
RabbitMQ로부터 메시지 수신하기
- Pull 모델: RabbitTemplate을 사용해서 큐로부터 메시지를 가져온다.
- Push 모델: @RabbitListener가 지정된 메서드로부터 메시지가 푸시 된다.
@Component
public class RabbitOrderReceiver {
private final RabbitTemplate rabbitTemplate;
public RabbitOrderReceiver(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// 방법1
public Order receiveOrder() {
return rabbitTemplate.receiveAndConvert("tacocloud.orders", new ParameterizedTypeReference<Order>() {
});
}
// 방법2
public Order receiveOrder() {
return (Order) rabbitTemplate.receiveAndConvert("tacocloud.orders");
}
// 방법3
public Order receiveOrder() {
Message message = rabbitTemplate.receive("tacocloud.orders");
return message != null ? (Order) messageConverter.fromMessage(message) : null;
}
}
타임아웃 설정하기
spring:
rabbitmq:
template:
receive-timeout: 30000 # timeout
리스너를 사용해서 RabbitMQ 메시지 처리하기
@Slf4j
@Component
public class OrderListener {
@RabbitListener(queues = "tacocloud.orders")
public void receiveOrder(Order order) {
log.info(order.toString() + " send");
}
}
Kafka 사용하기
- 아파치 카프카는 가장 새로운 메시징 시스템이며 ActiveMQ, Artemis, RabbitMQ와 유사한 메시지 브로커이다.
- 그러나 카프카는 특유의 아키텍처를 가짐.
- 카프카는 높은 확장성을 제공하는 클러스터로 실행되도록 설계되었음.
- 클러스터의 모든 카프카 인스턴스에 걸쳐 토픽을 파티션으로 분할하여, 메시지를 관리.
- RabbitMQ가 거래소와 큐를 사용해서 메시지를 처리하는 반면, 카프카는 토픽만 사용.
- 카프카의 토픽은 모든 브로커에 걸쳐 복제된다.
- 클러스터의 각 노드는 하나 이상의 토픽에 대한 리더로 동작하며, 토픽 데이터를 관리하고 클러스터의 다른 노드로 데이터를 복제한다.
- 카프카 클러스터는 여래 개의 브로커로 구성되며, 각 브로커는 토픽의 파티션의 리더로 동작. - 각 토픽은 여러 개의 파티션으로 분할될 수 있따. 이 경우 클러스터의 각 노드는 한 토픽의 하나 이상의 파티션의 리더가 됨.
카프카 사용을 위해 스프링 설정하기
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 의존성을 추가하면 스프링 부트가 카프카 사용을 위한 자동-구성을 해준다. (KafkaTemplate을 준비함)
- 우리는 KafkaTemplate을 주입하고 메시지를 전송, 수신하면 된다.
카프카 클러스터 설정
spring:
kafka:
bootstrap-servers:
- localhost:9092
default-topic: test # 기본 토픽을 설정할 필요 있을 경
---
spring:
profiles: prod
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9093 # 클러스터의 여러 서버를 지정할 수 있음.
bin/zookeeper-server-start.sh config/zookeeper.properties # 주키버 서버 실행
bin/kafka-server-start.sh config/server.properties # 카프카 브로커 실행
bin/kafka-topics.sh -create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test # 토픽(test) 생성
KafkaTemplate을 사용해서 메시지 전송하기
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaOrderMessagingService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("test", order.getDeliveryName());
// kafkaTemplate.sendDefault(order.getDeliveryName()); // Default 토픽을 설정하고 보내는 경우
}
}
카프카 리스너 작성하기
- 카프카는 send(), sendDefault() 특유의 메소드 시그니처 외에도 KafkaTemplate은 메시지를 수신하는 메서드를 일체 제공하지 않음.
- 따라서 스프링을 사용해서 카프카 토픽 메시지를 가져오는 유일한 방법은 메시지 리스너를 작성하는 것.
@Slf4j
@Component
public class OrderListener {
@KafkaListener(topics = "test")
public void handle(String message) {
log.info("message is receive" + message);
}
}
수신된 메시지의 파티션과 타임스탬프를 가져올 수 있다.
@Slf4j
@Component
public class OrderListener {
@KafkaListener(topics = "test", groupId = "test")
public void handle(String message, ConsumerRecord<String, String> record) {
log.info("Received message {} from partition {} with timestamp {}", message, record.partition(), record.timestamp());
}
}
정리
- 애플리케이션 간 비동기 메시지 큐를 이용한 통신 방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮추면서 확장성은 높임.
- 스프링은 JMS, RabbitMQ 또는 아파치 카프카를 사용해서 비동기 메시징을 지원.
- 스프링 애플리케이션은 템플릿 기반의 클라이언트인 JmsTemplate, RabbitTemplate, KafkaTemplate을 사용해서 메시지 브로커를 통한 메시지 전송을 할 수 있다.
- 메시지 수신 애플리케이션은 같은 템플릿 기반의 클라이언트들을 사용해서 풀 모델 형태의 메시지 소비를 할 수 있다. (카프카 제외)
- 메시지 리스너 애플리케이션을 빈 메소드에 지정하면 푸쉬 모델의 형태로 Consumer에게 메시지가 전송될 수 있다.
반응형
'Application > Spring Framework' 카테고리의 다른 글
[스프링 인 액션 정리] 11-12 리액티브 API, 데이터 퍼시스턴스 (0) | 2021.01.12 |
---|---|
[스프링 인 액션 정리] 10장. 리액터 개요 (0) | 2021.01.09 |
[스프링 인 액션 정리] 6장, 7장. REST 서비스 생성 & 사용하기 (0) | 2021.01.07 |
[스프링 인 액션 정리] 5장. 구성 속성 사용하기 (0) | 2021.01.07 |
[스프링 인 액션 정리] 4장. 스프링 시큐리티 (0) | 2021.01.06 |