RocksDB的Compaction/Flush分析

RocksDB关键流程

Flush流程

后台Flush作业

  • 调度Flush作业流程与compaction类似: DBImpl::BGWorkFlush–>DBImpl::BackgroundCallFlush–>DBImpl::BackgroundFlush–>DBImpl::FlushMemTablesToOutputFiles–>DBImpl::FlushMemTableToOutputFile
  • DBImpl::BGWorkFlush(void* arg): 开启后台Flush作业
    • 转发到DBImpl::BackgroundCallFlush(Env::Priority thread_pri)
      • 调用DBImpl::BackgroundFlush()
        • 调用DBImpl::FlushMemTablesToOutputFiles()
          • 调用DBImpl::FlushMemTableToOutputFile()
    • 检查状态
    • 调用DBImpl::MaybeScheduleFlushOrCompaction()

Flush作业主控流程

  • DBImpl::FlushMemTableToOutputFile: 将Memtable刷盘, 核心流程
    • 创建FlushJob flush_job
    • 调用成员方法FlushJob::PickMemTable(), 选择需要刷盘的memtable
      • 获取该列族的imm_(imutable memtable)列表, 调用MemTableList::PickMemtablesToFlush(...)
        • 遍历current_->memlist_, 将未flush的(!flush_in_grogress_)的memtable加入到mems_中, mems_中按时间升序排列
        • 不管mems_中有多少个memtable, 均有第一个memtableedit_来记录该次flush的元数据
    • 调用成员方法FlushJob::Run(...), 启动刷盘
      • 首先会检查是否需要MemPurge(), 即Memtable垃圾回收

        将Immutable Memtable中过期内容清除, 将有效数据保存到新的memtable中, 旨在减少对SSD的写操作, 并降低写放大

      • 执行FlushJob::WriteLevel0Table(), 执行刷盘
        • memtables保存所有mm的迭代器
        • 设定TableBuilderOptions tboptions
        • 调用BuildTable(), 生成L0层SSTable
        • 调用AddFile()将生成文件加入当前edit_, 相当于注册到LSM树的L0层
        • 调用FlushJob::GetFlushJobInfo()获得作业详细信息(包括file_path, table_properties), 并设置mems_队列的首个memtable
      • 如果需要写最新版本MANIFEST, 会调用MemtableList::TryInstallMemtableFlushResults(...)
    • 安装新版本DBImpl::InstallSuperVersionAndScheduleWork
    • 通知完成Flush, 在SstFileManagerImpl注册生成的SSTable对象

BuildTable工作流程

  • 调用NewWritableFile申请可写文件file, 并设定IO优先级和LifetimeHint
  • 设置WritableFileWriter, 并调用NewTableBuilder, 与CompactionJob::OpenCompactionOutputFile()类似
  • 设置MergeHelper和用于KV迭代的CompactionIterator
  • 迭代c_iter, 调用TableBuilderAdd(key, value)将KV对加入l0层的SSTable
  • 更新待刷新第一个Memtable的FileMetaData(保存整个FlushJob元数据)和入参TableProperties(生成L0层SSTable元数据)
  • 构建OutputValidatorInternalIterator检查新生成SSTable的有效性

Compactoion流程

调度Flush/Compaction作业

  • DBImpl::InstallSuperVersionAndScheduleWork: 所有列族的状态通过该函数改变, 该方法分析列族的新状态, 并决定是否需要flush/compaction

    1. 调用cfs->InstallSuperVersion(...)
    2. 调用SchedulePendingCompaction(cfd):
    3. 调用MaybeScheduleFlushOrCompaction
  • DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd):

    1. 如果当前列族未在compaction_queue_中, 且NeedsCompaction()返回需要, 则通过AddToCompactionQueue(cfd)加入等待调度的Compaction队列
    2. cfd->NeedCompaction()会检查是否开启了自动compaction, 并转发至compaction_picker->NeedCompaction(current_-storage_info())
    3. 完成后调用AddToCompactionQueue(cfd), 增加待调度的Compaction作业数
  • class CompactionPicker: db/compaction/compaction_picker.h

    • 用于执行compaction文件选择的抽象类, RocksDB提供了compaction_picker_level/compaction_picker_fifo/compaction_picker_universal
    • 默认使用LevelCompactionPicker, 如下
      • 看是否需要Compaction接口LevelCompactionPicker::NeedsCompaction()
        • 如果有超时TTL/被标记为Compaction的则返回Ture
        • 自顶向下获取(已被排序)CompactionScore, 有大于1返回Ture, CompactionScore事先由VersionStorageInfo::ComputeCompactionScore计算
        • VersionStorageInfo::ComputeCompactionScore逻辑
          • 如果为L0: 统计该层文件数/文件大小, 得分=文件数/预先设定的阈值
          • L1开始: 未参与Compaction文件总大小/设定阈值,
          • 将compaction_score从高到低排序, 高分在list前部
          • 更新TTL队列, 周期Compaction队列等, 这些均在NeedCompaction中首先被判断
      • LevelCompactionPicker::PickCompaction(...): 构建LevelCompactionBuilder并调用builder.PickCompaction(), 返回一个Compaction
      • LevelCompactionBuilder::PickCompaction()流程如下
        • LevelCompactionBuilder::SetupInitialFiles(): 对所有大于阈值的层调用PickFileToCompact,获得CompactionInputFiles,并确定CompactionReason
          • struct CompactionInputFiles, 位于db/compaction/compaction.h
            • 维护Compaction输入文件数组vector<FileMetaData*> files
            • 维护compaction边界std::vector<AtomicCompactionUnitBoundary> atomic_compaction_unit_boundaries, 即InternalKey的最大值和最小值
        • LevelCompactionBuilder::SetupOtherL0FilesIfNeeded()
        • LevelCompactionBuilder::SetupOtherInputsIfNeeded()
        • LevelCompactionBuilder::GetCompaction()
  • DBImpl::MaybeScheduleFlushOrCompaction():

    1. 获取后台作业数(DBImpl::GetBGJobLimits())
    2. 获取flush_pool是否为空, 即env_->GetBackgroundThreads(Env::Priority::HIGH) == 0
    3. 如果flush_pool不空, 且未达到job上限, 在while循环中调度高优先级的BGWorkFlush
    4. 如果flush_pool为空, 则调度低优先级的BGWorkFlush
    5. 如果bg_compaction_scheduled_ 与bg_bottom_compaction_scheduled_总数小于上限, 调度低优先级的BGWorkCompaction
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // function函数指针为待注册的作业, arg为参数, pri为优先级, tag为标签, unschedFunction为移除时的回调函数
    virtual void Schedule(void (*function)(void* arg), void* arg,
    Priority pri = LOW, void* tag = nullptr,
    void (*unschedFunction)(void* arg) = nullptr) = 0;

    // 调度Flush作业
    env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this, &DBImpl::UnscheduleFlushCallback);

    // 调度Compaction作业
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, &DBImpl::UnscheduleCompactionCallback);

后台Compaction作业

  • DBImpl::BGWorkCompaction(void* arg): 开启后台Compaction作业

    • arg转为CompactionArg, 并从中拿到prepicked_compaction, 通过BackgroundCallCompaction传递
    • 转发到DBImpl::BackgroundCallCompaction()
      • 调用DBImpl::BackgroundCompaction()
      • 调用DBImpl::MaybeScheduleFlushOrCompaction(), 调度其他可能的作业
  • DBImpl::BackgroundCompaction:

    • 首先会检查各参数的有效性, 是否有足够空间

    • 特列检查: 只需删除输入文件, 只需将输入文件移动到到下一level不涉及Merge/split, 可以转发到BGWorkBottomCompaction

    • 构建CompactionJob

    • DBImpl::NotifyOnCompactionBegin()

    • compaction_job.Prepare()CompactionJob::Prepare(): 确定子合并的边界

      • cfd->CalculateSSTWriteHint(c->output_level()): 根据输出level计算文件的Lifetime Hint
      • 调用CompactionJob::GenSubcompactionBoundaries: 对于每个输入文件, 通过扫描文件index block估算出128个锚点, 将sst大致划为128个子区间, 再将其合并为n个子合并区间, 并将边界保存再boundaries_中
      • 如果设置了保留时间,则处理seqno--time之间的映射, 该映射(“class SeqnoToTimeMapping”)用于估算key的time
    • compaction_job.Run()CompactionJob::Run(): 执行合并操作

      • 初始化线程池, 预留出子合并数-1个线程空间用于处理子合并
      • 启动子合并线程CompactionJob::ProcessKeyValueCompaction
      • 调用lambda匿名函数verify_table验证输出的有效性, 复用线程池
      • 使用Compaction::SetOutputTableProperties()设置该Compaction的输出文件属性
    • compaction_job.Install()CompactionJob::Install(): 将合并结果加入到当前版本中

      • 调用InstallCompactionResults()更新VersionEdit
      • 调用UpdateCompactionJobStats()更新作业统计信息
    • InstallSuperVersionAndScheduleWork(...): 实际是InstallSuperVersion的封装, 还会检查新状态是否需要flush/compaction

    • 调用SstFileManagerOnCompactionCompletion(), 更新记录

    • DBImpl::NotifyOnCompactionCompleted()

Compaction子合并

  • CompactionJob::ProcessKeyValueCompaction(): 调用filter, 遍历输入KV并执行compaction
    • 如果有CompactionService(可以理解为使用外部Compaction算法)则调用CompactionJob::ProcessKeyValueCompactionWithCompactionService(), 执行成功则返回;否则使用自带的Compaction算法, 即主分支
    • 构建InternalIterator, 指针对象raw_inputinput
    • 构建MergeHelper
    • 构建CompactionIterator, 指针对象c_iter
    • c_iter->SeekToFirst(), 定位到第一个KV对
    • 定义CompactionFileOpenFunc/CompactionFileCloseFunc
      • 转发到CompactionJob::OpenCompactionOutputFile()
      • 转发到CompactionJob::FinishCompactionOutputFile()
    • 开始循环迭代, 每次迭代增加一个KV对文件中
      • 调用sub_compact->AddToOutput(*c_iter, open_file_func, close_file_func), 将c_iter的KV对加到Current输出组中
        • SubcompactionState::AddToOutput()转发到Current().AddToOutput(...), Current()返回指向class CompactionOutputs对象的指针
          • 将KV对加入validator
          • 将KV对通过table builder加入SST, builder_->Add(key, value)
          • 更新FileMetaData的最大最小键
      • c_iter->Next()迭代到下一KV-pair
    • 状态检查, 再次执行sub_compact->CloseCompactionFiles()完成收尾工作

Compaction文件读写

  • CompactionJob::OpenCompactionOutputFile(...)
    • 调用read_write_util中的NewWritableFile, 转发到fs->NewWritableFile, 创建一个制定了文件名的新文件对象, 用指针writable_file保存
    • 设置该文件的FileMetaData, 并调用outputs.AddOutput加入当前输出队列中
    • 通过文件系统接口层中class FSWritableFile提供的接口, 设置writable_file的IO优先级`lifetime_hint`等信息
    • 调用CompactionOutputs::AssignFileWriter()设置写入器, 设置TableBuilderOptions, 再通过CompactionOutputs::NewBuilder()创建TableBuilder对象
  • Status CompactionJob::FinishCompactionOutputFile(...)
  • TableBuilder::Add(K, V)向table中添加KV(以默认的Block_based_table_builder::Add为例)
    • 先检查KeyValueType
    • 检查是否需要Flush()(将缓冲的KV对刷入文件中)
      • 需要则调用BlockBasedTableBuilder::Flush(), 主逻辑转发到BlockBasedTableBuilder::WriteBlock()
      • 先通过block->Finish()关闭当前block
      • 调用BlockBasedTableBuilder::CompressAndVerifyBlock尝试压缩block内容
      • 调用BlockBasedTableBuilder::WriteMaybeCompressedBlock写入数据块
        • 核心流程r->file->Append(block_contents), 调用WritableFileWriter::Append(...)
          • 这里用到了use_direct_io()判断是否是直接IO, 我的大部分实验设置为direct, 如果需要验证数据有效性还是会先写到一个buffer用于计算校验和
          • FSWritableFile::PrepareWrite写前准备工作, 通过预分配空间可以减少文件碎片, 或者避免文件系统过度预分配带来的浪费
          • 核心是调用buf_.Append(src, left), class AlignedBuffer根据对齐需求来管理一个缓冲区, 主要用于direct io场景, Append方法使用memcpy复制到buffer中
          • WritableFileWriter::Flush(): 将内存中数据写入文件系统/设备
            • 有缓冲写WriteBuffered(...), 使用writable_file_->Append()写底层文件
            • 无缓冲写WriteDirect(...), 使用writable_file_->PositionedAppend()写底层文件
            • 调用writable_file_->Flush(...), 这是底层文件系统接口
        • 调用ComputeBuiltinChecksumWithLastByte处理checksum
        • 调用InsertBlockInCompressedCache, 将压缩后的block_content加入缓存
    • r->data_block.AddWithLastKey, 调用BlockBuilder::AddWithLastKey转发到BlockBuilder::AddWithLastKeyImpl, 主要是操作对应block builderbuffer_(string类型), 通过append方法将key写入buffer_
    • r->last_key.assign(key.data(), key.size()), 更新last_key(string类型)值
    • 更新TableProperties, 包括num_entries/raw_key_size/raw_value_size

Flush/Compaction相关数据结构

  • class CompactionOutputs: 维护子合并产生的文件, 多数方法由compaction_job的open/close合并文件的方法使用

    • 数据成员
      • std::unique_ptr<TableBuilder> builder_: SST构建器指针
      • std::unique_ptr<WritableFileWriter> file_writer_: 文件写入器指针
      • std::vector<Output> outputs_: 当前时刻所有输出
        • Struct Output存放FileMetaData文件元数据/OutputValidator有效性验证器/TableProperties表格属性
    • 重要方法
      • AddToOutput(...): 将文件添加到outputs_队列
      • NewBuilder(...): 为当前output设置TableBuilder构建器
      • AssignFileWriter(...): 为当前output设置WritableFileWriter写入器
  • class CompactionState 维护Compaction状态

    • 数据成员
      • Compaction* const compaction: 指向对应Compaction的常量指针
      • std::vector<SubcompactionState> sub_compact_states: 维护子合并状态
    • 方法
      • AggregateCompactionStats聚合合并状态
      • Slice SmallestUserKey()/Slice LargestUserKey()获取用户键
  • VersionEdit::AddFile(...): 向LSM树某一层添加文件

工具方法

  • int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, const InternalKey& b)
    • 先直接调用user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key()), 指比较两个InternalKeyUserKey部分, 并且不考虑时间戳
    • 如果不相等就直接返回, 如果先等则调用ExtractInternalKeyFooter(), 比较两个键的Footer, Footer就是InternalKey的最后8字节, 即SequenceNumberValueType的组合

      RocksDB中, Internal Key在默认情况下
      | User Key (变长) | Sequence Number (7 字节) | Value Type (1 字节) |
      在启用了User Defined Timestamp
      | User Key (变长) | Timestamp (变长) | Sequence Number (7 字节) | Value Type (1 字节) |
      Timestamp用于支持基于时间戳的数据管理
      Sequence Number用于支持MVCC, 越大越新, 查询/合并时返回最新版本

RocksDB-ZenFS交互流程分析

  • CompactionJob::OpenCompactionOutputFile(...)–>调用FS的FileSystem::NewWritableFile(...)–>调用ZenFS的ZenFS::OpenWritableFile(...)–>通过新建ZonedWritableFile对象, 并返回对象指针, ZonedWritableFile继承自public FSWritableFile
  • ZonedWritableFileAppend方法在非buffered模式下, 调用ZoneFile::Append–>在对非活动分区写入或写入空间不够时调用ZoneFile::AllocateNewZone()分配新分区–>转发到ZonedBlockDevice::AllocateIOZone(...)执行分区分配逻辑
  • ZoneFile维护一个ZoneExtent指针队列, 每个ZoneExtent对应一个Zone, 每个Zone维护了写指针wp_/WriteLifeTimeHint/capacity
  • ZonedWritableFile::AppendZonedWritableFile::PositionedAppend
    • 如前文所述, Append来自BufferedIO模式调用, PisitionAppend来自DirectIO模式调用
    • 在ZenFS中, PisitionAppend强制保证在wp_处写入, 其余与Append一致, 转发到zoneFile_->Append
    • 上层RocksDB会保证AppendPositionAppend只选择一种

RocksDB-Table-build源码分析

公共接口

  • table.h: 与sst相关的抽象类, 两种table
    • Block-based table: 默认table类型, 来自LevelDB
    • Plain table: RocksDB针对低访问延时设备(纯内存, 低访问延迟介质)优化
    • class TableFactory : public Customizable: table工厂函数基类
      • 重要接口
        • virtual Status NewTableReader(...) = 0, 读取器有三个地方被调用
          • TableCache::FindTable() table cache未命中, 返回一个table
          • SstFileDumper 打开table并转储
          • DBImpl::IngestExternalFile() 用于添加SST中的内容
        • virtual TableBuilder* NewTableBuilder(...) = 0, 构建器/写入器, 有四个地方被调用
          • DBImpl::WriteLevel0Table(): 通过BuildTable()间接调用, flush操作中Mem=>L0
          • DBImpl::OpenCompactionOutputFile(): 写Compaction的输出时
          • DBImpl::WriteLevel0TableForRecovery: 通过BuildTable()间接调用, 从transaction log中恢复时
          • Repairer::ConvertLogToTable(): 通过BuildTable()间接调用, 在修复阶段时, 将log转为sst
          • 调用者有责任保持文件打开,并在关闭表构建器后关闭文件

实现

  • table\table_builder.h:
    • 声明struct TableReaderOptionsstruct TableBuilderOptions
    • class TableBuilder: 用于构建Table的接口
      • virtual void Add(const Slice& key, const Slice& value) = 0 向构建中的表添加KV对
      • virtual Status Finish() = 0 完成构建
      • virtual uint64_t NumEntries() const = 0
  • table\block_based\: block_based table的具体实现
    • block_based_table_factory.h
    • block_based_table_builder.h
      • class BlockBasedTableBuilder : public TableBuilder

参考资料

[1] RocksDB学习笔记#6 Compaction流程(1) —— 触发流程
[2] RocksDB学习笔记#7 Compaction流程(2) —— 执行流程
[3] rocksdb介绍之compaction流程
[4] RocksDB Wiki – Compaction
[5] RocksDB Wiki – Compaction 中文文档


RocksDB的Compaction/Flush分析
https://yee686.github.io/2024/12/17/RocksDB的Compaction分析/
作者
Yi Zhengye
发布于
2024年12月17日
许可协议