From f44ea58ca23750a7600c6cfc7ccc3c6feb536db7 Mon Sep 17 00:00:00 2001 From: Klaus Date: Thu, 18 Jun 2026 19:08:16 +0900 Subject: [PATCH] =?UTF-8?q?feat(user-creator-chat):=20WebSocket=20Redis=20?= =?UTF-8?q?room=20broker=EB=A5=BC=20=EC=B6=94=EA=B0=80=ED=95=9C=EB=8B=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vividnext/sodalive/configs/RedisConfig.kt | 8 ++ .../UserCreatorChatRoomMessageBroker.kt | 65 ++++++++++ .../UserCreatorChatRoomMessageBrokerTest.kt | 113 ++++++++++++++++++ 3 files changed, 186 insertions(+) create mode 100644 src/main/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBroker.kt create mode 100644 src/test/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBrokerTest.kt diff --git a/src/main/kotlin/kr/co/vividnext/sodalive/configs/RedisConfig.kt b/src/main/kotlin/kr/co/vividnext/sodalive/configs/RedisConfig.kt index d6ec21a6..cdfcb52b 100644 --- a/src/main/kotlin/kr/co/vividnext/sodalive/configs/RedisConfig.kt +++ b/src/main/kotlin/kr/co/vividnext/sodalive/configs/RedisConfig.kt @@ -14,6 +14,7 @@ import org.springframework.data.redis.connection.RedisStandaloneConfiguration import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory import org.springframework.data.redis.core.RedisTemplate +import org.springframework.data.redis.listener.RedisMessageListenerContainer import org.springframework.data.redis.repository.configuration.EnableRedisRepositories import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer import org.springframework.data.redis.serializer.RedisSerializationContext @@ -63,6 +64,13 @@ class RedisConfig( return redisTemplate } + @Bean + fun redisMessageListenerContainer(redisConnectionFactory: RedisConnectionFactory): RedisMessageListenerContainer { + val container = RedisMessageListenerContainer() + container.setConnectionFactory(redisConnectionFactory) + return container + } + @Bean fun cacheManager(redisConnectionFactory: RedisConnectionFactory): RedisCacheManager { val defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig() diff --git a/src/main/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBroker.kt b/src/main/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBroker.kt new file mode 100644 index 00000000..09328707 --- /dev/null +++ b/src/main/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBroker.kt @@ -0,0 +1,65 @@ +package kr.co.vividnext.sodalive.v2.usercreatorchat.websocket + +import com.fasterxml.jackson.databind.ObjectMapper +import org.springframework.data.redis.connection.Message +import org.springframework.data.redis.connection.MessageListener +import org.springframework.data.redis.core.StringRedisTemplate +import org.springframework.data.redis.listener.PatternTopic +import org.springframework.data.redis.listener.RedisMessageListenerContainer +import org.springframework.stereotype.Component +import org.springframework.web.socket.TextMessage +import org.springframework.web.socket.WebSocketSession +import java.nio.charset.StandardCharsets + +@Component +class UserCreatorChatRoomMessageBroker( + private val stringRedisTemplate: StringRedisTemplate, + private val sessionRegistry: UserCreatorChatWebSocketSessionRegistry, + private val objectMapper: ObjectMapper, + listenerContainer: RedisMessageListenerContainer +) : MessageListener { + init { + listenerContainer.addMessageListener(this, PatternTopic("$ROOM_CHANNEL_PREFIX:*")) + } + + fun publish(roomId: Long, memberId: Long, payload: String) { + val message = UserCreatorChatRoomPublishedMessage( + roomId = roomId, + memberId = memberId, + payload = payload + ) + stringRedisTemplate.convertAndSend(roomChannel(roomId), objectMapper.writeValueAsString(message)) + } + + override fun onMessage(message: Message, pattern: ByteArray?) { + val published = objectMapper.readValue( + String(message.body, StandardCharsets.UTF_8), + UserCreatorChatRoomPublishedMessage::class.java + ) + sessionRegistry.findSessions(published.roomId, published.memberId) + .filter { session -> session.isOpen } + .forEach { session -> sendMessage(session, published.payload) } + } + + private fun sendMessage(session: WebSocketSession, payload: String) { + try { + session.sendMessage(TextMessage(payload)) + } catch (_: Exception) { + sessionRegistry.remove(session.id) + } + } + + companion object { + private const val ROOM_CHANNEL_PREFIX = "v2:user-creator-chat:ws:room" + + fun roomChannel(roomId: Long): String { + return "$ROOM_CHANNEL_PREFIX:$roomId" + } + } +} + +data class UserCreatorChatRoomPublishedMessage( + val roomId: Long, + val memberId: Long, + val payload: String +) diff --git a/src/test/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBrokerTest.kt b/src/test/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBrokerTest.kt new file mode 100644 index 00000000..b2f5d80f --- /dev/null +++ b/src/test/kotlin/kr/co/vividnext/sodalive/v2/usercreatorchat/websocket/UserCreatorChatRoomMessageBrokerTest.kt @@ -0,0 +1,113 @@ +package kr.co.vividnext.sodalive.v2.usercreatorchat.websocket + +import com.fasterxml.jackson.databind.ObjectMapper +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.mockito.ArgumentCaptor +import org.mockito.Mockito +import org.springframework.data.redis.connection.Message +import org.springframework.data.redis.connection.MessageListener +import org.springframework.data.redis.core.StringRedisTemplate +import org.springframework.data.redis.listener.PatternTopic +import org.springframework.data.redis.listener.RedisMessageListenerContainer +import org.springframework.web.socket.TextMessage +import org.springframework.web.socket.WebSocketSession +import java.io.IOException +import java.nio.charset.StandardCharsets + +class UserCreatorChatRoomMessageBrokerTest { + private val stringRedisTemplate = Mockito.mock(StringRedisTemplate::class.java) + private val registry = UserCreatorChatWebSocketSessionRegistry() + private val listenerContainer = Mockito.mock(RedisMessageListenerContainer::class.java) + private val objectMapper = ObjectMapper().findAndRegisterModules() + private val broker = UserCreatorChatRoomMessageBroker( + stringRedisTemplate = stringRedisTemplate, + sessionRegistry = registry, + objectMapper = objectMapper, + listenerContainer = listenerContainer + ) + + @Test + @DisplayName("room channel로 target member와 payload를 publish한다") + fun shouldPublishMessageToRoomChannel() { + broker.publish(roomId = 10L, memberId = 20L, payload = "{\"type\":\"MESSAGE\"}") + + val messageCaptor = ArgumentCaptor.forClass(String::class.java) + Mockito.verify(stringRedisTemplate).convertAndSend( + Mockito.eq("v2:user-creator-chat:ws:room:10"), + messageCaptor.capture() + ) + + val published = objectMapper.readValue(messageCaptor.value, UserCreatorChatRoomPublishedMessage::class.java) + assertEquals(10L, published.roomId) + assertEquals(20L, published.memberId) + assertEquals("{\"type\":\"MESSAGE\"}", published.payload) + } + + @Test + @DisplayName("생성 시 ws room pattern topic을 구독한다") + fun shouldSubscribeRoomPatternOnCreation() { + Mockito.verify(listenerContainer).addMessageListener( + Mockito.any(MessageListener::class.java), + Mockito.eq(PatternTopic("v2:user-creator-chat:ws:room:*")) + ) + } + + @Test + @DisplayName("subscribe callback은 대상 member의 local session에만 메시지를 전송한다") + fun shouldDeliverSubscribedMessageOnlyToTargetMemberSessions() { + val targetSession = session("target-session") + val otherMemberSession = session("other-session") + registry.register(roomId = 10L, memberId = 20L, session = targetSession) + registry.register(roomId = 10L, memberId = 21L, session = otherMemberSession) + val published = UserCreatorChatRoomPublishedMessage( + roomId = 10L, + memberId = 20L, + payload = "{\"type\":\"MESSAGE\"}" + ) + + broker.onMessage(redisMessage(objectMapper.writeValueAsString(published)), null) + + val textCaptor = ArgumentCaptor.forClass(TextMessage::class.java) + Mockito.verify(targetSession).sendMessage(textCaptor.capture()) + assertEquals("{\"type\":\"MESSAGE\"}", textCaptor.value.payload) + Mockito.verify(otherMemberSession, Mockito.never()).sendMessage(Mockito.any(TextMessage::class.java)) + } + + @Test + @DisplayName("일부 local session 전송이 실패해도 같은 member의 다른 session 전송을 계속한다") + fun shouldContinueDeliveryWhenOneTargetSessionFails() { + val brokenSession = session("broken-session") + val healthySession = session("healthy-session") + Mockito.doThrow(IOException("broken socket")) + .`when`(brokenSession) + .sendMessage(Mockito.any(TextMessage::class.java)) + registry.register(roomId = 10L, memberId = 20L, session = brokenSession) + registry.register(roomId = 10L, memberId = 20L, session = healthySession) + val published = UserCreatorChatRoomPublishedMessage( + roomId = 10L, + memberId = 20L, + payload = "{\"type\":\"MESSAGE\"}" + ) + + broker.onMessage(redisMessage(objectMapper.writeValueAsString(published)), null) + + val textCaptor = ArgumentCaptor.forClass(TextMessage::class.java) + Mockito.verify(healthySession).sendMessage(textCaptor.capture()) + assertEquals("{\"type\":\"MESSAGE\"}", textCaptor.value.payload) + } + + private fun redisMessage(body: String): Message { + val message = Mockito.mock(Message::class.java) + Mockito.`when`(message.body).thenReturn(body.toByteArray(StandardCharsets.UTF_8)) + return message + } + + private fun session(id: String): WebSocketSession { + val session = Mockito.mock(WebSocketSession::class.java) + Mockito.`when`(session.id).thenReturn(id) + Mockito.`when`(session.isOpen).thenReturn(true) + return session + } +}