Kotlin Coroutines Basic Concepts

Mở đầu loại bài viết về Kotlin Coroutines mình sẽ giải thích khác khái niệm trong Coroutines và tương ứng với các khái niệm sẽ có thêm ví dụ. Qua đó mong bạn sẽ có kiến thức nền về Coroutines và tìm hiểu nó nhanh hơn.

Các khái niệm sẽ giải thích trong bài này:

  1. Dispatchers
  2. Scope
  3. Context
  4. Suspending Function
  5. Job
  6. Deferred

1. Dispatchers

Đúng như cái tên. Dispatcher có nhiệm vụ điều phối một hoặc nhiều thread làm nhiệm vụ thực thi coroutine. Có các loại Dispatcher như sau:

  • Dispatchers.Default: là dispatcher mặc định sẽ được sử dụng nếu không khai báo tường minh trong Scope Builder. Nó sử dụng một common pool được chia sẻ bởi các thread nền. Dispatchers.Default thích hợp với các tính toán cần nhiều tài nguyên CPU
  • Dispatchers.IO: được thiết kế để sử dụng cho các thao tác IO-intensive blocking operations như đọc/ghi file hay blocking socket I/O
  • Dispatchers.Unconfined: đây là một dispatcher khá lạ. Trong docs cũng nói nó không thường được sử dụng trong code ;)). Dispatchers này sẽ không chỉ định thread nào sẽ thực thi. Nó có nghĩa là khi khởi chạy nó sẽ được chạy bởi thread đang chạy nó, khi resume lại sau khi suspend thì Coroutines sẽ quyết định thread sẽ thực thi
  • ThreadPool cụ thể được chỉ định được tạo ra bởi newSingleThreadContext hoặc newFixedThreadPoolContext
  • Các Executor được convert bởi asCoroutineDispatcher
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun main() = runBlocking {

  launch { // context of the parent, main runBlocking coroutine
    println("main runBlocking            : I'm working in thread ${Thread.currentThread().name}")
  }
  launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
    println("Unconfined                 : I'm working in thread ${Thread.currentThread().name}")
  }
  launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
    println("Default                    : I'm working in thread ${Thread.currentThread().name}")
  }

  launch(newCoroutineContext(Dispatchers.Default)) { // will get its own new thread
    println("newCoroutineContext         : I'm working in thread ${Thread.currentThread().name}")
  }

  //
  val dispatcherWithTwoThread = newFixedThreadPoolContext(2, "ThreadPoll")

  repeat(10) {
    launch(dispatcherWithTwoThread) { // will get its own new thread
      println("dispatcherWithTwoThread${it.padEnd(3)}  : I'm working in thread ${Thread.currentThread().name}")
      delay(Random.nextLong(500, 5000))
      println("dispatcherWithTwoThread${it.padEnd(3)}  : I'm done in thread ${Thread.currentThread().name}")
    }
  }
}

fun Int.padEnd(length: Int, padChar: Char = '0') = this.toString().padStart(length, padChar)

2. Scope

Các coroutine được khởi chạy trong CoroutineScope. Mục đích của việc này để quản lý resource. Hãy tưởng tượng khi ta cần thực hiện một tác vụ nặng nào đó trong một hoặc nhiều coroutine, nhưng giữa chừng tác vụ đó không cần thiết nữa, lúc này ta chỉ cần cancel() scope chứa các coroutine của tác vụ nặng trên. Một đặc điểm khác của CoroutineScope là nó có thể chứa nhiều CoroutineScope con.

GlobalScope

GlobalScope được xem như là scope cha của tất cả các CoroutineScope trong ứng dụng. GlobalScope không thể bị cancel() và sẽ tồn tại xuyên suốt vòng đời ứng dụng. Ngoài ra việc sử dụng được khuyên là KHÔNG NÊN. Bạn có thể xem thêm ở bài The reason to avoid GlobalScope để hiểu nguyên nhân nhé.

Scope Builder

Như đã nói ở trên các coroutine sẽ được khởi chạy trong các Scope Builder sau:

  • runBlocking: chạy một coroutine và sẽ block[1] đến khi coroutine bên trong chạy xong. Không nên sử dụng function này trong một coroutine vì nó được thiết kế để làm cầu nối code thường với các thư viện được viết theo suspending style, thường được sử dụng cho hàm main và trong test.

  • coroutineScope: Tạo ra một CoroutineScope mới. Scope được tạo mới này là con của scope bên ngoài nhưng overrides lại Job. Function này được thiết kết để parallel decomposition, nghĩa là khi có bất cứ một coroutine nào fail trong scope này thì các coroutine đang đợi xử lý trong scope này cũng sẽ được cancel.

  • launch: Tạo ra một CoroutineScope mới, sẽ không block[2] thread hiện tại và sẽ trả về một Job.

  • async: Tạo ra một CoroutineScope mới, sẽ không block2 thread hiện tại và sẽ trả về một Deferred. Như vậy async được dùng khi cần kết quả trả về khi gọi còn launch thì không.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fun main() = runBlocking {

  val job = launch(CoroutineName("parent")) { // parent scope
    println("Start $coroutineContext")

    async(CoroutineName("child-1")) {
      println("Start $coroutineContext")
      delay(1000)
      println("End $coroutineContext")
    }

    async(CoroutineName("child-2")) {
      println("Start $coroutineContext")
      delay(3000)
      println("End $coroutineContext") // not execute this line
    }

    println("End $coroutineContext") // still wait child-1 and child-2 finish
  }

  delay(2000)
  job.cancel() // cancel but child-2 not finish

  println("Done")
}

3. Context

Mỗi coroutine trong Kotlin đều có một context được thể hiện bằng một instance của interface CoroutineContext. Context này là một tập các thành phần cấu hình cho coroutine, trong đó 2 thành phần quan trọng là JobDispatcher.

Context là một immutable. Nhưng ta có thể dùng toán tử plus để tạo ra một context mới.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
fun main() = runBlocking<Unit> {

  println("Current context is: $coroutineContext")

  println("New context with name: ${coroutineContext + CoroutineName("test")}")

  println("Job in current context is: ${coroutineContext[Job]}")

  println("Dispatcher in current context is: ${coroutineContext[ContinuationInterceptor]}")
  
  launch(CoroutineName("child")) {
    println("Child context is $coroutineContext}")

    println("CoroutineName in current context is: ${coroutineContext[CoroutineName]}")
  }

}

4. Suspending Function

Suspending Function được xem xương sống của Kotlin Coroutines. Khi đó, các function bên trong Suspending Function sẽ được gọi mà không bị block1. Như thế thread đang handle cho Suspending Function sẽ được trả lại cho JVM và được dùng cho các tác vụ khác. Suspending Function phải chạy trong một coroutine thì mới có tác dụng nhé ;))

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
fun main() = runBlocking(
  CoroutineName("parent") +
    newSingleThreadContext("SingleThread") // Dispatcher SingleThread only use one thread
) {

  launch(CoroutineName("child")) {

    doSomeThingToWait(3000) // release thread in SingleThread for another function
  }

  doSomeThingToWait(1000) // use thread in SingleThread

}

suspend fun doSomeThingToWait(waitTime: Long) {
  println("Current context when START: $coroutineContext")

  delay(waitTime) // simulator processing

  println("Current context when END  : $coroutineContext")
}

5. Job

Như đã đề cập ở Context Job là thành phần không thể thiếu trong context. Nó cho phép ta cancel(), join() hoặc start() coroutine tương ứng với job đó. Ngoài ra nhờ có job mà ta biết được trạng thái của một coroutine. Các trạng thái của coroutine có thể được biển diễn ở hình dưới:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fun main() = runBlocking {

  val timeNotCancel = measureTimeMillis {
    val job = launch {
      delay(3_000) // done after 3000 ms
    }

    delay(1_000)
    job.join() // wait job done
  }

  println("Time to run not cancel $timeNotCancel ms")

  val timeWithCancel = measureTimeMillis {
    val job = launch {
      delay(3_000) // done after 3000 ms
    }

    delay(1_000)

    job.cancel() // not wait delay(3_000)
    job.join() // wait job done
  }

  println("Time to run with cancel $timeWithCancel ms")

}

6. Deferred

Deferred cũng là một Job nhưng nó chứa kết quả khi coroutine hoàn thành. Deferred được tạo bằng async mà mình đã đề cập ở ScopeBuilder hoặc khởi tạo trực tiếp bởi CompletableDeferred

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
fun main() = runBlocking {

  println("--------- async ---------")

  val deferred = async { getRandomInt() }

  println("Wait async-getRandomInt result")

  println("RandomInt from async-getRandomInt = ${deferred.await()}")

  println("------ CompletableDeferred ------")

  val completableDeferred = CompletableDeferred<Int>()

  launch {
    delay(1000)

    val result = Random.nextInt(1, 10)

    println("completableDeferred done with result $result")

    completableDeferred.complete(result)
  }

  println("Wait completableDeferred result")
  println("RandomInt from completableDeferred = ${completableDeferred.await()}")

}

suspend fun getRandomInt(): Int {
  delay(1000)

  val result = Random.nextInt(1, 10)

  println("async-getRandomInt done with result $result")

  return result
}

Phần code sample mình update vào repo này nhé. Cảm ơn các bạn đã đọc tới đây.


  1. Thread hiện tại sẽ đợi đến khi function chạy xong thì mới chạy tiếp đoạn code tiếp theo  ↩︎

  2. Chạy tiếp đoạn code tiếp theo mà không phải đợi function này hoàn tất. Kết quả sẽ trả về qua callback hoặc một feature object  ↩︎

updatedupdated2020-11-252020-11-25
Load Comments?