feat(chat): DM SSE 재연결 기반을 추가한다

This commit is contained in:
2026-06-11 11:16:19 +09:00
parent e19442a61a
commit 871f4e73e8
3 changed files with 64 additions and 6 deletions

View File

@@ -49,11 +49,21 @@ class DmChatEventParser(private val gson: Gson) {
} }
} }
interface DmChatRealtimeClient {
fun connect(
token: String,
roomId: Long,
listener: DmChatEventClient.Listener
)
fun cancel()
}
class DmChatEventClient( class DmChatEventClient(
private val okHttpClient: OkHttpClient, private val okHttpClient: OkHttpClient,
gson: Gson, gson: Gson,
private val baseUrl: String private val baseUrl: String
) { ) : DmChatRealtimeClient {
interface Listener { interface Listener {
fun onConnected() fun onConnected()
fun onMessage(message: DmChatMessageResponse) fun onMessage(message: DmChatMessageResponse)
@@ -67,7 +77,7 @@ class DmChatEventClient(
private var listener: Listener? = null private var listener: Listener? = null
@Synchronized @Synchronized
fun connect( override fun connect(
token: String, token: String,
roomId: Long, roomId: Long,
listener: Listener listener: Listener
@@ -105,7 +115,7 @@ class DmChatEventClient(
} }
@Synchronized @Synchronized
fun cancel() { override fun cancel() {
call?.cancel() call?.cancel()
call = null call = null
listener = null listener = null
@@ -115,7 +125,12 @@ class DmChatEventClient(
reader.use { bufferedReader -> reader.use { bufferedReader ->
val frame = StringBuilder() val frame = StringBuilder()
while (!call.isCanceled()) { while (!call.isCanceled()) {
val line = bufferedReader.readLine() ?: break val line = bufferedReader.readLine()
if (line == null) {
if (frame.isNotEmpty()) dispatch(frame.toString())
notifyStreamClosed(call)
return
}
if (line.isBlank()) { if (line.isBlank()) {
dispatch(frame.toString()) dispatch(frame.toString())
frame.clear() frame.clear()
@@ -123,10 +138,13 @@ class DmChatEventClient(
frame.append(line).append('\n') frame.append(line).append('\n')
} }
} }
if (frame.isNotEmpty()) dispatch(frame.toString())
} }
} }
private fun notifyStreamClosed(call: Call) {
if (!call.isCanceled()) listener?.onFailure(IOException("SSE stream closed"))
}
private fun dispatch(frame: String) { private fun dispatch(frame: String) {
when (val event = parser.parse(frame)) { when (val event = parser.parse(frame)) {
DmChatEventParser.Event.Connected -> listener?.onConnected() DmChatEventParser.Event.Connected -> listener?.onConnected()

View File

@@ -3,7 +3,10 @@ package kr.co.vividnext.sodalive.v2.main.chat.dm.data
import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.core.Single
import kr.co.vividnext.sodalive.common.ApiResponse import kr.co.vividnext.sodalive.common.ApiResponse
class DmChatRepository(private val api: DmChatApi) { class DmChatRepository(
private val api: DmChatApi,
private val realtimeClient: DmChatRealtimeClient? = null
) {
fun createOrGetRoom( fun createOrGetRoom(
token: String, token: String,
creatorId: Long creatorId: Long
@@ -52,6 +55,18 @@ class DmChatRepository(private val api: DmChatApi) {
roomId = roomId roomId = roomId
) )
fun connectRealtime(
token: String,
roomId: Long,
listener: DmChatEventClient.Listener
) {
realtimeClient?.connect(token = token, roomId = roomId, listener = listener)
}
fun cancelRealtime() {
realtimeClient?.cancel()
}
private fun bearer(token: String) = "Bearer $token" private fun bearer(token: String) = "Bearer $token"
private companion object { private companion object {

View File

@@ -66,6 +66,31 @@ class DmChatEventClientTest {
assertEquals("안녕하세요", receivedMessage?.textMessage) assertEquals("안녕하세요", receivedMessage?.textMessage)
} }
@Test
fun `취소되지 않은 SSE stream이 EOF로 종료되면 failure callback으로 전달된다`() {
val failureLatch = CountDownLatch(1)
var failure: Throwable? = null
val client = clientWithResponse(
code = 200,
body = "event: connected\n\n"
)
client.connect(
token = "test-token",
roomId = 10L,
listener = object : TestListener() {
override fun onFailure(throwable: Throwable) {
failure = throwable
failureLatch.countDown()
}
}
)
failureLatch.await(2, TimeUnit.SECONDS)
assertNotNull(failure)
assertEquals("SSE stream closed", failure?.message)
}
private fun clientWithResponse( private fun clientWithResponse(
code: Int, code: Int,
body: String body: String