meta_srv/metasrv.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::distributed_time_constants;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::lease::lookup_datanode_peer;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};

pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";

#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;

/// The datastores that implement the metadata kv backend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Etcd as metadata storage.
    #[default]
    EtcdStore,
    /// In-memory metadata storage - mostly used for testing.
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    /// Postgres as metadata storage.
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    /// MySQL as metadata storage.
    MysqlStore,
}
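
// A minimal sketch (hypothetical, test-style) of how `BackendImpl` values map to
// their snake_case config strings via the `#[serde(rename_all = "snake_case")]`
// attribute above, assuming a serde format such as `serde_json` is available:
//
//     let backend: BackendImpl = serde_json::from_str("\"etcd_store\"").unwrap();
//     assert_eq!(backend, BackendImpl::EtcdStore);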

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the server listens on.
    pub bind_addr: String,
    /// The address the server advertises to the clients.
    pub server_addr: String,
    /// The addresses of the store, e.g., etcd.
    pub store_addrs: Vec<String>,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to use the memory store.
    pub use_memory_store: bool,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// Whether to allow region failover on the local WAL.
    ///
    /// If true, region failover is allowed even when the local WAL is used.
    /// Setting this to true is not recommended, as it may lead to data loss during failover.
    pub allow_region_failover_on_local_wal: bool,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The metrics export options.
    pub export_metrics: ExportMetricsOption,
    /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
    /// This is useful when multiple metasrv clusters share the same store.
    pub store_key_prefix: String,
    /// The max operations per txn.
    ///
    /// This value is usually limited by the store used for the `KvBackend`.
    /// For example, when using etcd, this value must be less than or equal to
    /// etcd's `--max-txn-ops` option value.
    ///
    /// TODO(jeremy): Currently, this option only affects the etcd store, but it may
    /// also affect other stores in the future. In other words, each store needs to
    /// limit the number of operations in a txn because an infinitely large txn could
    /// potentially block other operations.
    pub max_txn_ops: usize,
    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The datastore for kv metadata.
    pub backend: BackendImpl,
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    /// Table name of the RDS kv backend.
    pub meta_table_name: String,
    #[cfg(feature = "pg_kvbackend")]
    /// Lock id for the meta kv election. Only takes effect when using pg_kvbackend.
    pub meta_election_lock_id: u64,
    /// The max idle time of a node (used by the node expiry listener).
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
}
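
// A hedged sketch of how these options might appear in a metasrv TOML config
// loaded through `Configurable`; keys mirror the struct fields, and the values
// below are illustrative rather than authoritative defaults:
//
//     bind_addr = "127.0.0.1:3002"
//     server_addr = "10.0.0.1:3002"
//     store_addrs = ["127.0.0.1:2379"]
//     use_memory_store = false
//     enable_region_failover = false
//     max_txn_ops = 128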

impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("bind_addr", &self.bind_addr)
            .field("server_addr", &self.server_addr)
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("selector", &self.selector)
            .field("use_memory_store", &self.use_memory_store)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("export_metrics", &self.export_metrics)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
            // If server_addr is not set, the server will use the local IP address as the server address.
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            selector: SelectorType::default(),
            use_memory_store: false,
            enable_region_failover: false,
            allow_region_failover_on_local_wal: false,
            http: HttpOptions::default(),
            logging: LoggingOptions {
                dir: format!("{METASRV_HOME}/logs"),
                ..Default::default()
            },
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // In etcd, the maximum size of any request is 1.5 MiB.
                // 1500 KiB = 1536 KiB (1.5 MiB) - 36 KiB (reserved size of key)
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: METASRV_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            export_metrics: ExportMetricsOption::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
        }
    }
}

impl Configurable for MetasrvOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}
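
// Because `env_list_keys` marks `store_addrs` and `wal.broker_endpoints` as
// list-valued, comma-separated environment values are expected to expand into
// lists. A hedged illustration (the exact env-variable prefix comes from the
// `Configurable` machinery and is an assumption here):
//
//     GREPTIMEDB_METASRV_STORE_ADDRS=etcd-0:2379,etcd-1:2379
//     // -> store_addrs == vec!["etcd-0:2379", "etcd-1:2379"]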
262
263impl MetasrvOptions {
264    /// Detect server address.
265    #[cfg(not(target_os = "android"))]
266    pub fn detect_server_addr(&mut self) {
267        if self.server_addr.is_empty() {
268            match local_ip_address::local_ip() {
269                Ok(ip) => {
270                    let detected_addr = format!(
271                        "{}:{}",
272                        ip,
273                        self.bind_addr
274                            .split(':')
275                            .nth(1)
276                            .unwrap_or(DEFAULT_METASRV_ADDR_PORT)
277                    );
278                    info!("Using detected: {} as server address", detected_addr);
279                    self.server_addr = detected_addr;
280                }
281                Err(e) => {
282                    error!("Failed to detect local ip address: {}", e);
283                }
284            }
285        }
286    }
287
288    #[cfg(target_os = "android")]
289    pub fn detect_server_addr(&mut self) {
290        if self.server_addr.is_empty() {
291            common_telemetry::debug!("detect local IP is not supported on Android");
292        }
293    }

    fn sanitize_store_addrs(&self) -> Vec<String> {
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}
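
// Usage sketch for `detect_server_addr`: when `server_addr` is empty, it is
// derived from the detected local IP plus the bind port (result illustrative):
//
//     let mut opts = MetasrvOptions::default();
//     opts.detect_server_addr();
//     // On success, opts.server_addr is now e.g. "192.168.1.10:3002";
//     // it stays empty if the local IP could not be detected.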

pub struct MetasrvInfo {
    pub server_addr: String,
}

#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
}

impl Context {
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }
}

/// The value of the leader. It is used to store the leader's address.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let string = String::from_utf8_lossy(value.as_ref());
        Self(string.to_string())
    }
}
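
// Example: any byte-slice-like value converts into a `LeaderValue`, which is
// how a leader address read back from the election store can be decoded:
//
//     let leader = LeaderValue::from("127.0.0.1:3002");
//     assert_eq!(leader.0, "127.0.0.1:3002");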

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The metasrv's address.
    pub addr: String,
    /// The node build version.
    pub version: String,
    /// The node build git commit hash.
    pub git_commit: String,
    /// The node start timestamp in milliseconds.
    pub start_time_ms: u64,
}

impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
    fn from(node_info: MetasrvNodeInfo) -> Self {
        Self {
            peer: Some(api::v1::meta::Peer {
                addr: node_info.addr,
                ..Default::default()
            }),
            version: node_info.version,
            git_commit: node_info.git_commit,
            start_time_ms: node_info.start_time_ms,
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

#[derive(Clone)]
pub struct SelectorContext {
    pub server_addr: String,
    pub datanode_lease_secs: u64,
    pub flownode_lease_secs: u64,
    pub kv_backend: KvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub table_id: Option<TableId>,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

/// Handles metasrv state transitions on leadership changes: it moves the
/// internal state between leader and follower, (re)loads the leader cache,
/// notifies leadership-change listeners, and toggles telemetry reporting.
pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        // Suspends reporting.
        self.greptimedb_telemetry_task.should_report(false);
        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribe all");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all");
            }
        }
    }
}

pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    // It is only valid at the leader node and is used to temporarily
    // store some data that will not be persisted.
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    // The selector is used to select a target datanode.
    selector: SelectorRef,
    // The flow selector is used to select a target flownode.
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    procedure_executor: ProcedureExecutorRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    table_metadata_manager: TableMetadataManagerRef,
    maintenance_mode_manager: MaintenanceModeManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,

    plugins: Plugins,
}

impl Metasrv {
    pub async fn try_start(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        // Creates the default schema if it does not exist.
        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            // Builds the leadership change notifier.
            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
            };
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader: {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Unexpected: is the leader election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

            // Register candidate and keep lease in background.
            {
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Relaxed) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

            // Campaign
            {
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Relaxed) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        info!("Metasrv re-initiating election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_options_allocator.start().await {
                error!(e; "Failed to start wal options allocator");
            }
            // Always load kv into cached kv store.
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

    pub async fn shutdown(&self) -> Result<()> {
        self.started.store(false, Ordering::Relaxed);
        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
        }
    }

    /// Looks up a peer by `peer_id`, returning it only when it is alive.
    pub(crate) async fn lookup_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        lookup_datanode_peer(
            peer_id,
            &self.meta_peer_client,
            distributed_time_constants::DATANODE_LEASE_SECS,
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef {
        &self.maintenance_mode_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
        }
    }
}
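
// A hedged lifecycle sketch: construction goes through the `builder` module
// (wiring details assumed, not shown), after which `try_start` is idempotent
// thanks to the `started` flag, and `shutdown` stops the procedure manager:
//
//     let metasrv: Metasrv = /* built via the builder module */;
//     metasrv.try_start().await?;
//     // ... serve requests ...
//     metasrv.shutdown().await?;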