Multiple workers need to process tasks from a pending_tasks
table without conflicts.
```sql
CREATE TABLE pending_tasks (
    task_id SERIAL PRIMARY KEY,
    payload JSONB,
    status VARCHAR(20) DEFAULT 'pending',
    claimed_by UUID
);

-- Insert sample tasks
INSERT INTO pending_tasks (payload)
VALUES
    ('{"type": "email"}'),
    ('{"type": "report"}'),
    ('{"type": "cleanup"}');
```
Each worker's claim query must use ORDER BY task_id and FOR UPDATE SKIP LOCKED.
Worker 1:
```sql
BEGIN;

-- Claim first available task that's not locked
SELECT * FROM pending_tasks
WHERE status = 'pending'
ORDER BY task_id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- Returns: task_id=1 (oldest pending task)
-- Locks row 1 exclusively
```
Worker 2:
```sql
BEGIN;

-- Same query in concurrent session skips locked row
SELECT * FROM pending_tasks
WHERE status = 'pending'
ORDER BY task_id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- Returns: task_id=2 (next available task)
-- Never sees task_id=1 because it's locked by Worker 1
```
Commit after each update so that the row lock is released.
Worker 1:
```sql
UPDATE pending_tasks
SET status = 'processing',
    -- Placeholder worker ID; claimed_by is a UUID column, so the value must be a valid UUID
    claimed_by = '00000000-0000-0000-0000-000000000001'
WHERE task_id = 1;

COMMIT; -- Releases lock
```
Worker 2:
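```sql
UPDATE pending_tasks
SET status = 'processing',
    -- Placeholder worker ID; claimed_by is a UUID column, so the value must be a valid UUID
    claimed_by = '00000000-0000-0000-0000-000000000002'
WHERE task_id = 2;

COMMIT; -- Releases lock
```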
Non-blocking: Workers don't wait for each other's locks.
Fair ordering: ORDER BY task_id hands out the oldest unlocked task first.
Atomic claims: A locked row is skipped by every other worker, so each task is claimed by only one worker.
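The select-then-update steps above can also be collapsed into a single statement. The following is a minimal sketch of that variant, not the method shown in the worker examples: it reuses the pending_tasks table from earlier, and the UUID literal is a placeholder for whatever ID the worker actually uses.

```sql
-- Claim the oldest unlocked pending task and mark it in one statement.
-- The UUID literal is a placeholder (assumption) for the worker's own ID.
UPDATE pending_tasks
SET status = 'processing',
    claimed_by = '00000000-0000-0000-0000-000000000001'
WHERE task_id = (
    SELECT task_id
    FROM pending_tasks
    WHERE status = 'pending'
    ORDER BY task_id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING task_id, payload;
```

If no unlocked pending task exists, the statement updates nothing and returns zero rows, which a worker can treat as "queue empty for now". Because the row leaves the 'pending' state in the same statement that locks it, there is no window between claiming and marking.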
PostgreSQL does not allow FOR UPDATE SKIP LOCKED to be combined directly with GROUP BY: FOR UPDATE locks individual table rows, and a grouped result row does not correspond to any single table row [3][7]. However, there are workarounds:
For scenarios requiring exclusive access to grouped data (e.g., all jobs for a specific person_id), use advisory locks to coordinate at the application level:
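```sql
BEGIN;
-- Note: an advisory lock is taken for every row on which the function
-- returns true, even if that row is then skipped or cut off by LIMIT
SELECT * FROM jobs
WHERE status = 'new'
  AND pg_try_advisory_xact_lock(person_id)
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- Process jobs for this person_id
COMMIT;
```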
How it works:
pg_try_advisory_xact_lock(person_id) attempts to lock the person_id without waiting [3].
If the lock succeeds (true), the worker holds that person_id until its transaction ends and can process the jobs for it.
If the lock fails (false), the row is filtered out and the query continues to the next available person_id.
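The non-waiting behavior described above can be observed with two psql sessions. This is a small illustrative sketch; the key 42 is an arbitrary stand-in for a person_id.

```sql
-- Session A
BEGIN;
SELECT pg_try_advisory_xact_lock(42);  -- true: A now holds the lock on key 42

-- Session B, while A's transaction is still open
BEGIN;
SELECT pg_try_advisory_xact_lock(42);  -- false: returns immediately, no waiting
COMMIT;

-- Session A
COMMIT;  -- transaction-level advisory locks are released automatically here
```

There is no explicit unlock call for the transaction-level variant; the lock lives exactly as long as the transaction that took it.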
Limitations of FOR UPDATE SKIP LOCKED:
Row-level locking: SKIP LOCKED skips individual locked rows, not grouped results [2][5].
Inconsistent views: Skipping locked rows gives an inconsistent view of the data, so it is designed for queue-like systems where consistency isn't critical [2].
No batch locking: Cannot lock all rows for a group in a single operation [3].
If you need to process all jobs for a group (e.g., all pending jobs for a person_id), use a subquery to first identify the group, then lock its rows:
```sql
BEGIN;
WITH target_person AS (
    SELECT person_id FROM jobs
    WHERE status = 'new'
    LIMIT 1
)
SELECT * FROM jobs
WHERE person_id = (SELECT person_id FROM target_person)
FOR UPDATE SKIP LOCKED;
-- Process all jobs for this person_id
COMMIT;
```
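When grouped output is what the worker ultimately needs, one further option is to take the row locks in a subquery and aggregate in the outer query. The sketch below assumes only the person_id and status columns of the jobs table used above; the aliases claimed and locked_jobs are illustrative names.

```sql
BEGIN;

-- Rejected by PostgreSQL: a locking clause cannot be combined with GROUP BY.
-- SELECT person_id, count(*) FROM jobs
-- WHERE status = 'new'
-- GROUP BY person_id
-- FOR UPDATE SKIP LOCKED;

-- Lock individual rows in the subquery, then group what was locked.
SELECT person_id, count(*) AS locked_jobs
FROM (
    SELECT person_id
    FROM jobs
    WHERE status = 'new'
    FOR UPDATE SKIP LOCKED
) AS claimed
GROUP BY person_id;

-- Process the locked jobs, then release the locks
COMMIT;
```

The counts cover only the rows this session managed to lock. Rows held by other workers are skipped, so a group may be split across workers, which is the row-level limitation noted earlier.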