Joo-Kwang Park

Joo-Kwang Park

Developer

© 2021

Dark Mode

Spring Boot에서 Amazon MQ의 RabbitMQ 연동 (1)

RabbitMQ는 오픈소스 메세지 브로커 소프트웨어이다. 메세지 브로커를 사용하여 다양한 어플리케이션 간 통신을 지원할 수 있다. 언어와 상관없이 AMQP(Advanced Message Queuing Protocol)을 사용하므로 확장성이 좋다고 할 수 있다. 메세지 브로커는 RabbitMQ 뿐만 아니라 Redis, Apache ActiveMQ 등 다양한 소프트웨어가 존재한다. 각 소프트웨어마다 차이점이 있지만, RabbitMQ의 특징만 정리한다. RabbitMQ는 메세지 전달을 위한 큐를 가지고 있다. 큐는 하나만 구성이 될 수 있으며, 여러개의 큐를 관리할 수 있다. 뿐만 아니라 네임스페이스를 만들어서 큐를 그룹핑 할 수 있다. 토픽이라는 것이 있으며 MQTT(Message Queuing Telemetry Transport)와 같은 방식으로 메세지를 전달할 수 있다. 추후 RabbitMQ 에 대한 내용은 RabbitMQ Tutorial을 정리할 예정이다.

AWS 에서는 AmazonMQ 제품을 제공하고 있다. AmazonMQ는 Apache ActiveMQ만 지원을 하였지만, 2020년 11월 4일 RabbitMQ를 추가적으로 지원하였다. AmazonMQ는 메세지 브로커를 쉽게 구축하고 운영할 수 있도록 해준다. 또한, 높은 확장성과 안정성을 제공한다. 본 글에서는 스프링 부트에서 RabbitMQ를 연동하는 방법을 정리하려고 한다.

RabbitMQ 브로커 생성

AWS에 접속하고 Amazon MQ 페이지로 이동한다. ‘시작하기’ 버튼을 눌러서 브로커를 만들기 위한 페이지로 이동한다. 브로커는 ‘RabbitMQ’를 선택한다. 배포 모드는 ‘단일 인스턴스 브로커’와 ‘클러스터 배포’가 있지만, 지금은 개발/테스트 용도이므로 ‘단일 인스턴스 브로커’를 선택한다. 다음으로 넘어가면 설정 구성을 할 수 있는데, 브로커 이름은 적당하게 지어주고 인스턴스 유형은 ‘mq.t3.micro’를 선택한다. 마지막으로 RabbitMQ 액세스를 할 수 있는 사용자 이름과 비밀번호를 입력한다. 이때 입력한 정보는 잘 기억을 하고 있어야 추후에 관리자 페이지 접속 및 브로커 서버에 접속이 가능하다. 추가적인 설정을 보면 기본적으로 액세스 유형이 ‘퍼블릭 액세스’로 설정되어 있다. 추후 실제 운영을 하기 위해서는 VPC 내에서만 접속이 가능한 ‘프라이빗 액세스’로 만드는 것이 좋다. 이와 같이 설정 후 생성하는데 약 5~10분 정도 시간 소요된다. 이때 커피 한 잔 내려오면 적절하다.

image

브로커가 정상적으로 생성이 됐을 경우에는 위와 같은 화면을 볼 수 있을 것이다. ‘RabbitMQ 웹 콘솔’ URL을 클릭하게 되면, RabbitMQ 관리자 페이지에 접속할 수 있다. 접속 정보는 브로커 생성 시 입력한 사용자 이름과 비밀번호이다.

스프링 부트와 RabbitMQ 연동

기존 스프링 부트 프로젝트가 없는 경우 Spring initializr에서 프로젝트를 쉽게 생성할 수 있다.

image

기존 프로젝트가 있는 경우에는 다음과 같이 build.gradle 파일에서의 dependencies 를 추가한다.

implementation 'org.springframework.boot:spring-boot-starter-amqp'

또한, 내부적으로 jackson 라이브러리를 사용하여 json을 오브젝트로 변환하는 기능을 사용하려고 한다. 다음과 같이 build.gradle 파일에 jackson에서 제공하는 databind 라이브러리도 추가한다.

implementation 'com.fasterxml.jackson.core:jackson-databind'

본인이 사용하는 에디터를 가지고 스프링 부트 프로젝트를 연다. 다음과 같이 application.yml 에서 RabbitMQ 접속 정보를 입력한다. application.properties는 해당하는 파일 양식으로 작성하면 된다.

spring:  
 rabbitmq:  
    host: <Your Host>
    port: 5671  
	username: <Your UserName>  
    password: <Your Password>
    virtual-host: /vesta
    ssl:  
      enabled: true
 listener:  
      simple:  
        acknowledge-mode: manual

위에서 rabbitmq 필드의 host, username, password는 브로커 생성 시 입력했던 사용자 정보와 비밀번호를 입력하면 된다. listener에서 acknowledge-mode가 manual인데, 스프링 부트에서 RabbitMQ Listener에서 메세지를 정상적으로 처리가 됐다는 ack을 수동으로 보내겠다는 의미이다. 이것에 대한 내용은 뒤에 설명한다. 기본적으로 Amazon MQ에서는 SSL을 지원하는 브로커로 생성된다. 따라서, ssl.enabled 필드를 true 로 설정해야 한다. 이를 설정하지 않는 경우 브로커 접속이 되지 않으며, 다음과 같은 오류메세지가 발생한다.

org.springframework.amqp.AmqpIOException: java.io.IOException
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:723) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2128) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2101) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2081) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1883) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1864) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1345) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1191) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.io.IOException: null
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:396) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1139) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087) ~[amqp-client-5.10.0.jar:5.10.0]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:638) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:613) ~[spring-rabbit-2.3.2.jar:2.3.2]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565) ~[spring-rabbit-2.3.2.jar:2.3.2]
	... 12 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:326) ~[amqp-client-5.10.0.jar:5.10.0]
	... 17 common frames omitted
Caused by: java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186) ~[na:na]
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140) ~[na:na]
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252) ~[na:na]
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:271) ~[na:na]
	at java.base/java.io.DataInputStream.readUnsignedByte(DataInputStream.java:293) ~[na:na]
	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184) ~[amqp-client-5.10.0.jar:5.10.0]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:665) ~[amqp-client-5.10.0.jar:5.10.0]
	... 1 common frames omitted

이제 스프링 부트에서 RabbitMQ을 연결하기 위한 환경을 모두 완료 하였다. 이제 실질적으로 코드를 작성해서 RabbitMQ 에서 사용할 Queue를 만들고 토픽에 대하여 큐를 연동하기 위한 Exchange 바인딩 과정을 거칠 것이다. 다음과 같이 RabbitConfiguration.java 파일을 생성한다

@Configuration  
public class RabbitConfiguration {  
  
    public static final String queueName = "default";  
	public static final String topicExchangeName = "test-exchange";  
  
	@Bean  
	Queue defaultQueue() {  
		return new Queue(queueName, false);  
	}  

	@Bean  
	TopicExchange exchange() {  
		return new TopicExchange(topicExchangeName);  
	}  
  
    @Bean  
	Binding bindingDefault(Queue queue, TopicExchange exchange) {  
		return BindingBuilder.bind(queue).to(exchange).with("default.#");  
	}  
  
    @Bean  
	RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {  
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  
		rabbitTemplate.setMessageConverter(Jackson2JsonMessageConverter());  
		return rabbitTemplate;  
	}  
  
	@Bean  
	public Jackson2JsonMessageConverter Jackson2JsonMessageConverter(){  
		return new Jackson2JsonMessageConverter();  
	}  
  
}

큐의 이름은 ‘default’이고 exchange 이름은 ‘test-exchange’ 이다. ‘default.#’은 ‘default.<any>’ 로 이해하면 되며, 해당 토픽으로 들어오는 메세지는 모두 ‘default’ 큐에 넣겠다는 의미이다. 이것에 대한 자세한 설명은 추후 RabbitMQ 튜토리얼 정리에서 할 예정이다. 자세한 것은 RabbitMQ 튜토리얼에서 보면 아주 좋다. 메세지는 json 포맷으로 오는 것을 가정하고 jackson 라이브러리를 통해서 오브젝트로 변환한다.

마지막으로 DefaultListener.java 파일을 다음과 같이 작성한다.

@Component  
@RabbitListener(queues = RabbitConfiguration.queueName)  
public class DefaultListener {  
  
	@RabbitHandler  
	public void receiveMessage(LinkedHashMap message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {  
		try {  
			System.out.println(message);  
			channel.basicAck(tag, false);  
		} catch (Exception e) {  
			e.printStackTrace();  
		}  
	}  
  
}

RabbitListener 어노테이션을 통해서 DefaultListener 클래스는 RabbitMQ의 리스너라는 것을 명시한다. 큐의 이름은 RabbitConfiguration 내에서 정의한 ‘default’ 이다. 해당 어노테이션에서는 큐를 동시다발적으로 처리할 수 있는 옵션이 있는데, 이것은 추후 정리 예정이다.

RabbitHandler 어노테이션을 메소드에 달아주면, RabbitListener에 명시한 큐의 데이터를 처리할 수 있다. 첫 번째 인자는 들어오는 메세지인데, 현재 LinkedHashMap으로 받아들인다. Jackson 라이브러리가 json 데이터를 오브젝트로 변경해주는데, 이를 표현하는 방식이 LinkedHashMap이다. 클래스로 변환도 가능하지만, 이것은 추가적인 작업이 필요하다. 두 번째 인자로는 채널, 세 번째 인자로는 태그이다. 내부에서 처리하는 코드를 보면, channel.basicAck 라고 메세지를 정상적으로 받았다는 ack를 수동으로 보내는 것을 확인할 수 있다. 이렇게 처리하는 이유는 내부적인 exception 발생 시 메세지를 다시 처리 하기 위한 수단이다. Exception이 발생하여 메세지에 대한 처리가 비정상적으로 끝났다면, 서비스 정상화 후 같은 메세지를 처리할 필요가 있다. 이런 것을 컨트롤하기 위하여 수동적으로 ack 메세지를 보낸다.

이와 같이 작성 후 Spring Boot 어플리케이션을 실행해보자!

테스트

Amazon MQ 페이지에 접속 후 본인이 만든 브로커를 선택한다. 하단에 ‘RabbitMQ 웹 콘솔’에 있는 URL에 접속하여 본인이 입력한 아이디 및 패스워드를 입력한다. ‘Queues’ 탭으로 이동한다.

성공적으로 스프링부트가 구동 됐을 경우 Virtual Host에는 /, Name은 default 로 큐가 생성이 되어 있을 것이다. default 큐의 상세 페이지로 이동한다. Publish Message 란에 다음과 같이 입력 후 ‘Publish message’ 버튼을 눌러본다.

image

위와 같은 과정을 통해서 성공적으로 메세지를 보냈을 경우, 다음과 같이 Spring Boot 어플리케이션 로그가 찍히는 것을 확인할 수 있다.

image

결론

AWS에서 제공하는 Amazon MQ 제품을 통해 RabbitMQ 브로커 인스턴스를 생성하는 법을 알아보았으며, Spring Boot에서 RabbitMQ 연동을 하고 메세지를 받는 것을 테스트 해보았다. 현재는 메세지를 HashMap으로 받는 법만 정리 하였지만, 다음에는 Class 변환하는 법과 메세지를 전송하는 법에 대하여 정리할 예정이다. 본 글은 시리즈 글로 진행된다. 작성된 코드는 Github에서 확인 가능하다.

Comments