Database Concurrency Problem and Benchmark


2/24/2024



Database Concurrency Problem

We all know about the RDBMS ACID properties. In this article, let's focus on atomicity and isolation. To achieve atomicity, the RDBMS provides transactions that either commit all changes on success or abort them all if one fails; combined with isolation, that gets us to our goal of being a serious app. If you don't know about transactions and their isolation levels, please check the PostgreSQL documentation.
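As a quick illustration (a generic sketch with a hypothetical account_balance table, not code from this article), a money transfer only makes sense when both updates take effect together:

BEGIN;

-- Move $100 between accounts: both rows change, or neither does.
UPDATE account_balance SET balance = balance - 100 WHERE account_id = 1;
UPDATE account_balance SET balance = balance + 100 WHERE account_id = 2;

-- Any failure before this point means ROLLBACK, and no money moves at all.
COMMIT;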

When transactions are combined with isolation, which isolation level should I use? Does it affect performance? What about race conditions between concurrent transactions? We all have those questions.

So let's start with the problems and then talk about the solutions. It is very common for beginners (myself included) to understand database isolation without knowing the problems it solves, so we gain nothing from it: we don't know the differences between the levels or which problem each one addresses. Let's go through the problems one by one.

Concurrency Problem

Dirty Reads

A dirty read means that when we read from the database, we can also see uncommitted changes from another concurrent transaction.

Dirty Read

It is prevented by the Read Committed isolation level and is not possible at all in most RDBMSs; some vendors expose it through the Read Uncommitted isolation level. (PostgreSQL accepts READ UNCOMMITTED syntactically but treats it as READ COMMITTED.)
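A hypothetical two-session sketch of the race (the account table and values are illustrative):

-- Session A
BEGIN;
UPDATE account SET balance = 0 WHERE id = 1;  -- not committed yet

-- Session B: under Read Uncommitted (on vendors that allow it), this could
-- return the dirty 0. Under Read Committed it still returns the last
-- committed balance.
SELECT balance FROM account WHERE id = 1;

-- Session A
ROLLBACK;  -- the 0 that session B might have seen never officially existed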

Dirty Writes

Dirty Writes

When two concurrent transactions try to update the same object and the earlier write is part of a transaction that has not yet committed, the later transaction overwrites an uncommitted value.

The Read Committed isolation level prevents dirty writes by locking the row on write (UPDATE, DELETE), so the later transaction waits until the earlier one commits or aborts.

However, preventing dirty writes doesn't prevent every related problem. Consider this case:

Lost Update

The expected behavior is a counter of 3: incremented once by User 1 and once by User 2. Instead it ends up at 2. Preventing the dirty write was not enough; we have run into another problem, the Lost Update.
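A hypothetical sketch of that race on a counter table: preventing the dirty write only makes the second writer wait, it does not make it re-read:

-- Both sessions run this read-modify-write cycle concurrently:
BEGIN;
SELECT value FROM counter WHERE id = 1;    -- both sessions read 1
-- each application computes 1 + 1 = 2
UPDATE counter SET value = 2 WHERE id = 1; -- the later session waits on the row lock,
COMMIT;                                    -- then overwrites with the same 2: one increment is lost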

Non Repeatable Read

Non Repeatable Read

Alice has $1,000 split equally between two accounts, $500 each. A transfer of $100 from account 2 to account 1 makes account 1 $600 and account 2 $400. If Alice reads her balances in the middle of that transfer, she may see $500 in account 1 (before the credit arrives) and $400 in account 2 (after the debit): a total of $900, even though she has $1,000.

If Alice refreshes, the total is indeed $1,000 again. If you can tolerate this temporary inconsistency, the Read Committed isolation level is enough. But some situations cannot tolerate it: a backup taken mid-transfer would persist a state where Alice's $100 is gone, and analytics queries or integrity checks would report wrong numbers.

Non-repeatable reads are prevented by the Repeatable Read isolation level, which PostgreSQL implements on top of the MVCC machinery it already has. How does that isolation level perform? We will get to the benchmark in a later section, hang on!
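A hypothetical sketch of Alice's read under Repeatable Read (balances as in the example above): the whole transaction reads from a single snapshot:

BEGIN ISOLATION LEVEL REPEATABLE READ;

SELECT balance FROM account WHERE id = 1;  -- $500, from the snapshot
-- ... the $100 transfer commits here, in another session ...
SELECT balance FROM account WHERE id = 2;  -- still $500 from the same snapshot: the total stays $1,000

COMMIT;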

Lost Update

You already know about the Lost Update problem: we ran into it while preventing dirty writes. It can occur whenever we have a read-modify-write cycle: we read something from the database, modify it, and write it back. For example:

  • Incrementing a counter: read a value from the database, increment it, and write it back.
  • Two users editing a collaborative document (wiki, forum, notes): when two users edit the page at the same time, the later save overwrites whatever is currently in the database.

Because this problem is common, we have variety of solutions, such as:

  • Atomic Operation

    UPDATE view_counter SET value = value + 1 WHERE document_id = $1;
    
  • Explicit Locking on Row Based Level:

    Using SELECT ... FOR UPDATE: it locks the rows returned by the SELECT for the duration of the transaction; any other concurrent transaction that wants to change those rows (SELECT ... FOR UPDATE, UPDATE, DELETE) must wait until this transaction commits or aborts.

    BEGIN TRANSACTION;
    
    SELECT *
    FROM forum
    WHERE id = $1
    FOR UPDATE; 
    
    ---- do some validation and then Change the Forum
    UPDATE forum SET content = $2 WHERE id = $1;
    
    COMMIT;
    

    However, this must be implemented carefully: it is easy to forget to add FOR UPDATE (the lock), and if we are not careful we might lock more rows than we should.

  • Automatically Detecting Lost Updates

    When using the Repeatable Read isolation level in PostgreSQL, a detected lost update makes the transaction fail with error code 40001; we must then retry it at the application level.

  • Compare and Set

    This is the usual approach when no transaction feature is available, as in a distributed system. It can be implemented with any attribute, but a version column is most common:

    SELECT content, version
    FROM forum
    WHERE id = $1;
    
    -- Do some validation, and modify
    
    UPDATE forum SET
    	content = $1,
    	version = version + 1
    WHERE id = $2 AND
    	version = $3; -- This condition is the key to Compare And Set
    

    We read the attribute and its version, modify the attribute, and write it back conditioned on the version we read earlier. Whenever the affected row count is 0, we must retry the whole read-modify-write cycle at the application level; a minimal pgx sketch follows this list.
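Here is a minimal sketch of that retry cycle with pgx (a hypothetical helper, not code from the benchmark repo: the forum table and a modify callback are assumed, and the import path assumes pgx v5):

import (
	"context"
	"errors"

	"github.com/jackc/pgx/v5"
)

// compareAndSetContent is a hypothetical sketch of the read-modify-write cycle:
// read the row and its version, modify it, and write it back only if the
// version is unchanged; on 0 affected rows, run the whole cycle again.
func compareAndSetContent(ctx context.Context, conn *pgx.Conn, id string, modify func(string) string) error {
	const maxRetry = 5
	for attempt := 0; attempt < maxRetry; attempt++ {
		var content string
		var version int64
		err := conn.QueryRow(ctx,
			`SELECT content, version FROM forum WHERE id = $1`, id,
		).Scan(&content, &version)
		if err != nil {
			return err
		}

		tag, err := conn.Exec(ctx,
			`UPDATE forum SET content = $1, version = version + 1
			  WHERE id = $2 AND version = $3`,
			modify(content), id, version,
		)
		if err != nil {
			return err
		}
		if tag.RowsAffected() == 1 {
			return nil // the version still matched: the compare-and-set won
		}
		// 0 rows affected: a concurrent writer bumped the version; retry the cycle.
	}
	return errors.New("retry limit exceeded")
}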

We will benchmark the Explicit Lock, Repeatable Read, and Compare And Set methods to see which has good throughput and what the trade-offs are, but for now let's move to the next concurrency problem.

Write Skew

This problem generalizes the pattern where we read from the database and, based on that read, either abort (if validation fails) or continue the transaction with an insert or update. For example, a hospital system requires at least one doctor on call in the ER; if two doctors click on_leave at the same time, this can happen:

Write Skew

But that problem is easily solved by SELECT ... FOR UPDATE.

Other examples:

  • A Hotel System That Cannot Allow Double-Booking

    BEGIN TRANSACTION;
    
    SELECT COUNT(1) FROM hotel_books
     WHERE hotel_id = $1 AND
     room = $2 AND
     end_time > $4 AND start_time < $3;
    -- If the query above returned non zero, abort;
    -- If the query above returned zero, insert hotel_books
    
    INSERT INTO hotel_books
     (hotel_id, room, start_time, end_time, user_id)
     VALUES ($1,$2,$3,$4,$5);
    COMMIT;
    

    This cannot be solved by SELECT ... FOR UPDATE because the query returns either zero or many rows; when it returns zero, there is nothing to lock, so a double booking can still happen. A database-side fix is sketched after this list.

  • Claiming a Username / Phone Number / Email for a User:

    In the application code, we usually do this:

    1. Check whether the username already exists in the database.
    2. If the username is found, return 400 and abort the transaction.
    3. If the username is not found, insert it into the database and return 200.

    This one is easily solved by a unique constraint; see the constraint sketch right after this list.
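Both fixes can live in the database itself. A hypothetical DDL sketch (table and column names follow the examples above, and it assumes start_time/end_time are timestamptz): a unique constraint rejects the duplicate username, and a PostgreSQL exclusion constraint rejects overlapping bookings for the same room, even when the guarding SELECT saw zero rows:

-- Unique constraint for the username case (the ACCOUNT table later in this
-- article already does this with username CITEXT UNIQUE):
ALTER TABLE account ADD CONSTRAINT account_username_key UNIQUE (username);

-- Exclusion constraint for the double-booking case: no two rows for the same
-- hotel and room may have overlapping time ranges.
CREATE EXTENSION IF NOT EXISTS btree_gist;

ALTER TABLE hotel_books ADD CONSTRAINT no_double_booking
  EXCLUDE USING gist (
    hotel_id WITH =,
    room WITH =,
    tstzrange(start_time, end_time) WITH &&
  );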

In all of these examples the pattern is the same: read, check, and act based on that first read.

All of these examples can also be avoided with the Serializable isolation level, but how does Serializable perform?

In PostgreSQL, the Serializable isolation level is implemented with SSI (Serializable Snapshot Isolation): it builds on Repeatable Read and adds detection of write skew and phantom reads at commit time; whenever one is detected, the transaction is aborted. This adds overhead to track all concurrent transactions and to run the check on commit.
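To see SSI in action, here is a hypothetical two-session sketch of the on-call example (a doctors table with an on_call flag is assumed); both sessions pass the check, but only one is allowed to commit:

-- Both sessions run this concurrently:
BEGIN ISOLATION LEVEL SERIALIZABLE;

SELECT COUNT(1) FROM doctors WHERE on_call = TRUE;  -- both sessions see 2

UPDATE doctors SET on_call = FALSE WHERE name = 'alice';  -- the other session updates 'bob'

COMMIT;
-- One session commits; the other is aborted with SQLSTATE 40001
-- ("could not serialize access") and must be retried by the application.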

Isolation Level Benchmark in PostgreSQL

Let's benchmark all of these isolation levels to learn which one we should use.

As an example, we implement a forum where users can write threads, comment on them, and add reactions:

CREATE EXTENSION IF NOT EXISTS citext;

CREATE TABLE IF NOT EXISTS ACCOUNT (
  id TEXT PRIMARY KEY,
  username CITEXT UNIQUE,
  phone_number CITEXT UNIQUE,
  email CITEXT UNIQUE,
  hashed_password TEXT NOT NULL,
  is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
  created_on TIMESTAMPTZ NOT NULL,
  updated_on TIMESTAMPTZ,
  version BIGINT NOT NULL DEFAULT 1
);

CREATE TABLE IF NOT EXISTS THREAD (
  id TEXT PRIMARY KEY,
  title TEXT NOT NULL,
  body TEXT NOT NULL,
  total_comment BIGINT NOT NULL DEFAULT 0,
  total_reaction BIGINT NOT NULL DEFAULT 0,
  created_by TEXT NOT NULL REFERENCES ACCOUNT ON DELETE CASCADE,
  created_on TIMESTAMPTZ NOT NULL,
  updated_by TEXT REFERENCES ACCOUNT ON DELETE CASCADE,
  updated_on TIMESTAMPTZ,
  is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
  version BIGINT NOT NULL DEFAULT 1
);

CREATE TABLE IF NOT EXISTS COMMENT (
  id TEXT PRIMARY KEY,
  thread_id TEXT NOT NULL REFERENCES THREAD ON DELETE CASCADE,
  user_id TEXT NOT NULL REFERENCES ACCOUNT ON DELETE CASCADE,
  reply_to TEXT REFERENCES COMMENT ON DELETE CASCADE,
  content TEXT NOT NULL,
  total_reply BIGINT NOT NULL DEFAULT 0,
  total_reaction BIGINT NOT NULL DEFAULT 0,
  created_on TIMESTAMPTZ NOT NULL,
  updated_on TIMESTAMPTZ,
  is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
  version BIGINT NOT NULL DEFAULT 1
);

CREATE TABLE IF NOT EXISTS REACTION (
  id TEXT PRIMARY KEY,
  account_id TEXT NOT NULL REFERENCES ACCOUNT ON DELETE CASCADE,
  thread_id TEXT REFERENCES THREAD ON DELETE CASCADE,
  comment_id TEXT REFERENCES COMMENT ON DELETE CASCADE,
  content VARCHAR(100) NOT NULL,
  created_on TIMESTAMPTZ NOT NULL,
  updated_on TIMESTAMPTZ,
  version BIGINT NOT NULL DEFAULT 1
);

In this benchmark we use Golang, PostgreSQL, and pgx as the PostgreSQL driver. We will benchmark the performance on a single object and on multiple objects.

Single Object

We run a read-modify-write cycle against the ACCOUNT table under various transaction isolation levels:

func ReadModifyWriteUser(ctx context.Context, txOpt pgx.TxOptions, userId string) error {
	return db.Atomic(ctx, txOpt, func(tx db.Connection) error {
		account, err := repository.GetAccount(ctx, tx, userId)
		if err != nil {
			return err
		}

		account.Username = helper.ToPointer(faker.Username())
		account.Email = helper.ToPointer(faker.Email())
		return repository.UpdateAccount(ctx, tx, account)
	})
}
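We can then drive the same function with each isolation level through pgx.TxOptions (a hypothetical invocation sketch; pgx.ReadCommitted, pgx.RepeatableRead, and pgx.Serializable are pgx's isolation-level constants):

// Run the same read-modify-write cycle under each isolation level.
for _, iso := range []pgx.TxIsoLevel{pgx.ReadCommitted, pgx.RepeatableRead, pgx.Serializable} {
	if err := ReadModifyWriteUser(ctx, pgx.TxOptions{IsoLevel: iso}, userId); err != nil {
		log.Printf("isolation %s: %v", iso, err)
	}
}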

Multiple Objects

For multiple objects, we add a comment to a thread and increment that thread's comment counter:

func ReadModifyWriteComment(ctx context.Context, txOpt pgx.TxOptions, userId, threadId string) error {
	return db.Atomic(ctx, txOpt, func(tx db.Connection) error {
		_, err := repository.GetAccount(ctx, tx, userId)
		if err != nil {
			return err
		}

		thread, err := repository.GetThread(ctx, tx, threadId)
		if err != nil {
			return err
		}

		commentId := helper.CommentId()
		err = repository.CreateComment(ctx, tx, repository.Comment{
			Id:        commentId,
			ThreadId:  threadId,
			UserId:    userId,
			Content:   faker.Sentence(),
			CreatedOn: time.Now(),
			Version:   1,
		})
		if err != nil {
			return err
		}

		thread.TotalComment++
		return repository.UpdateThread(ctx, tx, thread)
	})
}

db.Atomic wraps the callback in a transaction: if the callback returns an error the transaction rolls back, and if it returns nil the transaction commits.

package db

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5" // assumption: pgx v5 import path; adjust for v4
)

func Atomic(ctx context.Context, txOpt pgx.TxOptions, cb func(tx Connection) error) error {
	conn, err := GetConnection(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	tx, err := conn.BeginTx(ctx, txOpt)
	if err != nil {
		return err
	}

	err = cb(tx)
	if err != nil {
		if rbErr := tx.Rollback(ctx); rbErr != nil {
			return fmt.Errorf("cannot rollback %w: %w", rbErr, err)
		}

		return err
	}

	return tx.Commit(ctx)
}

Result

Benchmark Isolation Level

Read Committed and Repeatable Read perform almost identically, so from a performance standpoint they can be used interchangeably; when not sure which one to use, use Repeatable Read.

At the Serializable isolation level performance drops a little, so whenever you are sure your problem can be solved by a lower isolation level, use the lower one.

Lost Update Benchmark

In the Lost Update section we had a variety of solutions, but which one is good for throughput and which for consistency? Let's benchmark them by implementing Add Reaction To Thread: insert a row into the REACTION table and increment the thread's reaction counter.

  • Explicit Locking on Row Based Level

    For the Explicit Locking on Row Based Level solution, we use the default isolation level, which is Read Committed:

    type ForUpdate struct{}
    
    func (f ForUpdate) Do(ctx context.Context, threadId, userId string) error {
    	return f.readModifyWriteReactionToThreadId(ctx,
    		pgx.TxOptions{},
    		threadId,
    		userId,
    	)
    }
    func (ForUpdate) readModifyWriteReactionToThreadId(
    	ctx context.Context,
    	txOpt pgx.TxOptions,
    	threadId,
    	userId string,
    ) error {
    	return db.Atomic(ctx, txOpt, func(tx db.Connection) error {
    		thread, err := repository.GetThread(ctx, tx,
    			threadId,
    			repository.GetThreadOption{
    				ForUpdate: true,
    			},
    		)
    		if err != nil {
    			return err
    		}
    
    		_, err = repository.GetAccount(ctx, tx, userId)
    		if err != nil {
    			return err
    		}
    
    		err = repository.CreateReaction(ctx, tx, repository.Reaction{
    			Id:        helper.ReactionId(),
    			AccountId: userId,
    			ThreadId:  &threadId,
    			Content:   "like",
    			CreatedOn: time.Now(),
    			Version:   1,
    		})
    		if err != nil {
    			return err
    		}
    
    		thread.TotalReaction++
    		return repository.UpdateThread(ctx, tx, thread)
    	})
    }
    
  • Automatically Detecting Lost Updates

    For Repeatable Read, which detects the lost update automatically, we can implement a retry at the application level and forget about the lock: whenever the error code is 40001 we retry, until the retry counter reaches its maximum and we return a max-retry error. The retriable transaction looks like this:

    const maxRetry = 5
    
    func AtomicWithAutoRetry(
    	ctx context.Context,
    	txOpt pgx.TxOptions,
    	cb func(tx Connection) error,
    ) error {
    	return transaction(ctx, txOpt, cb, maxRetry)
    }
    
    var ErrLimitRetry = errors.New("retry limit exceeded")
    
    func transaction(
    	ctx context.Context,
    	txOpt pgx.TxOptions,
    	cb func(tx Connection) error,
    	retry int,
    ) error {
    	if retry < 0 {
    		return ErrLimitRetry
    	}
    
    	conn, err := GetConnection(ctx)
    	if err != nil {
    		return err
    	}
    	defer conn.Release()
    
    	tx, err := conn.BeginTx(ctx, txOpt)
    	if err != nil {
    		return err
    	}
    
    	err = cb(tx)
    	if err != nil {
    		if rbErr := tx.Rollback(ctx); rbErr != nil {
    			return fmt.Errorf("cannot rollback %w: %w", rbErr, err)
    		}
    		var pgErr *pgconn.PgError
    		if errors.As(err, &pgErr) && pgErr.Code == "40001" {
    			return transaction(ctx, txOpt, cb, retry-1)
    
    		}
    		return err
    
    	}
    
    	return tx.Commit(ctx)
    
    }
    

    The implementation itself then becomes simpler:

    type RepeatableRead struct{}
    
    func (r RepeatableRead) Do(
    	ctx context.Context,
    	threadId,
    	userId string,
    ) error {
    	return r.readModifyWriteReactionToThreadId(
    		ctx,
    		pgx.TxOptions{
    			IsoLevel: pgx.RepeatableRead,
    		},
    		threadId,
    		userId,
    	)
    }
    
    func (RepeatableRead) readModifyWriteReactionToThreadId(
    	ctx context.Context,
    	txOpt pgx.TxOptions,
    	threadId,
    	userId string,
    ) error {
    	return db.AtomicWithAutoRetry(ctx, txOpt, func(tx db.Connection) error {
    		thread, err := repository.GetThread(ctx, tx, threadId)
    		if err != nil {
    			return err
    		}
    
    		_, err = repository.GetAccount(ctx, tx, userId)
    		if err != nil {
    			return err
    		}
    
    		err = repository.CreateReaction(ctx, tx, repository.Reaction{
    			Id:        helper.ReactionId(),
    			AccountId: userId,
    			ThreadId:  &threadId,
    			Content:   "like",
    			CreatedOn: time.Now(),
    			Version:   1,
    		})
    		if err != nil {
    			return err
    		}
    
    		thread.TotalReaction++
    		return repository.UpdateThread(ctx, tx, thread)
    	})
    }
    
  • Compare and Set

    For Compare and Set we don't use a transaction at all, in keeping with the premise of a distributed system where transactions are not available, so we retry and undo changes manually:

    type CompareAndSet struct{}
    
    func (c CompareAndSet) Do(
    	ctx context.Context,
    	threadId,
    	userId string,
    ) error {
    	return c.readModifyWriteReactionToThreadId(ctx, threadId, userId)
    }
    
    func (CompareAndSet) readModifyWriteReactionToThreadId(
    	ctx context.Context,
    	threadId,
    	userId string,
    ) error {
    	conn, err := db.GetConnection(ctx)
    	if err != nil {
    		return err
    	}
    
    	_, err = repository.GetAccount(ctx, conn, userId)
    	if err != nil {
    		return err
    	}
    	conn.Release()
    
    	return db.RetryMatchAndSet(ctx, func(conn db.Connection) error {
    		thread, err := repository.GetThread(ctx, conn, threadId)
    		if err != nil {
    			return err
    		}
    
    		reactionId := helper.ReactionId()
    		err = repository.CreateReaction(ctx, conn, repository.Reaction{
    			Id:        reactionId,
    			AccountId: userId,
    			ThreadId:  &threadId,
    			Content:   "like",
    			CreatedOn: time.Now(),
    			Version:   1,
    		})
    		if err != nil {
    			// in a production environment we would need to delete the reaction
    			// in case it was created but we failed to get a success response back
    			// _ = repository.DeleteReaction(ctx, conn, reactionId)
    			return err
    		}
    
    		thread.TotalReaction++
    		err = repository.UpdateThread(ctx, conn, thread, repository.UpdateThreadOption{
    			CompareAndSet: &repository.CompareAndSetOption{
    				Version: thread.Version,
    			},
    		})
    		if err != nil {
    			// in a production environment we would need to roll back UpdateThread to its
    			// previous state in case it was updated but we failed to get a success response back
    			// _ = repository.UpdateThread(ctx, conn, oldThread)
    			_ = repository.DeleteReaction(ctx, conn, reactionId)
    
    			return err
    		}
    
    		return nil
    	})
    }
    

    RetryMatchAndSet retries the callback whenever ErrVersionMisMatch is returned:

    const maxRetry = 5
    var ErrVersionMisMatch = errors.New("version mismatch, must retry")
    
    func RetryMatchAndSet(ctx context.Context, cb func(conn Connection) error) error {
    	conn, err := GetConnection(ctx)
    	if err != nil {
    		return err
    	}
    	defer conn.Release()
    
    	for retryCount := 0; retryCount < maxRetry; retryCount++ {
    		err := cb(conn)
    		if err == nil {
    			return nil // success: stop retrying
    		}
    		if !errors.Is(err, ErrVersionMisMatch) {
    			return err // a real error, not a version conflict
    		}
    		// version mismatch: run the read-modify-write cycle again
    	}
    
    	return ErrLimitRetry
    }
    

    After that, we run a test to exercise all of those methods:

    type Reaction interface {
    	Do(ctx context.Context, threadId, userId string) error
    }
    
    func TestReactionCounter(t *testing.T) {
    	ctx := context.Background()
    	tests := []struct {
    		name     string
    		reaction Reaction
    	}{
    		{
    			name:     "locking",
    			reaction: lostupdatebenchmark.ForUpdate{},
    		},
    		{
    			name:     "repeatable read",
    			reaction: lostupdatebenchmark.RepeatableRead{},
    		},
    		{
    			name:     "compare and set",
    			reaction: lostupdatebenchmark.CompareAndSet{},
    		},
    	}
    	for _, tt := range tests {
    		userId, err := helper.CreateUser()
    		require.NoError(t, err)
    		threadId, err := helper.CreateThread(userId)
    		require.NoError(t, err)
    		t.Run(tt.name, func(t *testing.T) {
    			start := time.Now()
    			err := addConcurentReaction(ctx, tt.reaction, 100, threadId)
    			if err != nil {
    				log.Println(err)
    			}
    
    			c, err := db.GetConnection(ctx)
    			require.NoError(t, err)
    			defer c.Release()
    
    			thread, err := repository.GetThread(ctx, c, threadId)
    			require.NoError(t, err)
    
    			assert.Equal(t, 100, thread.TotalReaction)
    			fmt.Println("execution time: ", time.Since(start))
    		})
    	}
    }
    
    func addConcurentReaction(ctx context.Context, cb Reaction, concurentUser int, threadId string) error {
    
    	newUserIds := make([]string, 0, concurentUser)
    	for i := 0; i < concurentUser; i++ {
    		userId, err := helper.CreateUser()
    		if err != nil {
    			return err
    		}
    		newUserIds = append(newUserIds, userId)
    	}
    
    	var wg sync.WaitGroup
    	errs := make([]error, concurentUser)
    	for i := 0; i < concurentUser; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			errs[i] = cb.Do(ctx, threadId, newUserIds[i])
    		}(i)
    	}
    	wg.Wait()
    
    	for _, err := range errs {
    		if err != nil {
    			log.Println(err)
    		}
    	}
    
    	return nil
    }
    

    The results of those tests are:

    Lost Update Result 1 Lost Update Result 2

    The row-locking method using SELECT ... FOR UPDATE inserted all 100 rows and finished in 424 ms.
    
    The automatic lost-update detection method inserted 93 rows and finished in 325 ms.
    
    The Compare And Set method performed worst, with only 18 rows inserted, finishing in 367 ms.
    
    So the row-locking method has the best consistency but the slowest execution time, while automatic lost-update detection is faster but drops writes once the retry limit is hit. Compare And Set is good enough when you don't have transactions at all. Now that we know the trade-offs, pick your poison.

You can see all the benchmark code in my repo.


I think that's it, guys. I hope you can learn from the mistakes I've made and now know the trade-offs between the isolation levels and the lost-update methods.

References: