From abeffb0a4f31343827185ca664ab03bb6c7e66e1 Mon Sep 17 00:00:00 2001 From: Klaus Date: Wed, 24 Jun 2026 19:02:11 +0900 Subject: [PATCH] =?UTF-8?q?feat(content-ranking):=20=EB=9E=AD=ED=82=B9=20?= =?UTF-8?q?=EC=8A=A4=EB=83=85=EC=83=B7=20job=20=EC=84=9C=EB=B9=84=EC=8A=A4?= =?UTF-8?q?=EB=A5=BC=20=EC=B6=94=EA=B0=80=ED=95=9C=EB=8B=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AudioRankingSnapshotJobService.kt | 175 ++++++++++ .../AudioRankingSnapshotJobServiceTest.kt | 307 ++++++++++++++++++ 2 files changed, 482 insertions(+) create mode 100644 src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobService.kt create mode 100644 src/test/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobServiceTest.kt diff --git a/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobService.kt b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobService.kt new file mode 100644 index 00000000..81217d64 --- /dev/null +++ b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobService.kt @@ -0,0 +1,175 @@ +package kr.co.vividnext.sodalive.v2.content.ranking.application + +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingPeriodPolicy +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingSchedulePolicy +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingUtcRange +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobPort +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobRecord +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobStatus +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobTrigger +import org.redisson.api.RedissonClient +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.TransactionDefinition +import org.springframework.transaction.annotation.Transactional +import org.springframework.transaction.support.TransactionTemplate +import java.time.LocalDateTime +import java.time.ZonedDateTime +import java.util.concurrent.TimeUnit + +@Service +@Transactional(readOnly = true) +class AudioRankingSnapshotJobService( + private val refreshService: AudioRankingSnapshotRefreshService, + private val jobPort: AudioRankingSnapshotJobPort, + private val redissonClient: RedissonClient, + transactionManager: PlatformTransactionManager, + private val nowProvider: () -> ZonedDateTime = { ZonedDateTime.now() } +) { + private val log = LoggerFactory.getLogger(javaClass) + private val periodPolicy = AudioRankingPeriodPolicy() + private val schedulePolicy = AudioRankingSchedulePolicy() + private val transactionTemplate = TransactionTemplate(transactionManager).also { template -> + template.propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW + } + + fun refreshLastCompletedWeekByScheduledJob(type: AudioRankingType) { + withLastCompletedWeekPeriodLock(type) { now, utcRange, visibleFromAtUtc -> + refreshLastCompletedWeek(type, now, utcRange, visibleFromAtUtc, AudioRankingSnapshotJobTrigger.SCHEDULED) + } + } + + fun refreshLastCompletedWeekByFallback(type: AudioRankingType): Boolean { + var refreshed = false + withLastCompletedWeekPeriodLock(type) { now, utcRange, visibleFromAtUtc -> + if (fallbackCountReachedLimit(type, utcRange)) return@withLastCompletedWeekPeriodLock + refreshLastCompletedWeek(type, now, utcRange, visibleFromAtUtc, AudioRankingSnapshotJobTrigger.FALLBACK) + refreshed = true + } + return refreshed + } + + private fun refreshLastCompletedWeek( + type: AudioRankingType, + now: ZonedDateTime, + utcRange: AudioRankingUtcRange, + visibleFromAtUtc: LocalDateTime, + trigger: AudioRankingSnapshotJobTrigger + ) { + val job = savePendingJob(type, utcRange, visibleFromAtUtc, trigger) + val jobId = job.id ?: return + markProcessing(jobId) + logJobStatusChanged(job, AudioRankingSnapshotJobStatus.PROCESSING) + try { + refresh(type, now) + markDone(jobId) + logJobStatusChanged(job, AudioRankingSnapshotJobStatus.DONE) + } catch (ex: Exception) { + markFailed(jobId, ex.message) + logJobStatusChanged(job, AudioRankingSnapshotJobStatus.FAILED, ex.message) + throw ex + } + } + + private fun refresh(type: AudioRankingType, now: ZonedDateTime) { + transactionTemplate.executeWithoutResult { + refreshService.refreshLastCompletedWeek(type, now) + } + } + + private fun savePendingJob( + type: AudioRankingType, + utcRange: AudioRankingUtcRange, + visibleFromAtUtc: LocalDateTime, + trigger: AudioRankingSnapshotJobTrigger + ): AudioRankingSnapshotJobRecord { + return transactionTemplate.execute { + jobPort.save( + AudioRankingSnapshotJobRecord( + rankingType = type, + aggregationStartAtUtc = utcRange.startInclusiveUtc, + aggregationEndAtUtc = utcRange.endExclusiveUtc, + visibleFromAtUtc = visibleFromAtUtc, + trigger = trigger, + status = AudioRankingSnapshotJobStatus.PENDING, + lastError = null, + processingStartedAt = null, + processedAt = null + ) + ) + }!! + } + + private fun markProcessing(jobId: Long) { + transactionTemplate.executeWithoutResult { + jobPort.markProcessing(jobId, LocalDateTime.now()) + } + } + + private fun markDone(jobId: Long) { + transactionTemplate.executeWithoutResult { + jobPort.markDone(jobId, LocalDateTime.now()) + } + } + + private fun markFailed(jobId: Long, message: String?) { + transactionTemplate.executeWithoutResult { + jobPort.markFailed(jobId, LocalDateTime.now(), message) + } + } + + private fun fallbackCountReachedLimit(type: AudioRankingType, utcRange: AudioRankingUtcRange): Boolean { + return jobPort.countByRankingTypeAndPeriodAndTrigger( + rankingType = type, + aggregationStartAtUtc = utcRange.startInclusiveUtc, + aggregationEndAtUtc = utcRange.endExclusiveUtc, + trigger = AudioRankingSnapshotJobTrigger.FALLBACK + ) >= FALLBACK_LIMIT + } + + private fun withLastCompletedWeekPeriodLock( + type: AudioRankingType, + action: (ZonedDateTime, AudioRankingUtcRange, LocalDateTime) -> Unit + ) { + val now = nowProvider() + val period = periodPolicy.resolveLastCompletedWeek(now) + val utcRange = periodPolicy.toUtcRange(period) + val visibleFromAtUtc = schedulePolicy.resolveVisibleFromAt(period.endExclusiveKst) + val lockName = "lock:content-ranking-snapshot-refresh:$type:${utcRange.startInclusiveUtc}:${utcRange.endExclusiveUtc}" + val lock = redissonClient.getLock(lockName) + + try { + if (lock.tryLock(0, -1, TimeUnit.SECONDS)) { + action(now, utcRange, visibleFromAtUtc) + } + } finally { + if (lock.isHeldByCurrentThread) { + lock.unlock() + } + } + } + + private fun logJobStatusChanged( + job: AudioRankingSnapshotJobRecord, + status: AudioRankingSnapshotJobStatus, + error: String? = null + ) { + log.info( + "event=content_ranking_snapshot_job_status_changed " + + "jobId={} rankingType={} trigger={} status={} aggregationStartAtUtc={} aggregationEndAtUtc={} error={}", + job.id, + job.rankingType, + job.trigger, + status, + job.aggregationStartAtUtc, + job.aggregationEndAtUtc, + error + ) + } + + companion object { + private const val FALLBACK_LIMIT = 3L + } +} diff --git a/src/test/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobServiceTest.kt b/src/test/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobServiceTest.kt new file mode 100644 index 00000000..ed5cad42 --- /dev/null +++ b/src/test/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/application/AudioRankingSnapshotJobServiceTest.kt @@ -0,0 +1,307 @@ +package kr.co.vividnext.sodalive.v2.content.ranking.application + +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobPort +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobRecord +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobStatus +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobTrigger +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.mockito.Mockito +import org.redisson.api.RLock +import org.redisson.api.RedissonClient +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.TransactionDefinition +import org.springframework.transaction.support.SimpleTransactionStatus +import java.time.LocalDateTime +import java.time.ZoneId +import java.time.ZonedDateTime +import java.util.concurrent.TimeUnit + +class AudioRankingSnapshotJobServiceTest { + @Test + fun shouldCreateScheduledJobAndMarkDoneWhenRefreshSucceeds() { + val refreshService = Mockito.mock(AudioRankingSnapshotRefreshService::class.java) + val jobPort = FakeAudioRankingSnapshotJobPort() + val now = now() + val service = service( + refreshService = refreshService, + jobPort = jobPort, + redissonClient = periodLockRedissonClient(true) + ) { now } + + service.refreshLastCompletedWeekByScheduledJob(AudioRankingType.REVENUE) + + val job = jobPort.jobs.single() + assertEquals(AudioRankingType.REVENUE, job.rankingType) + assertEquals(LocalDateTime.of(2026, 5, 31, 15, 0), job.aggregationStartAtUtc) + assertEquals(LocalDateTime.of(2026, 6, 7, 15, 0), job.aggregationEndAtUtc) + assertEquals(LocalDateTime.of(2026, 6, 8, 0, 0), job.visibleFromAtUtc) + assertEquals(AudioRankingSnapshotJobTrigger.SCHEDULED, job.trigger) + assertEquals(AudioRankingSnapshotJobStatus.DONE, job.status) + Mockito.verify(refreshService).refreshLastCompletedWeek(AudioRankingType.REVENUE, now) + } + + @Test + fun shouldMarkScheduledJobFailedWhenRefreshFails() { + val refreshService = Mockito.mock(AudioRankingSnapshotRefreshService::class.java) + val jobPort = FakeAudioRankingSnapshotJobPort() + val now = now() + Mockito.doThrow(IllegalStateException("aggregate failed")) + .`when`(refreshService).refreshLastCompletedWeek(AudioRankingType.RISING, now) + val service = service( + refreshService = refreshService, + jobPort = jobPort, + redissonClient = periodLockRedissonClient(true) + ) { now } + + val exception = assertThrows(IllegalStateException::class.java) { + service.refreshLastCompletedWeekByScheduledJob(AudioRankingType.RISING) + } + + assertEquals("aggregate failed", exception.message) + assertEquals(AudioRankingSnapshotJobStatus.FAILED, jobPort.jobs.single().status) + assertEquals("aggregate failed", jobPort.jobs.single().lastError) + } + + @Test + fun shouldCommitFailedJobStatusWhenRefreshFails() { + val refreshService = Mockito.mock(AudioRankingSnapshotRefreshService::class.java) + val jobPort = FakeAudioRankingSnapshotJobPort() + val now = now() + val transactionManager = transactionManager() + Mockito.doThrow(IllegalStateException("aggregate failed")) + .`when`(refreshService).refreshLastCompletedWeek(AudioRankingType.RISING, now) + val service = AudioRankingSnapshotJobService( + refreshService = refreshService, + jobPort = jobPort, + redissonClient = periodLockRedissonClient(true), + transactionManager = transactionManager, + nowProvider = { now } + ) + + assertThrows(IllegalStateException::class.java) { + service.refreshLastCompletedWeekByScheduledJob(AudioRankingType.RISING) + } + + Mockito.verify(transactionManager, Mockito.times(4)) + .getTransaction(Mockito.any(TransactionDefinition::class.java)) + Mockito.verify(transactionManager, Mockito.times(3)) + .commit(Mockito.any(SimpleTransactionStatus::class.java)) + Mockito.verify(transactionManager) + .rollback(Mockito.any(SimpleTransactionStatus::class.java)) + } + + @Test + fun shouldSkipScheduledJobWhenPeriodLockIsNotAcquired() { + val refreshService = Mockito.mock(AudioRankingSnapshotRefreshService::class.java) + val jobPort = FakeAudioRankingSnapshotJobPort() + val now = now() + val service = service( + refreshService = refreshService, + jobPort = jobPort, + redissonClient = periodLockRedissonClient(false) + ) { now } + + service.refreshLastCompletedWeekByScheduledJob(AudioRankingType.WEEKLY_POPULAR) + + assertTrue(jobPort.jobs.isEmpty()) + Mockito.verify(refreshService, Mockito.never()).refreshLastCompletedWeek( + AudioRankingType.WEEKLY_POPULAR, + now + ) + } + + @Test + fun shouldCreateFallbackJobWhenFallbackCountIsBelowLimit() { + val refreshService = Mockito.mock(AudioRankingSnapshotRefreshService::class.java) + val jobPort = FakeAudioRankingSnapshotJobPort() + val now = now() + val service = service( + refreshService = refreshService, + jobPort = jobPort, + redissonClient = periodLockRedissonClient(true) + ) { now } + + val refreshed = service.refreshLastCompletedWeekByFallback(AudioRankingType.LIKE_COUNT) + + assertEquals(true, refreshed) + assertEquals(AudioRankingSnapshotJobTrigger.FALLBACK, jobPort.jobs.single().trigger) + assertEquals(AudioRankingSnapshotJobStatus.DONE, jobPort.jobs.single().status) + Mockito.verify(refreshService).refreshLastCompletedWeek(AudioRankingType.LIKE_COUNT, now) + } + + @Test + fun shouldSkipFallbackJobWhenFallbackCountReachedLimit() { + val refreshService = Mockito.mock(AudioRankingSnapshotRefreshService::class.java) + val jobPort = FakeAudioRankingSnapshotJobPort() + repeat(3) { + jobPort.save(jobRecord(trigger = AudioRankingSnapshotJobTrigger.FALLBACK)) + } + val now = now() + val service = service( + refreshService = refreshService, + jobPort = jobPort, + redissonClient = periodLockRedissonClient(true) + ) { now } + + val refreshed = service.refreshLastCompletedWeekByFallback(AudioRankingType.REVENUE) + + assertEquals(false, refreshed) + assertEquals(3, jobPort.jobs.size) + Mockito.verify(refreshService, Mockito.never()).refreshLastCompletedWeek(AudioRankingType.REVENUE, now) + } + + @Test + fun shouldUseTypeAndPeriodScopedLock() { + val refreshService = Mockito.mock(AudioRankingSnapshotRefreshService::class.java) + val jobPort = FakeAudioRankingSnapshotJobPort() + val redissonClient = periodLockRedissonClient(true) + val now = now() + val service = service(refreshService = refreshService, jobPort = jobPort, redissonClient = redissonClient) { now } + + service.refreshLastCompletedWeekByScheduledJob(AudioRankingType.REVENUE) + + Mockito.verify(redissonClient).getLock( + "lock:content-ranking-snapshot-refresh:REVENUE:2026-05-31T15:00:2026-06-07T15:00" + ) + } + + private fun service( + refreshService: AudioRankingSnapshotRefreshService, + jobPort: AudioRankingSnapshotJobPort, + redissonClient: RedissonClient, + nowProvider: () -> ZonedDateTime + ): AudioRankingSnapshotJobService { + return AudioRankingSnapshotJobService( + refreshService = refreshService, + jobPort = jobPort, + redissonClient = redissonClient, + transactionManager = transactionManager(), + nowProvider = nowProvider + ) + } + + private fun now(): ZonedDateTime { + return ZonedDateTime.of(2026, 6, 8, 6, 0, 0, 0, ZoneId.of("Asia/Seoul")) + } + + private fun periodLockRedissonClient(lockAcquired: Boolean): RedissonClient { + val redissonClient = Mockito.mock(RedissonClient::class.java) + val lock = Mockito.mock(RLock::class.java) + val lockName = "lock:content-ranking-snapshot-refresh:REVENUE:2026-05-31T15:00:2026-06-07T15:00" + Mockito.`when`(redissonClient.getLock(Mockito.anyString())).thenReturn(lock) + Mockito.`when`(redissonClient.getLock(lockName)).thenReturn(lock) + Mockito.`when`(lock.tryLock(0, -1, TimeUnit.SECONDS)).thenReturn(lockAcquired) + Mockito.`when`(lock.isHeldByCurrentThread).thenReturn(lockAcquired) + return redissonClient + } +} + +private fun transactionManager(): PlatformTransactionManager { + val transactionManager = Mockito.mock(PlatformTransactionManager::class.java) + Mockito.`when`(transactionManager.getTransaction(Mockito.any(TransactionDefinition::class.java))) + .thenAnswer { SimpleTransactionStatus() } + return transactionManager +} + +private fun jobRecord( + rankingType: AudioRankingType = AudioRankingType.REVENUE, + trigger: AudioRankingSnapshotJobTrigger = AudioRankingSnapshotJobTrigger.SCHEDULED, + status: AudioRankingSnapshotJobStatus = AudioRankingSnapshotJobStatus.PENDING +): AudioRankingSnapshotJobRecord { + return AudioRankingSnapshotJobRecord( + rankingType = rankingType, + aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0), + aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0), + visibleFromAtUtc = LocalDateTime.of(2026, 6, 8, 0, 0), + trigger = trigger, + status = status, + lastError = null, + processingStartedAt = null, + processedAt = null + ) +} + +private class FakeAudioRankingSnapshotJobPort : AudioRankingSnapshotJobPort { + val jobs = mutableListOf() + private var nextId = 1L + + override fun save(job: AudioRankingSnapshotJobRecord): AudioRankingSnapshotJobRecord { + val saved = job.copy(id = job.id ?: nextId++) + jobs.add(saved) + return saved + } + + override fun findById(jobId: Long): AudioRankingSnapshotJobRecord? = jobs.firstOrNull { it.id == jobId } + + override fun findByRankingTypeAndPeriodAndStatuses( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + statuses: List + ): List { + return jobs.filter { + it.rankingType == rankingType && + it.aggregationStartAtUtc == aggregationStartAtUtc && + it.aggregationEndAtUtc == aggregationEndAtUtc && + it.status in statuses + } + } + + override fun countByRankingTypeAndPeriodAndTrigger( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + trigger: AudioRankingSnapshotJobTrigger + ): Long { + return jobs.count { + it.rankingType == rankingType && + it.aggregationStartAtUtc == aggregationStartAtUtc && + it.aggregationEndAtUtc == aggregationEndAtUtc && + it.trigger == trigger + }.toLong() + } + + override fun markProcessing(jobId: Long, processingStartedAt: LocalDateTime): AudioRankingSnapshotJobRecord? { + return update(jobId) { + it.copy( + status = AudioRankingSnapshotJobStatus.PROCESSING, + processingStartedAt = processingStartedAt + ) + } + } + + override fun markDone(jobId: Long, processedAt: LocalDateTime): AudioRankingSnapshotJobRecord? { + return update(jobId) { + it.copy( + status = AudioRankingSnapshotJobStatus.DONE, + processedAt = processedAt, + lastError = null + ) + } + } + + override fun markFailed(jobId: Long, processedAt: LocalDateTime, lastError: String?): AudioRankingSnapshotJobRecord? { + return update(jobId) { + it.copy( + status = AudioRankingSnapshotJobStatus.FAILED, + processedAt = processedAt, + lastError = lastError + ) + } + } + + private fun update( + jobId: Long, + transform: (AudioRankingSnapshotJobRecord) -> AudioRankingSnapshotJobRecord + ): AudioRankingSnapshotJobRecord? { + val index = jobs.indexOfFirst { it.id == jobId } + if (index < 0) return null + val updated = transform(jobs[index]) + jobs[index] = updated + return updated + } +}