fix(dm): MESSAGE 선도착 ACK 처리를 보정한다
This commit is contained in:
@@ -51,6 +51,7 @@ class DmChatRoomViewModel(
|
|||||||
private var localMessageSequence: Long = 0L
|
private var localMessageSequence: Long = 0L
|
||||||
private var requestSequence: Long = 0L
|
private var requestSequence: Long = 0L
|
||||||
private val pendingRequestLocalIds = mutableMapOf<String, String>()
|
private val pendingRequestLocalIds = mutableMapOf<String, String>()
|
||||||
|
private val recentFailedRequestLocalIds = mutableMapOf<String, String>()
|
||||||
private val pendingTimeoutDisposables = mutableMapOf<String, Disposable>()
|
private val pendingTimeoutDisposables = mutableMapOf<String, Disposable>()
|
||||||
private val mainHandler = Handler(Looper.getMainLooper())
|
private val mainHandler = Handler(Looper.getMainLooper())
|
||||||
|
|
||||||
@@ -137,6 +138,7 @@ class DmChatRoomViewModel(
|
|||||||
if (currentRoomId <= 0L) return
|
if (currentRoomId <= 0L) return
|
||||||
val requestId = nextRequestId()
|
val requestId = nextRequestId()
|
||||||
|
|
||||||
|
removeRecentFailedRequests(localId)
|
||||||
currentMessages = currentMessages.map {
|
currentMessages = currentMessages.map {
|
||||||
if (it.localId == localId) it.copy(requestId = requestId, status = DmChatMessageStatus.SENDING) else it
|
if (it.localId == localId) it.copy(requestId = requestId, status = DmChatMessageStatus.SENDING) else it
|
||||||
}
|
}
|
||||||
@@ -148,9 +150,7 @@ class DmChatRoomViewModel(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun onRealtimeMessage(message: DmChatMessageResponse) {
|
fun onRealtimeMessage(message: DmChatMessageResponse) {
|
||||||
val item = message.toUiItem() ?: return
|
handleRealtimeMessage(requestId = null, message = message)
|
||||||
currentMessages = currentMessages.mergeByMessageId(listOf(item))
|
|
||||||
emitContent()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fun syncLatestMessagesAfterReconnect() {
|
fun syncLatestMessagesAfterReconnect() {
|
||||||
@@ -364,7 +364,7 @@ class DmChatRoomViewModel(
|
|||||||
isRealtimeConnected = true
|
isRealtimeConnected = true
|
||||||
syncLatestMessagesAfterReconnect(token = token)
|
syncLatestMessagesAfterReconnect(token = token)
|
||||||
}
|
}
|
||||||
is DmChatSocketEvent.Message -> onRealtimeMessage(event.message)
|
is DmChatSocketEvent.Message -> handleRealtimeMessage(event.requestId, event.message)
|
||||||
is DmChatSocketEvent.SendAck -> handleSendAck(event.requestId, event.message)
|
is DmChatSocketEvent.SendAck -> handleSendAck(event.requestId, event.message)
|
||||||
is DmChatSocketEvent.Error -> event.requestId?.let { markPendingMessageFailed(it) }
|
is DmChatSocketEvent.Error -> event.requestId?.let { markPendingMessageFailed(it) }
|
||||||
DmChatSocketEvent.Pong -> Unit
|
DmChatSocketEvent.Pong -> Unit
|
||||||
@@ -372,7 +372,9 @@ class DmChatRoomViewModel(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun handleSendAck(requestId: String, message: DmChatMessageResponse) {
|
private fun handleSendAck(requestId: String, message: DmChatMessageResponse) {
|
||||||
val localId = pendingRequestLocalIds.remove(requestId) ?: return
|
val localId = pendingRequestLocalIds.remove(requestId)
|
||||||
|
?: recentFailedRequestLocalIds.remove(requestId)
|
||||||
|
?: return
|
||||||
pendingTimeoutDisposables.remove(requestId)?.dispose()
|
pendingTimeoutDisposables.remove(requestId)?.dispose()
|
||||||
val sentItem = message.toUiItem()
|
val sentItem = message.toUiItem()
|
||||||
if (sentItem == null) {
|
if (sentItem == null) {
|
||||||
@@ -380,12 +382,28 @@ class DmChatRoomViewModel(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removeRecentFailedRequests(localId)
|
||||||
currentMessages = currentMessages.map {
|
currentMessages = currentMessages.map {
|
||||||
if (it.localId == localId) sentItem.copy(localId = localId) else it
|
if (it.localId == localId) sentItem.copy(localId = localId) else it
|
||||||
}.deduplicateSentMessage(sentItem.messageId).sortByCreatedAtAndMessageId()
|
}.deduplicateSentMessage(sentItem.messageId).sortByCreatedAtAndMessageId()
|
||||||
emitContent()
|
emitContent()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun removeRecentFailedRequests(localId: String) {
|
||||||
|
recentFailedRequestLocalIds.entries.removeAll { it.value == localId }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun handleRealtimeMessage(requestId: String?, message: DmChatMessageResponse) {
|
||||||
|
if (requestId != null && pendingRequestLocalIds.containsKey(requestId)) {
|
||||||
|
handleSendAck(requestId, message)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val item = message.toUiItem() ?: return
|
||||||
|
currentMessages = currentMessages.mergeByMessageId(listOf(item))
|
||||||
|
emitContent()
|
||||||
|
}
|
||||||
|
|
||||||
private fun List<DmChatMessageUiItem>.deduplicateSentMessage(messageId: Long?): List<DmChatMessageUiItem> {
|
private fun List<DmChatMessageUiItem>.deduplicateSentMessage(messageId: Long?): List<DmChatMessageUiItem> {
|
||||||
if (messageId == null) return this
|
if (messageId == null) return this
|
||||||
var found = false
|
var found = false
|
||||||
@@ -399,6 +417,7 @@ class DmChatRoomViewModel(
|
|||||||
|
|
||||||
private fun markPendingMessageFailed(requestId: String) {
|
private fun markPendingMessageFailed(requestId: String) {
|
||||||
val localId = pendingRequestLocalIds.remove(requestId) ?: return
|
val localId = pendingRequestLocalIds.remove(requestId) ?: return
|
||||||
|
recentFailedRequestLocalIds[requestId] = localId
|
||||||
pendingTimeoutDisposables.remove(requestId)?.dispose()
|
pendingTimeoutDisposables.remove(requestId)?.dispose()
|
||||||
markLocalMessageFailed(localId)
|
markLocalMessageFailed(localId)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ data class DmChatSocketSendTextPayload(
|
|||||||
|
|
||||||
@Keep
|
@Keep
|
||||||
data class DmChatSocketMessagePayload(
|
data class DmChatSocketMessagePayload(
|
||||||
|
@SerializedName("requestId") val requestId: String?,
|
||||||
@SerializedName("message") val message: DmChatMessageResponse
|
@SerializedName("message") val message: DmChatMessageResponse
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -51,7 +52,10 @@ enum class DmChatSocketClientType(val value: String) {
|
|||||||
|
|
||||||
sealed class DmChatSocketEvent {
|
sealed class DmChatSocketEvent {
|
||||||
data object Joined : DmChatSocketEvent()
|
data object Joined : DmChatSocketEvent()
|
||||||
data class Message(val message: DmChatMessageResponse) : DmChatSocketEvent()
|
data class Message(
|
||||||
|
val requestId: String?,
|
||||||
|
val message: DmChatMessageResponse
|
||||||
|
) : DmChatSocketEvent()
|
||||||
data class SendAck(
|
data class SendAck(
|
||||||
val requestId: String,
|
val requestId: String,
|
||||||
val message: DmChatMessageResponse
|
val message: DmChatMessageResponse
|
||||||
@@ -85,7 +89,10 @@ class DmChatSocketParser(private val gson: Gson) {
|
|||||||
|
|
||||||
private fun parseMessage(payload: JsonObject?): DmChatSocketEvent.Message? {
|
private fun parseMessage(payload: JsonObject?): DmChatSocketEvent.Message? {
|
||||||
val messagePayload = gson.fromJson(payload, DmChatSocketMessagePayload::class.java) ?: return null
|
val messagePayload = gson.fromJson(payload, DmChatSocketMessagePayload::class.java) ?: return null
|
||||||
return DmChatSocketEvent.Message(messagePayload.message)
|
return DmChatSocketEvent.Message(
|
||||||
|
requestId = messagePayload.requestId,
|
||||||
|
message = messagePayload.message
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun parseSendAck(payload: JsonObject?): DmChatSocketEvent.SendAck? {
|
private fun parseSendAck(payload: JsonObject?): DmChatSocketEvent.SendAck? {
|
||||||
|
|||||||
@@ -388,6 +388,86 @@ class DmChatRoomViewModelTest {
|
|||||||
assertEquals(1, state.messages.size)
|
assertEquals(1, state.messages.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `MESSAGE requestId가 SEND_ACK보다 먼저 오면 pending local item을 확정한다`() {
|
||||||
|
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||||
|
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||||
|
viewModel.connectRealtime()
|
||||||
|
viewModel.sendText("선도착")
|
||||||
|
val pendingItem = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||||
|
.messages.single()
|
||||||
|
val requestId = pendingItem.requestId!!
|
||||||
|
|
||||||
|
socketFactory.emitMessage(requestId, message(messageId = 60L, mine = true, textMessage = "선도착"))
|
||||||
|
socketFactory.emitAck(requestId, message(messageId = 60L, mine = true, textMessage = "선도착"))
|
||||||
|
|
||||||
|
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||||
|
assertEquals(listOf(60L), state.messages.map { it.messageId })
|
||||||
|
assertEquals(listOf(pendingItem.localId), state.messages.map { it.localId })
|
||||||
|
assertEquals(listOf(DmChatMessageStatus.SENT), state.messages.map { it.status })
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `MESSAGE requestId가 없으면 ACK 후도착 시 같은 messageId 중복을 남기지 않는다`() {
|
||||||
|
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||||
|
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||||
|
viewModel.connectRealtime()
|
||||||
|
viewModel.sendText("중복 방지")
|
||||||
|
val requestId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||||
|
.messages.single().requestId!!
|
||||||
|
|
||||||
|
socketFactory.emitMessage(message(messageId = 61L, mine = true, textMessage = "중복 방지"))
|
||||||
|
socketFactory.emitAck(requestId, message(messageId = 61L, mine = true, textMessage = "중복 방지"))
|
||||||
|
|
||||||
|
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||||
|
assertEquals(listOf(61L), state.messages.map { it.messageId })
|
||||||
|
assertEquals(1, state.messages.size)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `timeout 실패 후 늦은 SEND_ACK가 오면 같은 local item을 성공으로 복구한다`() {
|
||||||
|
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||||
|
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||||
|
viewModel.connectRealtime()
|
||||||
|
viewModel.sendText("늦은 ACK")
|
||||||
|
val pendingItem = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||||
|
.messages.single()
|
||||||
|
val requestId = pendingItem.requestId!!
|
||||||
|
|
||||||
|
reconnectScheduler.advanceTimeBy(10L, TimeUnit.SECONDS)
|
||||||
|
shadowOf(Looper.getMainLooper()).idle()
|
||||||
|
socketFactory.emitAck(requestId, message(messageId = 62L, mine = true, textMessage = "늦은 ACK"))
|
||||||
|
|
||||||
|
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||||
|
assertEquals(listOf(62L), state.messages.map { it.messageId })
|
||||||
|
assertEquals(listOf(pendingItem.localId), state.messages.map { it.localId })
|
||||||
|
assertEquals(listOf(DmChatMessageStatus.SENT), state.messages.map { it.status })
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `retry 성공 후 이전 timeout request ACK는 같은 local item을 덮어쓰지 않는다`() {
|
||||||
|
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||||
|
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||||
|
viewModel.connectRealtime()
|
||||||
|
viewModel.sendText("재시도 ACK")
|
||||||
|
val firstState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||||
|
val failedItem = firstState.messages.single()
|
||||||
|
val firstRequestId = failedItem.requestId!!
|
||||||
|
|
||||||
|
reconnectScheduler.advanceTimeBy(10L, TimeUnit.SECONDS)
|
||||||
|
shadowOf(Looper.getMainLooper()).idle()
|
||||||
|
viewModel.retry(failedItem.localId!!)
|
||||||
|
val retryRequestId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||||
|
.messages.single().requestId!!
|
||||||
|
socketFactory.emitAck(retryRequestId, message(messageId = 70L, mine = true, textMessage = "최신 ACK"))
|
||||||
|
socketFactory.emitAck(firstRequestId, message(messageId = 69L, mine = true, textMessage = "이전 ACK"))
|
||||||
|
|
||||||
|
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||||
|
assertEquals(listOf(70L), state.messages.map { it.messageId })
|
||||||
|
assertEquals(listOf("최신 ACK"), state.messages.map { it.textMessage })
|
||||||
|
assertEquals(listOf(failedItem.localId), state.messages.map { it.localId })
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `재연결 후 최신 메시지 동기화는 getMessages 결과를 병합한다`() {
|
fun `재연결 후 최신 메시지 동기화는 getMessages 결과를 병합한다`() {
|
||||||
api.enqueueOpenSuccess(openResponse(roomId = 10L, messages = listOf(message(messageId = 1L, textMessage = "기존"))))
|
api.enqueueOpenSuccess(openResponse(roomId = 10L, messages = listOf(message(messageId = 1L, textMessage = "기존"))))
|
||||||
@@ -621,20 +701,6 @@ class DmChatRoomViewModelTest {
|
|||||||
assertTrue(socketFactory.closeCount >= 1)
|
assertTrue(socketFactory.closeCount >= 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `realtime leave 중 중복 요청은 close를 반복할 수 있다`() {
|
|
||||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
|
||||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
|
||||||
|
|
||||||
viewModel.leaveRealtime()
|
|
||||||
viewModel.leaveRealtime()
|
|
||||||
|
|
||||||
assertEquals(0, socketFactory.closeCount)
|
|
||||||
viewModel.leaveRealtime()
|
|
||||||
|
|
||||||
assertEquals(0, socketFactory.closeCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `leave 후 다시 background로 가면 새 소켓도 close한다`() {
|
fun `leave 후 다시 background로 가면 새 소켓도 close한다`() {
|
||||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||||
@@ -883,6 +949,14 @@ class FakeWebSocketFactory {
|
|||||||
webSocketListener?.onMessage(webSocket, "{\"type\":\"MESSAGE\",\"payload\":{\"message\":$json}}")
|
webSocketListener?.onMessage(webSocket, "{\"type\":\"MESSAGE\",\"payload\":{\"message\":$json}}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun emitMessage(requestId: String, message: DmChatMessageResponse) {
|
||||||
|
val json = Gson().toJson(message)
|
||||||
|
webSocketListener?.onMessage(
|
||||||
|
webSocket,
|
||||||
|
"{\"type\":\"MESSAGE\",\"payload\":{\"requestId\":\"$requestId\",\"message\":$json}}"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
fun emitAck(requestId: String, message: DmChatMessageResponse) {
|
fun emitAck(requestId: String, message: DmChatMessageResponse) {
|
||||||
val json = Gson().toJson(message)
|
val json = Gson().toJson(message)
|
||||||
webSocketListener?.onMessage(
|
webSocketListener?.onMessage(
|
||||||
|
|||||||
@@ -42,6 +42,26 @@ class DmChatSocketParserTest {
|
|||||||
assertEquals("안녕하세요", message.textMessage)
|
assertEquals("안녕하세요", message.textMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `MESSAGE type은 nullable requestId를 보존한다`() {
|
||||||
|
val event = parser.parse(
|
||||||
|
"""
|
||||||
|
{
|
||||||
|
"type": "MESSAGE",
|
||||||
|
"payload": {
|
||||||
|
"requestId": "request-1",
|
||||||
|
"message": ${messageJson(messageId = 12L, textMessage = "선도착")}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""".trimIndent()
|
||||||
|
)
|
||||||
|
|
||||||
|
val messageEvent = event as? DmChatSocketEvent.Message
|
||||||
|
requireNotNull(messageEvent)
|
||||||
|
assertEquals("request-1", messageEvent.requestId)
|
||||||
|
assertEquals(12L, messageEvent.message.messageId)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `SEND_ACK type은 requestId와 서버 확정 메시지로 파싱된다`() {
|
fun `SEND_ACK type은 requestId와 서버 확정 메시지로 파싱된다`() {
|
||||||
val event = parser.parse(
|
val event = parser.parse(
|
||||||
|
|||||||
Reference in New Issue
Block a user