You will be fine

<Coroutine> 6. 일시 중단 함수와 코루틴 멀티스레드

by BFine
반응형

참고 & 출처 : https://www.yes24.com/Product/Goods/125014350

 

코틀린 코루틴의 정석 - 예스24

많은 개발자들이 어렵게 느끼는 비동기 프로그래밍을 다양한 시각적 자료와 설명을 통해 누구나 쉽게 이해할 수 있도록 쓰인 책이다. 안드로이드, 스프링 등 코틀린을 사용하는 개발자들 중 코

www.yes24.com

 

가. 일시 중단 함수 (suspend fun)

 a. 만드는 방법

  -  함수이름 앞에 suspend 키워드를 붙여 생성할 수 있고 이 키워드는 함수 내에 일시 중단 지점을 포함 할 수 있는 특별한 기능을 포함시킨다.

 -  주의사항은 일시 중단 함수는 코루틴처럼 생각할수 도 있지만 코루틴 내부에서 실행되는 코드에 집합일 뿐이다.

 

 b.  특징

  -  코루틴의 비동기 작업과 관련된 복잡한 코드들을 구조화하고 재사용할 수 있는 코드의 집합으로 만드는 데 사용된다.

  -  일반 함수와 용도는 같은데 다른 점은 일시 중단 지점을 포함여부의 차이가 있다고 보면 된다.

 

 c. 예시

  -  위에 예시를 보면 suspend 함수인 경우에는 일반 함수에서 쓸수 없는 것을 볼 수 있다.

      => 즉 suspend 함수가 포함되면 호출부 함수도 suspend 여야하기 때문에 포함하는 모든 함수에 suspend 키워드를 붙여야 한다.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    launch {
        val car = callRentalCarApi(1)
        println(car)
    }
    launch {
        val car = callRentalCarApi(2)
        println(car)
    }
}

suspend fun callRentalCarApi(id: Long): String{
    delay(1000L)
    return if (id == 1L) {
        "현대"
    } else {
        "기아"
    }
}

  -  suspend 키워드를 이용하면 일시중단 기능을 포함한 함수를 서로 다른 코루틴에서 재사용이 가능하다.

 

 d. 일시 중단 함수와 코루틴 빌더

  -  위의 예시에서 볼 수 있듯이 일시 중단 함수 안에서 async, launch 같은 코루틴 빌더함수를 사용할 수 없다.

      => 이유는 코루틴 빌더 함수는 CoroutineScope의 확장함수인데 일시중단함수에서는 호출한 코루틴의 CoroutineScope에 접근할 수 없기 때문이다.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    launch {
        findRentalCar(1L)
    }
}

suspend fun findRentalCar(id: Long) = coroutineScope {
    val carDeferred = async {
        callRentalCarApi(id)
    }
    val priceDeferred = async {
        callRentalCarPriceApi(id)
    }
    val car = carDeferred.await()
    val price = priceDeferred.await()
    val result = Car(name = car, price = price)

    println(result)
}

suspend fun callRentalCarApi(id: Long): String{
    delay(1000L)
    return if (id == 1L) {
        "현대"
    } else {
        "기아"
    }
}

suspend fun callRentalCarPriceApi(id: Long): Long{
    delay(1000L)
    return if (id == 1L) {
        100_000L
    } else {
        50_000L
    }
}

data class Car(
    private val name: String,
    private val price: Long
)

  -  coroutineScope 를 사용하면 일시중단 내부에서 새로운 CoroutineScope 객체를 생성 할  수 있다.

      => coroutineScope는 구조화를 깨지 않는 CoroutineScope 객체를 생성하고 생성객체는 block 람다식에서 수신 객체(this)로 접근 가능하다.

 

나.  배려심이 깊은 코루틴

 a. 코루틴의 스레드 양보 

  -  스레드를 양보하는 주체는 코루틴이며 스레드에 코루틴을 할당해 실행시키는 주체는 CoroutineDispatcher 객체이지만 스레드 양보에는 관여하지 않는다.

  -  코루틴이 스레드를 양보하려면 코루틴에서 직접 스레드 양보를 위한 함수를 호출해야한다.

 

 b. delay 로 스레드 양보하기

  -  delay 함수를 호출하면 코루틴은 사용하던 스레드를 양보하고 설정된 시간동안 코루틴을 일시 중단 시킨다.

fun main(): Unit = runBlocking {
    val time = measureTimeMillis {
        val jobs = List(10) {
            launch {
                delay(1000L)
            }
        }
        joinAll(jobs = jobs.toTypedArray())
    }
    println("Execution time: $time ms")
}
// === 결과 ===
// Execution time: 1014 ms
fun main(): Unit = runBlocking {
    val time = measureTimeMillis {
        val jobs = List(10) {
            launch {
                TimeUnit.SECONDS.sleep(1L)
            }
        }
        joinAll(jobs = jobs.toTypedArray())
    }
    println("Execution time: $time ms")
}
// === 결과 ===
// Execution time: 10054 ms

  -  스레드를 블록킹하는 sleep 함수와 delay 함수를 사용한 각각의 동일한 로직을 성능을 비교해보면 많이 차이나는 것을 볼 수 있다.

      => sleep 함수에서는 스레드를 양보하기 때문에 10개의 코루틴이 바로 생성되지만 sleep 함수의 경우 첫번째 반복이 끝날때까지 스레드를 블록킹 한다.

 

 c. join & await 으로 스레드 양보하기

fun main(): Unit = runBlocking {
    launch {
        println("자식")
    }
    println("부모")
}
// === 결과 ===
// 부모
// 자식

  -  자식이 먼저 출력되지 않는다. 이유는 runBlocking main 단일 스레드만 존재하고 코드에서 코루틴이 스레드를 양보하지 않기 때문이다. 

fun main(): Unit = runBlocking {
    val job = launch {
        println("자식")
    }
    job.join()
    println("부모")
}
// === 결과 ====
// 자식
// 부모

  -  join 이나 await 함수를 코루틴이 호출하면 호출한 코루틴을 스레드를 양보하고 일시중단 상태가 되며 대상 코루틴이 완료될때까지 다시 시작되지 않는다.

 

 d. yield 으로 스레드 양보하기 

  -  delay, join, await 같은 일시 중단 함수들은 스레드 양보를 직접 호출하지 않아도 작업을 위해 내부적으로 스레드 양보를 일으킨다.

  -  yield 함수는 스레드 양보를 직접 호출 해야하는 상황에 사용한다.

fun main(): Unit = runBlocking {
    val job = launch {
        repeat(9999999) {
            if (this.isActive) {
                println("작업중..")
            }
        }
    }
    delay(10)
    job.cancel()
}

  -  main 스레드를 delay 함수를 이용해 자식 코루틴에게 양보 후 job.cancel 을 호출한다. 결과는 모든 반복이 끝나기전에 job은 취소되지 않는다.

      => 그 이유는 단일스레드이므로 자식코루틴에서 스레드를 양보해야 부모코루틴이 메인스레드에서 job.cancel 함수를 실행할 수 있기 때문이다.

fun main(): Unit = runBlocking {
    val job = launch {
        repeat(9999999) {
            if (this.isActive) {
                println("작업중..")
                yield()
            }
        }
    }
    delay(10)
    job.cancel()
}

  -  이때 yield 함수를 이용하여 스레드를 양보하면 모든 반복이 실행되기전에 job이 취소되는 것을 볼 수 있다.

 

 e. 코루틴의 실행스레드 

  -  실행스레드는 고정이 아니라서 일시중단 후 다시 실행 될때 CoroutineDispatcher의 작업대기열로 들어가며 이후에 코루틴을 다시 스레드에 할당한다

      => 이때 CoroutineDispatcher 는 사용할 수 있는 스레드 중에 하나에 할당하는데 이전에 실행되던 스레드와 다를 수 있다.  

  -  코루틴의 실행스레드가 바뀌는 시점은 다시 시작하는 시점 뿐이다.

      => 만약 스레드를 양보하는 경우가 없다면 일시중단이 발생하지 않는다는 것이므로 실행스레드가 변할 일이 없다.  

 

다. 코루틴과 멀티스레드 

 a.  자바에서 가시성, 원자성

  -  책에서 가시성에 대한 설명이 있어서 예전에 volatile 키워드 찾아보면서 잘 정리된 글을 본 기억이 있었는데 아래 페이지인 것 같다.

      =>  https://jenkov.com/tutorials/java-concurrency/volatile.html

 

Java Volatile Keyword

The Java volatile keyword guarantees variable visibility across threads, meaning reads and writes are visible across threads.

jenkov.com

import kotlinx.coroutines.*

var count = 0

fun main(): Unit = runBlocking {
    withContext(context = Dispatchers.Default) {
        repeat(10_000) {
            launch {
                count++
            }
        }
    }
    println(count)
}
// === 결과 ===
// 10000이 나오지 읺는다.

  - 가시성 문제는 멀티코어에서 멀티스레드를 사용할 경우 CPU 캐시를 사용하는 경우 메인 메모리에 전파되는데 지연시간으로

     CPU 캐시와 메인 메모리간 불일치 데이터 불일치로 각각 스레드에서 최신 데이터가 아닌 값을 사용하는 것을 의미한다.

     => volatile 키워드를 이용하면 CPU 캐시를 사용하지 않고 메인 메모리에서 가져오기 때문에 가시성 문제를 해결할 수 있다. (코틀린은 @Volatile)

  - 원자성 문제는 Race Condition이 발생하여 여러 스레드에서 동일 값을 읽어 쓰기작업이 발생하는 경우 연산의 손실을 발생하는 것을 의미한다.

     => 즉 여러 스레드가 동시에 하나의 값에 접근하면서 발생하는 문제이다

 

 b.  메모리 구조

  -  하드웨어 메모리 구조는 JVM의 스택&힙영역을 구분하지 않아서 스택 영역에 데이터들은 CPU 레지스터, CPU 캐시, 메인 메모리 모두에 나타날수있다.

 

 c.  동시 접근 제한하기

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex

val mutex = Mutex()
var count = 0

fun main(): Unit = runBlocking {
    withContext(context = Dispatchers.Default) {
        repeat(10_000) {
            launch {
                mutex.lock()
                count++
                mutex.unlock()
            }
        }
    }
    println(count)
}
// === 결과 ===
// 10000
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()
var count = 0

fun main(): Unit = runBlocking {
    withContext(context = Dispatchers.Default) {
        repeat(10_000) {
            launch {
                mutex.withLock {
                    count++
                }
            }
        }
    }
    println(count)
}
// === 결과 ===
// 10000

  -  코틀린에서는 코루틴에 대한 임계영역을 만들기 위한 Mutex 객체를 제공하고 있다. 결과가 동시성이슈 없이 정상적으로 출력된것을 볼 수 있다.

  -  lock unlock을 사용하기 보다는 withLock 사용하는 것이 안전하다.

      => lock, unlock, withLock 모두 suspend 키워드가 붙은 일시중단 함수 이다.

  -  ReetrantLock, Atomic 과 같이 동시성 제어하기 위해 제공하는 기존 객체들을 사용할때 일시 중단 함수를 제공하지 않아 주의해야한다. 

      => 일시 중단 함수가 아닌 경우 스레드 블록킹이 발생하여 락이 해제될때까지 해당 스레드는 사용할 수 없다.

 

라. CorotineStart 의 옵션들

 a. CoroutineStart.DEFAULT

  -  launch의 start 인자로 값을 전달하지 않을때 설정되어있는 기본 옵션이 DFEAULT 이다

  -  코루틴 빌더 함수를 호출한 즉시 생성된 코루틴의 실행을 CoroutineDispatcher 객체에 예약하며 호출한 코루틴은 계속해서 실행된다.

 

 b. CoroutineStart.ATOMIC

  -  아직 스레드가 할당되지 않은 코루틴은 실행대기 상태로 존재하는데 이때 취소가 발생하면 실행되지 않고 종료된다.

      => ATOMIC은 이 실행대기상태에서는 취소하지 못하도록 취소방지를 위해 사용하는 옵션이다.

 

 c. CoroutineStart.UNDISPATCHED

import kotlinx.coroutines.*
import java.util.concurrent.Executors


fun main(): Unit = runBlocking {

    launch(context = Dispatchers.IO) {
        println("${Thread.currentThread().name} IO 시작")
        val dispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
        val job = launch(start = CoroutineStart.UNDISPATCHED, context = dispatcher) {
            println("${Thread.currentThread().name} Unconfined 시작")
            delay(1000L)
            println("${Thread.currentThread().name} Unconfined 종료")
        }
        job.join()
        dispatcher.close()
    }
}

// === 결과 ===
// DefaultDispatcher-worker-1 @coroutine#2 IO 시작
// DefaultDispatcher-worker-1 @coroutine#3 Unconfined 시작
// pool-1-thread-1 @coroutine#3 Unconfined 종료

  -  최초 실행시 CoroutineDispatcher의 작업대기열을 거치지 않고 호출한 스레드에서 즉시 실행된다.

      =>  주의사항은 일시중단이 발생하면 다른 것과 동일하게 CoroutineDispatcher 작업대기열에 포함되어 실행된다. (실행스레드 변경될 수 있음)

 

 d. 번외 : 무제한 디스패처 (Dispathers.Unconfined)

  -  코루틴을 자신을 실행시킨 스레드에서 즉시 실행되도록 만드는 디스패처로 실행스레드가 제한되지 않으므로 무제한 디스패처라고 부른다.

      => 무제한(Unconfined)이라고 하니까 마치 무한정 만들수 있을것 같은 느낌이 드는데 이름을 잘못 지은 건가 번역이 아쉬운건지는 모르겠다..

import kotlinx.coroutines.*


fun main(): Unit = runBlocking {
    launch(context = Dispatchers.IO) {
        println("${Thread.currentThread().name} IO 시작")
        val job = launch(context = Dispatchers.Unconfined) {
            println("${Thread.currentThread().name} Unconfined 시작")
            delay(1000L)
            println("${Thread.currentThread().name} Unconfined 종료")
        }
         job.join()
        println("${Thread.currentThread().name} IO 종료")
    }
}

// === 결과 ===
// DefaultDispatcher-worker-1 @coroutine#2 IO 시작
// DefaultDispatcher-worker-1 @coroutine#3 Unconfined 시작
// kotlinx.coroutines.DefaultExecutor @coroutine#3 Unconfined 종료
// DefaultDispatcher-worker-1 @coroutine#2 IO 종료

  -  특징은 코루틴이 자신을 생성한 스레드에서 즉시 실행되며 일시중단 이후에 다시 시작될때는 다시 실행시키는 스레드에서 실행하는 것이다.

      => 일시 중단 이후에는 어떤 스레드가 코루틴을 다시 시작시키는지 예측하기 어렵기 때문에 비동기 작업이 불안정해져서 권장되지 않는다.

 -  위의 코드는 DefaultExecutor가 Unconfined 코루틴을 재수행했는데 이 DefaultExecutor 는 delay를 다시 시작시키는 스레드이다.

 

마.  코루틴의 동작 방식

 a. 일시중단한 정보는 어디에..?

  -  코루틴이 일시중단하고 다시 시작되려면 실행정보가 저장되어 전달되어야하는데 CPS라는 방식을 사용하고 있다

      => Continuation Passing Style로 단순하게 이어서 실행해야하는 작업인 Continuation을 전달하는 방식을 의미한다. 

 

 b. Continutation 

  -  코루틴에서는 이어서 실행해야 하는 작업 전달을 위해 Continuation 객체를 제공한다. 코루틴 라이브러리에서는 캡슐화해서 사용하고 있다.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    println("시작")
    suspendCancellableCoroutine<Unit> { continuation ->
        println("일시중단")
        // continuation.resume(Unit)
    }
    println("종료")
}

  -  suspendCancellableCoroutine 함수가 호출되면 코루틴은 일시중단되며 이 정보가 continuation에 저장되어 수신객체로 전달된다.

  -  위의 코드는 종료를 출력못하는데 이유는 Continuation 객체에 대해서 재개가 호출되지않아 runBlocking 코루틴이 다시 시작되지 못했기 때문이다.

  - delay 함수도 쭉 따라가다보면 Contination의 resume을 사용하는것을 볼 수 있다.

반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기