# 코루틴의 기본적인 예제
package chap02.section1
import kotlinx.coroutines.*
/* 기초적인 코루틴 */
fun main() { // 메인 스레드의 문맥
GlobalScope.launch { // 새로운 코루틴을 백그라운드에서 실행
delay(1000L) // 1초의 넌블로킹 지연(시간의 기본 단위는 ms)
println("world!") // 지연 후 출력
}
println("Hello,") // 메인 스레드의 코루틴이 지연되는 동안 계속 실행
Thread.sleep(2000L) // 메인 스레드가 JVM에서 바로 종료되지 않게 2초 기다림
}
/**
* 실행 결과를 보면 "Hello,"는 메인 스레드에 의해 바로 출력됩니다.
* "world!"는 코루틴 코드의 부분으로 메인 스레드와 분리되어 백그라운드에서 1초 뒤 실행됩니다.
* 따라서, 메인 스레드의 코드보다 지연되어 실행됩니다.
* 또한 메인 스레드와 별도로 실행되므로 넌블로킹 코드이기도 합니다.
* 코루틴에서 사용되는 함수는 suspend()로 선언된 지연 함수여야 코루틴 기능을 사용할 수 있습니다.
* suspend로 표기함으로서 이 함수는 실행이 일시 중단 될 수 있으며 필요한 경우 다시 재개할 수 있게 됩니다.
* suspend 함수는 사용자가 실행을 일시중단 할 수 있음을 의미하고 코루틴 블록 안에서 사용할 수 있습니다.
* 지연함수(delay(),suspend()?)는 코루틴 빌더인 launch와 async에서 사용할 수 있지만 메인 스레드에서는 사용할 수 없습니다.
* 지연 함수는 또 다른 지연 함수 내에서 사용하거나 코루틴 블록에서만 사용해야 합니다!!
*
*/
# launch 코루틴 빌더 생성하기
이렇게 launch를 통해 코루틴 블록을 만들어 내는 것을 코루틴 빌더의 생성이라고 한다.
lanuch는 현재 스레드를 차단하지 않고 새로운 코루틴을 실행할 수 있게 며 특정 결과값 없이 Job객체를 반환한다.
package chap02.section1
import kotlinx.coroutines.*
/* Job 객체의 반환 */
fun main() {
val job = GlobalScope.launch {
delay(1000L)
println("World!")
}
println("Hello")
println("job.isActive: ${job.isActive}, completed : ${job.isCompleted}")
Thread.sleep(2000L)
println("job.isActive: ${job.isActive}, completed : ${job.isCompleted}")
}
/**
* launch를 살펴보면 실행 범위를 결정하는 GlobalScope가 지정되어 있습니다. 이것은 코루틴의 생명 주기가 프로그램의 생명 주기에 의존되므로
* main()이 종료되면 같이 종료됩니다. 코루틴을 실행하기 위해서는 내부적으로 스레드를 통해서 실행될 수 있습니다. 단, 실행 루틴이 많지 않은 경우에는
* 내부적으로 하나의 스레에서 여러 개의 코루틴을 실행할 수 있기 때문에 1개의 스레드면 충분합니다.
*/
# 코루틴의 순차적 실행
package chap02.section1
import kotlinx.coroutines.*
/* 코루틴의 순차적 실행 */
suspend fun doWork1() : String {
delay(1000)
return "Work1"
}
suspend fun doWork2() : String {
delay(3000)
return "Work2"
}
private fun workInSerial() {
// 순차적 실행
GlobalScope.launch {
val one = doWork1()
val two = doWork2()
println("Kotlin One : $one")
println("Kotlin Two : $two")
}
}
// async 코루틴 빌더 생성하기
/**
* async도 새로운 코루틴을 실행할 수 있는데 launch와 다른 점은 Deferred<T>를 통해 결과값을 반환한다는 것입니다.
* 때 지연된 결과값을 받기 위해 await()를 사용할 수 있습니다.
*/
private fun worksInParallel() {
// Deferred<T>를 통해 결과값 반환
val one = GlobalScope.async {
doWork1()
}
val two = GlobalScope.async {
doWork2()
}
GlobalScope.launch {
val combined = one.await() + "_" + two.await()
println("Kotlin Combined : $combined")
}
}
fun main() {
// workInSerial()
// readLine() // main()이 먼저 종료되는 것을 방지하기 위해 콘솔에서 Enter키 입력 대기
worksInParallel()
/**
* doWork1()과 doWork2()는 async에 의해 감싸져 있으므로 완전히 병행 수행할 수 있습니다.
* 여기서 delay()로 1초만 지연시킨 doWork1()이 먼저 종료됩니다.
* 그러나 좀 더 복잡한 루틴을 작성하는 경우에는 많은 태스크들과 같이 병행 수행되므로 어떤 루틴이 먼저 종료될지 알기 어렵습니다.
* 따라서 태스크가 종료되는 시점을 기다렸다가 결과를 받을 수 있도록 await()를 사용해 현재 스레드의 블로킹 없이 먼저 종료되면 결과를
* 가져올 수 있습니다. 여기서는 combined라는 변수에 2개의 비동기 루틴이 종료되고 결과가 반환되면
* 문자를 합쳐서 할당합니다.
*/
}
# 시작 시점 늦춰보기
package chap02.section1
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
/* 시작 시점 늦춰보기 */
suspend fun doWork1() : String {
delay(1000)
return "Work1"
}
suspend fun doWork2() : String {
delay(3000)
return "Work2"
}
fun main() = runBlocking{ // main() 함수가 코루틴 환경에서 실행
val time = measureTimeMillis {
val one = async ( start = CoroutineStart.LAZY) { doWork1() }
val two = async ( start = CoroutineStart.LAZY) { doWork2() }
println("AWAIT : ${one.await() + "_" + two.await()}")
}
println("Complete in $time ms")
}
/**
* DEFAULT : 즉시 시작
* LAZY : 코루틴을 느리게 시작(처음에는 중단된 상태이며 start()나 await() 등으로 시작됨)
* ATOMIC : 최적화된 방법으로 시작
* UNDISPATCHED : 분산 처리 방법으로 시작
*/
# 많은 작업의 처리
package chap02.section1
import kotlinx.coroutines.*
/* 많은 작업의 처리 */
fun main() = runBlocking{
/**
* 10만 개의 코루틴을 list로 생성하고 각 루틴으로 화면에 점(.)을 찍도록 작성
* 이런 코드를 스레드로 바꾸면 Out-of-memory 오류가 발생합니다. 하지만 코루틴으로 작업하면 내부적으로 단 몇 개의 스레드로 수많은
* 코루틴을 생성해 실행할 수 있기 때문에 오류가 발생하지 않습니다. 또 메모리나 실행 속도 면에서 큰 장점을 가집니다.
*/
val jobs = List(100_100) {
launch {
delay(1000L)
println(".")
}
}
jobs.forEach { it.join() }
/**
* 또 다른 방법으로 repeat() 함수를 사용하면 손쉽게 많은 양의 코루틴을 생성할 수 있습니다.
*/
repeat(100_000){
launch {
delay(1000L)
println("#")
}
}
}
# 코루틴과 시퀀스(sequence)
: 코틀린의 표준 라이브러리 중에서 sequence()를 사용하면 아주 많은 값을 만들어 내는 코드로부터 특정 값의 범위를 가져올 수 있습니다. sequence()함수는 Sequence<T>를 반환하는데 Sequence() 함수 내부에서 지연 함수를 사용할 수 있고 코루틴과 함께 최종 형태를 나중에 결정할 수 있는 늦은(lazy) 시퀀스를 만들 수도 있습니다. 늦은 시퀀스란 특정 요소가 완전히 구성되기 전에 사용 범위와 시점을 결정할 수 있다는 뜻입니다.
- step 1
package chap02.section1
val fibonacciSeq = sequence {
var a = 0
var b = 1
yield(1) // (1) 지연함수가 사용됨
while (true){
yield(a + b) // (2)
val tmp = a + b
a = b
b = tmp
}
}
fun main() {
println(fibonacciSeq.take(8).toList()) // (3) 8개의 값 획득
}
/**
* (1)번의 sequence 블록에서 지연 함수인 yield() 함수를 호출하면서 코루틴을 생성합니다.
* (2)번의 while 루프는 매 단계를 무한하게 순회할 때 코루틴에서 다음 수를 계산하도록 실행됩니다.
* (3)번에서 take().toList()에 의해 무한한 피보나치 수열 중 8개를 List로 변환해 화면상에 출력합니다.
*/
- step 2
package chap02.section1
val seq = sequence {
val start = 0
yield(start) // 단일값 산출
yieldAll(1..5 step 2) // 반복 값 산출
yieldAll(generateSequence(8) {it * 3}) // 무한한 시퀀스에서 산출
}
fun main() {
println(seq.take(7).toList())
}
/**
* 여기서는 yieldAll()을 사용해 반복적으로 멈추게 되면서 특정 범위의 값을 산출할 수 있습니다.
* 또한 yieldAll()을 사용해 무한한 시퀀스를 만들어 내는 generateSequence() 함수를 사용해서도 요소 값을 산출할 수 있습니다.
*
*/
- step 3
package chap02.section1
val fibonacciSeq = sequence {
var a = 0
var b = 1
yield(1) // (1) 지연함수가 사용됨
while (true){
yield(a + b) // (2)
val tmp = a + b
a = b
b = tmp
}
}
val seq = sequence {
val start = 0
yield(start) // 단일값 산출
yieldAll(1..5 step 2) // 반복 값 산출
yieldAll(generateSequence(8) {it * 3}) // 무한한 시퀀스에서 산출
}
fun main() {
// 모든 요소는 일회성이기 때문에 각 요소에 대한 다음 요소를 직접 지정하려면 iterator()를 통해 next() 메서드를 사용해야한다.
val saved = fibonacciSeq.iterator()
println("${saved.next()}, ${saved.next()}, ${saved.next()}")
}
# 코루틴의 문맥(★)
package chap02.section1
import kotlinx.coroutines.*
/* 코루틴의 문맥 */
/**
* 코루틴은 항상 특정 문맥에서 실행됩니다. 이때 어떤 문맥에서 코루틴을 실행할지는 디스패처(Dispatcher)가 결정합니다.
*/
fun main() = runBlocking<Unit>{
val jobs = arrayListOf<Job>()
jobs += launch(Dispatchers.Unconfined) { // 메인 스레드에서 작업
println("Unconfined : \t\t ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext){ // 부모의 문맥, 여기서는 runBlocking의 문맥
println("coroutineContext : \t ${Thread.currentThread().name}")
}
jobs += launch(Dispatchers.Default){ // 디스패처의 기본값
println("Default : \t\t ${Thread.currentThread().name}")
}
jobs += launch(Dispatchers.IO) { // 입출력 중심의 문맥
println("IO:\t\t ${Thread.currentThread().name}")
}
jobs += launch { // 아무 인자가 없을 때
println("main runBlocking : ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("My thread")) { // 새 스레드를 생성
println("My thread : \t\t ${Thread.currentThread().name}")
}
jobs.forEach{it.join()}
}
# 기본 동작 제어하기
package chap02.section1
import kotlinx.coroutines.*
/* 기본 동작 제어하기 */
fun main() = runBlocking<Unit>{
GlobalScope.launch { // 만일 launch만 사용하면 종료되지 않음
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L)
/* 1000회를 반복하기 위해 repeat()함수에 1000이라는 인자를 주고 있습니다. 하지만
* GlobalScope의 생명주기를 한정했기 때문에 메인 스레드가 종료되어 버리면 더 이상 진행되지 않습니다.
* 1.3초 뒤에 종료되므로 약 3번만 진행됨.*/
}
# finally의 실행 보장
package chap02.section1
import kotlinx.coroutines.*
/* finally의 실행 보장 */
fun main() = runBlocking<Unit>{
val job = launch {
try{
repeat(1000) { i ->
println("I'm sleeping $i...")
delay(500L)
}
} finally {
println("Bye!")
}
}
delay(1300L)
job.cancelAndJoin() // 작업을 취소하고 완료될 때까지 기다림
println("main : Quit")
}
package chap02.section1
import kotlinx.coroutines.*
/* finally의 실행 보장 */
fun main() = runBlocking<Unit>{
val job = launch {
try{
repeat(1000) { i ->
println("I'm sleeping $i...")
delay(500L)
}
} finally {
withContext(NonCancellable){ // finally의 완전한 실행을 보장함
println("I'm running finally")
delay(1000L)
println("Non-Cancelable") // 1초를 지연해도 취소되지 않음
}
}
}
delay(1300L)
job.cancelAndJoin() // 작업을 취소하고 완료될 때까지 기다림
println("main : Quit")
}
/**
* 일반적인 finally 블록에서 지연 함수를 사용하려고 하면 코루틴이 취소되므로 지연 함수를 사용할 수 없습니다.
* 그 외에 파일을 닫거나 통신 채널을 닫는 등의 작업은 넌블로킹 형태로 작동하며 지연 함수를 포함하고 있지 않기 때문에 문제가 없습니다.
* 만일 finally 블록에 시간이 걸리는 작업이나 지연 함수가 사용될 경우 실행을 보장하기 위해서는 NonCancellable 문맥에서 작동하도록 해야 합니다.
* 이것을 위해 withContext(NonCancelable) {...}을 사용해 다음과 같이 finally 블록을 구성할 수 있습니다.
* 이 코드는, 1초 이후에도 println() 함수의 실행을 보장하는 예입니다.
*/
# 실행상태의 판단
- step 1
package chap02.section1
import kotlinx.coroutines.*
/* 실행상태의 판단 */
/**
* 만일 코드를 중단하기 위해 코루틴에 조건식을 넣으려고 할 때 연산이 마무리되기 전까지는 조건식에 의해
* 코루틴이 중단되지 않는다는 것을 기억해야 한다.
*/
fun main() = runBlocking<Unit>{
val starTime = System.currentTimeMillis()
val job = GlobalScope.launch {
var nextPrintTime = starTime
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++}...")
nextPrintTime += 500L
}
}
}
delay(1300L)
println("main : I'm tired of waiting!")
job.cancelAndJoin()
println("main : Now i can quit.")
}
/* delay(1300L) 이후 작업 취소 함수에 의해 시그널을 받아 루틴이 바로 취소 될 것 같지만 while(i < 5) {...} 루프를
* 사용하면 루프가 완료될 때까지 루틴이 끝나지 않습니다.
* */
- step 2
package chap02.section1
import kotlinx.coroutines.*
/* 실행상태의 판단 */
/**
* 만일 코드를 중단하기 위해 코루틴에 조건식을 넣으려고 할 때 연산이 마무리되기 전까지는 조건식에 의해
* 코루틴이 중단되지 않는다는 것을 기억해야 한다.
*/
fun main() = runBlocking<Unit>{
val starTime = System.currentTimeMillis()
val job = GlobalScope.launch {
var nextPrintTime = starTime
var i = 0
while (isActive) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++}...")
nextPrintTime += 500L
}
}
}
delay(1300L)
println("main : I'm tired of waiting!")
job.cancelAndJoin()
println("main : Now i can quit.")
}
/* 취소 시그널을 받아 루프를 중단하려면 소스 코드에서 while (i < 5)를 while(isActive)로 변경하면 의도한 시간에 루프가 취소되어 중단됩니다.
* */
# 코루틴의 시간 만료
package chap02.section1
import kotlinx.coroutines.*
/* 코루틴의 시간 만료 */
// 일정 실행 시간 뒤에 코루틴을 취소할 수 있도록 해보자.
fun main() = runBlocking<Unit>{
try{
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i...")
delay(500L)
}
}
}catch (e : TimeoutCancellationException) {
println("timed out with $e")
}
}
/**
* 다음은 시간이 만료되면 block을 취소시키고 TimeoutCancellationException 오류가 발생하는 코드이다.
* 예외를 발생하지 않고 null로 처리하려면 withTimeoutOrNull()을 사용한다.
*/
# 채널의 동작
package chap02.section1
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
/* 채널의 동작 */
/**
* 채널은 자료를 서로 주고받기 위해 약속된 일종의 통로 역할을 합니다.
* 코루틴의 채널은 넌블로킹 전송 개념으로 사용되고 있습니다. 채널을 구현할 때는
* SendChannel과 ReceiveChannel 인터페이스를 이용해 값들의 스트림을 전송하는 방법을 제공합니다.
* 실제 전송에는 다음과 같이 지연 함수의 send()와 receive() 함수를 사용합니다.
*/
fun main() = runBlocking<Unit>{
val channel = Channel<Int>()
launch {
// 여기에 다량의 CPU 연산 작업이나 비동기 로직을 둘 수 있음
for (x in 1..5 ) channel.send(x * x)
}
repeat(5) { println(channel.receive())} // 5개의 값을 채널로부터 받음
println("Done!")
}
/**
* 채널을 통해 send() 함수로 값을 보내 놓으면 이후 receive() 함수를 통해 값을 받을 수 있습니다.
* 일반 큐와는 다르게 더 이상 전달 요소가 없으면 채널을 닫을 수 있습니다.
*/
- close()
package chap02.section1
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
/* 채널의 동작 */
/**
* 보통 for문을 구성해 채널을 받고 close()를 사용하면 바로 채널을 닫는 것이 아니라
* 닫겠다는 특수한 토큰을 보냅니다.
*/
fun main() = runBlocking<Unit>{
val channel = Channel<Int>()
launch {
// 여기에 다량의 CPU 연산 작업이나 비동기 로직을 둘 수 있음
for (x in 1..5 ) channel.send(x * x)
channel.close() // 모두 보내고 닫기 명시
}
for (element in channel) println(element) // for문을 사용해 끝까지 읽기
println("Done!")
}
/**
* 여기서 보내는 쪽과 받는 쪽에 몇 가지 중요한 상태가 있습니다. 송신자는 SendChannel에서 채널이 꽉 차있는지,
* 즉 isFull 값이 true인지 살펴보고 꽉 차 있으면 일시 지연됩니다.
* 만일 close()에 의해 닫으면 isClosedForSend가 true로 지정되어 isFull은 false를 반환할 수 있습니다.
* 수신자는 isEmpty가 true라면 비어 있으므로 가져갈게 없는 루틴은 일시 지연됩니다.
* 마찬가지로 닫을 경우 isClosedForReceive에 의해 false를 반환할 수 있습니다.
*/
# produce 생산자 소비자 패턴
: produce는 채널이 붙어 있는 코루틴으로 생산자 측면의 코드를 쉽게 구성할 수 있습니다.
채널에 값을 보내면 생산자로 볼 수 있고 소비자는 consumeEach 함수를 확장해 for문을 대신 해서 저장된 요소를 소비합니다.
- 생산자 소비자 형태의 구성
package chap02.section1
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
/* produce 생산자 소비자 패턴 */
// 생산자를 위한 함수 생성
fun CoroutineScope.producer() : ReceiveChannel<Int> = produce{
var total : Int = 0
for (x in 1.. 5) {
total += x
send(total)
}
}
fun main() = runBlocking<Unit>{
val result = producer() // 값의 생산
result.consumeEach { print("$it ") } // 소비자 루틴 구성
}
- 버퍼를 가진 채널 구성하기
package chap02.section1
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
/* 버퍼를 가진 채널 구성하기 */
fun main() = runBlocking<Unit>{
val channel = Channel<Int>(3) // 버퍼 capacity 값을 할당
val sender = launch(coroutineContext) { // 송신자 측
repeat(10) {
println("Sending $it")
channel.send(it) // 지속적으로 보내다가 꽉 차면 일시 지연
}
}
delay(1000) // 아무것도 받지 않고 1초 기다린 후
sender.cancel() // 송신자의 작업을 취소
}
- select 표현식
package chap02.section1
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select
import java.util.*
fun main() = runBlocking<Unit>{
val routine1 = GlobalScope.produce {
delay(Random().nextInt(1000).toLong())
send("A")
}
val routine2 = GlobalScope.produce {
delay(Random().nextInt(1000).toLong())
send("B")
}
val result = select<String> { // 먼저 수행된 것을 받게 된다.
routine1.onReceive {result -> result}
routine1.onReceive {result -> result}
}
println("Result was $result")
}
/**
* produce로 만든 2개의 루틴은 무작위로 지정된 시간에 각각 A,B라는 문자열을 채널에 보내게 됩니다.
* 이때 select 블록의 onReceive를 통해 채널로부터 이 값을 받아 먼저 완성된 결과를 가져오게 됩니다.
*/
'Kotlin' 카테고리의 다른 글
Kotlin - DSL(Domain-Specific Language) (0) | 2022.06.22 |
---|---|
Kotlin - 람다식을 사용하는 표준 라이브러리★ (0) | 2022.06.21 |
Kotlin - 클로저★ (0) | 2022.06.21 |
Kotlin - 람다식과 고차함수(review) ★ (0) | 2022.06.21 |
Kotlin - 시퀀스의 활용 (0) | 2022.06.21 |