Masutangu

长风破浪会有时 直挂云帆济沧海

也許我這一生 始終在追逐那顆九號球


LevelDB 源码阅读(一)

这篇文章主要记录 LevelDB 的重要模块、类以及方法。把读写操作和 Compaction 操作的代码串了一遍,并添加了小部分注释。

模块

Log 文件

客户端的写请求会先 append 到 Log 文件,成功后再写入到 Memtable。如果宕机可以通过 Log 文件来恢复 Memtable。

Memtable 和 Immutable Memtable

内存数据结构,基于跳表。客户端的读写请求都会由 Memtable 处理。 当 Memtable 占用的内存达到一定阈值,重新生成新的 Memtable 处理客户端请求。原来的 Memtable 转成 Immutable Memtable,等待归并到 SST 文件中。

SST 文件

落地到磁盘的存储文件。SST 分为不同的 level,具体参考文档

Manifest 文件

Manifest 记录不同 level 的 SST 文件,包括每个 SST 文件的 key range、大小等 metadata。

Current 文件

Current 记录了最新的 Manifest 文件。

类成员变量

class DBImpl : public DB {
  private:
    TableCache* table_cache_;
    MemTable* mem_;
    MemTable* imm_;
    WritableFile* logfile_;
    log::Writer* log_;
    std::deque<Writer*> writers_;
    VersionSet* versions_;

    // Set of table files to protect from deletion because they are
    // part of ongoing compactions.
    std::set<uint64_t> pending_outputs_;
};

class MemTable {
  private:
    typedef SkipList<const char*, KeyComparator> Table;
    Arena arena_;  // 内存池
    Table table_;  // 跳表
};

struct FileMetaData {
  int refs;
  int allowed_seek;   // seeks allowed until compaction
  uint64_t number;    // ?? 
  uint64_t file_size;
  InternalKey smallest;
  InternalKey largest;
};

class VersionEdit {
  private:
    typedef std::set< std::pair<int, uint64_t> > DeletedFileSet;

    std::vector< std::pair<int, InternalKey> > compact_pointers_;
    DeletedFileSet deleted_files_;
    std::vector< std::pair<int, FileMetaData> > new_files_;
};

class Version {
  public:
    Status Get(const ReadOptions&, const LookupKey& key, std::string* val, 
               GetStats* stats);
  private:
    VersionSet* vset_;
    Version* next_;
    Version* prev_;

    // list of files per level
    std::vector<FileMetaData*> files_[config::kNumLevels];
};

class TableCache {
  public:
    Status Get(const ReadOptions& options, uint64_t file_number, 
               uint64_t file_size, const Slice& k, void *arg, 
               void (*handle_result)(void*, const Slice&, const Slice&));

  private:
    Cache* cache_;
};

class VersionSet {
  private:
    TableCache* const table_cache_;
    WritableFile* descriptor_file_;
    log::Writer* descriptor_log_;
    Version dummy_versions_;  // Head of circurlar doubly-linked list of versions  
    Version* current_;        // == dummy_versions_.prev_
};

class WriteBatch {
  public:
    class Handler {
    public:
        virtual ~Handler();
        virtual void Put(const Slice& key, const Slice& value) = 0;
        virtual void Delete(const Slice& key) = 0;
    };
  private:
    friend class WriteBatchInternal;
    std::string req_;
}

struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;

  expplicit Writer(port::Mutex* mu) : cv(mu) { }
};

class Compaction {
  private:
    Version* input_version_;
    VersionEdit edit_;

    // Each compaction reads inputs from "level_" and "level_+1"
    std::vector<FileMetaData*> inputs_[2];      // The two sets of inputs

    // State used to check for number of of overlapping grandparent files
    // (parent == level_ + 1, grandparent == level_ + 2)
    std::vector<FileMetaData*> grandparents_;
    size_t grandparent_index_;  // Index in grandparent_starts_
    bool seen_key_;             // Some output key has been seen
    int64_t overlapped_bytes_;  // Bytes of overlap between current output
                                // and grandparent files

    // level_ptrs_ holds indices into input_version_->levels_: our state
    // is that we are positioned at one of the file ranges for each
    // higher level than the ones involved in this compaction (i.e. for
    // all L >= level_ + 2).
    size_t level_ptrs_[config::kNumLevels];
};

主要操作

读操作

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {

  MutexLock l(&mutex_);
  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {  // 1)先在 MemTable 中查找
      // Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {  // 2)再在 Imutable MemTable 中查找
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);  // 3) 最后在当前 Version 中查找
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  // UpdateStats 减去 allowed_seeks,如果小于等于 0,则设置 file_to_compact_,准备 compaction
  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  
  return s;
}
// Version 类的 Get 方法
Status Version::Get(const ReadOptions& options,
                    const LookupKey& k,
                    std::string* value,
                    GetStats* stats) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();
  const Comparator* ucmp = vset_->icmp_.user_comparator();
  Status s;

  stats->seek_file = NULL;
  stats->seek_file_level = -1;
  FileMetaData* last_file_read = NULL;
  int last_file_read_level = -1;

  // We can search level-by-level since entries never hop across
  // levels.  Therefore we are guaranteed that if we find data
  // in an smaller level, later levels are irrelevant.
  std::vector<FileMetaData*> tmp;
  FileMetaData* tmp2;
  for (int level = 0; level < config::kNumLevels; level++) {
    size_t num_files = files_[level].size();
    if (num_files == 0) continue;

    // 这里省略一大段代码 files 指向候选文件列表,num_files 为列表的长度。具体实现看源码

    for (uint32_t i = 0; i < num_files; ++i) {
      if (last_file_read != NULL && stats->seek_file == NULL) {
        // We have had more than one seek for this read.  Charge the 1st file.
        // last_file_read 保存的其实就是第一个查找未命中的文件,函数返回后会调用 UpdateStats 来减去 allowed_seeks
        stats->seek_file = last_file_read;
        stats->seek_file_level = last_file_read_level;
      }

      FileMetaData* f = files[i];
      last_file_read = f;
      last_file_read_level = level;

      Saver saver;
      saver.state = kNotFound;
      saver.ucmp = ucmp;
      saver.user_key = user_key;
      saver.value = value;
      // 从 TableCache 中读取文件内容
      s = vset_->table_cache_->Get(options, f->number, f->file_size,
                                   ikey, &saver, SaveValue);
      if (!s.ok()) {
        return s;
      }
      switch (saver.state) {
        case kNotFound:
          break;      // Keep searching in other files
        case kFound:
          return s;
        case kDeleted:
          s = Status::NotFound(Slice());  // Use empty error message for speed
          return s;
        case kCorrupt:
          s = Status::Corruption("corrupted key for ", user_key);
          return s;
      }
    }
  }

  return Status::NotFound(Slice());  // Use an empty error message for speed
}
Status TableCache::Get(const ReadOptions& options, uint64_t file_number, 
                      uint64_t file_size, const Slice& k, void *arg,
                      void (*saver)(void* const Slice&, const Slice&)) {
  Cache::Handle* handle = NULL;
  Status s = FindTable(file_number, file_size, &handle);
  if (s.ok()) {
    Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
    s = t->InternalGet(options, k, arg, saver);
    cache_->Release(handle);
  }
  return s;                        
}

查找顺序如下图:

写操作

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  // 生产者消费者模型
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  // 写操作有可能被合并处理,因此有可能取到的时候写入已经完成。完成的话直接返回
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  // MakeRoomForWrite 判断是非需要归并 memtable
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer); // 合并写操作
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
// 尝试合并写操作
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != NULL);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != NULL) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      // 把合并的写请求保存在成员变量 tmp_batch_ 中,避免和调用者的写请求混淆在一起
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
                                      MemTable* memtable) {
  MemTableInserter inserter;
  inserter.sequence_ = WriteBatchInternal::Sequence(b);
  inserter.mem_ = memtable;
  return b->Iterate(&inserter);
}
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value;
  int found = 0;
  while (!input.empty()) {
    found++;
    char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Put(key, value);
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
          handler->Delete(key);
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

Compaction

Compaction 触发时机:

  • Immutable MemTable 不为空
  • 指定了 Manual Compaction
  • VersionSet NeedsCompaction 返回 True
    • compaction_score_ 大于 1
    • file_to_compact_ 不为空
void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == NULL &&
             manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

bool VersionSet::NeedsCompaction() const {
  Version* v = current_;
  return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
}

compaction_score_ 的计算如下:

void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  for (int level = 0; level < config::kNumLevels-1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}

file_to_compact_ 则是由 allowed_seeks 来控制。从下面代码的注释可知 25 次 seek 的开销和一次 compaction 的开销差不多。allowed_seeks 可以理解为文件剩余查找次数,每次查找失败allowed_seeks 就会减 1。当 allowed_seeks 小于等于 0,意味着应该启动 compaction 来减少查找未命中带来的 seek 的开销了:

bool Version::UpdateStats(const GetStats& stats) {
  FileMetaData* f = stats.seek_file;
  if (f != NULL) {
    f->allowed_seeks--;
    if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
      file_to_compact_ = f;
      file_to_compact_level_ = stats.seek_file_level;
      return true;
    }
  }
  return false;
}

// Apply all of the edits in *edit to the current state.
void Builder::Apply(VersionEdit* edit) {
  // Update compaction pointers
  for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
    const int level = edit->compact_pointers_[i].first;
    vset_->compact_pointer_[level] =
        edit->compact_pointers_[i].second.Encode().ToString();
  }

  // Delete files
  const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
  for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
        iter != del.end();
        ++iter) {
    const int level = iter->first;
    const uint64_t number = iter->second;
    levels_[level].deleted_files.insert(number);
  }

  // Add new files
  for (size_t i = 0; i < edit->new_files_.size(); i++) {
    const int level = edit->new_files_[i].first;
    FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
    f->refs = 1;

    // We arrange to automatically compact this file after
    // a certain number of seeks.  Let's assume:
    //   (1) One seek costs 10ms
    //   (2) Writing or reading 1MB costs 10ms (100MB/s)
    //   (3) A compaction of 1MB does 25MB of IO:
    //         1MB read from this level
    //         10-12MB read from next level (boundaries may be misaligned)
    //         10-12MB written to next level
    // This implies that 25 seeks cost the same as the compaction
    // of 1MB of data.  I.e., one seek costs approximately the
    // same as the compaction of 40KB of data.  We are a little
    // conservative and allow approximately one seek for every 16KB
    // of data before triggering a compaction.
    f->allowed_seeks = (f->file_size / 16384);
    if (f->allowed_seeks < 100) f->allowed_seeks = 100;

    levels_[level].deleted_files.erase(f->number);
    levels_[level].added_files->insert(f);
  }
}

看看 Compaction 做了哪些工作:

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != NULL) {
    CompactMemTable();
    return;
  }
  // 这里去掉了 manual compaction 的代码 不关心
  Compaction* c = versions_->PickCompaction();

  Status status;
  if (c == NULL) {
    // Nothing to do
  } else if (c->IsTrivialMove()) {
    // Move file to next level
    // IsTrivialMove 返回 True 则直接将文件移入 level + 1 层即可
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
  }
}
Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  // 判断是 size_compaction 还是 seek_compaction
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != NULL);
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level+1 < config::kNumLevels);
    c = new Compaction(level);

    // Pick the first file that comes after compact_pointer_[level]
    // compact_pointer_[level] 记录上次 compact 时最大的 key
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {
    level = current_->file_to_compact_level_;
    c = new Compaction(level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return NULL;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  // 填充 level + 1 的文件,更新 compact_pointer_ 
  SetupOtherInputs(c);

  return c;
}

IsTrivialMove 判断能否直接将文件移入 level + 1 层:

bool Compaction::IsTrivialMove() const {
  // Avoid a move if there is lots of overlapping grandparent data.
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
  return (num_input_files(0) == 1 &&
          num_input_files(1) == 0 &&
          TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
}

具体的合并操作在 DoCompactionWork 方法:

Status DBImpl::DoCompactionWork(CompactionState* compact) {
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->number_;
  }

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  Iterator* input = versions_->MakeInputIterator(compact->compaction);
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    // Prioritize immutable compaction work
    if (has_imm_.NoBarrier_Load() != NULL) {
      mutex_.Lock();
      if (imm_ != NULL) {
        CompactMemTable();  // 总是优先处理 CompactMemTable 避免阻塞 MemTable 的写入
        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
      }
      mutex_.Unlock();
    }

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != NULL) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
                                     Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by an newer entry for same user key
        drop = true;    // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        // 如果高层还有记录,则 kTypeDeletion 标记不能丢掉。
        // smallest_snapshot 主要是为了快照功能服务
        // 但 ikey.sequence <= compact->smallest_snapshot 这个判断没看懂
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == NULL) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);  // 输出新的 SST 文件
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  // 中间省略一坨代码

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }

  return status;
}

最后调用 InstallCompactionResults,记录版本变化:

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  const int level = compact->compaction->level();
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(
        level + 1,
        out.number, out.file_size, out.smallest, out.largest);
  }
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
最近的文章

游戏开发之状态机

这阵子工作的内容有用到状态机,感觉挺有意思。正好好久没写博客了,今天也来写一篇总结下。前言用状态机来实现业务模型,有以下几点好处: 不需要写一大坨 if-else 或 switch case。代码逻辑结构清晰,也更便于调试 代码阅读起来更加友好,方便其他读者理解整个业务逻辑状态机可以划分为下面三个模块: 状态集:总共包括哪些状态 事件(条件):事件会触发状态机的状态发生变化 动作:事件发生后执行的动作,可以变迁到新状态,也可以维持当前状态实现一个简单的状态机的实现// examp...…

工作继续阅读
更早的文章

链接和装载

本文是读《程序员的自我修养: 链接、装载与库》所整理的读书笔记。概论从源文件到可执行文件,可以分解为四个过程:预处理,编译,汇编,链接。预处理主要完成以下工作: 展开所有宏定义,删除 #define 处理所有条件预编译指令 处理 #include 预编译指令。将被包含的文件插入到该预编译指令的位置 删除所有注释 添加行号和文件名标识,以便编译时编辑器产生调试用的行号信息以及编译产生错误或警告时的行号信息 保留所有 #pragma 编译器指令编译的过程即把预处理完的文件进行一系列...…

读书笔记继续阅读