Resolve Race Condition problems with Map-Reduce and Coroutines

Tiếp sau bài Kotlin Coroutines Basic Concepts. Bài này mình sẽ tận dụng những kiến thức trên để giải quyết một vấn đề trong Concurrent đó là Race Condition bằng ý tưởng Map-Reduce. Bài này khá ngắn nhưng lại đề cập được 2 chủ đề rất hay là Race ConditionMap-Reduce nên các bạn hãy đọc tới cuối bài nhé ;))

Nội dung bài viết:

  1. What is Race Condition?
  2. How to resolve Race Condition?
  3. What is Map-Reduce?
  4. Resolve Race Condition with Map-Reduce

1. What is Race Condition?

Hãy nhìn video sau:

và tưởng tượng các chú chó là thread còn trái banh là resource ;))

Không cần giải thích nhiều thì chắc các bạn cũng thấy cũng thấy resource sẽ có thể không chính xác hoặc toàn vẹn nếu có nhiều thread cùng tranh chấp trong một thời điểm.

2. How to resolve Race Condition?

Nguyên nhân của Race Condition là do tranh chấp resource. Vậy chỉ cần giải quyết được vấn đề này sẽ giải quyết được Race Condition, và ta có 2 “trường phái” là:

1. Sử dụng Synchronizations[1] hoặc Atomic Operations[2]
2. Tránh chia sẻ trạng thái

Quay lại ví dụ về các chú chó và trái banh thì 2 cách giải quyết sẽ như sau:

  1. Sẽ cũng chỉ có 1 trái banh và các chú cho phải đợi tới lượt mình chơi - các chú chó chắc sẽ không được vui 😂.
  2. Sẽ có nhiều trái banh cho các chú chó và chắc chắn chú chó nào cũng vui 💪.

vậy chỉ cần phụ thuộc vào điều kiện anh chủ ;))

Trong thực tế, thì trước đây khi phần cứng chưa phát triển, các hệ thống máy tính chỉ có một core, bộ nhớ RAM đắt đỏ thì #1 là sự lựa chọn tốt hơn nhưng tới thời điểm hiện tại mình tin rằng #2 là giải pháp tối ưu hơn.

Tuy nhiên vẫn có một số trường hợp bắt buộc phải dùng #1 như: số dư tài khoản ngân hàng, chỉ số chứng khoán…

3. What is Map-Reduce?

Đọc tới đây chắc bạn cũng đưa ra nhận định #2 là lựa chọn tối ưu để giải quyết Race Condition trong thời điểm hiện tại. Nhưng làm thế nào để hiện thực #2? Một trong những cách hiện thực là Map-Reduce. Vậy Map-Reduce là gì?

Map-Reduce lần đầu tiên được giới thiệu bởi Jeffery Dean và Sanjay Ghemawat lúc đó đang làm việc ở Google vào năm 2004, và là ý tưởng cơ bản để hiện thực Hadoop. Về cơ bản, đây là cách thực hiện của tinh thần chia để trị. Khi thực thi, sẽ có 2 bước là:

  1. Map: chia dữ liệu cần xử lý thần các phần nhỏ và xử lý các phần nhỏ đó.
  2. Reduce: tổng hợp dữ liệu đã xử lý để cho ra kết quả cuối cùng.

Lý thuyết hơi khó hiểu nên bạn xem hình này nha:

Word Count

đây ví dụ “Word Count” trong bài Hadoop MapReduce deep diving and tuning . Nhiệm vụ là cần đếm số lần xuất hiện của một từ trong phần Input. Với bước Map được chia thành 2 bước nhỏ: Splitting và Mapping, Reduce được chia thành 2 bước nhỏ: Shuffling và Reducing.

Resolve Race Condition with Map-Reduce

Đây là phần chính của bài viết. Mình sẽ implement ví dụ “kinh điển” của Race Condition là increase một biến counter bằng 3 cách: increase bình thường, increase với Map-Reduce và increase với nhiều thread và AtomicLong.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
fun main(args: Array<String>) = runBlocking {

  val max = 1000_000_000L // the max value which counter need increase to this

  // increase use one thread
  var counterSync = 0L
  val timeSync = measureNanoTime { counterSync = increaseSync(max) }
  println("increaseSync: counterSync                        [$counterSync] - time [$timeSync]")

  val workers = 5 // number workers to split into sub many increment

  var counterMapReduce = 0L
  val timeMapReduce = measureNanoTime { counterMapReduce = increaseMapReduce(max, workers) }
  println("increaseSync with Map-Reduce: counterMapReduce   [${counterMapReduce}] - time [$timeMapReduce]")

  var atomicCounter = AtomicLong()
  val timeAtomicCounter = measureNanoTime { increaseWithAtomic(max, workers, atomicCounter) }

  println("increaseWithAtomic: atomicCounter                [${atomicCounter.get()}] - time [$timeAtomicCounter]")

}

// just use for statement in one thread
fun increaseSync(max: Long): Long {
  var counter = 0L
  for (i in 0 until (max)) {
    counter++
  }
  return counter
}

// create workers group then increase couter in each worker and sum it
suspend fun increaseMapReduce(max: Long, workers: Int): Long = runBlocking {
  val maxByWorker = max / workers // max counter of each worker

  val threadPool = newFixedThreadPoolContext(workers, "workerThreadPool") // workers group

  (1..workers)
    .map { async(threadPool) { increaseSync(maxByWorker) } } // create worker jobs
    .map { it.await() } // wait all jobs of workers done
    .reduce { acc, current -> acc + current } // sum max counter of each worker
}

// create thread group and shared couter, each thread will increase shared couter and wait all threads done
fun increaseWithAtomic(max: Long, workers: Int, atomicCounter: AtomicLong) {
  val maxByWorker = max / workers

  val threads = (1..workers)
    .map {// create thread group
      thread {
        for (i in 0 until maxByWorker) {
          atomicCounter.incrementAndGet() // increase atomic opertation
        }
      }
    }

  for (thread in threads) { // wait all threads done
    thread.join()
  }
}

kết quả sẽ show ra như sau:

increaseSync: counterSync                        [1000000000] - time [304001418]
increaseSync with Map-Reduce: counterMapReduce   [1000000000] - time [171316295]
increaseWithAtomic: atomicCounter                [1000000000] - time [19268418252]

Ta có thể thấy thời gian của increaseSync with Map-Reduce là nhanh nhất ta xem là 1x thì increaseSync1.8x và thật kinh khủng khi increaseWithAtomic112.5x. Số liệu chạy lại có thể khác nhưng từ đây có thể thấy trong trường hợp thời gian thực hiện toán tử là nhanh và thời gian để đảm bảo shared state luôn đúng là chậm thì Sử dụng Synchronizations hoặc Atomic Operations là không hiệu quả.

Kết: Khi giải quyết Race Condition trong thời điểm hiện tại thì cách Tránh chia sẻ trạng thái đang là giải pháp tối ưu trong hầu hết các trường hợp, Map-Reduce là một trong những hiện thực của cách trên nhưng vẫn còn những cách khác giải quyết những vấn đề phức tạp hơn. Mong các bạn đón đọc phần tiếp theo của loại bài viết về Kotlin Coroutines để được nhiều vấn đề trong Concurrent và cách giải quyết nhé.

Phần code sample mình update vào repo này nhé. Cảm ơn các bạn đã đọc tới đây.


  1. Trong một thời điểm chỉ một thread được phép thực thi. 

  2. Là các thao tác *không thể chia nhỏ ra nữa trong đó ngôn ngôn ngữ lập trình hoặc hệ điều hành sẽ đảm bảo các thao tác này sẽ hoàn thành một cách độc lập với bất kỳ thread hoặc process nào. 

updatedupdated2020-11-292020-11-29
Load Comments?