// store_api/region_engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Definitions of the region engine and its related types.

17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38    BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
39};
40use crate::storage::{RegionId, ScanRequest, SequenceNumber};
41
/// The region role states that can be requested through
/// [RegionEngine::set_region_role_state_gracefully].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// Maps to [RegionRole::Follower].
    Follower,
    /// Maps to [RegionRole::DowngradingLeader]; displayed as `Leader(Downgrading)`.
    DowngradingLeader,
}
48
49impl Display for SettableRegionRoleState {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            SettableRegionRoleState::Follower => write!(f, "Follower"),
53            SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
54        }
55    }
56}
57
58impl From<SettableRegionRoleState> for RegionRole {
59    fn from(value: SettableRegionRoleState) -> Self {
60        match value {
61            SettableRegionRoleState::Follower => RegionRole::Follower,
62            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
63        }
64    }
65}
66
67/// The request to set region role state.
68#[derive(Debug, PartialEq, Eq)]
69pub struct SetRegionRoleStateRequest {
70    region_id: RegionId,
71    region_role_state: SettableRegionRoleState,
72}
73
74/// The success response of setting region role state.
75#[derive(Debug, PartialEq, Eq)]
76pub enum SetRegionRoleStateSuccess {
77    File,
78    Mito {
79        last_entry_id: entry::Id,
80    },
81    Metric {
82        last_entry_id: entry::Id,
83        metadata_last_entry_id: entry::Id,
84    },
85}
86
87impl SetRegionRoleStateSuccess {
88    /// Returns a [SetRegionRoleStateSuccess::File].
89    pub fn file() -> Self {
90        Self::File
91    }
92
93    /// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`.
94    pub fn mito(last_entry_id: entry::Id) -> Self {
95        SetRegionRoleStateSuccess::Mito { last_entry_id }
96    }
97
98    /// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`.
99    pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
100        SetRegionRoleStateSuccess::Metric {
101            last_entry_id,
102            metadata_last_entry_id,
103        }
104    }
105}
106
107impl SetRegionRoleStateSuccess {
108    /// Returns the last entry id of the region.
109    pub fn last_entry_id(&self) -> Option<entry::Id> {
110        match self {
111            SetRegionRoleStateSuccess::File => None,
112            SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
113            SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
114        }
115    }
116
117    /// Returns the last entry id of the metadata of the region.
118    pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
119        match self {
120            SetRegionRoleStateSuccess::File => None,
121            SetRegionRoleStateSuccess::Mito { .. } => None,
122            SetRegionRoleStateSuccess::Metric {
123                metadata_last_entry_id,
124                ..
125            } => Some(*metadata_last_entry_id),
126        }
127    }
128}
129
130/// The response of setting region role state.
131#[derive(Debug, PartialEq, Eq)]
132pub enum SetRegionRoleStateResponse {
133    Success(SetRegionRoleStateSuccess),
134    NotFound,
135}
136
137impl SetRegionRoleStateResponse {
138    /// Returns a [SetRegionRoleStateResponse::Success] with the `File` success.
139    pub fn success(success: SetRegionRoleStateSuccess) -> Self {
140        Self::Success(success)
141    }
142
143    /// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
144    pub fn is_not_found(&self) -> bool {
145        matches!(self, SetRegionRoleStateResponse::NotFound)
146    }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct GrantedRegion {
151    pub region_id: RegionId,
152    pub region_role: RegionRole,
153    pub extensions: HashMap<String, Vec<u8>>,
154}
155
156impl GrantedRegion {
157    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
158        Self {
159            region_id,
160            region_role,
161            extensions: HashMap::new(),
162        }
163    }
164}
165
166impl From<GrantedRegion> for PbGrantedRegion {
167    fn from(value: GrantedRegion) -> Self {
168        PbGrantedRegion {
169            region_id: value.region_id.as_u64(),
170            role: PbRegionRole::from(value.region_role).into(),
171            extensions: value.extensions,
172        }
173    }
174}
175
176impl From<PbGrantedRegion> for GrantedRegion {
177    fn from(value: PbGrantedRegion) -> Self {
178        GrantedRegion {
179            region_id: RegionId::from_u64(value.region_id),
180            region_role: value.role().into(),
181            extensions: value.extensions,
182        }
183    }
184}
185
186/// The role of the region.
187/// TODO(weny): rename it to `RegionRoleState`
188#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
189pub enum RegionRole {
190    // Readonly region(mito2)
191    Follower,
192    // Writable region(mito2), Readonly region(file).
193    Leader,
194    // Leader is downgrading to follower.
195    //
196    // This state is used to prevent new write requests.
197    DowngradingLeader,
198}
199
200impl Display for RegionRole {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        match self {
203            RegionRole::Follower => write!(f, "Follower"),
204            RegionRole::Leader => write!(f, "Leader"),
205            RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
206        }
207    }
208}
209
210impl RegionRole {
211    pub fn writable(&self) -> bool {
212        matches!(self, RegionRole::Leader)
213    }
214}
215
216impl From<RegionRole> for PbRegionRole {
217    fn from(value: RegionRole) -> Self {
218        match value {
219            RegionRole::Follower => PbRegionRole::Follower,
220            RegionRole::Leader => PbRegionRole::Leader,
221            RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
222        }
223    }
224}
225
226impl From<PbRegionRole> for RegionRole {
227    fn from(value: PbRegionRole) -> Self {
228        match value {
229            PbRegionRole::Leader => RegionRole::Leader,
230            PbRegionRole::Follower => RegionRole::Follower,
231            PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
232        }
233    }
234}
235
/// Describes how the output of a [RegionScanner] is partitioned.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// The scheme is unknown; only the number of partitions is known.
    Unknown(usize),
}
242
243impl ScannerPartitioning {
244    /// Returns the number of partitions.
245    pub fn num_partitions(&self) -> usize {
246        match self {
247            ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
248        }
249    }
250}
251
252/// Represents one data range within a partition
253#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub struct PartitionRange {
255    /// Start time of time index column. Inclusive.
256    pub start: Timestamp,
257    /// End time of time index column. Exclusive.
258    pub end: Timestamp,
259    /// Number of rows in this range. Is used to balance ranges between partitions.
260    pub num_rows: usize,
261    /// Identifier to this range. Assigned by storage engine.
262    pub identifier: usize,
263}
264
265/// Properties of the [RegionScanner].
266#[derive(Debug, Default)]
267pub struct ScannerProperties {
268    /// A 2-dim partition ranges.
269    ///
270    /// The first dim vector's length represents the output partition number. The second
271    /// dim is ranges within one partition.
272    pub partitions: Vec<Vec<PartitionRange>>,
273
274    /// Whether scanner is in append-only mode.
275    append_mode: bool,
276
277    /// Total rows that **may** return by scanner. This field is only read iff
278    /// [ScannerProperties::append_mode] is true.
279    total_rows: usize,
280
281    /// Whether to yield an empty batch to distinguish partition ranges.
282    pub distinguish_partition_range: bool,
283
284    /// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
285    target_partitions: usize,
286
287    /// Whether the scanner is scanning a logical region.
288    logical_region: bool,
289}
290
291impl ScannerProperties {
292    /// Sets append mode for scanner.
293    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
294        self.append_mode = append_mode;
295        self
296    }
297
298    /// Sets total rows for scanner.
299    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
300        self.total_rows = total_rows;
301        self
302    }
303
304    /// Creates a new [`ScannerProperties`] with the given partitioning.
305    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
306        Self {
307            partitions,
308            append_mode,
309            total_rows,
310            distinguish_partition_range: false,
311            target_partitions: 0,
312            logical_region: false,
313        }
314    }
315
316    /// Updates the properties with the given [PrepareRequest].
317    pub fn prepare(&mut self, request: PrepareRequest) {
318        if let Some(ranges) = request.ranges {
319            self.partitions = ranges;
320        }
321        if let Some(distinguish_partition_range) = request.distinguish_partition_range {
322            self.distinguish_partition_range = distinguish_partition_range;
323        }
324        if let Some(target_partitions) = request.target_partitions {
325            self.target_partitions = target_partitions;
326        }
327    }
328
329    /// Returns the number of actual partitions.
330    pub fn num_partitions(&self) -> usize {
331        self.partitions.len()
332    }
333
334    pub fn append_mode(&self) -> bool {
335        self.append_mode
336    }
337
338    pub fn total_rows(&self) -> usize {
339        self.total_rows
340    }
341
342    /// Returns whether the scanner is scanning a logical region.
343    pub fn is_logical_region(&self) -> bool {
344        self.logical_region
345    }
346
347    /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
348    pub fn target_partitions(&self) -> usize {
349        if self.target_partitions == 0 {
350            self.num_partitions()
351        } else {
352            self.target_partitions
353        }
354    }
355
356    /// Sets whether the scanner is reading a logical region.
357    pub fn set_logical_region(&mut self, logical_region: bool) {
358        self.logical_region = logical_region;
359    }
360}
361
362/// Request to override the scanner properties.
363#[derive(Default)]
364pub struct PrepareRequest {
365    /// Assigned partition ranges.
366    pub ranges: Option<Vec<Vec<PartitionRange>>>,
367    /// Distringuishes partition range by empty batches.
368    pub distinguish_partition_range: Option<bool>,
369    /// The expected number of target partitions.
370    pub target_partitions: Option<usize>,
371}
372
373impl PrepareRequest {
374    /// Sets the ranges.
375    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
376        self.ranges = Some(ranges);
377        self
378    }
379
380    /// Sets the distinguish partition range flag.
381    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
382        self.distinguish_partition_range = Some(distinguish_partition_range);
383        self
384    }
385
386    /// Sets the target partitions.
387    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
388        self.target_partitions = Some(target_partitions);
389        self
390    }
391}
392
393/// A scanner that provides a way to scan the region concurrently.
394///
395/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
396/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
397pub trait RegionScanner: Debug + DisplayAs + Send {
398    /// Returns the properties of the scanner.
399    fn properties(&self) -> &ScannerProperties;
400
401    /// Returns the schema of the record batches.
402    fn schema(&self) -> SchemaRef;
403
404    /// Returns the metadata of the region.
405    fn metadata(&self) -> RegionMetadataRef;
406
407    /// Prepares the scanner with the given partition ranges.
408    ///
409    /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
410    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
411
412    /// Scans the partition and returns a stream of record batches.
413    ///
414    /// # Panics
415    /// Panics if the `partition` is out of bound.
416    fn scan_partition(
417        &self,
418        metrics_set: &ExecutionPlanMetricsSet,
419        partition: usize,
420    ) -> Result<SendableRecordBatchStream, BoxedError>;
421
422    /// Check if there is any predicate that may be executed in this scanner.
423    fn has_predicate(&self) -> bool;
424
425    /// Sets whether the scanner is reading a logical region.
426    fn set_logical_region(&mut self, logical_region: bool);
427}
428
429pub type RegionScannerRef = Box<dyn RegionScanner>;
430
431pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
432
433/// Represents the statistics of a region.
434#[derive(Debug, Deserialize, Serialize, Default)]
435pub struct RegionStatistic {
436    /// The number of rows
437    #[serde(default)]
438    pub num_rows: u64,
439    /// The size of memtable in bytes.
440    pub memtable_size: u64,
441    /// The size of WAL in bytes.
442    pub wal_size: u64,
443    /// The size of manifest in bytes.
444    pub manifest_size: u64,
445    /// The size of SST data files in bytes.
446    pub sst_size: u64,
447    /// The size of SST index files in bytes.
448    #[serde(default)]
449    pub index_size: u64,
450    /// The details of the region.
451    #[serde(default)]
452    pub manifest: RegionManifestInfo,
453    /// The latest entry id of the region's remote WAL since last flush.
454    /// For metric engine, there're two latest entry ids, one for data and one for metadata.
455    /// TODO(weny): remove this two fields and use single instead.
456    #[serde(default)]
457    pub data_topic_latest_entry_id: u64,
458    #[serde(default)]
459    pub metadata_topic_latest_entry_id: u64,
460}
461
462/// The manifest info of a region.
463#[derive(Debug, Clone, Serialize, Deserialize)]
464pub enum RegionManifestInfo {
465    Mito {
466        manifest_version: u64,
467        flushed_entry_id: u64,
468    },
469    Metric {
470        data_manifest_version: u64,
471        data_flushed_entry_id: u64,
472        metadata_manifest_version: u64,
473        metadata_flushed_entry_id: u64,
474    },
475}
476
477impl RegionManifestInfo {
478    /// Creates a new [RegionManifestInfo] for mito2 engine.
479    pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
480        Self::Mito {
481            manifest_version,
482            flushed_entry_id,
483        }
484    }
485
486    /// Creates a new [RegionManifestInfo] for metric engine.
487    pub fn metric(
488        data_manifest_version: u64,
489        data_flushed_entry_id: u64,
490        metadata_manifest_version: u64,
491        metadata_flushed_entry_id: u64,
492    ) -> Self {
493        Self::Metric {
494            data_manifest_version,
495            data_flushed_entry_id,
496            metadata_manifest_version,
497            metadata_flushed_entry_id,
498        }
499    }
500
501    /// Returns true if the region is a mito2 region.
502    pub fn is_mito(&self) -> bool {
503        matches!(self, RegionManifestInfo::Mito { .. })
504    }
505
506    /// Returns true if the region is a metric region.
507    pub fn is_metric(&self) -> bool {
508        matches!(self, RegionManifestInfo::Metric { .. })
509    }
510
511    /// Returns the flushed entry id of the data region.
512    pub fn data_flushed_entry_id(&self) -> u64 {
513        match self {
514            RegionManifestInfo::Mito {
515                flushed_entry_id, ..
516            } => *flushed_entry_id,
517            RegionManifestInfo::Metric {
518                data_flushed_entry_id,
519                ..
520            } => *data_flushed_entry_id,
521        }
522    }
523
524    /// Returns the manifest version of the data region.
525    pub fn data_manifest_version(&self) -> u64 {
526        match self {
527            RegionManifestInfo::Mito {
528                manifest_version, ..
529            } => *manifest_version,
530            RegionManifestInfo::Metric {
531                data_manifest_version,
532                ..
533            } => *data_manifest_version,
534        }
535    }
536
537    /// Returns the manifest version of the metadata region.
538    pub fn metadata_manifest_version(&self) -> Option<u64> {
539        match self {
540            RegionManifestInfo::Mito { .. } => None,
541            RegionManifestInfo::Metric {
542                metadata_manifest_version,
543                ..
544            } => Some(*metadata_manifest_version),
545        }
546    }
547
548    /// Returns the flushed entry id of the metadata region.
549    pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
550        match self {
551            RegionManifestInfo::Mito { .. } => None,
552            RegionManifestInfo::Metric {
553                metadata_flushed_entry_id,
554                ..
555            } => Some(*metadata_flushed_entry_id),
556        }
557    }
558
559    /// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array.
560    pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
561        serde_json::to_vec(manifest_infos)
562    }
563
564    /// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array.
565    pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
566        serde_json::from_slice(value)
567    }
568}
569
570impl Default for RegionManifestInfo {
571    fn default() -> Self {
572        Self::Mito {
573            manifest_version: 0,
574            flushed_entry_id: 0,
575        }
576    }
577}
578
579impl RegionStatistic {
580    /// Deserializes the region statistic to a byte array.
581    ///
582    /// Returns None if the deserialization fails.
583    pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
584        serde_json::from_slice(value).ok()
585    }
586
587    /// Serializes the region statistic to a byte array.
588    ///
589    /// Returns None if the serialization fails.
590    pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
591        serde_json::to_vec(self).ok()
592    }
593}
594
595impl RegionStatistic {
596    /// Returns the estimated disk size of the region.
597    pub fn estimated_disk_size(&self) -> u64 {
598        self.wal_size + self.sst_size + self.manifest_size + self.index_size
599    }
600}
601
602/// The response of syncing the manifest.
603#[derive(Debug)]
604pub enum SyncManifestResponse {
605    NotSupported,
606    Mito {
607        /// Indicates if the data region was synced.
608        synced: bool,
609    },
610    Metric {
611        /// Indicates if the metadata region was synced.
612        metadata_synced: bool,
613        /// Indicates if the data region was synced.
614        data_synced: bool,
615        /// The logical regions that were newly opened during the sync operation.
616        /// This only occurs after the metadata region has been successfully synced.
617        new_opened_logical_region_ids: Vec<RegionId>,
618    },
619}
620
621impl SyncManifestResponse {
622    /// Returns true if data region is synced.
623    pub fn is_data_synced(&self) -> bool {
624        match self {
625            SyncManifestResponse::NotSupported => false,
626            SyncManifestResponse::Mito { synced } => *synced,
627            SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
628        }
629    }
630
631    /// Returns true if the engine is supported the sync operation.
632    pub fn is_supported(&self) -> bool {
633        matches!(self, SyncManifestResponse::NotSupported)
634    }
635
636    /// Returns true if the engine is a mito2 engine.
637    pub fn is_mito(&self) -> bool {
638        matches!(self, SyncManifestResponse::Mito { .. })
639    }
640
641    /// Returns true if the engine is a metric engine.
642    pub fn is_metric(&self) -> bool {
643        matches!(self, SyncManifestResponse::Metric { .. })
644    }
645
646    /// Returns the new opened logical region ids.
647    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
648        match self {
649            SyncManifestResponse::Metric {
650                new_opened_logical_region_ids,
651                ..
652            } => Some(new_opened_logical_region_ids),
653            _ => None,
654        }
655    }
656}
657
658#[async_trait]
659pub trait RegionEngine: Send + Sync {
660    /// Name of this engine
661    fn name(&self) -> &str;
662
663    /// Handles batch open region requests.
664    async fn handle_batch_open_requests(
665        &self,
666        parallelism: usize,
667        requests: Vec<(RegionId, RegionOpenRequest)>,
668    ) -> Result<BatchResponses, BoxedError> {
669        let semaphore = Arc::new(Semaphore::new(parallelism));
670        let mut tasks = Vec::with_capacity(requests.len());
671
672        for (region_id, request) in requests {
673            let semaphore_moved = semaphore.clone();
674
675            tasks.push(async move {
676                // Safety: semaphore must exist
677                let _permit = semaphore_moved.acquire().await.unwrap();
678                let result = self
679                    .handle_request(region_id, RegionRequest::Open(request))
680                    .await;
681                (region_id, result)
682            });
683        }
684
685        Ok(join_all(tasks).await)
686    }
687
688    async fn handle_batch_ddl_requests(
689        &self,
690        request: BatchRegionDdlRequest,
691    ) -> Result<RegionResponse, BoxedError> {
692        let requests = request.into_region_requests();
693
694        let mut affected_rows = 0;
695        let mut extensions = HashMap::new();
696
697        for (region_id, request) in requests {
698            let result = self.handle_request(region_id, request).await?;
699            affected_rows += result.affected_rows;
700            extensions.extend(result.extensions);
701        }
702
703        Ok(RegionResponse {
704            affected_rows,
705            extensions,
706        })
707    }
708
709    /// Handles non-query request to the region. Returns the count of affected rows.
710    async fn handle_request(
711        &self,
712        region_id: RegionId,
713        request: RegionRequest,
714    ) -> Result<RegionResponse, BoxedError>;
715
716    /// Returns the last sequence number of the region.
717    async fn get_last_seq_num(
718        &self,
719        region_id: RegionId,
720    ) -> Result<Option<SequenceNumber>, BoxedError>;
721
722    async fn get_region_sequences(
723        &self,
724        seqs: RegionSequencesRequest,
725    ) -> Result<HashMap<u64, u64>, BoxedError> {
726        let mut results = HashMap::with_capacity(seqs.region_ids.len());
727
728        for region_id in seqs.region_ids {
729            let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default();
730            results.insert(region_id.as_u64(), seq);
731        }
732
733        Ok(results)
734    }
735
736    /// Handles query and return a scanner that can be used to scan the region concurrently.
737    async fn handle_query(
738        &self,
739        region_id: RegionId,
740        request: ScanRequest,
741    ) -> Result<RegionScannerRef, BoxedError>;
742
743    /// Retrieves region's metadata.
744    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
745
746    /// Retrieves region's statistic.
747    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
748
749    /// Stops the engine
750    async fn stop(&self) -> Result<(), BoxedError>;
751
752    /// Sets [RegionRole] for a region.
753    ///
754    /// The engine checks whether the region is writable before writing to the region. Setting
755    /// the region as readonly doesn't guarantee that write operations in progress will not
756    /// take effect.
757    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
758
759    /// Syncs the region manifest to the given manifest version.
760    async fn sync_region(
761        &self,
762        region_id: RegionId,
763        manifest_info: RegionManifestInfo,
764    ) -> Result<SyncManifestResponse, BoxedError>;
765
766    /// Sets region role state gracefully.
767    ///
768    /// After the call returns, the engine ensures no more write operations will succeed in the region.
769    async fn set_region_role_state_gracefully(
770        &self,
771        region_id: RegionId,
772        region_role_state: SettableRegionRoleState,
773    ) -> Result<SetRegionRoleStateResponse, BoxedError>;
774
775    /// Indicates region role.
776    ///
777    /// Returns the `None` if the region is not found.
778    fn role(&self, region_id: RegionId) -> Option<RegionRole>;
779
780    fn as_any(&self) -> &dyn Any;
781}
782
783pub type RegionEngineRef = Arc<dyn RegionEngine>;
784
785/// A [RegionScanner] that only scans a single partition.
786pub struct SinglePartitionScanner {
787    stream: Mutex<Option<SendableRecordBatchStream>>,
788    schema: SchemaRef,
789    properties: ScannerProperties,
790    metadata: RegionMetadataRef,
791}
792
793impl SinglePartitionScanner {
794    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
795    pub fn new(
796        stream: SendableRecordBatchStream,
797        append_mode: bool,
798        metadata: RegionMetadataRef,
799    ) -> Self {
800        let schema = stream.schema();
801        Self {
802            stream: Mutex::new(Some(stream)),
803            schema,
804            properties: ScannerProperties::default().with_append_mode(append_mode),
805            metadata,
806        }
807    }
808}
809
810impl Debug for SinglePartitionScanner {
811    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
812        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
813    }
814}
815
816impl RegionScanner for SinglePartitionScanner {
817    fn properties(&self) -> &ScannerProperties {
818        &self.properties
819    }
820
821    fn schema(&self) -> SchemaRef {
822        self.schema.clone()
823    }
824
825    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
826        self.properties.prepare(request);
827        Ok(())
828    }
829
830    fn scan_partition(
831        &self,
832        _metrics_set: &ExecutionPlanMetricsSet,
833        _partition: usize,
834    ) -> Result<SendableRecordBatchStream, BoxedError> {
835        let mut stream = self.stream.lock().unwrap();
836        let result = stream
837            .take()
838            .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
839        Ok(result.unwrap())
840    }
841
842    fn has_predicate(&self) -> bool {
843        false
844    }
845
846    fn metadata(&self) -> RegionMetadataRef {
847        self.metadata.clone()
848    }
849
850    fn set_logical_region(&mut self, logical_region: bool) {
851        self.properties.set_logical_region(logical_region);
852    }
853}
854
855impl DisplayAs for SinglePartitionScanner {
856    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
857        write!(f, "{:?}", self)
858    }
859}