<Coroutine> 6. 일시 중단 함수와 코루틴 멀티스레드
by BFine참고 & 출처 : https://www.yes24.com/Product/Goods/125014350
가. 일시 중단 함수 (suspend fun)
a. 만드는 방법
- 함수이름 앞에 suspend 키워드를 붙여 생성할 수 있고 이 키워드는 함수 내에 일시 중단 지점을 포함 할 수 있는 특별한 기능을 포함시킨다.
- 주의사항은 일시 중단 함수는 코루틴처럼 생각할수 도 있지만 코루틴 내부에서 실행되는 코드에 집합일 뿐이다.
b. 특징
- 코루틴의 비동기 작업과 관련된 복잡한 코드들을 구조화하고 재사용할 수 있는 코드의 집합으로 만드는 데 사용된다.
- 일반 함수와 용도는 같은데 다른 점은 일시 중단 지점을 포함여부의 차이가 있다고 보면 된다.
c. 예시
- 위에 예시를 보면 suspend 함수인 경우에는 일반 함수에서 쓸수 없는 것을 볼 수 있다.
=> 즉 suspend 함수가 포함되면 호출부 함수도 suspend 여야하기 때문에 포함하는 모든 함수에 suspend 키워드를 붙여야 한다.
import kotlinx.coroutines.*
fun main(): Unit = runBlocking {
launch {
val car = callRentalCarApi(1)
println(car)
}
launch {
val car = callRentalCarApi(2)
println(car)
}
}
suspend fun callRentalCarApi(id: Long): String{
delay(1000L)
return if (id == 1L) {
"현대"
} else {
"기아"
}
}
- suspend 키워드를 이용하면 일시중단 기능을 포함한 함수를 서로 다른 코루틴에서 재사용이 가능하다.
d. 일시 중단 함수와 코루틴 빌더
- 위의 예시에서 볼 수 있듯이 일시 중단 함수 안에서 async, launch 같은 코루틴 빌더함수를 사용할 수 없다.
=> 이유는 코루틴 빌더 함수는 CoroutineScope의 확장함수인데 일시중단함수에서는 호출한 코루틴의 CoroutineScope에 접근할 수 없기 때문이다.
import kotlinx.coroutines.*
fun main(): Unit = runBlocking {
launch {
findRentalCar(1L)
}
}
suspend fun findRentalCar(id: Long) = coroutineScope {
val carDeferred = async {
callRentalCarApi(id)
}
val priceDeferred = async {
callRentalCarPriceApi(id)
}
val car = carDeferred.await()
val price = priceDeferred.await()
val result = Car(name = car, price = price)
println(result)
}
suspend fun callRentalCarApi(id: Long): String{
delay(1000L)
return if (id == 1L) {
"현대"
} else {
"기아"
}
}
suspend fun callRentalCarPriceApi(id: Long): Long{
delay(1000L)
return if (id == 1L) {
100_000L
} else {
50_000L
}
}
data class Car(
private val name: String,
private val price: Long
)
- coroutineScope 를 사용하면 일시중단 내부에서 새로운 CoroutineScope 객체를 생성 할 수 있다.
=> coroutineScope는 구조화를 깨지 않는 CoroutineScope 객체를 생성하고 생성객체는 block 람다식에서 수신 객체(this)로 접근 가능하다.
나. 배려심이 깊은 코루틴
a. 코루틴의 스레드 양보
- 스레드를 양보하는 주체는 코루틴이며 스레드에 코루틴을 할당해 실행시키는 주체는 CoroutineDispatcher 객체이지만 스레드 양보에는 관여하지 않는다.
- 코루틴이 스레드를 양보하려면 코루틴에서 직접 스레드 양보를 위한 함수를 호출해야한다.
b. delay 로 스레드 양보하기
- delay 함수를 호출하면 코루틴은 사용하던 스레드를 양보하고 설정된 시간동안 코루틴을 일시 중단 시킨다.
fun main(): Unit = runBlocking {
val time = measureTimeMillis {
val jobs = List(10) {
launch {
delay(1000L)
}
}
joinAll(jobs = jobs.toTypedArray())
}
println("Execution time: $time ms")
}
// === 결과 ===
// Execution time: 1014 ms
fun main(): Unit = runBlocking {
val time = measureTimeMillis {
val jobs = List(10) {
launch {
TimeUnit.SECONDS.sleep(1L)
}
}
joinAll(jobs = jobs.toTypedArray())
}
println("Execution time: $time ms")
}
// === 결과 ===
// Execution time: 10054 ms
- 스레드를 블록킹하는 sleep 함수와 delay 함수를 사용한 각각의 동일한 로직을 성능을 비교해보면 많이 차이나는 것을 볼 수 있다.
=> sleep 함수에서는 스레드를 양보하기 때문에 10개의 코루틴이 바로 생성되지만 sleep 함수의 경우 첫번째 반복이 끝날때까지 스레드를 블록킹 한다.
c. join & await 으로 스레드 양보하기
fun main(): Unit = runBlocking {
launch {
println("자식")
}
println("부모")
}
// === 결과 ===
// 부모
// 자식
- 자식이 먼저 출력되지 않는다. 이유는 runBlocking main 단일 스레드만 존재하고 코드에서 코루틴이 스레드를 양보하지 않기 때문이다.
fun main(): Unit = runBlocking {
val job = launch {
println("자식")
}
job.join()
println("부모")
}
// === 결과 ====
// 자식
// 부모
- join 이나 await 함수를 코루틴이 호출하면 호출한 코루틴을 스레드를 양보하고 일시중단 상태가 되며 대상 코루틴이 완료될때까지 다시 시작되지 않는다.
d. yield 으로 스레드 양보하기
- delay, join, await 같은 일시 중단 함수들은 스레드 양보를 직접 호출하지 않아도 작업을 위해 내부적으로 스레드 양보를 일으킨다.
- yield 함수는 스레드 양보를 직접 호출 해야하는 상황에 사용한다.
fun main(): Unit = runBlocking {
val job = launch {
repeat(9999999) {
if (this.isActive) {
println("작업중..")
}
}
}
delay(10)
job.cancel()
}
- main 스레드를 delay 함수를 이용해 자식 코루틴에게 양보 후 job.cancel 을 호출한다. 결과는 모든 반복이 끝나기전에 job은 취소되지 않는다.
=> 그 이유는 단일스레드이므로 자식코루틴에서 스레드를 양보해야 부모코루틴이 메인스레드에서 job.cancel 함수를 실행할 수 있기 때문이다.
fun main(): Unit = runBlocking {
val job = launch {
repeat(9999999) {
if (this.isActive) {
println("작업중..")
yield()
}
}
}
delay(10)
job.cancel()
}
- 이때 yield 함수를 이용하여 스레드를 양보하면 모든 반복이 실행되기전에 job이 취소되는 것을 볼 수 있다.
e. 코루틴의 실행스레드
- 실행스레드는 고정이 아니라서 일시중단 후 다시 실행 될때 CoroutineDispatcher의 작업대기열로 들어가며 이후에 코루틴을 다시 스레드에 할당한다
=> 이때 CoroutineDispatcher 는 사용할 수 있는 스레드 중에 하나에 할당하는데 이전에 실행되던 스레드와 다를 수 있다.
- 코루틴의 실행스레드가 바뀌는 시점은 다시 시작하는 시점 뿐이다.
=> 만약 스레드를 양보하는 경우가 없다면 일시중단이 발생하지 않는다는 것이므로 실행스레드가 변할 일이 없다.
다. 코루틴과 멀티스레드
a. 자바에서 가시성, 원자성
- 책에서 가시성에 대한 설명이 있어서 예전에 volatile 키워드 찾아보면서 잘 정리된 글을 본 기억이 있었는데 아래 페이지인 것 같다.
=> https://jenkov.com/tutorials/java-concurrency/volatile.html
import kotlinx.coroutines.*
var count = 0
fun main(): Unit = runBlocking {
withContext(context = Dispatchers.Default) {
repeat(10_000) {
launch {
count++
}
}
}
println(count)
}
// === 결과 ===
// 10000이 나오지 읺는다.
- 가시성 문제는 멀티코어에서 멀티스레드를 사용할 경우 CPU 캐시를 사용하는 경우 메인 메모리에 전파되는데 지연시간으로
CPU 캐시와 메인 메모리간 불일치 데이터 불일치로 각각 스레드에서 최신 데이터가 아닌 값을 사용하는 것을 의미한다.
=> volatile 키워드를 이용하면 CPU 캐시를 사용하지 않고 메인 메모리에서 가져오기 때문에 가시성 문제를 해결할 수 있다. (코틀린은 @Volatile)
- 원자성 문제는 Race Condition이 발생하여 여러 스레드에서 동일 값을 읽어 쓰기작업이 발생하는 경우 연산의 손실을 발생하는 것을 의미한다.
=> 즉 여러 스레드가 동시에 하나의 값에 접근하면서 발생하는 문제이다
b. 메모리 구조
- 하드웨어 메모리 구조는 JVM의 스택&힙영역을 구분하지 않아서 스택 영역에 데이터들은 CPU 레지스터, CPU 캐시, 메인 메모리 모두에 나타날수있다.
c. 동시 접근 제한하기
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
val mutex = Mutex()
var count = 0
fun main(): Unit = runBlocking {
withContext(context = Dispatchers.Default) {
repeat(10_000) {
launch {
mutex.lock()
count++
mutex.unlock()
}
}
}
println(count)
}
// === 결과 ===
// 10000
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
val mutex = Mutex()
var count = 0
fun main(): Unit = runBlocking {
withContext(context = Dispatchers.Default) {
repeat(10_000) {
launch {
mutex.withLock {
count++
}
}
}
}
println(count)
}
// === 결과 ===
// 10000
- 코틀린에서는 코루틴에 대한 임계영역을 만들기 위한 Mutex 객체를 제공하고 있다. 결과가 동시성이슈 없이 정상적으로 출력된것을 볼 수 있다.
- lock unlock을 사용하기 보다는 withLock 사용하는 것이 안전하다.
=> lock, unlock, withLock 모두 suspend 키워드가 붙은 일시중단 함수 이다.
- ReetrantLock, Atomic 과 같이 동시성 제어하기 위해 제공하는 기존 객체들을 사용할때 일시 중단 함수를 제공하지 않아 주의해야한다.
=> 일시 중단 함수가 아닌 경우 스레드 블록킹이 발생하여 락이 해제될때까지 해당 스레드는 사용할 수 없다.
라. CorotineStart 의 옵션들
a. CoroutineStart.DEFAULT
- launch의 start 인자로 값을 전달하지 않을때 설정되어있는 기본 옵션이 DFEAULT 이다
- 코루틴 빌더 함수를 호출한 즉시 생성된 코루틴의 실행을 CoroutineDispatcher 객체에 예약하며 호출한 코루틴은 계속해서 실행된다.
b. CoroutineStart.ATOMIC
- 아직 스레드가 할당되지 않은 코루틴은 실행대기 상태로 존재하는데 이때 취소가 발생하면 실행되지 않고 종료된다.
=> ATOMIC은 이 실행대기상태에서는 취소하지 못하도록 취소방지를 위해 사용하는 옵션이다.
c. CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.*
import java.util.concurrent.Executors
fun main(): Unit = runBlocking {
launch(context = Dispatchers.IO) {
println("${Thread.currentThread().name} IO 시작")
val dispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
val job = launch(start = CoroutineStart.UNDISPATCHED, context = dispatcher) {
println("${Thread.currentThread().name} Unconfined 시작")
delay(1000L)
println("${Thread.currentThread().name} Unconfined 종료")
}
job.join()
dispatcher.close()
}
}
// === 결과 ===
// DefaultDispatcher-worker-1 @coroutine#2 IO 시작
// DefaultDispatcher-worker-1 @coroutine#3 Unconfined 시작
// pool-1-thread-1 @coroutine#3 Unconfined 종료
- 최초 실행시 CoroutineDispatcher의 작업대기열을 거치지 않고 호출한 스레드에서 즉시 실행된다.
=> 주의사항은 일시중단이 발생하면 다른 것과 동일하게 CoroutineDispatcher 작업대기열에 포함되어 실행된다. (실행스레드 변경될 수 있음)
d. 번외 : 무제한 디스패처 (Dispathers.Unconfined)
- 코루틴을 자신을 실행시킨 스레드에서 즉시 실행되도록 만드는 디스패처로 실행스레드가 제한되지 않으므로 무제한 디스패처라고 부른다.
=> 무제한(Unconfined)이라고 하니까 마치 무한정 만들수 있을것 같은 느낌이 드는데 이름을 잘못 지은 건가 번역이 아쉬운건지는 모르겠다..
import kotlinx.coroutines.*
fun main(): Unit = runBlocking {
launch(context = Dispatchers.IO) {
println("${Thread.currentThread().name} IO 시작")
val job = launch(context = Dispatchers.Unconfined) {
println("${Thread.currentThread().name} Unconfined 시작")
delay(1000L)
println("${Thread.currentThread().name} Unconfined 종료")
}
job.join()
println("${Thread.currentThread().name} IO 종료")
}
}
// === 결과 ===
// DefaultDispatcher-worker-1 @coroutine#2 IO 시작
// DefaultDispatcher-worker-1 @coroutine#3 Unconfined 시작
// kotlinx.coroutines.DefaultExecutor @coroutine#3 Unconfined 종료
// DefaultDispatcher-worker-1 @coroutine#2 IO 종료
- 특징은 코루틴이 자신을 생성한 스레드에서 즉시 실행되며 일시중단 이후에 다시 시작될때는 다시 실행시키는 스레드에서 실행하는 것이다.
=> 일시 중단 이후에는 어떤 스레드가 코루틴을 다시 시작시키는지 예측하기 어렵기 때문에 비동기 작업이 불안정해져서 권장되지 않는다.
- 위의 코드는 DefaultExecutor가 Unconfined 코루틴을 재수행했는데 이 DefaultExecutor 는 delay를 다시 시작시키는 스레드이다.
마. 코루틴의 동작 방식
a. 일시중단한 정보는 어디에..?
- 코루틴이 일시중단하고 다시 시작되려면 실행정보가 저장되어 전달되어야하는데 CPS라는 방식을 사용하고 있다
=> Continuation Passing Style로 단순하게 이어서 실행해야하는 작업인 Continuation을 전달하는 방식을 의미한다.
b. Continutation
- 코루틴에서는 이어서 실행해야 하는 작업 전달을 위해 Continuation 객체를 제공한다. 코루틴 라이브러리에서는 캡슐화해서 사용하고 있다.
import kotlinx.coroutines.*
fun main(): Unit = runBlocking {
println("시작")
suspendCancellableCoroutine<Unit> { continuation ->
println("일시중단")
// continuation.resume(Unit)
}
println("종료")
}
- suspendCancellableCoroutine 함수가 호출되면 코루틴은 일시중단되며 이 정보가 continuation에 저장되어 수신객체로 전달된다.
- 위의 코드는 종료를 출력못하는데 이유는 Continuation 객체에 대해서 재개가 호출되지않아 runBlocking 코루틴이 다시 시작되지 못했기 때문이다.
- delay 함수도 쭉 따라가다보면 Contination의 resume을 사용하는것을 볼 수 있다.
'공부 > Coroutine' 카테고리의 다른 글
<Coroutine> 8. 채널(Channel), 셀렉트(Select) (0) | 2024.08.15 |
---|---|
<Coroutine> 7. 코루틴 단위 테스트 (1) | 2024.07.23 |
<Coroutine> 5. 코루틴의 예외처리 (0) | 2024.06.30 |
<Coroutine> 4. 구조화된 코루틴 (2) | 2024.06.30 |
<Coroutine> 3. 코루틴 컨텍스트(Coroutine Context) (0) | 2024.06.22 |
블로그의 정보
57개월 BackEnd
BFine