1use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::RegionEdit;
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44#[derive(Debug)]
49pub(crate) struct VersionControl {
50 data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54 pub(crate) fn new(version: Version) -> VersionControl {
56 let (flushed_sequence, flushed_entry_id) =
58 (version.flushed_sequence, version.flushed_entry_id);
59 VersionControl {
60 data: RwLock::new(VersionControlData {
61 version: Arc::new(version),
62 committed_sequence: flushed_sequence,
63 last_entry_id: flushed_entry_id,
64 is_dropped: false,
65 }),
66 }
67 }
68
69 pub(crate) fn current(&self) -> VersionControlData {
71 self.data.read().unwrap().clone()
72 }
73
74 pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
76 let mut data = self.data.write().unwrap();
77 data.committed_sequence = seq;
78 data.last_entry_id = entry_id;
79 }
80
81 pub(crate) fn committed_sequence(&self) -> SequenceNumber {
83 self.data.read().unwrap().committed_sequence
84 }
85
86 pub(crate) fn freeze_mutable(&self) -> Result<()> {
88 let version = self.current().version;
89 let time_window = version.compaction_time_window;
90
91 let Some(new_memtables) = version
92 .memtables
93 .freeze_mutable(&version.metadata, time_window)?
94 else {
95 return Ok(());
96 };
97
98 let new_version = Arc::new(
100 VersionBuilder::from_version(version)
101 .memtables(new_memtables)
102 .build(),
103 );
104
105 let mut version_data = self.data.write().unwrap();
106 version_data.version = new_version;
107
108 Ok(())
109 }
110
111 pub(crate) fn alter_options(&self, options: RegionOptions) {
113 let version = self.current().version;
114 let new_version = Arc::new(
115 VersionBuilder::from_version(version)
116 .options(options)
117 .build(),
118 );
119 let mut version_data = self.data.write().unwrap();
120 version_data.version = new_version;
121 }
122
123 pub(crate) fn apply_edit(
125 &self,
126 edit: RegionEdit,
127 memtables_to_remove: &[MemtableId],
128 purger: FilePurgerRef,
129 ) {
130 let version = self.current().version;
131 let new_version = Arc::new(
132 VersionBuilder::from_version(version)
133 .apply_edit(edit, purger)
134 .remove_memtables(memtables_to_remove)
135 .build(),
136 );
137
138 let mut version_data = self.data.write().unwrap();
139 version_data.version = new_version;
140 }
141
142 pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
144 let version = self.current().version;
145 let part_duration = version.memtables.mutable.part_duration();
146 let next_memtable_id = version.memtables.mutable.next_memtable_id();
147 let new_mutable = Arc::new(TimePartitions::new(
148 version.metadata.clone(),
149 memtable_builder.clone(),
150 next_memtable_id,
151 part_duration,
152 ));
153
154 let mut data = self.data.write().unwrap();
155 data.is_dropped = true;
156 data.version.ssts.mark_all_deleted();
157 let new_version =
159 Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
160 data.version = new_version;
161 }
162
163 pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
168 let version = self.current().version;
169 let part_duration = version.memtables.mutable.part_duration();
170 let next_memtable_id = version.memtables.mutable.next_memtable_id();
171 let new_mutable = Arc::new(TimePartitions::new(
172 metadata.clone(),
173 builder.clone(),
174 next_memtable_id,
175 part_duration,
176 ));
177 debug_assert!(version.memtables.mutable.is_empty());
178 debug_assert!(version.memtables.immutables().is_empty());
179 let new_version = Arc::new(
180 VersionBuilder::from_version(version)
181 .metadata(metadata)
182 .memtables(MemtableVersion::new(new_mutable))
183 .build(),
184 );
185
186 let mut version_data = self.data.write().unwrap();
187 version_data.version = new_version;
188 }
189
190 pub(crate) fn truncate(
192 &self,
193 truncated_entry_id: EntryId,
194 truncated_sequence: SequenceNumber,
195 memtable_builder: &MemtableBuilderRef,
196 ) {
197 let version = self.current().version;
198
199 let part_duration = version.memtables.mutable.part_duration();
200 let next_memtable_id = version.memtables.mutable.next_memtable_id();
201 let new_mutable = Arc::new(TimePartitions::new(
202 version.metadata.clone(),
203 memtable_builder.clone(),
204 next_memtable_id,
205 part_duration,
206 ));
207 let new_version = Arc::new(
208 VersionBuilder::new(version.metadata.clone(), new_mutable)
209 .flushed_entry_id(truncated_entry_id)
210 .flushed_sequence(truncated_sequence)
211 .truncated_entry_id(Some(truncated_entry_id))
212 .build(),
213 );
214
215 let mut version_data = self.data.write().unwrap();
216 version_data.version.ssts.mark_all_deleted();
217 version_data.version = new_version;
218 }
219
220 pub(crate) fn overwrite_current(&self, version: VersionRef) {
222 let mut version_data = self.data.write().unwrap();
223 version_data.version = version;
224 }
225}
226
227pub(crate) type VersionControlRef = Arc<VersionControl>;
228
229#[derive(Debug, Clone)]
231pub(crate) struct VersionControlData {
232 pub(crate) version: VersionRef,
234 pub(crate) committed_sequence: SequenceNumber,
238 pub(crate) last_entry_id: EntryId,
242 pub(crate) is_dropped: bool,
244}
245
246#[derive(Clone, Debug)]
248pub(crate) struct Version {
249 pub(crate) metadata: RegionMetadataRef,
254 pub(crate) memtables: MemtableVersionRef,
258 pub(crate) ssts: SstVersionRef,
260 pub(crate) flushed_entry_id: EntryId,
262 pub(crate) flushed_sequence: SequenceNumber,
264 pub(crate) truncated_entry_id: Option<EntryId>,
268 pub(crate) compaction_time_window: Option<Duration>,
273 pub(crate) options: RegionOptions,
275}
276
277pub(crate) type VersionRef = Arc<Version>;
278
279pub(crate) struct VersionBuilder {
281 metadata: RegionMetadataRef,
282 memtables: MemtableVersionRef,
283 ssts: SstVersionRef,
284 flushed_entry_id: EntryId,
285 flushed_sequence: SequenceNumber,
286 truncated_entry_id: Option<EntryId>,
287 compaction_time_window: Option<Duration>,
288 options: RegionOptions,
289}
290
291impl VersionBuilder {
292 pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
294 VersionBuilder {
295 metadata,
296 memtables: Arc::new(MemtableVersion::new(mutable)),
297 ssts: Arc::new(SstVersion::new()),
298 flushed_entry_id: 0,
299 flushed_sequence: 0,
300 truncated_entry_id: None,
301 compaction_time_window: None,
302 options: RegionOptions::default(),
303 }
304 }
305
306 pub(crate) fn from_version(version: VersionRef) -> Self {
308 VersionBuilder {
309 metadata: version.metadata.clone(),
310 memtables: version.memtables.clone(),
311 ssts: version.ssts.clone(),
312 flushed_entry_id: version.flushed_entry_id,
313 flushed_sequence: version.flushed_sequence,
314 truncated_entry_id: version.truncated_entry_id,
315 compaction_time_window: version.compaction_time_window,
316 options: version.options.clone(),
317 }
318 }
319
320 pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
322 self.memtables = Arc::new(memtables);
323 self
324 }
325
326 pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
328 self.metadata = metadata;
329 self
330 }
331
332 pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
334 self.flushed_entry_id = entry_id;
335 self
336 }
337
338 pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
340 self.flushed_sequence = sequence;
341 self
342 }
343
344 pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
346 self.truncated_entry_id = entry_id;
347 self
348 }
349
350 pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
352 self.compaction_time_window = window;
353 self
354 }
355
356 pub(crate) fn options(mut self, options: RegionOptions) -> Self {
358 self.options = options;
359 self
360 }
361
362 pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
364 if let Some(entry_id) = edit.flushed_entry_id {
365 self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
366 }
367 if let Some(sequence) = edit.flushed_sequence {
368 self.flushed_sequence = self.flushed_sequence.max(sequence);
369 }
370 if let Some(window) = edit.compaction_time_window {
371 self.compaction_time_window = Some(window);
372 }
373 if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
374 let mut ssts = (*self.ssts).clone();
375 ssts.add_files(file_purger, edit.files_to_add.into_iter());
376 ssts.remove_files(edit.files_to_remove.into_iter());
377 self.ssts = Arc::new(ssts);
378 }
379
380 self
381 }
382
383 pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
385 if !ids.is_empty() {
386 let mut memtables = (*self.memtables).clone();
387 memtables.remove_memtables(ids);
388 self.memtables = Arc::new(memtables);
389 }
390 self
391 }
392
393 pub(crate) fn add_files(
395 mut self,
396 file_purger: FilePurgerRef,
397 files: impl Iterator<Item = FileMeta>,
398 ) -> Self {
399 let mut ssts = (*self.ssts).clone();
400 ssts.add_files(file_purger, files);
401 self.ssts = Arc::new(ssts);
402
403 self
404 }
405
406 pub(crate) fn build(self) -> Version {
409 let compaction_time_window = self
410 .options
411 .compaction
412 .time_window()
413 .or(self.compaction_time_window);
414 if self.compaction_time_window.is_some()
415 && compaction_time_window != self.compaction_time_window
416 {
417 info!(
418 "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
419 self.compaction_time_window,
420 compaction_time_window,
421 self.metadata.region_id
422 );
423 }
424
425 Version {
426 metadata: self.metadata,
427 memtables: self.memtables,
428 ssts: self.ssts,
429 flushed_entry_id: self.flushed_entry_id,
430 flushed_sequence: self.flushed_sequence,
431 truncated_entry_id: self.truncated_entry_id,
432 compaction_time_window,
433 options: self.options,
434 }
435 }
436}