store_api/
region_engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region Engine's definition
16
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38    BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
39};
40use crate::storage::{RegionId, ScanRequest, SequenceNumber};
41
/// Region role states that callers are allowed to set explicitly.
///
/// Each variant converts into the same-named [RegionRole].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// Counterpart of [RegionRole::Follower].
    Follower,
    /// Counterpart of [RegionRole::DowngradingLeader].
    DowngradingLeader,
}

impl Display for SettableRegionRoleState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Labels intentionally mirror the `Display` output of `RegionRole`.
        let label = match self {
            Self::Follower => "Follower",
            Self::DowngradingLeader => "Leader(Downgrading)",
        };
        f.write_str(label)
    }
}
57
58impl From<SettableRegionRoleState> for RegionRole {
59    fn from(value: SettableRegionRoleState) -> Self {
60        match value {
61            SettableRegionRoleState::Follower => RegionRole::Follower,
62            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
63        }
64    }
65}
66
67/// The request to set region role state.
68#[derive(Debug, PartialEq, Eq)]
69pub struct SetRegionRoleStateRequest {
70    region_id: RegionId,
71    region_role_state: SettableRegionRoleState,
72}
73
74/// The success response of setting region role state.
75#[derive(Debug, PartialEq, Eq)]
76pub enum SetRegionRoleStateSuccess {
77    File,
78    Mito {
79        last_entry_id: entry::Id,
80    },
81    Metric {
82        last_entry_id: entry::Id,
83        metadata_last_entry_id: entry::Id,
84    },
85}
86
87impl SetRegionRoleStateSuccess {
88    /// Returns a [SetRegionRoleStateSuccess::File].
89    pub fn file() -> Self {
90        Self::File
91    }
92
93    /// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`.
94    pub fn mito(last_entry_id: entry::Id) -> Self {
95        SetRegionRoleStateSuccess::Mito { last_entry_id }
96    }
97
98    /// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`.
99    pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
100        SetRegionRoleStateSuccess::Metric {
101            last_entry_id,
102            metadata_last_entry_id,
103        }
104    }
105}
106
107impl SetRegionRoleStateSuccess {
108    /// Returns the last entry id of the region.
109    pub fn last_entry_id(&self) -> Option<entry::Id> {
110        match self {
111            SetRegionRoleStateSuccess::File => None,
112            SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
113            SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
114        }
115    }
116
117    /// Returns the last entry id of the metadata of the region.
118    pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
119        match self {
120            SetRegionRoleStateSuccess::File => None,
121            SetRegionRoleStateSuccess::Mito { .. } => None,
122            SetRegionRoleStateSuccess::Metric {
123                metadata_last_entry_id,
124                ..
125            } => Some(*metadata_last_entry_id),
126        }
127    }
128}
129
130/// The response of setting region role state.
131#[derive(Debug, PartialEq, Eq)]
132pub enum SetRegionRoleStateResponse {
133    Success(SetRegionRoleStateSuccess),
134    NotFound,
135}
136
137impl SetRegionRoleStateResponse {
138    /// Returns a [SetRegionRoleStateResponse::Success] with the `File` success.
139    pub fn success(success: SetRegionRoleStateSuccess) -> Self {
140        Self::Success(success)
141    }
142
143    /// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
144    pub fn is_not_found(&self) -> bool {
145        matches!(self, SetRegionRoleStateResponse::NotFound)
146    }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct GrantedRegion {
151    pub region_id: RegionId,
152    pub region_role: RegionRole,
153    pub extensions: HashMap<String, Vec<u8>>,
154}
155
156impl GrantedRegion {
157    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
158        Self {
159            region_id,
160            region_role,
161            extensions: HashMap::new(),
162        }
163    }
164}
165
166impl From<GrantedRegion> for PbGrantedRegion {
167    fn from(value: GrantedRegion) -> Self {
168        PbGrantedRegion {
169            region_id: value.region_id.as_u64(),
170            role: PbRegionRole::from(value.region_role).into(),
171            extensions: value.extensions,
172        }
173    }
174}
175
176impl From<PbGrantedRegion> for GrantedRegion {
177    fn from(value: PbGrantedRegion) -> Self {
178        GrantedRegion {
179            region_id: RegionId::from_u64(value.region_id),
180            region_role: value.role().into(),
181            extensions: value.extensions,
182        }
183    }
184}
185
186/// The role of the region.
187/// TODO(weny): rename it to `RegionRoleState`
188#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
189pub enum RegionRole {
190    // Readonly region(mito2)
191    Follower,
192    // Writable region(mito2), Readonly region(file).
193    Leader,
194    // Leader is downgrading to follower.
195    //
196    // This state is used to prevent new write requests.
197    DowngradingLeader,
198}
199
200impl Display for RegionRole {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        match self {
203            RegionRole::Follower => write!(f, "Follower"),
204            RegionRole::Leader => write!(f, "Leader"),
205            RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
206        }
207    }
208}
209
210impl RegionRole {
211    pub fn writable(&self) -> bool {
212        matches!(self, RegionRole::Leader)
213    }
214}
215
216impl From<RegionRole> for PbRegionRole {
217    fn from(value: RegionRole) -> Self {
218        match value {
219            RegionRole::Follower => PbRegionRole::Follower,
220            RegionRole::Leader => PbRegionRole::Leader,
221            RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
222        }
223    }
224}
225
226impl From<PbRegionRole> for RegionRole {
227    fn from(value: PbRegionRole) -> Self {
228        match value {
229            PbRegionRole::Leader => RegionRole::Leader,
230            PbRegionRole::Follower => RegionRole::Follower,
231            PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
232        }
233    }
234}
235
/// Describes how the output of a [RegionScanner] is partitioned.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Unknown partitioning scheme with a known number of partitions
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns how many output partitions there are.
    pub fn num_partitions(&self) -> usize {
        // Single-variant enum, so this destructuring is irrefutable.
        let Self::Unknown(count) = self;
        *count
    }
}
251
252/// Represents one data range within a partition
253#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub struct PartitionRange {
255    /// Start time of time index column. Inclusive.
256    pub start: Timestamp,
257    /// End time of time index column. Exclusive.
258    pub end: Timestamp,
259    /// Number of rows in this range. Is used to balance ranges between partitions.
260    pub num_rows: usize,
261    /// Identifier to this range. Assigned by storage engine.
262    pub identifier: usize,
263}
264
265/// Properties of the [RegionScanner].
266#[derive(Debug, Default)]
267pub struct ScannerProperties {
268    /// A 2-dim partition ranges.
269    ///
270    /// The first dim vector's length represents the output partition number. The second
271    /// dim is ranges within one partition.
272    pub partitions: Vec<Vec<PartitionRange>>,
273
274    /// Whether scanner is in append-only mode.
275    append_mode: bool,
276
277    /// Total rows that **may** return by scanner. This field is only read iff
278    /// [ScannerProperties::append_mode] is true.
279    total_rows: usize,
280
281    /// Whether to yield an empty batch to distinguish partition ranges.
282    pub distinguish_partition_range: bool,
283
284    /// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
285    target_partitions: usize,
286
287    /// Whether the scanner is scanning a logical region.
288    logical_region: bool,
289}
290
291impl ScannerProperties {
292    /// Sets append mode for scanner.
293    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
294        self.append_mode = append_mode;
295        self
296    }
297
298    /// Sets total rows for scanner.
299    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
300        self.total_rows = total_rows;
301        self
302    }
303
304    /// Creates a new [`ScannerProperties`] with the given partitioning.
305    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
306        Self {
307            partitions,
308            append_mode,
309            total_rows,
310            distinguish_partition_range: false,
311            target_partitions: 0,
312            logical_region: false,
313        }
314    }
315
316    /// Updates the properties with the given [PrepareRequest].
317    pub fn prepare(&mut self, request: PrepareRequest) {
318        if let Some(ranges) = request.ranges {
319            self.partitions = ranges;
320        }
321        if let Some(distinguish_partition_range) = request.distinguish_partition_range {
322            self.distinguish_partition_range = distinguish_partition_range;
323        }
324        if let Some(target_partitions) = request.target_partitions {
325            self.target_partitions = target_partitions;
326        }
327    }
328
329    /// Returns the number of actual partitions.
330    pub fn num_partitions(&self) -> usize {
331        self.partitions.len()
332    }
333
334    pub fn append_mode(&self) -> bool {
335        self.append_mode
336    }
337
338    pub fn total_rows(&self) -> usize {
339        self.total_rows
340    }
341
342    /// Returns whether the scanner is scanning a logical region.
343    pub fn is_logical_region(&self) -> bool {
344        self.logical_region
345    }
346
347    /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
348    pub fn target_partitions(&self) -> usize {
349        if self.target_partitions == 0 {
350            self.num_partitions()
351        } else {
352            self.target_partitions
353        }
354    }
355
356    /// Sets whether the scanner is reading a logical region.
357    pub fn set_logical_region(&mut self, logical_region: bool) {
358        self.logical_region = logical_region;
359    }
360}
361
362/// Request to override the scanner properties.
363#[derive(Default)]
364pub struct PrepareRequest {
365    /// Assigned partition ranges.
366    pub ranges: Option<Vec<Vec<PartitionRange>>>,
367    /// Distringuishes partition range by empty batches.
368    pub distinguish_partition_range: Option<bool>,
369    /// The expected number of target partitions.
370    pub target_partitions: Option<usize>,
371}
372
373impl PrepareRequest {
374    /// Sets the ranges.
375    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
376        self.ranges = Some(ranges);
377        self
378    }
379
380    /// Sets the distinguish partition range flag.
381    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
382        self.distinguish_partition_range = Some(distinguish_partition_range);
383        self
384    }
385
386    /// Sets the target partitions.
387    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
388        self.target_partitions = Some(target_partitions);
389        self
390    }
391}
392
/// Query-level context passed to a scanner when it scans a partition.
#[derive(Clone, Default)]
pub struct QueryScanContext {
    /// Set when the query is `EXPLAIN ANALYZE VERBOSE`.
    pub explain_verbose: bool,
}
399
400/// A scanner that provides a way to scan the region concurrently.
401///
402/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
403/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
404pub trait RegionScanner: Debug + DisplayAs + Send {
405    /// Returns the properties of the scanner.
406    fn properties(&self) -> &ScannerProperties;
407
408    /// Returns the schema of the record batches.
409    fn schema(&self) -> SchemaRef;
410
411    /// Returns the metadata of the region.
412    fn metadata(&self) -> RegionMetadataRef;
413
414    /// Prepares the scanner with the given partition ranges.
415    ///
416    /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
417    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
418
419    /// Scans the partition and returns a stream of record batches.
420    ///
421    /// # Panics
422    /// Panics if the `partition` is out of bound.
423    fn scan_partition(
424        &self,
425        ctx: &QueryScanContext,
426        metrics_set: &ExecutionPlanMetricsSet,
427        partition: usize,
428    ) -> Result<SendableRecordBatchStream, BoxedError>;
429
430    /// Check if there is any predicate that may be executed in this scanner.
431    fn has_predicate(&self) -> bool;
432
433    /// Sets whether the scanner is reading a logical region.
434    fn set_logical_region(&mut self, logical_region: bool);
435}
436
437pub type RegionScannerRef = Box<dyn RegionScanner>;
438
439pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
440
441/// Represents the statistics of a region.
442#[derive(Debug, Deserialize, Serialize, Default)]
443pub struct RegionStatistic {
444    /// The number of rows
445    #[serde(default)]
446    pub num_rows: u64,
447    /// The size of memtable in bytes.
448    pub memtable_size: u64,
449    /// The size of WAL in bytes.
450    pub wal_size: u64,
451    /// The size of manifest in bytes.
452    pub manifest_size: u64,
453    /// The size of SST data files in bytes.
454    pub sst_size: u64,
455    /// The num of SST files.
456    pub sst_num: u64,
457    /// The size of SST index files in bytes.
458    #[serde(default)]
459    pub index_size: u64,
460    /// The details of the region.
461    #[serde(default)]
462    pub manifest: RegionManifestInfo,
463    /// The latest entry id of the region's remote WAL since last flush.
464    /// For metric engine, there're two latest entry ids, one for data and one for metadata.
465    /// TODO(weny): remove this two fields and use single instead.
466    #[serde(default)]
467    pub data_topic_latest_entry_id: u64,
468    #[serde(default)]
469    pub metadata_topic_latest_entry_id: u64,
470}
471
472/// The manifest info of a region.
473#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
474pub enum RegionManifestInfo {
475    Mito {
476        manifest_version: u64,
477        flushed_entry_id: u64,
478    },
479    Metric {
480        data_manifest_version: u64,
481        data_flushed_entry_id: u64,
482        metadata_manifest_version: u64,
483        metadata_flushed_entry_id: u64,
484    },
485}
486
487impl RegionManifestInfo {
488    /// Creates a new [RegionManifestInfo] for mito2 engine.
489    pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
490        Self::Mito {
491            manifest_version,
492            flushed_entry_id,
493        }
494    }
495
496    /// Creates a new [RegionManifestInfo] for metric engine.
497    pub fn metric(
498        data_manifest_version: u64,
499        data_flushed_entry_id: u64,
500        metadata_manifest_version: u64,
501        metadata_flushed_entry_id: u64,
502    ) -> Self {
503        Self::Metric {
504            data_manifest_version,
505            data_flushed_entry_id,
506            metadata_manifest_version,
507            metadata_flushed_entry_id,
508        }
509    }
510
511    /// Returns true if the region is a mito2 region.
512    pub fn is_mito(&self) -> bool {
513        matches!(self, RegionManifestInfo::Mito { .. })
514    }
515
516    /// Returns true if the region is a metric region.
517    pub fn is_metric(&self) -> bool {
518        matches!(self, RegionManifestInfo::Metric { .. })
519    }
520
521    /// Returns the flushed entry id of the data region.
522    pub fn data_flushed_entry_id(&self) -> u64 {
523        match self {
524            RegionManifestInfo::Mito {
525                flushed_entry_id, ..
526            } => *flushed_entry_id,
527            RegionManifestInfo::Metric {
528                data_flushed_entry_id,
529                ..
530            } => *data_flushed_entry_id,
531        }
532    }
533
534    /// Returns the manifest version of the data region.
535    pub fn data_manifest_version(&self) -> u64 {
536        match self {
537            RegionManifestInfo::Mito {
538                manifest_version, ..
539            } => *manifest_version,
540            RegionManifestInfo::Metric {
541                data_manifest_version,
542                ..
543            } => *data_manifest_version,
544        }
545    }
546
547    /// Returns the manifest version of the metadata region.
548    pub fn metadata_manifest_version(&self) -> Option<u64> {
549        match self {
550            RegionManifestInfo::Mito { .. } => None,
551            RegionManifestInfo::Metric {
552                metadata_manifest_version,
553                ..
554            } => Some(*metadata_manifest_version),
555        }
556    }
557
558    /// Returns the flushed entry id of the metadata region.
559    pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
560        match self {
561            RegionManifestInfo::Mito { .. } => None,
562            RegionManifestInfo::Metric {
563                metadata_flushed_entry_id,
564                ..
565            } => Some(*metadata_flushed_entry_id),
566        }
567    }
568
569    /// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array.
570    pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
571        serde_json::to_vec(manifest_infos)
572    }
573
574    /// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array.
575    pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
576        serde_json::from_slice(value)
577    }
578}
579
580impl Default for RegionManifestInfo {
581    fn default() -> Self {
582        Self::Mito {
583            manifest_version: 0,
584            flushed_entry_id: 0,
585        }
586    }
587}
588
589impl RegionStatistic {
590    /// Deserializes the region statistic to a byte array.
591    ///
592    /// Returns None if the deserialization fails.
593    pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
594        serde_json::from_slice(value).ok()
595    }
596
597    /// Serializes the region statistic to a byte array.
598    ///
599    /// Returns None if the serialization fails.
600    pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
601        serde_json::to_vec(self).ok()
602    }
603}
604
605impl RegionStatistic {
606    /// Returns the estimated disk size of the region.
607    pub fn estimated_disk_size(&self) -> u64 {
608        self.wal_size + self.sst_size + self.manifest_size + self.index_size
609    }
610}
611
612/// The response of syncing the manifest.
613#[derive(Debug)]
614pub enum SyncManifestResponse {
615    NotSupported,
616    Mito {
617        /// Indicates if the data region was synced.
618        synced: bool,
619    },
620    Metric {
621        /// Indicates if the metadata region was synced.
622        metadata_synced: bool,
623        /// Indicates if the data region was synced.
624        data_synced: bool,
625        /// The logical regions that were newly opened during the sync operation.
626        /// This only occurs after the metadata region has been successfully synced.
627        new_opened_logical_region_ids: Vec<RegionId>,
628    },
629}
630
631impl SyncManifestResponse {
632    /// Returns true if data region is synced.
633    pub fn is_data_synced(&self) -> bool {
634        match self {
635            SyncManifestResponse::NotSupported => false,
636            SyncManifestResponse::Mito { synced } => *synced,
637            SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
638        }
639    }
640
641    /// Returns true if the engine is supported the sync operation.
642    pub fn is_supported(&self) -> bool {
643        matches!(self, SyncManifestResponse::NotSupported)
644    }
645
646    /// Returns true if the engine is a mito2 engine.
647    pub fn is_mito(&self) -> bool {
648        matches!(self, SyncManifestResponse::Mito { .. })
649    }
650
651    /// Returns true if the engine is a metric engine.
652    pub fn is_metric(&self) -> bool {
653        matches!(self, SyncManifestResponse::Metric { .. })
654    }
655
656    /// Returns the new opened logical region ids.
657    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
658        match self {
659            SyncManifestResponse::Metric {
660                new_opened_logical_region_ids,
661                ..
662            } => Some(new_opened_logical_region_ids),
663            _ => None,
664        }
665    }
666}
667
668#[async_trait]
669pub trait RegionEngine: Send + Sync {
670    /// Name of this engine
671    fn name(&self) -> &str;
672
673    /// Handles batch open region requests.
674    async fn handle_batch_open_requests(
675        &self,
676        parallelism: usize,
677        requests: Vec<(RegionId, RegionOpenRequest)>,
678    ) -> Result<BatchResponses, BoxedError> {
679        let semaphore = Arc::new(Semaphore::new(parallelism));
680        let mut tasks = Vec::with_capacity(requests.len());
681
682        for (region_id, request) in requests {
683            let semaphore_moved = semaphore.clone();
684
685            tasks.push(async move {
686                // Safety: semaphore must exist
687                let _permit = semaphore_moved.acquire().await.unwrap();
688                let result = self
689                    .handle_request(region_id, RegionRequest::Open(request))
690                    .await;
691                (region_id, result)
692            });
693        }
694
695        Ok(join_all(tasks).await)
696    }
697
698    async fn handle_batch_ddl_requests(
699        &self,
700        request: BatchRegionDdlRequest,
701    ) -> Result<RegionResponse, BoxedError> {
702        let requests = request.into_region_requests();
703
704        let mut affected_rows = 0;
705        let mut extensions = HashMap::new();
706
707        for (region_id, request) in requests {
708            let result = self.handle_request(region_id, request).await?;
709            affected_rows += result.affected_rows;
710            extensions.extend(result.extensions);
711        }
712
713        Ok(RegionResponse {
714            affected_rows,
715            extensions,
716            metadata: Vec::new(),
717        })
718    }
719
720    /// Handles non-query request to the region. Returns the count of affected rows.
721    async fn handle_request(
722        &self,
723        region_id: RegionId,
724        request: RegionRequest,
725    ) -> Result<RegionResponse, BoxedError>;
726
727    /// Returns the last sequence number of the region.
728    async fn get_last_seq_num(
729        &self,
730        region_id: RegionId,
731    ) -> Result<Option<SequenceNumber>, BoxedError>;
732
733    async fn get_region_sequences(
734        &self,
735        seqs: RegionSequencesRequest,
736    ) -> Result<HashMap<u64, u64>, BoxedError> {
737        let mut results = HashMap::with_capacity(seqs.region_ids.len());
738
739        for region_id in seqs.region_ids {
740            let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default();
741            results.insert(region_id.as_u64(), seq);
742        }
743
744        Ok(results)
745    }
746
747    /// Handles query and return a scanner that can be used to scan the region concurrently.
748    async fn handle_query(
749        &self,
750        region_id: RegionId,
751        request: ScanRequest,
752    ) -> Result<RegionScannerRef, BoxedError>;
753
754    /// Retrieves region's metadata.
755    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
756
757    /// Retrieves region's statistic.
758    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
759
760    /// Stops the engine
761    async fn stop(&self) -> Result<(), BoxedError>;
762
763    /// Sets [RegionRole] for a region.
764    ///
765    /// The engine checks whether the region is writable before writing to the region. Setting
766    /// the region as readonly doesn't guarantee that write operations in progress will not
767    /// take effect.
768    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
769
770    /// Syncs the region manifest to the given manifest version.
771    async fn sync_region(
772        &self,
773        region_id: RegionId,
774        manifest_info: RegionManifestInfo,
775    ) -> Result<SyncManifestResponse, BoxedError>;
776
777    /// Sets region role state gracefully.
778    ///
779    /// After the call returns, the engine ensures no more write operations will succeed in the region.
780    async fn set_region_role_state_gracefully(
781        &self,
782        region_id: RegionId,
783        region_role_state: SettableRegionRoleState,
784    ) -> Result<SetRegionRoleStateResponse, BoxedError>;
785
786    /// Indicates region role.
787    ///
788    /// Returns the `None` if the region is not found.
789    fn role(&self, region_id: RegionId) -> Option<RegionRole>;
790
791    fn as_any(&self) -> &dyn Any;
792}
793
794pub type RegionEngineRef = Arc<dyn RegionEngine>;
795
796/// A [RegionScanner] that only scans a single partition.
797pub struct SinglePartitionScanner {
798    stream: Mutex<Option<SendableRecordBatchStream>>,
799    schema: SchemaRef,
800    properties: ScannerProperties,
801    metadata: RegionMetadataRef,
802}
803
804impl SinglePartitionScanner {
805    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
806    pub fn new(
807        stream: SendableRecordBatchStream,
808        append_mode: bool,
809        metadata: RegionMetadataRef,
810    ) -> Self {
811        let schema = stream.schema();
812        Self {
813            stream: Mutex::new(Some(stream)),
814            schema,
815            properties: ScannerProperties::default().with_append_mode(append_mode),
816            metadata,
817        }
818    }
819}
820
821impl Debug for SinglePartitionScanner {
822    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
823        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
824    }
825}
826
827impl RegionScanner for SinglePartitionScanner {
828    fn properties(&self) -> &ScannerProperties {
829        &self.properties
830    }
831
832    fn schema(&self) -> SchemaRef {
833        self.schema.clone()
834    }
835
836    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
837        self.properties.prepare(request);
838        Ok(())
839    }
840
841    fn scan_partition(
842        &self,
843        _ctx: &QueryScanContext,
844        _metrics_set: &ExecutionPlanMetricsSet,
845        _partition: usize,
846    ) -> Result<SendableRecordBatchStream, BoxedError> {
847        let mut stream = self.stream.lock().unwrap();
848        let result = stream
849            .take()
850            .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
851        Ok(result.unwrap())
852    }
853
854    fn has_predicate(&self) -> bool {
855        false
856    }
857
858    fn metadata(&self) -> RegionMetadataRef {
859        self.metadata.clone()
860    }
861
862    fn set_logical_region(&mut self, logical_region: bool) {
863        self.properties.set_logical_region(logical_region);
864    }
865}
866
867impl DisplayAs for SinglePartitionScanner {
868    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
869        write!(f, "{:?}", self)
870    }
871}