test #426
@@ -1,6 +1,7 @@
|
|||||||
package kr.co.vividnext.sodalive.v2.ranking.application
|
package kr.co.vividnext.sodalive.v2.ranking.application
|
||||||
|
|
||||||
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingPeriodPolicy
|
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingPeriodPolicy
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingType
|
||||||
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingUtcRange
|
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingUtcRange
|
||||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobPort
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobPort
|
||||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobRecord
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobRecord
|
||||||
@@ -34,38 +35,71 @@ class CreatorRankingSnapshotJobService(
|
|||||||
|
|
||||||
fun refreshLastCompletedWeekByScheduledJob() {
|
fun refreshLastCompletedWeekByScheduledJob() {
|
||||||
withLastCompletedWeekPeriodLock { now, utcRange ->
|
withLastCompletedWeekPeriodLock { now, utcRange ->
|
||||||
transactionTemplate.executeWithoutResult {
|
|
||||||
refreshLastCompletedWeekByScheduledJob(now, utcRange)
|
refreshLastCompletedWeekByScheduledJob(now, utcRange)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private fun refreshLastCompletedWeekByScheduledJob(
|
private fun refreshLastCompletedWeekByScheduledJob(
|
||||||
now: ZonedDateTime,
|
now: ZonedDateTime,
|
||||||
utcRange: CreatorRankingUtcRange
|
utcRange: CreatorRankingUtcRange
|
||||||
) {
|
) {
|
||||||
val job = jobPort.save(
|
val job = savePendingJob(utcRange, CreatorRankingSnapshotJobTrigger.SCHEDULED)
|
||||||
|
val jobId = job.id ?: return
|
||||||
|
markProcessing(jobId)
|
||||||
|
logJobStatusChanged(job, CreatorRankingSnapshotJobStatus.PROCESSING)
|
||||||
|
try {
|
||||||
|
refresh(now)
|
||||||
|
markDone(jobId)
|
||||||
|
logJobStatusChanged(job, CreatorRankingSnapshotJobStatus.DONE)
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
markFailed(jobId, ex.message)
|
||||||
|
logJobStatusChanged(job, CreatorRankingSnapshotJobStatus.FAILED, ex.message)
|
||||||
|
throw ex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun refresh(now: ZonedDateTime) {
|
||||||
|
transactionTemplate.executeWithoutResult {
|
||||||
|
refreshService.refreshLastCompletedWeek(now)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun savePendingJob(
|
||||||
|
utcRange: CreatorRankingUtcRange,
|
||||||
|
trigger: CreatorRankingSnapshotJobTrigger
|
||||||
|
): CreatorRankingSnapshotJobRecord {
|
||||||
|
return transactionTemplate.execute {
|
||||||
|
jobPort.save(
|
||||||
CreatorRankingSnapshotJobRecord(
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
rankingType = CreatorRankingType.WEEKLY,
|
||||||
aggregationStartAtUtc = utcRange.startInclusiveUtc,
|
aggregationStartAtUtc = utcRange.startInclusiveUtc,
|
||||||
aggregationEndAtUtc = utcRange.endExclusiveUtc,
|
aggregationEndAtUtc = utcRange.endExclusiveUtc,
|
||||||
trigger = CreatorRankingSnapshotJobTrigger.SCHEDULED,
|
visibleFromAtUtc = utcRange.endExclusiveUtc.plusHours(9),
|
||||||
|
trigger = trigger,
|
||||||
status = CreatorRankingSnapshotJobStatus.PENDING,
|
status = CreatorRankingSnapshotJobStatus.PENDING,
|
||||||
lastError = null,
|
lastError = null,
|
||||||
processingStartedAt = null,
|
processingStartedAt = null,
|
||||||
processedAt = null
|
processedAt = null
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
val jobId = job.id ?: return
|
}!!
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun markProcessing(jobId: Long) {
|
||||||
|
transactionTemplate.executeWithoutResult {
|
||||||
jobPort.markProcessing(jobId, LocalDateTime.now())
|
jobPort.markProcessing(jobId, LocalDateTime.now())
|
||||||
logJobStatusChanged(job, CreatorRankingSnapshotJobStatus.PROCESSING)
|
}
|
||||||
try {
|
}
|
||||||
refreshService.refreshLastCompletedWeek(now)
|
|
||||||
|
private fun markDone(jobId: Long) {
|
||||||
|
transactionTemplate.executeWithoutResult {
|
||||||
jobPort.markDone(jobId, LocalDateTime.now())
|
jobPort.markDone(jobId, LocalDateTime.now())
|
||||||
logJobStatusChanged(job, CreatorRankingSnapshotJobStatus.DONE)
|
}
|
||||||
} catch (ex: Exception) {
|
}
|
||||||
jobPort.markFailed(jobId, LocalDateTime.now(), ex.message)
|
|
||||||
logJobStatusChanged(job, CreatorRankingSnapshotJobStatus.FAILED, ex.message)
|
private fun markFailed(jobId: Long, message: String?) {
|
||||||
throw ex
|
transactionTemplate.executeWithoutResult {
|
||||||
|
jobPort.markFailed(jobId, LocalDateTime.now(), message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,8 +110,10 @@ class CreatorRankingSnapshotJobService(
|
|||||||
): CreatorRankingSnapshotJobRecord {
|
): CreatorRankingSnapshotJobRecord {
|
||||||
return jobPort.save(
|
return jobPort.save(
|
||||||
CreatorRankingSnapshotJobRecord(
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
rankingType = CreatorRankingType.WEEKLY,
|
||||||
aggregationStartAtUtc = aggregationStartAtUtc,
|
aggregationStartAtUtc = aggregationStartAtUtc,
|
||||||
aggregationEndAtUtc = aggregationEndAtUtc,
|
aggregationEndAtUtc = aggregationEndAtUtc,
|
||||||
|
visibleFromAtUtc = aggregationEndAtUtc.plusHours(9),
|
||||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||||
status = CreatorRankingSnapshotJobStatus.PENDING,
|
status = CreatorRankingSnapshotJobStatus.PENDING,
|
||||||
lastError = null,
|
lastError = null,
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package kr.co.vividnext.sodalive.v2.ranking.application
|
package kr.co.vividnext.sodalive.v2.ranking.application
|
||||||
|
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.domain.CreatorRankingType
|
||||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobPort
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobPort
|
||||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobRecord
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobRecord
|
||||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobStatus
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobStatus
|
||||||
@@ -37,8 +38,10 @@ class CreatorRankingSnapshotJobServiceTest {
|
|||||||
service.refreshLastCompletedWeekByScheduledJob()
|
service.refreshLastCompletedWeekByScheduledJob()
|
||||||
|
|
||||||
val job = jobPort.jobs.single()
|
val job = jobPort.jobs.single()
|
||||||
|
assertEquals(CreatorRankingType.WEEKLY, job.rankingType)
|
||||||
assertEquals(LocalDateTime.of(2026, 5, 31, 15, 0), job.aggregationStartAtUtc)
|
assertEquals(LocalDateTime.of(2026, 5, 31, 15, 0), job.aggregationStartAtUtc)
|
||||||
assertEquals(LocalDateTime.of(2026, 6, 7, 15, 0), job.aggregationEndAtUtc)
|
assertEquals(LocalDateTime.of(2026, 6, 7, 15, 0), job.aggregationEndAtUtc)
|
||||||
|
assertEquals(LocalDateTime.of(2026, 6, 8, 0, 0), job.visibleFromAtUtc)
|
||||||
assertEquals(CreatorRankingSnapshotJobTrigger.SCHEDULED, job.trigger)
|
assertEquals(CreatorRankingSnapshotJobTrigger.SCHEDULED, job.trigger)
|
||||||
assertEquals(CreatorRankingSnapshotJobStatus.DONE, job.status)
|
assertEquals(CreatorRankingSnapshotJobStatus.DONE, job.status)
|
||||||
assertEquals(null, job.lastError)
|
assertEquals(null, job.lastError)
|
||||||
@@ -78,6 +81,8 @@ class CreatorRankingSnapshotJobServiceTest {
|
|||||||
|
|
||||||
assertEquals(startAt, job.aggregationStartAtUtc)
|
assertEquals(startAt, job.aggregationStartAtUtc)
|
||||||
assertEquals(endAt, job.aggregationEndAtUtc)
|
assertEquals(endAt, job.aggregationEndAtUtc)
|
||||||
|
assertEquals(CreatorRankingType.WEEKLY, job.rankingType)
|
||||||
|
assertEquals(LocalDateTime.of(2026, 6, 8, 0, 0), job.visibleFromAtUtc)
|
||||||
assertEquals(CreatorRankingSnapshotJobTrigger.MANUAL, job.trigger)
|
assertEquals(CreatorRankingSnapshotJobTrigger.MANUAL, job.trigger)
|
||||||
assertEquals(CreatorRankingSnapshotJobStatus.PENDING, job.status)
|
assertEquals(CreatorRankingSnapshotJobStatus.PENDING, job.status)
|
||||||
assertEquals(null, job.lastError)
|
assertEquals(null, job.lastError)
|
||||||
@@ -95,8 +100,10 @@ class CreatorRankingSnapshotJobServiceTest {
|
|||||||
val endAt = LocalDateTime.of(2026, 6, 7, 15, 0)
|
val endAt = LocalDateTime.of(2026, 6, 7, 15, 0)
|
||||||
val failed = jobPort.save(
|
val failed = jobPort.save(
|
||||||
CreatorRankingSnapshotJobRecord(
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
rankingType = CreatorRankingType.WEEKLY,
|
||||||
aggregationStartAtUtc = startAt,
|
aggregationStartAtUtc = startAt,
|
||||||
aggregationEndAtUtc = endAt,
|
aggregationEndAtUtc = endAt,
|
||||||
|
visibleFromAtUtc = endAt.plusHours(9),
|
||||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||||
status = CreatorRankingSnapshotJobStatus.FAILED,
|
status = CreatorRankingSnapshotJobStatus.FAILED,
|
||||||
lastError = "aggregate failed",
|
lastError = "aggregate failed",
|
||||||
@@ -106,8 +113,10 @@ class CreatorRankingSnapshotJobServiceTest {
|
|||||||
)
|
)
|
||||||
jobPort.save(
|
jobPort.save(
|
||||||
CreatorRankingSnapshotJobRecord(
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
rankingType = CreatorRankingType.WEEKLY,
|
||||||
aggregationStartAtUtc = startAt,
|
aggregationStartAtUtc = startAt,
|
||||||
aggregationEndAtUtc = endAt,
|
aggregationEndAtUtc = endAt,
|
||||||
|
visibleFromAtUtc = endAt.plusHours(9),
|
||||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||||
status = CreatorRankingSnapshotJobStatus.DONE,
|
status = CreatorRankingSnapshotJobStatus.DONE,
|
||||||
lastError = null,
|
lastError = null,
|
||||||
@@ -133,8 +142,10 @@ class CreatorRankingSnapshotJobServiceTest {
|
|||||||
val service = CreatorRankingSnapshotJobService(refreshService, jobPort, unusedRedissonClient(), transactionManager())
|
val service = CreatorRankingSnapshotJobService(refreshService, jobPort, unusedRedissonClient(), transactionManager())
|
||||||
val failed = jobPort.save(
|
val failed = jobPort.save(
|
||||||
CreatorRankingSnapshotJobRecord(
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
rankingType = CreatorRankingType.WEEKLY,
|
||||||
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
||||||
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
||||||
|
visibleFromAtUtc = LocalDateTime.of(2026, 6, 8, 0, 0),
|
||||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||||
status = CreatorRankingSnapshotJobStatus.FAILED,
|
status = CreatorRankingSnapshotJobStatus.FAILED,
|
||||||
lastError = "aggregate failed",
|
lastError = "aggregate failed",
|
||||||
@@ -144,8 +155,10 @@ class CreatorRankingSnapshotJobServiceTest {
|
|||||||
)
|
)
|
||||||
val pending = jobPort.save(
|
val pending = jobPort.save(
|
||||||
CreatorRankingSnapshotJobRecord(
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
rankingType = CreatorRankingType.WEEKLY,
|
||||||
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
||||||
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
||||||
|
visibleFromAtUtc = LocalDateTime.of(2026, 6, 8, 0, 0),
|
||||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||||
status = CreatorRankingSnapshotJobStatus.PENDING,
|
status = CreatorRankingSnapshotJobStatus.PENDING,
|
||||||
lastError = "keep",
|
lastError = "keep",
|
||||||
@@ -270,6 +283,43 @@ class CreatorRankingSnapshotJobServiceTest {
|
|||||||
inOrder.verify(lock).unlock()
|
inOrder.verify(lock).unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("스케줄 refresh 실패 시 rollback 이후 별도 transaction으로 FAILED 상태를 커밋한다")
|
||||||
|
fun shouldCommitFailedStatusAfterRefreshTransactionRollback() {
|
||||||
|
val refreshService = Mockito.mock(CreatorRankingSnapshotRefreshService::class.java)
|
||||||
|
val jobPort = FakeCreatorRankingSnapshotJobPort()
|
||||||
|
val redissonClient = periodLockRedissonClient(lockAcquired = true)
|
||||||
|
val transactionManager = Mockito.mock(PlatformTransactionManager::class.java)
|
||||||
|
val saveStatus = SimpleTransactionStatus()
|
||||||
|
val processingStatus = SimpleTransactionStatus()
|
||||||
|
val refreshStatus = SimpleTransactionStatus()
|
||||||
|
val failedStatus = SimpleTransactionStatus()
|
||||||
|
val now = ZonedDateTime.of(2026, 6, 8, 1, 0, 0, 0, ZoneId.of("Asia/Seoul"))
|
||||||
|
Mockito.`when`(transactionManager.getTransaction(Mockito.any(TransactionDefinition::class.java)))
|
||||||
|
.thenReturn(saveStatus, processingStatus, refreshStatus, failedStatus)
|
||||||
|
Mockito.doThrow(IllegalStateException("aggregate failed"))
|
||||||
|
.`when`(refreshService).refreshLastCompletedWeek(now)
|
||||||
|
val service = CreatorRankingSnapshotJobService(
|
||||||
|
refreshService,
|
||||||
|
jobPort,
|
||||||
|
redissonClient,
|
||||||
|
transactionManager
|
||||||
|
) { now }
|
||||||
|
|
||||||
|
val exception = assertThrows(IllegalStateException::class.java) {
|
||||||
|
service.refreshLastCompletedWeekByScheduledJob()
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals("aggregate failed", exception.message)
|
||||||
|
assertEquals(CreatorRankingSnapshotJobStatus.FAILED, jobPort.jobs.single().status)
|
||||||
|
assertEquals("aggregate failed", jobPort.jobs.single().lastError)
|
||||||
|
val inOrder = Mockito.inOrder(transactionManager)
|
||||||
|
inOrder.verify(transactionManager).commit(saveStatus)
|
||||||
|
inOrder.verify(transactionManager).commit(processingStatus)
|
||||||
|
inOrder.verify(transactionManager).rollback(refreshStatus)
|
||||||
|
inOrder.verify(transactionManager).commit(failedStatus)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("cold-start 스냅샷 생성은 기간 기반 lock 획득 시에만 refresh를 실행한다")
|
@DisplayName("cold-start 스냅샷 생성은 기간 기반 lock 획득 시에만 refresh를 실행한다")
|
||||||
fun shouldRefreshColdStartSnapshotOnlyWhenPeriodLockAcquired() {
|
fun shouldRefreshColdStartSnapshotOnlyWhenPeriodLockAcquired() {
|
||||||
|
|||||||
Reference in New Issue
Block a user