diff --git a/app/src/main/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModel.kt b/app/src/main/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModel.kt index 8756e46d..0f57361c 100644 --- a/app/src/main/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModel.kt +++ b/app/src/main/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModel.kt @@ -4,11 +4,14 @@ import androidx.lifecycle.LiveData import androidx.lifecycle.MutableLiveData import com.orhanobut.logger.Logger import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers +import io.reactivex.rxjava3.core.Scheduler +import io.reactivex.rxjava3.disposables.Disposable import io.reactivex.rxjava3.schedulers.Schedulers import kr.co.vividnext.sodalive.base.BaseViewModel import kr.co.vividnext.sodalive.common.ApiResponse import kr.co.vividnext.sodalive.common.SharedPreferenceManager import kr.co.vividnext.sodalive.v2.main.chat.dm.data.CreateDmChatRoomResponse +import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatEventClient import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessageResponse import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessagesPageResponse import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatRepository @@ -21,9 +24,12 @@ import kr.co.vividnext.sodalive.v2.main.chat.dm.model.mergeByMessageId import kr.co.vividnext.sodalive.v2.main.chat.dm.model.sortByCreatedAtAndMessageId import kr.co.vividnext.sodalive.v2.main.chat.dm.model.toUiItem import kr.co.vividnext.sodalive.v2.main.chat.dm.model.toUiItems +import java.util.concurrent.TimeUnit class DmChatRoomViewModel( - private val repository: DmChatRepository + private val repository: DmChatRepository, + private val reconnectScheduler: Scheduler = Schedulers.io(), + private val tokenProvider: () -> String = { SharedPreferenceManager.token } ) : BaseViewModel() { private var currentRoomId: Long = 0L @@ -34,6 +40,12 @@ class DmChatRoomViewModel( private var nextCursor: Long? = null private var isLoadingOlder: Boolean = false private var isSending: Boolean = false + private var isRealtimeConnected: Boolean = false + private var shouldReconnectRealtime: Boolean = false + private var currentAuthToken: String = "" + private var currentRealtimeToken: String = "" + private var isDisconnecting: Boolean = false + private var reconnectDisposable: Disposable? = null private var localMessageSequence: Long = 0L private val _chatRoomStateLiveData = MutableLiveData() @@ -48,6 +60,10 @@ class DmChatRoomViewModel( val prependedMessageCountLiveData: LiveData get() = _prependedMessageCountLiveData + private val _roomOpenedEventLiveData = MutableLiveData>() + val roomOpenedEventLiveData: LiveData> + get() = _roomOpenedEventLiveData + fun enter(roomId: Long, creatorId: Long) { when { roomId > 0L -> openRoom(roomId) @@ -127,12 +143,16 @@ class DmChatRoomViewModel( } fun syncLatestMessagesAfterReconnect() { + syncLatestMessagesAfterReconnect(token = authToken()) + } + + private fun syncLatestMessagesAfterReconnect(token: String) { val roomId = currentRoomId if (roomId <= 0L) return compositeDisposable.add( repository.getMessages( - token = authToken(), + token = token, roomId = roomId, cursor = null ) @@ -151,6 +171,90 @@ class DmChatRoomViewModel( ) } + fun connectRealtime() { + connectRealtime(token = authToken()) + } + + private fun connectRealtime(token: String) { + val roomId = currentRoomId + if (roomId <= 0L || isRealtimeConnected || !shouldReconnectRealtime && currentRealtimeToken.isNotEmpty()) return + + currentRealtimeToken = token + isRealtimeConnected = true + shouldReconnectRealtime = true + reconnectDisposable?.dispose() + reconnectDisposable = null + repository.connectRealtime( + token = token, + roomId = roomId, + listener = object : DmChatEventClient.Listener { + override fun onConnected() { + scheduleRealtimeCallback { syncLatestMessagesAfterReconnect(token = token) } + } + + override fun onMessage(message: DmChatMessageResponse) { + scheduleRealtimeCallback { onRealtimeMessage(message) } + } + + override fun onFailure(throwable: Throwable) { + scheduleRealtimeCallback { + isRealtimeConnected = false + throwable.message?.let { Logger.e(it) } + scheduleRealtimeReconnect() + } + } + } + ) + } + + fun disconnectRealtime() { + val roomId = currentRoomId + if (roomId <= 0L) return + + shouldReconnectRealtime = false + currentRealtimeToken = "" + isRealtimeConnected = false + reconnectDisposable?.dispose() + reconnectDisposable = null + repository.cancelRealtime() + if (isDisconnecting) return + + isDisconnecting = true + compositeDisposable.add( + repository.disconnectRealtime(token = authToken(), roomId = roomId) + .subscribeOn(Schedulers.io()) + .observeOn(AndroidSchedulers.mainThread()) + .subscribe( + { isDisconnecting = false }, + { + isDisconnecting = false + it.message?.let { message -> Logger.e(message) } + } + ) + ) + } + + private fun scheduleRealtimeReconnect() { + val roomId = currentRoomId + if (roomId <= 0L || !shouldReconnectRealtime) return + + reconnectDisposable?.dispose() + val token = currentRealtimeToken + reconnectDisposable = reconnectScheduler.scheduleDirect( + { + scheduleRealtimeCallback { + if (shouldReconnectRealtime) connectRealtime(token = token) + } + }, + RECONNECT_DELAY_MILLIS, + TimeUnit.MILLISECONDS + ).also { compositeDisposable.add(it) } + } + + private fun scheduleRealtimeCallback(action: () -> Unit) { + compositeDisposable.add(AndroidSchedulers.mainThread().scheduleDirect(action)) + } + private fun createRoomAndOpen(creatorId: Long) { _chatRoomStateLiveData.value = DmChatRoomUiState.Loading compositeDisposable.add( @@ -215,6 +319,7 @@ class DmChatRoomViewModel( nextCursor = data.nextCursor isLoadingOlder = false emitContent() + _roomOpenedEventLiveData.value = DmChatEvent(true) } private fun handleOlderMessagesResult(response: ApiResponse) { @@ -299,10 +404,28 @@ class DmChatRoomViewModel( return "local-$localMessageSequence" } - private fun authToken(): String = SharedPreferenceManager.token + private fun authToken(): String { + val token = tokenProvider() + if (token.isNotBlank()) currentAuthToken = token + return token.ifBlank { currentAuthToken } + } private fun ApiResponse.requireData(): CreateDmChatRoomResponse { if (success && data != null) return data throw IllegalStateException(message) } + + private companion object { + const val RECONNECT_DELAY_MILLIS = 3_000L + } +} + +class DmChatEvent(private val value: T) { + private var consumed: Boolean = false + + fun consume(): T? { + if (consumed) return null + consumed = true + return value + } } diff --git a/app/src/test/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModelTest.kt b/app/src/test/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModelTest.kt index bd7a9d03..e0c96a89 100644 --- a/app/src/test/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModelTest.kt +++ b/app/src/test/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModelTest.kt @@ -10,6 +10,7 @@ import io.reactivex.rxjava3.core.Scheduler import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.plugins.RxJavaPlugins import io.reactivex.rxjava3.schedulers.Schedulers +import io.reactivex.rxjava3.schedulers.TestScheduler import io.reactivex.rxjava3.subjects.SingleSubject import kr.co.vividnext.sodalive.common.ApiResponse import kr.co.vividnext.sodalive.common.SharedPreferenceManager @@ -18,6 +19,8 @@ import kr.co.vividnext.sodalive.v2.main.chat.dm.data.CreateDmChatRoomResponse import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatApi import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessageResponse import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessagesPageResponse +import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatEventClient +import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatRealtimeClient import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatRepository import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatRoomOpenResponse import kr.co.vividnext.sodalive.v2.main.chat.dm.data.SendDmChatMessageResponse @@ -32,6 +35,7 @@ import org.junit.Test import org.junit.runner.RunWith import org.robolectric.RobolectricTestRunner import org.robolectric.annotation.Config +import java.util.concurrent.TimeUnit @RunWith(RobolectricTestRunner::class) @Config(sdk = [28], application = Application::class) @@ -39,6 +43,8 @@ class DmChatRoomViewModelTest { private val context: Context = ApplicationProvider.getApplicationContext() private lateinit var api: FakeDmChatApi + private lateinit var realtimeClient: FakeDmChatRealtimeClient + private lateinit var reconnectScheduler: TestScheduler private lateinit var viewModel: DmChatRoomViewModel @Before @@ -48,7 +54,13 @@ class DmChatRoomViewModelTest { SharedPreferenceManager.init(context) SharedPreferenceManager.token = "test-token" api = FakeDmChatApi() - viewModel = DmChatRoomViewModel(repository = DmChatRepository(api)) + realtimeClient = FakeDmChatRealtimeClient() + reconnectScheduler = TestScheduler() + viewModel = DmChatRoomViewModel( + repository = DmChatRepository(api, realtimeClient), + reconnectScheduler = reconnectScheduler, + tokenProvider = { "test-token" } + ) } @After @@ -278,6 +290,254 @@ class DmChatRoomViewModelTest { assertEquals(listOf("기존"), state.messages.map { it.textMessage }) } + @Test + fun `roomId가 없으면 realtime 연결과 disconnect를 요청하지 않는다`() { + viewModel.connectRealtime() + viewModel.disconnectRealtime() + + assertTrue(realtimeClient.connectCalls.isEmpty()) + assertEquals(0, realtimeClient.cancelCalls) + assertTrue(api.disconnectCalls.isEmpty()) + } + + @Test + fun `roomId가 있으면 realtime 연결 후 connected callback에서 최신 메시지를 동기화한다`() { + api.enqueueOpenSuccess( + openResponse( + roomId = 10L, + messages = listOf(message(messageId = 1L, textMessage = "기존")) + ) + ) + api.enqueueMessagesSuccess( + messagesPage( + messages = listOf(message(messageId = 2L, createdAt = 200L, textMessage = "동기화")) + ) + ) + viewModel.enter(roomId = 10L, creatorId = 0L) + + viewModel.connectRealtime() + realtimeClient.listener?.onConnected() + + assertEquals(listOf(RealtimeConnectCall("test-token", 10L)), realtimeClient.connectCalls) + assertEquals(listOf(MessagesCall("Bearer test-token", 10L, null, 20)), api.messagesCalls) + val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content + assertEquals(listOf(1L, 2L), state.messages.map { it.messageId }) + } + + @Test + fun `realtime 연결 중 중복 connect 요청은 무시한다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + viewModel.enter(roomId = 10L, creatorId = 0L) + + viewModel.connectRealtime() + viewModel.connectRealtime() + + assertEquals(listOf(RealtimeConnectCall("test-token", 10L)), realtimeClient.connectCalls) + } + + @Test + fun `openRoom 완료 시 realtime 연결 가능 이벤트를 한 번 발행한다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + + viewModel.enter(roomId = 10L, creatorId = 0L) + + assertTrue(viewModel.roomOpenedEventLiveData.requireValue()?.consume() == true) + } + + @Test + fun `openRoom 완료 이벤트는 observer가 재등록되어도 한 번만 소비된다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + viewModel.enter(roomId = 10L, creatorId = 0L) + + val event = viewModel.roomOpenedEventLiveData.requireValue() + + assertTrue(event?.consume() == true) + assertTrue(event?.consume() == null) + } + + @Test + fun `disconnect 진행 중 빠른 reconnect 시 crash 없이 connect를 허용한다`() { + val pendingDisconnect = SingleSubject.create>() + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + api.enqueueDisconnect(pendingDisconnect) + viewModel.enter(roomId = 10L, creatorId = 0L) + + viewModel.connectRealtime() + viewModel.disconnectRealtime() + viewModel.connectRealtime() + + assertEquals(2, realtimeClient.connectCalls.size) + assertEquals(1, realtimeClient.cancelCalls) + assertEquals(listOf(DisconnectCall("Bearer test-token", 10L)), api.disconnectCalls) + + pendingDisconnect.onSuccess(ApiResponse(success = true, data = true)) + } + + @Test + fun `realtime message callback은 SSE 메시지를 화면 상태에 병합한다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + viewModel.enter(roomId = 10L, creatorId = 0L) + + viewModel.connectRealtime() + realtimeClient.listener?.onMessage(message(messageId = 3L, textMessage = "실시간")) + + val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content + assertEquals(listOf(3L), state.messages.map { it.messageId }) + assertEquals(listOf("실시간"), state.messages.map { it.textMessage }) + } + + @Test + fun `realtime listener callback은 main thread scheduler로 상태를 갱신한다`() { + val source = projectFile( + "app/src/main/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModel.kt" + ).readText() + + assertTrue(source.contains("scheduleRealtimeCallback")) + assertTrue(source.contains("AndroidSchedulers.mainThread().scheduleDirect")) + assertTrue(source.contains("scheduleRealtimeCallback { syncLatestMessagesAfterReconnect(token = token) }")) + assertTrue(source.contains("scheduleRealtimeCallback { onRealtimeMessage(message) }")) + } + + @Test + fun `SSE 실패는 3초 뒤 재연결을 예약하고 connected 후 최신 메시지를 동기화한다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L, messages = listOf(message(messageId = 1L, textMessage = "기존")))) + api.enqueueMessagesSuccess( + messagesPage(messages = listOf(message(messageId = 2L, createdAt = 200L, textMessage = "재연결"))) + ) + viewModel.enter(roomId = 10L, creatorId = 0L) + viewModel.connectRealtime() + + realtimeClient.listener?.onFailure(IllegalStateException("network")) + reconnectScheduler.advanceTimeBy(2999L, TimeUnit.MILLISECONDS) + + assertEquals(1, realtimeClient.connectCalls.size) + + reconnectScheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS) + assertEquals(2, realtimeClient.connectCalls.size) + assertEquals(RealtimeConnectCall("test-token", 10L), realtimeClient.connectCalls[1]) + + realtimeClient.listener?.onConnected() + assertEquals(listOf(MessagesCall("Bearer test-token", 10L, null, 20)), api.messagesCalls) + val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content + assertEquals(listOf(1L, 2L), state.messages.map { it.messageId }) + } + + @Test + fun `SSE 실패 후 예약된 재연결은 main thread callback 경로에서 실행된다`() { + val source = projectFile( + "app/src/main/java/kr/co/vividnext/sodalive/v2/main/chat/dm/DmChatRoomViewModel.kt" + ).readText() + val compactSource = source.filterNot { it.isWhitespace() } + + assertTrue(compactSource.contains("scheduleRealtimeCallback{if(shouldReconnectRealtime)connectRealtime(token=token)}")) + assertTrue(!compactSource.contains("scheduleDirect({connectRealtime(token=token)}")) + } + + @Test + fun `반복 SSE 실패는 foreground 상태에서 3초 기본 간격으로 재연결을 유지한다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + viewModel.enter(roomId = 10L, creatorId = 0L) + viewModel.connectRealtime() + + realtimeClient.listener?.onFailure(IllegalStateException("network-1")) + reconnectScheduler.advanceTimeBy(3L, TimeUnit.SECONDS) + realtimeClient.listener?.onFailure(IllegalStateException("network-2")) + reconnectScheduler.advanceTimeBy(2999L, TimeUnit.MILLISECONDS) + + assertEquals(2, realtimeClient.connectCalls.size) + + reconnectScheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS) + + assertEquals(3, realtimeClient.connectCalls.size) + } + + @Test + fun `disconnect는 예약된 SSE 재연결을 취소한다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + viewModel.enter(roomId = 10L, creatorId = 0L) + viewModel.connectRealtime() + + realtimeClient.listener?.onFailure(IllegalStateException("network")) + viewModel.disconnectRealtime() + reconnectScheduler.advanceTimeBy(3L, TimeUnit.SECONDS) + + assertEquals(1, realtimeClient.connectCalls.size) + assertEquals(1, realtimeClient.cancelCalls) + assertEquals(listOf(DisconnectCall("Bearer test-token", 10L)), api.disconnectCalls) + } + + @Test + fun `예약 재연결 실행 후 main callback 전 disconnect되면 새 SSE 연결을 만들지 않는다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + viewModel.enter(roomId = 10L, creatorId = 0L) + viewModel.connectRealtime() + + realtimeClient.listener?.onFailure(IllegalStateException("network")) + viewModel.disconnectRealtime() + reconnectScheduler.advanceTimeBy(3L, TimeUnit.SECONDS) + + assertEquals(listOf(RealtimeConnectCall("test-token", 10L)), realtimeClient.connectCalls) + assertEquals(1, realtimeClient.cancelCalls) + assertEquals(listOf(DisconnectCall("Bearer test-token", 10L)), api.disconnectCalls) + } + + @Test + fun `realtime disconnect 중 중복 요청은 무시하고 완료 후 다시 요청할 수 있다`() { + val pendingDisconnect = SingleSubject.create>() + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + api.enqueueDisconnect(pendingDisconnect) + api.enqueueDisconnectSuccess() + viewModel.enter(roomId = 10L, creatorId = 0L) + + viewModel.disconnectRealtime() + viewModel.disconnectRealtime() + + assertEquals(2, realtimeClient.cancelCalls) + assertEquals(listOf(DisconnectCall("Bearer test-token", 10L)), api.disconnectCalls) + + pendingDisconnect.onSuccess(ApiResponse(success = true, data = true)) + viewModel.disconnectRealtime() + + assertEquals(3, realtimeClient.cancelCalls) + assertEquals(2, api.disconnectCalls.size) + } + + @Test + fun `disconnect API 진행 중 다시 background로 가면 새 SSE 연결도 cancel하고 API 중복 호출은 하지 않는다`() { + val pendingDisconnect = SingleSubject.create>() + api.enqueueOpenSuccess(openResponse(roomId = 10L)) + api.enqueueDisconnect(pendingDisconnect) + viewModel.enter(roomId = 10L, creatorId = 0L) + + viewModel.connectRealtime() + viewModel.disconnectRealtime() + viewModel.connectRealtime() + viewModel.disconnectRealtime() + + assertEquals(2, realtimeClient.cancelCalls) + assertEquals(listOf(DisconnectCall("Bearer test-token", 10L)), api.disconnectCalls) + + pendingDisconnect.onSuccess(ApiResponse(success = true, data = true)) + } + + @Test + fun `realtime disconnect 실패는 채팅 상태를 Error로 바꾸지 않는다`() { + api.enqueueOpenSuccess(openResponse(roomId = 10L, messages = listOf(message(messageId = 1L, textMessage = "기존")))) + api.enqueueDisconnect(Single.error(IllegalStateException("network"))) + viewModel.enter(roomId = 10L, creatorId = 0L) + + viewModel.disconnectRealtime() + + val state = viewModel.chatRoomStateLiveData.requireValue() as DmChatRoomUiState.Content + assertEquals(listOf(1L), state.messages.map { it.messageId }) + } + + private fun projectFile(relativePath: String): java.io.File { + val candidates = listOf(java.io.File(relativePath), java.io.File("../$relativePath")) + return candidates.firstOrNull { it.exists() } + ?: error("Project file not found: $relativePath") + } + private fun setImmediateRxSchedulers() { val trampoline = { _: Scheduler -> Schedulers.trampoline() } RxJavaPlugins.setIoSchedulerHandler(trampoline) @@ -362,16 +622,28 @@ data class SendCall( val request: SendDmTextMessageRequest ) +data class DisconnectCall( + val authHeader: String, + val roomId: Long +) + +data class RealtimeConnectCall( + val token: String, + val roomId: Long +) + class FakeDmChatApi : DmChatApi { val createCalls = mutableListOf() val openCalls = mutableListOf() val messagesCalls = mutableListOf() val sendCalls = mutableListOf() + val disconnectCalls = mutableListOf() private val createResponses = ArrayDeque>>() private val openResponses = ArrayDeque>>() private val messagesResponses = ArrayDeque>>() private val sendResponses = ArrayDeque>>() + private val disconnectResponses = ArrayDeque>>() fun enqueueCreateSuccess(response: CreateDmChatRoomResponse) { createResponses.addLast(Single.just(ApiResponse(success = true, data = response))) @@ -393,6 +665,14 @@ class FakeDmChatApi : DmChatApi { sendResponses.addLast(response) } + fun enqueueDisconnect(response: Single>) { + disconnectResponses.addLast(response) + } + + fun enqueueDisconnectSuccess() { + disconnectResponses.addLast(Single.just(ApiResponse(success = true, data = true))) + } + fun enqueueSendSuccess(message: DmChatMessageResponse) { sendResponses.addLast( Single.just( @@ -447,5 +727,28 @@ class FakeDmChatApi : DmChatApi { override fun disconnectRealtime( authHeader: String, roomId: Long - ): Single> = Single.just(ApiResponse(success = true, data = true)) + ): Single> { + disconnectCalls.add(DisconnectCall(authHeader, roomId)) + return disconnectResponses.removeFirstOrNull() ?: Single.just(ApiResponse(success = true, data = true)) + } +} + +class FakeDmChatRealtimeClient : DmChatRealtimeClient { + val connectCalls = mutableListOf() + var cancelCalls = 0 + var listener: DmChatEventClient.Listener? = null + + override fun connect( + token: String, + roomId: Long, + listener: DmChatEventClient.Listener + ) { + connectCalls.add(RealtimeConnectCall(token, roomId)) + this.listener = listener + } + + override fun cancel() { + cancelCalls += 1 + listener = null + } }