Kotlin coroutine

Basic Practice

First Example

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // launch a new coroutine in background and continue
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main thread continues here immediately
    runBlocking { // but this expression blocks the main thread
        delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
    }
}

The same program with runBlocking wrapping the whole of main:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> { // start main coroutine
    GlobalScope.launch { // launch a new coroutine in background and continue
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues here immediately
    delay(2000L) // delaying for 2 seconds to keep JVM alive
}

// or, as a unit test (the @Test import depends on your test framework)

@Test
fun test() = runBlocking {
    GlobalScope.launch {
        // launch a new coroutine in background and continue
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues here immediately
    delay(2000L) // delaying for 2 seconds to keep JVM alive
}

Structured concurrency

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch { // launch a new coroutine in the scope of runBlocking
        delay(1000L)
        println("World!")
    }
    println("Hello,")
}

Scope builder

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch {
        delay(200L)
        println("Task from runBlocking")
    }

    coroutineScope { // Creates a coroutine scope
        launch {
            delay(500L)
            println("Task from nested launch")
        }

        delay(100L)
        println("Task from coroutine scope") // This line will be printed before the nested launch
    }

    println("Coroutine scope is over") // This line is not printed until the nested launch completes
}

Suspending function

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch { doWorld() }
    println("Hello,")
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

repeat

import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(100_000) { // launch a lot of coroutines
        launch {
            delay(1000L)
            print(".")
        }
    }
}

It launches 100K coroutines and, after a second, each coroutine prints a dot. Now, try that with threads. What would happen? Most likely, your code will produce some sort of out-of-memory error.
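For comparison, here is a minimal sketch of the thread-based version. The helper name runWithThreads and its parameters are made up for this illustration, and the count in main is deliberately kept small; raising it to 100_000 is the experiment the paragraph above suggests, and it may hit memory or OS thread limits depending on the machine.

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper (not part of any library): starts `count` threads,
// each of which sleeps and then prints a dot, and waits for all of them.
fun runWithThreads(count: Int, sleepMillis: Long = 1000L): Int {
    val threads = List(count) {
        thread {
            Thread.sleep(sleepMillis) // blocks a whole OS thread, unlike delay()
            print(".")
        }
    }
    threads.forEach { it.join() } // wait until every thread has finished
    return threads.size
}

fun main() {
    // Kept small on purpose; try 100_000 to see how far your JVM and OS get.
    runWithThreads(1_000)
}
```

Each thread reserves its own stack, which is why the thread version scales far worse than the coroutine version above.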


Cancellation

Cancelling coroutine execution

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion
    println("main: Now I can quit.")
}

withTimeout

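import kotlinx.coroutines.*

fun main() = runBlocking {
    withTimeout(1300L) { // throws TimeoutCancellationException when the time is up
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}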
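Because withTimeout signals the timeout by throwing TimeoutCancellationException, a caller that wants to react to it can catch the exception. The following is a small sketch of that idea; the function name runWithTimeout and the returned strings are chosen only for this example.

```kotlin
import kotlinx.coroutines.*

// Hypothetical wrapper: reports whether the block finished or timed out.
fun runWithTimeout(): String = runBlocking {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
        "completed" // only reached if the block finishes within the timeout
    } catch (e: TimeoutCancellationException) {
        "timed out" // the block was cancelled because 1300 ms elapsed
    }
}

fun main() {
    println(runWithTimeout())
}
```

When no special handling is needed, withTimeoutOrNull (next section) avoids the try/catch by returning null instead.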

withTimeoutOrNull

@Test
fun test() = runBlocking {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // will get cancelled before it produces this result
    }
    println("Result is $result")
}

// I'm sleeping 0 ...
// I'm sleeping 1 ...
// I'm sleeping 2 ...
// Result is null

Channels

Channels provide a way to transfer a stream of values.

A Channel is conceptually very similar to BlockingQueue. One key difference is that instead of a blocking put operation it has a suspending send, and instead of a blocking take operation it has a suspending receive.
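To make the contrast concrete, here is a sketch of the blocking counterpart, using java.util.concurrent.ArrayBlockingQueue and a plain thread. The function name squaresViaBlockingQueue is invented for this example; the channel-based version of the same "five squares" idea follows in the next section.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical helper: produces five squares on a separate thread
// and collects them on the calling thread.
fun squaresViaBlockingQueue(): List<Int> {
    val queue = ArrayBlockingQueue<Int>(1)
    val producer = thread {
        for (x in 1..5) queue.put(x * x) // put() blocks the thread while the queue is full
    }
    val received = List(5) { queue.take() } // take() blocks the thread while the queue is empty
    producer.join()
    return received
}

fun main() {
    println(squaresViaBlockingQueue()) // prints [1, 4, 9, 16, 25]
}
```

Here both sides tie up a thread while waiting; with a Channel, send and receive suspend the coroutine instead and leave the thread free for other work.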

Sending numbers over a channel

@Test
fun test() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }

    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

Closing and iteration over channels

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

Channel producers

package kotlinx.coroutines.guide.channel03

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

Fan-out

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

@Test
fun test() = runBlocking {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

Output:

Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
Processor #3 received 10

Fan-in

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

@Test
fun test() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

Output:

foo
foo
BAR!
foo
foo
BAR!

Buffered channels

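The channels shown so far had no buffer. Unbuffered channels transfer elements when the sender and the receiver meet each other (a rendezvous): if send is invoked first, it is suspended until receive is invoked; if receive is invoked first, it is suspended until send is invoked. The Channel() factory function takes an optional capacity parameter that specifies the buffer size. A buffer allows senders to send several elements before suspending, similar to a BlockingQueue with a fixed capacity, which blocks when it is full.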
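The rendezvous behaviour of an unbuffered channel can be observed with a small sketch like the one below. The function rendezvousDemo and the event strings are invented for this illustration; it records the order in which things happen rather than relying on timing in the output.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical demo: records what happens around a send on an unbuffered channel.
fun rendezvousDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>() // no capacity argument: a rendezvous channel
    launch {
        events += "sending"
        channel.send(1) // suspends here until someone calls receive()
        events += "sent"
    }
    delay(100) // give the sender time to reach send() and suspend
    events += "receiving"
    val value = channel.receive()
    events += "received $value"
    events // "sent" is appended before runBlocking returns, once the sender resumes
}

fun main() {
    println(rendezvousDemo())
}
```

In the recorded list, "sent" can only appear after "receiving", because the sender stays suspended until the receiver shows up. The test that follows uses a buffered channel with capacity 4 instead, so the sender gets further before it suspends.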

@Test
fun test() = runBlocking {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch {
        // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    println("done, then to cancel")
    sender.cancel() // cancel sender coroutine
}

Channels are fair

data class Ball(var hits: Int)

@Test
fun test() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

Output:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

Ticker channels

A ticker channel is a special rendezvous channel that produces Unit every time the given delay has passed since the last consumption from this channel.

Though it may seem useless on its own, it is a useful building block for complex time-based produce pipelines and for operators that do windowing and other time-dependent processing.

A ticker channel can be used in select to perform an "on tick" action.

@Test
fun test() = runBlocking {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have a 100 ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)

    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")

    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

Output:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
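
The "on tick" use of a ticker channel inside select, mentioned above, might look roughly like the sketch below. The function countTicksUntilData, the 350 ms producer delay and the "payload" message are assumptions made up for this example; ticker is marked as an obsolete API in kotlinx.coroutines, so the compiler may emit an opt-in warning.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

// Hypothetical demo: counts ticks that arrive while waiting for one data element.
fun countTicksUntilData(): Int = runBlocking {
    val ticks = ticker(delayMillis = 100, initialDelayMillis = 100)
    val data = produce<String> {
        delay(350) // pretend the real data takes a while to arrive
        send("payload")
    }
    var tickCount = 0
    var done = false
    while (!done) {
        // select resumes with whichever channel has an element first
        select<Unit> {
            data.onReceive { msg ->
                println("data: $msg")
                done = true
            }
            ticks.onReceive {
                tickCount++
                println("tick $tickCount") // the "on tick" action
            }
        }
    }
    ticks.cancel() // indicate that no more ticks are needed
    tickCount
}

fun main() {
    println("ticks before data arrived: ${countTicksUntilData()}")
}
```

The exact tick count depends on scheduling, so the sketch does not promise a particular number; with these delays it is typically a handful of ticks before the data element arrives.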

Composing suspending functions

Sequential

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

Output:

The answer is 42
Completed in 2013 ms

async

// runs inside a coroutine scope such as runBlocking { ... }
val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

Lazily started async

// runs inside a coroutine scope such as runBlocking { ... }
val time = measureTimeMillis {
    val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
    // some computation
    one.start() // start the first one
    two.start() // start the second one
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

Async-style functions

// The result type of somethingUsefulOneAsync is Deferred<Int>
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

fun main() {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
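
The async-style functions above start work in GlobalScope, so nothing cancels them if the caller fails. One alternative that stays within structured concurrency is to wrap both async calls in coroutineScope. The sketch below assumes shortened delays, and concurrentSum is just an illustrative name.

```kotlin
import kotlinx.coroutines.*

suspend fun doSomethingUsefulOne(): Int {
    delay(100L) // shortened delay for the sketch
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(100L) // shortened delay for the sketch
    return 29
}

// Both computations run concurrently as children of this scope;
// if one fails or the caller is cancelled, the other is cancelled too.
suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

fun main() = runBlocking {
    println("The answer is ${concurrentSum()}") // prints "The answer is 42"
}
```

Because concurrentSum is itself a suspending function, callers still see a plain Int and do not have to handle Deferred values.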

Coroutine context and dispatchers

Dispatchers and threads: the coroutine context includes a coroutine dispatcher that determines what thread or threads the corresponding coroutine uses for its execution. The coroutine dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch { // context of the parent, main runBlocking coroutine
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

Output:

Unconfined            : I'm working in thread main @coroutine#3
Default               : I'm working in thread DefaultDispatcher-worker-1 @coroutine#4
main runBlocking      : I'm working in thread main @coroutine#2
newSingleThreadContext: I'm working in thread MyOwnThread @coroutine#5

When launch { … } is used without parameters, it inherits the context (and thus dispatcher) from the CoroutineScope that it is being launched from. In this case, it inherits the context of the main runBlocking coroutine which runs in the main thread.

  1. Dispatchers.Unconfined is a special dispatcher that also appears to run in the main thread, but it is, in fact, a different mechanism that is explained later.

  2. The default dispatcher, which is used when coroutines are launched in GlobalScope, is represented by Dispatchers.Default and uses a shared background pool of threads, so launch(Dispatchers.Default) { … } uses the same dispatcher as GlobalScope.launch { … }.

  3. newSingleThreadContext creates a thread for the coroutine to run. A dedicated thread is a very expensive resource. In a real application it must be either released, when no longer needed, using the close function, or stored in a top-level variable and reused throughout the application.
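
As a sketch of the "release it with close" advice in point 3, the dispatcher returned by newSingleThreadContext can be wrapped in the standard library's use function, which calls close when the block ends. The function name threadNameFromOwnThread is invented for this example, and newSingleThreadContext may produce an opt-in warning depending on the kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs a block on a dedicated thread and releases the thread afterwards.
fun threadNameFromOwnThread(): String =
    newSingleThreadContext("MyOwnThread").use { dispatcher ->
        runBlocking {
            // switch to the dedicated thread just for this block
            withContext(dispatcher) { Thread.currentThread().name }
        }
    } // use { } closes the dispatcher here, which shuts down its thread

fun main() {
    println("Ran on: ${threadNameFromOwnThread()}")
}
```

The printed name starts with MyOwnThread; in debug mode a coroutine suffix such as @coroutine#1 may be appended.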

Unconfined vs confined dispatcher

The Dispatchers.Unconfined coroutine dispatcher starts a coroutine in the caller thread, but only until the first suspension point. After suspension it resumes in the thread that is fully determined by the suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the dispatcher of the outer CoroutineScope is inherited by default. The default dispatcher for the runBlocking coroutine, in particular, is confined to the invoker thread, so inheriting it has the effect of confining execution to this thread with predictable FIFO scheduling.

@Test
fun test() = runBlocking {
    launch(Dispatchers.Unconfined) {
        // not confined -- will work with main thread
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch {
        // context of the parent, main runBlocking coroutine
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
    delay(2000)
}

Output:

Unconfined      : I'm working in thread main @coroutine#2
main runBlocking: I'm working in thread main @coroutine#3
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor @coroutine#2
main runBlocking: After delay in thread main @coroutine#3

Debugging coroutines and threads
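
The @coroutine#N suffixes in the outputs above come from the library's debug mode, which can be switched on with the JVM option -Dkotlinx.coroutines.debug. A common way to make use of it is a small logging helper that prints the thread name next to each message. The sketch below is one possible version; formatLog and log are helper names chosen for this example, not library functions.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: prefixes a message with the current thread name.
// In debug mode the thread name also carries the coroutine identifier.
fun formatLog(msg: String): String = "[${Thread.currentThread().name}] $msg"

fun log(msg: String) = println(formatLog(msg))

fun main() = runBlocking<Unit> {
    val a = async {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

Run with the debug option, each line shows which coroutine produced it even though all three share the main thread.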


Reference