1use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::{RegionEdit, TruncateKind};
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44#[derive(Debug)]
49pub(crate) struct VersionControl {
50 data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54 pub(crate) fn new(version: Version) -> VersionControl {
56 let (flushed_sequence, flushed_entry_id) =
58 (version.flushed_sequence, version.flushed_entry_id);
59 VersionControl {
60 data: RwLock::new(VersionControlData {
61 version: Arc::new(version),
62 committed_sequence: flushed_sequence,
63 last_entry_id: flushed_entry_id,
64 is_dropped: false,
65 }),
66 }
67 }
68
69 pub(crate) fn current(&self) -> VersionControlData {
71 self.data.read().unwrap().clone()
72 }
73
74 pub(crate) fn set_committed_sequence(&self, seq: SequenceNumber) {
76 let mut data = self.data.write().unwrap();
77 data.committed_sequence = seq;
78 }
79
80 pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
82 let mut data = self.data.write().unwrap();
83 data.committed_sequence = seq;
84 data.last_entry_id = entry_id;
85 }
86
87 pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
89 let mut data = self.data.write().unwrap();
90 data.last_entry_id = entry_id;
91 }
92
93 pub(crate) fn committed_sequence(&self) -> SequenceNumber {
95 self.data.read().unwrap().committed_sequence
96 }
97
98 pub(crate) fn freeze_mutable(&self) -> Result<()> {
100 let version = self.current().version;
101 let time_window = version.compaction_time_window;
102
103 let Some(new_memtables) = version
104 .memtables
105 .freeze_mutable(&version.metadata, time_window)?
106 else {
107 return Ok(());
108 };
109
110 let new_version = Arc::new(
112 VersionBuilder::from_version(version)
113 .memtables(new_memtables)
114 .build(),
115 );
116
117 let mut version_data = self.data.write().unwrap();
118 version_data.version = new_version;
119
120 Ok(())
121 }
122
123 pub(crate) fn alter_options(&self, options: RegionOptions) {
125 let version = self.current().version;
126 let new_version = Arc::new(
127 VersionBuilder::from_version(version)
128 .options(options)
129 .build(),
130 );
131 let mut version_data = self.data.write().unwrap();
132 version_data.version = new_version;
133 }
134
135 pub(crate) fn apply_edit(
139 &self,
140 edit: Option<RegionEdit>,
141 memtables_to_remove: &[MemtableId],
142 purger: FilePurgerRef,
143 ) {
144 let version = self.current().version;
145 let builder = VersionBuilder::from_version(version);
146 let committed_sequence = edit.as_ref().and_then(|e| e.committed_sequence);
147 let builder = if let Some(edit) = edit {
148 builder.apply_edit(edit, purger)
149 } else {
150 builder
151 };
152 let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build());
153
154 let mut version_data = self.data.write().unwrap();
155 version_data.committed_sequence = if let Some(committed_in_edit) = committed_sequence {
156 version_data.committed_sequence.max(committed_in_edit)
157 } else {
158 version_data.committed_sequence
159 };
160 version_data.version = new_version;
161 }
162
163 pub(crate) fn mark_dropped(&self) {
165 let version = self.current().version;
166 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
167 let new_mutable =
168 Self::new_mutable_from_version(&version, version.metadata.clone(), memtable_builder);
169
170 let mut data = self.data.write().unwrap();
171 data.is_dropped = true;
172 data.version.ssts.mark_all_deleted();
173 let new_version =
175 Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
176 data.version = new_version;
177 }
178
179 pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) {
184 let version = self.current().version;
185 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
186 let new_mutable =
187 Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder);
188 debug_assert!(version.memtables.mutable.is_empty());
189 debug_assert!(version.memtables.immutables().is_empty());
190 let new_version = Arc::new(
191 VersionBuilder::from_version(version)
192 .metadata(metadata)
193 .memtables(MemtableVersion::new(new_mutable))
194 .build(),
195 );
196
197 let mut version_data = self.data.write().unwrap();
198 version_data.version = new_version;
199 }
200
201 pub(crate) fn alter_schema_and_format(
206 &self,
207 metadata: RegionMetadataRef,
208 options: RegionOptions,
209 memtable_builder: MemtableBuilderRef,
210 ) {
211 let version = self.current().version;
212 let new_mutable =
214 Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder);
215 debug_assert!(version.memtables.mutable.is_empty());
216 debug_assert!(version.memtables.immutables().is_empty());
217 let new_version = Arc::new(
218 VersionBuilder::from_version(version)
219 .metadata(metadata)
220 .options(options)
221 .memtables(MemtableVersion::new(new_mutable))
222 .build(),
223 );
224
225 let mut version_data = self.data.write().unwrap();
226 version_data.version = new_version;
227 }
228
229 pub(crate) fn truncate(&self, truncate_kind: TruncateKind) {
231 let version = self.current().version;
232
233 match truncate_kind {
234 TruncateKind::All {
235 truncated_entry_id,
236 truncated_sequence,
237 } => {
238 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
239 let new_mutable = Self::new_mutable_from_version(
240 &version,
241 version.metadata.clone(),
242 memtable_builder,
243 );
244 let new_version = Arc::new(
245 VersionBuilder::from_version(version)
246 .memtables(MemtableVersion::new(new_mutable))
247 .clear_files()
248 .flushed_entry_id(truncated_entry_id)
249 .flushed_sequence(truncated_sequence)
250 .truncated_entry_id(Some(truncated_entry_id))
251 .build(),
252 );
253
254 let mut version_data = self.data.write().unwrap();
255 version_data.version.ssts.mark_all_deleted();
256 version_data.version = new_version;
257 }
258 TruncateKind::Partial { files_to_remove } => {
259 let new_version = Arc::new(
260 VersionBuilder::from_version(version)
261 .remove_files(files_to_remove.into_iter())
262 .build(),
263 );
264
265 let mut version_data = self.data.write().unwrap();
266 version_data.version = new_version;
268 }
269 };
270 }
271
272 pub(crate) fn overwrite_current(&self, version: VersionRef) {
274 let mut version_data = self.data.write().unwrap();
275 version_data.version = version;
276 }
277
278 fn new_mutable_from_version(
279 version: &Version,
280 metadata: RegionMetadataRef,
281 memtable_builder: MemtableBuilderRef,
282 ) -> TimePartitionsRef {
283 Arc::new(TimePartitions::new(
284 metadata,
285 memtable_builder,
286 version.memtables.mutable.next_memtable_id(),
287 Some(version.memtables.mutable.part_duration()),
288 ))
289 }
290}
291
292pub(crate) type VersionControlRef = Arc<VersionControl>;
293
294#[derive(Debug, Clone)]
296pub(crate) struct VersionControlData {
297 pub(crate) version: VersionRef,
299 pub(crate) committed_sequence: SequenceNumber,
303 pub(crate) last_entry_id: EntryId,
307 pub(crate) is_dropped: bool,
309}
310
311impl VersionControlData {
312 pub(crate) fn series_count(&self) -> usize {
314 self.version.memtables.mutable.series_count()
315 }
316}
317
318#[derive(Clone, Debug)]
320pub(crate) struct Version {
321 pub(crate) metadata: RegionMetadataRef,
326 pub(crate) memtables: MemtableVersionRef,
330 pub(crate) ssts: SstVersionRef,
332 pub(crate) flushed_entry_id: EntryId,
334 pub(crate) flushed_sequence: SequenceNumber,
336 pub(crate) truncated_entry_id: Option<EntryId>,
340 pub(crate) compaction_time_window: Option<Duration>,
345 pub(crate) options: RegionOptions,
347}
348
349pub(crate) type VersionRef = Arc<Version>;
350
351pub(crate) struct VersionBuilder {
353 metadata: RegionMetadataRef,
354 memtables: MemtableVersionRef,
355 ssts: SstVersionRef,
356 flushed_entry_id: EntryId,
357 flushed_sequence: SequenceNumber,
358 truncated_entry_id: Option<EntryId>,
359 compaction_time_window: Option<Duration>,
360 options: RegionOptions,
361}
362
363impl VersionBuilder {
364 pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
366 VersionBuilder {
367 metadata,
368 memtables: Arc::new(MemtableVersion::new(mutable)),
369 ssts: Arc::new(SstVersion::new()),
370 flushed_entry_id: 0,
371 flushed_sequence: 0,
372 truncated_entry_id: None,
373 compaction_time_window: None,
374 options: RegionOptions::default(),
375 }
376 }
377
378 pub(crate) fn from_version(version: VersionRef) -> Self {
380 VersionBuilder {
381 metadata: version.metadata.clone(),
382 memtables: version.memtables.clone(),
383 ssts: version.ssts.clone(),
384 flushed_entry_id: version.flushed_entry_id,
385 flushed_sequence: version.flushed_sequence,
386 truncated_entry_id: version.truncated_entry_id,
387 compaction_time_window: version.compaction_time_window,
388 options: version.options.clone(),
389 }
390 }
391
392 pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
394 self.memtables = Arc::new(memtables);
395 self
396 }
397
398 pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
400 self.metadata = metadata;
401 self
402 }
403
404 pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
406 self.flushed_entry_id = entry_id;
407 self
408 }
409
410 pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
412 self.flushed_sequence = sequence;
413 self
414 }
415
416 pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
418 self.truncated_entry_id = entry_id;
419 self
420 }
421
422 pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
424 self.compaction_time_window = window;
425 self
426 }
427
428 pub(crate) fn options(mut self, options: RegionOptions) -> Self {
430 self.options = options;
431 self
432 }
433
434 pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
436 if let Some(entry_id) = edit.flushed_entry_id {
437 self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
438 }
439 if let Some(sequence) = edit.flushed_sequence {
440 self.flushed_sequence = self.flushed_sequence.max(sequence);
441 }
442 if let Some(window) = edit.compaction_time_window {
443 self.compaction_time_window = Some(window);
444 }
445 if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
446 let mut ssts = (*self.ssts).clone();
447 ssts.add_files(file_purger, edit.files_to_add.into_iter());
448 ssts.remove_files(edit.files_to_remove.into_iter());
449 self.ssts = Arc::new(ssts);
450 }
451
452 self
453 }
454
455 pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
457 if !ids.is_empty() {
458 let mut memtables = (*self.memtables).clone();
459 memtables.remove_memtables(ids);
460 self.memtables = Arc::new(memtables);
461 }
462 self
463 }
464
465 pub(crate) fn add_files(
467 mut self,
468 file_purger: FilePurgerRef,
469 files: impl Iterator<Item = FileMeta>,
470 ) -> Self {
471 let mut ssts = (*self.ssts).clone();
472 ssts.add_files(file_purger, files);
473 self.ssts = Arc::new(ssts);
474
475 self
476 }
477
478 pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
479 let mut ssts = (*self.ssts).clone();
480 ssts.remove_files(files);
481 self.ssts = Arc::new(ssts);
482
483 self
484 }
485
486 pub(crate) fn clear_files(mut self) -> Self {
488 self.ssts = Arc::new(SstVersion::new());
489 self
490 }
491
492 pub(crate) fn build(self) -> Version {
495 let compaction_time_window = self
496 .options
497 .compaction
498 .time_window()
499 .or(self.compaction_time_window);
500 if self.compaction_time_window.is_some()
501 && compaction_time_window != self.compaction_time_window
502 {
503 info!(
504 "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
505 self.compaction_time_window, compaction_time_window, self.metadata.region_id
506 );
507 }
508
509 Version {
510 metadata: self.metadata,
511 memtables: self.memtables,
512 ssts: self.ssts,
513 flushed_entry_id: self.flushed_entry_id,
514 flushed_sequence: self.flushed_sequence,
515 truncated_entry_id: self.truncated_entry_id,
516 compaction_time_window,
517 options: self.options,
518 }
519 }
520}