这篇文章主要记录 LevelDB 的重要模块、类以及方法。把读写操作和 Compaction 操作的代码串了一遍,并添加了小部分注释。
模块
Log 文件
客户端的写请求会先 append 到 Log 文件,成功后再写入到 Memtable。如果宕机可以通过 Log 文件来恢复 Memtable。
Memtable 和 Immutable Memtable
内存数据结构,基于跳表。客户端的读写请求都会由 Memtable 处理。 当 Memtable 占用的内存达到一定阈值,重新生成新的 Memtable 处理客户端请求。原来的 Memtable 转成 Immutable Memtable,等待归并到 SST 文件中。
SST 文件
落地到磁盘的存储文件。SST 分为不同的 level,具体参考文档。
Manifest 文件
Manifest 记录不同 level 的 SST 文件,包括每个 SST 文件的 key range、大小等 metadata。
Current 文件
Current 记录了最新的 Manifest 文件。
类成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
| class DBImpl : public DB {
private:
TableCache* table_cache_;
MemTable* mem_;
MemTable* imm_;
WritableFile* logfile_;
log::Writer* log_;
std::deque<Writer*> writers_;
VersionSet* versions_;
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
std::set<uint64_t> pending_outputs_;
};
class MemTable {
private:
typedef SkipList<const char*, KeyComparator> Table;
Arena arena_; // 内存池
Table table_; // 跳表
};
struct FileMetaData {
int refs;
int allowed_seek; // seeks allowed until compaction
uint64_t number; // ??
uint64_t file_size;
InternalKey smallest;
InternalKey largest;
};
class VersionEdit {
private:
typedef std::set< std::pair<int, uint64_t> > DeletedFileSet;
std::vector< std::pair<int, InternalKey> > compact_pointers_;
DeletedFileSet deleted_files_;
std::vector< std::pair<int, FileMetaData> > new_files_;
};
class Version {
public:
Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
GetStats* stats);
private:
VersionSet* vset_;
Version* next_;
Version* prev_;
// list of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
};
class TableCache {
public:
Status Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void *arg,
void (*handle_result)(void*, const Slice&, const Slice&));
private:
Cache* cache_;
};
class VersionSet {
private:
TableCache* const table_cache_;
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Version dummy_versions_; // Head of circurlar doubly-linked list of versions
Version* current_; // == dummy_versions_.prev_
};
class WriteBatch {
public:
class Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Delete(const Slice& key) = 0;
};
private:
friend class WriteBatchInternal;
std::string req_;
}
struct DBImpl::Writer {
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
expplicit Writer(port::Mutex* mu) : cv(mu) { }
};
class Compaction {
private:
Version* input_version_;
VersionEdit edit_;
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
// State used to check for number of of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
int64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
size_t level_ptrs_[config::kNumLevels];
};
|
主要操作
读操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
MutexLock l(&mutex_);
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) { // 1)先在 MemTable 中查找
// Done
} else if (imm != NULL && imm->Get(lkey, value, &s)) { // 2)再在 Imutable MemTable 中查找
// Done
} else {
s = current->Get(options, lkey, value, &stats); // 3) 最后在当前 Version 中查找
have_stat_update = true;
}
mutex_.Lock();
}
// UpdateStats 减去 allowed_seeks,如果小于等于 0,则设置 file_to_compact_,准备 compaction
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
return s;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| // Version 类的 Get 方法
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
Status s;
stats->seek_file = NULL;
stats->seek_file_level = -1;
FileMetaData* last_file_read = NULL;
int last_file_read_level = -1;
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in an smaller level, later levels are irrelevant.
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// 这里省略一大段代码 files 指向候选文件列表,num_files 为列表的长度。具体实现看源码
for (uint32_t i = 0; i < num_files; ++i) {
if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file.
// last_file_read 保存的其实就是第一个查找未命中的文件,函数返回后会调用 UpdateStats 来减去 allowed_seeks
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}
FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;
Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
// 从 TableCache 中读取文件内容
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue);
if (!s.ok()) {
return s;
}
switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
case kFound:
return s;
case kDeleted:
s = Status::NotFound(Slice()); // Use empty error message for speed
return s;
case kCorrupt:
s = Status::Corruption("corrupted key for ", user_key);
return s;
}
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
}
|
1
2
3
4
5
6
7
8
9
10
11
12
| Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void *arg,
void (*saver)(void* const Slice&, const Slice&)) {
Cache::Handle* handle = NULL;
Status s = FindTable(file_number, file_size, &handle);
if (s.ok()) {
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, saver);
cache_->Release(handle);
}
return s;
}
|
查找顺序如下图:
写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
| Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
// 生产者消费者模型
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
// 写操作有可能被合并处理,因此有可能取到的时候写入已经完成。完成的话直接返回
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
// MakeRoomForWrite 判断是非需要归并 memtable
Status status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer); // 合并写操作
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| // REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
// 尝试合并写操作
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != NULL);
size_t size = WriteBatchInternal::ByteSize(first->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
if (w->batch != NULL) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
// Append to *result
// 把合并的写请求保存在成员变量 tmp_batch_ 中,避免和调用者的写请求混淆在一起
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}
|
1
2
3
4
5
6
7
| Status WriteBatchInternal::InsertInto(const WriteBatch* b,
MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
|
Compaction
Compaction 触发时机:
- Immutable MemTable 不为空
- 指定了 Manual Compaction
- VersionSet NeedsCompaction 返回 True
compaction_score_
大于 1
file_to_compact_
不为空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == NULL &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}
bool VersionSet::NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
}
|
compaction_score_ 的计算如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
|
file_to_compact_ 则是由 allowed_seeks 来控制。从下面代码的注释可知 25 次 seek 的开销和一次 compaction 的开销差不多。allowed_seeks 可以理解为文件剩余查找次数,每次查找失败allowed_seeks 就会减 1。当 allowed_seeks 小于等于 0,意味着应该启动 compaction 来减少查找未命中带来的 seek 的开销了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| bool Version::UpdateStats(const GetStats& stats) {
FileMetaData* f = stats.seek_file;
if (f != NULL) {
f->allowed_seeks--;
if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
// Apply all of the edits in *edit to the current state.
void Builder::Apply(VersionEdit* edit) {
// Update compaction pointers
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}
// Delete files
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
iter != del.end();
++iter) {
const int level = iter->first;
const uint64_t number = iter->second;
levels_[level].deleted_files.insert(number);
}
// Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
// We arrange to automatically compact this file after
// a certain number of seeks. Let's assume:
// (1) One seek costs 10ms
// (2) Writing or reading 1MB costs 10ms (100MB/s)
// (3) A compaction of 1MB does 25MB of IO:
// 1MB read from this level
// 10-12MB read from next level (boundaries may be misaligned)
// 10-12MB written to next level
// This implies that 25 seeks cost the same as the compaction
// of 1MB of data. I.e., one seek costs approximately the
// same as the compaction of 40KB of data. We are a little
// conservative and allow approximately one seek for every 16KB
// of data before triggering a compaction.
f->allowed_seeks = (f->file_size / 16384);
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}
|
看看 Compaction 做了哪些工作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
if (imm_ != NULL) {
CompactMemTable();
return;
}
// 这里去掉了 manual compaction 的代码 不关心
Compaction* c = versions_->PickCompaction();
Status status;
if (c == NULL) {
// Nothing to do
} else if (c->IsTrivialMove()) {
// Move file to next level
// IsTrivialMove 返回 True 则直接将文件移入 level + 1 层即可
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
delete c;
if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
// 判断是 size_compaction 还是 seek_compaction
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != NULL);
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level+1 < config::kNumLevels);
c = new Compaction(level);
// Pick the first file that comes after compact_pointer_[level]
// compact_pointer_[level] 记录上次 compact 时最大的 key
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return NULL;
}
c->input_version_ = current_;
c->input_version_->Ref();
// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
// 填充 level + 1 的文件,更新 compact_pointer_
SetupOtherInputs(c);
return c;
}
|
IsTrivialMove 判断能否直接将文件移入 level + 1 层:
1
2
3
4
5
6
7
8
| bool Compaction::IsTrivialMove() const {
// Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require
// a very expensive merge later on.
return (num_input_files(0) == 1 &&
num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
}
|
具体的合并操作在 DoCompactionWork 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
| Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->number_;
}
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
Iterator* input = versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != NULL) {
mutex_.Lock();
if (imm_ != NULL) {
CompactMemTable(); // 总是优先处理 CompactMemTable 避免阻塞 MemTable 的写入
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
}
Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key,
Slice(current_user_key)) != 0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
// 如果高层还有记录,则 kTypeDeletion 标记不能丢掉。
// smallest_snapshot 主要是为了快照功能服务
// 但 ikey.sequence <= compact->smallest_snapshot 这个判断没看懂
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
if (!drop) {
// Open output file if necessary
if (compact->builder == NULL) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());
// Close output file if it is big enough
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input); // 输出新的 SST 文件
if (!status.ok()) {
break;
}
}
}
input->Next();
}
// 中间省略一坨代码
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (status.ok()) {
status = InstallCompactionResults(compact);
}
return status;
}
|
最后调用 InstallCompactionResults,记录版本变化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes));
// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
|