feat(content-ranking): 랭킹 스냅샷 job 서비스를 추가한다
This commit is contained in:
@@ -0,0 +1,175 @@
|
||||
package kr.co.vividnext.sodalive.v2.content.ranking.application
|
||||
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingPeriodPolicy
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingSchedulePolicy
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingUtcRange
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobPort
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobRecord
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobStatus
|
||||
import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobTrigger
|
||||
import org.redisson.api.RedissonClient
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.PlatformTransactionManager
|
||||
import org.springframework.transaction.TransactionDefinition
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import org.springframework.transaction.support.TransactionTemplate
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@Service
|
||||
@Transactional(readOnly = true)
|
||||
class AudioRankingSnapshotJobService(
|
||||
private val refreshService: AudioRankingSnapshotRefreshService,
|
||||
private val jobPort: AudioRankingSnapshotJobPort,
|
||||
private val redissonClient: RedissonClient,
|
||||
transactionManager: PlatformTransactionManager,
|
||||
private val nowProvider: () -> ZonedDateTime = { ZonedDateTime.now() }
|
||||
) {
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
private val periodPolicy = AudioRankingPeriodPolicy()
|
||||
private val schedulePolicy = AudioRankingSchedulePolicy()
|
||||
private val transactionTemplate = TransactionTemplate(transactionManager).also { template ->
|
||||
template.propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW
|
||||
}
|
||||
|
||||
fun refreshLastCompletedWeekByScheduledJob(type: AudioRankingType) {
|
||||
withLastCompletedWeekPeriodLock(type) { now, utcRange, visibleFromAtUtc ->
|
||||
refreshLastCompletedWeek(type, now, utcRange, visibleFromAtUtc, AudioRankingSnapshotJobTrigger.SCHEDULED)
|
||||
}
|
||||
}
|
||||
|
||||
fun refreshLastCompletedWeekByFallback(type: AudioRankingType): Boolean {
|
||||
var refreshed = false
|
||||
withLastCompletedWeekPeriodLock(type) { now, utcRange, visibleFromAtUtc ->
|
||||
if (fallbackCountReachedLimit(type, utcRange)) return@withLastCompletedWeekPeriodLock
|
||||
refreshLastCompletedWeek(type, now, utcRange, visibleFromAtUtc, AudioRankingSnapshotJobTrigger.FALLBACK)
|
||||
refreshed = true
|
||||
}
|
||||
return refreshed
|
||||
}
|
||||
|
||||
private fun refreshLastCompletedWeek(
|
||||
type: AudioRankingType,
|
||||
now: ZonedDateTime,
|
||||
utcRange: AudioRankingUtcRange,
|
||||
visibleFromAtUtc: LocalDateTime,
|
||||
trigger: AudioRankingSnapshotJobTrigger
|
||||
) {
|
||||
val job = savePendingJob(type, utcRange, visibleFromAtUtc, trigger)
|
||||
val jobId = job.id ?: return
|
||||
markProcessing(jobId)
|
||||
logJobStatusChanged(job, AudioRankingSnapshotJobStatus.PROCESSING)
|
||||
try {
|
||||
refresh(type, now)
|
||||
markDone(jobId)
|
||||
logJobStatusChanged(job, AudioRankingSnapshotJobStatus.DONE)
|
||||
} catch (ex: Exception) {
|
||||
markFailed(jobId, ex.message)
|
||||
logJobStatusChanged(job, AudioRankingSnapshotJobStatus.FAILED, ex.message)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
|
||||
private fun refresh(type: AudioRankingType, now: ZonedDateTime) {
|
||||
transactionTemplate.executeWithoutResult {
|
||||
refreshService.refreshLastCompletedWeek(type, now)
|
||||
}
|
||||
}
|
||||
|
||||
private fun savePendingJob(
|
||||
type: AudioRankingType,
|
||||
utcRange: AudioRankingUtcRange,
|
||||
visibleFromAtUtc: LocalDateTime,
|
||||
trigger: AudioRankingSnapshotJobTrigger
|
||||
): AudioRankingSnapshotJobRecord {
|
||||
return transactionTemplate.execute {
|
||||
jobPort.save(
|
||||
AudioRankingSnapshotJobRecord(
|
||||
rankingType = type,
|
||||
aggregationStartAtUtc = utcRange.startInclusiveUtc,
|
||||
aggregationEndAtUtc = utcRange.endExclusiveUtc,
|
||||
visibleFromAtUtc = visibleFromAtUtc,
|
||||
trigger = trigger,
|
||||
status = AudioRankingSnapshotJobStatus.PENDING,
|
||||
lastError = null,
|
||||
processingStartedAt = null,
|
||||
processedAt = null
|
||||
)
|
||||
)
|
||||
}!!
|
||||
}
|
||||
|
||||
private fun markProcessing(jobId: Long) {
|
||||
transactionTemplate.executeWithoutResult {
|
||||
jobPort.markProcessing(jobId, LocalDateTime.now())
|
||||
}
|
||||
}
|
||||
|
||||
private fun markDone(jobId: Long) {
|
||||
transactionTemplate.executeWithoutResult {
|
||||
jobPort.markDone(jobId, LocalDateTime.now())
|
||||
}
|
||||
}
|
||||
|
||||
private fun markFailed(jobId: Long, message: String?) {
|
||||
transactionTemplate.executeWithoutResult {
|
||||
jobPort.markFailed(jobId, LocalDateTime.now(), message)
|
||||
}
|
||||
}
|
||||
|
||||
private fun fallbackCountReachedLimit(type: AudioRankingType, utcRange: AudioRankingUtcRange): Boolean {
|
||||
return jobPort.countByRankingTypeAndPeriodAndTrigger(
|
||||
rankingType = type,
|
||||
aggregationStartAtUtc = utcRange.startInclusiveUtc,
|
||||
aggregationEndAtUtc = utcRange.endExclusiveUtc,
|
||||
trigger = AudioRankingSnapshotJobTrigger.FALLBACK
|
||||
) >= FALLBACK_LIMIT
|
||||
}
|
||||
|
||||
private fun withLastCompletedWeekPeriodLock(
|
||||
type: AudioRankingType,
|
||||
action: (ZonedDateTime, AudioRankingUtcRange, LocalDateTime) -> Unit
|
||||
) {
|
||||
val now = nowProvider()
|
||||
val period = periodPolicy.resolveLastCompletedWeek(now)
|
||||
val utcRange = periodPolicy.toUtcRange(period)
|
||||
val visibleFromAtUtc = schedulePolicy.resolveVisibleFromAt(period.endExclusiveKst)
|
||||
val lockName = "lock:content-ranking-snapshot-refresh:$type:${utcRange.startInclusiveUtc}:${utcRange.endExclusiveUtc}"
|
||||
val lock = redissonClient.getLock(lockName)
|
||||
|
||||
try {
|
||||
if (lock.tryLock(0, -1, TimeUnit.SECONDS)) {
|
||||
action(now, utcRange, visibleFromAtUtc)
|
||||
}
|
||||
} finally {
|
||||
if (lock.isHeldByCurrentThread) {
|
||||
lock.unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun logJobStatusChanged(
|
||||
job: AudioRankingSnapshotJobRecord,
|
||||
status: AudioRankingSnapshotJobStatus,
|
||||
error: String? = null
|
||||
) {
|
||||
log.info(
|
||||
"event=content_ranking_snapshot_job_status_changed " +
|
||||
"jobId={} rankingType={} trigger={} status={} aggregationStartAtUtc={} aggregationEndAtUtc={} error={}",
|
||||
job.id,
|
||||
job.rankingType,
|
||||
job.trigger,
|
||||
status,
|
||||
job.aggregationStartAtUtc,
|
||||
job.aggregationEndAtUtc,
|
||||
error
|
||||
)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val FALLBACK_LIMIT = 3L
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user