TMI
사내에서 다양한 데이터 수집 어플리케이션을 구동하고 있다.
서버에서 약 20여개의 개별로 구동되는 서브 어플리케이션의 수집주기 설정 및 모니터링에 어려움이 있었다.
각 서브 어플리케이션을 한 눈에 관리하기 위해 서브 어플리케이션 매니지먼트를 구상하게 되었다.
다수의 서브 어플리케이션간 데이터 통신 방법으로 웹소켓, 메세지 브로커 등 다양한 방법이 있다.
그 중 Redis pub/sub로 구현하기로 결정하였다.
주요한 선정 요인으로는
수집주기 설정 등 스케쥴링 관리는 Quartz로 구현할 예정이기에
이벤트 flag, state 등 짧고 간결한 메세지만 통신하면 되고, 따로 전송 메세지를 저장할 필요도 없었다.
In-Meory기반의 빠른 통신이 가능하여 채택하게 되었다.
현 포스팅은 Redis가 설치 및 설정이 되어있는 환경에서 Spring으로 구현 소스만 공유하기 때문에
Redis 설치 및 설정은 아래 포스팅에서 확인하면 됩니다.
2024.05.17 - [Database/Redis] - [Redis] 레디스 설치 및 기본 가이드
[Redis] 레디스 설치 및 기본 가이드
Redis 란?- 오픈 소스 인 메모리 데이터 구조 저장소로, 주로 데이터베이스, 캐시, 메시지 브로커로 사용- key-value 저장소의 형태를 가지며 String, List, Set, Hash 등 다양한 데이터 구조를 지원- 데이터
apronsksk.tistory.com
2024.05.20 - [Database/Redis] - [Redis] 레디스 Pub/Sub 기본 가이드
[Redis] 레디스 Pub/Sub 기본 가이드
Redis Pub/Sub 란?- 메시지 발행자(Publisher)와 구독자(Subscriber) 간의 메시지 전달을 위한 통신 방법- Publisher는 메시지를 특정 Channel에 발행하고, Subscriber는 Channel을 구독하여 메시지를 수신- Subscriber는
apronsksk.tistory.com
Redis Pub/Sub 설정
dependencies 추가
Gradle
dependencies {
// 레디스 연결
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
// 레디스 클라이언트
implementation 'io.lettuce.core:lettuce-core'
}
Maven
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce.core</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
</dependencies>
application 설정
properties
## 레디스 Publisher
# 레디스 호스트
spring.redis.host=localhost
# 레디스 포트
spring.redis.port=6379
## 레디스 Subscriber
# 레디스 호스트
spring.redis.host=localhost
# 레디스 포트
spring.redis.port=6379
# 레디스 채널
spring.redis.channels.name=test
# 이벤트 id
spring.redis.msg.ide=evt1,evt2
yml
## 레디스 Publisher
spring:
redis:
# 레디스 호스트
host: localhost
# 레디스 포트
port: 6379
## 레디스 Subscriber
spring:
redis:
# 레디스 호스트
host: localhost
# 레디스 포트
port: 6379
# 레디스 채널
channels:
name: test
# 이벤트 id
msg:
ide: evt1, evt2
Redis Publisher 구현
Configuration
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableRedisRepositories
public class RedisPublisherConfig {
// application.properties에서 가져온 Redis 호스트와 포트
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private String redisPort;
/**
* LettuceConnectionFactory 빈을 생성하는 메서드
*
* @return RedisConnectionFactory - Redis 연결을 위한 팩토리 객체
*/
@Bean
public RedisConnectionFactory redisConnectionFactory() {
// 로그에 Redis 호스트와 포트 출력
log.info("-----------------------------------");
log.info("Redis Host : {}", redisHost);
log.info("Redis Port : {}", redisPort);
log.info("-----------------------------------");
// LettuceConnectionFactory를 생성하여 반환
return new LettuceConnectionFactory(redisHost, redisPort);
}
/**
* RedisTemplate 빈을 생성하는 메서드
*
* @return RedisTemplate<?, ?> - Redis 데이터 템플릿 객체
*/
@Bean
public RedisTemplate<?, ?> redisTemplate() {
// RedisTemplate 객체 생성
RedisTemplate<?, ?> redisTemplate = new RedisTemplate<>();
// Redis 연결 설정을 위해 LettuceConnectionFactory를 설정
redisTemplate.setConnectionFactory(redisConnectionFactory());
// Redis의 Key Serializer를 StringRedisSerializer로 설정
redisTemplate.setKeySerializer(new StringRedisSerializer());
// Redis의 Value Serializer를 Jackson2JsonRedisSerializer로 설정
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
// RedisTemplate 반환
return redisTemplate;
}
/**
* Redis pub/sub 메시지 리스너 컨테이너
*
* @return RedisMessageListenerContainer - Redis 메시지 리스너 컨테이너 객체
*/
@Bean
public RedisMessageListenerContainer redisMessageListener() {
// RedisMessageListenerContainer 생성
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// Redis 연결 설정을 위해 LettuceConnectionFactory를 설정
container.setConnectionFactory(redisConnectionFactory());
// RedisMessageListenerContainer 반환
return container;
}
}
DTO
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDto implements Serializable {
private static final long serialVersionUID = 1L;
// 전송할 메시지 내용
private String message;
// 메시지 발신자
private String sender;
// 채널 식별자
private String channelId;
}
Service
@Slf4j
@Service("redisPublisherService")
@RequiredArgsConstructor
public class RedisPublisherService {
// RedisTemplate을 주입받음
private final RedisTemplate<String, Object> template;
/**
* Object publish
*
* @param topic 메시지를 발행할 Redis 채널
* @param dto 발행할 메시지 객체
*/
public void publish(ChannelTopic topic, MessageDto dto) {
// MessageDto 객체를 받아서 Redis 채널에 해당 메시지를 발행합니다.
template.convertAndSend(topic.getTopic(), dto);
}
/**
* String publish
*
* @param topic 메시지를 발행할 Redis 채널
* @param data 발행할 문자열 데이터
*/
public void publish(ChannelTopic topic, String data) {
// 문자열 데이터를 받아서 Redis 채널에 해당 데이터를 발행합니다.
template.convertAndSend(topic.getTopic(), data);
}
}
Schedule Job
현재 예시는 Quartz를 사용하여 Schedule Job으로 메세지를 publish하는 로직으로 구현 하였지만
Controller를 통해 endpoint로 간단히 구현도 가능합니다.
Quartz
@Slf4j
@Component
@RequiredArgsConstructor
public class ScheduledDataSendJob implements Job {
// Redis Publisher 서비스를 주입받음
private final RedisPublisherService redisPublisherService;
/**
* 스케줄러가 실행하는 실제 작업을 정의하는 메서드
*
* @param jobExecutionContext Job 실행에 필요한 컨텍스트
* @throws JobExecutionException Job작업 실행 중 발생하는 예외
*/
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// Job 실행에 필요한 데이터를 가져옴
JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
// Quartz 정보를 저장하는 Map
Map<String, Object> paramDataMap = jobDataMap.getWrappedMap();
// 현재 스레드 ID
long threadId = Thread.currentThread().getId();
// 작업 시작을 로그로 출력
printStartLog(threadId);
log.info("data : {}", paramDataMap.toString());
// Trigger ID, Channel ID, Redis 채널 명, 메시지 등의 데이터를 가져옴
String triggerId = (String) paramDataMap.get("triggerId");
int channelId = Integer.parseInt(paramDataMap.get("channelId").toString());
String redisChannelName = (String) paramDataMap.get("jarCode");
String channelCode = (String) paramDataMap.get("channelCode");
log.info("triggerId : {} , channelId : {} , jarCode : {} , channelCode : {}", triggerId, channelId, redisChannelName, channelCode);
// Redis Publisher를 사용하여 메시지를 Redis 채널에 발행
String topicName = redisChannelName;
String msg = channelCode;
ChannelTopic topic = new ChannelTopic(topicName);
redisPublisherService.publish(topic, msg);
// 작업 종료를 로그로 출력
printEndLog(msg, threadId);
}
/**
* 작업 시작 로그를 출력하는 메서드
*
* @param threadId 현재 스레드 ID
*/
private void printStartLog(long threadId) {
log.info("{}]----------------------------------------------------------------------------", threadId);
log.info("{}]# execute job start", threadId);
}
/**
* 작업 종료 로그를 출력하는 메서드
*
* @param message 작업 메시지
* @param threadId 현재 스레드 ID
*/
private void printEndLog(String message, long threadId) {
log.info("{}]# {}", threadId, message);
log.info("{}]----------------------------------------------------------------------------", threadId);
}
}
Controller
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@Slf4j
@RestController("redisController")
@RequiredArgsConstructor
@RequestMapping("/api/redis")
public class RedisController {
// Redis Publisher 서비스를 주입받음
private final RedisPublisher redisPublisher;
/**
* POST 요청을 처리하는 엔드포인트
*
* @param channel 메시지를 발행할 Redis 채널 이름
* @param message 발행할 메시지 내용
* @return ResponseEntity<String> - 요청 처리 결과를 나타내는 HTTP 응답
*/
@PostMapping("/publish")
public ResponseEntity<String> publishMessage(@RequestParam String channel, @RequestParam String message) {
try {
// Redis 채널에 메시지 발행
ChannelTopic topic = new ChannelTopic(channel);
redisPublisher.publish(topic, message);
// 성공적으로 메시지를 발행한 경우
return ResponseEntity.ok("Message published successfully to channel: " + channel);
} catch (Exception e) {
// 메시지 발행 중 에러가 발생한 경우
log.error("Failed to publish message to channel: " + channel, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to publish message to channel: " + channel);
}
}
}
Redis Subscriber 구현
Configuration
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableRedisRepositories
public class RedisListenerConfig {
private final RedisListenerService redisListenerService;
// Redis에서 구독할 토픽 이름을 설정
@Value("${spring.redis.channels.name}")
private String redisChannelsName;
// application.properties에서 가져온 Redis 호스트와 포트
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private String redisPort;
/**
* LettuceConnectionFactory 빈을 생성하는 메서드
*
* @return RedisConnectionFactory - Redis 연결을 위한 LettuceConnectionFactory 객체
*/
@Bean
public RedisConnectionFactory redisConnectionFactory() {
// Redis 연결 설정에 관한 로그를 출력
log.info("-----------------------------------");
log.info("Redis Host : {}", redisHost);
log.info("Redis Port : {}", redisPort);
log.info("Redis Channel : {}", redisChannelsName);
log.info("-----------------------------------");
// LettuceConnectionFactory를 생성하여 반환
return new LettuceConnectionFactory(redisHost, redisPort);
}
/**
* Redis 메시지를 수신하는 MessageListenerAdapter 빈을 생성하는 메서드
*
* @param listener RedisListener 객체
* @return MessageListenerAdapter - 메시지 리스너 어댑터 객체
*/
@Bean
public MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(redisListenerService);
}
/**
* Redis 연결 팩토리 및 메시지 리스너 어댑터를 설정하는
* RedisMessageListenerContainer 빈을 생성하는 메서드
*
* @param connectionFactory Redis 연결 팩토리
* @param listenerAdapter 메시지 리스너 어댑터
* @return RedisMessageListenerContainer - Redis 메시지 리스너 컨테이너
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
// RedisMessageListenerContainer 생성
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// Redis 연결 팩토리를 설정
container.setConnectionFactory(connectionFactory);
// 구독할 Redis 토픽을 설정
container.addMessageListener(listenerAdapter, new PatternTopic(redisChannelsName));
return container;
}
}
Service
@Slf4j
@Service("redisListenerService")
@RequiredArgsConstructor
public class RedisListenerService implements MessageListener {
// 스케줄 서비스를 주입
private final ScheduledService service;
// Redis 메시지의 이벤트 ID들을 가져옴
@Value("${spring.redis.msg.ide}")
private String[] redisMsgIde;
/**
* Redis 메시지 수신 시 호출되는 메서드
*
* @param message 수신된 Redis 메시지
* @param pattern 메시지가 수신된 패턴
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 채널과 메시지 내용을 추출
String channel = new String(message.getChannel());
String bodyMsg = new String(message.getBody());
// 메시지 내용에서 따옴표 제거
bodyMsg = bodyMsg.replaceAll("\"", "");
// 수신된 메시지 정보를 로그로 출력
// byte[]로 오기 때문에 문자열로 형변환이 필요함
log.info("Message Received Channel : \t{}", channel);
log.info("Message Received Body : \t\t{}", bodyMsg);
// Redis 메시지의 이벤트 ID에 따라 작업 수행
// 메시지와 메서드 매핑
Map<String, Runnable> actionMap = new HashMap<>();
actionMap.put(redisMsgIde[0], service::MethodA);
actionMap.put(redisMsgIde[1], service::MethodB);
// 메시지에 해당하는 메서드 실행
Runnable action = actionMap.get(bodyMsg);
if (action != null) {
action.run();
}
}
}
참고사이트
• https://lucas-owner.tistory.com/68