Database Concurrency Problems
We all know about the RDBMS ACID properties. In this article, let's focus on atomicity and isolation. To achieve atomicity, the RDBMS provides transactions that either commit all changes on success or abort them all if one operation fails. Combined with isolation, this gives us the guarantees a serious application needs. If you are not familiar with transactions and their isolation levels, please check the PostgreSQL documentation.
When we combine transactions with isolation, which isolation level should we use? Does it affect performance? What about race conditions between concurrent transactions? We all have those questions.
So let's start with the problems and then talk about the solutions. It is very common for less experienced engineers (myself included) to understand database isolation without knowing the problems it solves, so we gain nothing from it because we don't know the differences between the levels or which problem each one addresses. Let's walk through the problems one by one.
Concurrency Problems
Dirty Reads
A dirty read happens when we read from the database and also see uncommitted changes made by another concurrent transaction.
It is prevented by the Read Committed isolation level and is not allowed in most RDBMSs, but some vendors let you allow it by switching to the Read Uncommitted isolation level.
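To make the interleaving concrete, here is a minimal sketch using the ACCOUNT table defined later in this article (the row id is a made-up placeholder). Note that PostgreSQL treats Read Uncommitted as Read Committed, so this anomaly only shows up on vendors that really allow it:
-- Session A:
BEGIN;
UPDATE account SET email = 'new@example.com' WHERE id = 'user-1';

-- Session B (READ UNCOMMITTED), before Session A commits:
SELECT email FROM account WHERE id = 'user-1';
-- a dirty read would return 'new@example.com' here

-- Session A:
ROLLBACK;
-- Session B has acted on a value that never officially existed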
Dirty Writes
A dirty write happens when two concurrent transactions try to update the same object, the earlier write is part of a transaction that has not yet committed, and the later transaction overwrites that uncommitted value.
The Read Committed isolation level prevents dirty writes by locking the row on write (UPDATE, DELETE), so the later transaction waits until the earlier write is committed or aborted.
However, preventing dirty writes doesn't prevent another problem. Consider this case:
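A minimal sketch of the interleaving, reusing the view_counter table that appears later in the Lost Update section and assuming the counter starts at 1:
-- Session A (User 1):
BEGIN;
SELECT value FROM view_counter WHERE document_id = 'doc-1'; -- reads 1

-- Session B (User 2):
BEGIN;
SELECT value FROM view_counter WHERE document_id = 'doc-1'; -- also reads 1

-- Session A:
UPDATE view_counter SET value = 2 WHERE document_id = 'doc-1'; -- 1 + 1
COMMIT;

-- Session B:
UPDATE view_counter SET value = 2 WHERE document_id = 'doc-1'; -- also 1 + 1, waits for A, then overwrites
COMMIT;
-- final value is 2 instead of 3: User 1's increment is lost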
The expected behavior is that the counter becomes 3, incremented once by User 1 and once by User 2, but it ends up at 2. We have run into another problem: the Lost Update.
Non-Repeatable Read
Alice has $1,000 split equally between two accounts ($500 each). A transfer of $100 is made from account 2 to account 1, so account 1 becomes $600 and account 2 becomes $400. If Alice reads her balances while the transfer is in flight, she may see $500 in account 1 and $400 in account 2, a total of $900 even though she has $1,000.
If Alice refreshes, the total indeed becomes $1,000 again. If you can tolerate this temporary inconsistency, the Read Committed isolation level is enough. However, there are situations where we cannot tolerate it: a backup taken during the transfer would record Alice's $100 as gone, and analytics queries and integrity checks would be confused in the same way.
The non-repeatable read is prevented by the Repeatable Read isolation level, which PostgreSQL implements on top of its existing MVCC machinery. So how does this isolation level perform? We will get to the benchmark in a later section, hang on!
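As a sketch of both the anomaly and the fix, assume a simple accounts table (a hypothetical table, not part of the forum schema used later) with $500 in each of Alice's two accounts. Under Repeatable Read, both of Alice's reads come from the same snapshot:
-- Alice's session:
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT balance FROM accounts WHERE id = 1; -- 500

-- Transfer session: move $100 from account 2 to account 1
BEGIN;
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;
COMMIT;

-- Alice's session:
SELECT balance FROM accounts WHERE id = 2; -- still 500: same snapshot, total stays 1000
COMMIT;
-- under READ COMMITTED this second read would return 400 and Alice would see 900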
Lost Update
You already know about the lost update problem because we encountered it when preventing dirty writes. A lost update can occur whenever we have a read-modify-write cycle: we read something from the database, modify it, and then write it back, for example:
- Incrementing a counter: read the value from the database, increment it, and write it back.
- Two users editing a collaborative document (wiki, forum, notes): when two users edit the page at the same time, each write overwrites whatever is currently in the database.
Because this problem is common, we have a variety of solutions, such as:
- Atomic Operation
Let the database perform the read-modify-write in a single atomic statement:
UPDATE view_counter SET value = value + 1 WHERE document_id = $1;
- Explicit Locking at the Row Level
Using SELECT ... FOR UPDATE: it locks the rows returned by the SELECT for the rest of the transaction, so any other concurrent transaction that wants to touch them (SELECT ... FOR UPDATE, UPDATE, DELETE) must wait until this transaction commits or aborts.
BEGIN TRANSACTION;
SELECT * FROM forum WHERE id = $1 FOR UPDATE;
-- do some validation and then change the forum
UPDATE forum SET content = $2 WHERE id = $1;
COMMIT;
However, this must be implemented carefully: it is easy to forget to add FOR UPDATE (the lock), and if we are not careful we might lock more rows than we should.
- Automatically Detecting Lost Updates
With the Repeatable Read isolation level in PostgreSQL, whenever a lost update is detected the transaction fails with error code 40001, and we must retry it at the application level (see the sketch right after this list).
- Compare and Set
This is usually used when no transaction feature is available, as in many distributed systems. We can implement it on any attribute, but a version column is the usual choice:
SELECT content, version FROM forum WHERE id = $1;
-- do some validation and modify the content
UPDATE forum SET content = $1, version = version + 1
WHERE id = $2 AND version = $3; -- this condition is the key to Compare and Set
We read the attribute and its version, modify it, and then write it back conditioned on the version we read earlier. Whenever the number of affected rows is 0, we must retry the read-modify-write cycle at the application level.
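As a minimal sketch of the automatic detection mentioned in the list above, reusing the view_counter example, here is how PostgreSQL reports the conflict under Repeatable Read:
-- Session A:
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT value FROM view_counter WHERE document_id = 'doc-1'; -- reads 1

-- Session B:
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT value FROM view_counter WHERE document_id = 'doc-1'; -- reads 1

-- Session A:
UPDATE view_counter SET value = 2 WHERE document_id = 'doc-1';
COMMIT;

-- Session B:
UPDATE view_counter SET value = 2 WHERE document_id = 'doc-1';
-- ERROR: could not serialize access due to concurrent update (SQLSTATE 40001)
-- the application must retry the whole transaction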
We will benchmark the explicit lock, Repeatable Read, and compare-and-set methods to see which one has good throughput and what the trade-offs are, but for now let's move on to the next concurrency problem.
Write Skew
This problem generalizes the pattern where we read from the database and, based on that read, either abort because a validation fails or continue the transaction to insert or update. For example, a hospital system requires at least one doctor on call in the ER. If two doctors click on_leave at the same time, this can happen:
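A minimal sketch of the race, assuming a hypothetical doctors table with an on_call flag (it is not part of the forum schema below) and two doctors currently on call:
-- Alice's session:
BEGIN;
SELECT COUNT(*) FROM doctors WHERE on_call = TRUE; -- 2, so leaving is allowed

-- Bob's session:
BEGIN;
SELECT COUNT(*) FROM doctors WHERE on_call = TRUE; -- also 2, so leaving is allowed

-- Alice's session:
UPDATE doctors SET on_call = FALSE WHERE name = 'alice';
COMMIT;

-- Bob's session:
UPDATE doctors SET on_call = FALSE WHERE name = 'bob';
COMMIT;
-- both commits succeed and nobody is on call in the ER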
But that problem is easily solved by SELECT ... FOR UPDATE, since both transactions read the same on-call rows and can lock them.
Other examples:
- A hotel system that must not allow double booking
BEGIN TRANSACTION;
SELECT COUNT(1) FROM hotel_books
WHERE hotel_id = $1 AND room = $2
  AND end_time > $4 AND start_time < $3;
-- if the query above returns non-zero, abort
-- if it returns zero, insert the booking
INSERT INTO hotel_books (hotel_id, room, start_time, end_time, user_id)
VALUES ($1, $2, $3, $4, $5);
COMMIT;
This cannot be solved by SELECT ... FOR UPDATE because the rows matching the conflict check are either zero or many; when zero rows match, the lock attaches to nothing, so a double booking can still happen.
- Claiming a username / phone number / email for a user
In the application code, we usually do this:
- Read whether the username already exists in the database.
- If the username is found, return 400 and abort the transaction.
- If the username is not found, insert it into the database and return 200.
This is easily solved by a unique constraint.
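For example, with the username CITEXT UNIQUE column from the ACCOUNT table defined later in this article, the losing insert simply fails with a unique violation (the constraint name below is PostgreSQL's default naming) and the application maps that error to a 400 response:
-- request 1 and request 2 both race to claim the username 'alice'
INSERT INTO account (id, username, hashed_password, created_on)
VALUES ('user-2', 'alice', '<hash>', now());
-- the second insert to arrive fails with:
-- ERROR: duplicate key value violates unique constraint "account_username_key"
-- (SQLSTATE 23505)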
In all of these examples, the pattern is the same: read, check, and then do something based on that first read.
All of these examples can be avoided by using the Serializable isolation level, but how does Serializable perform?
In PostgreSQL, the Serializable isolation level is implemented with SSI (Serializable Snapshot Isolation), which means it is built on top of Repeatable Read and adds detection of write skew and phantom reads; whenever such an anomaly is detected, the transaction is aborted. This adds the overhead of tracking all concurrent transactions and running the checks before commit.
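A minimal sketch of what that looks like for the hotel booking example above; under SSI, one of two overlapping bookings is aborted and the application retries or rejects it:
BEGIN ISOLATION LEVEL SERIALIZABLE;
SELECT COUNT(1) FROM hotel_books
WHERE hotel_id = $1 AND room = $2
  AND end_time > $4 AND start_time < $3;
-- if the count is non-zero, abort with a "room already booked" error
INSERT INTO hotel_books (hotel_id, room, start_time, end_time, user_id)
VALUES ($1, $2, $3, $4, $5);
COMMIT;
-- if two such transactions overlap, one of them fails with SQLSTATE 40001
-- (could not serialize access) and must be retried at the application level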
Isolation Level Benchmark in PostgreSQL
Let's benchmark these isolation levels to figure out which one we should use.
As an example, we want to implement a forum where users can write threads, comment on them, and add reactions:
CREATE EXTENSION IF NOT EXISTS citext;
CREATE TABLE IF NOT EXISTS ACCOUNT (
id TEXT PRIMARY KEY,
username CITEXT UNIQUE,
phone_number CITEXT UNIQUE,
email CITEXT UNIQUE,
hashed_password TEXT NOT NULL,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_on TIMESTAMPTZ NOT NULL,
updated_on TIMESTAMPTZ,
version BIGINT NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS THREAD (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
body TEXT NOT NULL,
total_comment BIGINT NOT NULL DEFAULT 0,
total_reaction BIGINT NOT NULL DEFAULT 0,
created_by TEXT NOT NULL REFERENCES ACCOUNT ON DELETE CASCADE,
created_on TIMESTAMPTZ NOT NULL,
updated_by TEXT REFERENCES ACCOUNT ON DELETE CASCADE,
updated_on TIMESTAMPTZ,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
version BIGINT NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS COMMENT (
id TEXT PRIMARY KEY,
thread_id TEXT NOT NULL REFERENCES THREAD ON DELETE CASCADE,
user_id TEXT NOT NULL REFERENCES ACCOUNT ON DELETE CASCADE,
reply_to TEXT REFERENCES COMMENT ON DELETE CASCADE,
content TEXT NOT NULL,
total_reply BIGINT NOT NULL DEFAULT 0,
total_reaction BIGINT NOT NULL DEFAULT 0,
created_on TIMESTAMPTZ NOT NULL,
updated_on TIMESTAMPTZ,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
version BIGINT NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS REACTION (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL REFERENCES ACCOUNT ON DELETE CASCADE,
thread_id TEXT REFERENCES THREAD ON DELETE CASCADE,
comment_id TEXT REFERENCES COMMENT ON DELETE CASCADE,
content VARCHAR(100) NOT NULL,
created_on TIMESTAMPTZ NOT NULL,
updated_on TIMESTAMPTZ,
version BIGINT NOT NULL DEFAULT 1
);
In this benchmark we use Go, PostgreSQL, and pgx as the PostgreSQL driver. We will benchmark the performance on both single-object and multi-object transactions.
Single Object
We run a read-modify-write cycle against the ACCOUNT table with various transaction isolation levels:
func ReadModifyWriteUser(ctx context.Context, txOpt pgx.TxOptions, userId string) error {
return db.Atomic(ctx, txOpt, func(tx db.Connection) error {
account, err := repository.GetAccount(ctx, tx, userId)
if err != nil {
return err
}
account.Username = helper.ToPointer(faker.Username())
account.Email = helper.ToPointer(faker.Email())
return repository.UpdateAccount(ctx, tx, account)
})
}
Multiple Objects
For multiple objects, we add a comment to a thread and increment the thread's comment counter:
func ReadModifyWriteComment(ctx context.Context, txOpt pgx.TxOptions, userId, threadId string) error {
return db.Atomic(ctx, txOpt, func(tx db.Connection) error {
_, err := repository.GetAccount(ctx, tx, userId)
if err != nil {
return err
}
thread, err := repository.GetThread(ctx, tx, threadId)
if err != nil {
return err
}
commentId := helper.CommentId()
err = repository.CreateComment(ctx, tx, repository.Comment{
Id: commentId,
ThreadId: threadId,
UserId: userId,
Content: faker.Sentence(),
CreatedOn: time.Now(),
Version: 1,
})
if err != nil {
return err
}
thread.TotalComment++
return repository.UpdateThread(ctx, tx, thread)
})
}
db.Atomic wraps the callback in a transaction: when the callback returns an error it rolls back, and when it returns nil it commits.
package db
func Atomic(ctx context.Context, txOpt pgx.TxOptions, cb func(tx Connection) error) error {
conn, err := GetConnection(ctx)
if err != nil {
return err
}
defer conn.Release()
tx, err := conn.BeginTx(ctx, txOpt)
if err != nil {
return err
}
err = cb(tx)
if err != nil {
if rbErr := tx.Rollback(ctx); rbErr != nil {
return fmt.Errorf("cannot rollback %w: %w", rbErr, err)
}
return err
}
return tx.Commit(ctx)
}
Results
We can see that the Read Committed and Repeatable Read isolation levels can be used interchangeably because their performance is nearly the same, so when you are not sure which one to use, use Repeatable Read.
At the Serializable isolation level, performance drops a little, so whenever we are sure the problem at hand can be solved by a lower isolation level, use the lower one.
Lost Update Benchmark
In the Lost Update section we saw a variety of solutions, but which one is best for throughput and which for consistency? Let's benchmark them with an "add reaction to thread" operation that inserts a row into the REACTION table and increments the thread's reaction counter.
- Explicit Locking at the Row Level
For the explicit row-level locking solution, we use the default isolation level, which is Read Committed:
type ForUpdate struct{}

func (f ForUpdate) Do(ctx context.Context, threadId, userId string) error {
	return f.readModifyWriteReactionToThreadId(
		ctx,
		pgx.TxOptions{},
		threadId,
		userId,
	)
}

func (ForUpdate) readModifyWriteReactionToThreadId(
	ctx context.Context,
	txOpt pgx.TxOptions,
	threadId, userId string,
) error {
	return db.Atomic(ctx, txOpt, func(tx db.Connection) error {
		thread, err := repository.GetThread(ctx, tx, threadId,
			repository.GetThreadOption{
				ForUpdate: true,
			},
		)
		if err != nil {
			return err
		}
		_, err = repository.GetAccount(ctx, tx, userId)
		if err != nil {
			return err
		}
		err = repository.CreateReaction(ctx, tx, repository.Reaction{
			Id:        helper.ReactionId(),
			AccountId: userId,
			ThreadId:  &threadId,
			Content:   "like",
			CreatedOn: time.Now(),
			Version:   1,
		})
		if err != nil {
			return err
		}
		thread.TotalReaction++
		return repository.UpdateThread(ctx, tx, thread)
	})
}
- Automatically Detecting Lost Updates
For Repeatable Read, which detects the lost update automatically, we can forget about the lock and simply retry at the application level: whenever the error code is 40001 we retry, until the retry counter reaches its maximum and we return a max-retry error. The retriable transaction implementation looks like this:
const maxRetry = 5

func AtomicWithAutoRetry(
	ctx context.Context,
	txOpt pgx.TxOptions,
	cb func(tx Connection) error,
) error {
	return transaction(ctx, txOpt, cb, maxRetry)
}

var ErrLimitRetry = errors.New("retry limit exceeded!")

func transaction(
	ctx context.Context,
	txOpt pgx.TxOptions,
	cb func(tx Connection) error,
	retry int,
) error {
	if retry < 0 {
		return ErrLimitRetry
	}
	conn, err := GetConnection(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()
	tx, err := conn.BeginTx(ctx, txOpt)
	if err != nil {
		return err
	}
	err = cb(tx)
	if err != nil {
		if rbErr := tx.Rollback(ctx); rbErr != nil {
			return fmt.Errorf("cannot rollback %w: %w", rbErr, err)
		}
		var pgErr *pgconn.PgError
		if errors.As(err, &pgErr) && pgErr.Code == "40001" {
			return transaction(ctx, txOpt, cb, retry-1)
		}
		return err
	}
	return tx.Commit(ctx)
}
And the caller becomes simpler:
type RepeatableRead struct{}

func (r RepeatableRead) Do(
	ctx context.Context,
	threadId, userId string,
) error {
	return r.readModifyWriteReactionToThreadId(
		ctx,
		pgx.TxOptions{
			IsoLevel: pgx.RepeatableRead,
		},
		threadId,
		userId,
	)
}

func (RepeatableRead) readModifyWriteReactionToThreadId(
	ctx context.Context,
	txOpt pgx.TxOptions,
	threadId, userId string,
) error {
	return db.AtomicWithAutoRetry(ctx, txOpt, func(tx db.Connection) error {
		thread, err := repository.GetThread(ctx, tx, threadId)
		if err != nil {
			return err
		}
		_, err = repository.GetAccount(ctx, tx, userId)
		if err != nil {
			return err
		}
		err = repository.CreateReaction(ctx, tx, repository.Reaction{
			Id:        helper.ReactionId(),
			AccountId: userId,
			ThreadId:  &threadId,
			Content:   "like",
			CreatedOn: time.Now(),
			Version:   1,
		})
		if err != nil {
			return err
		}
		thread.TotalReaction++
		return repository.UpdateThread(ctx, tx, thread)
	})
}
- Compare and Set
For compare-and-set we don't use a transaction at all, to stay true to the premise of a distributed system where transactions are not available, so we retry and roll back manually:
type CompareAndSet struct{}

func (c CompareAndSet) Do(
	ctx context.Context,
	threadId, userId string,
) error {
	return c.readModifyWriteReactionToThreadId(ctx, threadId, userId)
}

func (CompareAndSet) readModifyWriteReactionToThreadId(
	ctx context.Context,
	threadId, userId string,
) error {
	conn, err := db.GetConnection(ctx)
	if err != nil {
		return err
	}
	_, err = repository.GetAccount(ctx, conn, userId)
	if err != nil {
		return err
	}
	conn.Release()
	return db.RetryMatchAndSet(ctx, func(conn db.Connection) error {
		thread, err := repository.GetThread(ctx, conn, threadId)
		if err != nil {
			return err
		}
		reactionId := helper.ReactionId()
		err = repository.CreateReaction(ctx, conn, repository.Reaction{
			Id:        reactionId,
			AccountId: userId,
			ThreadId:  &threadId,
			Content:   "like",
			CreatedOn: time.Now(),
			Version:   1,
		})
		if err != nil {
			// in a production environment we would need to delete the reaction in case
			// it was already created but we failed to receive the success response
			// _ = repository.DeleteReaction(ctx, conn, reactionId)
			return err
		}
		thread.TotalReaction++
		err = repository.UpdateThread(ctx, conn, thread, repository.UpdateThreadOption{
			CompareAndSet: &repository.CompareAndSetOption{
				Version: thread.Version,
			},
		})
		if err != nil {
			// in a production environment we would need to roll UpdateThread back to its
			// previous state in case it was already updated but we failed to receive
			// the success response
			// _ = repository.UpdateThread(ctx, conn, oldThread)
			_ = repository.DeleteReaction(ctx, conn, reactionId)
			return err
		}
		return nil
	})
}
RetryMatchAndSet retries the callback whenever ErrVersionMisMatch is returned:
var ErrVersionMisMatch = errors.New("version mismatch, must retry")

func RetryMatchAndSet(ctx context.Context, cb func(conn Connection) error) error {
	conn, err := GetConnection(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()
	retryCount := 0
	for retryCount < maxRetry {
		errToTry := cb(conn)
		if errToTry == nil {
			return nil
		}
		if !errors.Is(errToTry, ErrVersionMisMatch) {
			return errToTry
		}
		retryCount++
	}
	return ErrLimitRetry
}
After that, we run a test to exercise all of those methods:
type Reaction interface {
	Do(ctx context.Context, threadId, userId string) error
}

func TestReactionCounter(t *testing.T) {
	ctx := context.Background()
	tests := []struct {
		name     string
		reaction Reaction
	}{
		{
			name:     "locking",
			reaction: lostupdatebenchmark.ForUpdate{},
		},
		{
			name:     "repeatable read",
			reaction: lostupdatebenchmark.RepeatableRead{},
		},
		{
			name:     "compare and set",
			reaction: lostupdatebenchmark.CompareAndSet{},
		},
	}
	for _, tt := range tests {
		userId, err := helper.CreateUser()
		require.NoError(t, err)
		threadId, err := helper.CreateThread(userId)
		require.NoError(t, err)
		t.Run(tt.name, func(t *testing.T) {
			start := time.Now()
			err := addConcurentReaction(ctx, tt.reaction, 100, threadId)
			if err != nil {
				log.Println(err)
			}
			c, err := db.GetConnection(ctx)
			require.NoError(t, err)
			defer c.Release()
			thread, err := repository.GetThread(ctx, c, threadId)
			require.NoError(t, err)
			assert.Equal(t, 100, thread.TotalReaction)
			fmt.Println("execution time: ", time.Since(start))
		})
	}
}

func addConcurentReaction(ctx context.Context, cb Reaction, concurentUser int, threadId string) error {
	newUserIds := make([]string, 0, concurentUser)
	for i := 0; i < concurentUser; i++ {
		userId, err := helper.CreateUser()
		if err != nil {
			return err
		}
		newUserIds = append(newUserIds, userId)
	}
	var wg sync.WaitGroup
	errs := make([]error, concurentUser)
	for i := 0; i < concurentUser; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = cb.Do(ctx, threadId, newUserIds[i])
		}(i)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			log.Println(err)
		}
	}
	return nil
}
The results of those tests are:
- The row-locking method using SELECT ... FOR UPDATE ends with 100 rows inserted and an execution time of 424 ms.
- The automatic lost-update detection method ends with 93 rows inserted and an execution time of 325 ms.
- The compare-and-set method performs the worst, with 18 rows inserted and an execution time of 367 ms.
So we can conclude that the row-locking method has the best consistency but worse throughput than the automatic lost-update detection method, and vice versa. The compare-and-set method is good enough when you don't have transactions at all. Now that we know the trade-offs from these benchmarks, pick your poison.
You can see all of the benchmark code in my repo.
I think that's it, guys. I hope you can learn from the mistakes I've made and now understand the trade-offs between the isolation levels and the lost update methods.
References:
- Designing Data-Intensive Applications by Martin Kleppmann
- https://www.postgresql.org/docs/current/transaction-iso.html