Application/Spring Framework

[스프링 인 액션 정리] 8장. 비동기 메시지 전송하기

반응형

8. 비동기 메시지 전송하기

  • REST를 사용한 동기화 통신이외에도 비동기 메시징을 통해 애플리케이션 간 통신할 수 있다.
  • 비동기 메시징은 애플리케이션 간에 응답을 기다리지 않고 간접적으로 메시지를 전송하는 방법
  • 비동기 메시징을 통해 통신하는 애플리케이션 간의 결합도를 낮추고 확장성을 높여준다.


비동기 메시징 종류

  • JMS
  • RabbitMQ
  • AMQP
  • 아파치 카프카
  • EJB의 MDB(스프링의 메시지 기반 POJO 지원) 등이 있다.


메시지를 수신하는 방식

  • 풀 모델
    • 우리 코드에서 메시지를 요청하고 도착할 때까지 기다리는 방식
  • 푸시 모델
    • 메시지가 수신 가능하게 되면 우리 코드로 자동 전달하는 방식
  • 두가지 모델 모두 용도에 맞게 사용할 수 있다. 그러나 스레드의 실행을 막지 않으므로 일반적으로 푸시 모델이 좋은 선택이다.
  • 단 많은 메시지가 너무 빨리 도착한다면 리스너에 과부하가 걸리는 경우가 생길 수 있음.
  • 메시지 리스너는 중단 없이 다수의 메시지를 빠르게 처리할 수 있어서 좋은 선택이 될 때가 있다.
  • 그러나 메시지 처리기가 자신의 시간에 맞춰 더 많은 메시지를 요청할 수 있어야 한다면 JmsTemplate이 제공하는 풀 모델이 더 적합.


cf) JMS의 단점

  • JMS가 자바 명세이므로 자바 애플리케이션에서만 사용할 수 있다는 것.
  • RabbitMQ와 카프카 같은 더 새로운 메시징 시스템은 이러한 단점을 해결하여 다른 언어와 JVM 외의 다른 플랫폼에서 사용할 수 있다.


RabbitMQ로 메시지 전송하기

  • AMQP의 가장 중요한 구현이라 할 수 있는 RabbitMQ는 JMS보다 더 진보된 메시지 라우팅 전략을 제공.
  • JMS 메시지가 수신자가 가져갈 메시지 도착지의 이름을 주소로 사용하는 반면, AMQP 메시지는 수신자가 리스닝하는 큐와 분리된 거래소 이름과 라우팅 키를 주소로 사용 함.

  • 메시지가 RabbitMQ 브로커에 도착하면 주소로 지정된 거래소로 들어간다.
  • 거래소는 하나 이상의 큐에 메시지를 전달할 책임이 있다.


거래소 종류

  • Default
    • 브로커가 자동으로 생성하는 특별한 거래소. 해당 메시지의 라우팅 키와 이름이 같은 큐로 메시지를 전달. 모든 큐는 자동으로 기본 거래소와 연결
  • Direct
    • 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달.
  • Topic
    • 바인딩 키와 해당 메시지의 라우팅 키와 일치하는 하나 이상의 큐에 메시지를 전달
  • Fanout
    • 바인딩 키나 라우팅 키에 상관없이 모든 연결된 큐에 메시지를 전달
  • Header
    • 토픽 거래소와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다는 것만 다름
  • Dead letter
    • 전달 불가능 모든 메시지를 보관하는 거래소


RabbmitMQ를 스프링에 추가하기.


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • AMQP 스타터를 빌드에 추가하면 다른 지원 컴포넌트는 물론이고 AMQP 연결 팩토리와 RabbitTemplate 빈을 생성하는 자동-구성이 수행됨.
  • 따라서 스프링을 사용해서 RabbitMQ 브로커로부터 메시지를 전송 및 수신할 수 있다.


RabbitMQ 브로커의 위치와 인증 정보를 구성하는 속성

spring:
  profiles: prod
  rabbitmq:
    host: rabbit.tacocloud.com
    port: 5673
    username: tacoweb
    password: tacoweb


RabbitTemplate을 사용해서 메시지 전송하기

public interface OrderMessagingService {

    void sendOrder(Order order);

}


방법1) send()


@Service
public class RabbitOrderMessagingService implements OrderMessagingService {

    private final RabbitTemplate rabbitTemplate;

    public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        MessageConverter converter = rabbitTemplate.getMessageConverter();
        MessageProperties properties = new MessageProperties();
        Message message = converter.toMessage(order, properties);
        rabbitTemplate.send("tacocloud.orders", message);
    }

}


방법2) convertAndSend()

  • 모든 변환 작업을 RabbitTemplate이 처리하도록 convertAndSend()를 사용할 수 있다.

@Service
public class RabbitOrderMessagingService implements OrderMessagingService {

    private final RabbitTemplate rabbitTemplate;

    public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        rabbitTemplate.convertAndSend("tacocloud.orders", order);
    }

}


메시지 변환기 구성하기

기본적인 메시지 변환은 SimpleMessageConverter로 수행되며, 이것은 String 같은 간단한 타입과 Serializable 객체를 Message 객체로 변환할 수 있다. 그러나 스프링은
RabbitTemplate에서 사용할 수 있는 다양한 메시지 변환기를 제공함.

  • Jackson2JsonMessageConverter 객체를 JSON으로 상호 변환
  • MarshallingMessageConverter
    • 스프링 Marshaller와 Unmarshaller를 사용해서 변환
  • SerializerMessageConverter
    • 스프링의 Serializer, Deserializer를 사용해서 String 객체를 변환.
  • SimpleMessageConverter
    • String, byte 배열, Serializable 타입을 변환.
  • ContentTypeDelegatingMessageConverter
    • contentType 헤더를 기반으로 다른 메시지 변환기에 변환을 위임한다.

@Configuration
public class MessagingConfig {

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}
  • JSON 기반 메시지 변환.
  • 스프링 부트 자동-구성에서 이 빈을 찾아서 기본 메시지 변환기 대신 이 빈을 RabbitTemplate으로 주입한다.


메시지 속성 설정하기

  • 메시지의 일부 헤더를 설정해야 하는 경우.
  • Message 객체를 생성할 때 메시지 변환기에서 제공하는 MessageProperties 인스턴스를 통해 헤더를 설정할 수 있다.


방법1) send()


@Service
public class RabbitOrderMessagingService implements OrderMessagingService {

    private final RabbitTemplate rabbitTemplate;

    public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        MessageConverter converter = rabbitTemplate.getMessageConverter();
        MessageProperties properties = new MessageProperties();
        properties.setHeader("X_ORDER_SOURCE", "WEB"); // 헤더를 설정
        Message message = converter.toMessage(order, properties);
        rabbitTemplate.send("tacocloud.orders", message);
    }

}


방법2) convertAndSend()


@Service
public class RabbitOrderMessagingService implements OrderMessagingService {

    private final RabbitTemplate rabbitTemplate;

    public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        // 모든 변환 작업을 RabbitTemplate이 처리하도록 convertAndSend()를 사용할 수 있다.
        rabbitTemplate.convertAndSend("tacocloud.orders", order, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = message.getMessageProperties();
                properties.setHeader("X_ORDER_SOURCE", "WEB");
                return message;
            }
        });
    }

}


RabbitMQ로부터 메시지 수신하기

  • Pull 모델: RabbitTemplate을 사용해서 큐로부터 메시지를 가져온다.
  • Push 모델: @RabbitListener가 지정된 메서드로부터 메시지가 푸시 된다.

@Component
public class RabbitOrderReceiver {

    private final RabbitTemplate rabbitTemplate;

    public RabbitOrderReceiver(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // 방법1
    public Order receiveOrder() {
        return rabbitTemplate.receiveAndConvert("tacocloud.orders", new ParameterizedTypeReference<Order>() {
        });
    }

    // 방법2
    public Order receiveOrder() {
        return (Order) rabbitTemplate.receiveAndConvert("tacocloud.orders");
    }

    // 방법3
    public Order receiveOrder() {
        Message message = rabbitTemplate.receive("tacocloud.orders");
        return message != null ? (Order) messageConverter.fromMessage(message) : null;
    }

}


타임아웃 설정하기

spring:
  rabbitmq:
    template:
      receive-timeout: 30000 # timeout


리스너를 사용해서 RabbitMQ 메시지 처리하기


@Slf4j
@Component
public class OrderListener {

    @RabbitListener(queues = "tacocloud.orders")
    public void receiveOrder(Order order) {
        log.info(order.toString() + " send");
    }

}


Kafka 사용하기

  • 아파치 카프카는 가장 새로운 메시징 시스템이며 ActiveMQ, Artemis, RabbitMQ와 유사한 메시지 브로커이다.
  • 그러나 카프카는 특유의 아키텍처를 가짐.
  • 카프카는 높은 확장성을 제공하는 클러스터로 실행되도록 설계되었음.
  • 클러스터의 모든 카프카 인스턴스에 걸쳐 토픽을 파티션으로 분할하여, 메시지를 관리.
  • RabbitMQ가 거래소와 큐를 사용해서 메시지를 처리하는 반면, 카프카는 토픽만 사용.
  • 카프카의 토픽은 모든 브로커에 걸쳐 복제된다.
  • 클러스터의 각 노드는 하나 이상의 토픽에 대한 리더로 동작하며, 토픽 데이터를 관리하고 클러스터의 다른 노드로 데이터를 복제한다.

- 카프카 클러스터는 여래 개의 브로커로 구성되며, 각 브로커는 토픽의 파티션의 리더로 동작. - 각 토픽은 여러 개의 파티션으로 분할될 수 있따. 이 경우 클러스터의 각 노드는 한 토픽의 하나 이상의 파티션의 리더가 됨.


카프카 사용을 위해 스프링 설정하기


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • 의존성을 추가하면 스프링 부트가 카프카 사용을 위한 자동-구성을 해준다. (KafkaTemplate을 준비함)
  • 우리는 KafkaTemplate을 주입하고 메시지를 전송, 수신하면 된다.


카프카 클러스터 설정

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    default-topic: test # 기본 토픽을 설정할 필요 있을 경

---
spring:
  profiles: prod
  kafka:
    bootstrap-servers:
      - kafka.tacocloud.com:9092
      - kafka.tacocloud.com:9093
      - kafka.tacocloud.com:9093 # 클러스터의 여러 서버를 지정할 수 있음.
bin/zookeeper-server-start.sh config/zookeeper.properties # 주키버 서버 실행 
bin/kafka-server-start.sh config/server.properties # 카프카 브로커 실행
bin/kafka-topics.sh -create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test # 토픽(test) 생성


KafkaTemplate을 사용해서 메시지 전송하기


@Service
public class KafkaOrderMessagingService implements OrderMessagingService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaOrderMessagingService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.send("test", order.getDeliveryName());
//        kafkaTemplate.sendDefault(order.getDeliveryName()); // Default 토픽을 설정하고 보내는 경우
    }

}


카프카 리스너 작성하기

  • 카프카는 send(), sendDefault() 특유의 메소드 시그니처 외에도 KafkaTemplate은 메시지를 수신하는 메서드를 일체 제공하지 않음.
  • 따라서 스프링을 사용해서 카프카 토픽 메시지를 가져오는 유일한 방법은 메시지 리스너를 작성하는 것.

@Slf4j
@Component
public class OrderListener {

    @KafkaListener(topics = "test")
    public void handle(String message) {
        log.info("message is receive" + message);
    }

}


수신된 메시지의 파티션과 타임스탬프를 가져올 수 있다.


@Slf4j
@Component
public class OrderListener {

    @KafkaListener(topics = "test", groupId = "test")
    public void handle(String message, ConsumerRecord<String, String> record) {
        log.info("Received message {} from partition {} with timestamp {}", message, record.partition(), record.timestamp());
    }

}


정리

  • 애플리케이션 간 비동기 메시지 큐를 이용한 통신 방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮추면서 확장성은 높임.
  • 스프링은 JMS, RabbitMQ 또는 아파치 카프카를 사용해서 비동기 메시징을 지원.
  • 스프링 애플리케이션은 템플릿 기반의 클라이언트인 JmsTemplate, RabbitTemplate, KafkaTemplate을 사용해서 메시지 브로커를 통한 메시지 전송을 할 수 있다.
  • 메시지 수신 애플리케이션은 같은 템플릿 기반의 클라이언트들을 사용해서 풀 모델 형태의 메시지 소비를 할 수 있다. (카프카 제외)
  • 메시지 리스너 애플리케이션을 빈 메소드에 지정하면 푸쉬 모델의 형태로 Consumer에게 메시지가 전송될 수 있다.
반응형