feat(dm): 메시지 전송 pending을 requestId로 관리한다
This commit is contained in:
@@ -41,7 +41,6 @@ class DmChatRoomViewModel(
|
||||
private var hasMore: Boolean = false
|
||||
private var nextCursor: Long? = null
|
||||
private var isLoadingOlder: Boolean = false
|
||||
private var isSending: Boolean = false
|
||||
private var isRealtimeJoining: Boolean = false
|
||||
private var isRealtimeConnected: Boolean = false
|
||||
private var shouldReconnectRealtime: Boolean = false
|
||||
@@ -50,6 +49,9 @@ class DmChatRoomViewModel(
|
||||
private var currentRealtimeRoomId: Long = 0L
|
||||
private var reconnectDisposable: Disposable? = null
|
||||
private var localMessageSequence: Long = 0L
|
||||
private var requestSequence: Long = 0L
|
||||
private val pendingRequestLocalIds = mutableMapOf<String, String>()
|
||||
private val pendingTimeoutDisposables = mutableMapOf<String, Disposable>()
|
||||
private val mainHandler = Handler(Looper.getMainLooper())
|
||||
|
||||
private val _chatRoomStateLiveData = MutableLiveData<DmChatRoomUiState>()
|
||||
@@ -105,12 +107,14 @@ class DmChatRoomViewModel(
|
||||
|
||||
fun sendText(text: String) {
|
||||
val trimmed = text.trim()
|
||||
if (trimmed.isBlank() || currentRoomId <= 0L || isSending) return
|
||||
if (trimmed.isBlank() || currentRoomId <= 0L) return
|
||||
|
||||
val localId = nextLocalId()
|
||||
val requestId = nextRequestId()
|
||||
val localItem = DmChatMessageUiItem(
|
||||
messageId = null,
|
||||
localId = localId,
|
||||
requestId = requestId,
|
||||
mine = true,
|
||||
textMessage = trimmed,
|
||||
senderNickname = "",
|
||||
@@ -119,25 +123,28 @@ class DmChatRoomViewModel(
|
||||
status = DmChatMessageStatus.SENDING
|
||||
)
|
||||
currentMessages = currentMessages + localItem
|
||||
isSending = true
|
||||
pendingRequestLocalIds[requestId] = localId
|
||||
schedulePendingTimeout(requestId)
|
||||
emitContent()
|
||||
|
||||
sendLocalMessage(localId = localId, text = trimmed)
|
||||
sendLocalMessage(requestId = requestId, text = trimmed)
|
||||
}
|
||||
|
||||
fun retry(localId: String) {
|
||||
val failedItem = currentMessages.firstOrNull {
|
||||
it.localId == localId && it.status == DmChatMessageStatus.FAILED
|
||||
} ?: return
|
||||
if (isSending || currentRoomId <= 0L) return
|
||||
if (currentRoomId <= 0L) return
|
||||
val requestId = nextRequestId()
|
||||
|
||||
currentMessages = currentMessages.map {
|
||||
if (it.localId == localId) it.copy(status = DmChatMessageStatus.SENDING) else it
|
||||
if (it.localId == localId) it.copy(requestId = requestId, status = DmChatMessageStatus.SENDING) else it
|
||||
}
|
||||
isSending = true
|
||||
pendingRequestLocalIds[requestId] = localId
|
||||
schedulePendingTimeout(requestId)
|
||||
emitContent()
|
||||
|
||||
sendLocalMessage(localId = localId, text = failedItem.textMessage)
|
||||
sendLocalMessage(requestId = requestId, text = failedItem.textMessage)
|
||||
}
|
||||
|
||||
fun onRealtimeMessage(message: DmChatMessageResponse) {
|
||||
@@ -294,13 +301,24 @@ class DmChatRoomViewModel(
|
||||
)
|
||||
}
|
||||
|
||||
private fun sendLocalMessage(localId: String, text: String) {
|
||||
private fun sendLocalMessage(requestId: String, text: String) {
|
||||
val sent = repository.sendSocketText(
|
||||
roomId = currentRoomId,
|
||||
requestId = localId,
|
||||
requestId = requestId,
|
||||
textMessage = text
|
||||
)
|
||||
if (!sent) markLocalMessageFailed(localId)
|
||||
if (!sent) markPendingMessageFailed(requestId)
|
||||
}
|
||||
|
||||
private fun schedulePendingTimeout(requestId: String) {
|
||||
pendingTimeoutDisposables[requestId]?.dispose()
|
||||
val disposable = reconnectScheduler.scheduleDirect(
|
||||
{ scheduleRealtimeCallback { markPendingMessageFailed(requestId) } },
|
||||
SEND_ACK_TIMEOUT_MILLIS,
|
||||
TimeUnit.MILLISECONDS
|
||||
)
|
||||
pendingTimeoutDisposables[requestId] = disposable
|
||||
compositeDisposable.add(disposable)
|
||||
}
|
||||
|
||||
private fun handleOpenRoomResult(response: ApiResponse<DmChatRoomOpenResponse>) {
|
||||
@@ -348,21 +366,22 @@ class DmChatRoomViewModel(
|
||||
}
|
||||
is DmChatSocketEvent.Message -> onRealtimeMessage(event.message)
|
||||
is DmChatSocketEvent.SendAck -> handleSendAck(event.requestId, event.message)
|
||||
is DmChatSocketEvent.Error -> event.requestId?.let { markLocalMessageFailed(it) }
|
||||
is DmChatSocketEvent.Error -> event.requestId?.let { markPendingMessageFailed(it) }
|
||||
DmChatSocketEvent.Pong -> Unit
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleSendAck(localId: String, message: DmChatMessageResponse) {
|
||||
private fun handleSendAck(requestId: String, message: DmChatMessageResponse) {
|
||||
val localId = pendingRequestLocalIds.remove(requestId) ?: return
|
||||
pendingTimeoutDisposables.remove(requestId)?.dispose()
|
||||
val sentItem = message.toUiItem()
|
||||
if (sentItem == null) {
|
||||
markLocalMessageFailed(localId)
|
||||
return
|
||||
}
|
||||
|
||||
isSending = false
|
||||
currentMessages = currentMessages.map {
|
||||
if (it.localId == localId) sentItem else it
|
||||
if (it.localId == localId) sentItem.copy(localId = localId) else it
|
||||
}.deduplicateSentMessage(sentItem.messageId).sortByCreatedAtAndMessageId()
|
||||
emitContent()
|
||||
}
|
||||
@@ -378,8 +397,13 @@ class DmChatRoomViewModel(
|
||||
}
|
||||
}
|
||||
|
||||
private fun markPendingMessageFailed(requestId: String) {
|
||||
val localId = pendingRequestLocalIds.remove(requestId) ?: return
|
||||
pendingTimeoutDisposables.remove(requestId)?.dispose()
|
||||
markLocalMessageFailed(localId)
|
||||
}
|
||||
|
||||
private fun markLocalMessageFailed(localId: String) {
|
||||
isSending = false
|
||||
currentMessages = currentMessages.map {
|
||||
if (it.localId == localId) it.copy(status = DmChatMessageStatus.FAILED) else it
|
||||
}
|
||||
@@ -413,6 +437,11 @@ class DmChatRoomViewModel(
|
||||
return "local-$localMessageSequence"
|
||||
}
|
||||
|
||||
private fun nextRequestId(): String {
|
||||
requestSequence += 1L
|
||||
return "request-$requestSequence"
|
||||
}
|
||||
|
||||
private fun authToken(): String {
|
||||
val token = tokenProvider()
|
||||
if (token.isNotBlank()) currentAuthToken = token
|
||||
@@ -426,6 +455,7 @@ class DmChatRoomViewModel(
|
||||
|
||||
private companion object {
|
||||
const val RECONNECT_DELAY_MILLIS = 3_000L
|
||||
const val SEND_ACK_TIMEOUT_MILLIS = 10_000L
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ fun DmChatMessageResponse.toUiItem(): DmChatMessageUiItem? {
|
||||
return DmChatMessageUiItem(
|
||||
messageId = messageId,
|
||||
localId = null,
|
||||
requestId = null,
|
||||
mine = mine,
|
||||
textMessage = textMessage,
|
||||
senderNickname = senderNickname,
|
||||
|
||||
@@ -9,6 +9,7 @@ enum class DmChatMessageStatus {
|
||||
data class DmChatMessageUiItem(
|
||||
val messageId: Long?,
|
||||
val localId: String?,
|
||||
val requestId: String?,
|
||||
val mine: Boolean,
|
||||
val textMessage: String,
|
||||
val senderNickname: String,
|
||||
|
||||
@@ -33,6 +33,7 @@ import okhttp3.WebSocketListener
|
||||
import okio.ByteString
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertNotEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@@ -171,21 +172,21 @@ class DmChatRoomViewModelTest {
|
||||
|
||||
viewModel.sendText(" 안녕 ")
|
||||
val sendingState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
val localId = sendingState.messages.single().localId!!
|
||||
val requestId = sendingState.messages.single().requestId!!
|
||||
|
||||
assertEquals("SEND_TEXT", socketFactory.webSocket.sentJsonAt(1).get("type").asString)
|
||||
assertEquals(localId, socketFactory.webSocket.sentJsonAt(1).getAsJsonObject("payload").get("requestId").asString)
|
||||
assertEquals(requestId, socketFactory.webSocket.sentJsonAt(1).getAsJsonObject("payload").get("requestId").asString)
|
||||
assertEquals(DmChatMessageStatus.SENDING, sendingState.messages.single().status)
|
||||
assertEquals("안녕", sendingState.messages.single().textMessage)
|
||||
|
||||
socketFactory.emitAck(localId, message(messageId = 30L, mine = true, textMessage = "안녕"))
|
||||
socketFactory.emitAck(requestId, message(messageId = 30L, mine = true, textMessage = "안녕"))
|
||||
val sentState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf(30L), sentState.messages.map { it.messageId })
|
||||
assertEquals(DmChatMessageStatus.SENT, sentState.messages.single().status)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `전송 중 새 전송 중복 요청은 무시한다`() {
|
||||
fun `전송 중 새 텍스트도 독립 pending으로 추가한다`() {
|
||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||
viewModel.connectRealtime()
|
||||
@@ -193,9 +194,123 @@ class DmChatRoomViewModelTest {
|
||||
viewModel.sendText("안녕")
|
||||
viewModel.sendText("안녕")
|
||||
|
||||
assertEquals(2, socketFactory.webSocket.sentTexts.size)
|
||||
assertEquals(3, socketFactory.webSocket.sentTexts.size)
|
||||
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(1, state.messages.size)
|
||||
assertEquals(2, state.messages.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `서로 다른 텍스트는 각각 requestId로 독립 pending 전송한다`() {
|
||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||
viewModel.connectRealtime()
|
||||
|
||||
viewModel.sendText("첫번째")
|
||||
viewModel.sendText("두번째")
|
||||
|
||||
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf("첫번째", "두번째"), state.messages.map { it.textMessage })
|
||||
assertEquals(listOf(DmChatMessageStatus.SENDING, DmChatMessageStatus.SENDING), state.messages.map { it.status })
|
||||
val requestIds = state.messages.map { it.requestId }
|
||||
assertNotEquals(requestIds[0], requestIds[1])
|
||||
assertEquals("SEND_TEXT", socketFactory.webSocket.sentJsonAt(1).get("type").asString)
|
||||
assertEquals("SEND_TEXT", socketFactory.webSocket.sentJsonAt(2).get("type").asString)
|
||||
assertEquals(requestIds[0], socketFactory.webSocket.sentJsonAt(1).getAsJsonObject("payload").get("requestId").asString)
|
||||
assertEquals(requestIds[1], socketFactory.webSocket.sentJsonAt(2).getAsJsonObject("payload").get("requestId").asString)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `SEND_ACK는 requestId가 일치하는 pending만 확정한다`() {
|
||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||
viewModel.connectRealtime()
|
||||
viewModel.sendText("첫번째")
|
||||
viewModel.sendText("두번째")
|
||||
val pendingState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
val firstRequestId = pendingState.messages[0].requestId!!
|
||||
|
||||
socketFactory.emitAck(firstRequestId, message(messageId = 30L, mine = true, textMessage = "첫번째"))
|
||||
|
||||
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf(30L, null), state.messages.map { it.messageId })
|
||||
assertEquals(listOf(DmChatMessageStatus.SENT, DmChatMessageStatus.SENDING), state.messages.map { it.status })
|
||||
assertEquals("두번째", state.messages[1].textMessage)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `ERROR는 requestId가 일치하는 pending만 실패로 전환한다`() {
|
||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||
viewModel.connectRealtime()
|
||||
viewModel.sendText("첫번째")
|
||||
viewModel.sendText("두번째")
|
||||
val pendingState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
val firstRequestId = pendingState.messages[0].requestId!!
|
||||
|
||||
socketFactory.emitError(firstRequestId)
|
||||
|
||||
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf(DmChatMessageStatus.FAILED, DmChatMessageStatus.SENDING), state.messages.map { it.status })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `pending timeout은 requestId가 일치하는 메시지만 실패로 전환한다`() {
|
||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||
viewModel.connectRealtime()
|
||||
viewModel.sendText("첫번째")
|
||||
viewModel.sendText("두번째")
|
||||
val pendingState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
val secondRequestId = pendingState.messages[1].requestId!!
|
||||
|
||||
socketFactory.emitAck(secondRequestId, message(messageId = 31L, mine = true, textMessage = "두번째"))
|
||||
reconnectScheduler.advanceTimeBy(10L, TimeUnit.SECONDS)
|
||||
shadowOf(Looper.getMainLooper()).idle()
|
||||
|
||||
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(DmChatMessageStatus.FAILED, state.messages.first { it.textMessage == "첫번째" }.status)
|
||||
assertEquals(DmChatMessageStatus.SENT, state.messages.first { it.textMessage == "두번째" }.status)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `같은 requestId의 SEND_ACK 중복 수신은 첫 확정 결과만 반영한다`() {
|
||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||
viewModel.connectRealtime()
|
||||
viewModel.sendText("안녕")
|
||||
val requestId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||
.messages.single().requestId!!
|
||||
|
||||
socketFactory.emitAck(requestId, message(messageId = 41L, mine = true, textMessage = "첫 ACK"))
|
||||
socketFactory.emitAck(requestId, message(messageId = 42L, mine = true, textMessage = "중복 ACK"))
|
||||
|
||||
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf(41L), state.messages.map { it.messageId })
|
||||
assertEquals(listOf("첫 ACK"), state.messages.map { it.textMessage })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `retry는 기존 failed item을 유지하고 새 requestId로 SEND_TEXT를 전송한다`() {
|
||||
api.enqueueOpenSuccess(openResponse(roomId = 10L))
|
||||
viewModel.enter(roomId = 10L, creatorId = 0L)
|
||||
viewModel.connectRealtime()
|
||||
viewModel.sendText("재시도")
|
||||
val failedRequestId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||
.messages.single().requestId!!
|
||||
socketFactory.emitError(failedRequestId)
|
||||
val failedItem = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content).messages.single()
|
||||
|
||||
viewModel.retry(failedItem.localId!!)
|
||||
|
||||
val retryState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
val retryItem = retryState.messages.single()
|
||||
assertEquals(failedItem.localId, retryItem.localId)
|
||||
assertNotEquals(failedRequestId, retryItem.requestId)
|
||||
assertEquals(DmChatMessageStatus.SENDING, retryItem.status)
|
||||
assertEquals(
|
||||
retryItem.requestId,
|
||||
socketFactory.webSocket.sentJsonAt(2).getAsJsonObject("payload").get("requestId").asString
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -212,7 +327,9 @@ class DmChatRoomViewModelTest {
|
||||
|
||||
socketFactory.webSocket.sendResult = true
|
||||
viewModel.retry(failedItem.localId!!)
|
||||
socketFactory.emitAck(failedItem.localId, message(messageId = 40L, mine = true, textMessage = "안녕"))
|
||||
val retryRequestId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||
.messages.single().requestId!!
|
||||
socketFactory.emitAck(retryRequestId, message(messageId = 40L, mine = true, textMessage = "안녕"))
|
||||
|
||||
val retriedState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf(40L), retriedState.messages.map { it.messageId })
|
||||
@@ -231,8 +348,10 @@ class DmChatRoomViewModelTest {
|
||||
|
||||
socketFactory.webSocket.sendResult = true
|
||||
viewModel.retry(failedItem.localId!!)
|
||||
val retryRequestId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||
.messages.single().requestId!!
|
||||
viewModel.onRealtimeMessage(message(messageId = 45L, mine = true, textMessage = "안녕"))
|
||||
socketFactory.emitAck(failedItem.localId, message(messageId = 45L, mine = true, textMessage = "안녕"))
|
||||
socketFactory.emitAck(retryRequestId, message(messageId = 45L, mine = true, textMessage = "안녕"))
|
||||
|
||||
val retriedState = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf(45L), retriedState.messages.map { it.messageId })
|
||||
@@ -259,9 +378,10 @@ class DmChatRoomViewModelTest {
|
||||
viewModel.connectRealtime()
|
||||
|
||||
viewModel.sendText("안녕")
|
||||
val localId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content).messages.single().localId!!
|
||||
val requestId = (viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content)
|
||||
.messages.single().requestId!!
|
||||
viewModel.onRealtimeMessage(message(messageId = 50L, mine = true, textMessage = "안녕"))
|
||||
socketFactory.emitAck(localId, message(messageId = 50L, mine = true, textMessage = "안녕"))
|
||||
socketFactory.emitAck(requestId, message(messageId = 50L, mine = true, textMessage = "안녕"))
|
||||
|
||||
val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content
|
||||
assertEquals(listOf(50L), state.messages.map { it.messageId })
|
||||
@@ -771,6 +891,13 @@ class FakeWebSocketFactory {
|
||||
)
|
||||
}
|
||||
|
||||
fun emitError(requestId: String) {
|
||||
webSocketListener?.onMessage(
|
||||
webSocket,
|
||||
"{\"type\":\"ERROR\",\"payload\":{\"requestId\":\"$requestId\",\"code\":\"SEND_FAILED\",\"message\":\"failed\"}}"
|
||||
)
|
||||
}
|
||||
|
||||
fun emitFailure(throwable: Throwable) {
|
||||
webSocketListener?.onFailure(webSocket, throwable, null)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user