🛠️ Custom Kafka Broker Implementation

Extend SolidQueue to publish and consume messages through Kafka, giving low-latency delivery with at-least-once semantics. To do so, implement the Broker interface on top of the ruby-kafka gem.

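# app/brokers/kafka_broker.rb
require 'json'
require 'kafka'

class KafkaBroker < SolidQueue::Broker
  def initialize(config)
    @kafka = Kafka.new(seed_brokers: config[:brokers])
    @topic = config[:topic]
  end

  # Serialize the message as JSON and publish it synchronously.
  def push(message)
    @kafka.deliver_message(message.to_json, topic: @topic)
  end

  # Yield up to batch_size parsed messages to the caller's block.
  # max_wait_time bounds each fetch request, not the whole call:
  # each_message keeps polling until the batch is full.
  # Note: this client-level API does not commit offsets; a consumer
  # group is the usual choice when progress must survive restarts.
  def pop(batch_size: 10, timeout: 5)
    count = 0
    @kafka.each_message(topic: @topic, max_wait_time: timeout) do |msg|
      yield JSON.parse(msg.value)
      count += 1
      break if count >= batch_size
    end
  end
end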
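
Because delivery is at-least-once, the same message can reach a consumer more than once, for example after a retry or a consumer restart, so whatever handles the messages yielded by `pop` should tolerate duplicates. The following is a minimal sketch of one way to do that using only the Ruby standard library. The `DedupingHandler` class and the `"id"` key on each message are assumptions made for illustration; neither is part of SolidQueue or ruby-kafka, and the in-memory `Set` would need to become a shared store in a multi-process deployment.

```ruby
require 'set'

# Hypothetical wrapper: runs a handler block at most once per message id.
# Assumes every message hash carries a unique "id" key (an assumption for
# this sketch, not something the broker above guarantees).
class DedupingHandler
  def initialize(&handler)
    @handler = handler
    @seen = Set.new
  end

  # Returns true if the message was processed, false if it was a duplicate.
  def call(message)
    id = message.fetch('id')
    return false if @seen.include?(id)

    @handler.call(message)
    # Record the id only after the handler succeeds, so a failed attempt
    # is retried on redelivery instead of being silently dropped.
    @seen.add(id)
    true
  end
end
```

A handler built this way could be called from the block given to `pop`, for instance `broker.pop { |msg| handler.call(msg) }`, where `broker` and `handler` are instances created by the application.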

# config/initializers/solid_queue.rb
SolidQueue.configure do |c|
  c.broker = KafkaBroker.new(
    brokers: ['kafka1:9092', 'kafka2:9092'],
    topic: 'solid_queue_topic'
  )
end
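
Hard-coding broker addresses in the initializer makes it awkward to move between environments. Below is a minimal sketch of reading them from the environment instead. The variable names `KAFKA_BROKERS` and `KAFKA_TOPIC`, the `localhost:9092` fallback, and the helper `kafka_broker_config` are hypothetical, chosen for this example only.

```ruby
# Hypothetical helper: builds the config hash that KafkaBroker.new expects
# from environment-style input. KAFKA_BROKERS and KAFKA_TOPIC are assumed
# names, not variables that SolidQueue or ruby-kafka read on their own.
def kafka_broker_config(env = ENV)
  brokers = env.fetch('KAFKA_BROKERS', 'localhost:9092')
               .split(',')
               .map(&:strip)
               .reject(&:empty?)
  raise ArgumentError, 'KAFKA_BROKERS lists no broker addresses' if brokers.empty?

  { brokers: brokers, topic: env.fetch('KAFKA_TOPIC', 'solid_queue_topic') }
end
```

The resulting hash has the same `:brokers` and `:topic` keys used in the initializer, so it could be passed directly as `KafkaBroker.new(kafka_broker_config)`.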