1use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::{RegionEdit, TruncateKind};
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44#[derive(Debug)]
49pub(crate) struct VersionControl {
50 data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54 pub(crate) fn new(version: Version) -> VersionControl {
56 let (flushed_sequence, flushed_entry_id) =
58 (version.flushed_sequence, version.flushed_entry_id);
59 VersionControl {
60 data: RwLock::new(VersionControlData {
61 version: Arc::new(version),
62 committed_sequence: flushed_sequence,
63 last_entry_id: flushed_entry_id,
64 is_dropped: false,
65 }),
66 }
67 }
68
69 pub(crate) fn current(&self) -> VersionControlData {
71 self.data.read().unwrap().clone()
72 }
73
74 pub(crate) fn set_committed_sequence(&self, seq: SequenceNumber) {
76 let mut data = self.data.write().unwrap();
77 data.committed_sequence = seq;
78 }
79
80 pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
82 let mut data = self.data.write().unwrap();
83 data.committed_sequence = seq;
84 data.last_entry_id = entry_id;
85 }
86
87 pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
89 let mut data = self.data.write().unwrap();
90 data.last_entry_id = entry_id;
91 }
92
93 pub(crate) fn committed_sequence(&self) -> SequenceNumber {
95 self.data.read().unwrap().committed_sequence
96 }
97
98 pub(crate) fn freeze_mutable(&self) -> Result<()> {
100 let version = self.current().version;
101 let time_window = version.compaction_time_window;
102
103 let Some(new_memtables) = version
104 .memtables
105 .freeze_mutable(&version.metadata, time_window)?
106 else {
107 return Ok(());
108 };
109
110 let new_version = Arc::new(
112 VersionBuilder::from_version(version)
113 .memtables(new_memtables)
114 .build(),
115 );
116
117 let mut version_data = self.data.write().unwrap();
118 version_data.version = new_version;
119
120 Ok(())
121 }
122
123 pub(crate) fn alter_options(&self, options: RegionOptions) {
125 let version = self.current().version;
126 let new_version = Arc::new(
127 VersionBuilder::from_version(version)
128 .options(options)
129 .build(),
130 );
131 let mut version_data = self.data.write().unwrap();
132 version_data.version = new_version;
133 }
134
135 pub(crate) fn apply_edit(
139 &self,
140 edit: Option<RegionEdit>,
141 memtables_to_remove: &[MemtableId],
142 purger: FilePurgerRef,
143 ) {
144 let version = self.current().version;
145 let builder = VersionBuilder::from_version(version);
146 let committed_sequence = edit.as_ref().and_then(|e| e.committed_sequence);
147 let builder = if let Some(edit) = edit {
148 builder.apply_edit(edit, purger)
149 } else {
150 builder
151 };
152 let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build());
153
154 let mut version_data = self.data.write().unwrap();
155 version_data.committed_sequence = if let Some(committed_in_edit) = committed_sequence {
156 version_data.committed_sequence.max(committed_in_edit)
157 } else {
158 version_data.committed_sequence
159 };
160 version_data.version = new_version;
161 }
162
163 pub(crate) fn mark_dropped(&self) {
165 let version = self.current().version;
166 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
167 let new_mutable =
168 Self::new_mutable_from_version(&version, version.metadata.clone(), memtable_builder);
169
170 let mut data = self.data.write().unwrap();
171 data.is_dropped = true;
172 data.version.ssts.mark_all_deleted();
173 let new_version =
175 Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
176 data.version = new_version;
177 }
178
179 pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) {
184 let version = self.current().version;
185 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
186 let new_mutable =
187 Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder);
188 debug_assert!(version.memtables.mutable.is_empty());
189 debug_assert!(version.memtables.immutables().is_empty());
190 let new_version = Arc::new(
191 VersionBuilder::from_version(version)
192 .metadata(metadata)
193 .memtables(MemtableVersion::new(new_mutable))
194 .build(),
195 );
196
197 let mut version_data = self.data.write().unwrap();
198 version_data.version = new_version;
199 }
200
201 pub(crate) fn alter_metadata(&self, metadata: RegionMetadataRef) {
203 let version = self.current().version;
204 let new_version = Arc::new(
205 VersionBuilder::from_version(version)
206 .metadata(metadata)
207 .build(),
208 );
209
210 let mut version_data = self.data.write().unwrap();
211 version_data.version = new_version;
212 }
213
214 pub(crate) fn alter_schema_and_format(
219 &self,
220 metadata: RegionMetadataRef,
221 options: RegionOptions,
222 memtable_builder: MemtableBuilderRef,
223 ) {
224 let version = self.current().version;
225 let new_mutable =
227 Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder);
228 debug_assert!(version.memtables.mutable.is_empty());
229 debug_assert!(version.memtables.immutables().is_empty());
230 let new_version = Arc::new(
231 VersionBuilder::from_version(version)
232 .metadata(metadata)
233 .options(options)
234 .memtables(MemtableVersion::new(new_mutable))
235 .build(),
236 );
237
238 let mut version_data = self.data.write().unwrap();
239 version_data.version = new_version;
240 }
241
242 pub(crate) fn truncate(&self, truncate_kind: TruncateKind) {
244 let version = self.current().version;
245
246 match truncate_kind {
247 TruncateKind::All {
248 truncated_entry_id,
249 truncated_sequence,
250 } => {
251 let memtable_builder = version.memtables.mutable.memtable_builder().clone();
252 let new_mutable = Self::new_mutable_from_version(
253 &version,
254 version.metadata.clone(),
255 memtable_builder,
256 );
257 let new_version = Arc::new(
258 VersionBuilder::from_version(version)
259 .memtables(MemtableVersion::new(new_mutable))
260 .clear_files()
261 .flushed_entry_id(truncated_entry_id)
262 .flushed_sequence(truncated_sequence)
263 .truncated_entry_id(Some(truncated_entry_id))
264 .build(),
265 );
266
267 let mut version_data = self.data.write().unwrap();
268 version_data.version.ssts.mark_all_deleted();
269 version_data.version = new_version;
270 }
271 TruncateKind::Partial { files_to_remove } => {
272 let new_version = Arc::new(
273 VersionBuilder::from_version(version)
274 .remove_files(files_to_remove.into_iter())
275 .build(),
276 );
277
278 let mut version_data = self.data.write().unwrap();
279 version_data.version = new_version;
281 }
282 };
283 }
284
285 pub(crate) fn overwrite_current(&self, version: VersionRef) {
287 let mut version_data = self.data.write().unwrap();
288 version_data.version = version;
289 }
290
291 fn new_mutable_from_version(
292 version: &Version,
293 metadata: RegionMetadataRef,
294 memtable_builder: MemtableBuilderRef,
295 ) -> TimePartitionsRef {
296 Arc::new(TimePartitions::new(
297 metadata,
298 memtable_builder,
299 version.memtables.mutable.next_memtable_id(),
300 Some(version.memtables.mutable.part_duration()),
301 ))
302 }
303}
304
305pub(crate) type VersionControlRef = Arc<VersionControl>;
306
307#[derive(Debug, Clone)]
309pub(crate) struct VersionControlData {
310 pub(crate) version: VersionRef,
312 pub(crate) committed_sequence: SequenceNumber,
316 pub(crate) last_entry_id: EntryId,
320 pub(crate) is_dropped: bool,
322}
323
324impl VersionControlData {
325 pub(crate) fn series_count(&self) -> usize {
327 self.version.memtables.mutable.series_count()
328 }
329}
330
331#[derive(Clone, Debug)]
333pub(crate) struct Version {
334 pub(crate) metadata: RegionMetadataRef,
339 pub(crate) memtables: MemtableVersionRef,
343 pub(crate) ssts: SstVersionRef,
345 pub(crate) flushed_entry_id: EntryId,
347 pub(crate) flushed_sequence: SequenceNumber,
349 pub(crate) truncated_entry_id: Option<EntryId>,
353 pub(crate) compaction_time_window: Option<Duration>,
358 pub(crate) options: RegionOptions,
360}
361
362pub(crate) type VersionRef = Arc<Version>;
363
364pub(crate) struct VersionBuilder {
366 metadata: RegionMetadataRef,
367 memtables: MemtableVersionRef,
368 ssts: SstVersionRef,
369 flushed_entry_id: EntryId,
370 flushed_sequence: SequenceNumber,
371 truncated_entry_id: Option<EntryId>,
372 compaction_time_window: Option<Duration>,
373 options: RegionOptions,
374}
375
376impl VersionBuilder {
377 pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
379 VersionBuilder {
380 metadata,
381 memtables: Arc::new(MemtableVersion::new(mutable)),
382 ssts: Arc::new(SstVersion::new()),
383 flushed_entry_id: 0,
384 flushed_sequence: 0,
385 truncated_entry_id: None,
386 compaction_time_window: None,
387 options: RegionOptions::default(),
388 }
389 }
390
391 pub(crate) fn from_version(version: VersionRef) -> Self {
393 VersionBuilder {
394 metadata: version.metadata.clone(),
395 memtables: version.memtables.clone(),
396 ssts: version.ssts.clone(),
397 flushed_entry_id: version.flushed_entry_id,
398 flushed_sequence: version.flushed_sequence,
399 truncated_entry_id: version.truncated_entry_id,
400 compaction_time_window: version.compaction_time_window,
401 options: version.options.clone(),
402 }
403 }
404
405 pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
407 self.memtables = Arc::new(memtables);
408 self
409 }
410
411 pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
413 self.metadata = metadata;
414 self
415 }
416
417 pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
419 self.flushed_entry_id = entry_id;
420 self
421 }
422
423 pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
425 self.flushed_sequence = sequence;
426 self
427 }
428
429 pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
431 self.truncated_entry_id = entry_id;
432 self
433 }
434
435 pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
437 self.compaction_time_window = window;
438 self
439 }
440
441 pub(crate) fn options(mut self, options: RegionOptions) -> Self {
443 self.options = options;
444 self
445 }
446
447 pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
449 if let Some(entry_id) = edit.flushed_entry_id {
450 self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
451 }
452 if let Some(sequence) = edit.flushed_sequence {
453 self.flushed_sequence = self.flushed_sequence.max(sequence);
454 }
455 if let Some(window) = edit.compaction_time_window {
456 self.compaction_time_window = Some(window);
457 }
458 if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
459 let mut ssts = (*self.ssts).clone();
460 ssts.add_files(file_purger, edit.files_to_add.into_iter());
461 ssts.remove_files(edit.files_to_remove.into_iter());
462 self.ssts = Arc::new(ssts);
463 }
464
465 self
466 }
467
468 pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
470 if !ids.is_empty() {
471 let mut memtables = (*self.memtables).clone();
472 memtables.remove_memtables(ids);
473 self.memtables = Arc::new(memtables);
474 }
475 self
476 }
477
478 pub(crate) fn add_files(
480 mut self,
481 file_purger: FilePurgerRef,
482 files: impl Iterator<Item = FileMeta>,
483 ) -> Self {
484 let mut ssts = (*self.ssts).clone();
485 ssts.add_files(file_purger, files);
486 self.ssts = Arc::new(ssts);
487
488 self
489 }
490
491 pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
492 let mut ssts = (*self.ssts).clone();
493 ssts.remove_files(files);
494 self.ssts = Arc::new(ssts);
495
496 self
497 }
498
499 pub(crate) fn clear_files(mut self) -> Self {
501 self.ssts = Arc::new(SstVersion::new());
502 self
503 }
504
505 pub(crate) fn build(self) -> Version {
508 let compaction_time_window = self
509 .options
510 .compaction
511 .time_window()
512 .or(self.compaction_time_window);
513 if self.compaction_time_window.is_some()
514 && compaction_time_window != self.compaction_time_window
515 {
516 info!(
517 "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
518 self.compaction_time_window, compaction_time_window, self.metadata.region_id
519 );
520 }
521
522 Version {
523 metadata: self.metadata,
524 memtables: self.memtables,
525 ssts: self.ssts,
526 flushed_entry_id: self.flushed_entry_id,
527 flushed_sequence: self.flushed_sequence,
528 truncated_entry_id: self.truncated_entry_id,
529 compaction_time_window,
530 options: self.options,
531 }
532 }
533}