I am implementing a work queue that processes some data for each user in the DB. The data can be processed at most once per user, so I have the following schema:
CREATE TABLE users (
    id INTEGER NOT NULL,
    PRIMARY KEY (id)
);
CREATE TABLE emails (
    email VARCHAR NOT NULL,
    domain VARCHAR NOT NULL,
    PRIMARY KEY (email)
);
CREATE TABLE email_usage (
    email VARCHAR NOT NULL,
    user_id INTEGER NOT NULL,
    PRIMARY KEY (email, user_id),
    FOREIGN KEY (email) REFERENCES emails (email),
    FOREIGN KEY (user_id) REFERENCES users (id)
);
Because each email may be processed only once per user (while the same email can still be processed for other users), I use an anti-join query that tells me whether it has already been processed for the specific user. After I select an unused email, I immediately want to mark it as used, so that I can start processing and other workers won't select it for the same user. I do this through SQLAlchemy, but the resulting query is this:
BEGIN;
INSERT INTO email_usage (email, user_id)
SELECT ea.email, some_user_id  /* some_user_id is a variable filled by SQLAlchemy */
FROM emails ea
WHERE ea.domain = 'some_domain.com'
  AND NOT EXISTS (
      SELECT 1
      FROM email_usage eu
      WHERE eu.email = ea.email
        AND eu.user_id = some_user_id
  )
LIMIT 1
FOR UPDATE SKIP LOCKED;
COMMIT;
The issue is that when I scale up worker processes, I start getting a lot of duplicate key errors on the email_usage table, so the INSERT statement is inserting already existing data. I don't understand how this happens: the SELECT ... FOR UPDATE should lock the row, and other queries executing SELECT ... FOR UPDATE SKIP LOCKED should fail to acquire that lock and just skip to the next available row. Or am I missing something?
I tried playing with transaction isolation levels, but the only difference was that at the SERIALIZABLE level I started getting a different error instead: (psycopg2.errors.SerializationFailure) could not serialize access due to read/write dependencies among transactions. I'm pretty sure that SQLAlchemy is not committing (and therefore releasing the FOR UPDATE lock) right after the SELECT statement executes.
CodePudding user response:
This can only happen if a transaction is committed: the COMMIT releases the FOR UPDATE lock on the emails row, and a concurrent worker whose statement snapshot was taken before that commit does not see the freshly inserted email_usage row, so it finds the row unlocked, passes the NOT EXISTS check, and inserts a duplicate. So make sure that you delete the row (or make it not eligible in some other way) before the transaction ends.
The transaction isolation level has no impact on this.
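As a concrete illustration (my own untested sketch, not part of the original answer): one way to make this race harmless is to let the claiming INSERT tolerate a concurrent claim with ON CONFLICT ... DO NOTHING and use RETURNING to see whether this worker actually won the email. The names mirror the question's schema, and some_user_id is still a placeholder for the SQLAlchemy bind parameter.

```sql
BEGIN;

INSERT INTO email_usage (email, user_id)
SELECT ea.email, some_user_id  /* placeholder bound by SQLAlchemy */
FROM emails ea
WHERE ea.domain = 'some_domain.com'
  AND NOT EXISTS (
      SELECT 1
      FROM email_usage eu
      WHERE eu.email = ea.email
        AND eu.user_id = some_user_id
  )
LIMIT 1
FOR UPDATE SKIP LOCKED
ON CONFLICT (email, user_id) DO NOTHING
RETURNING email;

/* If RETURNING produced a row, this worker owns the email: do the
   processing here, while the transaction (and the row lock) is still
   open, and only then: */
COMMIT;
```

With this variant, a worker that loses the race simply inserts zero rows (RETURNING yields nothing) instead of raising a duplicate key error, and it can just retry the claim.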