Language/Java & Kotlin

[Kotlin] 코루틴(Coroutines)에 대해서 알아보자

Joonfluence 2024. 6. 14. 00:26

[Language/Java] - [Kotlin] 자바와 코틀린의 공통점과 차이점

블로그엔 자주 안올렸지만 요새 코틀린 공부를 열심히 하고 있습니다. 지난번에는 자바와 코틀린의 공통점과 차이점에 대해서 알아보았습니다. 기본 문법은 자바스크립트와 문법적으로 유사한 부분이 있어, 금방 이해되는 부분이 많았지만 코루틴부터는 제대로 공부하지 않으면 안될 것 같아, 블로그에 열심히 정리하도록 하겠습니다.

공식 라이브러리

  • Kotlin Coroutines
  • Kotlin Serialization
  • Kotlin Lincheck

먼저, 코루틴을 사용하려면 별도의 라이브러리를 설치해야 합니다. (아래 버전은 예시입니다)

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")

 

 

코루틴은 어떻게 사용할 수 있을까요? 

Kotlin Coroutines

  • 동시성을 지원하는 라이브러리
  • 비동기 논블로킹(non blocking)으로 동작하는 코드를 동기 방식으로 작성할 수 있게 지원
    • 결과 자체는 동기적으로 동작하지 않음
  • 여기서 복습. 동기, 비동기, blocking, non-blocking이란?
    • 동기 : 순차적인 흐름으로 작업을 실행됨. 동기 방식은 직관적이고 단순하지만, 긴 작업이 시스템을 블록하여 다른 작업들이 지연됩니다.
    • 비동기 : 작업이 동시 또는 병렬로 실행됨. 하나의 작업이 완료되기를 기다리지 않고 다른 작업을 실행함. 특히 I/O 작업이나 네트워크 요청과 같이 시간이 오래 걸리는 작업에서 유용합니다.
    • 블로킹 : 블로킹 방식에선 작업이 완료될 때까지 호출한 쪽이 기다려야 . 주로 동기 방식이 블로킹 상황입니다.
    • 논블로킹 : 논블로킹 방식에선 호출된 함수나 메서드가 완료되지 않았더라도 즉시 제어권을 반환. 주로 비동기 방식이 논블로킹 상황입니다.
  • CoroutineContext를 통해서 dispatcher, error handling, threadLocal 등을 지원합니다.
    • CoroutineContext란 Kotlin 코루틴에서 코루틴의 실행 환경을 정의하는 요소들의 집합으로, 코루틴이 어떻게 실행되고, 어떤 스레드에서 실행되며, 취소와 같은 작업을 어떻게 처리할지를 결정하는 여러 요소를 포함합니다.
  • CoroutineScope를 통해서 structured concurrency와 cancellation 제공합니다.
    • CoroutineScope는 하나 이상의 코루틴을 관리하는 역할을 합니다. CoroutineScope는 코루틴의 생명 주기를 관리하며, 모든 코루틴은 어떤 스코프(scope) 내에서 실행됩니다.
  • flow, channel 등의 심화 기능 제공합니다.
    • Flow는 데이터 스트림을 다루기 위한 더 고수준의 추상화를 제공하며, 주로 단방향 데이터 흐름을 처리합니다. Flow는 주로 데이터의 순차적 방출을 다루는 데 적합합니다.
    • Channel은 코루틴 간의 통신을 위해 더 저수준의 도구를 제공하며, 양방향 데이터 전송이 필요할 때 주로 사용됩니다. Channel은 생산자-소비자 패턴을 구현하는 데 적합합니다.

동기 스타일 지원

  • repository는 각각의 반환값을 CompletableFuture와 Mono로 반환합니다.
  object ArticleFutureRepository {
      fun findArticleById(id: Long): CompletableFuture<Article> {
          return CompletableFuture.completedFuture(Article(id, "article $id"))
      }
  }
  object ArticleFutureRepository {
      fun findArticleById(id: Long): CompletableFuture<Article> {
          return CompletableFuture.completedFuture(Article(id, "article $id"))
      }
  }
  • 기존 비동기로 구현을 한다면 thenApply나 map, flatMap 등을 통해서 chaining하고 subscribe 할 수 있습니다.
  fun main() {
      val personRepository = PersonReactorRepository
      val articleRepository = ArticleFutureRepository

      personRepository.findPersonByName("taewoo")
          .flatMap { person ->
              val future = articleRepository.findArticleById(person.id)

              Mono.fromFuture(future)
                  .map { article -> person to article }
          }.subscribe { (person, article) ->
              log.info("person: {}, article: {}", person, article)
          }
  }
  • 코틀린에서는 runBlocking과 suspend 함수 (awaitSingle, await)를 통해서 동기 스타일로 변경할 수 있습니다. non-blocking하게 동작하지만 blocking처럼 사용 가능한 코드 작성 가능합니다.
  suspend fun main() = runBlocking {
      val personRepository = PersonReactorRepository
      val articleRepository = ArticleFutureRepository

      val person = personRepository.findPersonByName("taewoo")
          .awaitSingle()

      val article = articleRepository.findArticleById(person.id)
          .await()

      log.info("person: {}, article: {}", person, article)
  }
  • 코틀린에서는 suspend 함수는 코루틴 내에서 호출될 수 있는 함수로, 비동기 작업을 동기식 코드처럼 작성할 수 있게 합니다.

CoroutineContext

  • 주요 구성 요소
    • Dispatcher
      • 코루틴이 어느 스레드나 스레드 풀에서 실행될지를 결정합니다. 예를 들어, Dispatchers.IO는 I/O 작업에 최적화된 스레드 풀에서 실행되며, Dispatchers.Main은 주로 UI 작업을 위해 메인 스레드에서 실행됩니다.
    • Job
      • 코루틴의 상태(예: 활성 상태, 취소 상태, 완료 상태)를 관리합니다. Job은 부모-자식 관계를 형성하여 구조적 동시성을 지원합니다.
    • CoroutineName
      • 디버깅이나 로깅을 위한 코루틴의 이름을 지정합니다.
    • CoroutineExceptionHandler
      • 코루틴 내에서 발생하는 예외를 처리하기 위한 핸들러를 정의합니다.
  • CoroutineContext를 통해서 coroutine에 필요한 정보들을 제공할 수 있습니다.
  val threadLocal = ThreadLocal<String>()
      threadLocal.set("hello")
      log.info("thread: {}", Thread.currentThread().name)
      log.info("threadLocal: {}", threadLocal.get())

      runBlocking {
          val context = CoroutineName("custom name") +
                  Dispatchers.IO +
                  threadLocal.asContextElement()

          launch(context) {
              log.info("thread: {}", Thread.currentThread().name)
              log.info("threadLocal: {}", threadLocal.get())
              log.info("coroutine name: {}",
                  coroutineContext[CoroutineName])
          }
      }

CoroutineScope

  • 기존 CompletableFuture를 사용한 비동기 함수의 동작는 차이가 있습니다.
    • Start main -> step 1 -> step 2 -> step 3 -> Finish main -> Finish run2 -> Finish run1의 순서로 실행됩니다.
private fun nonStructured() {
    log.info("step 1")
    CompletableFuture.runAsync {
        Thread.sleep(1000)
        log.info("Finish run1")
    }
    log.info("step 2")
    CompletableFuture.runAsync {
        Thread.sleep(100)
        log.info("Finish run2")
    }
    log.info("step 3")
}
  • 반면 코루틴에서 coroutineScope 함수를 사용하면 별도의 CoroutineScope를 생성하여, CoroutineScope 내에서는 자식 coroutine이 모두 완료되고 coroutine이 끝남을 보장합니다.
    • Start main -> step 1 -> step 2 -> step 3 -> Finish run2 -> Finish run1 -> Finish main 의 순서로 실행됩니다.
private suspend fun structured() = coroutineScope {
    log.info("step 1")
    launch {
        delay(1000)
        log.info("Finish launch1")
    }
    log.info("step 2")
    launch {
        delay(100)
        log.info("Finish launch2")
    }
    log.info("step 3")
}

fun main() = runBlocking {
    log.info("Start main")
    structured()
    log.info("Finish main")
}
  • 이번에는 cancle이라는 기능에 대해 알아보겠습니다. 코루틴에서는 취소 이벤트가 발생할 경우에 대한 처리도 해줄 수 있습니다. 아래 예시처럼 말이죠.
    • 실행 순서는 다음과 같습니다. Start runblocking -> Job1 is canceled -> Job2 is canceled -> Job is canceled -> Finish runblocking
private val log = kLogger()
private suspend fun structured() = coroutineScope {
    launch {
        try {
            delay(1000)
            log.info("Finish launch1")
        } catch (e: CancellationException) {
            log.info("Job1 is cancelled")
        }
    }

    launch {
        try {
            delay(500)
            log.info("Finish launch2")
        } catch (e: CancellationException) {
            log.info("Job2 is cancelled")
        }
    }

    delay(100)
    this.cancel()
}

fun main() = runBlocking {
    log.info("Start runBlocking")
    try {
        structured()
    } catch (e: CancellationException) {
        log.info("Job is cancelled")
    }
    log.info("Finish runBlocking")

Flow

  • flux와 마찬가지로 다양한 연산을 제공합니다.
    • map, flatMap, take, drop, transform 등의 중간 연산자를 사용합니다.
    • collect, toList, toSet, reduce, fold, first, single 등의 종료 연산자를 사용합니다.
      • 아래 연산의 실행 순서는 다음과 같습니다.
        • Start runBlocking -> numbers: [0, 2, 4] -> Finish runBlocking
private val log = kLogger()
private fun range(n: Int): Flow<Int> {
    return flow {
        for (i in 0 until n) {
            delay(100)
            emit(i)
        }
    }
}

fun main() = runBlocking {
    log.info("Start runBlocking")
    range(5).collect {
        log.info("item: {}", it)
    }
    log.info("Finish runBlocking")

}

Channel

  • channel을 생성하여 send와 receive 가능합니다.
  • channel은 여러 coroutine, thread에서 동시에 실행되도 안전합니다.
  • for를 통해서 clos되기 전까지 지속적으로 receive를 수행합니다.
    • 실행 순서는 아래와 같습니다.
      • Start runBlocking -> item: 0 -> item: 1 -> item: 2 -> item: 3 -> item: 4 -> Finish runBlocking
private val log = kLogger()
fun main() = runBlocking {
    log.info("Start runBlocking")
    val channel = Channel<Int>(1, BufferOverflow.DROP_LATEST)
    launch {
        delay(100)

        for (i in 0 until 5) {
            channel.send(i)
        }
        channel.close()
    }

    delay(500)

    for (i in channel) {
        log.info("item: {}", i)
    }
    log.info("Finish runBlocking")
}
  • capacity와 BufferOverflow 인자를 전달하여 channel의 크기를 제한할 수 있습니다.
  • BufferOverflow에 따라서 send를 지연하거나 item을 drop 할 수 있습니다.
private val log = kLogger()
fun main() = runBlocking {
    log.info("Start runBlocking")
    val channel = Channel<Int>(1, BufferOverflow.DROP_LATEST)
    launch {
        delay(100)

        for (i in 0 until 5) {
            channel.send(i)
        }
        channel.close()
    }

    delay(500)

    for (i in channel) {
        log.info("item: {}", i)
    }
    log.info("Finish runBlocking")
}
반응형