Kotlin

Kotlin - coroutine★

J_Bin 2022. 6. 22. 17:28

# 코루틴의 기본적인 예제

package chap02.section1

import kotlinx.coroutines.*

/* 기초적인 코루틴 */

fun main() {        // 메인 스레드의 문맥


    GlobalScope.launch {                // 새로운 코루틴을 백그라운드에서 실행
        delay(1000L)           // 1초의 넌블로킹 지연(시간의 기본 단위는 ms)
        println("world!")               // 지연 후 출력
    }
    println("Hello,")                   // 메인 스레드의 코루틴이 지연되는 동안 계속 실행
    Thread.sleep(2000L)           // 메인 스레드가 JVM에서 바로 종료되지 않게 2초 기다림

}


/**
 *  실행 결과를 보면 "Hello,"는 메인 스레드에 의해 바로 출력됩니다.
 *  "world!"는 코루틴 코드의 부분으로 메인 스레드와 분리되어 백그라운드에서 1초 뒤 실행됩니다.
 *  따라서, 메인 스레드의 코드보다 지연되어 실행됩니다.
 *  또한 메인 스레드와 별도로 실행되므로 넌블로킹 코드이기도 합니다.
 *  코루틴에서 사용되는 함수는 suspend()로 선언된 지연 함수여야 코루틴 기능을 사용할 수 있습니다.
 *  suspend로 표기함으로서 이 함수는 실행이 일시 중단 될 수 있으며 필요한 경우 다시 재개할 수 있게 됩니다.
 *  suspend 함수는 사용자가 실행을 일시중단 할 수 있음을 의미하고 코루틴 블록 안에서 사용할 수 있습니다.
 *  지연함수(delay(),suspend()?)는 코루틴 빌더인 launch와 async에서 사용할 수 있지만 메인 스레드에서는 사용할 수 없습니다.
 *	지연 함수는 또 다른 지연 함수 내에서 사용하거나 코루틴 블록에서만 사용해야 합니다!!
 *  
 */

 

# launch 코루틴 빌더 생성하기

이렇게 launch를 통해 코루틴 블록을 만들어 내는 것을 코루틴 빌더의 생성이라고 한다.

lanuch는 현재 스레드를 차단하지 않고 새로운 코루틴을 실행할 수 있게 며 특정 결과값 없이 Job객체를 반환한다.

package chap02.section1

import kotlinx.coroutines.*

/* Job 객체의 반환 */

fun main() {


    val job = GlobalScope.launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
    println("job.isActive: ${job.isActive}, completed : ${job.isCompleted}")
    Thread.sleep(2000L)
    println("job.isActive: ${job.isActive}, completed : ${job.isCompleted}")


}

/**
 *  launch를 살펴보면 실행 범위를 결정하는 GlobalScope가 지정되어 있습니다. 이것은 코루틴의 생명 주기가 프로그램의 생명 주기에 의존되므로
 *  main()이 종료되면 같이 종료됩니다. 코루틴을 실행하기 위해서는 내부적으로 스레드를 통해서 실행될 수 있습니다. 단, 실행 루틴이 많지 않은 경우에는
 *  내부적으로 하나의 스레에서 여러 개의 코루틴을 실행할 수 있기 때문에 1개의 스레드면 충분합니다.
 */

# 코루틴의 순차적 실행

package chap02.section1

import kotlinx.coroutines.*

/* 코루틴의 순차적 실행 */

suspend fun doWork1() : String {
    delay(1000)
    return "Work1"
}

suspend fun doWork2() : String {
    delay(3000)
    return "Work2"
}

private fun workInSerial() {
    // 순차적 실행
    GlobalScope.launch {
        val one = doWork1()
        val two = doWork2()
        println("Kotlin One : $one")
        println("Kotlin Two : $two")
    }
}

// async 코루틴 빌더 생성하기
/**
 * async도 새로운 코루틴을 실행할 수 있는데 launch와 다른 점은 Deferred<T>를 통해 결과값을 반환한다는 것입니다.
 * 때 지연된 결과값을 받기 위해 await()를 사용할 수 있습니다.
 */
private fun worksInParallel() {
    // Deferred<T>를 통해 결과값 반환
    val one = GlobalScope.async {
        doWork1()
    }
    val two = GlobalScope.async {
        doWork2()
    }
    GlobalScope.launch {
        val combined = one.await() + "_" + two.await()
        println("Kotlin Combined : $combined")
    }
}

fun main() {

//    workInSerial()
//    readLine()          // main()이 먼저 종료되는 것을 방지하기 위해 콘솔에서 Enter키 입력 대기

    worksInParallel()
    /**
     *  doWork1()과 doWork2()는 async에 의해 감싸져 있으므로 완전히 병행 수행할 수 있습니다.
     *  여기서 delay()로 1초만 지연시킨 doWork1()이 먼저 종료됩니다.
     *  그러나 좀 더 복잡한 루틴을 작성하는 경우에는 많은 태스크들과 같이 병행 수행되므로 어떤 루틴이 먼저 종료될지 알기 어렵습니다.
     *  따라서 태스크가 종료되는 시점을 기다렸다가 결과를 받을 수 있도록 await()를 사용해 현재 스레드의 블로킹 없이 먼저 종료되면 결과를
     *  가져올 수 있습니다. 여기서는 combined라는 변수에 2개의 비동기 루틴이 종료되고 결과가 반환되면
     *  문자를 합쳐서 할당합니다.
     */




}

 

# 시작 시점 늦춰보기

package chap02.section1

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

/* 시작 시점 늦춰보기 */

suspend fun doWork1() : String {
    delay(1000)
    return "Work1"
}

suspend fun doWork2() : String {
    delay(3000)
    return "Work2"
}




fun main() = runBlocking{             // main() 함수가 코루틴 환경에서 실행

    val time = measureTimeMillis {
        val one = async ( start = CoroutineStart.LAZY) { doWork1() }
        val two = async ( start = CoroutineStart.LAZY) { doWork2() }
        println("AWAIT : ${one.await() + "_" + two.await()}")
    }
    println("Complete in $time ms")

}

/**
 *  DEFAULT : 즉시 시작
 *  LAZY : 코루틴을 느리게 시작(처음에는 중단된 상태이며 start()나 await() 등으로 시작됨)
 *  ATOMIC : 최적화된 방법으로 시작
 *  UNDISPATCHED : 분산 처리 방법으로 시작
 */

# 많은 작업의 처리

package chap02.section1

import kotlinx.coroutines.*

/* 많은 작업의 처리 */

fun main() = runBlocking{

    /**
     *  10만 개의 코루틴을 list로 생성하고 각 루틴으로 화면에 점(.)을 찍도록 작성
     *  이런 코드를 스레드로 바꾸면 Out-of-memory 오류가 발생합니다. 하지만 코루틴으로 작업하면 내부적으로 단 몇 개의 스레드로 수많은
     *  코루틴을 생성해 실행할 수 있기 때문에 오류가 발생하지 않습니다. 또 메모리나 실행 속도 면에서 큰 장점을 가집니다.
     */
    val jobs = List(100_100) {
        launch {
            delay(1000L)
            println(".")
        }
    }
    jobs.forEach { it.join() }


    /**
     *  또 다른 방법으로 repeat() 함수를 사용하면 손쉽게 많은 양의 코루틴을 생성할 수 있습니다.
     */
    repeat(100_000){
        launch {
            delay(1000L)
            println("#")
        }
    }

}

# 코루틴과 시퀀스(sequence)

: 코틀린의 표준 라이브러리 중에서 sequence()를 사용하면 아주 많은 값을 만들어 내는 코드로부터 특정 값의 범위를 가져올 수 있습니다. sequence()함수는 Sequence<T>를 반환하는데 Sequence() 함수 내부에서 지연 함수를 사용할 수 있고 코루틴과 함께 최종 형태를 나중에 결정할 수 있는 늦은(lazy) 시퀀스를 만들 수도 있습니다. 늦은 시퀀스란 특정 요소가 완전히 구성되기 전에 사용 범위와 시점을 결정할 수 있다는 뜻입니다.

 

- step 1

package chap02.section1


val fibonacciSeq = sequence {
    var a = 0
    var b = 1
    yield(1)        // (1) 지연함수가 사용됨

    while (true){
        yield(a + b)        // (2)
        val tmp = a + b
        a = b
        b = tmp
    }
}


fun main() {

    println(fibonacciSeq.take(8).toList())      // (3) 8개의 값 획득

}


/**
 * (1)번의 sequence 블록에서 지연 함수인 yield() 함수를 호출하면서 코루틴을 생성합니다.
 * (2)번의 while 루프는 매 단계를 무한하게 순회할 때 코루틴에서 다음 수를 계산하도록 실행됩니다.
 * (3)번에서 take().toList()에 의해 무한한 피보나치 수열 중 8개를 List로 변환해 화면상에 출력합니다.
 */

- step 2

package chap02.section1




val seq = sequence {
    val start = 0

    yield(start)        // 단일값 산출
    yieldAll(1..5 step 2)       // 반복 값 산출
    yieldAll(generateSequence(8) {it * 3})  // 무한한 시퀀스에서 산출
}


fun main() {
    
    println(seq.take(7).toList())

    
}

/**
 *   여기서는 yieldAll()을 사용해 반복적으로 멈추게 되면서 특정 범위의 값을 산출할 수 있습니다.
 *   또한 yieldAll()을 사용해 무한한 시퀀스를 만들어 내는 generateSequence() 함수를 사용해서도 요소 값을 산출할 수 있습니다.
 *   
 */

- step 3

package chap02.section1

val fibonacciSeq = sequence {
    var a = 0
    var b = 1
    yield(1)        // (1) 지연함수가 사용됨

    while (true){
        yield(a + b)        // (2)
        val tmp = a + b
        a = b
        b = tmp
    }
}

val seq = sequence {
    val start = 0

    yield(start)        // 단일값 산출
    yieldAll(1..5 step 2)       // 반복 값 산출
    yieldAll(generateSequence(8) {it * 3})  // 무한한 시퀀스에서 산출
}


fun main() {

    // 모든 요소는 일회성이기 때문에 각 요소에 대한 다음 요소를 직접 지정하려면 iterator()를 통해 next() 메서드를 사용해야한다.
    val saved = fibonacciSeq.iterator()
    println("${saved.next()}, ${saved.next()}, ${saved.next()}")


}

# 코루틴의 문맥(★)

package chap02.section1

import kotlinx.coroutines.*

/* 코루틴의 문맥 */
/**
 *  코루틴은 항상 특정 문맥에서 실행됩니다. 이때 어떤 문맥에서 코루틴을 실행할지는 디스패처(Dispatcher)가 결정합니다.
 */



fun main() = runBlocking<Unit>{

    val jobs = arrayListOf<Job>()
    jobs += launch(Dispatchers.Unconfined) {                // 메인 스레드에서 작업
        println("Unconfined : \t\t ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext){                       // 부모의 문맥, 여기서는 runBlocking의 문맥
        println("coroutineContext : \t ${Thread.currentThread().name}")
    }
    jobs += launch(Dispatchers.Default){                    // 디스패처의 기본값
        println("Default : \t\t ${Thread.currentThread().name}")
    }
    jobs += launch(Dispatchers.IO) {                        // 입출력 중심의 문맥
        println("IO:\t\t ${Thread.currentThread().name}")
    }
    jobs += launch {                                        // 아무 인자가 없을 때
        println("main runBlocking : ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("My thread")) {       // 새 스레드를 생성
        println("My thread : \t\t ${Thread.currentThread().name}")
    }
    jobs.forEach{it.join()}
    




}

# 기본 동작 제어하기

package chap02.section1

import kotlinx.coroutines.*

/* 기본 동작 제어하기 */



fun main() = runBlocking<Unit>{

    GlobalScope.launch {                // 만일 launch만 사용하면 종료되지 않음
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L)

    /* 1000회를 반복하기 위해 repeat()함수에 1000이라는 인자를 주고 있습니다. 하지만
    * GlobalScope의 생명주기를 한정했기 때문에 메인 스레드가 종료되어 버리면 더 이상 진행되지 않습니다.
    * 1.3초 뒤에 종료되므로 약 3번만 진행됨.*/





}

# finally의 실행 보장

package chap02.section1

import kotlinx.coroutines.*

/* finally의 실행 보장 */



fun main() = runBlocking<Unit>{

    val job = launch {
        try{
            repeat(1000) { i ->
                println("I'm sleeping $i...")
                delay(500L)
            }
        } finally {
            println("Bye!")
        }
    }
    delay(1300L)
    job.cancelAndJoin()     // 작업을 취소하고 완료될 때까지 기다림
    println("main : Quit")

}
package chap02.section1

import kotlinx.coroutines.*

/* finally의 실행 보장 */



fun main() = runBlocking<Unit>{

    val job = launch {
        try{
            repeat(1000) { i ->
                println("I'm sleeping $i...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable){                // finally의 완전한 실행을 보장함
                println("I'm running finally")
                delay(1000L)
                println("Non-Cancelable")               // 1초를 지연해도 취소되지 않음
            }

        }
    }
    delay(1300L)
    job.cancelAndJoin()     // 작업을 취소하고 완료될 때까지 기다림
    println("main : Quit")

}

/**
 *  일반적인 finally 블록에서 지연 함수를 사용하려고 하면 코루틴이 취소되므로 지연 함수를 사용할 수 없습니다.
 *  그 외에 파일을 닫거나 통신 채널을 닫는 등의 작업은 넌블로킹 형태로 작동하며 지연 함수를 포함하고 있지 않기 때문에 문제가 없습니다.
 *  만일 finally 블록에 시간이 걸리는 작업이나 지연 함수가 사용될 경우 실행을 보장하기 위해서는 NonCancellable 문맥에서 작동하도록 해야 합니다.
 *  이것을 위해 withContext(NonCancelable) {...}을 사용해 다음과 같이 finally 블록을 구성할 수 있습니다.
 *  이 코드는, 1초 이후에도 println() 함수의 실행을 보장하는 예입니다.
 */

# 실행상태의 판단

- step 1

package chap02.section1

import kotlinx.coroutines.*

/* 실행상태의 판단 */

/**
 *  만일 코드를 중단하기 위해 코루틴에 조건식을 넣으려고 할 때 연산이 마무리되기 전까지는 조건식에 의해
 *  코루틴이 중단되지 않는다는 것을 기억해야 한다.
 */

fun main() = runBlocking<Unit>{

    val starTime = System.currentTimeMillis()
    val job = GlobalScope.launch {
        var nextPrintTime = starTime
        var i = 0
        while (i < 5) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++}...")
                nextPrintTime += 500L
            }
        }
    }

    delay(1300L)
    println("main : I'm tired of waiting!")
    job.cancelAndJoin()
    println("main : Now i can quit.")
}

/* delay(1300L) 이후 작업 취소 함수에 의해 시그널을 받아 루틴이 바로 취소 될 것 같지만 while(i < 5) {...} 루프를
* 사용하면 루프가 완료될 때까지 루틴이 끝나지 않습니다.
* */

- step 2

package chap02.section1

import kotlinx.coroutines.*

/* 실행상태의 판단 */

/**
 *  만일 코드를 중단하기 위해 코루틴에 조건식을 넣으려고 할 때 연산이 마무리되기 전까지는 조건식에 의해
 *  코루틴이 중단되지 않는다는 것을 기억해야 한다.
 */

fun main() = runBlocking<Unit>{

    val starTime = System.currentTimeMillis()
    val job = GlobalScope.launch {
        var nextPrintTime = starTime
        var i = 0
        while (isActive) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++}...")
                nextPrintTime += 500L
            }
        }
    }

    delay(1300L)
    println("main : I'm tired of waiting!")
    job.cancelAndJoin()
    println("main : Now i can quit.")
}

/* 취소 시그널을 받아 루프를 중단하려면 소스 코드에서 while (i < 5)를 while(isActive)로 변경하면 의도한 시간에 루프가 취소되어 중단됩니다.
* */

 

# 코루틴의 시간 만료

package chap02.section1

import kotlinx.coroutines.*

/* 코루틴의 시간 만료 */

// 일정 실행 시간 뒤에 코루틴을 취소할 수 있도록 해보자.



fun main() = runBlocking<Unit>{

    try{
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i...")
                delay(500L)
            }
        }
    }catch (e : TimeoutCancellationException) {
        println("timed out with $e")
    }

}

/**
 *  다음은 시간이 만료되면 block을 취소시키고 TimeoutCancellationException 오류가 발생하는 코드이다.
 *  예외를 발생하지 않고 null로 처리하려면 withTimeoutOrNull()을 사용한다.
 */

 

# 채널의 동작

package chap02.section1

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

/* 채널의 동작 */

/**
 * 채널은 자료를 서로 주고받기 위해 약속된 일종의 통로 역할을 합니다.
 * 코루틴의 채널은 넌블로킹 전송 개념으로 사용되고 있습니다. 채널을 구현할 때는
 * SendChannel과 ReceiveChannel 인터페이스를 이용해 값들의 스트림을 전송하는 방법을 제공합니다.
 * 실제 전송에는 다음과 같이 지연 함수의 send()와 receive() 함수를 사용합니다.
 */



fun main() = runBlocking<Unit>{

    val channel = Channel<Int>()
    launch {
        // 여기에 다량의 CPU 연산 작업이나 비동기 로직을 둘 수 있음
        for (x in 1..5 ) channel.send(x * x)
    }
    repeat(5) { println(channel.receive())}         // 5개의 값을 채널로부터 받음
    println("Done!")

}

/**
 * 채널을 통해 send() 함수로 값을 보내 놓으면 이후 receive() 함수를 통해 값을 받을 수 있습니다. 
 * 일반 큐와는 다르게 더 이상 전달 요소가 없으면 채널을 닫을 수 있습니다.
 */

- close()

package chap02.section1

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

/* 채널의 동작 */

/**
 * 보통 for문을 구성해 채널을 받고 close()를 사용하면 바로 채널을 닫는 것이 아니라
 * 닫겠다는 특수한 토큰을 보냅니다.
 */



fun main() = runBlocking<Unit>{

    val channel = Channel<Int>()
    launch {
        // 여기에 다량의 CPU 연산 작업이나 비동기 로직을 둘 수 있음
        for (x in 1..5 ) channel.send(x * x)
        channel.close()             // 모두 보내고 닫기 명시
    }
    for (element in channel) println(element)       // for문을 사용해 끝까지 읽기
    println("Done!")

}

/**
 * 여기서 보내는 쪽과 받는 쪽에 몇 가지 중요한 상태가 있습니다. 송신자는 SendChannel에서 채널이 꽉 차있는지,
 * 즉 isFull 값이 true인지 살펴보고 꽉 차 있으면 일시 지연됩니다.
 * 만일 close()에 의해 닫으면 isClosedForSend가 true로 지정되어 isFull은 false를 반환할 수 있습니다.
 * 수신자는 isEmpty가 true라면 비어 있으므로 가져갈게 없는 루틴은 일시 지연됩니다.
 * 마찬가지로 닫을 경우 isClosedForReceive에 의해 false를 반환할 수 있습니다.
 */

# produce 생산자 소비자 패턴

: produce는 채널이 붙어 있는 코루틴으로 생산자 측면의 코드를 쉽게 구성할 수 있습니다.

채널에 값을 보내면 생산자로 볼 수 있고 소비자는 consumeEach 함수를 확장해 for문을 대신 해서 저장된 요소를 소비합니다.

 

- 생산자 소비자 형태의 구성

package chap02.section1

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/* produce 생산자 소비자 패턴 */


// 생산자를 위한 함수 생성
fun CoroutineScope.producer() : ReceiveChannel<Int> = produce{
    var total : Int = 0
    for (x in 1.. 5) {
        total += x
        send(total)
    }
}




fun main() = runBlocking<Unit>{

    val result = producer()                 // 값의 생산
    result.consumeEach { print("$it ") }    // 소비자 루틴 구성

}

- 버퍼를 가진 채널 구성하기

package chap02.section1

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/* 버퍼를 가진 채널 구성하기 */







fun main() = runBlocking<Unit>{

    val channel = Channel<Int>(3)                           // 버퍼 capacity 값을 할당
    val sender = launch(coroutineContext) {                         // 송신자 측
        repeat(10) {
            println("Sending $it")
            channel.send(it)                                        // 지속적으로 보내다가 꽉 차면 일시 지연
        }
    }
    delay(1000)                                             // 아무것도 받지 않고 1초 기다린 후
    sender.cancel()                                                 // 송신자의 작업을 취소

    

}

- select 표현식

package chap02.section1

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select
import java.util.*




fun main() = runBlocking<Unit>{

    val routine1 = GlobalScope.produce {
        delay(Random().nextInt(1000).toLong())
        send("A")
    }

    val routine2 = GlobalScope.produce {
        delay(Random().nextInt(1000).toLong())
        send("B")
    }
    val result = select<String> {                       // 먼저 수행된 것을 받게 된다.
        routine1.onReceive {result -> result}
        routine1.onReceive {result -> result}
    }
    println("Result was $result")


}

/**
 *  produce로 만든 2개의 루틴은 무작위로 지정된 시간에 각각 A,B라는 문자열을 채널에 보내게 됩니다.
 *  이때 select 블록의 onReceive를 통해 채널로부터 이 값을 받아 먼저 완성된 결과를 가져오게 됩니다.
 */