diff --git a/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/AudioRankingSnapshotJob.kt b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/AudioRankingSnapshotJob.kt new file mode 100644 index 00000000..40bc7d96 --- /dev/null +++ b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/AudioRankingSnapshotJob.kt @@ -0,0 +1,46 @@ +package kr.co.vividnext.sodalive.v2.content.ranking.adapter.out.persistence + +import kr.co.vividnext.sodalive.common.BaseEntity +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobStatus +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobTrigger +import java.time.LocalDateTime +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.EnumType +import javax.persistence.Enumerated +import javax.persistence.Table + +@Entity +@Table(name = "content_ranking_snapshot_job") +class AudioRankingSnapshotJob( + @Enumerated(EnumType.STRING) + @Column(name = "ranking_type", nullable = false, length = 30) + val rankingType: AudioRankingType, + + @Column(name = "aggregation_start_at_utc", nullable = false) + val aggregationStartAtUtc: LocalDateTime, + + @Column(name = "aggregation_end_at_utc", nullable = false) + val aggregationEndAtUtc: LocalDateTime, + + @Column(name = "visible_from_at", nullable = false) + val visibleFromAtUtc: LocalDateTime, + + @Enumerated(EnumType.STRING) + @Column(name = "trigger_type", nullable = false, length = 20) + val trigger: AudioRankingSnapshotJobTrigger, + + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false, length = 20) + var status: AudioRankingSnapshotJobStatus = AudioRankingSnapshotJobStatus.PENDING, + + @Column(name = "last_error", columnDefinition = "text") + var lastError: String? = null, + + @Column(name = "processing_started_at") + var processingStartedAt: LocalDateTime? = null, + + @Column(name = "processed_at") + var processedAt: LocalDateTime? = null +) : BaseEntity() diff --git a/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/AudioRankingSnapshotJobRepository.kt b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/AudioRankingSnapshotJobRepository.kt new file mode 100644 index 00000000..560031dc --- /dev/null +++ b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/AudioRankingSnapshotJobRepository.kt @@ -0,0 +1,31 @@ +package kr.co.vividnext.sodalive.v2.content.ranking.adapter.out.persistence + +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobStatus +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobTrigger +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Lock +import org.springframework.data.jpa.repository.Query +import org.springframework.data.repository.query.Param +import java.time.LocalDateTime +import javax.persistence.LockModeType + +interface AudioRankingSnapshotJobRepository : JpaRepository { + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("select j from AudioRankingSnapshotJob j where j.id = :jobId") + fun findByIdForUpdate(@Param("jobId") jobId: Long): AudioRankingSnapshotJob? + + fun findAllByRankingTypeAndAggregationStartAtUtcAndAggregationEndAtUtcAndStatusInOrderByCreatedAtDesc( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + statuses: List + ): List + + fun countByRankingTypeAndAggregationStartAtUtcAndAggregationEndAtUtcAndTrigger( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + trigger: AudioRankingSnapshotJobTrigger + ): Long +} diff --git a/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/DefaultAudioRankingSnapshotJobRepository.kt b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/DefaultAudioRankingSnapshotJobRepository.kt new file mode 100644 index 00000000..944debed --- /dev/null +++ b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/DefaultAudioRankingSnapshotJobRepository.kt @@ -0,0 +1,112 @@ +package kr.co.vividnext.sodalive.v2.content.ranking.adapter.out.persistence + +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobPort +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobRecord +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobStatus +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobTrigger +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.time.LocalDateTime + +@Repository +class DefaultAudioRankingSnapshotJobRepository( + private val repository: AudioRankingSnapshotJobRepository +) : AudioRankingSnapshotJobPort { + @Transactional + override fun save(job: AudioRankingSnapshotJobRecord): AudioRankingSnapshotJobRecord { + return repository.save(job.toEntity()).toRecord() + } + + override fun findById(jobId: Long): AudioRankingSnapshotJobRecord? { + return repository.findById(jobId).orElse(null)?.toRecord() + } + + override fun findByRankingTypeAndPeriodAndStatuses( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + statuses: List + ): List { + return repository.findAllByRankingTypeAndAggregationStartAtUtcAndAggregationEndAtUtcAndStatusInOrderByCreatedAtDesc( + rankingType = rankingType, + aggregationStartAtUtc = aggregationStartAtUtc, + aggregationEndAtUtc = aggregationEndAtUtc, + statuses = statuses + ).map { it.toRecord() } + } + + override fun countByRankingTypeAndPeriodAndTrigger( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + trigger: AudioRankingSnapshotJobTrigger + ): Long { + return repository.countByRankingTypeAndAggregationStartAtUtcAndAggregationEndAtUtcAndTrigger( + rankingType = rankingType, + aggregationStartAtUtc = aggregationStartAtUtc, + aggregationEndAtUtc = aggregationEndAtUtc, + trigger = trigger + ) + } + + @Transactional + override fun markProcessing(jobId: Long, processingStartedAt: LocalDateTime): AudioRankingSnapshotJobRecord? { + val job = repository.findByIdForUpdate(jobId) ?: return null + job.status = AudioRankingSnapshotJobStatus.PROCESSING + job.processingStartedAt = processingStartedAt + job.lastError = null + return job.toRecord() + } + + @Transactional + override fun markDone(jobId: Long, processedAt: LocalDateTime): AudioRankingSnapshotJobRecord? { + val job = repository.findByIdForUpdate(jobId) ?: return null + job.status = AudioRankingSnapshotJobStatus.DONE + job.processedAt = processedAt + job.lastError = null + return job.toRecord() + } + + @Transactional + override fun markFailed(jobId: Long, processedAt: LocalDateTime, lastError: String?): AudioRankingSnapshotJobRecord? { + val job = repository.findByIdForUpdate(jobId) ?: return null + job.status = AudioRankingSnapshotJobStatus.FAILED + job.processedAt = processedAt + job.lastError = lastError?.take(MAX_ERROR_LENGTH) + return job.toRecord() + } + + private fun AudioRankingSnapshotJobRecord.toEntity(): AudioRankingSnapshotJob { + return AudioRankingSnapshotJob( + rankingType = rankingType, + aggregationStartAtUtc = aggregationStartAtUtc, + aggregationEndAtUtc = aggregationEndAtUtc, + visibleFromAtUtc = visibleFromAtUtc, + trigger = trigger, + status = status, + lastError = lastError, + processingStartedAt = processingStartedAt, + processedAt = processedAt + ) + } + + private fun AudioRankingSnapshotJob.toRecord(): AudioRankingSnapshotJobRecord { + return AudioRankingSnapshotJobRecord( + id = id, + rankingType = rankingType, + aggregationStartAtUtc = aggregationStartAtUtc, + aggregationEndAtUtc = aggregationEndAtUtc, + visibleFromAtUtc = visibleFromAtUtc, + trigger = trigger, + status = status, + lastError = lastError, + processingStartedAt = processingStartedAt, + processedAt = processedAt + ) + } + + companion object { + private const val MAX_ERROR_LENGTH = 1000 + } +} diff --git a/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/port/out/AudioRankingSnapshotJobPort.kt b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/port/out/AudioRankingSnapshotJobPort.kt new file mode 100644 index 00000000..4bdc8a0f --- /dev/null +++ b/src/main/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/port/out/AudioRankingSnapshotJobPort.kt @@ -0,0 +1,56 @@ +package kr.co.vividnext.sodalive.v2.content.ranking.port.out + +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType +import java.time.LocalDateTime + +interface AudioRankingSnapshotJobPort { + fun save(job: AudioRankingSnapshotJobRecord): AudioRankingSnapshotJobRecord + + fun findById(jobId: Long): AudioRankingSnapshotJobRecord? + + fun findByRankingTypeAndPeriodAndStatuses( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + statuses: List + ): List + + fun countByRankingTypeAndPeriodAndTrigger( + rankingType: AudioRankingType, + aggregationStartAtUtc: LocalDateTime, + aggregationEndAtUtc: LocalDateTime, + trigger: AudioRankingSnapshotJobTrigger + ): Long + + fun markProcessing(jobId: Long, processingStartedAt: LocalDateTime): AudioRankingSnapshotJobRecord? + + fun markDone(jobId: Long, processedAt: LocalDateTime): AudioRankingSnapshotJobRecord? + + fun markFailed(jobId: Long, processedAt: LocalDateTime, lastError: String?): AudioRankingSnapshotJobRecord? +} + +enum class AudioRankingSnapshotJobStatus { + PENDING, + PROCESSING, + DONE, + FAILED +} + +enum class AudioRankingSnapshotJobTrigger { + SCHEDULED, + MANUAL, + FALLBACK +} + +data class AudioRankingSnapshotJobRecord( + val id: Long? = null, + val rankingType: AudioRankingType, + val aggregationStartAtUtc: LocalDateTime, + val aggregationEndAtUtc: LocalDateTime, + val visibleFromAtUtc: LocalDateTime, + val trigger: AudioRankingSnapshotJobTrigger, + val status: AudioRankingSnapshotJobStatus, + val lastError: String?, + val processingStartedAt: LocalDateTime?, + val processedAt: LocalDateTime? +) diff --git a/src/test/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/DefaultAudioRankingSnapshotJobRepositoryTest.kt b/src/test/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/DefaultAudioRankingSnapshotJobRepositoryTest.kt new file mode 100644 index 00000000..468a886c --- /dev/null +++ b/src/test/kotlin/kr/co/vividnext/sodalive/v2/content/ranking/adapter/out/persistence/DefaultAudioRankingSnapshotJobRepositoryTest.kt @@ -0,0 +1,114 @@ +package kr.co.vividnext.sodalive.v2.content.ranking.adapter.out.persistence + +import kr.co.vividnext.sodalive.configs.QueryDslConfig +import kr.co.vividnext.sodalive.v2.content.ranking.domain.AudioRankingType +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobRecord +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobStatus +import kr.co.vividnext.sodalive.v2.content.ranking.port.out.AudioRankingSnapshotJobTrigger +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.context.annotation.Import +import java.time.LocalDateTime + +@DataJpaTest( + properties = [ + "spring.cache.type=none", + "spring.datasource.url=jdbc:h2:mem:testdb;MODE=MySQL;NON_KEYWORDS=VALUE" + ] +) +@Import(QueryDslConfig::class) +class DefaultAudioRankingSnapshotJobRepositoryTest @Autowired constructor( + private val repository: AudioRankingSnapshotJobRepository +) { + private val adapter = DefaultAudioRankingSnapshotJobRepository(repository) + + @Test + @DisplayName("스냅샷 job은 랭킹 타입, 기간, 트리거, 상태와 처리 정보를 저장하고 변경한다") + fun shouldSaveAndUpdateSnapshotJobHistory() { + val startAt = LocalDateTime.of(2026, 5, 31, 15, 0) + val endAt = LocalDateTime.of(2026, 6, 7, 15, 0) + val visibleAt = LocalDateTime.of(2026, 6, 8, 0, 0) + val saved = adapter.save(jobRecord(startAt = startAt, endAt = endAt, visibleAt = visibleAt)) + val jobId = saved.id!! + + adapter.markProcessing(jobId, LocalDateTime.of(2026, 6, 8, 1, 0)) + adapter.markFailed(jobId, LocalDateTime.of(2026, 6, 8, 1, 1), "aggregate failed") + + val failed = adapter.findById(jobId) + assertEquals(AudioRankingType.WEEKLY_POPULAR, failed?.rankingType) + assertEquals(AudioRankingSnapshotJobTrigger.SCHEDULED, failed?.trigger) + assertEquals(AudioRankingSnapshotJobStatus.FAILED, failed?.status) + assertEquals("aggregate failed", failed?.lastError) + + adapter.markProcessing(jobId, LocalDateTime.of(2026, 6, 8, 1, 2)) + adapter.markDone(jobId, LocalDateTime.of(2026, 6, 8, 1, 3)) + + val doneJobs = adapter.findByRankingTypeAndPeriodAndStatuses( + rankingType = AudioRankingType.WEEKLY_POPULAR, + aggregationStartAtUtc = startAt, + aggregationEndAtUtc = endAt, + statuses = listOf(AudioRankingSnapshotJobStatus.DONE) + ) + assertEquals(1, doneJobs.size) + assertEquals(AudioRankingSnapshotJobStatus.DONE, doneJobs.single().status) + assertEquals(null, doneJobs.single().lastError) + } + + @Test + @DisplayName("fallback job 수는 랭킹 타입과 집계 기간과 FALLBACK 트리거 기준으로만 계산한다") + fun shouldCountFallbackJobsByRankingTypeAndPeriodAndTrigger() { + val startAt = LocalDateTime.of(2026, 5, 31, 15, 0) + val endAt = LocalDateTime.of(2026, 6, 7, 15, 0) + adapter.save(jobRecord(startAt = startAt, endAt = endAt, trigger = AudioRankingSnapshotJobTrigger.FALLBACK)) + adapter.save(jobRecord(startAt = startAt, endAt = endAt, trigger = AudioRankingSnapshotJobTrigger.FALLBACK)) + adapter.save(jobRecord(startAt = startAt, endAt = endAt, trigger = AudioRankingSnapshotJobTrigger.SCHEDULED)) + adapter.save( + jobRecord( + startAt = startAt, + endAt = endAt, + rankingType = AudioRankingType.RISING, + trigger = AudioRankingSnapshotJobTrigger.FALLBACK + ) + ) + adapter.save( + jobRecord( + startAt = startAt.minusWeeks(1), + endAt = endAt.minusWeeks(1), + trigger = AudioRankingSnapshotJobTrigger.FALLBACK + ) + ) + + val count = adapter.countByRankingTypeAndPeriodAndTrigger( + rankingType = AudioRankingType.WEEKLY_POPULAR, + aggregationStartAtUtc = startAt, + aggregationEndAtUtc = endAt, + trigger = AudioRankingSnapshotJobTrigger.FALLBACK + ) + + assertEquals(2L, count) + } + + private fun jobRecord( + rankingType: AudioRankingType = AudioRankingType.WEEKLY_POPULAR, + startAt: LocalDateTime, + endAt: LocalDateTime, + visibleAt: LocalDateTime = LocalDateTime.of(2026, 6, 8, 0, 0), + trigger: AudioRankingSnapshotJobTrigger = AudioRankingSnapshotJobTrigger.SCHEDULED, + status: AudioRankingSnapshotJobStatus = AudioRankingSnapshotJobStatus.PENDING + ): AudioRankingSnapshotJobRecord { + return AudioRankingSnapshotJobRecord( + rankingType = rankingType, + aggregationStartAtUtc = startAt, + aggregationEndAtUtc = endAt, + visibleFromAtUtc = visibleAt, + trigger = trigger, + status = status, + lastError = null, + processingStartedAt = null, + processedAt = null + ) + } +}