Basic Practice
First Example
import kotlinx.coroutines.*
Structured concurrency
import kotlinx.coroutines.*
Scope builder
import kotlinx.coroutines.*
Suspending function
import kotlinx.coroutines.*
repeat
import kotlinx.coroutines.*
It launches 100K coroutines and, after a second, each coroutine prints a dot. Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error.)
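Since only the first line of the listing survives here, the following is a minimal sketch of what the paragraph describes, assuming the usual `repeat` plus `launch` pattern. The function name `launchMany` and the completion counter are illustrative additions, not part of the original example.

```kotlin
import kotlinx.coroutines.*

// Launches `count` coroutines; each waits one second, prints a dot,
// and bumps a counter. Returns how many coroutines ran to completion.
fun launchMany(count: Int): Int = runBlocking {
    // No synchronization needed: every child runs on the runBlocking thread.
    var completed = 0
    coroutineScope {
        repeat(count) {
            launch {
                delay(1000L) // suspends without blocking the thread
                print(".")
                completed++
            }
        }
    } // coroutineScope returns only after every child has finished
    completed
}

fun main() {
    val completed = launchMany(100_000)
    println("\ncompleted: $completed")
}
```

All of the coroutines share one thread and are suspended during `delay`, which is why the whole run takes roughly one second rather than one second per coroutine.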
Cancellation
Cancelling coroutine execution
val job = launch {
withTimeout
withTimeout(1300L) {
withTimeoutOrNull
Channels
Channels provide a way to transfer a stream of values.
A Channel is conceptually very similar to a BlockingQueue. The key difference is that instead of a blocking put operation it has a suspending send, and instead of a blocking take operation it has a suspending receive.
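As a rough illustration of the suspending send and receive pair, here is a small sketch. The helper name `squaresViaChannel` is hypothetical and chosen only for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Sends the squares of 1..n through a channel from one coroutine
// and collects them in the calling coroutine.
fun squaresViaChannel(n: Int): List<Int> = runBlocking {
    val channel = Channel<Int>() // no buffer: sender and receiver must meet
    launch {
        for (x in 1..n) channel.send(x * x) // suspends until the value is received
        channel.close() // tells the receiver that no more elements will come
    }
    val received = mutableListOf<Int>()
    for (value in channel) received += value // the loop ends once the channel is closed
    received
}

fun main() {
    println(squaresViaChannel(5)) // prints [1, 4, 9, 16, 25]
}
```

Neither side blocks a thread while waiting. Both coroutines here run on the single `runBlocking` thread and simply take turns.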
Sending numbers through a channel
Closing and iteration over channels
val channel = Channel<Int>()
Channel producers
package kotlinx.coroutines.guide.channel03
Fan-out
fun CoroutineScope.produceNumbers() = produce<Int> {
Processor #0 received 1
Fan-in
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
foo
Buffered channels
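On an unbuffered (rendezvous) channel, if send is invoked first, it is suspended until receive is invoked; if receive is invoked first, it is suspended until send is invoked. A buffered channel lets the sender run ahead by up to the buffer capacity before it suspends.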
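The difference can be observed by counting how many `send` calls complete while nobody is receiving. This is a sketch under stated assumptions: the function name `sendsCompletedBeforeReceive` and the 100 ms wait are illustrative choices, not taken from the original listing.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Starts a sender that tries to send ten elements, never receives anything,
// and reports how many sends completed before the sender was cancelled.
fun sendsCompletedBeforeReceive(capacity: Int): Int = runBlocking {
    val channel = Channel<Int>(capacity) // capacity 0 is a rendezvous channel
    var sent = 0
    val sender = launch {
        repeat(10) {
            channel.send(it) // suspends once the buffer is full
            sent++
        }
    }
    delay(100) // let the sender run as far as it can
    sender.cancel() // it is still suspended in send, so cancel it
    sent
}

fun main() {
    println(sendsCompletedBeforeReceive(0)) // prints 0
    println(sendsCompletedBeforeReceive(4)) // prints 4
}
```

With capacity 4 the first four sends fill the buffer and the fifth suspends. With capacity 0 even the first send waits for a receiver.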
Channels are fair
data class Ball(var hits: Int)
ping Ball(hits=1)
Ticker channels
A ticker channel is a special rendezvous channel that produces Unit every time a given delay passes since the last consumption from this channel.
Though it may seem useless on its own, it is a useful building block for creating complex time-based produce pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in select to perform an "on tick" action.
Initial element is available immediately: kotlin.Unit
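A minimal sketch of consuming a ticker channel follows. It uses a plain `for` loop bounded by `withTimeoutOrNull` rather than `select`, and the helper name `countTicks` is hypothetical. Note that `ticker` is marked as an obsolete API in kotlinx.coroutines, hence the opt-in annotation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Counts how many ticks arrive within `windowMillis` from a ticker that
// fires immediately and then once every `periodMillis`.
@OptIn(ObsoleteCoroutinesApi::class)
fun countTicks(periodMillis: Long, windowMillis: Long): Int = runBlocking {
    val tickerChannel = ticker(delayMillis = periodMillis, initialDelayMillis = 0)
    var ticks = 0
    withTimeoutOrNull(windowMillis) {
        for (tick in tickerChannel) ticks++ // every element is Unit
    }
    tickerChannel.cancel() // stop the ticker so it produces no more elements
    ticks
}

fun main() {
    // With a 100 ms period and a 550 ms window, about six ticks are expected;
    // the exact count depends on scheduling.
    println("ticks: ${countTicks(100, 550)}")
}
```

Because `initialDelayMillis` is 0, the first element is available right away, which matches the "Initial element is available immediately" line of the original output.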
Composing suspending functions
Sequential
suspend fun doSomethingUsefulOne(): Int {
The answer is 42
async
val time = measureTimeMillis {
Lazily started async
val time = measureTimeMillis {
Async-style functions
// The result type of somethingUsefulOneAsync is Deferred<Int>
Coroutine context and dispatchers
- Dispatchers and threads: The coroutine context includes a coroutine dispatcher that determines what thread or threads the corresponding coroutine uses for its execution. The coroutine dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.
launch { // context of the parent, main runBlocking coroutine
Unconfined : I'm working in thread main @coroutine#3
When launch { … } is used without parameters, it inherits the context (and thus dispatcher) from the CoroutineScope that it is being launched from. In this case, it inherits the context of the main runBlocking coroutine which runs in the main thread.
Dispatchers.Unconfined is a special dispatcher that also appears to run in the main thread, but it is, in fact, a different mechanism that is explained later.
The default dispatcher, which is used when coroutines are launched in GlobalScope, is represented by Dispatchers.Default and uses a shared background pool of threads, so launch(Dispatchers.Default) { … } uses the same dispatcher as GlobalScope.launch { … }.
newSingleThreadContext creates a thread for the coroutine to run. A dedicated thread is a very expensive resource. In a real application it must either be released with the close function when no longer needed, or be stored in a top-level variable and reused throughout the application.
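The four cases discussed above can be compared by recording the thread each coroutine runs on. This is a sketch, not the original listing: it uses `async` and `withContext` instead of `launch` so the names can be returned, and the function name `threadNamesByDispatcher` and the map keys are illustrative.

```kotlin
import kotlinx.coroutines.*

// Returns the name of the thread used under each dispatcher choice.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun threadNamesByDispatcher(): Map<String, String> = runBlocking {
    // No dispatcher given: inherits the runBlocking context and its thread.
    val inherited = async { Thread.currentThread().name }
    // Unconfined: starts in the caller thread, up to the first suspension.
    val unconfined = async(Dispatchers.Unconfined) { Thread.currentThread().name }
    // Default: dispatched to the shared background pool.
    val defaultPool = async(Dispatchers.Default) { Thread.currentThread().name }
    // A dedicated thread; `use` closes the dispatcher once the block is done.
    val dedicated = newSingleThreadContext("MyOwnThread").use { ctx ->
        withContext(ctx) { Thread.currentThread().name }
    }
    mapOf(
        "inherited" to inherited.await(),
        "unconfined" to unconfined.await(),
        "default" to defaultPool.await(),
        "dedicated" to dedicated,
    )
}

fun main() {
    threadNamesByDispatcher().forEach { (label, thread) ->
        println("$label: $thread")
    }
}
```

Wrapping `newSingleThreadContext` in `use` is one way to guarantee the `close` call that the paragraph asks for. The exact thread names printed vary with the runtime and with whether coroutine debug mode is enabled.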
Unconfined vs confined dispatcher
The Dispatchers.Unconfined coroutine dispatcher starts a coroutine in the caller thread, but only until the first suspension point. After suspension it resumes in the thread that is fully determined by the suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.
On the other hand, by default, the dispatcher of the outer CoroutineScope is inherited. The default dispatcher for the runBlocking coroutine, in particular, is confined to the invoker thread, so inheriting it has the effect of confining execution to this thread with predictable FIFO scheduling.
Unconfined : I'm working in thread main @coroutine#2
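To make the contrast concrete, the sketch below records the thread name before and after a suspension point under both dispatchers. The function name `threadsAroundSuspension` and the 100 ms delay are assumptions made for the example.

```kotlin
import kotlinx.coroutines.*

// For each dispatcher choice, returns the thread name observed before
// and after a suspension point (a delay).
fun threadsAroundSuspension(): Map<String, Pair<String, String>> = runBlocking {
    val unconfined = async(Dispatchers.Unconfined) {
        val before = Thread.currentThread().name // the caller thread
        delay(100) // resumes on whichever thread the delay machinery uses
        before to Thread.currentThread().name
    }
    val confined = async { // inherits the runBlocking dispatcher
        val before = Thread.currentThread().name
        delay(100) // resumes on the same runBlocking thread
        before to Thread.currentThread().name
    }
    mapOf("unconfined" to unconfined.await(), "confined" to confined.await())
}

fun main() {
    for ((label, threads) in threadsAroundSuspension()) {
        println("$label: before=${threads.first}, after=${threads.second}")
    }
}
```

The confined coroutine reports the same thread on both sides of `delay`, while the unconfined one resumes on a different thread. The name of that thread is an implementation detail of the library and should not be relied on.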