test #426
@@ -14,6 +14,7 @@ import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
|||||||
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration
|
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration
|
||||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||||
import org.springframework.data.redis.core.RedisTemplate
|
import org.springframework.data.redis.core.RedisTemplate
|
||||||
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer
|
||||||
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories
|
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories
|
||||||
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer
|
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer
|
||||||
import org.springframework.data.redis.serializer.RedisSerializationContext
|
import org.springframework.data.redis.serializer.RedisSerializationContext
|
||||||
@@ -63,6 +64,13 @@ class RedisConfig(
|
|||||||
return redisTemplate
|
return redisTemplate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
fun redisMessageListenerContainer(redisConnectionFactory: RedisConnectionFactory): RedisMessageListenerContainer {
|
||||||
|
val container = RedisMessageListenerContainer()
|
||||||
|
container.setConnectionFactory(redisConnectionFactory)
|
||||||
|
return container
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
fun cacheManager(redisConnectionFactory: RedisConnectionFactory): RedisCacheManager {
|
fun cacheManager(redisConnectionFactory: RedisConnectionFactory): RedisCacheManager {
|
||||||
val defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig()
|
val defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig()
|
||||||
|
|||||||
@@ -0,0 +1,65 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.usercreatorchat.websocket
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
import org.springframework.data.redis.connection.Message
|
||||||
|
import org.springframework.data.redis.connection.MessageListener
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate
|
||||||
|
import org.springframework.data.redis.listener.PatternTopic
|
||||||
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer
|
||||||
|
import org.springframework.stereotype.Component
|
||||||
|
import org.springframework.web.socket.TextMessage
|
||||||
|
import org.springframework.web.socket.WebSocketSession
|
||||||
|
import java.nio.charset.StandardCharsets
|
||||||
|
|
||||||
|
@Component
|
||||||
|
class UserCreatorChatRoomMessageBroker(
|
||||||
|
private val stringRedisTemplate: StringRedisTemplate,
|
||||||
|
private val sessionRegistry: UserCreatorChatWebSocketSessionRegistry,
|
||||||
|
private val objectMapper: ObjectMapper,
|
||||||
|
listenerContainer: RedisMessageListenerContainer
|
||||||
|
) : MessageListener {
|
||||||
|
init {
|
||||||
|
listenerContainer.addMessageListener(this, PatternTopic("$ROOM_CHANNEL_PREFIX:*"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fun publish(roomId: Long, memberId: Long, payload: String) {
|
||||||
|
val message = UserCreatorChatRoomPublishedMessage(
|
||||||
|
roomId = roomId,
|
||||||
|
memberId = memberId,
|
||||||
|
payload = payload
|
||||||
|
)
|
||||||
|
stringRedisTemplate.convertAndSend(roomChannel(roomId), objectMapper.writeValueAsString(message))
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onMessage(message: Message, pattern: ByteArray?) {
|
||||||
|
val published = objectMapper.readValue(
|
||||||
|
String(message.body, StandardCharsets.UTF_8),
|
||||||
|
UserCreatorChatRoomPublishedMessage::class.java
|
||||||
|
)
|
||||||
|
sessionRegistry.findSessions(published.roomId, published.memberId)
|
||||||
|
.filter { session -> session.isOpen }
|
||||||
|
.forEach { session -> sendMessage(session, published.payload) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun sendMessage(session: WebSocketSession, payload: String) {
|
||||||
|
try {
|
||||||
|
session.sendMessage(TextMessage(payload))
|
||||||
|
} catch (_: Exception) {
|
||||||
|
sessionRegistry.remove(session.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
private const val ROOM_CHANNEL_PREFIX = "v2:user-creator-chat:ws:room"
|
||||||
|
|
||||||
|
fun roomChannel(roomId: Long): String {
|
||||||
|
return "$ROOM_CHANNEL_PREFIX:$roomId"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
data class UserCreatorChatRoomPublishedMessage(
|
||||||
|
val roomId: Long,
|
||||||
|
val memberId: Long,
|
||||||
|
val payload: String
|
||||||
|
)
|
||||||
@@ -0,0 +1,113 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.usercreatorchat.websocket
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
|
import org.junit.jupiter.api.DisplayName
|
||||||
|
import org.junit.jupiter.api.Test
|
||||||
|
import org.mockito.ArgumentCaptor
|
||||||
|
import org.mockito.Mockito
|
||||||
|
import org.springframework.data.redis.connection.Message
|
||||||
|
import org.springframework.data.redis.connection.MessageListener
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate
|
||||||
|
import org.springframework.data.redis.listener.PatternTopic
|
||||||
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer
|
||||||
|
import org.springframework.web.socket.TextMessage
|
||||||
|
import org.springframework.web.socket.WebSocketSession
|
||||||
|
import java.io.IOException
|
||||||
|
import java.nio.charset.StandardCharsets
|
||||||
|
|
||||||
|
class UserCreatorChatRoomMessageBrokerTest {
|
||||||
|
private val stringRedisTemplate = Mockito.mock(StringRedisTemplate::class.java)
|
||||||
|
private val registry = UserCreatorChatWebSocketSessionRegistry()
|
||||||
|
private val listenerContainer = Mockito.mock(RedisMessageListenerContainer::class.java)
|
||||||
|
private val objectMapper = ObjectMapper().findAndRegisterModules()
|
||||||
|
private val broker = UserCreatorChatRoomMessageBroker(
|
||||||
|
stringRedisTemplate = stringRedisTemplate,
|
||||||
|
sessionRegistry = registry,
|
||||||
|
objectMapper = objectMapper,
|
||||||
|
listenerContainer = listenerContainer
|
||||||
|
)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("room channel로 target member와 payload를 publish한다")
|
||||||
|
fun shouldPublishMessageToRoomChannel() {
|
||||||
|
broker.publish(roomId = 10L, memberId = 20L, payload = "{\"type\":\"MESSAGE\"}")
|
||||||
|
|
||||||
|
val messageCaptor = ArgumentCaptor.forClass(String::class.java)
|
||||||
|
Mockito.verify(stringRedisTemplate).convertAndSend(
|
||||||
|
Mockito.eq("v2:user-creator-chat:ws:room:10"),
|
||||||
|
messageCaptor.capture()
|
||||||
|
)
|
||||||
|
|
||||||
|
val published = objectMapper.readValue(messageCaptor.value, UserCreatorChatRoomPublishedMessage::class.java)
|
||||||
|
assertEquals(10L, published.roomId)
|
||||||
|
assertEquals(20L, published.memberId)
|
||||||
|
assertEquals("{\"type\":\"MESSAGE\"}", published.payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("생성 시 ws room pattern topic을 구독한다")
|
||||||
|
fun shouldSubscribeRoomPatternOnCreation() {
|
||||||
|
Mockito.verify(listenerContainer).addMessageListener(
|
||||||
|
Mockito.any(MessageListener::class.java),
|
||||||
|
Mockito.eq(PatternTopic("v2:user-creator-chat:ws:room:*"))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("subscribe callback은 대상 member의 local session에만 메시지를 전송한다")
|
||||||
|
fun shouldDeliverSubscribedMessageOnlyToTargetMemberSessions() {
|
||||||
|
val targetSession = session("target-session")
|
||||||
|
val otherMemberSession = session("other-session")
|
||||||
|
registry.register(roomId = 10L, memberId = 20L, session = targetSession)
|
||||||
|
registry.register(roomId = 10L, memberId = 21L, session = otherMemberSession)
|
||||||
|
val published = UserCreatorChatRoomPublishedMessage(
|
||||||
|
roomId = 10L,
|
||||||
|
memberId = 20L,
|
||||||
|
payload = "{\"type\":\"MESSAGE\"}"
|
||||||
|
)
|
||||||
|
|
||||||
|
broker.onMessage(redisMessage(objectMapper.writeValueAsString(published)), null)
|
||||||
|
|
||||||
|
val textCaptor = ArgumentCaptor.forClass(TextMessage::class.java)
|
||||||
|
Mockito.verify(targetSession).sendMessage(textCaptor.capture())
|
||||||
|
assertEquals("{\"type\":\"MESSAGE\"}", textCaptor.value.payload)
|
||||||
|
Mockito.verify(otherMemberSession, Mockito.never()).sendMessage(Mockito.any(TextMessage::class.java))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("일부 local session 전송이 실패해도 같은 member의 다른 session 전송을 계속한다")
|
||||||
|
fun shouldContinueDeliveryWhenOneTargetSessionFails() {
|
||||||
|
val brokenSession = session("broken-session")
|
||||||
|
val healthySession = session("healthy-session")
|
||||||
|
Mockito.doThrow(IOException("broken socket"))
|
||||||
|
.`when`(brokenSession)
|
||||||
|
.sendMessage(Mockito.any(TextMessage::class.java))
|
||||||
|
registry.register(roomId = 10L, memberId = 20L, session = brokenSession)
|
||||||
|
registry.register(roomId = 10L, memberId = 20L, session = healthySession)
|
||||||
|
val published = UserCreatorChatRoomPublishedMessage(
|
||||||
|
roomId = 10L,
|
||||||
|
memberId = 20L,
|
||||||
|
payload = "{\"type\":\"MESSAGE\"}"
|
||||||
|
)
|
||||||
|
|
||||||
|
broker.onMessage(redisMessage(objectMapper.writeValueAsString(published)), null)
|
||||||
|
|
||||||
|
val textCaptor = ArgumentCaptor.forClass(TextMessage::class.java)
|
||||||
|
Mockito.verify(healthySession).sendMessage(textCaptor.capture())
|
||||||
|
assertEquals("{\"type\":\"MESSAGE\"}", textCaptor.value.payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun redisMessage(body: String): Message {
|
||||||
|
val message = Mockito.mock(Message::class.java)
|
||||||
|
Mockito.`when`(message.body).thenReturn(body.toByteArray(StandardCharsets.UTF_8))
|
||||||
|
return message
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun session(id: String): WebSocketSession {
|
||||||
|
val session = Mockito.mock(WebSocketSession::class.java)
|
||||||
|
Mockito.`when`(session.id).thenReturn(id)
|
||||||
|
Mockito.`when`(session.isOpen).thenReturn(true)
|
||||||
|
return session
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user