feat(ranking): 스냅샷 job 관리 기능을 추가한다
This commit is contained in:
@@ -59,6 +59,18 @@ class DefaultCreatorRankingSnapshotJobRepository(
|
||||
return job.toRecord()
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun markPending(jobId: Long): CreatorRankingSnapshotJobRecord? {
|
||||
val job = repository.findByIdForUpdate(jobId) ?: return null
|
||||
if (job.status != CreatorRankingSnapshotJobStatus.FAILED) return job.toRecord()
|
||||
|
||||
job.status = CreatorRankingSnapshotJobStatus.PENDING
|
||||
job.lastError = null
|
||||
job.processingStartedAt = null
|
||||
job.processedAt = null
|
||||
return job.toRecord()
|
||||
}
|
||||
|
||||
private fun CreatorRankingSnapshotJobRecord.toEntity(): CreatorRankingSnapshotJob {
|
||||
return CreatorRankingSnapshotJob(
|
||||
aggregationStartAtUtc = aggregationStartAtUtc,
|
||||
|
||||
@@ -6,10 +6,12 @@ import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobRec
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobStatus
|
||||
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobTrigger
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
@Service
|
||||
@Transactional(readOnly = true)
|
||||
class CreatorRankingSnapshotJobService(
|
||||
private val refreshService: CreatorRankingSnapshotRefreshService,
|
||||
private val jobPort: CreatorRankingSnapshotJobPort,
|
||||
@@ -17,6 +19,7 @@ class CreatorRankingSnapshotJobService(
|
||||
) {
|
||||
private val periodPolicy = CreatorRankingPeriodPolicy()
|
||||
|
||||
@Transactional
|
||||
fun refreshLastCompletedWeekByScheduledJob() {
|
||||
val now = nowProvider()
|
||||
val period = periodPolicy.resolveLastCompletedWeek(now)
|
||||
@@ -42,4 +45,42 @@ class CreatorRankingSnapshotJobService(
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional
|
||||
fun createManualJob(
|
||||
aggregationStartAtUtc: LocalDateTime,
|
||||
aggregationEndAtUtc: LocalDateTime
|
||||
): CreatorRankingSnapshotJobRecord {
|
||||
return jobPort.save(
|
||||
CreatorRankingSnapshotJobRecord(
|
||||
aggregationStartAtUtc = aggregationStartAtUtc,
|
||||
aggregationEndAtUtc = aggregationEndAtUtc,
|
||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||
status = CreatorRankingSnapshotJobStatus.PENDING,
|
||||
lastError = null,
|
||||
processingStartedAt = null,
|
||||
processedAt = null
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
fun findJobs(
|
||||
aggregationStartAtUtc: LocalDateTime,
|
||||
aggregationEndAtUtc: LocalDateTime,
|
||||
statuses: List<CreatorRankingSnapshotJobStatus> = CreatorRankingSnapshotJobStatus.values().toList()
|
||||
): List<CreatorRankingSnapshotJobRecord> {
|
||||
return jobPort.findByPeriodAndStatuses(
|
||||
aggregationStartAtUtc = aggregationStartAtUtc,
|
||||
aggregationEndAtUtc = aggregationEndAtUtc,
|
||||
statuses = statuses
|
||||
)
|
||||
}
|
||||
|
||||
@Transactional
|
||||
fun retryFailedJob(jobId: Long) {
|
||||
val job = jobPort.findById(jobId) ?: return
|
||||
if (job.status != CreatorRankingSnapshotJobStatus.FAILED) return
|
||||
|
||||
jobPort.markPending(jobId)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ interface CreatorRankingSnapshotJobPort {
|
||||
fun markDone(jobId: Long, processedAt: LocalDateTime): CreatorRankingSnapshotJobRecord?
|
||||
|
||||
fun markFailed(jobId: Long, processedAt: LocalDateTime, lastError: String?): CreatorRankingSnapshotJobRecord?
|
||||
|
||||
fun markPending(jobId: Long): CreatorRankingSnapshotJobRecord?
|
||||
}
|
||||
|
||||
enum class CreatorRankingSnapshotJobStatus {
|
||||
|
||||
@@ -80,4 +80,51 @@ class DefaultCreatorRankingSnapshotJobRepositoryTest @Autowired constructor(
|
||||
assertEquals(CreatorRankingSnapshotJobStatus.FAILED, failedJob?.status)
|
||||
assertEquals("aggregate failed", failedJob?.lastError)
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("실패한 스냅샷 job은 PENDING으로 되돌리며 실패/처리 정보를 초기화한다")
|
||||
fun shouldMarkFailedSnapshotJobPendingForRetry() {
|
||||
val saved = adapter.save(
|
||||
CreatorRankingSnapshotJobRecord(
|
||||
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
||||
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||
status = CreatorRankingSnapshotJobStatus.FAILED,
|
||||
lastError = "aggregate failed",
|
||||
processingStartedAt = LocalDateTime.of(2026, 6, 8, 7, 30),
|
||||
processedAt = LocalDateTime.of(2026, 6, 8, 7, 31)
|
||||
)
|
||||
)
|
||||
|
||||
val retried = adapter.markPending(saved.id!!)
|
||||
val allRows = repository.findAll()
|
||||
|
||||
assertEquals(1, allRows.size)
|
||||
assertEquals(CreatorRankingSnapshotJobStatus.PENDING, retried?.status)
|
||||
assertEquals(null, retried?.lastError)
|
||||
assertEquals(null, retried?.processingStartedAt)
|
||||
assertEquals(null, retried?.processedAt)
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("실패 상태가 아닌 스냅샷 job은 재시도 대기 상태로 변경하지 않는다")
|
||||
fun shouldNotMarkNonFailedSnapshotJobPendingForRetry() {
|
||||
val saved = adapter.save(
|
||||
CreatorRankingSnapshotJobRecord(
|
||||
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
||||
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||
status = CreatorRankingSnapshotJobStatus.DONE,
|
||||
lastError = null,
|
||||
processingStartedAt = LocalDateTime.of(2026, 6, 8, 7, 30),
|
||||
processedAt = LocalDateTime.of(2026, 6, 8, 7, 31)
|
||||
)
|
||||
)
|
||||
|
||||
val unchanged = adapter.markPending(saved.id!!)
|
||||
|
||||
assertEquals(CreatorRankingSnapshotJobStatus.DONE, unchanged?.status)
|
||||
assertEquals(LocalDateTime.of(2026, 6, 8, 7, 30), unchanged?.processingStartedAt)
|
||||
assertEquals(LocalDateTime.of(2026, 6, 8, 7, 31), unchanged?.processedAt)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,109 @@ class CreatorRankingSnapshotJobServiceTest {
|
||||
assertEquals(CreatorRankingSnapshotJobStatus.FAILED, jobPort.jobs.single().status)
|
||||
assertEquals("aggregate failed", jobPort.jobs.single().lastError)
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("관리자 수동 생성은 지정 UTC 기간의 MANUAL PENDING job을 만든다")
|
||||
fun shouldCreateManualPendingJobForRequestedPeriod() {
|
||||
val refreshService = Mockito.mock(CreatorRankingSnapshotRefreshService::class.java)
|
||||
val jobPort = FakeCreatorRankingSnapshotJobPort()
|
||||
val service = CreatorRankingSnapshotJobService(refreshService, jobPort)
|
||||
val startAt = LocalDateTime.of(2026, 5, 31, 15, 0)
|
||||
val endAt = LocalDateTime.of(2026, 6, 7, 15, 0)
|
||||
|
||||
val job = service.createManualJob(startAt, endAt)
|
||||
|
||||
assertEquals(startAt, job.aggregationStartAtUtc)
|
||||
assertEquals(endAt, job.aggregationEndAtUtc)
|
||||
assertEquals(CreatorRankingSnapshotJobTrigger.MANUAL, job.trigger)
|
||||
assertEquals(CreatorRankingSnapshotJobStatus.PENDING, job.status)
|
||||
assertEquals(null, job.lastError)
|
||||
assertEquals(null, job.processingStartedAt)
|
||||
assertEquals(null, job.processedAt)
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("관리자 목록 조회는 기간과 상태 조건으로 snapshot job을 조회한다")
|
||||
fun shouldFindJobsByRequestedPeriodAndStatuses() {
|
||||
val refreshService = Mockito.mock(CreatorRankingSnapshotRefreshService::class.java)
|
||||
val jobPort = FakeCreatorRankingSnapshotJobPort()
|
||||
val service = CreatorRankingSnapshotJobService(refreshService, jobPort)
|
||||
val startAt = LocalDateTime.of(2026, 5, 31, 15, 0)
|
||||
val endAt = LocalDateTime.of(2026, 6, 7, 15, 0)
|
||||
val failed = jobPort.save(
|
||||
CreatorRankingSnapshotJobRecord(
|
||||
aggregationStartAtUtc = startAt,
|
||||
aggregationEndAtUtc = endAt,
|
||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||
status = CreatorRankingSnapshotJobStatus.FAILED,
|
||||
lastError = "aggregate failed",
|
||||
processingStartedAt = null,
|
||||
processedAt = null
|
||||
)
|
||||
)
|
||||
jobPort.save(
|
||||
CreatorRankingSnapshotJobRecord(
|
||||
aggregationStartAtUtc = startAt,
|
||||
aggregationEndAtUtc = endAt,
|
||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||
status = CreatorRankingSnapshotJobStatus.DONE,
|
||||
lastError = null,
|
||||
processingStartedAt = null,
|
||||
processedAt = null
|
||||
)
|
||||
)
|
||||
|
||||
val jobs = service.findJobs(
|
||||
aggregationStartAtUtc = startAt,
|
||||
aggregationEndAtUtc = endAt,
|
||||
statuses = listOf(CreatorRankingSnapshotJobStatus.FAILED)
|
||||
)
|
||||
|
||||
assertEquals(listOf(failed.id), jobs.map { it.id })
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("관리자 실패 job 재시도는 FAILED job만 PENDING으로 되돌린다")
|
||||
fun shouldRetryOnlyFailedSnapshotJob() {
|
||||
val refreshService = Mockito.mock(CreatorRankingSnapshotRefreshService::class.java)
|
||||
val jobPort = FakeCreatorRankingSnapshotJobPort()
|
||||
val service = CreatorRankingSnapshotJobService(refreshService, jobPort)
|
||||
val failed = jobPort.save(
|
||||
CreatorRankingSnapshotJobRecord(
|
||||
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
||||
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||
status = CreatorRankingSnapshotJobStatus.FAILED,
|
||||
lastError = "aggregate failed",
|
||||
processingStartedAt = LocalDateTime.of(2026, 6, 8, 7, 30),
|
||||
processedAt = LocalDateTime.of(2026, 6, 8, 7, 31)
|
||||
)
|
||||
)
|
||||
val pending = jobPort.save(
|
||||
CreatorRankingSnapshotJobRecord(
|
||||
aggregationStartAtUtc = LocalDateTime.of(2026, 5, 31, 15, 0),
|
||||
aggregationEndAtUtc = LocalDateTime.of(2026, 6, 7, 15, 0),
|
||||
trigger = CreatorRankingSnapshotJobTrigger.MANUAL,
|
||||
status = CreatorRankingSnapshotJobStatus.PENDING,
|
||||
lastError = "keep",
|
||||
processingStartedAt = null,
|
||||
processedAt = null
|
||||
)
|
||||
)
|
||||
|
||||
service.retryFailedJob(failed.id!!)
|
||||
service.retryFailedJob(pending.id!!)
|
||||
service.retryFailedJob(999L)
|
||||
|
||||
val retried = jobPort.findById(failed.id!!)!!
|
||||
val unchanged = jobPort.findById(pending.id!!)!!
|
||||
assertEquals(CreatorRankingSnapshotJobStatus.PENDING, retried.status)
|
||||
assertEquals(null, retried.lastError)
|
||||
assertEquals(null, retried.processingStartedAt)
|
||||
assertEquals(null, retried.processedAt)
|
||||
assertEquals(CreatorRankingSnapshotJobStatus.PENDING, unchanged.status)
|
||||
assertEquals("keep", unchanged.lastError)
|
||||
}
|
||||
}
|
||||
|
||||
private class FakeCreatorRankingSnapshotJobPort : CreatorRankingSnapshotJobPort {
|
||||
@@ -108,6 +211,17 @@ private class FakeCreatorRankingSnapshotJobPort : CreatorRankingSnapshotJobPort
|
||||
}
|
||||
}
|
||||
|
||||
override fun markPending(jobId: Long): CreatorRankingSnapshotJobRecord? {
|
||||
return update(jobId) {
|
||||
it.copy(
|
||||
status = CreatorRankingSnapshotJobStatus.PENDING,
|
||||
lastError = null,
|
||||
processingStartedAt = null,
|
||||
processedAt = null
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private fun update(
|
||||
jobId: Long,
|
||||
updater: (CreatorRankingSnapshotJobRecord) -> CreatorRankingSnapshotJobRecord
|
||||
|
||||
Reference in New Issue
Block a user