Rocksdb 的 WAL实现 底层探索

您所在的位置:网站首页 wal的作用和机制 Rocksdb 的 WAL实现 底层探索

Rocksdb 的 WAL实现 底层探索

2023-08-27 03:17| 来源: 网络整理| 查看: 265

作为通用的单机存储引擎,一个基本的crash safe功能是需要提供的,用来保证异常时的数据一致性。像我们rocksdb所在节点出现断电异常,节点死机 等情况,内存中的数据是会丢失的。此时,需要rocksdb提供一种机制,能够在这种异常情况下尽可能挽回多的数据。

业界通用的解决方案就是WAL (write ahead log),接下来通过Rocksdb 的WAL实现来探索一下 WAL怎么能够保证数据的crash safe。 在这里插入图片描述

1. 概览

上层应用针对rocksdb的每一次更新在同步流程之上会存放在两个地方,一个是rocksdb实现的内存数据结构 memtable,另一个是在磁盘的WAL。这里为什么没有说 SST呢,是因为同步流程上只会先写入wal,再写入memtable,具体将数据写入sst则是通过异步的flush和compaction 后台线程进行的。

WAL主要作用是用来恢复发生 rocksdb非优雅退出(节点断电,死机) 时 memtable中的未commited中的数据。 所以WAL 的写入需要优先于memtable,且每一次写入都需要flush ,这也是write head的由来。

ps: 下文中会提到一些缩写,这里提前声明一下 cf – column family 列族,将key-value逻辑隔离 mem – memtable 内存数据结构,在内存中保存key-value数据 imm – immutable memtable 只读的memtable,memtable写入到配置的大小之后会切换为imm,进行后台flush,同时创建一个新的mem接收IO

2. 创建

rocksdb wal的创建有两种情况:

Open db 时 会创建s = rocksdb::DB::Open(options, "./db/", &db); 当一个Column Family flush的时候会创建 db->Flush(rocksdb::FlushOptions(), handles[1]); 当wal文件大小达到了max_total_wal_size时,会重新创建新的wal,不过这里的创建是会和cf的memtable flush一块进行 如果没有配置该选项,则会将wal可写入的大小定为memtable的4倍if (UNLIKELY(status.ok() && !single_column_family_mode_ && total_log_size_ > GetMaxTotalWalSize())) { WaitForPendingWrites(); status = SwitchWAL(write_context); }

看看代码是怎么实现的,Open db的时候:

在恢复完MANIFEST之后,需要先获取即将创建的WAL 文件编号,即xxx.log为wal文件分配预分配 一个memtale的buffer大小 max_write_buffer_size创建wal文件 Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr, const bool seq_per_batch, const bool batch_per_txn) { ...... // NewFileNumber 会在manifest中记录的next_file_number_ 基础上加一 uint64_t new_log_number = impl->versions_->NewFileNumber(); log::Writer* new_log = nullptr; const size_t preallocate_block_size = impl->GetWalPreallocateBlockSize(max_write_buffer_size); s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/, preallocate_block_size, &new_log);

当column family flush的时候,通过DBImpl::Flush 调用对应cf的memtable flush函数,在flush memtable的过程中进行新的wal的创建。 这里当触发cf的flush时,需要将内存中memtable 标记为imutable-memetable,来进行后台的写入sst文件;同时会生成新的memtable,这个时候wal记录的是旧的memtable的请求,为了数据的隔离性,且wal不会过大,每个wal文件只和一个memtable绑定,所以切换memtable的过程中会创建新的wal文件,用来接收新的请求。

Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { ... // 主要就是flush memtable s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush); ... } Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, FlushReason flush_reason, bool writes_stopped) { ... // 切换memtable s = SwitchMemtable(cfd, &context); ... } Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ...... //前面获取到需要创建的wal 文件编号,预分配足够的wal存储空间,那么接下来开始进行对应的文件创建 if (creating_new_log) { // TODO: Write buffer size passed in should be max of all CF's instead // of mutable_cf_options.write_buffer_size. s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size, &new_log); } } 3. 更新

这里通过一些测试代码,对flush时 wal的更新进行更加详细的描述。 wal的更新主要是上面 创建过程说到的后两种情形,触发cf的flush和达到wal文件大小限制。 此时

新的cf中的数据会刷新到新的SST 文件之中,即SwitchMemtable,将当前mem变为imm,由imm后台flush创建一个新的wal文件,之后所有cf 中的 write请求会优先写入新的wal之中。在SwitchMemtable 调用CreateWAL旧的wal文件会被标记为不可写入,并在之后删除。这个过程是在CreateWAL 函数中进行

一个简单的测试demo如下,完整代码会补充在之后。 先打开一个db,向两个colum family中分别写入key-value,再触发一次flush一个cf。

先做一个预期结果的评估: 这个时候会存在两个wal文件,一个是open时候创建的,另一个是flush的时候创建的,还会有一个sst文件,因为我们只flush了一次且数据量较小。 因为flush是切换memtale,创建新的wal文件,所以第二个wal文件应该是空的,数据记录保存在第一个wal文件。因为只flush一个cf1,另一个cf0的数据还在内存中,所以第一个wal文件是不能被删除的。

s = rocksdb::DB::Open(options, "./db", column_families, &handles, &db); cout Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key5"), rocksdb::Slice("value5")); db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key6"), rocksdb::Slice("value6")); db->Flush(rocksdb::FlushOptions(), handles[0]);

查看文件列表 总共flush了两次,会有两个sst文件。只剩下了一个flush之后的 log文件了。 在这里插入图片描述 查看具体内容如下:

# log文件是空的 ▶ ./ldb dump_wal --walfile=./db/000014.log --header Sequence,Count,ByteSize,Physical Offset,Key(s) -------------------------------------------------- # 第一个sst文件保存的是cf1的数据 ▶ ./sst_dump --file=./db/000013.sst --command=scan from [] to [] Process ./db/000013.sst Sst file format: block-based 'key1' seq:1, type:1 => value1 'key3' seq:3, type:1 => value3 -------------------------------------------------- # 第二个sst文件保存的是cf2的数据 ▶ ./sst_dump --file=./db/000015.sst --command=scan from [] to [] Process ./db/000015.sst Sst file format: block-based 'key2' seq:2, type:1 => value2 'key4' seq:4, type:1 => value4 'key5' seq:5, type:1 => value5 'key6' seq:6, type:1 => value6

通过以上的简单测试,再结合对应的源码,我们基本清楚了wal文件的切换更新过程。

open db的时候会创建一个wal文件flush memtable的时候会创建一个wal文件,当旧wal文件中有未flush的cf的数据时不会被删除,直到所有cf的数据都被flush到sst文件之中才会被删除。 这是正常场景保证数据一直性的wal实现,后文还会继续描述更多的WAL recovery模式。

完整测试代码如下:

#include #include #include #include #include #include #include #include #include using std::cout; using std::endl; int main () { rocksdb::DB* db; rocksdb::Options options; rocksdb::Status s; std::vector column_families; column_families.push_back(rocksdb::ColumnFamilyDescriptor( rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions())); column_families.push_back(rocksdb::ColumnFamilyDescriptor( "new_cf", rocksdb::ColumnFamilyOptions())); std::vector handles; options.create_if_missing = true; options.max_open_files = -1; s = rocksdb::DB::Open(options, "./db/", &db); // create column family rocksdb::ColumnFamilyHandle* cf; s = db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "new_cf", &cf); assert(s.ok()); // close DB s = db->DestroyColumnFamilyHandle(cf); assert(s.ok()); delete db; s = rocksdb::DB::Open(options, "./db", column_families, &handles, &db); cout Flush(rocksdb::FlushOptions(), handles[0]); delete db; return 0; }

以上代码我是在mac上测试的,可以通过以下命令编译运行 g++ -std=c++11 rocksdb_wal_test.cc -lrocksdb

4. recovery mode

rocksdb 支持可以调整的4种 wal recovery模式

kAbsoluteConsistency 这种级别是对一致性要求最高的级别,不允许有任何的IO错误,不能出现一个record的丢失。kTolerateCorruptedTailRecords 这个级别是允许丢失一部分数据,会忽略一些在wal末尾写入失败的请求,数据异常仅限于log文件末尾写入失败。如果出现了其他的异常,都无法进行数据重放。kPointInTimeRecovery 这个级别也是现在rocksdb默认的recovery mode,当遇到IO error的时候会停止重放,将出现异常之前的所有数据进行完成重放。kSkipAnyCorruptedRecords 这个级别是一致性要求最低的,会忽略所有的IO error,尝试尽可能多得恢复数据。

还记得之前搞CEPH时被OSD支配的恐惧,无数的rocksdb corruption,当时如果对rocksdb有足够多的理解,那么丢失的数据应该会少很多,也会有更多的时间来睡觉了。

在这里插入图片描述

详细的mode声明如下:

enum class WALRecoveryMode : char { // Original levelDB recovery // We tolerate incomplete record in trailing data on all logs // Use case : This is legacy behavior kTolerateCorruptedTailRecords = 0x00, // Recover from clean shutdown // We don't expect to find any corruption in the WAL // Use case : This is ideal for unit tests and rare applications that // can require high consistency guarantee kAbsoluteConsistency = 0x01, // Recover to point-in-time consistency (default) // We stop the WAL playback on discovering WAL inconsistency // Use case : Ideal for systems that have disk controller cache like // hard disk, SSD without super capacitor that store related data kPointInTimeRecovery = 0x02, // Recovery after a disaster // We ignore any corruption in the WAL and try to salvage as much data as // possible // Use case : Ideal for last ditch effort to recover data or systems that // operate with low grade unrelated data kSkipAnyCorruptedRecords = 0x03, };

以上配置,可以通过option选项Options.wal_recovery_mode = 2来进行对应模式的配置

Talk is cheap!!!

接下来我们仔细看看每一种recovery mode是如何实现各自的recovery 级别的。

在Open db调用Recover的时候,如果db已经存在,会尝试从已经存在的db中的log文件 恢复上一次db的memtable数据 详细的过程是:

取存在的每一个log文件,循环进行如下操作创建一个wal文件的file_reader创建一个利用file_reader创建一个log reader,并用 wal_recovery_mode 参与到log reader的初始化在具体的ReadRecord 中进行对应的recovery mode区分 Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, SequenceNumber* next_sequence, bool read_only) { ...... // 创建一个file reader std::unique_ptr file_reader; { std::unique_ptr file; status = fs_->NewSequentialFile(fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr); ...... // 创建一个log reader log::Reader reader(immutable_db_options_.info_log, std::move(file_reader), &reporter, true /*checksum*/, log_number); ...... while (!stop_replay_by_wal_filter && reader.ReadRecord(&record, &scratch, immutable_db_options_.wal_recovery_mode) && status.ok()) { if (record.size() while (true) { uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); size_t drop_size = 0; // 会从物理的record中读取数据,将读取过程中发生的异常返回给record_type const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size); ...... //接下来通过不同的recovery mode来针对异常的record_type 进行处理,决定是否需要返回数据异常 //当出现 header,eof(log末尾),kOldRecord(和eof异常类似)异常时都会 确认recovery mode是否是 kAbsoluteConsistency //是的话直接report异常 case kBadHeader: if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { // in clean shutdown we don't expect any error in the log files ReportCorruption(drop_size, "truncated header"); } ...... case kEof: if (in_fragmented_record) { if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { // in clean shutdown we don't expect any error in the log files ReportCorruption(scratch->size(), "error reading trailing data"); } ...... case kOldRecord: if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {//如果是跳过所有异常的话,不会report corruption // Treat a record from a previous instance of the log as EOF. if (in_fragmented_record) { if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { // in clean shutdown we don't expect any error in the log files ReportCorruption(scratch->size(), "error reading trailing data"); } // This can be caused by the writer dying immediately after // writing a physical record but before completing the next; don't // treat it as a corruption, just ignore the entire logical record. scratch->clear(); } return false; }

我们再回到RecoverLogFiles 函数之中,在处理完Record之后,后续还会有将重放后的数据写入memtable的过程。 当完成所有的读取和写入memtable之后,会对所有的返回状态进行确认,因为之上所有的操作都是在重放wal,可能会有失败的情况。

接下来进入如下逻辑,根据重放的状态进行后续的recovery 操作

如果mode是 kSkipAnyCorruptedRecords ,则跳过所有的异常,直接返回OK如果mode是 kPointInTimeRecovery 或者 kTolerateCorruptedTailRecords 则会暂停处理,将stop_replay_for_corruption 置为true, 后续会跳过当前的log number进行重放如果是kAbsoluteConsistency 或者 kTolerateCorruptedTailRecords 则直接返回 if (immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords) { // 如果rev // We should ignore all errors unconditionally status = Status::OK(); } else if (immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { // We should ignore the error but not continue replaying status = Status::OK(); stop_replay_for_corruption = true; corrupted_log_number = log_number; ROCKS_LOG_INFO(immutable_db_options_.info_log, "Point in time recovered to log #%" PRIu64 " seq #%" PRIu64, log_number, *next_sequence); } else { assert(immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kTolerateCorruptedTailRecords || immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency); return status; } // 主要是对kPointInTimeRecovery 和 kTolerateCorruptedTailRecords进行恢复处理 // 将corrupted_log_number 之前所有数据完成恢复,再报告异常 if (stop_replay_for_corruption == true && (immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery || immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kTolerateCorruptedTailRecords)) { for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->GetLogNumber() > corrupted_log_number) { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Column family inconsistency: SST file contains data" " beyond the point of corruption."); return Status::Corruption("SST file is ahead of WALs"); } } } 5. WAL 的写入

这里再补充以下wal文件是如何在写入memtable之前写入的,会如何更新。

还是通过用户接口Put,该接口会调用到底层的Status DBImpl::WriteImpl,在该函数中,如果我们开启来pipeline,则会走pipeline的写入逻辑。如果未开启pipeline,则会正常写入。

WriteToWAL的函数入口如下:

status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, need_log_dir_sync, last_sequence + 1);

因为实际写入的过程中可能多个客户端会并发调用put,每一个Put都会有自己的writer,为了提高写性能,会从多个writer选出来一个leader,让这个leader将所有 writer要写的wal收集到一块,进行batch写入,其他从writer等待leader写完之后再并发写memtable。

所以以上WriteToWAL内部调用了一个重载 了batch写的wal函数

WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_, &write_with_wal, &to_be_cached_state); ...... uint64_t log_size; status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; }

进入到实际执行写入的WriteToWAL函数,会调用AddRecord函数执行实际的write,这里会用到文件系统的write接口,选择的文件系统是在db open的时候 进行DBImpl类初始化,根据env传入的参数进行文件系统的选择, 默认是PosixFileSystem

std::shared_ptr FileSystem::Default() { static PosixFileSystem default_fs; static std::shared_ptr default_fs_ptr( &default_fs, [](PosixFileSystem*) {}); return default_fs_ptr; }

在AddRecord函数中将之前合并好的 log_enery,每次写入大小不能超过11bytes,这个是wal的具体约定的格式了。通过文件系统接口进行Append写入,直到把log_entry完全写入。

Status Writer::AddRecord(const Slice& slice) { ...... do { ...... s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", static_cast(leftover))); ...... }while(s.ok() && left > 0); ...... }

因为我们没有配置direct,则最终通过 Flush -->WriteBuffered–> PosixWritableFile::Append --> PosixWrite --> write 直到底层的 write系统调用,进行写入。 其实这个写入链路还是比较长,不过耗时的话中间仅仅是做一些字符串的拼接,到write这里才是耗时所在。因为,还要走一遍庞大的内核文件系统的写入链路,这里也能够理解为什么ceph需要单独再做一个小型的文件系统来代替内核文件系统了。

写WAL是我们整个rocksdb写入最为耗时的一段,memtable的写入是在wal写完成之后才能写入,而memtable都是纯内存操作,所以耗时还是消耗在了写WAL之上,在传统的xfs之上,一个200bytes的请求,写WAL耗时大概2-4us。

为了保证WAL能够落盘,我们还需要配置options.sync=true,在WriteToWAL函数中,完成了文件系统的写入之后 会调用fsync来进行 sync写。

通过如下简单的stap脚本,可以非常方便得抓取rocksdb内部函数耗时:

#!/bin/stap global sends #打印出来的单位是微妙 probe process("test").function("rocksdb::DBImpl::GetImpl").return { sends


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3