feat(i18n): 번역 작업을 그룹 단위로 처리한다
This commit is contained in:
@@ -33,22 +33,26 @@ interface TranslationJobRepository : JpaRepository<TranslationJob, Long> {
|
||||
@Param("sourceHash") sourceHash: String
|
||||
): TranslationJob?
|
||||
|
||||
fun findFirstByStatusAndNextRetryAtLessThanEqualOrderByCreatedAtAsc(
|
||||
status: TranslationJobStatus,
|
||||
nextRetryAt: LocalDateTime
|
||||
): TranslationJob?
|
||||
|
||||
@Query(
|
||||
value = """
|
||||
select id
|
||||
from translation_job
|
||||
where status = 'PENDING'
|
||||
and next_retry_at <= :now
|
||||
order by created_at asc
|
||||
limit 1
|
||||
select j.id
|
||||
from translation_job j
|
||||
join (
|
||||
select resource_type, resource_id, target_language
|
||||
from translation_job
|
||||
where status = 'PENDING'
|
||||
and next_retry_at <= :now
|
||||
order by created_at asc
|
||||
limit 1
|
||||
) g on j.resource_type = g.resource_type
|
||||
and j.resource_id = g.resource_id
|
||||
and j.target_language = g.target_language
|
||||
where j.status = 'PENDING'
|
||||
and j.next_retry_at <= :now
|
||||
order by j.created_at asc
|
||||
for update skip locked
|
||||
""",
|
||||
nativeQuery = true
|
||||
)
|
||||
fun findNextPendingJobIdForUpdate(@Param("now") now: LocalDateTime): Long?
|
||||
fun findNextPendingGroupJobIdsForUpdate(@Param("now") now: LocalDateTime): List<Long>
|
||||
}
|
||||
|
||||
@@ -20,33 +20,52 @@ class TranslationJobWorker(
|
||||
|
||||
@Scheduled(fixedDelayString = "\${sodalive.translation-job.fixed-delay-ms:600000}")
|
||||
fun runPendingJobs() {
|
||||
repeat(MAX_JOBS_PER_TICK) {
|
||||
if (!processNextJob()) return
|
||||
repeat(MAX_GROUPS_PER_TICK) {
|
||||
if (!processNextGroup()) return
|
||||
}
|
||||
}
|
||||
|
||||
fun processNextJob(): Boolean {
|
||||
val job = claimNextJob() ?: return false
|
||||
fun processNextGroup(): Boolean {
|
||||
val jobs = claimNextGroup()
|
||||
if (jobs.isEmpty()) return false
|
||||
|
||||
val firstJob = jobs.first()
|
||||
val succeededJobs = mutableListOf<TranslationJob>()
|
||||
val failedJobs = mutableListOf<Pair<TranslationJob, Exception>>()
|
||||
jobs.forEach { job ->
|
||||
try {
|
||||
ensureMemory(job)
|
||||
succeededJobs.add(job)
|
||||
} catch (ex: Exception) {
|
||||
failedJobs.add(job to ex)
|
||||
}
|
||||
}
|
||||
|
||||
if (failedJobs.isNotEmpty()) {
|
||||
succeededJobs.forEach { completeJob(it.id!!) }
|
||||
failedJobs.forEach { (job, ex) -> failJob(job.id!!, ex) }
|
||||
return true
|
||||
}
|
||||
|
||||
try {
|
||||
ensureMemory(job)
|
||||
materializer.materialize(job.resourceType, job.resourceId, job.targetLanguage)
|
||||
completeJob(job.id!!)
|
||||
materializer.materialize(firstJob.resourceType, firstJob.resourceId, firstJob.targetLanguage)
|
||||
succeededJobs.forEach { completeJob(it.id!!) }
|
||||
} catch (ex: Exception) {
|
||||
failJob(job.id!!, ex)
|
||||
succeededJobs.forEach { failJob(it.id!!, ex) }
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
private fun claimNextJob(): TranslationJob? {
|
||||
private fun claimNextGroup(): List<TranslationJob> {
|
||||
return transactionTemplate.execute {
|
||||
val jobId = translationJobRepository.findNextPendingJobIdForUpdate(LocalDateTime.now())
|
||||
?: return@execute null
|
||||
val job = translationJobRepository.findById(jobId).orElse(null)
|
||||
?: return@execute null
|
||||
job.status = TranslationJobStatus.RUNNING
|
||||
translationJobRepository.save(job)
|
||||
job
|
||||
}
|
||||
val jobIds = translationJobRepository.findNextPendingGroupJobIdsForUpdate(LocalDateTime.now())
|
||||
jobIds.mapNotNull { jobId ->
|
||||
val job = translationJobRepository.findById(jobId).orElse(null) ?: return@mapNotNull null
|
||||
job.status = TranslationJobStatus.RUNNING
|
||||
translationJobRepository.save(job)
|
||||
job
|
||||
}
|
||||
}.orEmpty()
|
||||
}
|
||||
|
||||
private fun ensureMemory(job: TranslationJob) {
|
||||
@@ -129,7 +148,7 @@ class TranslationJobWorker(
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val MAX_JOBS_PER_TICK = 20
|
||||
private const val MAX_GROUPS_PER_TICK = 5
|
||||
private const val MAX_ERROR_LENGTH = 1000
|
||||
private const val MAX_RETRY_COUNT = 3
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package kr.co.vividnext.sodalive.i18n.translation
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.Assertions.assertTrue
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.mockito.ArgumentCaptor
|
||||
import org.mockito.Mockito
|
||||
import org.springframework.scheduling.annotation.Scheduled
|
||||
import org.springframework.transaction.support.AbstractPlatformTransactionManager
|
||||
@@ -21,7 +22,7 @@ class TranslationJobWorkerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun shouldClaimPendingJobByLockedRepositoryMethod() {
|
||||
fun shouldClaimPendingJobGroupByLockedRepositoryMethod() {
|
||||
val jobRepository = Mockito.mock(TranslationJobRepository::class.java)
|
||||
val memoryRepository = Mockito.mock(TranslationMemoryRepository::class.java)
|
||||
val provider = successfulProvider()
|
||||
@@ -36,17 +37,111 @@ class TranslationJobWorkerTest {
|
||||
val job = translationJob()
|
||||
job.id = 100L
|
||||
|
||||
Mockito.`when`(jobRepository.findNextPendingJobIdForUpdate(anyLocalDateTime())).thenReturn(100L)
|
||||
Mockito.`when`(jobRepository.findNextPendingGroupJobIdsForUpdate(anyLocalDateTime())).thenReturn(listOf(100L))
|
||||
Mockito.`when`(jobRepository.findById(100L)).thenReturn(Optional.of(job))
|
||||
|
||||
worker.processNextJob()
|
||||
worker.processNextGroup()
|
||||
|
||||
Mockito.verify(jobRepository).findNextPendingJobIdForUpdate(anyLocalDateTime())
|
||||
Mockito.verify(jobRepository, Mockito.never())
|
||||
.findFirstByStatusAndNextRetryAtLessThanEqualOrderByCreatedAtAsc(
|
||||
anyTranslationJobStatus(),
|
||||
anyLocalDateTime()
|
||||
)
|
||||
Mockito.verify(jobRepository).findNextPendingGroupJobIdsForUpdate(anyLocalDateTime())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun shouldProcessAllJobsInClaimedGroupBeforeMaterializing() {
|
||||
val jobRepository = Mockito.mock(TranslationJobRepository::class.java)
|
||||
val memoryRepository = Mockito.mock(TranslationMemoryRepository::class.java)
|
||||
val provider = successfulProvider()
|
||||
val materializer = Mockito.mock(TranslationReadModelMaterializer::class.java)
|
||||
val worker = TranslationJobWorker(
|
||||
translationJobRepository = jobRepository,
|
||||
translationMemoryRepository = memoryRepository,
|
||||
translationProvider = provider,
|
||||
materializer = materializer,
|
||||
transactionManager = TestTransactionManager()
|
||||
)
|
||||
val titleJob = translationJob(fieldKey = "title", sourceText = "제목")
|
||||
titleJob.id = 100L
|
||||
val detailJob = translationJob(fieldKey = "detail", sourceText = "설명")
|
||||
detailJob.id = 101L
|
||||
|
||||
Mockito.`when`(jobRepository.findNextPendingGroupJobIdsForUpdate(anyLocalDateTime()))
|
||||
.thenReturn(listOf(100L, 101L))
|
||||
Mockito.`when`(jobRepository.findById(100L)).thenReturn(Optional.of(titleJob))
|
||||
Mockito.`when`(jobRepository.findById(101L)).thenReturn(Optional.of(detailJob))
|
||||
|
||||
worker.processNextGroup()
|
||||
|
||||
assertEquals(TranslationJobStatus.COMPLETED, titleJob.status)
|
||||
assertEquals(TranslationJobStatus.COMPLETED, detailJob.status)
|
||||
Mockito.verify(materializer, Mockito.times(1)).materialize(LanguageTranslationTargetType.CONTENT, 10L, "en")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun shouldRetryGroupWhenMaterializationFails() {
|
||||
val jobRepository = Mockito.mock(TranslationJobRepository::class.java)
|
||||
val memoryRepository = Mockito.mock(TranslationMemoryRepository::class.java)
|
||||
val provider = successfulProvider()
|
||||
val materializer = Mockito.mock(TranslationReadModelMaterializer::class.java)
|
||||
val worker = TranslationJobWorker(
|
||||
translationJobRepository = jobRepository,
|
||||
translationMemoryRepository = memoryRepository,
|
||||
translationProvider = provider,
|
||||
materializer = materializer,
|
||||
transactionManager = TestTransactionManager()
|
||||
)
|
||||
val titleJob = translationJob(fieldKey = "title", sourceText = "제목")
|
||||
titleJob.id = 100L
|
||||
val detailJob = translationJob(fieldKey = "detail", sourceText = "설명")
|
||||
detailJob.id = 101L
|
||||
val beforeRetryAt = titleJob.nextRetryAt
|
||||
|
||||
Mockito.`when`(jobRepository.findNextPendingGroupJobIdsForUpdate(anyLocalDateTime()))
|
||||
.thenReturn(listOf(100L, 101L))
|
||||
Mockito.`when`(jobRepository.findById(100L)).thenReturn(Optional.of(titleJob))
|
||||
Mockito.`when`(jobRepository.findById(101L)).thenReturn(Optional.of(detailJob))
|
||||
Mockito.`when`(materializer.materialize(LanguageTranslationTargetType.CONTENT, 10L, "en"))
|
||||
.thenThrow(IllegalStateException("materialize down"))
|
||||
|
||||
worker.processNextGroup()
|
||||
|
||||
assertEquals(TranslationJobStatus.PENDING, titleJob.status)
|
||||
assertEquals(TranslationJobStatus.PENDING, detailJob.status)
|
||||
assertEquals(1, titleJob.retryCount)
|
||||
assertEquals(1, detailJob.retryCount)
|
||||
assertTrue(titleJob.nextRetryAt.isAfter(beforeRetryAt))
|
||||
assertTrue(detailJob.nextRetryAt.isAfter(beforeRetryAt))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun shouldLimitRunToFiveGroupsPerTick() {
|
||||
val jobRepository = Mockito.mock(TranslationJobRepository::class.java)
|
||||
val memoryRepository = Mockito.mock(TranslationMemoryRepository::class.java)
|
||||
val provider = successfulProvider()
|
||||
val materializer = Mockito.mock(TranslationReadModelMaterializer::class.java)
|
||||
val worker = TranslationJobWorker(
|
||||
translationJobRepository = jobRepository,
|
||||
translationMemoryRepository = memoryRepository,
|
||||
translationProvider = provider,
|
||||
materializer = materializer,
|
||||
transactionManager = TestTransactionManager()
|
||||
)
|
||||
val jobs = (1L..6L).map { id ->
|
||||
translationJob(resourceId = id).also { it.id = id }
|
||||
}
|
||||
Mockito.`when`(jobRepository.findNextPendingGroupJobIdsForUpdate(anyLocalDateTime()))
|
||||
.thenReturn(listOf(1L), listOf(2L), listOf(3L), listOf(4L), listOf(5L), listOf(6L))
|
||||
jobs.forEach { job ->
|
||||
Mockito.`when`(jobRepository.findById(job.id!!)).thenReturn(Optional.of(job))
|
||||
}
|
||||
|
||||
worker.runPendingJobs()
|
||||
|
||||
val savedJobCaptor = ArgumentCaptor.forClass(TranslationJob::class.java)
|
||||
Mockito.verify(jobRepository, Mockito.atLeastOnce()).save(savedJobCaptor.capture())
|
||||
val completedResourceIds = savedJobCaptor.allValues
|
||||
.filter { it.status == TranslationJobStatus.COMPLETED }
|
||||
.map { it.resourceId }
|
||||
.toSet()
|
||||
assertEquals(setOf(1L, 2L, 3L, 4L, 5L), completedResourceIds)
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -66,10 +161,10 @@ class TranslationJobWorkerTest {
|
||||
job.id = 200L
|
||||
val beforeRetryAt = job.nextRetryAt
|
||||
|
||||
Mockito.`when`(jobRepository.findNextPendingJobIdForUpdate(anyLocalDateTime())).thenReturn(200L)
|
||||
Mockito.`when`(jobRepository.findNextPendingGroupJobIdsForUpdate(anyLocalDateTime())).thenReturn(listOf(200L))
|
||||
Mockito.`when`(jobRepository.findById(200L)).thenReturn(Optional.of(job))
|
||||
|
||||
worker.processNextJob()
|
||||
worker.processNextGroup()
|
||||
|
||||
assertEquals(TranslationJobStatus.PENDING, job.status)
|
||||
assertEquals(1, job.retryCount)
|
||||
@@ -77,13 +172,17 @@ class TranslationJobWorkerTest {
|
||||
assertTrue(job.nextRetryAt.isAfter(beforeRetryAt))
|
||||
}
|
||||
|
||||
private fun translationJob(): TranslationJob {
|
||||
private fun translationJob(
|
||||
resourceId: Long = 10L,
|
||||
fieldKey: String = "title",
|
||||
sourceText: String = "제목"
|
||||
): TranslationJob {
|
||||
return TranslationJob(
|
||||
resourceType = LanguageTranslationTargetType.CONTENT,
|
||||
resourceId = 10L,
|
||||
fieldKey = "title",
|
||||
sourceHash = SourceTextNormalizer.hash("제목"),
|
||||
sourceText = "제목",
|
||||
resourceId = resourceId,
|
||||
fieldKey = fieldKey,
|
||||
sourceHash = SourceTextNormalizer.hash(sourceText),
|
||||
sourceText = sourceText,
|
||||
sourceLanguage = "ko",
|
||||
targetLanguage = "en"
|
||||
)
|
||||
@@ -114,10 +213,6 @@ class TranslationJobWorkerTest {
|
||||
private fun anyLocalDateTime(): LocalDateTime {
|
||||
return Mockito.any(LocalDateTime::class.java) ?: LocalDateTime.now()
|
||||
}
|
||||
|
||||
private fun anyTranslationJobStatus(): TranslationJobStatus {
|
||||
return Mockito.any(TranslationJobStatus::class.java) ?: TranslationJobStatus.PENDING
|
||||
}
|
||||
}
|
||||
|
||||
private class TestTransactionManager : AbstractPlatformTransactionManager() {
|
||||
|
||||
Reference in New Issue
Block a user