Ruby Concurrency & Mutexes: Từ Cơ Bản Đến Nâng Cao - Hướng Dẫn Toàn Diện
Concurrency (đồng thời) là một trong những chủ đề phức tạp nhất trong lập trình, nhưng cũng là chìa khóa để tạo ra những ứng dụng Ruby hiệu suất cao. Trong bài viết này, chúng ta sẽ khám phá từ những khái niệm cơ bản về threads và mutexes đến các kỹ thuật nâng cao như lock striping và fair mutexes.
Tại Sao Cần Concurrency?
Trước khi đi vào chi tiết, hãy hiểu tại sao concurrency lại quan trọng:
- Tăng hiệu suất: Tận dụng multi-core processors
- Cải thiện responsiveness: UI không bị đóng băng khi xử lý tác vụ nặng
- Xử lý I/O hiệu quả: Không chờ đợi network/disk operations
- Scalability: Xử lý nhiều requests đồng thời
- Resource utilization: Tối ưu hóa việc sử dụng CPU và memory
Vấn Đề Race Conditions
Khi nhiều threads cùng truy cập và thay đổi dữ liệu chung, có thể xảy ra race conditions:
# ❌ Không an toàn - Race condition
counter = 0
threads = 5.times.map do
Thread.new do
1000.times do
counter += 1 # Không thread-safe!
end
end
end
threads.each(&:join)
puts counter # Kết quả không đoán trước được, thường < 5000
Vấn đề: Phép toán counter += 1
không phải là atomic. Nó bao gồm 3 bước:
- Đọc giá trị hiện tại
- Tăng giá trị lên 1
- Ghi lại giá trị mới
Khi nhiều threads thực hiện đồng thời, có thể dẫn đến mất dữ liệu.
Phần 1: Cơ Bản - Làm Quen Với Mutex
1.1 Bảo Vệ Shared Counter Với Mutex
Mutex (Mutual Exclusion) đảm bảo chỉ có một thread được truy cập critical section tại một thời điểm:
require 'thread'
def safe_counter_example
mutex = Mutex.new
counter = 0
threads = 5.times.map do
Thread.new do
1000.times do
mutex.synchronize do
counter += 1 # Critical section được bảo vệ
end
end
end
end
threads.each(&:join)
puts "Kết quả an toàn: #{counter}" # => 5000
end
safe_counter_example
Giải thích:
Mutex.new
: Tạo một mutex mớimutex.synchronize
: Khóa mutex, thực thi block, sau đó mở khóa- Chỉ một thread có thể vào critical section tại một thời điểm
1.2 Manual Lock và Unlock
Đôi khi bạn cần kiểm soát chi tiết hơn việc khóa/mở khóa:
require 'thread'
class ManualLockExample
def initialize
@mutex = Mutex.new
@data = []
end
def safe_update(value)
@mutex.lock
begin
# Critical section
puts "Thread #{Thread.current.object_id} đang xử lý #{value}"
@data << value
sleep(0.01) # Simulate work
ensure
@mutex.unlock # Luôn unlock trong ensure block
end
end
def get_data
@mutex.synchronize { @data.dup }
end
end
# Sử dụng
example = ManualLockExample.new
threads = 5.times.map do |i|
Thread.new { example.safe_update("data_#{i}") }
end
threads.each(&:join)
puts "Dữ liệu cuối cùng: #{example.get_data}"
Lưu ý quan trọng:
- Luôn sử dụng
ensure
để đảm bảo mutex được unlock - Tránh deadlock bằng cách không giữ lock quá lâu
1.3 Mutex Per Resource Pattern
Tạo một mutex cho mỗi resource để tránh blocking không cần thiết:
require 'thread'
class BankAccount
def initialize(balance)
@balance = balance
@mutex = Mutex.new # Mỗi account có mutex riêng
end
def deposit(amount)
@mutex.synchronize do
old_balance = @balance
sleep(0.01) # Simulate processing time
@balance = old_balance + amount
puts "Gửi #{amount}, số dư mới: #{@balance}"
end
end
def withdraw(amount)
@mutex.synchronize do
if @balance >= amount
old_balance = @balance
sleep(0.01)
@balance = old_balance - amount
puts "Rút #{amount}, số dư mới: #{@balance}"
true
else
puts "Không đủ số dư để rút #{amount}"
false
end
end
end
def balance
@mutex.synchronize { @balance }
end
end
# Demo
account1 = BankAccount.new(1000)
account2 = BankAccount.new(500)
# Các threads có thể làm việc với different accounts song song
threads = []
threads << Thread.new { account1.deposit(100) }
threads << Thread.new { account2.deposit(200) }
threads << Thread.new { account1.withdraw(50) }
threads << Thread.new { account2.withdraw(300) }
threads.each(&:join)
puts "Account 1 balance: #{account1.balance}"
puts "Account 2 balance: #{account2.balance}"
1.4 Chờ Threads Hoàn Thành Với Join
Thread#join
đảm bảo main thread chờ các worker threads hoàn thành:
require 'thread'
def demonstrate_join
mutex = Mutex.new
results = []
puts "Bắt đầu xử lý..."
start_time = Time.now
threads = 4.times.map do |i|
Thread.new do
# Simulate different processing times
processing_time = rand(0.5..2.0)
sleep(processing_time)
result = "Kết quả từ thread #{i} (#{processing_time.round(2)}s)"
mutex.synchronize do
results << result
puts "✓ #{result}"
end
end
end
# Chờ tất cả threads hoàn thành
threads.each(&:join)
end_time = Time.now
puts "\nHoàn thành sau #{(end_time - start_time).round(2)}s"
puts "Tổng cộng #{results.length} kết quả"
end
demonstrate_join
Phần 2: Trung Cấp - Kỹ Thuật Nâng Cao
2.1 Thread-Safe Class Design
Thiết kế class thread-safe từ đầu:
require 'thread'
class ThreadSafeCounter
def initialize(initial_value = 0)
@count = initial_value
@mutex = Mutex.new
end
def increment(amount = 1)
@mutex.synchronize do
old_value = @count
sleep(0.001) # Simulate work that could cause race condition
@count = old_value + amount
end
end
def decrement(amount = 1)
@mutex.synchronize do
old_value = @count
sleep(0.001)
@count = old_value - amount
end
end
def value
@mutex.synchronize { @count }
end
def reset(new_value = 0)
@mutex.synchronize { @count = new_value }
end
# Atomic operations
def increment_and_get(amount = 1)
@mutex.synchronize do
@count += amount
@count
end
end
def compare_and_set(expected, new_value)
@mutex.synchronize do
if @count == expected
@count = new_value
true
else
false
end
end
end
end
# Demo
counter = ThreadSafeCounter.new(100)
# Test concurrent operations
threads = []
threads += 10.times.map { Thread.new { 100.times { counter.increment } } }
threads += 5.times.map { Thread.new { 50.times { counter.decrement } } }
threads.each(&:join)
puts "Final value: #{counter.value}" # Should be 100 + 1000 - 250 = 850
2.2 Non-Blocking Lock Với try_lock
Mutex#try_lock
cho phép thử khóa mà không block thread:
require 'thread'
class NonBlockingProcessor
def initialize
@mutex = Mutex.new
@processing = false
@queue = []
end
def add_task(task)
@mutex.synchronize { @queue << task }
end
def try_process
if @mutex.try_lock
begin
return false if @processing || @queue.empty?
@processing = true
task = @queue.shift
puts "#{Thread.current.object_id}: Bắt đầu xử lý #{task}"
sleep(0.1) # Simulate work
puts "#{Thread.current.object_id}: Hoàn thành #{task}"
true
ensure
@processing = false
@mutex.unlock
end
else
puts "#{Thread.current.object_id}: Không thể lấy lock, thử lại sau"
false
end
end
def queue_size
@mutex.synchronize { @queue.size }
end
end
# Demo
processor = NonBlockingProcessor.new
# Add tasks
10.times { |i| processor.add_task("Task_#{i}") }
# Multiple threads try to process
threads = 5.times.map do
Thread.new do
10.times do
success = processor.try_process
sleep(0.05) unless success
end
end
end
threads.each(&:join)
puts "Remaining tasks: #{processor.queue_size}"
2.3 ConditionVariable - Coordination Between Threads
ConditionVariable
giúp threads chờ đợi và thông báo cho nhau:
require 'thread'
class ProducerConsumerQueue
def initialize(max_size = 5)
@queue = []
@max_size = max_size
@mutex = Mutex.new
@not_empty = ConditionVariable.new
@not_full = ConditionVariable.new
end
def produce(item)
@mutex.synchronize do
# Chờ cho đến khi queue không full
while @queue.size >= @max_size
puts "Producer chờ - Queue full (#{@queue.size}/#{@max_size})"
@not_full.wait(@mutex)
end
@queue << item
puts "Produced: #{item} (Queue: #{@queue.size}/#{@max_size})"
# Thông báo cho consumers
@not_empty.signal
end
end
def consume
@mutex.synchronize do
# Chờ cho đến khi queue không empty
while @queue.empty?
puts "Consumer chờ - Queue empty"
@not_empty.wait(@mutex)
end
item = @queue.shift
puts "Consumed: #{item} (Queue: #{@queue.size}/#{@max_size})"
# Thông báo cho producers
@not_full.signal
item
end
end
def size
@mutex.synchronize { @queue.size }
end
end
# Demo
queue = ProducerConsumerQueue.new(3)
# Producer thread
producer = Thread.new do
10.times do |i|
queue.produce("Item_#{i}")
sleep(0.1)
end
end
# Consumer threads
consumers = 2.times.map do |i|
Thread.new do
5.times do
item = queue.consume
sleep(0.2) # Simulate processing
end
end
end
producer.join
consumers.each(&:join)
2.4 Double-Checked Locking Pattern
Tối ưu hóa lazy initialization:
require 'thread'
class ConfigurationManager
@instance = nil
@mutex = Mutex.new
def self.instance
# First check (không cần lock)
return @instance if @instance
@mutex.synchronize do
# Second check (với lock)
@instance ||= new
end
end
def initialize
puts "Khởi tạo Configuration..."
sleep(0.2) # Simulate expensive initialization
@config = load_configuration
end
private
def load_configuration
{
database: {
host: 'localhost',
port: 5432,
pool_size: 10
},
cache: {
ttl: 3600,
max_size: 1000
},
api: {
timeout: 30,
retries: 3
}
}
end
def config
@config
end
end
# Test concurrent access
threads = 10.times.map do |i|
Thread.new do
config = ConfigurationManager.instance
puts "Thread #{i}: #{config.object_id}"
end
end
threads.each(&:join)
Phần 3: Nâng Cao - Kỹ Thuật Chuyên Nghiệp
3.1 Monitor - Reentrant Mutex
Ruby's Monitor
cho phép cùng một thread lock nhiều lần:
require 'monitor'
class ThreadSafeBank
include MonitorMixin
def initialize
super() # Initialize MonitorMixin
@accounts = {}
end
def create_account(id, initial_balance)
synchronize do
@accounts[id] = initial_balance
log_transaction("Created account #{id} with balance #{initial_balance}")
end
end
def transfer(from_id, to_id, amount)
synchronize do
return false unless @accounts[from_id] && @accounts[to_id]
return false if @accounts[from_id] < amount
# Nested synchronization - Monitor cho phép reentrant
withdraw_internal(from_id, amount)
deposit_internal(to_id, amount)
log_transaction("Transferred #{amount} from #{from_id} to #{to_id}")
true
end
end
def get_balance(id)
synchronize { @accounts[id] }
end
private
def withdraw_internal(id, amount)
synchronize do # Reentrant lock
@accounts[id] -= amount
end
end
def deposit_internal(id, amount)
synchronize do # Reentrant lock
@accounts[id] += amount
end
end
def log_transaction(message)
synchronize do # Reentrant lock
puts "[#{Time.now}] #{message}"
end
end
end
# Demo
bank = ThreadSafeBank.new
bank.create_account('A', 1000)
bank.create_account('B', 500)
threads = []
threads << Thread.new { bank.transfer('A', 'B', 100) }
threads << Thread.new { bank.transfer('B', 'A', 50) }
threads << Thread.new { puts "Balance A: #{bank.get_balance('A')}" }
threads << Thread.new { puts "Balance B: #{bank.get_balance('B')}" }
threads.each(&:join)
3.2 Deadlock Prevention
Tránh deadlock bằng ordered locking:
require 'thread'
class DeadlockSafeTransfer
def initialize
@accounts = {}
@mutexes = {}
end
def create_account(id, balance)
@accounts[id] = balance
@mutexes[id] = Mutex.new
end
def transfer_safe(from_id, to_id, amount)
# Luôn lock theo thứ tự object_id để tránh deadlock
first_id, second_id = [from_id, to_id].sort
first_mutex = @mutexes[first_id]
second_mutex = @mutexes[second_id]
first_mutex.synchronize do
second_mutex.synchronize do
if @accounts[from_id] >= amount
@accounts[from_id] -= amount
@accounts[to_id] += amount
puts "Transferred #{amount} from #{from_id} to #{to_id}"
true
else
puts "Insufficient funds in #{from_id}"
false
end
end
end
end
def transfer_with_timeout(from_id, to_id, amount, timeout = 1.0)
first_id, second_id = [from_id, to_id].sort
first_mutex = @mutexes[first_id]
second_mutex = @mutexes[second_id]
start_time = Time.now
if first_mutex.try_lock
begin
# Try to get second lock with timeout
while !second_mutex.try_lock
if Time.now - start_time > timeout
puts "Transfer timeout: #{from_id} -> #{to_id}"
return false
end
sleep(0.001)
end
begin
if @accounts[from_id] >= amount
@accounts[from_id] -= amount
@accounts[to_id] += amount
puts "Transferred #{amount} from #{from_id} to #{to_id}"
true
else
false
end
ensure
second_mutex.unlock
end
ensure
first_mutex.unlock
end
else
puts "Could not acquire first lock: #{first_id}"
false
end
end
def balance(id)
@mutexes[id].synchronize { @accounts[id] }
end
end
# Demo
bank = DeadlockSafeTransfer.new
bank.create_account('A', 1000)
bank.create_account('B', 800)
bank.create_account('C', 600)
# Concurrent transfers that could cause deadlock with naive locking
threads = []
threads << Thread.new { bank.transfer_safe('A', 'B', 100) }
threads << Thread.new { bank.transfer_safe('B', 'A', 50) }
threads << Thread.new { bank.transfer_safe('B', 'C', 200) }
threads << Thread.new { bank.transfer_safe('C', 'A', 150) }
threads.each(&:join)
puts "Final balances:"
puts "A: #{bank.balance('A')}"
puts "B: #{bank.balance('B')}"
puts "C: #{bank.balance('C')}"
Phần 4: Expert Level - Kỹ Thuật Chuyên Gia
4.1 Lock Striping - Sharded Mutex Pool
Để tăng throughput, chia resource thành nhiều shard với mutex riêng:
require 'zlib'
class ShardedHashMap
SHARD_COUNT = 16
def initialize
@shards = Array.new(SHARD_COUNT) { {} }
@mutexes = Array.new(SHARD_COUNT) { Mutex.new }
end
def put(key, value)
idx = shard_index(key)
@mutexes[idx].synchronize do
@shards[idx][key] = value
end
end
def get(key)
idx = shard_index(key)
@mutexes[idx].synchronize do
@shards[idx][key]
end
end
def delete(key)
idx = shard_index(key)
@mutexes[idx].synchronize do
@shards[idx].delete(key)
end
end
def size
total = 0
@mutexes.each_with_index do |mutex, idx|
mutex.synchronize do
total += @shards[idx].size
end
end
total
end
# Atomic increment operation
def increment(key, amount = 1)
idx = shard_index(key)
@mutexes[idx].synchronize do
@shards[idx][key] = (@shards[idx][key] || 0) + amount
end
end
def keys
all_keys = []
@mutexes.each_with_index do |mutex, idx|
mutex.synchronize do
all_keys.concat(@shards[idx].keys)
end
end
all_keys
end
private
def shard_index(key)
Zlib.crc32(key.to_s) % SHARD_COUNT
end
end
# Performance test
def benchmark_sharded_map
map = ShardedHashMap.new
puts "Testing concurrent writes..."
start_time = Time.now
threads = 8.times.map do |thread_id|
Thread.new do
1000.times do |i|
key = "thread_#{thread_id}_item_#{i}"
map.put(key, "value_#{i}")
end
end
end
threads.each(&:join)
end_time = Time.now
puts "Wrote 8000 items in #{(end_time - start_time).round(3)}s"
puts "Final size: #{map.size}"
# Test concurrent increments
puts "\nTesting concurrent increments..."
start_time = Time.now
threads = 10.times.map do
Thread.new do
1000.times do |i|
map.increment("counter_#{i % 100}")
end
end
end
threads.each(&:join)
end_time = Time.now
puts "Performed 10000 increments in #{(end_time - start_time).round(3)}s"
end
benchmark_sharded_map
4.2 Fair FIFO Mutex
Đảm bảo fairness bằng FIFO queue:
require 'thread'
class FairMutex
def initialize
@queue = []
@cv = ConditionVariable.new
@lock = Mutex.new
@owner = nil
end
def synchronize
thread_id = Thread.current.object_id
@lock.synchronize do
# Add to queue
@queue << thread_id
# Wait until it's our turn
while @queue.first != thread_id || @owner
@cv.wait(@lock)
end
# We got the lock
@owner = thread_id
end
begin
yield
ensure
@lock.synchronize do
@owner = nil
@queue.shift
@cv.broadcast # Wake up all waiting threads
end
end
end
def queue_length
@lock.synchronize { @queue.length }
end
end
# Demo fairness
def demonstrate_fairness
fair_mutex = FairMutex.new
results = []
results_mutex = Mutex.new
threads = 10.times.map do |i|
Thread.new do
fair_mutex.synchronize do
timestamp = Time.now.to_f
thread_info = "Thread #{i} acquired lock at #{timestamp}"
results_mutex.synchronize do
results << { thread: i, time: timestamp }
end
puts thread_info
sleep(0.1) # Hold lock briefly
end
end
end
threads.each(&:join)
# Verify FIFO order
puts "\nVerifying FIFO order:"
results.sort_by! { |r| r[:time] }
results.each_with_index do |result, index|
puts "Position #{index}: Thread #{result[:thread]}"
end
end
demonstrate_fairness
4.3 Custom Reentrant Mutex
Tự implement reentrant mutex với ownership tracking:
require 'thread'
class CustomReentrantMutex
def initialize
@owner = nil
@depth = 0
@cv = ConditionVariable.new
@lock = Mutex.new
end
def synchronize
acquire
begin
yield
ensure
release
end
end
def acquire
current_thread = Thread.current
@lock.synchronize do
if @owner == current_thread
# Same thread, just increment depth
@depth += 1
else
# Different thread, wait until available
while @owner
@cv.wait(@lock)
end
@owner = current_thread
@depth = 1
end
end
end
def release
current_thread = Thread.current
@lock.synchronize do
raise "Not owner" unless @owner == current_thread
@depth -= 1
if @depth == 0
@owner = nil
@cv.signal
end
end
end
def owned_by_current_thread?
@lock.synchronize { @owner == Thread.current }
end
def lock_depth
@lock.synchronize { @owner == Thread.current ? @depth : 0 }
end
end
# Demo reentrant behavior
class ReentrantExample
def initialize
@mutex = CustomReentrantMutex.new
@value = 0
end
def method_a
@mutex.synchronize do
puts "Method A: depth = #{@mutex.lock_depth}"
@value += 1
method_b # Nested call
end
end
def method_b
@mutex.synchronize do
puts "Method B: depth = #{@mutex.lock_depth}"
@value += 10
method_c # Another nested call
end
end
def method_c
@mutex.synchronize do
puts "Method C: depth = #{@mutex.lock_depth}"
@value += 100
end
end
def value
@mutex.synchronize { @value }
end
end
example = ReentrantExample.new
example.method_a
puts "Final value: #{example.value}" # Should be 111
4.4 Thread Pool With Work Stealing
Advanced pattern cho high-performance concurrent processing:
require 'thread'
class WorkStealingThreadPool
def initialize(size = 4)
@size = size
@queues = Array.new(size) { Queue.new }
@workers = []
@shutdown = false
@mutex = Mutex.new
start_workers
end
def submit(task)
return if @shutdown
# Find queue with least work
min_queue = @queues.min_by(&:size)
min_queue << task
end
def shutdown
@mutex.synchronize do
@shutdown = true
@size.times { |i| @queues[i] << :shutdown }
end
@workers.each(&:join)
end
private
def start_workers
@workers = @size.times.map do |worker_id|
Thread.new { worker_loop(worker_id) }
end
end
def worker_loop(worker_id)
my_queue = @queues[worker_id]
loop do
task = get_task(worker_id, my_queue)
case task
when :shutdown
break
when Proc
begin
task.call
rescue => e
puts "Error in worker #{worker_id}: #{e.message}"
end
end
end
puts "Worker #{worker_id} shutting down"
end
def get_task(worker_id, my_queue)
# Try own queue first
begin
return my_queue.pop(true) # Non-blocking
rescue ThreadError
# Queue empty, try work stealing
end
# Try stealing from other queues
other_queues = @queues.each_with_index.reject { |_, i| i == worker_id }
other_queues.each do |queue, _|
begin
return queue.pop(true)
rescue ThreadError
# This queue is also empty
end
end
# All queues empty, block on own queue
my_queue.pop
end
end
# Demo
pool = WorkStealingThreadPool.new(4)
# Submit various tasks
20.times do |i|
pool.submit(proc do
puts "Processing task #{i} on thread #{Thread.current.object_id}"
sleep(rand(0.1..0.5)) # Simulate work
puts "Completed task #{i}"
end)
end
sleep(3) # Let tasks complete
pool.shutdown
Best Practices và Lưu Ý
1. Tránh Common Pitfalls
# ❌ Tránh: Nested locks có thể gây deadlock
def bad_nested_locks
@lock1.synchronize do
@lock2.synchronize do
# Dangerous if another thread locks in reverse order
end
end
end
# ✅ Tốt: Ordered locking
def good_ordered_locks
first, second = [@lock1, @lock2].sort_by(&:object_id)
first.synchronize do
second.synchronize do
# Safe
end
end
end
# ❌ Tránh: Giữ lock quá lâu
def bad_long_lock
@mutex.synchronize do
expensive_network_call # Blocks other threads unnecessarily
end
end
# ✅ Tốt: Minimize lock time
def good_short_lock
data = expensive_network_call
@mutex.synchronize do
@shared_data = data # Quick update
end
end
2. Performance Tips
- Lock granularity: Sử dụng fine-grained locks khi có thể
- Lock-free algorithms: Cân nhắc atomic operations
- Thread pool: Tái sử dụng threads thay vì tạo mới
- Work stealing: Cân bằng tải giữa các threads
3. Debugging Concurrent Code
class DebuggableMutex
def initialize(name)
@name = name
@mutex = Mutex.new
@owner = nil
@acquired_at = nil
end
def synchronize
puts "[#{Time.now}] #{Thread.current.object_id} waiting for #{@name}"
@mutex.synchronize do
@owner = Thread.current.object_id
@acquired_at = Time.now
puts "[#{@acquired_at}] #{@owner} acquired #{@name}"
begin
yield
ensure
duration = Time.now - @acquired_at
puts "[#{Time.now}] #{@owner} released #{@name} (held for #{duration.round(3)}s)"
@owner = nil
@acquired_at = nil
end
end
end
end
Kết Luận
Concurrency trong Ruby là một chủ đề phức tạp nhưng cực kỳ quan trọng. Từ những khái niệm cơ bản về Mutex đến các kỹ thuật nâng cao như lock striping và fair mutexes, mỗi technique đều có use case riêng.
Những điểm chính cần nhớ:
- Cơ bản: Sử dụng
Mutex#synchronize
để bảo vệ critical sections - Trung cấp:
ConditionVariable
cho thread coordination,try_lock
cho non-blocking - Nâng cao:
Monitor
cho reentrant locks, ordered locking để tránh deadlock - Expert: Lock striping cho performance, custom implementations cho special needs
Khi nào sử dụng gì:
- Simple shared state:
Mutex#synchronize
- Producer-consumer:
ConditionVariable
- High contention: Lock striping hoặc lock-free algorithms
- Nested locking:
Monitor
hoặc custom reentrant mutex - Fairness required: Custom FIFO mutex
Lời khuyên cuối:
- Bắt đầu đơn giản với
Mutex#synchronize
- Profile trước khi optimize
- Test thoroughly với concurrent workloads
- Document thread safety assumptions
- Consider alternatives như Actor model (Celluloid) hoặc async programming
Concurrency không dễ, nhưng với những kỹ thuật này, bạn có thể xây dựng những ứng dụng Ruby hiệu suất cao và thread-safe! 🚀