feat(chat): DM 채팅 실시간 수신을 연결한다
This commit is contained in:
@@ -4,11 +4,14 @@ import androidx.lifecycle.LiveData
|
||||
import androidx.lifecycle.MutableLiveData
|
||||
import com.orhanobut.logger.Logger
|
||||
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
|
||||
import io.reactivex.rxjava3.core.Scheduler
|
||||
import io.reactivex.rxjava3.disposables.Disposable
|
||||
import io.reactivex.rxjava3.schedulers.Schedulers
|
||||
import kr.co.vividnext.sodalive.base.BaseViewModel
|
||||
import kr.co.vividnext.sodalive.common.ApiResponse
|
||||
import kr.co.vividnext.sodalive.common.SharedPreferenceManager
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.CreateDmChatRoomResponse
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatEventClient
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessageResponse
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessagesPageResponse
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatRepository
|
||||
@@ -21,9 +24,12 @@ import kr.co.vividnext.sodalive.v2.main.chat.dm.model.mergeByMessageId
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.model.sortByCreatedAtAndMessageId
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.model.toUiItem
|
||||
import kr.co.vividnext.sodalive.v2.main.chat.dm.model.toUiItems
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class DmChatRoomViewModel(
|
||||
private val repository: DmChatRepository
|
||||
private val repository: DmChatRepository,
|
||||
private val reconnectScheduler: Scheduler = Schedulers.io(),
|
||||
private val tokenProvider: () -> String = { SharedPreferenceManager.token }
|
||||
) : BaseViewModel() {
|
||||
|
||||
private var currentRoomId: Long = 0L
|
||||
@@ -34,6 +40,12 @@ class DmChatRoomViewModel(
|
||||
private var nextCursor: Long? = null
|
||||
private var isLoadingOlder: Boolean = false
|
||||
private var isSending: Boolean = false
|
||||
private var isRealtimeConnected: Boolean = false
|
||||
private var shouldReconnectRealtime: Boolean = false
|
||||
private var currentAuthToken: String = ""
|
||||
private var currentRealtimeToken: String = ""
|
||||
private var isDisconnecting: Boolean = false
|
||||
private var reconnectDisposable: Disposable? = null
|
||||
private var localMessageSequence: Long = 0L
|
||||
|
||||
private val _chatRoomStateLiveData = MutableLiveData<DmChatRoomUiState>()
|
||||
@@ -48,6 +60,10 @@ class DmChatRoomViewModel(
|
||||
val prependedMessageCountLiveData: LiveData<Int>
|
||||
get() = _prependedMessageCountLiveData
|
||||
|
||||
private val _roomOpenedEventLiveData = MutableLiveData<DmChatEvent<Boolean>>()
|
||||
val roomOpenedEventLiveData: LiveData<DmChatEvent<Boolean>>
|
||||
get() = _roomOpenedEventLiveData
|
||||
|
||||
fun enter(roomId: Long, creatorId: Long) {
|
||||
when {
|
||||
roomId > 0L -> openRoom(roomId)
|
||||
@@ -127,12 +143,16 @@ class DmChatRoomViewModel(
|
||||
}
|
||||
|
||||
fun syncLatestMessagesAfterReconnect() {
|
||||
syncLatestMessagesAfterReconnect(token = authToken())
|
||||
}
|
||||
|
||||
private fun syncLatestMessagesAfterReconnect(token: String) {
|
||||
val roomId = currentRoomId
|
||||
if (roomId <= 0L) return
|
||||
|
||||
compositeDisposable.add(
|
||||
repository.getMessages(
|
||||
token = authToken(),
|
||||
token = token,
|
||||
roomId = roomId,
|
||||
cursor = null
|
||||
)
|
||||
@@ -151,6 +171,90 @@ class DmChatRoomViewModel(
|
||||
)
|
||||
}
|
||||
|
||||
fun connectRealtime() {
|
||||
connectRealtime(token = authToken())
|
||||
}
|
||||
|
||||
private fun connectRealtime(token: String) {
|
||||
val roomId = currentRoomId
|
||||
if (roomId <= 0L || isRealtimeConnected || !shouldReconnectRealtime && currentRealtimeToken.isNotEmpty()) return
|
||||
|
||||
currentRealtimeToken = token
|
||||
isRealtimeConnected = true
|
||||
shouldReconnectRealtime = true
|
||||
reconnectDisposable?.dispose()
|
||||
reconnectDisposable = null
|
||||
repository.connectRealtime(
|
||||
token = token,
|
||||
roomId = roomId,
|
||||
listener = object : DmChatEventClient.Listener {
|
||||
override fun onConnected() {
|
||||
scheduleRealtimeCallback { syncLatestMessagesAfterReconnect(token = token) }
|
||||
}
|
||||
|
||||
override fun onMessage(message: DmChatMessageResponse) {
|
||||
scheduleRealtimeCallback { onRealtimeMessage(message) }
|
||||
}
|
||||
|
||||
override fun onFailure(throwable: Throwable) {
|
||||
scheduleRealtimeCallback {
|
||||
isRealtimeConnected = false
|
||||
throwable.message?.let { Logger.e(it) }
|
||||
scheduleRealtimeReconnect()
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
fun disconnectRealtime() {
|
||||
val roomId = currentRoomId
|
||||
if (roomId <= 0L) return
|
||||
|
||||
shouldReconnectRealtime = false
|
||||
currentRealtimeToken = ""
|
||||
isRealtimeConnected = false
|
||||
reconnectDisposable?.dispose()
|
||||
reconnectDisposable = null
|
||||
repository.cancelRealtime()
|
||||
if (isDisconnecting) return
|
||||
|
||||
isDisconnecting = true
|
||||
compositeDisposable.add(
|
||||
repository.disconnectRealtime(token = authToken(), roomId = roomId)
|
||||
.subscribeOn(Schedulers.io())
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribe(
|
||||
{ isDisconnecting = false },
|
||||
{
|
||||
isDisconnecting = false
|
||||
it.message?.let { message -> Logger.e(message) }
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private fun scheduleRealtimeReconnect() {
|
||||
val roomId = currentRoomId
|
||||
if (roomId <= 0L || !shouldReconnectRealtime) return
|
||||
|
||||
reconnectDisposable?.dispose()
|
||||
val token = currentRealtimeToken
|
||||
reconnectDisposable = reconnectScheduler.scheduleDirect(
|
||||
{
|
||||
scheduleRealtimeCallback {
|
||||
if (shouldReconnectRealtime) connectRealtime(token = token)
|
||||
}
|
||||
},
|
||||
RECONNECT_DELAY_MILLIS,
|
||||
TimeUnit.MILLISECONDS
|
||||
).also { compositeDisposable.add(it) }
|
||||
}
|
||||
|
||||
private fun scheduleRealtimeCallback(action: () -> Unit) {
|
||||
compositeDisposable.add(AndroidSchedulers.mainThread().scheduleDirect(action))
|
||||
}
|
||||
|
||||
private fun createRoomAndOpen(creatorId: Long) {
|
||||
_chatRoomStateLiveData.value = DmChatRoomUiState.Loading
|
||||
compositeDisposable.add(
|
||||
@@ -215,6 +319,7 @@ class DmChatRoomViewModel(
|
||||
nextCursor = data.nextCursor
|
||||
isLoadingOlder = false
|
||||
emitContent()
|
||||
_roomOpenedEventLiveData.value = DmChatEvent(true)
|
||||
}
|
||||
|
||||
private fun handleOlderMessagesResult(response: ApiResponse<DmChatMessagesPageResponse>) {
|
||||
@@ -299,10 +404,28 @@ class DmChatRoomViewModel(
|
||||
return "local-$localMessageSequence"
|
||||
}
|
||||
|
||||
private fun authToken(): String = SharedPreferenceManager.token
|
||||
private fun authToken(): String {
|
||||
val token = tokenProvider()
|
||||
if (token.isNotBlank()) currentAuthToken = token
|
||||
return token.ifBlank { currentAuthToken }
|
||||
}
|
||||
|
||||
private fun ApiResponse<CreateDmChatRoomResponse>.requireData(): CreateDmChatRoomResponse {
|
||||
if (success && data != null) return data
|
||||
throw IllegalStateException(message)
|
||||
}
|
||||
|
||||
private companion object {
|
||||
const val RECONNECT_DELAY_MILLIS = 3_000L
|
||||
}
|
||||
}
|
||||
|
||||
class DmChatEvent<out T>(private val value: T) {
|
||||
private var consumed: Boolean = false
|
||||
|
||||
fun consume(): T? {
|
||||
if (consumed) return null
|
||||
consumed = true
|
||||
return value
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user