RocksDB的Compaction/Flush分析
RocksDB关键流程
Flush流程
后台Flush作业
- 调度Flush作业流程与compaction类似:
DBImpl::BGWorkFlush–>DBImpl::BackgroundCallFlush–>DBImpl::BackgroundFlush–>DBImpl::FlushMemTablesToOutputFiles–>DBImpl::FlushMemTableToOutputFile DBImpl::BGWorkFlush(void* arg): 开启后台Flush作业- 转发到
DBImpl::BackgroundCallFlush(Env::Priority thread_pri)- 调用
DBImpl::BackgroundFlush()- 调用
DBImpl::FlushMemTablesToOutputFiles()- 调用
DBImpl::FlushMemTableToOutputFile()
- 调用
- 调用
- 调用
- 检查状态
- 调用
DBImpl::MaybeScheduleFlushOrCompaction()
- 转发到
Flush作业主控流程
DBImpl::FlushMemTableToOutputFile: 将Memtable刷盘, 核心流程- 创建
FlushJob flush_job - 调用成员方法
FlushJob::PickMemTable(), 选择需要刷盘的memtable- 获取该列族的imm_(imutable memtable)列表, 调用
MemTableList::PickMemtablesToFlush(...)- 遍历
current_->memlist_, 将未flush的(!flush_in_grogress_)的memtable加入到mems_中,mems_中按时间升序排列 - 不管
mems_中有多少个memtable, 均有第一个memtable的edit_来记录该次flush的元数据
- 遍历
- 获取该列族的imm_(imutable memtable)列表, 调用
- 调用成员方法
FlushJob::Run(...), 启动刷盘- 首先会检查是否需要
MemPurge(), 即Memtable垃圾回收将Immutable Memtable中过期内容清除, 将有效数据保存到新的memtable中, 旨在减少对SSD的写操作, 并降低写放大
- 执行
FlushJob::WriteLevel0Table(), 执行刷盘memtables保存所有mm的迭代器- 设定
TableBuilderOptions tboptions - 调用
BuildTable(), 生成L0层SSTable - 调用
AddFile()将生成文件加入当前edit_, 相当于注册到LSM树的L0层 - 调用
FlushJob::GetFlushJobInfo()获得作业详细信息(包括file_path, table_properties), 并设置mems_队列的首个memtable
- 如果需要写最新版本MANIFEST, 会调用
MemtableList::TryInstallMemtableFlushResults(...)
- 首先会检查是否需要
- 安装新版本
DBImpl::InstallSuperVersionAndScheduleWork - 通知完成Flush, 在
SstFileManagerImpl注册生成的SSTable对象
- 创建
BuildTable工作流程
- 调用
NewWritableFile申请可写文件file, 并设定IO优先级和LifetimeHint - 设置
WritableFileWriter, 并调用NewTableBuilder, 与CompactionJob::OpenCompactionOutputFile()类似 - 设置
MergeHelper和用于KV迭代的CompactionIterator - 迭代
c_iter, 调用TableBuilder的Add(key, value)将KV对加入l0层的SSTable中 - 更新待刷新第一个Memtable的
FileMetaData(保存整个FlushJob元数据)和入参TableProperties(生成L0层SSTable元数据) - 构建
OutputValidator和InternalIterator检查新生成SSTable的有效性
Compactoion流程
调度Flush/Compaction作业
DBImpl::InstallSuperVersionAndScheduleWork: 所有列族的状态通过该函数改变, 该方法分析列族的新状态, 并决定是否需要flush/compaction- 调用
cfs->InstallSuperVersion(...) - 调用
SchedulePendingCompaction(cfd): - 调用
MaybeScheduleFlushOrCompaction
- 调用
DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd):- 如果当前列族未在
compaction_queue_中, 且NeedsCompaction()返回需要, 则通过AddToCompactionQueue(cfd)加入等待调度的Compaction队列 cfd->NeedCompaction()会检查是否开启了自动compaction, 并转发至compaction_picker->NeedCompaction(current_-storage_info())- 完成后调用
AddToCompactionQueue(cfd), 增加待调度的Compaction作业数
- 如果当前列族未在
class CompactionPicker:db/compaction/compaction_picker.h- 用于执行compaction文件选择的抽象类, RocksDB提供了
compaction_picker_level/compaction_picker_fifo/compaction_picker_universal - 默认使用
LevelCompactionPicker, 如下- 看是否需要Compaction接口
LevelCompactionPicker::NeedsCompaction()- 如果有超时TTL/被标记为Compaction的则返回Ture
- 自顶向下获取(已被排序)CompactionScore, 有大于1返回Ture, CompactionScore事先由VersionStorageInfo::ComputeCompactionScore计算
VersionStorageInfo::ComputeCompactionScore逻辑- 如果为L0: 统计该层文件数/文件大小, 得分=文件数/预先设定的阈值
- L1开始: 未参与Compaction文件总大小/设定阈值,
- 将compaction_score从高到低排序, 高分在list前部
- 更新TTL队列, 周期Compaction队列等, 这些均在NeedCompaction中首先被判断
LevelCompactionPicker::PickCompaction(...): 构建LevelCompactionBuilder并调用builder.PickCompaction(), 返回一个CompactionLevelCompactionBuilder::PickCompaction()流程如下LevelCompactionBuilder::SetupInitialFiles(): 对所有大于阈值的层调用PickFileToCompact,获得CompactionInputFiles,并确定CompactionReasonstruct CompactionInputFiles, 位于db/compaction/compaction.h- 维护Compaction输入文件数组
vector<FileMetaData*> files - 维护compaction边界
std::vector<AtomicCompactionUnitBoundary> atomic_compaction_unit_boundaries, 即InternalKey的最大值和最小值
- 维护Compaction输入文件数组
LevelCompactionBuilder::SetupOtherL0FilesIfNeeded()LevelCompactionBuilder::SetupOtherInputsIfNeeded()LevelCompactionBuilder::GetCompaction()
- 看是否需要Compaction接口
- 用于执行compaction文件选择的抽象类, RocksDB提供了
DBImpl::MaybeScheduleFlushOrCompaction():- 获取后台作业数(
DBImpl::GetBGJobLimits()) - 获取flush_pool是否为空, 即
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0 - 如果flush_pool不空, 且未达到job上限, 在while循环中调度高优先级的
BGWorkFlush - 如果flush_pool为空, 则调度低优先级的
BGWorkFlush - 如果bg_compaction_scheduled_ 与bg_bottom_compaction_scheduled_总数小于上限, 调度低优先级的
BGWorkCompaction
1
2
3
4
5
6
7
8
9
10// function函数指针为待注册的作业, arg为参数, pri为优先级, tag为标签, unschedFunction为移除时的回调函数
virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW, void* tag = nullptr,
void (*unschedFunction)(void* arg) = nullptr) = 0;
// 调度Flush作业
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this, &DBImpl::UnscheduleFlushCallback);
// 调度Compaction作业
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, &DBImpl::UnscheduleCompactionCallback);- 获取后台作业数(
后台Compaction作业
DBImpl::BGWorkCompaction(void* arg): 开启后台Compaction作业- 将
arg转为CompactionArg, 并从中拿到prepicked_compaction, 通过BackgroundCallCompaction传递 - 转发到
DBImpl::BackgroundCallCompaction()- 调用
DBImpl::BackgroundCompaction() - 调用
DBImpl::MaybeScheduleFlushOrCompaction(), 调度其他可能的作业
- 调用
- 将
DBImpl::BackgroundCompaction:首先会检查各参数的有效性, 是否有足够空间
特列检查: 只需删除输入文件, 只需将输入文件移动到到下一level不涉及Merge/split, 可以转发到
BGWorkBottomCompaction构建
CompactionJobDBImpl::NotifyOnCompactionBegin()compaction_job.Prepare()即CompactionJob::Prepare(): 确定子合并的边界cfd->CalculateSSTWriteHint(c->output_level()): 根据输出level计算文件的Lifetime Hint- 调用
CompactionJob::GenSubcompactionBoundaries: 对于每个输入文件, 通过扫描文件index block估算出128个锚点, 将sst大致划为128个子区间, 再将其合并为n个子合并区间, 并将边界保存再boundaries_中 - 如果设置了保留时间,则处理
seqno--time之间的映射, 该映射(“class SeqnoToTimeMapping”)用于估算key的time
compaction_job.Run()即CompactionJob::Run(): 执行合并操作- 初始化线程池, 预留出
子合并数-1个线程空间用于处理子合并 - 启动子合并线程
CompactionJob::ProcessKeyValueCompaction - 调用lambda匿名函数
verify_table验证输出的有效性, 复用线程池 - 使用
Compaction::SetOutputTableProperties()设置该Compaction的输出文件属性
- 初始化线程池, 预留出
compaction_job.Install()即CompactionJob::Install(): 将合并结果加入到当前版本中- 调用
InstallCompactionResults()更新VersionEdit - 调用
UpdateCompactionJobStats()更新作业统计信息
- 调用
InstallSuperVersionAndScheduleWork(...): 实际是InstallSuperVersion的封装, 还会检查新状态是否需要flush/compaction调用
SstFileManager的OnCompactionCompletion(), 更新记录DBImpl::NotifyOnCompactionCompleted()
Compaction子合并
CompactionJob::ProcessKeyValueCompaction(): 调用filter, 遍历输入KV并执行compaction- 如果有
CompactionService(可以理解为使用外部Compaction算法)则调用CompactionJob::ProcessKeyValueCompactionWithCompactionService(), 执行成功则返回;否则使用自带的Compaction算法, 即主分支 - 构建
InternalIterator, 指针对象raw_input和input - 构建
MergeHelper - 构建
CompactionIterator, 指针对象c_iter c_iter->SeekToFirst(), 定位到第一个KV对- 定义
CompactionFileOpenFunc/CompactionFileCloseFunc- 转发到
CompactionJob::OpenCompactionOutputFile() - 转发到
CompactionJob::FinishCompactionOutputFile()
- 转发到
- 开始循环迭代, 每次迭代增加一个KV对文件中
- 调用
sub_compact->AddToOutput(*c_iter, open_file_func, close_file_func), 将c_iter的KV对加到Current输出组中SubcompactionState::AddToOutput()转发到Current().AddToOutput(...),Current()返回指向class CompactionOutputs对象的指针- 将KV对加入validator
- 将KV对通过table builder加入SST,
builder_->Add(key, value) - 更新FileMetaData的最大最小键
c_iter->Next()迭代到下一KV-pair
- 调用
- 状态检查, 再次执行
sub_compact->CloseCompactionFiles()完成收尾工作
- 如果有
Compaction文件读写
CompactionJob::OpenCompactionOutputFile(...)- 调用
read_write_util中的NewWritableFile, 转发到fs->NewWritableFile, 创建一个制定了文件名的新文件对象, 用指针writable_file保存 - 设置该文件的
FileMetaData, 并调用outputs.AddOutput加入当前输出队列中 - 通过文件系统接口层中
class FSWritableFile提供的接口, 设置writable_file的IO优先级`lifetime_hint`等信息 - 调用
CompactionOutputs::AssignFileWriter()设置写入器, 设置TableBuilderOptions, 再通过CompactionOutputs::NewBuilder()创建TableBuilder对象
- 调用
Status CompactionJob::FinishCompactionOutputFile(...)TableBuilder::Add(K, V)向table中添加KV(以默认的Block_based_table_builder::Add为例)- 先检查
Key的ValueType - 检查是否需要
Flush()(将缓冲的KV对刷入文件中)- 需要则调用
BlockBasedTableBuilder::Flush(), 主逻辑转发到BlockBasedTableBuilder::WriteBlock() - 先通过
block->Finish()关闭当前block - 调用
BlockBasedTableBuilder::CompressAndVerifyBlock尝试压缩block内容 - 调用
BlockBasedTableBuilder::WriteMaybeCompressedBlock写入数据块- 核心流程
r->file->Append(block_contents), 调用WritableFileWriter::Append(...)- 这里用到了
use_direct_io()判断是否是直接IO, 我的大部分实验设置为direct, 如果需要验证数据有效性还是会先写到一个buffer用于计算校验和 FSWritableFile::PrepareWrite写前准备工作, 通过预分配空间可以减少文件碎片, 或者避免文件系统过度预分配带来的浪费- 核心是调用
buf_.Append(src, left),class AlignedBuffer根据对齐需求来管理一个缓冲区, 主要用于direct io场景,Append方法使用memcpy复制到buffer中 WritableFileWriter::Flush(): 将内存中数据写入文件系统/设备- 有缓冲写
WriteBuffered(...), 使用writable_file_->Append()写底层文件 - 无缓冲写
WriteDirect(...), 使用writable_file_->PositionedAppend()写底层文件 - 调用
writable_file_->Flush(...), 这是底层文件系统接口
- 有缓冲写
- 这里用到了
- 调用
ComputeBuiltinChecksumWithLastByte处理checksum - 调用
InsertBlockInCompressedCache, 将压缩后的block_content加入缓存
- 核心流程
- 需要则调用
r->data_block.AddWithLastKey, 调用BlockBuilder::AddWithLastKey转发到BlockBuilder::AddWithLastKeyImpl, 主要是操作对应block builder的buffer_(string类型), 通过append方法将key写入buffer_中r->last_key.assign(key.data(), key.size()), 更新last_key(string类型)值- 更新
TableProperties, 包括num_entries/raw_key_size/raw_value_size
- 先检查
Flush/Compaction相关数据结构
class CompactionOutputs: 维护子合并产生的文件, 多数方法由compaction_job的open/close合并文件的方法使用- 数据成员
std::unique_ptr<TableBuilder> builder_: SST构建器指针std::unique_ptr<WritableFileWriter> file_writer_: 文件写入器指针std::vector<Output> outputs_: 当前时刻所有输出Struct Output存放FileMetaData文件元数据/OutputValidator有效性验证器/TableProperties表格属性
- 重要方法
AddToOutput(...): 将文件添加到outputs_队列NewBuilder(...): 为当前output设置TableBuilder构建器AssignFileWriter(...): 为当前output设置WritableFileWriter写入器
- 数据成员
class CompactionState维护Compaction状态- 数据成员
Compaction* const compaction: 指向对应Compaction的常量指针std::vector<SubcompactionState> sub_compact_states: 维护子合并状态
- 方法
AggregateCompactionStats聚合合并状态Slice SmallestUserKey()/Slice LargestUserKey()获取用户键
- 数据成员
VersionEdit::AddFile(...): 向LSM树某一层添加文件
工具方法
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, const InternalKey& b)- 先直接调用
user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key()), 指比较两个InternalKey的UserKey部分, 并且不考虑时间戳 - 如果不相等就直接返回, 如果先等则调用
ExtractInternalKeyFooter(), 比较两个键的Footer, Footer就是InternalKey的最后8字节, 即SequenceNumber和ValueType的组合RocksDB中,Internal Key在默认情况下| User Key (变长) | Sequence Number (7 字节) | Value Type (1 字节) |
在启用了User Defined Timestamp| User Key (变长) | Timestamp (变长) | Sequence Number (7 字节) | Value Type (1 字节) |Timestamp用于支持基于时间戳的数据管理Sequence Number用于支持MVCC, 越大越新, 查询/合并时返回最新版本
- 先直接调用
RocksDB-ZenFS交互流程分析
CompactionJob::OpenCompactionOutputFile(...)–>调用FS的FileSystem::NewWritableFile(...)–>调用ZenFS的ZenFS::OpenWritableFile(...)–>通过新建ZonedWritableFile对象, 并返回对象指针,ZonedWritableFile继承自public FSWritableFileZonedWritableFile的Append方法在非buffered模式下, 调用ZoneFile::Append–>在对非活动分区写入或写入空间不够时调用ZoneFile::AllocateNewZone()分配新分区–>转发到ZonedBlockDevice::AllocateIOZone(...)执行分区分配逻辑ZoneFile维护一个ZoneExtent指针队列, 每个ZoneExtent对应一个Zone, 每个Zone维护了写指针wp_/WriteLifeTimeHint/capacity等ZonedWritableFile::Append与ZonedWritableFile::PositionedAppend- 如前文所述,
Append来自BufferedIO模式调用,PisitionAppend来自DirectIO模式调用 - 在ZenFS中,
PisitionAppend强制保证在wp_处写入, 其余与Append一致, 转发到zoneFile_->Append - 上层RocksDB会保证
Append和PositionAppend只选择一种
- 如前文所述,
RocksDB-Table-build源码分析
公共接口
table.h: 与sst相关的抽象类, 两种tableBlock-based table: 默认table类型, 来自LevelDBPlain table: RocksDB针对低访问延时设备(纯内存, 低访问延迟介质)优化class TableFactory : public Customizable: table工厂函数基类- 重要接口
virtual Status NewTableReader(...) = 0, 读取器有三个地方被调用TableCache::FindTable()table cache未命中, 返回一个tableSstFileDumper打开table并转储DBImpl::IngestExternalFile()用于添加SST中的内容
virtual TableBuilder* NewTableBuilder(...) = 0, 构建器/写入器, 有四个地方被调用DBImpl::WriteLevel0Table(): 通过BuildTable()间接调用, flush操作中Mem=>L0DBImpl::OpenCompactionOutputFile(): 写Compaction的输出时DBImpl::WriteLevel0TableForRecovery: 通过BuildTable()间接调用, 从transaction log中恢复时Repairer::ConvertLogToTable(): 通过BuildTable()间接调用, 在修复阶段时, 将log转为sst- 调用者有责任保持文件打开,并在关闭表构建器后关闭文件
- 重要接口
实现
table\table_builder.h:- 声明
struct TableReaderOptions和struct TableBuilderOptions class TableBuilder: 用于构建Table的接口virtual void Add(const Slice& key, const Slice& value) = 0向构建中的表添加KV对virtual Status Finish() = 0完成构建virtual uint64_t NumEntries() const = 0
- 声明
table\block_based\: block_based table的具体实现block_based_table_factory.hblock_based_table_builder.hclass BlockBasedTableBuilder : public TableBuilder
参考资料
[1] RocksDB学习笔记#6 Compaction流程(1) —— 触发流程
[2] RocksDB学习笔记#7 Compaction流程(2) —— 执行流程
[3] rocksdb介绍之compaction流程
[4] RocksDB Wiki – Compaction
[5] RocksDB Wiki – Compaction 中文文档