feat(ranking): 스냅샷 job 저장소를 추가한다
This commit is contained in:
@@ -0,0 +1,38 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.ranking.adapter.out.persistence
|
||||||
|
|
||||||
|
import kr.co.vividnext.sodalive.common.BaseEntity
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobStatus
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobTrigger
|
||||||
|
import java.time.LocalDateTime
|
||||||
|
import javax.persistence.Column
|
||||||
|
import javax.persistence.Entity
|
||||||
|
import javax.persistence.EnumType
|
||||||
|
import javax.persistence.Enumerated
|
||||||
|
import javax.persistence.Table
|
||||||
|
|
||||||
|
@Entity
|
||||||
|
@Table(name = "creator_ranking_snapshot_job")
|
||||||
|
class CreatorRankingSnapshotJob(
|
||||||
|
@Column(name = "aggregation_start_at_utc", nullable = false)
|
||||||
|
val aggregationStartAtUtc: LocalDateTime,
|
||||||
|
|
||||||
|
@Column(name = "aggregation_end_at_utc", nullable = false)
|
||||||
|
val aggregationEndAtUtc: LocalDateTime,
|
||||||
|
|
||||||
|
@Enumerated(EnumType.STRING)
|
||||||
|
@Column(name = "trigger_type", nullable = false, length = 20)
|
||||||
|
val trigger: CreatorRankingSnapshotJobTrigger,
|
||||||
|
|
||||||
|
@Enumerated(EnumType.STRING)
|
||||||
|
@Column(name = "status", nullable = false, length = 20)
|
||||||
|
var status: CreatorRankingSnapshotJobStatus = CreatorRankingSnapshotJobStatus.PENDING,
|
||||||
|
|
||||||
|
@Column(name = "last_error", columnDefinition = "text")
|
||||||
|
var lastError: String? = null,
|
||||||
|
|
||||||
|
@Column(name = "processing_started_at")
|
||||||
|
var processingStartedAt: LocalDateTime? = null,
|
||||||
|
|
||||||
|
@Column(name = "processed_at")
|
||||||
|
var processedAt: LocalDateTime? = null
|
||||||
|
) : BaseEntity()
|
||||||
@@ -0,0 +1,21 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.ranking.adapter.out.persistence
|
||||||
|
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobStatus
|
||||||
|
import org.springframework.data.jpa.repository.JpaRepository
|
||||||
|
import org.springframework.data.jpa.repository.Lock
|
||||||
|
import org.springframework.data.jpa.repository.Query
|
||||||
|
import org.springframework.data.repository.query.Param
|
||||||
|
import java.time.LocalDateTime
|
||||||
|
import javax.persistence.LockModeType
|
||||||
|
|
||||||
|
interface CreatorRankingSnapshotJobRepository : JpaRepository<CreatorRankingSnapshotJob, Long> {
|
||||||
|
@Lock(LockModeType.PESSIMISTIC_WRITE)
|
||||||
|
@Query("select j from CreatorRankingSnapshotJob j where j.id = :jobId")
|
||||||
|
fun findByIdForUpdate(@Param("jobId") jobId: Long): CreatorRankingSnapshotJob?
|
||||||
|
|
||||||
|
fun findAllByAggregationStartAtUtcAndAggregationEndAtUtcAndStatusInOrderByCreatedAtDesc(
|
||||||
|
aggregationStartAtUtc: LocalDateTime,
|
||||||
|
aggregationEndAtUtc: LocalDateTime,
|
||||||
|
statuses: List<CreatorRankingSnapshotJobStatus>
|
||||||
|
): List<CreatorRankingSnapshotJob>
|
||||||
|
}
|
||||||
@@ -0,0 +1,90 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.ranking.adapter.out.persistence
|
||||||
|
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobPort
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobRecord
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobStatus
|
||||||
|
import org.springframework.stereotype.Repository
|
||||||
|
import org.springframework.transaction.annotation.Transactional
|
||||||
|
import java.time.LocalDateTime
|
||||||
|
|
||||||
|
@Repository
|
||||||
|
class DefaultCreatorRankingSnapshotJobRepository(
|
||||||
|
private val repository: CreatorRankingSnapshotJobRepository
|
||||||
|
) : CreatorRankingSnapshotJobPort {
|
||||||
|
@Transactional
|
||||||
|
override fun save(job: CreatorRankingSnapshotJobRecord): CreatorRankingSnapshotJobRecord {
|
||||||
|
return repository.save(job.toEntity()).toRecord()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun findById(jobId: Long): CreatorRankingSnapshotJobRecord? {
|
||||||
|
return repository.findById(jobId).orElse(null)?.toRecord()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun findByPeriodAndStatuses(
|
||||||
|
aggregationStartAtUtc: LocalDateTime,
|
||||||
|
aggregationEndAtUtc: LocalDateTime,
|
||||||
|
statuses: List<CreatorRankingSnapshotJobStatus>
|
||||||
|
): List<CreatorRankingSnapshotJobRecord> {
|
||||||
|
return repository.findAllByAggregationStartAtUtcAndAggregationEndAtUtcAndStatusInOrderByCreatedAtDesc(
|
||||||
|
aggregationStartAtUtc = aggregationStartAtUtc,
|
||||||
|
aggregationEndAtUtc = aggregationEndAtUtc,
|
||||||
|
statuses = statuses
|
||||||
|
).map { it.toRecord() }
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
override fun markProcessing(jobId: Long, processingStartedAt: LocalDateTime): CreatorRankingSnapshotJobRecord? {
|
||||||
|
val job = repository.findByIdForUpdate(jobId) ?: return null
|
||||||
|
job.status = CreatorRankingSnapshotJobStatus.PROCESSING
|
||||||
|
job.processingStartedAt = processingStartedAt
|
||||||
|
job.lastError = null
|
||||||
|
return job.toRecord()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
override fun markDone(jobId: Long, processedAt: LocalDateTime): CreatorRankingSnapshotJobRecord? {
|
||||||
|
val job = repository.findByIdForUpdate(jobId) ?: return null
|
||||||
|
job.status = CreatorRankingSnapshotJobStatus.DONE
|
||||||
|
job.processedAt = processedAt
|
||||||
|
job.lastError = null
|
||||||
|
return job.toRecord()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
override fun markFailed(jobId: Long, processedAt: LocalDateTime, lastError: String?): CreatorRankingSnapshotJobRecord? {
|
||||||
|
val job = repository.findByIdForUpdate(jobId) ?: return null
|
||||||
|
job.status = CreatorRankingSnapshotJobStatus.FAILED
|
||||||
|
job.processedAt = processedAt
|
||||||
|
job.lastError = lastError?.take(MAX_ERROR_LENGTH)
|
||||||
|
return job.toRecord()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun CreatorRankingSnapshotJobRecord.toEntity(): CreatorRankingSnapshotJob {
|
||||||
|
return CreatorRankingSnapshotJob(
|
||||||
|
aggregationStartAtUtc = aggregationStartAtUtc,
|
||||||
|
aggregationEndAtUtc = aggregationEndAtUtc,
|
||||||
|
trigger = trigger,
|
||||||
|
status = status,
|
||||||
|
lastError = lastError,
|
||||||
|
processingStartedAt = processingStartedAt,
|
||||||
|
processedAt = processedAt
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun CreatorRankingSnapshotJob.toRecord(): CreatorRankingSnapshotJobRecord {
|
||||||
|
return CreatorRankingSnapshotJobRecord(
|
||||||
|
id = id,
|
||||||
|
aggregationStartAtUtc = aggregationStartAtUtc,
|
||||||
|
aggregationEndAtUtc = aggregationEndAtUtc,
|
||||||
|
trigger = trigger,
|
||||||
|
status = status,
|
||||||
|
lastError = lastError,
|
||||||
|
processingStartedAt = processingStartedAt,
|
||||||
|
processedAt = processedAt
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
private const val MAX_ERROR_LENGTH = 1000
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.ranking.port.out
|
||||||
|
|
||||||
|
import java.time.LocalDateTime
|
||||||
|
|
||||||
|
interface CreatorRankingSnapshotJobPort {
|
||||||
|
fun save(job: CreatorRankingSnapshotJobRecord): CreatorRankingSnapshotJobRecord
|
||||||
|
|
||||||
|
fun findById(jobId: Long): CreatorRankingSnapshotJobRecord?
|
||||||
|
|
||||||
|
fun findByPeriodAndStatuses(
|
||||||
|
aggregationStartAtUtc: LocalDateTime,
|
||||||
|
aggregationEndAtUtc: LocalDateTime,
|
||||||
|
statuses: List<CreatorRankingSnapshotJobStatus>
|
||||||
|
): List<CreatorRankingSnapshotJobRecord>
|
||||||
|
|
||||||
|
fun markProcessing(jobId: Long, processingStartedAt: LocalDateTime): CreatorRankingSnapshotJobRecord?
|
||||||
|
|
||||||
|
fun markDone(jobId: Long, processedAt: LocalDateTime): CreatorRankingSnapshotJobRecord?
|
||||||
|
|
||||||
|
fun markFailed(jobId: Long, processedAt: LocalDateTime, lastError: String?): CreatorRankingSnapshotJobRecord?
|
||||||
|
}
|
||||||
|
|
||||||
|
enum class CreatorRankingSnapshotJobStatus {
|
||||||
|
PENDING,
|
||||||
|
PROCESSING,
|
||||||
|
DONE,
|
||||||
|
FAILED
|
||||||
|
}
|
||||||
|
|
||||||
|
enum class CreatorRankingSnapshotJobTrigger {
|
||||||
|
SCHEDULED,
|
||||||
|
MANUAL
|
||||||
|
}
|
||||||
|
|
||||||
|
data class CreatorRankingSnapshotJobRecord(
|
||||||
|
val id: Long? = null,
|
||||||
|
val aggregationStartAtUtc: LocalDateTime,
|
||||||
|
val aggregationEndAtUtc: LocalDateTime,
|
||||||
|
val trigger: CreatorRankingSnapshotJobTrigger,
|
||||||
|
val status: CreatorRankingSnapshotJobStatus,
|
||||||
|
val lastError: String?,
|
||||||
|
val processingStartedAt: LocalDateTime?,
|
||||||
|
val processedAt: LocalDateTime?
|
||||||
|
)
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package kr.co.vividnext.sodalive.v2.ranking.adapter.out.persistence
|
||||||
|
|
||||||
|
import kr.co.vividnext.sodalive.configs.QueryDslConfig
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobRecord
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobStatus
|
||||||
|
import kr.co.vividnext.sodalive.v2.ranking.port.out.CreatorRankingSnapshotJobTrigger
|
||||||
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
|
import org.junit.jupiter.api.DisplayName
|
||||||
|
import org.junit.jupiter.api.Test
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired
|
||||||
|
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
|
||||||
|
import org.springframework.context.annotation.Import
|
||||||
|
import java.time.LocalDateTime
|
||||||
|
|
||||||
|
@DataJpaTest(
|
||||||
|
properties = [
|
||||||
|
"spring.cache.type=none",
|
||||||
|
"spring.datasource.url=jdbc:h2:mem:testdb;MODE=MySQL;NON_KEYWORDS=VALUE"
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@Import(QueryDslConfig::class)
|
||||||
|
class DefaultCreatorRankingSnapshotJobRepositoryTest @Autowired constructor(
|
||||||
|
private val repository: CreatorRankingSnapshotJobRepository
|
||||||
|
) {
|
||||||
|
private val adapter = DefaultCreatorRankingSnapshotJobRepository(repository)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("스냅샷 job은 기간, 트리거, 상태, 처리 시각, 실패 사유를 저장하고 조회한다")
|
||||||
|
fun shouldSaveAndFindSnapshotJobHistoryByPeriodAndStatus() {
|
||||||
|
val startAt = LocalDateTime.of(2026, 5, 31, 15, 0)
|
||||||
|
val endAt = LocalDateTime.of(2026, 6, 7, 15, 0)
|
||||||
|
|
||||||
|
val saved = adapter.save(
|
||||||
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
aggregationStartAtUtc = startAt,
|
||||||
|
aggregationEndAtUtc = endAt,
|
||||||
|
trigger = CreatorRankingSnapshotJobTrigger.SCHEDULED,
|
||||||
|
status = CreatorRankingSnapshotJobStatus.PENDING,
|
||||||
|
lastError = null,
|
||||||
|
processingStartedAt = null,
|
||||||
|
processedAt = null
|
||||||
|
)
|
||||||
|
)
|
||||||
|
val savedId = saved.id!!
|
||||||
|
assertEquals(CreatorRankingSnapshotJobStatus.PENDING, adapter.findById(savedId)?.status)
|
||||||
|
|
||||||
|
val processingStartedAt = LocalDateTime.of(2026, 6, 8, 7, 30)
|
||||||
|
adapter.markProcessing(savedId, processingStartedAt)
|
||||||
|
val processingJob = adapter.findById(savedId)
|
||||||
|
assertEquals(CreatorRankingSnapshotJobStatus.PROCESSING, processingJob?.status)
|
||||||
|
assertEquals(processingStartedAt, processingJob?.processingStartedAt)
|
||||||
|
|
||||||
|
val processedAt = LocalDateTime.of(2026, 6, 8, 7, 31)
|
||||||
|
adapter.markDone(savedId, processedAt)
|
||||||
|
val failed = adapter.save(
|
||||||
|
CreatorRankingSnapshotJobRecord(
|
||||||
|
aggregationStartAtUtc = startAt.minusWeeks(1),
|
||||||
|
aggregationEndAtUtc = endAt.minusWeeks(1),
|
||||||
|
trigger = CreatorRankingSnapshotJobTrigger.SCHEDULED,
|
||||||
|
status = CreatorRankingSnapshotJobStatus.FAILED,
|
||||||
|
lastError = "aggregate failed",
|
||||||
|
processingStartedAt = LocalDateTime.of(2026, 6, 1, 7, 30),
|
||||||
|
processedAt = LocalDateTime.of(2026, 6, 1, 7, 31)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
val jobs = adapter.findByPeriodAndStatuses(
|
||||||
|
aggregationStartAtUtc = startAt,
|
||||||
|
aggregationEndAtUtc = endAt,
|
||||||
|
statuses = listOf(CreatorRankingSnapshotJobStatus.DONE)
|
||||||
|
)
|
||||||
|
val failedJob = adapter.findById(failed.id!!)
|
||||||
|
|
||||||
|
assertEquals(1, jobs.size)
|
||||||
|
assertEquals(CreatorRankingSnapshotJobTrigger.SCHEDULED, jobs.single().trigger)
|
||||||
|
assertEquals(CreatorRankingSnapshotJobStatus.DONE, jobs.single().status)
|
||||||
|
assertEquals(processingStartedAt, jobs.single().processingStartedAt)
|
||||||
|
assertEquals(processedAt, jobs.single().processedAt)
|
||||||
|
assertEquals(null, jobs.single().lastError)
|
||||||
|
assertEquals(CreatorRankingSnapshotJobStatus.FAILED, failedJob?.status)
|
||||||
|
assertEquals("aggregate failed", failedJob?.lastError)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user