Building a Producer-Consumer Queue with Redis and Haskell Using Hedis
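- Summary
- This article explains how to build a producer-consumer queue on Redis using Haskell's Hedis library. The producer adds data to the queue and the consumer processes it. It presents code examples and design points for exchanging data efficiently by taking advantage of Redis's characteristics.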
TL;DR: We'll build a production-grade producer-consumer queue in Haskell using Redis as the message broker via the Hedis client library. By the end, you'll have a working system that can handle high-throughput job dispatch and consumption, the same pattern I used to process 1M+ payment refunds at Juspay.

Why Redis for a Queue?

When people think "message queue," they reach for Kafka or RabbitMQ. But Redis is often the right call when you need:

- Low latency: sub-millisecond enqueue/dequeue
- Simplicity: no broker clusters to manage
- Atomicity: LPUSH / BRPOP are atomic operations, safe under concurrency
- Visibility: you can inspect the queue state instantly with LLEN

At Juspay, we routed payment refunds through a Redis-backed producer-consumer system. The queue absorbed burst traffic from merchant-triggered refund events and fed a pool of consumers that processed each refund, updated sub-statuses, and called downstream banking APIs, all without a single dropped message.

Let's build that.

What We're Building

    ┌──────────────┐       LPUSH        ┌─────────────────┐       BRPOP        ┌──────────────┐
    │   Producer   │ ─────────────────▶ │   Redis Queue   │ ─────────────────▶ │   Consumer   │
    │ (Job sender) │                    │  (List: jobs)   │                    │ (Job worker) │
    └──────────────┘                    └─────────────────┘                    └──────────────┘

- Producer pushes JSON-encoded jobs onto a Redis list using LPUSH
- Consumer blocks on BRPOP, waking up the instant a job arrives
- Multiple consumers can run in parallel, each pulling distinct jobs atomically

Prerequisites

- GHC + Cabal (or Stack) installed
- A running Redis instance (redis-server or Docker: docker run -p 6379:6379 redis)
- Basic familiarity with Haskell (do notation, IO)

Project Setup

Create a new Cabal project:

    mkdir redis-queue && cd redis-queue
    cabal init --non-interactive

Add dependencies to your redis-queue.cabal file:

    build-depends:
        base       >= 4.14,
        hedis      >= 0.15,
        aeson      >= 2.0,
        text       >= 1.2,
        bytestring >= 0.11,
        async      >= 2.2

Install and confirm Hedis is available:

    cabal build

Understanding Hedis Basics
Hedis wraps all Redis commands in the Redis monad, which you run against a Connection. Here's the mental model:

    -- Open a connection pool
    conn <- connect defaultConnectInfo

    -- Run Redis commands inside runRedis
    runRedis conn $ do
      set "hello" "world"
      get "hello"   -- returns Right (Just "world")

Outside a transaction, every command returns Either Reply a: the Left branch carries an error reply from the server, Right is success. In practice you'll pattern-match or use either to handle errors. Keys and values are strict ByteStrings, so string literals like the ones above need the OverloadedStrings extension.

Step 1: Define the Job Type

Create src/Job.hs:

    {-# LANGUAGE DeriveGeneric #-}
    {-# LANGUAGE OverloadedStrings #-}
    module Job where

    import Data.Aeson (FromJSON, ToJSON)
    import Data.ByteString (ByteString)  -- strict: Hedis keys and values are strict ByteStrings
    import Data.Text (Text)
    import GHC.Generics (Generic)

    -- Our job payload; swap this for whatever your domain needs
    data Job = Job
      { jobId   :: Text
      , jobType :: Text
      , payload :: Text
      } deriving (Show, Eq, Generic)

    -- JSON instances derived through Generic
    instance ToJSON Job
    instance FromJSON Job

    -- The Redis key we'll use as our queue
    queueKey :: ByteString
    queueKey = "jobs:queue"

Keeping the job type generic means you can serialise anything that has a ToJSON instance: refund requests, email notifications, image processing tasks, whatever fits your system.

Step 2: The Producer

Create src/Producer.hs:

    {-# LANGUAGE OverloadedStrings #-}
    module Producer where

    import Control.Monad (forM_)
    import Data.Aeson (encode)
    import Data.ByteString.Lazy (toStrict)
    import Database.Redis
    import Job

    -- Push a single job onto the left end of the list
    enqueue :: Connection -> Job -> IO ()
    enqueue conn job = do
      -- aeson produces a lazy ByteString; Hedis expects a strict one
      let encoded = toStrict (encode job)
      result <- runRedis conn $ lpush queueKey [encoded]
      case result of
        Left err    -> putStrLn $ "Enqueue error: " ++ show err
        Right count -> putStrLn $ "Job enqueued. Queue depth: " ++ show count

    -- Simulate a burst of jobs, e.g. an end-of-day refund batch
    producerMain :: Connection -> IO ()
    producerMain conn = do
      let jobs =
            [ Job "txn-001" "refund" "{\"amount\": 500, \"currency\": \"INR\"}"
            , Job "txn-002" "refund" "{\"amount\": 1200, \"currency\": \"INR\"}"
            , Job "txn-003" "notify" "{\"email\": \"[email protected]\"}"
            , Job "txn-004" "refund" "{\"amount\": 300, \"currency\": \"USD\"}"
            , Job "txn-005" "notify" "{\"email\": \"[email protected]\"}"
            ]
      putStrLn "Producer starting — pushing jobs..."
      forM_ jobs (enqueue conn)
      putStrLn "Producer done."

Key point: lpush is atomic. Even if 100 producers call it simultaneously, each job lands on the queue exactly once. Redis executes commands one at a time, so concurrent writes are serialised for you and no locks are needed on your side.

Step 3: The Consumer

Create src/Consumer.hs:

    module Consumer where

    import Control.Monad (forever)
    import Data.Aeson (decode)
    import Data.ByteString.Lazy (fromStrict)
    import Database.Redis
    import Job

    -- Process a single job; replace this with your real business logic
    processJob :: Job -> IO ()
    processJob job =
      putStrLn $ "[Worker] Processing " ++ show (jobType job)
              ++ " | ID: " ++ show (jobId job)
              ++ " | Payload: " ++ show (payload job)

    -- Block until a job is available, then process it
    consumeOne :: Connection -> IO ()
    consumeOne conn = do
      result <- runRedis conn $ brpop [queueKey] 30  -- 30s timeout
      case result of
        Left err            -> putStrLn $ "Redis error: " ++ show err
        Right Nothing       -> putStrLn "Timeout — no jobs in 30s, polling again..."
        Right (Just (_, raw)) ->
          -- brpop returns (key, value); decode the value back into a Job
          case decode (fromStrict raw) of
            Nothing  -> putStrLn $ "Failed to decode job: " ++ show raw
            Just job -> processJob job

    -- Run forever, consuming jobs as they arrive
    consumerMain :: Connection -> IO ()
    consumerMain conn = do
      putStrLn "Consumer started — waiting for jobs..."
      forever (consumeOne conn)

brpop is the magic here. It blocks the connection until an item is available on any of the listed keys, then atomically pops and returns it.
The 30 is a timeout in seconds, after which brpop returns Right Nothing so you can loop cleanly rather than hanging forever. This is fundamentally different from polling (RPOP in a loop with threadDelay): a blocked consumer sends no commands and burns no CPU while the queue is empty.

Step 4: Wire It Together

Create app/Main.hs:

    module Main where

    import Control.Concurrent.Async (concurrently_)
    import Database.Redis
    import Consumer
    import Producer

    main :: IO ()
    main = do
      -- Connect to local Redis; swap defaultConnectInfo for your host/port/auth
      conn <- connect defaultConnectInfo
      -- Run producer and consumer concurrently.
      -- In production you'd run these as separate processes/services.
      concurrently_ (producerMain conn) (consumerMain conn)

concurrently_ from the async package runs both actions concurrently in lightweight Haskell threads and waits for both to finish. Since the consumer loops forever, stop the demo with Ctrl-C. In a real deployment you'd run the producer and consumer as separate services; this just wires them together for a clean demo.

Running It

    # Terminal 1: start Redis
    redis-server

    # Terminal 2: run the app
    cabal run redis-queue

Expected output (the two threads run concurrently, so the exact interleaving of producer and consumer lines may differ; show escapes the quotes inside each Text payload):

    Producer starting — pushing jobs...
    Job enqueued. Queue depth: 1
    Job enqueued. Queue depth: 2
    Job enqueued. Queue depth: 3
    Job enqueued. Queue depth: 4
    Job enqueued. Queue depth: 5
    Producer done.
    Consumer started — waiting for jobs...
    [Worker] Processing "refund" | ID: "txn-001" | Payload: "{\"amount\": 500, \"currency\": \"INR\"}"
    [Worker] Processing "refund" | ID: "txn-002" | Payload: "{\"amount\": 1200, \"currency\": \"INR\"}"
    [Worker] Processing "notify" | ID: "txn-003" | Payload: "{\"email\": \"[email protected]\"}"
    [Worker] Processing "refund" | ID: "txn-004" | Payload: "{\"amount\": 300, \"currency\": \"USD\"}"
    [Worker] Processing "notify" | ID: "txn-005" | Payload: "{\"email\": \"[email protected]\"}"

Step 5: Scaling to Multiple Consumers

Want parallel workers?
Spawn multiple consumers against the same queue:

    import Control.Concurrent.Async (concurrently_, replicateConcurrently_)

    main :: IO ()
    main = do
      conn <- connect defaultConnectInfo
      -- Run 4 parallel consumer workers alongside the producer
      concurrently_
        (producerMain conn)
        (replicateConcurrently_ 4 (consumerMain conn))

Because BRPOP is atomic, each job is delivered to exactly one consumer, so there is no double-processing. Redis hands each job to one of the waiting consumers natively. You can verify this live:

    # In a Redis CLI while the app runs:
    redis-cli LLEN jobs:queue   # current queue depth
    redis-cli MONITOR           # watch every command in real time

Step 6: Dead Letter Handling (Production Hardening)

In production, jobs can fail. You don't want failed jobs silently disappearing. Add a dead-letter queue:

    -- Needed on top of the Consumer.hs imports (pragmas go at the top of the file):
    {-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
    import Control.Exception (SomeException, catch)
    import Data.ByteString (ByteString)

    deadLetterKey :: ByteString
    deadLetterKey = "jobs:dead"

    -- Consume with failure handling
    consumeSafe :: Connection -> IO ()
    consumeSafe conn = do
      result <- runRedis conn $ brpop [queueKey] 30
      case result of
        Right (Just (_, raw)) ->
          case decode (fromStrict raw) of
            Nothing -> do
              -- Malformed payload: send to dead letter queue
              _ <- runRedis conn $ lpush deadLetterKey [raw]
              putStrLn "Malformed job moved to dead letter queue"
            Just job ->
              -- Wrap in exception handler for business logic failures
              processJob job `catch` \(e :: SomeException) -> do
                _ <- runRedis conn $ lpush deadLetterKey [raw]
                putStrLn $ "Job failed, dead-lettered: " ++ show e
        -- Timeouts and Redis errors are ignored here; the caller simply loops again
        _ -> pure ()

Now jobs that fail to decode, or whose handler throws, accumulate in jobs:dead, where you can inspect, replay, or alert on them instead of losing them silently.

Connecting to a Real Redis Host

For production (Redis Cloud, AWS ElastiCache, etc.):

    import Database.Redis

    productionConnInfo :: ConnectInfo
    productionConnInfo = defaultConnectInfo
      { connectHost           = "your-redis-host.example.com"
      , connectPort           = PortNumber 6379
      , connectAuth           = Just "your-auth-password"
      , connectDatabase       = 0
      , connectMaxConnections = 50  -- connection pool size
      }

    main :: IO ()
    main = do
      conn <- connect productionConnInfo
      ...
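Hard-coding the host and password is fine for a demo, but in a deployed service these values usually come from the environment. Here is a minimal sketch of that idea, not part of the original setup: the variable names REDIS_HOST, REDIS_PORT, and REDIS_PASSWORD and the helper connInfoFrom are assumptions made up for illustration, and any value that is missing or unparseable falls back to the Hedis default.

```haskell
module Main where

import qualified Data.ByteString.Char8 as BS
import Data.Maybe (fromMaybe)
import Database.Redis
import System.Environment (lookupEnv)
import Text.Read (readMaybe)

-- Hypothetical helper: overlay optional host/port/password on defaultConnectInfo.
-- A missing or unparseable value keeps the Hedis default for that field.
connInfoFrom :: Maybe String -> Maybe String -> Maybe String -> ConnectInfo
connInfoFrom mHost mPort mPass =
  defaultConnectInfo
    { connectHost = fromMaybe (connectHost defaultConnectInfo) mHost
    , connectPort = maybe (connectPort defaultConnectInfo)
                          (PortNumber . fromIntegral)
                          (mPort >>= readMaybe :: Maybe Int)
    , connectAuth = BS.pack <$> mPass
    }

main :: IO ()
main = do
  -- The environment variable names below are assumptions, not a Hedis convention
  info <- connInfoFrom <$> lookupEnv "REDIS_HOST"
                       <*> lookupEnv "REDIS_PORT"
                       <*> lookupEnv "REDIS_PASSWORD"
  -- checkedConnect opens the pool and pings the server, so a wrong
  -- host or password fails here rather than on the first job
  conn  <- checkedConnect info
  reply <- runRedis conn ping
  print reply
```

Keeping the pure connInfoFrom separate from the IO that reads the environment makes the fallback logic easy to check without a running Redis.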
For TLS (Redis Cloud, Upstash, etc.), set connectTLSParams in the ConnectInfo. Using checkedConnect instead of connect also pings the server once, so a bad host, password, or TLS setting fails at startup.

What We Built vs. What Juspay Ran

The pattern here is the same core design behind Juspay's refund processing pipeline, with a few additions at scale:

    This Tutorial        | Production at Juspay
    In-memory job type   | Protobuf-encoded payloads
    Single queue key     | Separate queues per refund type/priority
    brpop timeout loop   | Supervised consumer pools with h