feat(chat): DM 채팅 SSE 클라이언트를 추가한다
This commit is contained in:
@@ -0,0 +1,146 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.main.chat.dm.data
|
||||||
|
|
||||||
|
import com.google.gson.Gson
|
||||||
|
import com.google.gson.JsonSyntaxException
|
||||||
|
import okhttp3.Call
|
||||||
|
import okhttp3.Callback
|
||||||
|
import okhttp3.OkHttpClient
|
||||||
|
import okhttp3.Request
|
||||||
|
import okhttp3.Response
|
||||||
|
import java.io.IOException
|
||||||
|
|
||||||
|
class DmChatEventParser(private val gson: Gson) {
|
||||||
|
sealed class Event {
|
||||||
|
data object Connected : Event()
|
||||||
|
data class Message(val message: DmChatMessageResponse) : Event()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun parse(frame: String): Event? {
|
||||||
|
val lines = frame.lineSequence().filter { it.isNotBlank() }
|
||||||
|
var eventName: String? = null
|
||||||
|
val dataLines = mutableListOf<String>()
|
||||||
|
|
||||||
|
lines.forEach { line ->
|
||||||
|
when {
|
||||||
|
line.startsWith("event:") -> eventName = line.substringAfter(':').trim()
|
||||||
|
line.startsWith("data:") -> dataLines += line.substringAfter(':').removeSingleLeadingSpace()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return when (eventName) {
|
||||||
|
EVENT_CONNECTED -> Event.Connected
|
||||||
|
EVENT_MESSAGE -> parseMessage(dataLines.joinToString(separator = "\n"))
|
||||||
|
else -> null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun parseMessage(data: String): Event.Message? = try {
|
||||||
|
Event.Message(gson.fromJson(data, DmChatMessageResponse::class.java))
|
||||||
|
} catch (e: JsonSyntaxException) {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun String.removeSingleLeadingSpace(): String =
|
||||||
|
if (startsWith(' ')) drop(1) else this
|
||||||
|
|
||||||
|
private companion object {
|
||||||
|
const val EVENT_CONNECTED = "connected"
|
||||||
|
const val EVENT_MESSAGE = "message"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DmChatEventClient(
|
||||||
|
private val okHttpClient: OkHttpClient,
|
||||||
|
gson: Gson,
|
||||||
|
private val baseUrl: String
|
||||||
|
) {
|
||||||
|
interface Listener {
|
||||||
|
fun onConnected()
|
||||||
|
fun onMessage(message: DmChatMessageResponse)
|
||||||
|
fun onFailure(throwable: Throwable)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val parser = DmChatEventParser(gson)
|
||||||
|
private var call: Call? = null
|
||||||
|
|
||||||
|
@Volatile
|
||||||
|
private var listener: Listener? = null
|
||||||
|
|
||||||
|
@Synchronized
|
||||||
|
fun connect(
|
||||||
|
token: String,
|
||||||
|
roomId: Long,
|
||||||
|
listener: Listener
|
||||||
|
) {
|
||||||
|
cancel()
|
||||||
|
this.listener = listener
|
||||||
|
|
||||||
|
val request = Request.Builder()
|
||||||
|
.url(eventsUrl(roomId))
|
||||||
|
.header(HEADER_AUTHORIZATION, bearer(token))
|
||||||
|
.build()
|
||||||
|
|
||||||
|
call = okHttpClient.newCall(request).also { newCall ->
|
||||||
|
newCall.enqueue(object : Callback {
|
||||||
|
override fun onFailure(call: Call, e: IOException) {
|
||||||
|
if (!call.isCanceled()) listener.onFailure(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onResponse(call: Call, response: Response) {
|
||||||
|
response.use { usedResponse ->
|
||||||
|
if (!usedResponse.isSuccessful) {
|
||||||
|
if (!call.isCanceled()) listener.onFailure(IOException("Unexpected code ${usedResponse.code}"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val body = usedResponse.body ?: return
|
||||||
|
try {
|
||||||
|
readFrames(call, body.charStream().buffered())
|
||||||
|
} catch (e: IOException) {
|
||||||
|
if (!call.isCanceled()) listener.onFailure(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Synchronized
|
||||||
|
fun cancel() {
|
||||||
|
call?.cancel()
|
||||||
|
call = null
|
||||||
|
listener = null
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun readFrames(call: Call, reader: java.io.BufferedReader) {
|
||||||
|
reader.use { bufferedReader ->
|
||||||
|
val frame = StringBuilder()
|
||||||
|
while (!call.isCanceled()) {
|
||||||
|
val line = bufferedReader.readLine() ?: break
|
||||||
|
if (line.isBlank()) {
|
||||||
|
dispatch(frame.toString())
|
||||||
|
frame.clear()
|
||||||
|
} else {
|
||||||
|
frame.append(line).append('\n')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (frame.isNotEmpty()) dispatch(frame.toString())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun dispatch(frame: String) {
|
||||||
|
when (val event = parser.parse(frame)) {
|
||||||
|
DmChatEventParser.Event.Connected -> listener?.onConnected()
|
||||||
|
is DmChatEventParser.Event.Message -> listener?.onMessage(event.message)
|
||||||
|
null -> Unit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun eventsUrl(roomId: Long): String =
|
||||||
|
"${baseUrl.trimEnd('/')}/api/v2/user-creator-chat/rooms/$roomId/events"
|
||||||
|
|
||||||
|
private fun bearer(token: String) = "Bearer $token"
|
||||||
|
|
||||||
|
private companion object {
|
||||||
|
const val HEADER_AUTHORIZATION = "Authorization"
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.main.chat.dm
|
||||||
|
|
||||||
|
import com.google.gson.Gson
|
||||||
|
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatEventClient
|
||||||
|
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessageResponse
|
||||||
|
import okhttp3.MediaType.Companion.toMediaType
|
||||||
|
import okhttp3.OkHttpClient
|
||||||
|
import okhttp3.Protocol
|
||||||
|
import okhttp3.Response
|
||||||
|
import okhttp3.ResponseBody.Companion.toResponseBody
|
||||||
|
import org.junit.Assert.assertEquals
|
||||||
|
import org.junit.Assert.assertNotNull
|
||||||
|
import org.junit.Test
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
class DmChatEventClientTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `비정상 HTTP 응답은 failure callback으로 전달된다`() {
|
||||||
|
val failureLatch = CountDownLatch(1)
|
||||||
|
var failure: Throwable? = null
|
||||||
|
val client = clientWithResponse(
|
||||||
|
code = 500,
|
||||||
|
body = "server error"
|
||||||
|
)
|
||||||
|
|
||||||
|
client.connect(
|
||||||
|
token = "test-token",
|
||||||
|
roomId = 10L,
|
||||||
|
listener = object : TestListener() {
|
||||||
|
override fun onFailure(throwable: Throwable) {
|
||||||
|
failure = throwable
|
||||||
|
failureLatch.countDown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
failureLatch.await(2, TimeUnit.SECONDS)
|
||||||
|
assertNotNull(failure)
|
||||||
|
assertEquals("Unexpected code 500", failure?.message)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `trailing blank line 없이 종료된 마지막 frame도 message callback으로 전달된다`() {
|
||||||
|
val messageLatch = CountDownLatch(1)
|
||||||
|
var receivedMessage: DmChatMessageResponse? = null
|
||||||
|
val client = clientWithResponse(
|
||||||
|
code = 200,
|
||||||
|
body = "event: message\ndata: ${messageJson()}"
|
||||||
|
)
|
||||||
|
|
||||||
|
client.connect(
|
||||||
|
token = "test-token",
|
||||||
|
roomId = 10L,
|
||||||
|
listener = object : TestListener() {
|
||||||
|
override fun onMessage(message: DmChatMessageResponse) {
|
||||||
|
receivedMessage = message
|
||||||
|
messageLatch.countDown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
messageLatch.await(2, TimeUnit.SECONDS)
|
||||||
|
assertEquals(10L, receivedMessage?.messageId)
|
||||||
|
assertEquals("안녕하세요", receivedMessage?.textMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun clientWithResponse(
|
||||||
|
code: Int,
|
||||||
|
body: String
|
||||||
|
): DmChatEventClient {
|
||||||
|
val okHttpClient = OkHttpClient.Builder()
|
||||||
|
.addInterceptor { chain ->
|
||||||
|
Response.Builder()
|
||||||
|
.request(chain.request())
|
||||||
|
.protocol(Protocol.HTTP_1_1)
|
||||||
|
.code(code)
|
||||||
|
.message("test")
|
||||||
|
.body(body.toResponseBody("text/event-stream".toMediaType()))
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
.build()
|
||||||
|
return DmChatEventClient(
|
||||||
|
okHttpClient = okHttpClient,
|
||||||
|
gson = Gson(),
|
||||||
|
baseUrl = "https://example.com"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private open class TestListener : DmChatEventClient.Listener {
|
||||||
|
override fun onConnected() = Unit
|
||||||
|
override fun onMessage(message: DmChatMessageResponse) = Unit
|
||||||
|
override fun onFailure(throwable: Throwable) = Unit
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun messageJson(): String =
|
||||||
|
"""
|
||||||
|
{
|
||||||
|
"messageId": 10,
|
||||||
|
"messageType": "TEXT",
|
||||||
|
"mine": false,
|
||||||
|
"createdAt": 1000,
|
||||||
|
"textMessage": "안녕하세요",
|
||||||
|
"voiceMessageUrl": null,
|
||||||
|
"senderId": 20,
|
||||||
|
"senderNickname": "크리에이터",
|
||||||
|
"senderProfileImageUrl": "https://example.com/profile.png"
|
||||||
|
}
|
||||||
|
""".trimIndent().replace("\n", "")
|
||||||
|
}
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.main.chat.dm
|
||||||
|
|
||||||
|
import com.google.gson.Gson
|
||||||
|
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatEventParser
|
||||||
|
import kr.co.vividnext.sodalive.v2.main.chat.dm.data.DmChatMessageResponse
|
||||||
|
import org.junit.Assert.assertEquals
|
||||||
|
import org.junit.Assert.assertNull
|
||||||
|
import org.junit.Test
|
||||||
|
|
||||||
|
class DmChatEventParserTest {
|
||||||
|
|
||||||
|
private val parser = DmChatEventParser(Gson())
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `connected 이벤트는 연결 이벤트로 파싱된다`() {
|
||||||
|
val event = parser.parse("event: connected\ndata: {}\n\n")
|
||||||
|
|
||||||
|
assertEquals(DmChatEventParser.Event.Connected, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `message 이벤트는 DM 메시지로 파싱된다`() {
|
||||||
|
val event = parser.parse("event: message\ndata: ${messageJson()}\n\n")
|
||||||
|
|
||||||
|
val message = requireMessage(event)
|
||||||
|
assertEquals(10L, message.messageId)
|
||||||
|
assertEquals("안녕하세요", message.textMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `여러 줄 data는 하나의 message payload로 합쳐 파싱된다`() {
|
||||||
|
val json = messageJson(textMessage = "첫줄\\n둘째줄")
|
||||||
|
val splitIndex = json.indexOf("\\\\n") + 2
|
||||||
|
val event = parser.parse(
|
||||||
|
"event: message\n" +
|
||||||
|
"data: ${json.substring(0, splitIndex)}\n" +
|
||||||
|
"data: ${json.substring(splitIndex)}\n\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
val message = requireMessage(event)
|
||||||
|
assertEquals("첫줄\n둘째줄", message.textMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `data field는 콜론 뒤 공백 하나만 제거한다`() {
|
||||||
|
val json = messageJson(textMessage = " 앞 공백 유지")
|
||||||
|
val event = parser.parse("event: message\ndata: $json\n\n")
|
||||||
|
|
||||||
|
val message = requireMessage(event)
|
||||||
|
assertEquals(" 앞 공백 유지", message.textMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `잘못된 JSON message 이벤트는 null로 무시된다`() {
|
||||||
|
val event = parser.parse("event: message\ndata: {not-json}\n\n")
|
||||||
|
|
||||||
|
assertNull(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun requireMessage(event: DmChatEventParser.Event?): DmChatMessageResponse {
|
||||||
|
val messageEvent = event as? DmChatEventParser.Event.Message
|
||||||
|
requireNotNull(messageEvent)
|
||||||
|
return messageEvent.message
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun messageJson(
|
||||||
|
messageId: Long = 10L,
|
||||||
|
textMessage: String = "안녕하세요"
|
||||||
|
): String =
|
||||||
|
"""
|
||||||
|
{
|
||||||
|
"messageId": $messageId,
|
||||||
|
"messageType": "TEXT",
|
||||||
|
"mine": false,
|
||||||
|
"createdAt": 1000,
|
||||||
|
"textMessage": "$textMessage",
|
||||||
|
"voiceMessageUrl": null,
|
||||||
|
"senderId": 20,
|
||||||
|
"senderNickname": "크리에이터",
|
||||||
|
"senderProfileImageUrl": "https://example.com/profile.png"
|
||||||
|
}
|
||||||
|
""".trimIndent().replace("\n", "")
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user