You will be fine

<Coroutine> 8. 채널(Channel), 셀렉트(Select)

by BFine
반응형

출처&참고 : https://m.yes24.com/Goods/Detail/123034354

 

코틀린 코루틴 - 예스24

코틀린 전문 강사가 알려 주는 코틀린 코루틴에 대한 모든 것!코틀린 코루틴은 효율적이고 신뢰할 수 있는 멀티스레드 프로그램을 쉽게 구현할 수 있게 해 주어 자바 가상 머신(JVM), 특히 안드로

m.yes24.com

 

가.  채널(Channel)

 a. 코루틴간의 통신을 하려면..?

  -  코드를 작성하다보면 어떤 독립적인 주체들간에 통신이 필요할때가 있다. 코루틴도 서로간의 통신이 필요한 경우가 있을텐데 이때 채널 API를 사용한다.

     => 즉 채널이란 서로 다른 코루틴 간의 통신을 위한 기본적인 방법이다.

 

 b. 공공책장

  -  책에서는 공공책장으로 비유하고 있는데 책을 공공책장에 두면 필요한 사람이 가져가는 느낌으로 보인다.

  -  채널의 특징은 송신자와 수진자의 수에 제한이 없고 채널에 전송된 모든 값은 단 한번만 받을 수 있다.

 

 c. 인터페이스

  -  Channel은 두개의 서로 다른 인터페이스(SendChannel, RecevieChannel) 를 구현한 하나의 인터페이스이다

  -  send와 receive 모두 suspend 키워드가 있는것을 볼 수 있는데 둘다 일시중단 함수라는 특징을 가지고 있다.

      => send는 채널의 용량이 다 찼을때, receive는 채널에 원소가 없을때 원소 들어올때까지 일시중단

  -  여기서 일시중단 처리하고 싶지 않다면 trySend, tryReceive를 사용하면 되는데 이는 실행정보들 가진 ChannelResult를 즉시반환한다. 

 

 d. 예제

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = Channel<String>()
    launch {
        val words = listOf("hello", "world", "!!")
        repeat(3) {
            delay(1000)
            println("[송신]")
            channel.send(words[it])
        }
    }

    launch {
        repeat(3) {
            println("[수신] "+ channel.receive())
        }
    }
}

[송신]
[수신] hello
[송신]
[수신] world
[송신]
[수신] !!

  -  일반적인 구조는 채널의 양끝에 각가 하나의 코루틴만 있는 경우이다.

  -  위의 예제 코드는 불완전한데 그 이유는 먼저 수신자는 얼마나 많은 원소를 보내는지 알아야하는 실제론 알기가 어렵다.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = Channel<String>()
    launch {
        val words = listOf("hello", "world", "!!")
        repeat(3) {
            delay(1000)
            println("[송신]")
            channel.send(words[it])
        }
        channel.close() // 없으면 프로세스가 종료되지않는다.
    }

    launch {
        for (element in channel) {
            println("[수신1] $element")
        }
//        channel.consumeEach {
//            println("[수신2] $it")
//        }
    }
}
[송신]
[수신1] hello
[송신]
[수신1] world
[송신]
[수신1] !!

Process finished with exit code 130 

  -  위 예시에서 볼 수 있듯이 채널이 닫힐 때까지 원소를 받기 위해서 for 루프나 consumeEach 함수를 사용하는 방법이 있다. 

  -  주석으로 표기해두었지만 채널을 닫는 것을 놓칠수 있는데 특히 예외로 인해 코루틴이 중단되면 수신자 코루틴은 영원히 기다려야 한다.

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = produce {
        val words = listOf("hello", "world", "!!")
        repeat(3) {
            delay(1000)
            println("[송신]")
            send(words[it])
        }
    }

    launch {
        for (element in channel) {
            println("[수신] $element")
        }
    }
}

[송신]
[수신] hello
[송신]
[수신] world
[송신]
[수신] !!

Process finished with exit code 0

  -  ReceiveChannel을 반환하는 코루틴 빌더인  produce 함수를 사용하면 위의 누락 문제를 방지할 수 있다.

     => produce는 코루틴이 어떻게 종료되는 채널을 닫는다 즉 close를 반드시 호출한다.

 

 e. 채널 타입

  -  설정한 용량에 따라 채널을 4가지로 구분할 수 있다.  

      1. 무제한(Channel.UNLIMITED) : 제한이 없는 용량 버퍼로 send가 중단 되지않은다.

      2. 버퍼(Channel.BUFFERED) :  특정 버퍼의 크기를 설정할 수 있다. (기본값은 64)

      3. 랑데뷰(Channel.RENDEZVOUS, default) : 송신자와 수신자가 만날때만 원소를 교환한다. (책장이 아닌 책을 교환, 용량이 0)

      4. 융합(Channel.CONFLATED) : 버퍼 크기가 1인 채널로 새로운 원소가 이전 원소를 대체 한다.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = produce(capacity = Channel.RENDEZVOUS) {
        val words = listOf("hello", "world", "!!")
        repeat(3) {
            send(words[it])
            println("[송신]")
        }
    }
    launch {
        for (element in channel) {
            delay(1000L)
            println("[수신] $element")
        }
    }
    
}
[송신]
[수신] hello
[송신]
[수신] world
[송신]
[수신] !!
suspend fun main():Unit = coroutineScope {
    val channel = produce(capacity = Channel.CONFLATED) {
        val words = listOf("hello", "world", "!!")
        repeat(3) {
            send(words[it])
            println("[송신]")
        }
    }
    launch {
        for (element in channel) {
            delay(1000L)
            println("[수신] $element")
        }
    }

}

[송신]
[송신]
[송신]
[수신] !!

 

 f.  기타 설정들

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = Channel<String>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)

    launch {
        val words = listOf("hello", "world", "!!")
        repeat(3) {
            channel.send(words[it])
            println("[송신]")
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            delay(1000L)
            println("[수신] $element")
        }
    }

}

[송신]
[송신]
[송신]
[수신] hello

Process finished with exit code 0

  -  채널의 버퍼가 가득 찼을때의 행동을 정의할 수 있는데 SUSPEND(default, DROP_OLDEST, DROP_LATEST 설정을 줄 수 있다.

     => produce 에서 설정할수 없어서 오버플로 옵셥을 변경하려면 Channel 을 사용해야 한다.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = Channel<String>(onUndeliveredElement = { println("보내지못함 : $it")} )

    launch {
        val words = listOf("hello", "world", "!!")
        repeat(3) {
            channel.send(words[it])
            println("[송신]")
            delay(100L)
        }
    }

    launch {
        delay(150L)
        for (element in channel) {
            println("[수신] $element")
            delay(200L)
        }
    }

    delay(300L)
    channel.close()
}

[송신]
[수신] hello
[수신] world
[송신]
보내지못함 : !!
Exception in thread "main" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed

 -  onUndeliveredElement 설정은 원소가 어떠한 이유로 처리되지 않을 때 호출된다. 대부분 채널이 닫히거나 취소되었음을 의미한다.

 

 g. 팬아웃(fan-out), 팬(fan-in)인 그리고 파이프라인 

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = produce {
        val words = listOf("hello", "world", "!!","good", "day", "!!!")
        repeat(6) {
            send(words[it])
        }
    }

    repeat(3){
        launch {
            for (element in channel) {
                println("[수신 #$it] $element")
            }
        }
    }

}

[수신 #1] world
[수신 #0] !!
[수신 #2] hello
[수신 #1] good
[수신 #2] !!!
[수신 #0] day

  -  하나의 채널로부터 여러 개의 코루틴이 데이터를 전달 받을수도 있다. 결과의 순서는 다르지만 데이터의 수는 공평하게 배분된 것을 볼수 있다.

  -  여기서 consumeEach 는 여러개의 코루틴이 사용하기에는 안전하지 않기 때문에 주의해야 한다.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {
    val channel = Channel<String>();
    val words1 = listOf("hello", "world", "!!")
    val words2 = listOf("good", "day", "!!!")

    launch {
        words1.forEach {
            channel.send(it)
        }
    }

    launch {
        words2.forEach {
            channel.send(it)
        }
    }

    launch {
        for (element in channel) {
            println("[수신] $element")
        }
    }

    delay(1000L)
    channel.close()
}
[수신] hello
[수신] good
[수신] world
[수신] !!
[수신] day
[수신] !!!

Process finished with exit code 0

  -  하나의 채널로 여러 개의 코루틴이 데이터를 전달할 수 있다. 

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun main():Unit = coroutineScope {

    val channel1 = produce {
        val words1 = listOf("hello", "world", "!!")
        words1.forEach {
            channel.send(it)
        }
    }

    val channel2 = produce {
        val words2 = listOf("good", "day", "!!!")
        val list = mutableListOf<String>();
        for (element in channel1) {
            list.add(element)
        }
        list.addAll(words2)
        list.forEach {
            send(it)
        }
    }

    launch {
        for (element in channel2) {
            println("[수신] $element")
        }
    }

}

[수신] hello
[수신] world
[수신] !!
[수신] good
[수신] day
[수신] !!!

  -  한 채널로부터 받은 원소를 다른 채널로 전송하는 경우가 있는데 이를 파이프라인이라고 한다.

 

 h.  어디에 활용할 수 있을까..? 

  -  채널은 서로 다른 코루틴이 통신할 때 유용한데 충돌이 발생하지 않으며 공평함을 보장한다.

import com.example.models.Hotel
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.random.Random

suspend fun main():Unit = coroutineScope {
    val hotelIds = listOf(1L,2L,3L)
    val channel1 = serveRequest(hotelIds = hotelIds, agencyName = "에어비엔비")
    val channel2 = serveRequest(hotelIds = hotelIds, agencyName = "호텔스컴바인")
    val channel3 = serveRequest(hotelIds = hotelIds, agencyName = "아고다")

    listOf(channel1, channel2, channel3).forEach {
        launch {
            for (element in it) {
                println("$element")
            }
        }
    }
}

suspend fun CoroutineScope.serveRequest(
    hotelIds: List<Long>,
    agencyName: String
) = produce {
    hotelIds.forEach {
        val price = hotelPriceApi(agencyName, it)
        send(
            Hotel(id = it, price = price, agencyName= agencyName)
        )
    }
}

suspend fun hotelPriceApi(agencyName: String, hotelId: Long): Long {
    delay(1000L)
    println("# API request : $agencyName-$hotelId ")
    return (Random.nextInt(1000, 10000).floorDiv(10)*10 * hotelId)
}

# API request : 에어비엔비-1
# API request : 호텔스컴바인-1
# API request : 아고다-1
Hotel(id=1, price=3840, agencyName=아고다)
Hotel(id=1, price=1280, agencyName=에어비엔비)
Hotel(id=1, price=3530, agencyName=호텔스컴바인)
# API request : 호텔스컴바인-2
# API request : 에어비엔비-2
Hotel(id=2, price=4460, agencyName=호텔스컴바인)
# API request : 아고다-2
Hotel(id=2, price=11300, agencyName=아고다)
Hotel(id=2, price=11020, agencyName=에어비엔비)
# API request : 에어비엔비-3
# API request : 호텔스컴바인-3
# API request : 아고다-3
Hotel(id=3, price=7560, agencyName=호텔스컴바인)
Hotel(id=3, price=4020, agencyName=에어비엔비)
Hotel(id=3, price=29070, agencyName=아고다)

  -  여러 agency API로부터 ID에 해당하는 호텔 가격 받는 예제를 만들어보았다 이때 각각은 독립적인 작업이어서 독립적인 채널로 보낼수 있었다. 

 

나.  셀렉트(Select) 

 a. 무엇인가?

  -  select는 가장 먼저 완료되는 코루틴의 결과값을 기다릴때나 여러개의 채널 중 전송 or 수신 가능한 채널을 선택할때 유용하다.

      => 예를 들어 여러 개의 채널 중 버퍼에 남은 공간이 있는 채널을 선택 등

  -  실제로 사용되는 경우는 적어서 책에서 빨리 넘어가려하고 있다고 적혀있다..

 

 b.  예시

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.selects.select

suspend fun main():Unit = coroutineScope {
    val deferred1 = async { hotelPriceApi("에어비엔비1") }
    val deferred2 = async { hotelPriceApi("에어비엔비2") }

    val result = select<Long> {
        deferred1.onAwait { it }
        deferred2.onAwait { it }
    }.also {
        coroutineContext.cancelChildren()
    }
    println(result)
}

suspend fun hotelPriceApi(agencyName: String): Long {
    when (agencyName) {
        "에어비엔비1" -> {
            delay(300L)
            return 1000L
        }
        "에어비엔비2" -> {
            delay(200L)
            return 2000L
        }
    }
    return 0L
}

2000

Process finished with exit code 0

  -  둘중에 더 빠르게 응답된 값이 반환 되는 것을 볼수 있다. 그리고 select가 값을 생성하고 나서 명시적으로 스코프를 취소할때 also를 활용하면 된다.

 

 c. 채널에서 사용하기

import com.example.models.Hotel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.selects.select
import kotlin.random.Random

suspend fun main():Unit = coroutineScope {
    val hotelIds = listOf(1L, 2L, 3L)
    val channel1 = serveRequest(hotelIds = hotelIds , agencyName = "아고다")
    val channel2 = serveRequest(hotelIds = hotelIds , agencyName = "호텔스닷컴")

    repeat(3) {
        select<Unit> {
            channel1.onReceive {
                println("$it")
            }
            channel2.onReceive {
                println("$it")
            }
        }
    }.also {
        coroutineContext.cancelChildren()
    }

}

suspend fun CoroutineScope.serveRequest(
    hotelIds: List<Long>,
    agencyName: String
) = produce {
    hotelIds.forEach {
        val price = hotelPriceApi(agencyName, it)
        send(
            Hotel(id = it, price = price, agencyName= agencyName)
        )
    }
}

suspend fun hotelPriceApi(agencyName: String, hotelId: Long): Long {
    println("# API request : $agencyName-$hotelId ")
    delay(agencyName.length * 100L)
    return (Random.nextInt(1000, 10000).floorDiv(10)*10 * hotelId)
}

# API request : 아고다-1
# API request : 호텔스닷컴-1
Hotel(id=1, price=8600, agencyName=아고다)
# API request : 아고다-2
Hotel(id=1, price=3650, agencyName=호텔스닷컴)
# API request : 호텔스닷컴-2
Hotel(id=2, price=2940, agencyName=아고다)
# API request : 아고다-3 

  -  onReceive는 채널이 값을 가지고 있을때 선택되며 값을 받은 뒤 람다식의 인자로 사용하며 이 람다식의 결과값을 반환 한다.

      => 추가로 onRecevieCatching 함수는 여기에 추가로 채널이 닫혔을 때도 선택된다.

  -  위의 예시를 보면 반복에 따라서 채널에 먼저 들어와 있는 값이 각각 선택되는 것을 볼 수 있다.

import com.example.models.Hotel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.selects.select
import kotlin.random.Random

suspend fun main():Unit = coroutineScope {

    val channel1 = Channel<Hotel>(capacity = 1)
    val channel2 = Channel<Hotel>(capacity = 1)

    launch {
        (1..10).forEach {
            val price = hotelPriceApi("아고다", 1L)
            val hotel = Hotel(id = it.toLong(), price = price, agencyName= "아고다")
            select<Unit> {
                channel1.onSend(hotel){ println("[송신 #1] $hotel") }
                channel2.onSend(hotel){ println("[송신 #2] $hotel") }
            }
        }
    }

    launch {
        for (element in channel1) {
            println("[수신 #1] $element ")

        }
    }
    launch {
        for (element in channel2) {
            println("[수신 #2] $element ")
        }
    }
    delay(5000L)
    coroutineContext.cancelChildren()
}

suspend fun CoroutineScope.serveRequest(
    hotelIds: List<Long>,
    agencyName: String
) = produce {
    hotelIds.forEach {
        val price = hotelPriceApi(agencyName, it)
        send(
            Hotel(id = it, price = price, agencyName= agencyName)
        )
    }
}

suspend fun hotelPriceApi(agencyName: String, hotelId: Long): Long {
    return (Random.nextInt(1000, 10000).floorDiv(10)*10 * hotelId)
}

[송신 #1] Hotel(id=1, price=6140, agencyName=아고다)
[수신 #1] Hotel(id=1, price=6140, agencyName=아고다)
[송신 #1] Hotel(id=2, price=1750, agencyName=아고다)
[송신 #1] Hotel(id=3, price=8960, agencyName=아고다)
[수신 #1] Hotel(id=2, price=1750, agencyName=아고다)
[수신 #1] Hotel(id=3, price=8960, agencyName=아고다)
[송신 #1] Hotel(id=4, price=6010, agencyName=아고다)
[송신 #1] Hotel(id=5, price=7180, agencyName=아고다)
[수신 #1] Hotel(id=4, price=6010, agencyName=아고다)
[수신 #1] Hotel(id=5, price=7180, agencyName=아고다)
[수신 #1] Hotel(id=6, price=3620, agencyName=아고다)
[송신 #1] Hotel(id=6, price=3620, agencyName=아고다)
[송신 #1] Hotel(id=7, price=8390, agencyName=아고다)
[송신 #1] Hotel(id=8, price=1790, agencyName=아고다)
[수신 #1] Hotel(id=7, price=8390, agencyName=아고다)
[송신 #2] Hotel(id=9, price=3190, agencyName=아고다)
[수신 #1] Hotel(id=8, price=1790, agencyName=아고다)
[송신 #1] Hotel(id=10, price=7160, agencyName=아고다)
[수신 #1] Hotel(id=10, price=7160, agencyName=아고다)
[수신 #2] Hotel(id=9, price=3190, agencyName=아고다)

Process finished with exit code 0

  -  onSend 는 채널의 버퍼에 공간이 있을때 선택된다. 채널에 값을 보낸뒤 채널의 참조값으로 람다식을 수행하며 onSend가 선택되었을때 Unit을 반환한다.

반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기