1use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::{RegionEdit, TruncateKind};
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44#[derive(Debug)]
49pub(crate) struct VersionControl {
50 data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54 pub(crate) fn new(version: Version) -> VersionControl {
56 let (flushed_sequence, flushed_entry_id) =
58 (version.flushed_sequence, version.flushed_entry_id);
59 VersionControl {
60 data: RwLock::new(VersionControlData {
61 version: Arc::new(version),
62 committed_sequence: flushed_sequence,
63 last_entry_id: flushed_entry_id,
64 is_dropped: false,
65 }),
66 }
67 }
68
69 pub(crate) fn current(&self) -> VersionControlData {
71 self.data.read().unwrap().clone()
72 }
73
74 pub(crate) fn set_committed_sequence(&self, seq: SequenceNumber) {
76 let mut data = self.data.write().unwrap();
77 data.committed_sequence = seq;
78 }
79
80 pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
82 let mut data = self.data.write().unwrap();
83 data.committed_sequence = seq;
84 data.last_entry_id = entry_id;
85 }
86
87 pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
89 let mut data = self.data.write().unwrap();
90 data.last_entry_id = entry_id;
91 }
92
93 pub(crate) fn committed_sequence(&self) -> SequenceNumber {
95 self.data.read().unwrap().committed_sequence
96 }
97
98 pub(crate) fn freeze_mutable(&self) -> Result<()> {
100 let version = self.current().version;
101 let time_window = version.compaction_time_window;
102
103 let Some(new_memtables) = version
104 .memtables
105 .freeze_mutable(&version.metadata, time_window)?
106 else {
107 return Ok(());
108 };
109
110 let new_version = Arc::new(
112 VersionBuilder::from_version(version)
113 .memtables(new_memtables)
114 .build(),
115 );
116
117 let mut version_data = self.data.write().unwrap();
118 version_data.version = new_version;
119
120 Ok(())
121 }
122
123 pub(crate) fn alter_options(&self, options: RegionOptions) {
125 let version = self.current().version;
126 let new_version = Arc::new(
127 VersionBuilder::from_version(version)
128 .options(options)
129 .build(),
130 );
131 let mut version_data = self.data.write().unwrap();
132 version_data.version = new_version;
133 }
134
135 pub(crate) fn apply_edit(
139 &self,
140 edit: Option<RegionEdit>,
141 memtables_to_remove: &[MemtableId],
142 purger: FilePurgerRef,
143 ) {
144 let version = self.current().version;
145 let builder = VersionBuilder::from_version(version);
146 let committed_sequence = edit.as_ref().and_then(|e| e.committed_sequence);
147 let builder = if let Some(edit) = edit {
148 builder.apply_edit(edit, purger)
149 } else {
150 builder
151 };
152 let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build());
153
154 let mut version_data = self.data.write().unwrap();
155 version_data.committed_sequence = if let Some(committed_in_edit) = committed_sequence {
156 version_data.committed_sequence.max(committed_in_edit)
157 } else {
158 version_data.committed_sequence
159 };
160 version_data.version = new_version;
161 }
162
163 pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
165 let version = self.current().version;
166 let part_duration = Some(version.memtables.mutable.part_duration());
167 let next_memtable_id = version.memtables.mutable.next_memtable_id();
168 let new_mutable = Arc::new(TimePartitions::new(
169 version.metadata.clone(),
170 memtable_builder.clone(),
171 next_memtable_id,
172 part_duration,
173 ));
174
175 let mut data = self.data.write().unwrap();
176 data.is_dropped = true;
177 data.version.ssts.mark_all_deleted();
178 let new_version =
180 Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
181 data.version = new_version;
182 }
183
184 pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
189 let version = self.current().version;
190 let part_duration = Some(version.memtables.mutable.part_duration());
191 let next_memtable_id = version.memtables.mutable.next_memtable_id();
192 let new_mutable = Arc::new(TimePartitions::new(
193 metadata.clone(),
194 builder.clone(),
195 next_memtable_id,
196 part_duration,
197 ));
198 debug_assert!(version.memtables.mutable.is_empty());
199 debug_assert!(version.memtables.immutables().is_empty());
200 let new_version = Arc::new(
201 VersionBuilder::from_version(version)
202 .metadata(metadata)
203 .memtables(MemtableVersion::new(new_mutable))
204 .build(),
205 );
206
207 let mut version_data = self.data.write().unwrap();
208 version_data.version = new_version;
209 }
210
211 pub(crate) fn truncate(
213 &self,
214 truncate_kind: TruncateKind,
215 memtable_builder: &MemtableBuilderRef,
216 ) {
217 let version = self.current().version;
218
219 let part_duration = version.memtables.mutable.part_duration();
220 let next_memtable_id = version.memtables.mutable.next_memtable_id();
221 let new_mutable = Arc::new(TimePartitions::new(
222 version.metadata.clone(),
223 memtable_builder.clone(),
224 next_memtable_id,
225 Some(part_duration),
226 ));
227 match truncate_kind {
228 TruncateKind::All {
229 truncated_entry_id,
230 truncated_sequence,
231 } => {
232 let new_version = Arc::new(
233 VersionBuilder::new(version.metadata.clone(), new_mutable)
234 .flushed_entry_id(truncated_entry_id)
235 .flushed_sequence(truncated_sequence)
236 .truncated_entry_id(Some(truncated_entry_id))
237 .build(),
238 );
239
240 let mut version_data = self.data.write().unwrap();
241 version_data.version.ssts.mark_all_deleted();
242 version_data.version = new_version;
243 }
244 TruncateKind::Partial { files_to_remove } => {
245 let new_version = Arc::new(
246 VersionBuilder::from_version(version)
247 .remove_files(files_to_remove.into_iter())
248 .build(),
249 );
250
251 let mut version_data = self.data.write().unwrap();
252 version_data.version = new_version;
254 }
255 };
256 }
257
258 pub(crate) fn overwrite_current(&self, version: VersionRef) {
260 let mut version_data = self.data.write().unwrap();
261 version_data.version = version;
262 }
263}
264
265pub(crate) type VersionControlRef = Arc<VersionControl>;
266
267#[derive(Debug, Clone)]
269pub(crate) struct VersionControlData {
270 pub(crate) version: VersionRef,
272 pub(crate) committed_sequence: SequenceNumber,
276 pub(crate) last_entry_id: EntryId,
280 pub(crate) is_dropped: bool,
282}
283
284impl VersionControlData {
285 pub(crate) fn series_count(&self) -> usize {
287 self.version.memtables.mutable.series_count()
288 }
289}
290
291#[derive(Clone, Debug)]
293pub(crate) struct Version {
294 pub(crate) metadata: RegionMetadataRef,
299 pub(crate) memtables: MemtableVersionRef,
303 pub(crate) ssts: SstVersionRef,
305 pub(crate) flushed_entry_id: EntryId,
307 pub(crate) flushed_sequence: SequenceNumber,
309 pub(crate) truncated_entry_id: Option<EntryId>,
313 pub(crate) compaction_time_window: Option<Duration>,
318 pub(crate) options: RegionOptions,
320}
321
322pub(crate) type VersionRef = Arc<Version>;
323
324pub(crate) struct VersionBuilder {
326 metadata: RegionMetadataRef,
327 memtables: MemtableVersionRef,
328 ssts: SstVersionRef,
329 flushed_entry_id: EntryId,
330 flushed_sequence: SequenceNumber,
331 truncated_entry_id: Option<EntryId>,
332 compaction_time_window: Option<Duration>,
333 options: RegionOptions,
334}
335
336impl VersionBuilder {
337 pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
339 VersionBuilder {
340 metadata,
341 memtables: Arc::new(MemtableVersion::new(mutable)),
342 ssts: Arc::new(SstVersion::new()),
343 flushed_entry_id: 0,
344 flushed_sequence: 0,
345 truncated_entry_id: None,
346 compaction_time_window: None,
347 options: RegionOptions::default(),
348 }
349 }
350
351 pub(crate) fn from_version(version: VersionRef) -> Self {
353 VersionBuilder {
354 metadata: version.metadata.clone(),
355 memtables: version.memtables.clone(),
356 ssts: version.ssts.clone(),
357 flushed_entry_id: version.flushed_entry_id,
358 flushed_sequence: version.flushed_sequence,
359 truncated_entry_id: version.truncated_entry_id,
360 compaction_time_window: version.compaction_time_window,
361 options: version.options.clone(),
362 }
363 }
364
365 pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
367 self.memtables = Arc::new(memtables);
368 self
369 }
370
371 pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
373 self.metadata = metadata;
374 self
375 }
376
377 pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
379 self.flushed_entry_id = entry_id;
380 self
381 }
382
383 pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
385 self.flushed_sequence = sequence;
386 self
387 }
388
389 pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
391 self.truncated_entry_id = entry_id;
392 self
393 }
394
395 pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
397 self.compaction_time_window = window;
398 self
399 }
400
401 pub(crate) fn options(mut self, options: RegionOptions) -> Self {
403 self.options = options;
404 self
405 }
406
407 pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
409 if let Some(entry_id) = edit.flushed_entry_id {
410 self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
411 }
412 if let Some(sequence) = edit.flushed_sequence {
413 self.flushed_sequence = self.flushed_sequence.max(sequence);
414 }
415 if let Some(window) = edit.compaction_time_window {
416 self.compaction_time_window = Some(window);
417 }
418 if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
419 let mut ssts = (*self.ssts).clone();
420 ssts.add_files(file_purger, edit.files_to_add.into_iter());
421 ssts.remove_files(edit.files_to_remove.into_iter());
422 self.ssts = Arc::new(ssts);
423 }
424
425 self
426 }
427
428 pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
430 if !ids.is_empty() {
431 let mut memtables = (*self.memtables).clone();
432 memtables.remove_memtables(ids);
433 self.memtables = Arc::new(memtables);
434 }
435 self
436 }
437
438 pub(crate) fn add_files(
440 mut self,
441 file_purger: FilePurgerRef,
442 files: impl Iterator<Item = FileMeta>,
443 ) -> Self {
444 let mut ssts = (*self.ssts).clone();
445 ssts.add_files(file_purger, files);
446 self.ssts = Arc::new(ssts);
447
448 self
449 }
450
451 pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
452 let mut ssts = (*self.ssts).clone();
453 ssts.remove_files(files);
454 self.ssts = Arc::new(ssts);
455
456 self
457 }
458
459 pub(crate) fn build(self) -> Version {
462 let compaction_time_window = self
463 .options
464 .compaction
465 .time_window()
466 .or(self.compaction_time_window);
467 if self.compaction_time_window.is_some()
468 && compaction_time_window != self.compaction_time_window
469 {
470 info!(
471 "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
472 self.compaction_time_window, compaction_time_window, self.metadata.region_id
473 );
474 }
475
476 Version {
477 metadata: self.metadata,
478 memtables: self.memtables,
479 ssts: self.ssts,
480 flushed_entry_id: self.flushed_entry_id,
481 flushed_sequence: self.flushed_sequence,
482 truncated_entry_id: self.truncated_entry_id,
483 compaction_time_window,
484 options: self.options,
485 }
486 }
487}