Language/Java & Kotlin

[Kotlin] 코루틴(Coroutines) 기초에 대해 알아보자 (1) suspend

Joonfluence 2024. 6. 15. 11:46

[Language/Java] - [Kotlin] 자바와 코틀린의 공통점과 차이점

[Language/Java] - [Kotlin] 코루틴(Coroutines)에 대해서 알아보자

이전 글을 통해 코루틴에 대해 알아보았습니다. 이번에는 코루틴 기초에 대해서 자세하게 알아보도록 하겠습니다.

suspend 함수와 일반 함수의 차이점

  • 먼저 suspend fun으로 선언되는 일시 중단 함수는 함수 내에 일시 중단 지점을 포함할 수 있는 특별한 함수입니다. 코루틴은 특정 시점에서 작업을 일시 중단하고, 다른 코루틴을 실행하도록 허용한 후 나중에 해당 작업을 재개할 수 있습니다. 일시 중단 함수는 코루틴에서 실행되는 일시 중단 지점이 포함된 코드들을 재사용할 수 있는 코드의 집합으로 만드는 역할을 합니다.
import kotlinx.coroutines.delay

class SuspendExample {
  fun nomarl() {
    println("Start")
    delay(1000)
    println("End")
  }
}
  • main 함수에서 delay 함수를 실행하려고 하면 아래와 같은 에러가 뜨는데요, 그 이유는 뭘까요?

  • 이 오류를 살펴보면, 일시 중단 함수인 delay는 일시 중단 함수에서만 호출될 수 있다는 의미입니다. 왜 이런 오류가 나는 것일까? 바로 delay 함수는 코루틴이 일정 시간 동안 스레드를 양보하도록 만드는 함수여서 일시 중단 함수로 선언되었기 때문입니다. 따라서 이 함수를 호출하는 곳도 일시 중단 함수여야 합니다.
Suspend function 'delay' should be called only from a coroutine or another suspend function
  • suspend 함수 안에서 실행해보도록 하겠습니다. 그리고 내부적으로 어떤 동작을 수행하는지 확인하기 위해, Bytecode를 Decompie하여 Java 코드로 변환해보도록 하겠습니다.
class SuspendExample {
  suspend fun greet() {
    delay(100)
    println("Hello, World!")
  }
}
  • 가장 먼저 greet에 Continuation이라는 인자가 추가된 걸 볼 수 있습니다.
public final class SuspendExample {
   @Nullable
   public final Object greet(@NotNull Continuation $completion) {
     ...  
   }
}
  • 코드를 살펴보면 아래와 같은 특성이 존재합니다.
    • undefinedtype이 정확히 알 순 없지만 추가되었습니다.
    • Continuation 인자가 undefinedtype을 구현했는지 체크합니다.
      • 맞다면 $continuation에 추가하고 break 합니다.
      • 아니라면 ContinuationImpl 추상 클래스를 상속하여 invokeSuspend 함수를 구현합니다.
    • ContinuationImpl에는 label과 result를 포함한합니다.
Object $continuation;
label20: {
   if ($completion instanceof <undefinedtype>) {
       $continuation = (<undefinedtype>)$completion;
       if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
           ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
           break label20;
        }
    }

    $continuation = new ContinuationImpl($completion) {
        // $FF: synthetic field
        Object result;
        int label;

        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
           this.result = $result;
           this.label |= Integer.MIN_VALUE;
           return SuspendExample.this.greet((Continuation)this); // 여기서 재귀 호출
        }
     };
  }      
  • switch 문을 통해 label에 따라서 코드를 수행합니다.
  • switch 문이 종료된 이후, System.out.println을 통해서 Hello, World! 출력한다.
  • Kotlin의 Unit을 반환한다.
Object $result = ((<undefinedtype>)$continuation).result;
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (((<undefinedtype>)$continuation).label) {
    case 0:
        ResultKt.throwOnFailure($result);
        ((<undefinedtype>)$continuation).label = 1;
        if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
            return var5;
         }
         break;
    case 1:
         ResultKt.throwOnFailure($result);
         break;
    default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      String var2 = "Hello, World!";
      System.out.println(var2);
      return Unit.INSTANCE;
  • 최종 코드는 아래와 같습니다.
public final class SuspendExample {
   @Nullable
   public final Object greet(@NotNull Continuation $completion) {
      Object $continuation;
      label20: {
         if ($completion instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)$completion;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new ContinuationImpl($completion) {
            // $FF: synthetic field
            Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return SuspendExample.this.greet((Continuation)this); 
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch (((<undefinedtype>)$continuation).label) {
         case 0:
            ResultKt.throwOnFailure($result);
            ((<undefinedtype>)$continuation).label = 1;
            if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
               return var5;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            break;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      String var2 = "Hello, World!";
      System.out.println(var2);
      return Unit.INSTANCE;
   }
}
  • 위 샘플 코드에 대한 Koltin Complier의 변환 결과는 아래와 같습니다. 아직은 이해가 잘 안되도 괜찮습니다. 일단 그렇다는 사실만 알아둡시다.
    • Continuation 인자를 추가
    • Continuation 구현체를 생성
    • switch문을 추가
    • DelayKt.delay를 호출하며 continuation을 전달하고 종료
    • System.out.println을 수행
    • Unit 반환
  • suspend 함수의 동작을 이해하려면 FSMContinuation passing style에 대해 추가로 학습해봐야 합니다. 그 중 FSM에 대해서, 먼저 알아봅시다.

FSM (finite state machine)

FSM이란 컴퓨터 과학에서 널리 사용되는 개념으로, 다양한 시스템의 동작을 모델링하고 설계하는 데 유용합니다. FSM은 유한한 수의 상태와 그 상태들 간의 전이(transition)를 통해 시스템의 동작을 표현합니다.

기본 구성 요소

  • 상태 (State)
    • 시스템이 특정 시점에 있을 수 있는 각 상황을 나타냅니다.
    • FSM은 유한한 수의 상태를 가지며, 한 번에 오직 하나의 상태만을 가질 수 있습니다.
  • 초기 상태 (Initial State)
    • 시스템이 시작할 때의 상태입니다.
    • FSM은 항상 하나의 초기 상태를 가집니다.
  • 종료 상태 (Final State)
    • 시스템이 종료될 수 있는 상태입니다.
    • 종료 상태는 여러 개일 수 있으며, 반드시 필요한 것은 아닙니다.
  • 전이 (Transition)
    • 하나의 상태에서 다른 상태로의 이동을 나타냅니다.
    • 전이는 특정 이벤트나 조건에 의해 발생합니다.
  • 이벤트 (Event) 또는 입력 (Input)
    • 상태 전이를 유발하는 외부 요인입니다.

구현

  • FSM을 구현하면 아래와 같습니다.
    • label을 이용해서 when 문을 수행합니다.
    • 각각의 case에서 작업을 수행하고 label을 변경합니다.
    • 재귀함수를 호출하며 nextLabel을 전달하여 상태를 변경합니다.
      • 실행순서는 다음과 같습니다. Initial -> State1 -> State2 -> End
class FsmExample {
    fun execute(label: Int = 0) {
        var nextLabel: Int? = null

        when (label) {
            0 -> {
                log.info("Initial")
                nextLabel = 1
            }
            1 -> {
                log.info("State1")
                nextLabel = 2
            }
            2 -> {
                log.info("State2")
                nextLabel = 3
            }
            3 -> {
                log.info("End")
            }
        }
        // transition
        if (nextLabel != null) {
            this.execute(nextLabel)
        }
    }
}

fun main() {
    val fsmExample = FsmExample()
    fsmExample.execute()
}

이번에는 FSM과 일반적인 방식으로 구현한 코드를 비교해보겠습니다.

object NormalCalculator {
    fun calculate(initialValue: Int) {
        var result = initialValue
        result += 1
        result *= 2
        log.info("Result: {}", result)
    }
}

fun main() {
    NormalCalculator.calculate(5)
}

12라는 결과가 출력됩니다. FSM 방식으로 구현한 코드도 마찬가지입니다.

object FsmCalculator {
    data class Shared(
        var result: Int = 0,
        var label: Int = 0,
    )

    fun calculate(initialValue: Int,
                  shared: Shared? = null) {
        val current = shared ?: Shared()

        val simpleCont = Continuation<Int>(EmptyCoroutineContext) {
            this.calculate(initialValue, current)
        }

        when (current.label) {
            0 -> {
                current.label = 1
                initialize(initialValue, simpleCont)
                current.result = initialValue
            }
            1 -> {
                current.result += 1
                current.label = 2
            }
            2 -> {
                current.result *= 2
                current.label = 3
            }
            3 -> {
                log.info("Result: {}", current.result)
                return
            }
        }
    }

    private fun initialize(value: Int, cont: Continuation<Int>) {
        log.info("Initial")
        cont.resume(value)
    }

    private fun addOne(value: Int, cont: Continuation<Int>) {
        log.info("Add one")
        cont.resume(value + 1)
    }

    private fun multiplyTwo(value: Int, cont: Continuation<Int>) {
        log.info("Multiply two")
        cont.resume(value * 2)
    }
}

fun main() {
    FsmCalculator.calculate(5)
}

코틀린의 suspend 함수는 FSM의 상태 기반 로직에 따라, 비동기 프로그래밍 로직을 구현합니다.

  • 상태 관리
    • 코루틴의 실행 상태는 FSM의 상태와 유사하게 관리될 수 있습니다. 코루틴은 일시 중단(suspend)되고 재개(resume)되는 다양한 상태를 가질 수 있습니다. 이는 FSM에서 상태 전이가 발생하는 방식과 비슷합니다.
  • 이벤트 처리
    • FSM은 이벤트에 기반한 상태 전이를 정의하는데, 코루틴도 비슷하게 비동기 이벤트(예: 데이터 수신, 타이머 완료 등)를 처리합니다. suspend 함수는 이러한 이벤트가 발생할 때 코루틴의 상태를 변경할 수 있습니다.
  • 상태 전이와 일시 중단
    • FSM에서 상태 전이는 특정 조건이나 이벤트에 의해 발생합니다. 마찬가지로 코루틴에서 suspend 함수는 비동기 작업이 완료되거나 특정 조건이 충족될 때 재개됩니다. 이는 상태 전이와 유사한 개념입니다.
  • 복잡한 흐름 제어
    • 복잡한 비동기 흐름을 관리하기 위해 FSM을 사용할 수 있으며, 코루틴의 suspend 함수도 복잡한 비동기 로직을 간결하게 표현하는 데 도움을 줍니다. 두 개념을 결합하면 복잡한 비동기 상태 관리를 명확하게 할 수 있습니다.

Continuation passing style

  • Continuation은 Kotlin의 코루틴에서 사용되는 개념으로, suspend 함수의 중단된 지점을 재개할 수 있는 객체입니다. 이 Continuation이 코루틴에서 비동기 작업을 순차적으로 작성할 수 있게 해줍니다. 또한 코루틴 내부에서 자동으로 관리되며, 일반적으로 개발자가 직접 다루지 않습니다. 코루틴 빌더 (launch, async, runBlocking 등)를 통해 코루틴을 시작하면, 컴파일러가 내부적으로 Continuation을 생성하고 관리합니다.
  • Continuation passing style이란 Caller가 Callee를 호출하는 상황에서 Callee는 값을 계산하여 continuation을 실행하고 인자로 값을 전달하며, continuation이 callee를 가장 마지막에서 딱 한 번 실행하는 것을 말합니다.
object CpsCalculator {
    fun calculate(initialValue: Int, continuation: (Int) -> Unit) {
        initialize(initialValue) { initial ->
            addOne(initial) { added ->
                multiplyTwo(added) { multiplied ->
                    continuation(multiplied)
                }
            }
        }
    }

    private fun initialize(value: Int, continuation: (Int) -> Unit) {
        log.info("Initial")
        continuation(value)
    }

    private fun addOne(value: Int, continuation: (Int) -> Unit) {
        log.info("Add one")
        continuation(value + 1)
    }

    private fun multiplyTwo(value: Int, continuation: (Int) -> Unit) {
        log.info("Multiply two")
        continuation(value * 2)
    }
}

fun main() {
    CpsCalculator.calculate(5) { result ->
        log.info("Result: {}", result)
    }
}
  • 위 예제를 보면, continuation 자체는 마지막에 한 번 호출되고 함께 전달되는 인자 값을 계속 연산하는 것을 볼 수 있습니다.

콜백 함수와의 차이점

  • 콜백함수
    • 프로그래밍에서 특정 이벤트가 발생하거나 특정 작업이 완료될 때 호출되는 함수입니다.
    • 다른 함수에 인수로 전달되며, 그 함수가 특정 조건이 만족될 때 이를 호출합니다.
    • 콜백은 비동기 프로그래밍, 이벤트 처리, UI 이벤트, 그리고 다양한 비동기 작업에서 중요한 역할을 합니다.
  • Continuation
    • 콜백과 다르게, 마지막에서 딱 한번 호출된다는 특징이 있습니다.
    • 주로 모든 결과를 계산하고 다음으로 넘어가는 상황에서 호출합니다.

Continuation 인터페이스와 구현

  • Kotlin coroutines에서는 아래와 같은 Continuation 인터페이스를 제공합니다.
    • resumeWith를 구현하여, 외부에서 해당 continuation을 실행할 수 있는 endpoint를 제공합니다.
    • 또한 CoroutineContext를 포함합니다.
package kotlin.coroutines

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}
  • 이제 실제 구현 코드를 살펴봅시다. 아래 코드는 Continuation 인터페이스를 구현하는 익명 클래스를 생성하여 context와 resumeWith를 구현한 코드입니다.
    • context에는 EmptyCoroutineContext를 두고, resumeWith에서는 상태에 따라 다른 코드가 실행되도록 합니다.
    • 결과 뿐 아니라, 에러도 전달 가능합니다.
      • 실행 순서는 다음과 같습니다. Visited: now -> Result: Success(10) -> Result: Failure(java.lang.IllegalStateException)
fun main() {
    var visited = false
    val continuation = object: Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            if (visited) {
                log.info("Result: {}", result)
            } else {
                log.info("Visit now")
                visited = true
            }
        }
    }

    continuation.resumeWith(10)
    continuation.resumeWith(10)
    continuation.resumeWithException(
        IllegalStateException()
    )
}

FSM과 CPS의 적용

  • FSM과 CPS의 적용을 어떻게 수행하는지 알아보기 위해, 1차에선 직접 연산을 수행하는 부분을 CPS가 적용된 연산 함수로 대체하고 2차에선 calculate 함수에 CPS를 적용해보록 하겠습니다.
    • 아래 코드는 FSM 을 통해, 구현한 계산기입니다. current.label에 따라 수행되는 로직이 달라지고, 각 label에서 연산을 직접 수행하고 수행된 값을 current에 직접 반영합니다.
      • 수행하는 연산은 다음과 같습니다. 값 초기화, +1, *2.
object FsmCalculator {
    data class Shared(
        var result: Int = 0,
        var label: Int = 0,
    )

    fun calculate(initialValue: Int,
                  shared: Shared? = null) {
        val current = shared ?: Shared()

        val simpleCont = Continuation<Int>(EmptyCoroutineContext) {
            this.calculate(initialValue, current)
        }

        when (current.label) {
            0 -> {
                current.result = initialValue
                current.label = 1
            }
            1 -> {
                current.result += 1
                current.label = 2
            }
            2 -> {
                current.result *= 2
                current.label = 3
            }
            3 -> {
                log.info("Result: {}", current.result)
                return
            }
        }
        this.calculate(initialValue, current)
    }
}

fun main() {
    FsmCalculator.calculate(5)
}
  • 이번에는 각각의 연산에 CPS를 적용해보겠습니다.
    • 각각의 함수는 cont.resume가 무엇을 하는지 모르지만, 값을 계산하여 cont.resume으로 전달합니다.
private fun initialize(value: Int, cont: Continuation<Int>) {
    log.info("Initial")
    cont.resume(value)
}

private fun addOne(value: Int, cont: Continuation<Int>) {
    log.info("Add one")
    cont.resume(value + 1)
}

private fun multiplyTwo(value: Int, cont: Continuation<Int>) {
    log.info("Multiply two")
    cont.resume(value * 2)
}
  • 이번엔 직접 재귀함수를 호출하지 않고 Continuation의 resumeWith에서 재귀함수를 대신 호출해보겠습니다.
    • 결국, 각각의 연산자들은 cont.resume이 무엇인지 몰랐지만 연산을 수행하고 재귀함수를 호출하게 됩니다.
  • 마지막으로 resume을 통해서 건네받은 결과값을 Shared의 result에 저장합니다.
fun calculate(initialValue: Int, shared: Shared? = null) {
     val current = shared ?: Shared()

     val cont = object: Continuation<Int> {
         override val context: CoroutineContext
             get() = EmptyCoroutineContext

         override fun resumeWith(result: Result<Int>) {
             current.result = result.getOrThrow()
             this@FsmCalculatorUpgrade1.calculate(initialValue, current)
        }
   }

    when (current.label) {
        0 -> {
             current.label = 1
             initialize(initialValue, cont)
        }
        1 -> {
             val initialized = current.result as Int
             current.label = 2
             addOne(initialized, cont)
        }
        2 -> {
             val added = current.result as Int
             current.label = 3
             multiplyTwo(added, cont)
        }
        3 -> {
             val multiplied = current.result as Int
             log.info("Result: {}", multiplied)
             return
        }
    }
}
  • label에 따라서 다른 연산이 수행되며
    • 각각의 case에서 Shared 객체의 result 값(이전 연산 함수의 결과를 저장)을 꺼내서 변수에 저장하고 활용합니다.
    • label을 변경합니다.
    • 연산 함수를 실행하면서 continuation을 전달합니다.
  • 3번 케이스에 도달하면 Shared 객체의 result를 log로 출력합니다.
    • 실행 결과는 다음과 같습니다.
      • Initial -> Add one -> Multiply two -> Result: 12
fun calculate(initialValue: Int,
              shared: Shared? = null) {
    val current = shared ?: Shared()

    val cont = object: Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            current.result = result.getOrThrow()
            this@FsmCalculatorUpgrade1.calculate(initialValue, current)
        }
    }

    when (current.label) {
            0 -> {
                current.label = 1
                initialize(initialValue, cont)
            }
            1 -> {
                val initialized = current.result as Int
                current.label = 2
                addOne(initialized, cont)
            }
            2 -> {
                val added = current.result as Int
                current.label = 3
                multiplyTwo(added, cont)
            }
            3 -> {
                val multiplied = current.result as Int
                log.info("Result: {}", multiplied)
                return
            }
        }
    }
  • calculate에 CPS 적용
    • CPS를 통해서 모든 함수는 continuation을 인자로 전달받고 resume을 수행한다.
    • calculate 함수는 Continuation 인터페이스를 활용하여 main이 전달하는 completion과 스스로가 전달하는 CustomContinuation 모두 수용한다.
    • 마지막 state가 아니라면 CustomContinuation의 resume을 수행한다.
    • 마지막 state라면 main의 completion을 수행한다.
class FsmCalculatorUpgrade2 {
    private class CustomContinuation(
        val completion: Continuation<Int>,
        val that: FsmCalculatorUpgrade2,
    ) : Continuation<Int> {
        var result: Any? = null
        var label: Int = 0

        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            this.result = result.getOrThrow()
            that.calculate(0, this)
        }

        fun complete(value: Int) {
            completion.resume(value)
        }
    }

    fun calculate(
        initialValue: Int,
        continuation: Continuation<Int>,
    ) {
        val cont = if (continuation is CustomContinuation) {
            continuation
        } else {
            CustomContinuation(continuation, this)
        }

        when (cont.label) {
            0 -> {
                cont.label = 1
                initialize(initialValue, cont)
            }

            1 -> {
                val initialized = cont.result as Int
                cont.label = 2
                addOne(initialized, cont)
            }

            2 -> {
                val added = cont.result as Int
                cont.label = 3
                multiplyTwo(added, cont)
            }

            3 -> {
                val multiplied = cont.result as Int
                cont.complete(multiplied)
            }
        }
    }

    private fun initialize(value: Int, cont: Continuation<Int>) {
        log.info("Initial")
        cont.resume(value)
    }

    private fun addOne(value: Int, cont: Continuation<Int>) {
        log.info("Add one")
        cont.resume(value + 1)
    }

    private fun multiplyTwo(value: Int, cont: Continuation<Int>) {
        log.info("Multiply two")
        cont.resume(value * 2)
    }
}

fun main() {
    val completion = Continuation<Int>(EmptyCoroutineContext) {
        log.info("Result: {}", it)
    }
    FsmCalculatorUpgrade2().calculate(5, completion)
}

레퍼런스

반응형