1use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::{RegionEdit, TruncateKind};
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44#[derive(Debug)]
49pub(crate) struct VersionControl {
50 data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54 pub(crate) fn new(version: Version) -> VersionControl {
56 let (flushed_sequence, flushed_entry_id) =
58 (version.flushed_sequence, version.flushed_entry_id);
59 VersionControl {
60 data: RwLock::new(VersionControlData {
61 version: Arc::new(version),
62 committed_sequence: flushed_sequence,
63 last_entry_id: flushed_entry_id,
64 is_dropped: false,
65 }),
66 }
67 }
68
69 pub(crate) fn current(&self) -> VersionControlData {
71 self.data.read().unwrap().clone()
72 }
73
74 pub(crate) fn set_committed_sequence(&self, seq: SequenceNumber) {
76 let mut data = self.data.write().unwrap();
77 data.committed_sequence = seq;
78 }
79
80 pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
82 let mut data = self.data.write().unwrap();
83 data.committed_sequence = seq;
84 data.last_entry_id = entry_id;
85 }
86
87 pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
89 let mut data = self.data.write().unwrap();
90 data.last_entry_id = entry_id;
91 }
92
93 pub(crate) fn committed_sequence(&self) -> SequenceNumber {
95 self.data.read().unwrap().committed_sequence
96 }
97
98 pub(crate) fn freeze_mutable(&self) -> Result<()> {
100 let version = self.current().version;
101 let time_window = version.compaction_time_window;
102
103 let Some(new_memtables) = version
104 .memtables
105 .freeze_mutable(&version.metadata, time_window)?
106 else {
107 return Ok(());
108 };
109
110 let new_version = Arc::new(
112 VersionBuilder::from_version(version)
113 .memtables(new_memtables)
114 .build(),
115 );
116
117 let mut version_data = self.data.write().unwrap();
118 version_data.version = new_version;
119
120 Ok(())
121 }
122
123 pub(crate) fn alter_options(&self, options: RegionOptions) {
125 let version = self.current().version;
126 let new_version = Arc::new(
127 VersionBuilder::from_version(version)
128 .options(options)
129 .build(),
130 );
131 let mut version_data = self.data.write().unwrap();
132 version_data.version = new_version;
133 }
134
135 pub(crate) fn apply_edit(
139 &self,
140 edit: Option<RegionEdit>,
141 memtables_to_remove: &[MemtableId],
142 purger: FilePurgerRef,
143 ) {
144 let version = self.current().version;
145 let builder = VersionBuilder::from_version(version);
146 let committed_sequence = edit.as_ref().and_then(|e| e.committed_sequence);
147 let builder = if let Some(edit) = edit {
148 builder.apply_edit(edit, purger)
149 } else {
150 builder
151 };
152 let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build());
153
154 let mut version_data = self.data.write().unwrap();
155 version_data.committed_sequence = if let Some(committed_in_edit) = committed_sequence {
156 version_data.committed_sequence.max(committed_in_edit)
157 } else {
158 version_data.committed_sequence
159 };
160 version_data.version = new_version;
161 }
162
163 pub(crate) fn mark_dropped(&self) {
165 let version = self.current().version;
166 let part_duration = Some(version.memtables.mutable.part_duration());
167 let next_memtable_id = version.memtables.mutable.next_memtable_id();
168 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
169 let new_mutable = Arc::new(TimePartitions::new(
170 version.metadata.clone(),
171 memtable_builder,
172 next_memtable_id,
173 part_duration,
174 ));
175
176 let mut data = self.data.write().unwrap();
177 data.is_dropped = true;
178 data.version.ssts.mark_all_deleted();
179 let new_version =
181 Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
182 data.version = new_version;
183 }
184
185 pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) {
190 let version = self.current().version;
191 let part_duration = Some(version.memtables.mutable.part_duration());
192 let next_memtable_id = version.memtables.mutable.next_memtable_id();
193 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
194 let new_mutable = Arc::new(TimePartitions::new(
195 metadata.clone(),
196 memtable_builder,
197 next_memtable_id,
198 part_duration,
199 ));
200 debug_assert!(version.memtables.mutable.is_empty());
201 debug_assert!(version.memtables.immutables().is_empty());
202 let new_version = Arc::new(
203 VersionBuilder::from_version(version)
204 .metadata(metadata)
205 .memtables(MemtableVersion::new(new_mutable))
206 .build(),
207 );
208
209 let mut version_data = self.data.write().unwrap();
210 version_data.version = new_version;
211 }
212
213 pub(crate) fn alter_schema_and_format(
218 &self,
219 metadata: RegionMetadataRef,
220 options: RegionOptions,
221 memtable_builder: MemtableBuilderRef,
222 ) {
223 let version = self.current().version;
224 let part_duration = Some(version.memtables.mutable.part_duration());
225 let next_memtable_id = version.memtables.mutable.next_memtable_id();
226 let new_mutable = Arc::new(TimePartitions::new(
228 metadata.clone(),
229 memtable_builder,
230 next_memtable_id,
231 part_duration,
232 ));
233 debug_assert!(version.memtables.mutable.is_empty());
234 debug_assert!(version.memtables.immutables().is_empty());
235 let new_version = Arc::new(
236 VersionBuilder::from_version(version)
237 .metadata(metadata)
238 .options(options)
239 .memtables(MemtableVersion::new(new_mutable))
240 .build(),
241 );
242
243 let mut version_data = self.data.write().unwrap();
244 version_data.version = new_version;
245 }
246
247 pub(crate) fn truncate(&self, truncate_kind: TruncateKind) {
249 let version = self.current().version;
250
251 let part_duration = version.memtables.mutable.part_duration();
252 let next_memtable_id = version.memtables.mutable.next_memtable_id();
253 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
254 let new_mutable = Arc::new(TimePartitions::new(
255 version.metadata.clone(),
256 memtable_builder,
257 next_memtable_id,
258 Some(part_duration),
259 ));
260 match truncate_kind {
261 TruncateKind::All {
262 truncated_entry_id,
263 truncated_sequence,
264 } => {
265 let new_version = Arc::new(
266 VersionBuilder::from_version(version)
267 .memtables(MemtableVersion::new(new_mutable))
268 .clear_files()
269 .flushed_entry_id(truncated_entry_id)
270 .flushed_sequence(truncated_sequence)
271 .truncated_entry_id(Some(truncated_entry_id))
272 .build(),
273 );
274
275 let mut version_data = self.data.write().unwrap();
276 version_data.version.ssts.mark_all_deleted();
277 version_data.version = new_version;
278 }
279 TruncateKind::Partial { files_to_remove } => {
280 let new_version = Arc::new(
281 VersionBuilder::from_version(version)
282 .remove_files(files_to_remove.into_iter())
283 .build(),
284 );
285
286 let mut version_data = self.data.write().unwrap();
287 version_data.version = new_version;
289 }
290 };
291 }
292
293 pub(crate) fn overwrite_current(&self, version: VersionRef) {
295 let mut version_data = self.data.write().unwrap();
296 version_data.version = version;
297 }
298}
299
300pub(crate) type VersionControlRef = Arc<VersionControl>;
301
302#[derive(Debug, Clone)]
304pub(crate) struct VersionControlData {
305 pub(crate) version: VersionRef,
307 pub(crate) committed_sequence: SequenceNumber,
311 pub(crate) last_entry_id: EntryId,
315 pub(crate) is_dropped: bool,
317}
318
319impl VersionControlData {
320 pub(crate) fn series_count(&self) -> usize {
322 self.version.memtables.mutable.series_count()
323 }
324}
325
326#[derive(Clone, Debug)]
328pub(crate) struct Version {
329 pub(crate) metadata: RegionMetadataRef,
334 pub(crate) memtables: MemtableVersionRef,
338 pub(crate) ssts: SstVersionRef,
340 pub(crate) flushed_entry_id: EntryId,
342 pub(crate) flushed_sequence: SequenceNumber,
344 pub(crate) truncated_entry_id: Option<EntryId>,
348 pub(crate) compaction_time_window: Option<Duration>,
353 pub(crate) options: RegionOptions,
355}
356
357pub(crate) type VersionRef = Arc<Version>;
358
359pub(crate) struct VersionBuilder {
361 metadata: RegionMetadataRef,
362 memtables: MemtableVersionRef,
363 ssts: SstVersionRef,
364 flushed_entry_id: EntryId,
365 flushed_sequence: SequenceNumber,
366 truncated_entry_id: Option<EntryId>,
367 compaction_time_window: Option<Duration>,
368 options: RegionOptions,
369}
370
371impl VersionBuilder {
372 pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
374 VersionBuilder {
375 metadata,
376 memtables: Arc::new(MemtableVersion::new(mutable)),
377 ssts: Arc::new(SstVersion::new()),
378 flushed_entry_id: 0,
379 flushed_sequence: 0,
380 truncated_entry_id: None,
381 compaction_time_window: None,
382 options: RegionOptions::default(),
383 }
384 }
385
386 pub(crate) fn from_version(version: VersionRef) -> Self {
388 VersionBuilder {
389 metadata: version.metadata.clone(),
390 memtables: version.memtables.clone(),
391 ssts: version.ssts.clone(),
392 flushed_entry_id: version.flushed_entry_id,
393 flushed_sequence: version.flushed_sequence,
394 truncated_entry_id: version.truncated_entry_id,
395 compaction_time_window: version.compaction_time_window,
396 options: version.options.clone(),
397 }
398 }
399
400 pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
402 self.memtables = Arc::new(memtables);
403 self
404 }
405
406 pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
408 self.metadata = metadata;
409 self
410 }
411
412 pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
414 self.flushed_entry_id = entry_id;
415 self
416 }
417
418 pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
420 self.flushed_sequence = sequence;
421 self
422 }
423
424 pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
426 self.truncated_entry_id = entry_id;
427 self
428 }
429
430 pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
432 self.compaction_time_window = window;
433 self
434 }
435
436 pub(crate) fn options(mut self, options: RegionOptions) -> Self {
438 self.options = options;
439 self
440 }
441
442 pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
444 if let Some(entry_id) = edit.flushed_entry_id {
445 self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
446 }
447 if let Some(sequence) = edit.flushed_sequence {
448 self.flushed_sequence = self.flushed_sequence.max(sequence);
449 }
450 if let Some(window) = edit.compaction_time_window {
451 self.compaction_time_window = Some(window);
452 }
453 if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
454 let mut ssts = (*self.ssts).clone();
455 ssts.add_files(file_purger, edit.files_to_add.into_iter());
456 ssts.remove_files(edit.files_to_remove.into_iter());
457 self.ssts = Arc::new(ssts);
458 }
459
460 self
461 }
462
463 pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
465 if !ids.is_empty() {
466 let mut memtables = (*self.memtables).clone();
467 memtables.remove_memtables(ids);
468 self.memtables = Arc::new(memtables);
469 }
470 self
471 }
472
473 pub(crate) fn add_files(
475 mut self,
476 file_purger: FilePurgerRef,
477 files: impl Iterator<Item = FileMeta>,
478 ) -> Self {
479 let mut ssts = (*self.ssts).clone();
480 ssts.add_files(file_purger, files);
481 self.ssts = Arc::new(ssts);
482
483 self
484 }
485
486 pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
487 let mut ssts = (*self.ssts).clone();
488 ssts.remove_files(files);
489 self.ssts = Arc::new(ssts);
490
491 self
492 }
493
494 pub(crate) fn clear_files(mut self) -> Self {
496 self.ssts = Arc::new(SstVersion::new());
497 self
498 }
499
500 pub(crate) fn build(self) -> Version {
503 let compaction_time_window = self
504 .options
505 .compaction
506 .time_window()
507 .or(self.compaction_time_window);
508 if self.compaction_time_window.is_some()
509 && compaction_time_window != self.compaction_time_window
510 {
511 info!(
512 "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
513 self.compaction_time_window, compaction_time_window, self.metadata.region_id
514 );
515 }
516
517 Version {
518 metadata: self.metadata,
519 memtables: self.memtables,
520 ssts: self.ssts,
521 flushed_entry_id: self.flushed_entry_id,
522 flushed_sequence: self.flushed_sequence,
523 truncated_entry_id: self.truncated_entry_id,
524 compaction_time_window,
525 options: self.options,
526 }
527 }
528}