feat(ranking): 스냅샷 갱신 관측 로그를 추가한다
This commit is contained in:
@@ -3,6 +3,7 @@ package kr.co.vividnext.sodalive.v2.ranking.adapter.out.persistence
|
||||
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingScorePolicy
|
||||
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingSnapshotCandidate
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingAggregationPort
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingAggregationResult
|
||||
import org.springframework.stereotype.Repository
|
||||
import java.time.LocalDateTime
|
||||
import javax.persistence.EntityManager
|
||||
@@ -16,16 +17,35 @@ class DefaultCreatorRankingAggregationRepository(
|
||||
override fun aggregateCandidates(
|
||||
startInclusiveUtc: LocalDateTime,
|
||||
endExclusiveUtc: LocalDateTime
|
||||
): List<CreatorRankingSnapshotCandidate> {
|
||||
return aggregateCandidateResult(startInclusiveUtc, endExclusiveUtc).candidates
|
||||
}
|
||||
|
||||
override fun aggregateCandidateResult(
|
||||
startInclusiveUtc: LocalDateTime,
|
||||
endExclusiveUtc: LocalDateTime
|
||||
): CreatorRankingAggregationResult {
|
||||
val candidates = aggregateAllCandidates(startInclusiveUtc, endExclusiveUtc)
|
||||
val includedCandidates = candidates
|
||||
.filter { candidate -> candidate.finalScore >= MINIMUM_FINAL_SCORE }
|
||||
.sortedWith(compareByDescending<CreatorRankingSnapshotCandidate> { it.finalScore }.thenBy { it.creatorId })
|
||||
|
||||
return CreatorRankingAggregationResult(
|
||||
candidates = includedCandidates,
|
||||
lowScoreExcludedCount = candidates.size - includedCandidates.size
|
||||
)
|
||||
}
|
||||
|
||||
private fun aggregateAllCandidates(
|
||||
startInclusiveUtc: LocalDateTime,
|
||||
endExclusiveUtc: LocalDateTime
|
||||
): List<CreatorRankingSnapshotCandidate> {
|
||||
val rows = entityManager.createNativeQuery(AGGREGATION_SQL)
|
||||
.setParameter("startInclusiveUtc", startInclusiveUtc)
|
||||
.setParameter("endExclusiveUtc", endExclusiveUtc)
|
||||
.resultList
|
||||
|
||||
return rows
|
||||
.map { row -> (row as Array<*>).toCandidate() }
|
||||
.filter { candidate -> candidate.finalScore >= MINIMUM_FINAL_SCORE }
|
||||
.sortedWith(compareByDescending<CreatorRankingSnapshotCandidate> { it.finalScore }.thenBy { it.creatorId })
|
||||
return rows.map { row -> (row as Array<*>).toCandidate() }
|
||||
}
|
||||
|
||||
private fun Array<*>.toCandidate(): CreatorRankingSnapshotCandidate {
|
||||
|
||||
@@ -5,10 +5,14 @@ import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingScorePolicy
|
||||
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingSnapshotCandidate
|
||||
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingUtcRange
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingAggregationPort
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingAggregationResult
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotPort
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotRecord
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import org.springframework.transaction.support.TransactionSynchronization
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
@Service
|
||||
@@ -16,6 +20,7 @@ class CreatorRankingSnapshotRefreshService(
|
||||
private val aggregationPort: CreatorRankingAggregationPort,
|
||||
private val snapshotPort: CreatorRankingSnapshotPort
|
||||
) {
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
private val periodPolicy = CreatorRankingPeriodPolicy()
|
||||
private val scorePolicy = CreatorRankingScorePolicy()
|
||||
|
||||
@@ -26,19 +31,75 @@ class CreatorRankingSnapshotRefreshService(
|
||||
|
||||
@Transactional
|
||||
fun refreshLastCompletedWeek(now: ZonedDateTime) {
|
||||
val startedAt = System.currentTimeMillis()
|
||||
val period = periodPolicy.resolveLastCompletedWeek(now)
|
||||
val utcRange = periodPolicy.toUtcRange(period)
|
||||
val snapshots = aggregationPort.aggregateCandidates(
|
||||
startInclusiveUtc = utcRange.startInclusiveUtc,
|
||||
endExclusiveUtc = utcRange.endExclusiveUtc
|
||||
).map { it.toSnapshotRecord(utcRange) }
|
||||
.sortedByDescending { it.finalScore }
|
||||
.takeRankedBoundary(limit = SNAPSHOT_LIMIT)
|
||||
runCatching {
|
||||
val aggregationResult = aggregationPort.aggregateCandidateResult(
|
||||
startInclusiveUtc = utcRange.startInclusiveUtc,
|
||||
endExclusiveUtc = utcRange.endExclusiveUtc
|
||||
)
|
||||
val snapshots = aggregationResult.candidates.map { it.toSnapshotRecord(utcRange) }
|
||||
.sortedByDescending { it.finalScore }
|
||||
.takeRankedBoundary(limit = SNAPSHOT_LIMIT)
|
||||
|
||||
snapshotPort.replaceSnapshots(
|
||||
aggregationStartAtUtc = utcRange.startInclusiveUtc,
|
||||
aggregationEndAtUtc = utcRange.endExclusiveUtc,
|
||||
newSnapshots = snapshots
|
||||
snapshotPort.replaceSnapshots(
|
||||
aggregationStartAtUtc = utcRange.startInclusiveUtc,
|
||||
aggregationEndAtUtc = utcRange.endExclusiveUtc,
|
||||
newSnapshots = snapshots
|
||||
)
|
||||
aggregationResult.toLogCounts(storedCount = snapshots.size)
|
||||
}.onSuccess { counts ->
|
||||
afterCommit {
|
||||
log.info(
|
||||
"event=creator_ranking_snapshot_refresh_success " +
|
||||
"aggregationStartAtUtc={} aggregationEndAtUtc={} " +
|
||||
"candidateCount={} storedCount={} lowScoreExcludedCount={} elapsedMs={}",
|
||||
utcRange.startInclusiveUtc,
|
||||
utcRange.endExclusiveUtc,
|
||||
counts.candidateCount,
|
||||
counts.storedCount,
|
||||
counts.lowScoreExcludedCount,
|
||||
System.currentTimeMillis() - startedAt
|
||||
)
|
||||
}
|
||||
}.onFailure { ex ->
|
||||
log.warn(
|
||||
"event=creator_ranking_snapshot_refresh_failure " +
|
||||
"aggregationStartAtUtc={} aggregationEndAtUtc={} elapsedMs={} error={}",
|
||||
utcRange.startInclusiveUtc,
|
||||
utcRange.endExclusiveUtc,
|
||||
System.currentTimeMillis() - startedAt,
|
||||
ex.message,
|
||||
ex
|
||||
)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
|
||||
private fun CreatorRankingAggregationResult.toLogCounts(storedCount: Int): RefreshLogCounts {
|
||||
return RefreshLogCounts(
|
||||
candidateCount = candidates.size,
|
||||
storedCount = storedCount,
|
||||
lowScoreExcludedCount = lowScoreExcludedCount
|
||||
)
|
||||
}
|
||||
|
||||
private data class RefreshLogCounts(
|
||||
val candidateCount: Int,
|
||||
val storedCount: Int,
|
||||
val lowScoreExcludedCount: Int
|
||||
)
|
||||
|
||||
private fun afterCommit(action: () -> Unit) {
|
||||
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
|
||||
action()
|
||||
return
|
||||
}
|
||||
TransactionSynchronizationManager.registerSynchronization(
|
||||
object : TransactionSynchronization {
|
||||
override fun afterCommit() = action()
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,4 +8,19 @@ interface CreatorRankingAggregationPort {
|
||||
startInclusiveUtc: LocalDateTime,
|
||||
endExclusiveUtc: LocalDateTime
|
||||
): List<CreatorRankingSnapshotCandidate>
|
||||
|
||||
fun aggregateCandidateResult(
|
||||
startInclusiveUtc: LocalDateTime,
|
||||
endExclusiveUtc: LocalDateTime
|
||||
): CreatorRankingAggregationResult {
|
||||
return CreatorRankingAggregationResult(
|
||||
candidates = aggregateCandidates(startInclusiveUtc, endExclusiveUtc),
|
||||
lowScoreExcludedCount = 0
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
data class CreatorRankingAggregationResult(
|
||||
val candidates: List<CreatorRankingSnapshotCandidate>,
|
||||
val lowScoreExcludedCount: Int
|
||||
)
|
||||
|
||||
@@ -3,20 +3,27 @@ package kr.co.vividnext.sodalive.v2.ranking.application
|
||||
import kr.co.vividnext.sodalive.v2.ranking.adapter.out.scheduler.CreatorRankingSnapshotScheduler
|
||||
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingSnapshotCandidate
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingAggregationPort
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingAggregationResult
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotPort
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotRecord
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.Assertions.assertThrows
|
||||
import org.junit.jupiter.api.DisplayName
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
import org.mockito.Mockito
|
||||
import org.redisson.api.RLock
|
||||
import org.redisson.api.RedissonClient
|
||||
import org.springframework.boot.test.system.CapturedOutput
|
||||
import org.springframework.boot.test.system.OutputCaptureExtension
|
||||
import org.springframework.scheduling.annotation.Scheduled
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZoneId
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@ExtendWith(OutputCaptureExtension::class)
|
||||
class CreatorRankingSnapshotRefreshServiceTest {
|
||||
@Test
|
||||
@DisplayName("주간 스냅샷 생성은 KST 지난 주를 UTC 조회 기간으로 변환하고 raw 지표 점수를 다시 계산해 저장한다")
|
||||
@@ -88,6 +95,79 @@ class CreatorRankingSnapshotRefreshServiceTest {
|
||||
assertEquals(listOf(2L), snapshotPort.snapshots.map { it.creatorId })
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("주간 스냅샷 생성 성공은 집계 기간과 후보/저장 수를 로그로 남긴다")
|
||||
fun shouldLogSnapshotRefreshSuccessWithPeriodAndCounts(output: CapturedOutput) {
|
||||
val aggregationPort = FakeCreatorRankingAggregationPort()
|
||||
val snapshotPort = FakeCreatorRankingSnapshotPort()
|
||||
val service = service(aggregationPort = aggregationPort, snapshotPort = snapshotPort)
|
||||
aggregationPort.candidates = listOf(
|
||||
candidate(creatorId = 1L, liveCanAmount = 100),
|
||||
candidate(creatorId = 2L, liveCanAmount = 50)
|
||||
)
|
||||
|
||||
service.refreshLastCompletedWeek(ZonedDateTime.of(2026, 6, 8, 6, 0, 0, 0, ZoneId.of("Asia/Seoul")))
|
||||
|
||||
assertEquals(true, output.out.contains("event=creator_ranking_snapshot_refresh_success"))
|
||||
assertEquals(true, output.out.contains("aggregationStartAtUtc=2026-05-31T15:00"))
|
||||
assertEquals(true, output.out.contains("aggregationEndAtUtc=2026-06-07T15:00"))
|
||||
assertEquals(true, output.out.contains("candidateCount=2"))
|
||||
assertEquals(true, output.out.contains("storedCount=2"))
|
||||
assertEquals(true, output.out.contains("lowScoreExcludedCount=0"))
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("주간 스냅샷 생성 성공 로그는 트랜잭션 커밋 후 기록한다")
|
||||
fun shouldLogSnapshotRefreshSuccessAfterTransactionCommit(output: CapturedOutput) {
|
||||
val aggregationPort = FakeCreatorRankingAggregationPort()
|
||||
val service = service(aggregationPort = aggregationPort)
|
||||
aggregationPort.candidates = listOf(candidate(creatorId = 1L, liveCanAmount = 100))
|
||||
|
||||
TransactionSynchronizationManager.initSynchronization()
|
||||
try {
|
||||
service.refreshLastCompletedWeek(ZonedDateTime.of(2026, 6, 8, 6, 0, 0, 0, ZoneId.of("Asia/Seoul")))
|
||||
|
||||
assertEquals(false, output.out.contains("event=creator_ranking_snapshot_refresh_success"))
|
||||
TransactionSynchronizationManager.getSynchronizations().forEach { it.afterCommit() }
|
||||
} finally {
|
||||
TransactionSynchronizationManager.clearSynchronization()
|
||||
}
|
||||
|
||||
assertEquals(true, output.out.contains("event=creator_ranking_snapshot_refresh_success"))
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("주간 스냅샷 생성 성공은 최종 점수 1점 미만 제외 수를 로그로 남긴다")
|
||||
fun shouldLogLowScoreExcludedCount(output: CapturedOutput) {
|
||||
val aggregationPort = FakeCreatorRankingAggregationPort()
|
||||
val service = service(aggregationPort = aggregationPort)
|
||||
aggregationPort.candidates = listOf(candidate(creatorId = 1L, liveCanAmount = 100))
|
||||
aggregationPort.lowScoreExcludedCount = 2
|
||||
|
||||
service.refreshLastCompletedWeek(ZonedDateTime.of(2026, 6, 8, 6, 0, 0, 0, ZoneId.of("Asia/Seoul")))
|
||||
|
||||
assertEquals(true, output.out.contains("candidateCount=1"))
|
||||
assertEquals(true, output.out.contains("lowScoreExcludedCount=2"))
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("주간 스냅샷 생성 실패는 집계 기간과 에러를 로그로 남기고 예외를 전파한다")
|
||||
fun shouldLogSnapshotRefreshFailureWithPeriodAndError(output: CapturedOutput) {
|
||||
val aggregationPort = FakeCreatorRankingAggregationPort()
|
||||
val service = service(aggregationPort = aggregationPort)
|
||||
aggregationPort.failure = IllegalStateException("aggregate failed")
|
||||
|
||||
val exception = assertThrows(IllegalStateException::class.java) {
|
||||
service.refreshLastCompletedWeek(ZonedDateTime.of(2026, 6, 8, 6, 0, 0, 0, ZoneId.of("Asia/Seoul")))
|
||||
}
|
||||
|
||||
assertEquals("aggregate failed", exception.message)
|
||||
assertEquals(true, output.out.contains("event=creator_ranking_snapshot_refresh_failure"))
|
||||
assertEquals(true, output.out.contains("aggregationStartAtUtc=2026-05-31T15:00"))
|
||||
assertEquals(true, output.out.contains("aggregationEndAtUtc=2026-06-07T15:00"))
|
||||
assertEquals(true, output.out.contains("error=aggregate failed"))
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("주간 스냅샷 스케줄러는 매주 월요일 07:30 KST cron으로 갱신 서비스를 호출한다")
|
||||
fun shouldScheduleWeeklySnapshotRefreshAtKstMondaySevenThirty() {
|
||||
@@ -194,6 +274,8 @@ class CreatorRankingSnapshotRefreshServiceTest {
|
||||
|
||||
private class FakeCreatorRankingAggregationPort : CreatorRankingAggregationPort {
|
||||
var candidates: List<CreatorRankingSnapshotCandidate> = emptyList()
|
||||
var lowScoreExcludedCount: Int = 0
|
||||
var failure: RuntimeException? = null
|
||||
var startInclusiveUtc: LocalDateTime? = null
|
||||
var endExclusiveUtc: LocalDateTime? = null
|
||||
|
||||
@@ -203,8 +285,22 @@ private class FakeCreatorRankingAggregationPort : CreatorRankingAggregationPort
|
||||
): List<CreatorRankingSnapshotCandidate> {
|
||||
this.startInclusiveUtc = startInclusiveUtc
|
||||
this.endExclusiveUtc = endExclusiveUtc
|
||||
failure?.let { throw it }
|
||||
return candidates
|
||||
}
|
||||
|
||||
override fun aggregateCandidateResult(
|
||||
startInclusiveUtc: LocalDateTime,
|
||||
endExclusiveUtc: LocalDateTime
|
||||
): CreatorRankingAggregationResult {
|
||||
this.startInclusiveUtc = startInclusiveUtc
|
||||
this.endExclusiveUtc = endExclusiveUtc
|
||||
failure?.let { throw it }
|
||||
return CreatorRankingAggregationResult(
|
||||
candidates = candidates,
|
||||
lowScoreExcludedCount = lowScoreExcludedCount
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private class FakeCreatorRankingSnapshotPort : CreatorRankingSnapshotPort {
|
||||
|
||||
Reference in New Issue
Block a user