<Coroutine> 8. 채널(Channel), 셀렉트(Select)
by BFine출처&참고 : https://m.yes24.com/Goods/Detail/123034354
가. 채널(Channel)
a. 코루틴간의 통신을 하려면..?
- 코드를 작성하다보면 어떤 독립적인 주체들간에 통신이 필요할때가 있다. 코루틴도 서로간의 통신이 필요한 경우가 있을텐데 이때 채널 API를 사용한다.
=> 즉 채널이란 서로 다른 코루틴 간의 통신을 위한 기본적인 방법이다.
b. 공공책장
- 책에서는 공공책장으로 비유하고 있는데 책을 공공책장에 두면 필요한 사람이 가져가는 느낌으로 보인다.
- 채널의 특징은 송신자와 수진자의 수에 제한이 없고 채널에 전송된 모든 값은 단 한번만 받을 수 있다.
c. 인터페이스
- Channel은 두개의 서로 다른 인터페이스(SendChannel, RecevieChannel) 를 구현한 하나의 인터페이스이다
- send와 receive 모두 suspend 키워드가 있는것을 볼 수 있는데 둘다 일시중단 함수라는 특징을 가지고 있다.
=> send는 채널의 용량이 다 찼을때, receive는 채널에 원소가 없을때 원소 들어올때까지 일시중단
- 여기서 일시중단 처리하고 싶지 않다면 trySend, tryReceive를 사용하면 되는데 이는 실행정보들 가진 ChannelResult를 즉시반환한다.
d. 예제
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = Channel<String>()
launch {
val words = listOf("hello", "world", "!!")
repeat(3) {
delay(1000)
println("[송신]")
channel.send(words[it])
}
}
launch {
repeat(3) {
println("[수신] "+ channel.receive())
}
}
}
[송신]
[수신] hello
[송신]
[수신] world
[송신]
[수신] !!
- 일반적인 구조는 채널의 양끝에 각가 하나의 코루틴만 있는 경우이다.
- 위의 예제 코드는 불완전한데 그 이유는 먼저 수신자는 얼마나 많은 원소를 보내는지 알아야하는 실제론 알기가 어렵다.
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = Channel<String>()
launch {
val words = listOf("hello", "world", "!!")
repeat(3) {
delay(1000)
println("[송신]")
channel.send(words[it])
}
channel.close() // 없으면 프로세스가 종료되지않는다.
}
launch {
for (element in channel) {
println("[수신1] $element")
}
// channel.consumeEach {
// println("[수신2] $it")
// }
}
}
[송신]
[수신1] hello
[송신]
[수신1] world
[송신]
[수신1] !!
Process finished with exit code 130
- 위 예시에서 볼 수 있듯이 채널이 닫힐 때까지 원소를 받기 위해서 for 루프나 consumeEach 함수를 사용하는 방법이 있다.
- 주석으로 표기해두었지만 채널을 닫는 것을 놓칠수 있는데 특히 예외로 인해 코루틴이 중단되면 수신자 코루틴은 영원히 기다려야 한다.
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = produce {
val words = listOf("hello", "world", "!!")
repeat(3) {
delay(1000)
println("[송신]")
send(words[it])
}
}
launch {
for (element in channel) {
println("[수신] $element")
}
}
}
[송신]
[수신] hello
[송신]
[수신] world
[송신]
[수신] !!
Process finished with exit code 0
- ReceiveChannel을 반환하는 코루틴 빌더인 produce 함수를 사용하면 위의 누락 문제를 방지할 수 있다.
=> produce는 코루틴이 어떻게 종료되는 채널을 닫는다 즉 close를 반드시 호출한다.
e. 채널 타입
- 설정한 용량에 따라 채널을 4가지로 구분할 수 있다.
1. 무제한(Channel.UNLIMITED) : 제한이 없는 용량 버퍼로 send가 중단 되지않은다.
2. 버퍼(Channel.BUFFERED) : 특정 버퍼의 크기를 설정할 수 있다. (기본값은 64)
3. 랑데뷰(Channel.RENDEZVOUS, default) : 송신자와 수신자가 만날때만 원소를 교환한다. (책장이 아닌 책을 교환, 용량이 0)
4. 융합(Channel.CONFLATED) : 버퍼 크기가 1인 채널로 새로운 원소가 이전 원소를 대체 한다.
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = produce(capacity = Channel.RENDEZVOUS) {
val words = listOf("hello", "world", "!!")
repeat(3) {
send(words[it])
println("[송신]")
}
}
launch {
for (element in channel) {
delay(1000L)
println("[수신] $element")
}
}
}
[송신]
[수신] hello
[송신]
[수신] world
[송신]
[수신] !!
suspend fun main():Unit = coroutineScope {
val channel = produce(capacity = Channel.CONFLATED) {
val words = listOf("hello", "world", "!!")
repeat(3) {
send(words[it])
println("[송신]")
}
}
launch {
for (element in channel) {
delay(1000L)
println("[수신] $element")
}
}
}
[송신]
[송신]
[송신]
[수신] !!
f. 기타 설정들
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = Channel<String>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
launch {
val words = listOf("hello", "world", "!!")
repeat(3) {
channel.send(words[it])
println("[송신]")
}
channel.close()
}
launch {
for (element in channel) {
delay(1000L)
println("[수신] $element")
}
}
}
[송신]
[송신]
[송신]
[수신] hello
Process finished with exit code 0
- 채널의 버퍼가 가득 찼을때의 행동을 정의할 수 있는데 SUSPEND(default, DROP_OLDEST, DROP_LATEST 설정을 줄 수 있다.
=> produce 에서 설정할수 없어서 오버플로 옵셥을 변경하려면 Channel 을 사용해야 한다.
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = Channel<String>(onUndeliveredElement = { println("보내지못함 : $it")} )
launch {
val words = listOf("hello", "world", "!!")
repeat(3) {
channel.send(words[it])
println("[송신]")
delay(100L)
}
}
launch {
delay(150L)
for (element in channel) {
println("[수신] $element")
delay(200L)
}
}
delay(300L)
channel.close()
}
[송신]
[수신] hello
[수신] world
[송신]
보내지못함 : !!
Exception in thread "main" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
- onUndeliveredElement 설정은 원소가 어떠한 이유로 처리되지 않을 때 호출된다. 대부분 채널이 닫히거나 취소되었음을 의미한다.
g. 팬아웃(fan-out), 팬(fan-in)인 그리고 파이프라인
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = produce {
val words = listOf("hello", "world", "!!","good", "day", "!!!")
repeat(6) {
send(words[it])
}
}
repeat(3){
launch {
for (element in channel) {
println("[수신 #$it] $element")
}
}
}
}
[수신 #1] world
[수신 #0] !!
[수신 #2] hello
[수신 #1] good
[수신 #2] !!!
[수신 #0] day
- 하나의 채널로부터 여러 개의 코루틴이 데이터를 전달 받을수도 있다. 결과의 순서는 다르지만 데이터의 수는 공평하게 배분된 것을 볼수 있다.
- 여기서 consumeEach 는 여러개의 코루틴이 사용하기에는 안전하지 않기 때문에 주의해야 한다.
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel = Channel<String>();
val words1 = listOf("hello", "world", "!!")
val words2 = listOf("good", "day", "!!!")
launch {
words1.forEach {
channel.send(it)
}
}
launch {
words2.forEach {
channel.send(it)
}
}
launch {
for (element in channel) {
println("[수신] $element")
}
}
delay(1000L)
channel.close()
}
[수신] hello
[수신] good
[수신] world
[수신] !!
[수신] day
[수신] !!!
Process finished with exit code 0
- 하나의 채널로 여러 개의 코루틴이 데이터를 전달할 수 있다.
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
suspend fun main():Unit = coroutineScope {
val channel1 = produce {
val words1 = listOf("hello", "world", "!!")
words1.forEach {
channel.send(it)
}
}
val channel2 = produce {
val words2 = listOf("good", "day", "!!!")
val list = mutableListOf<String>();
for (element in channel1) {
list.add(element)
}
list.addAll(words2)
list.forEach {
send(it)
}
}
launch {
for (element in channel2) {
println("[수신] $element")
}
}
}
[수신] hello
[수신] world
[수신] !!
[수신] good
[수신] day
[수신] !!!
- 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우가 있는데 이를 파이프라인이라고 한다.
h. 어디에 활용할 수 있을까..?
- 채널은 서로 다른 코루틴이 통신할 때 유용한데 충돌이 발생하지 않으며 공평함을 보장한다.
import com.example.models.Hotel
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.random.Random
suspend fun main():Unit = coroutineScope {
val hotelIds = listOf(1L,2L,3L)
val channel1 = serveRequest(hotelIds = hotelIds, agencyName = "에어비엔비")
val channel2 = serveRequest(hotelIds = hotelIds, agencyName = "호텔스컴바인")
val channel3 = serveRequest(hotelIds = hotelIds, agencyName = "아고다")
listOf(channel1, channel2, channel3).forEach {
launch {
for (element in it) {
println("$element")
}
}
}
}
suspend fun CoroutineScope.serveRequest(
hotelIds: List<Long>,
agencyName: String
) = produce {
hotelIds.forEach {
val price = hotelPriceApi(agencyName, it)
send(
Hotel(id = it, price = price, agencyName= agencyName)
)
}
}
suspend fun hotelPriceApi(agencyName: String, hotelId: Long): Long {
delay(1000L)
println("# API request : $agencyName-$hotelId ")
return (Random.nextInt(1000, 10000).floorDiv(10)*10 * hotelId)
}
# API request : 에어비엔비-1
# API request : 호텔스컴바인-1
# API request : 아고다-1
Hotel(id=1, price=3840, agencyName=아고다)
Hotel(id=1, price=1280, agencyName=에어비엔비)
Hotel(id=1, price=3530, agencyName=호텔스컴바인)
# API request : 호텔스컴바인-2
# API request : 에어비엔비-2
Hotel(id=2, price=4460, agencyName=호텔스컴바인)
# API request : 아고다-2
Hotel(id=2, price=11300, agencyName=아고다)
Hotel(id=2, price=11020, agencyName=에어비엔비)
# API request : 에어비엔비-3
# API request : 호텔스컴바인-3
# API request : 아고다-3
Hotel(id=3, price=7560, agencyName=호텔스컴바인)
Hotel(id=3, price=4020, agencyName=에어비엔비)
Hotel(id=3, price=29070, agencyName=아고다)
- 여러 agency API로부터 ID에 해당하는 호텔 가격 받는 예제를 만들어보았다 이때 각각은 독립적인 작업이어서 독립적인 채널로 보낼수 있었다.
나. 셀렉트(Select)
a. 무엇인가?
- select는 가장 먼저 완료되는 코루틴의 결과값을 기다릴때나 여러개의 채널 중 전송 or 수신 가능한 채널을 선택할때 유용하다.
=> 예를 들어 여러 개의 채널 중 버퍼에 남은 공간이 있는 채널을 선택 등
- 실제로 사용되는 경우는 적어서 책에서 빨리 넘어가려하고 있다고 적혀있다..
b. 예시
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.selects.select
suspend fun main():Unit = coroutineScope {
val deferred1 = async { hotelPriceApi("에어비엔비1") }
val deferred2 = async { hotelPriceApi("에어비엔비2") }
val result = select<Long> {
deferred1.onAwait { it }
deferred2.onAwait { it }
}.also {
coroutineContext.cancelChildren()
}
println(result)
}
suspend fun hotelPriceApi(agencyName: String): Long {
when (agencyName) {
"에어비엔비1" -> {
delay(300L)
return 1000L
}
"에어비엔비2" -> {
delay(200L)
return 2000L
}
}
return 0L
}
2000
Process finished with exit code 0
- 둘중에 더 빠르게 응답된 값이 반환 되는 것을 볼수 있다. 그리고 select가 값을 생성하고 나서 명시적으로 스코프를 취소할때 also를 활용하면 된다.
c. 채널에서 사용하기
import com.example.models.Hotel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.selects.select
import kotlin.random.Random
suspend fun main():Unit = coroutineScope {
val hotelIds = listOf(1L, 2L, 3L)
val channel1 = serveRequest(hotelIds = hotelIds , agencyName = "아고다")
val channel2 = serveRequest(hotelIds = hotelIds , agencyName = "호텔스닷컴")
repeat(3) {
select<Unit> {
channel1.onReceive {
println("$it")
}
channel2.onReceive {
println("$it")
}
}
}.also {
coroutineContext.cancelChildren()
}
}
suspend fun CoroutineScope.serveRequest(
hotelIds: List<Long>,
agencyName: String
) = produce {
hotelIds.forEach {
val price = hotelPriceApi(agencyName, it)
send(
Hotel(id = it, price = price, agencyName= agencyName)
)
}
}
suspend fun hotelPriceApi(agencyName: String, hotelId: Long): Long {
println("# API request : $agencyName-$hotelId ")
delay(agencyName.length * 100L)
return (Random.nextInt(1000, 10000).floorDiv(10)*10 * hotelId)
}
# API request : 아고다-1
# API request : 호텔스닷컴-1
Hotel(id=1, price=8600, agencyName=아고다)
# API request : 아고다-2
Hotel(id=1, price=3650, agencyName=호텔스닷컴)
# API request : 호텔스닷컴-2
Hotel(id=2, price=2940, agencyName=아고다)
# API request : 아고다-3
- onReceive는 채널이 값을 가지고 있을때 선택되며 값을 받은 뒤 람다식의 인자로 사용하며 이 람다식의 결과값을 반환 한다.
=> 추가로 onRecevieCatching 함수는 여기에 추가로 채널이 닫혔을 때도 선택된다.
- 위의 예시를 보면 반복에 따라서 채널에 먼저 들어와 있는 값이 각각 선택되는 것을 볼 수 있다.
import com.example.models.Hotel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.selects.select
import kotlin.random.Random
suspend fun main():Unit = coroutineScope {
val channel1 = Channel<Hotel>(capacity = 1)
val channel2 = Channel<Hotel>(capacity = 1)
launch {
(1..10).forEach {
val price = hotelPriceApi("아고다", 1L)
val hotel = Hotel(id = it.toLong(), price = price, agencyName= "아고다")
select<Unit> {
channel1.onSend(hotel){ println("[송신 #1] $hotel") }
channel2.onSend(hotel){ println("[송신 #2] $hotel") }
}
}
}
launch {
for (element in channel1) {
println("[수신 #1] $element ")
}
}
launch {
for (element in channel2) {
println("[수신 #2] $element ")
}
}
delay(5000L)
coroutineContext.cancelChildren()
}
suspend fun CoroutineScope.serveRequest(
hotelIds: List<Long>,
agencyName: String
) = produce {
hotelIds.forEach {
val price = hotelPriceApi(agencyName, it)
send(
Hotel(id = it, price = price, agencyName= agencyName)
)
}
}
suspend fun hotelPriceApi(agencyName: String, hotelId: Long): Long {
return (Random.nextInt(1000, 10000).floorDiv(10)*10 * hotelId)
}
[송신 #1] Hotel(id=1, price=6140, agencyName=아고다)
[수신 #1] Hotel(id=1, price=6140, agencyName=아고다)
[송신 #1] Hotel(id=2, price=1750, agencyName=아고다)
[송신 #1] Hotel(id=3, price=8960, agencyName=아고다)
[수신 #1] Hotel(id=2, price=1750, agencyName=아고다)
[수신 #1] Hotel(id=3, price=8960, agencyName=아고다)
[송신 #1] Hotel(id=4, price=6010, agencyName=아고다)
[송신 #1] Hotel(id=5, price=7180, agencyName=아고다)
[수신 #1] Hotel(id=4, price=6010, agencyName=아고다)
[수신 #1] Hotel(id=5, price=7180, agencyName=아고다)
[수신 #1] Hotel(id=6, price=3620, agencyName=아고다)
[송신 #1] Hotel(id=6, price=3620, agencyName=아고다)
[송신 #1] Hotel(id=7, price=8390, agencyName=아고다)
[송신 #1] Hotel(id=8, price=1790, agencyName=아고다)
[수신 #1] Hotel(id=7, price=8390, agencyName=아고다)
[송신 #2] Hotel(id=9, price=3190, agencyName=아고다)
[수신 #1] Hotel(id=8, price=1790, agencyName=아고다)
[송신 #1] Hotel(id=10, price=7160, agencyName=아고다)
[수신 #1] Hotel(id=10, price=7160, agencyName=아고다)
[수신 #2] Hotel(id=9, price=3190, agencyName=아고다)
Process finished with exit code 0
- onSend 는 채널의 버퍼에 공간이 있을때 선택된다. 채널에 값을 보낸뒤 채널의 참조값으로 람다식을 수행하며 onSend가 선택되었을때 Unit을 반환한다.
'공부 > Coroutine' 카테고리의 다른 글
<Coroutine> 10. 코루틴 Flow(플로우) 살펴보기 (2) | 2024.09.01 |
---|---|
<Coroutine> 9. Hot & Cold Data Source (0) | 2024.08.24 |
<Coroutine> 7. 코루틴 단위 테스트 (1) | 2024.07.23 |
<Coroutine> 6. 일시 중단 함수와 코루틴 멀티스레드 (0) | 2024.07.07 |
<Coroutine> 5. 코루틴의 예외처리 (0) | 2024.06.30 |
블로그의 정보
57개월 BackEnd
BFine