Что делать, когда к данным одновременно обращаются два воркера

Когда два или более воркеров (или потоков/процессов) одновременно обращаются к одним и тем же данным, возникает проблема конкурентного доступа (race condition). Это может привести к неконсистентным данным, дублированию, потерям обновлений или даже краху системы. Чтобы избежать этих проблем, необходимо использовать механизмы синхронизации и блокировки, которые обеспечивают атомарность операций.

Ниже подробно описаны подходы и техники, которые применяются для безопасного параллельного доступа к данным.

1. Понимание проблемы

Race condition — это ситуация, когда исход результата зависит от того, в каком порядке выполняются операции двумя или более конкурентными исполнителями (воркерами). Например, если два воркера одновременно пытаются:

  • обновить одну и ту же запись в базе данных

  • изменить значение счетчика

  • взять задачу из очереди и пометить её как выполненную

...может возникнуть конфликт.

2. Механизмы защиты от одновременного доступа

2.1. Транзакции в базе данных

Транзакции позволяют сгруппировать несколько операций в атомарный блок: либо все операции выполняются, либо ни одна. Особенно важно использовать изолированные уровни транзакций.

BEGIN;
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
  • FOR UPDATE блокирует строку, чтобы другие воркеры не могли её читать/изменять до завершения транзакции.

  • Позволяет избежать состояния, когда два воркера одновременно читают одно значение и записывают его независимо.

2.2. Блокировки (Locks)

✅ Row-level lock (строковые блокировки)

Обычно через SELECT ... FOR UPDATE — предотвращает одновременное изменение одной и той же строки.

✅ Advisory Locks

В PostgreSQL и других СУБД можно использовать программные блокировки, которые можно задавать по произвольным ключам:

SELECT pg_advisory_lock(12345); -- воркер 1
SELECT pg_advisory_unlock(12345);
✅ Redis Locks (распределенные блокировки)

Используются в распределённых системах, например, через механизм Redlock:

\# pseudocode using Redis
if redis.set("lock:task:123", "locked", nx: true, ex: 5)
process_task()
redis.del("lock:task:123")
else
skip_or_wait()
end

3. Оптимистичная и пессимистичная блокировка

✅ Пессимистичная блокировка

Блокирует данные сразу после начала работы с ними. Используется там, где вероятность конфликта высокая.

User.lock.find(1) # ActiveRecord — эквивалент SELECT FOR UPDATE

✅ Оптимистичная блокировка

Не блокирует данные, но проверяет, не изменились ли они в процессе выполнения. Часто реализуется через поле lock_version.

\# В модели ActiveRecord
t.integer :lock_version, default: 0
\# При сохранении:

record.save! # выбросит исключение, если версия изменилась

4. Очереди задач и «взятие в работу»

Чтобы два воркера не обработали одну и ту же задачу, используют:

✅ Атомарное взятие задачи

UPDATE jobs
SET status = 'processing'
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
RETURNING \*;

Этот запрос позволяет воркеру взять задачу и сразу пометить её как занятую — атомарно.

5. Использование mutex (в многопоточном контексте)

В локальных потоках, например на Ruby или Python, используют мьютексы:

mutex = Mutex.new
mutex.synchronize do
\# критическая секция
end

Но это работает только в пределах одного процесса, и не защищает от нескольких воркеров на разных машинах.

6. Использование распределённых очередей (например, Sidekiq, RabbitMQ)

Брокеры сообщений гарантируют, что одна задача попадёт только одному воркеру:

  • Sidekiq использует Redis и распределяет задачи по ключам

  • RabbitMQ и Kafka реализуют гарантии доставки: at most once, at least once, **exactly once
    **

7. Примеры, где возникает конфликт

  • ❌ Два воркера обрабатывают один и тот же заказ и дважды списывают деньги.

  • ❌ Несколько пользователей одновременно покупают один и тот же последний товар на складе.

  • ❌ Два потока обновляют одно и то же значение в конфигурации и перезаписывают изменения друг друга.

8. Стратегии управления конфликтами

  • Неизменяемые данные (immutable) — хранить все изменения как события (event sourcing)

  • Queue-based ownership — задача выдается только одному исполнителю

  • Delayed retry + backoff — если доступ невозможен, повторить через X секунд

  • Dead letter queues — если воркер "не смог", задача попадает в отдельную очередь

9. Мониторинг и логгирование

Всегда логируйте:

  • кто взял задачу

  • какие блокировки наложены

  • сколько времени заняли транзакции

  • наличие deadlocks (в PostgreSQL их можно отлавливать по логам)

10. Обработка ошибок и повторов

Если воркер не смог взять блокировку или задача уже обрабатывается:

  • можно отложить выполнение (retry через 5 секунд)

  • можно использовать флаг состояния (processing, locked)

  • не допускать перезапуска задач вручную без проверки их состояния

Обеспечение корректной работы с общими данными при конкурентном доступе — ключ к надежности многопоточных и распределённых систем. Основное правило: атомарность, синхронизация и идемпотентность действий — тогда даже два воркера не смогут повредить систему.