@@ -157,6 +157,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
157157 void on_rollback (int64_t lsn, const sisl::blob& header, const sisl::blob& key,
158158 cintrusive< repl_req_ctx >& ctx) override {
159159 LOGINFOMOD (replication, " [Replica={}] Received rollback on lsn={}" , g_helper->replica_num (), lsn);
160+ {
161+ std::unique_lock lk (db_mtx_);
162+ rollback_count_++;
163+ }
164+ // continue the test
165+ if (ctx->is_proposer ()) { g_helper->runner ().next_task (); }
160166 }
161167
162168 void on_restart () {
@@ -364,6 +370,11 @@ class TestReplicatedDB : public homestore::ReplDevListener {
364370 return commit_count_;
365371 }
366372
373+ uint64_t db_rollback_count () const {
374+ std::shared_lock lk (db_mtx_);
375+ return rollback_count_;
376+ }
377+
367378 uint64_t db_size () const {
368379 std::shared_lock lk (db_mtx_);
369380 return inmem_db_.size ();
@@ -391,6 +402,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
391402 std::map< Key, Value > inmem_db_;
392403 std::map< int64_t , Value > lsn_index_;
393404 uint64_t commit_count_{0 };
405+ uint64_t rollback_count_{0 };
394406 std::shared_mutex db_mtx_;
395407 std::shared_ptr< snapshot_context > m_last_snapshot{nullptr };
396408 std::mutex m_snapshot_lock;
@@ -449,6 +461,22 @@ class RaftReplDevTest : public testing::Test {
449461
450462 void wait_for_all_commits () { wait_for_commits (written_entries_); }
451463
464+ uint64_t total_committed_cnt () {
465+ uint64_t total_writes{0 };
466+ for (auto const & db : dbs_) {
467+ total_writes += db->db_commit_count ();
468+ }
469+ return total_writes;
470+ }
471+
472+ uint64_t total_rollback_cnt () {
473+ uint64_t total_rollback{0 };
474+ for (auto const & db : dbs_) {
475+ total_rollback += db->db_rollback_count ();
476+ }
477+ return total_rollback;
478+ }
479+
452480 void wait_for_commits (uint64_t exp_writes) {
453481 uint64_t total_writes{0 };
454482 while (true ) {
@@ -522,27 +550,47 @@ class RaftReplDevTest : public testing::Test {
522550 LOGINFO (" Waiting for leader to be elected" );
523551 std::this_thread::sleep_for (std::chrono::milliseconds{500 });
524552 } else if (leader_uuid == g_helper->my_replica_id ()) {
525- LOGINFO (" Writing {} entries since I am the leader my_uuid={}" , num_entries,
526- boost::uuids::to_string (g_helper->my_replica_id ()));
553+ // LEADER ROLE
554+ auto batch_size = wait_for_commit ? g_helper->runner ().qdepth_ * 10 : num_entries;
555+ // cap batch_size but should be larger than QD.
556+ // It is possible after leader switch the writes run on previous leader will fail
557+ // so we need to do more IOs to have num_entries committed.
558+ if (batch_size > num_entries - written_entries_)
559+ batch_size = std::max (num_entries - written_entries_, g_helper->runner ().qdepth_ );
560+ LOGINFO (" Writing {} entries since I am the leader my_uuid={}, target_total {}, written {}" , batch_size,
561+ boost::uuids::to_string (g_helper->my_replica_id ()), num_entries, written_entries_);
527562 auto const block_size = SISL_OPTIONS[" block_size" ].as < uint32_t >();
528- g_helper->runner ().set_num_tasks (num_entries);
529-
563+ g_helper->runner ().set_num_tasks (batch_size);
530564 LOGINFO (" Run on worker threads to schedule append on repldev for {} Bytes." , block_size);
531565 g_helper->runner ().set_task ([this , block_size, db]() {
532566 static std::normal_distribution<> num_blks_gen{3.0 , 2.0 };
533567 this ->generate_writes (std::abs (std::lround (num_blks_gen (g_re))) * block_size, block_size, db);
534568 });
535- if (wait_for_commit) { g_helper->runner ().execute ().get (); }
536- break ;
569+ written_entries_ += batch_size;
570+ if (wait_for_commit) {
571+ g_helper->runner ().execute ().get ();
572+ if (total_committed_cnt () >= num_entries) { break ; }
573+ } else {
574+ if (written_entries_ >= num_entries) { break ; }
575+ }
537576 } else {
538- LOGINFO (" {} entries were written on the leader_uuid={} my_uuid={}" , num_entries,
577+ // FOLLOWER ROLE
578+ LOGINFO (" {} entries are expected to be written on the leader_uuid={}, my_uuid={}" , num_entries,
539579 boost::uuids::to_string (leader_uuid), boost::uuids::to_string (g_helper->my_replica_id ()));
540- break ;
580+ if (wait_for_commit) {
581+ LOGINFO (" {} entries are expected to be written, now I committed {}, my_uuid={}" , num_entries,
582+ total_committed_cnt (), boost::uuids::to_string (leader_uuid),
583+ boost::uuids::to_string (g_helper->my_replica_id ()));
584+ if (total_committed_cnt () >= num_entries) { break ; }
585+ std::this_thread::sleep_for (std::chrono::milliseconds{5000 });
586+ } else {
587+ break ;
588+ }
541589 }
542590 } while (true );
543-
544- written_entries_ += num_entries;
545- if (wait_for_commit) { this -> wait_for_all_commits (); }
591+ LOGINFO ( " my_uuid={}, {} entries are expected to be written, I wrote {}, committed {}, rollback {} " ,
592+ boost::uuids::to_string (g_helper-> my_replica_id ()), num_entries, written_entries_,
593+ total_committed_cnt (), total_rollback_cnt ());
546594 }
547595
548596 void remove_db (std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) {
0 commit comments