meta_srv/
metasrv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use clap::ValueEnum;
23use common_base::readable_size::ReadableSize;
24use common_base::Plugins;
25use common_config::{Configurable, DEFAULT_DATA_HOME};
26use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
27use common_meta::cache_invalidator::CacheInvalidatorRef;
28use common_meta::ddl::ProcedureExecutorRef;
29use common_meta::distributed_time_constants;
30use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
31use common_meta::key::TableMetadataManagerRef;
32use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
33use common_meta::leadership_notifier::{
34    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
35};
36use common_meta::node_expiry_listener::NodeExpiryListener;
37use common_meta::peer::Peer;
38use common_meta::region_keeper::MemoryRegionKeeperRef;
39use common_meta::region_registry::LeaderRegionRegistryRef;
40use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
41use common_options::datanode::DatanodeClientOptions;
42use common_procedure::options::ProcedureConfig;
43use common_procedure::ProcedureManagerRef;
44use common_telemetry::logging::{LoggingOptions, TracingOptions};
45use common_telemetry::{error, info, warn};
46use common_wal::config::MetasrvWalConfig;
47use serde::{Deserialize, Serialize};
48use servers::export_metrics::ExportMetricsOption;
49use servers::grpc::GrpcOptions;
50use servers::http::HttpOptions;
51use snafu::{OptionExt, ResultExt};
52use store_api::storage::RegionId;
53use table::metadata::TableId;
54use tokio::sync::broadcast::error::RecvError;
55
56use crate::cluster::MetaPeerClientRef;
57use crate::election::{Election, LeaderChangeMessage};
58use crate::error::{
59    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
60    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
61};
62use crate::failure_detector::PhiAccrualFailureDetectorOptions;
63use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
64use crate::lease::lookup_datanode_peer;
65use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
66use crate::procedure::wal_prune::manager::WalPruneTickerRef;
67use crate::procedure::ProcedureManagerListenerAdapter;
68use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
69use crate::region::supervisor::RegionSupervisorTickerRef;
70use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
71use crate::service::mailbox::MailboxRef;
72use crate::service::store::cached_kv::LeaderCachedKvBackend;
73use crate::state::{become_follower, become_leader, StateRef};
74
/// Sequence name `"table_id"`.
/// NOTE(review): presumably names the sequence that allocates table ids; the consumers are outside this file — confirm.
pub const TABLE_ID_SEQ: &str = "table_id";
/// Sequence name `"flow_id"`.
/// NOTE(review): presumably names the sequence that allocates flow ids; the consumers are outside this file — confirm.
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Directory name `"metasrv"`.
/// NOTE(review): presumably the metasrv sub-directory under the data home — confirm at the usage site.
pub const METASRV_DATA_DIR: &str = "metasrv";
78
// The datastores that can implement the metadata kv backend.
//
// With `#[serde(rename_all = "snake_case")]` the variants are (de)serialized as
// `etcd_store`, `memory_store`, `postgres_store` and `mysql_store`.
//
// NOTE(review): plain `//` comments are used on purpose in this enum; `///` doc
// comments may be picked up by the `ValueEnum` derive as help text — confirm
// before converting them.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    // Etcd as metadata storage. This is the default backend.
    #[default]
    EtcdStore,
    // In memory metadata storage - mostly used for testing.
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    // Postgres as metadata storage. Only available with the `pg_kvbackend` feature.
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    // MySql as metadata storage. Only available with the `mysql_kvbackend` feature.
    MysqlStore,
}
95
/// Configuration options of the metasrv.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized
/// input takes its value from the `Default` implementation of this struct.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the server listens on.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the server advertises to the clients.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The address of the store, e.g., etcd.
    pub store_addrs: Vec<String>,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to use the memory store.
    pub use_memory_store: bool,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The delay before starting region failure detection.
    /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
    /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover on local WAL.
    ///
    /// If it's true, the region failover will be allowed even if the local WAL is used.
    /// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
    pub allow_region_failover_on_local_wal: bool,
    /// The gRPC server options; `grpc.bind_addr` and `grpc.server_addr`
    /// replace the deprecated top-level `bind_addr` and `server_addr`.
    pub grpc: GrpcOptions,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The metrics export options.
    pub export_metrics: ExportMetricsOption,
    /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
    /// This is useful when multiple metasrv clusters share the same store.
    pub store_key_prefix: String,
    /// The max operations per txn
    ///
    /// This value is usually limited by which store is used for the `KvBackend`.
    /// For example, if using etcd, this value should ensure that it is less than
    /// or equal to the `--max-txn-ops` option value of etcd.
    ///
    /// TODO(jeremy): Currently, this option only affects the etcd store, but it may
    /// also affect other stores in the future. In other words, each store needs to
    /// limit the number of operations in a txn because an infinitely large txn could
    /// potentially block other operations.
    pub max_txn_ops: usize,
    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The datastore for kv metadata.
    pub backend: BackendImpl,
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    /// Table name of rds kv backend.
    pub meta_table_name: String,
    #[cfg(feature = "pg_kvbackend")]
    /// Lock id for meta kv election. Only takes effect when using pg_kvbackend.
    pub meta_election_lock_id: u64,
    /// Idle-time value handed to the `NodeExpiryListener` created in
    /// `Metasrv::try_start`; defaults to 24 hours.
    /// NOTE(review): presumably the idle time after which a node is treated as
    /// expired — confirm against `NodeExpiryListener`.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
}
173
174impl fmt::Debug for MetasrvOptions {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        let mut debug_struct = f.debug_struct("MetasrvOptions");
177        debug_struct
178            .field("store_addrs", &self.sanitize_store_addrs())
179            .field("selector", &self.selector)
180            .field("use_memory_store", &self.use_memory_store)
181            .field("enable_region_failover", &self.enable_region_failover)
182            .field(
183                "allow_region_failover_on_local_wal",
184                &self.allow_region_failover_on_local_wal,
185            )
186            .field("grpc", &self.grpc)
187            .field("http", &self.http)
188            .field("logging", &self.logging)
189            .field("procedure", &self.procedure)
190            .field("failure_detector", &self.failure_detector)
191            .field("datanode", &self.datanode)
192            .field("enable_telemetry", &self.enable_telemetry)
193            .field("data_home", &self.data_home)
194            .field("wal", &self.wal)
195            .field("export_metrics", &self.export_metrics)
196            .field("store_key_prefix", &self.store_key_prefix)
197            .field("max_txn_ops", &self.max_txn_ops)
198            .field("flush_stats_factor", &self.flush_stats_factor)
199            .field("tracing", &self.tracing)
200            .field("backend", &self.backend);
201
202        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
203        debug_struct.field("meta_table_name", &self.meta_table_name);
204
205        #[cfg(feature = "pg_kvbackend")]
206        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
207
208        debug_struct
209            .field("node_max_idle_time", &self.node_max_idle_time)
210            .finish()
211    }
212}
213
/// Port used by `MetasrvOptions::default` to build the default gRPC bind
/// address `127.0.0.1:3002`.
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
215
216impl Default for MetasrvOptions {
217    fn default() -> Self {
218        Self {
219            #[allow(deprecated)]
220            bind_addr: String::new(),
221            #[allow(deprecated)]
222            server_addr: String::new(),
223            store_addrs: vec!["127.0.0.1:2379".to_string()],
224            selector: SelectorType::default(),
225            use_memory_store: false,
226            enable_region_failover: false,
227            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
228            allow_region_failover_on_local_wal: false,
229            grpc: GrpcOptions {
230                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
231                ..Default::default()
232            },
233            http: HttpOptions::default(),
234            logging: LoggingOptions::default(),
235            procedure: ProcedureConfig {
236                max_retry_times: 12,
237                retry_delay: Duration::from_millis(500),
238                // The etcd the maximum size of any request is 1.5 MiB
239                // 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
240                max_metadata_value_size: Some(ReadableSize::kb(1500)),
241                max_running_procedures: 128,
242            },
243            failure_detector: PhiAccrualFailureDetectorOptions::default(),
244            datanode: DatanodeClientOptions::default(),
245            enable_telemetry: true,
246            data_home: DEFAULT_DATA_HOME.to_string(),
247            wal: MetasrvWalConfig::default(),
248            export_metrics: ExportMetricsOption::default(),
249            store_key_prefix: String::new(),
250            max_txn_ops: 128,
251            flush_stats_factor: 3,
252            tracing: TracingOptions::default(),
253            backend: BackendImpl::EtcdStore,
254            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
255            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
256            #[cfg(feature = "pg_kvbackend")]
257            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
258            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
259        }
260    }
261}
262
263impl Configurable for MetasrvOptions {
264    fn env_list_keys() -> Option<&'static [&'static str]> {
265        Some(&["wal.broker_endpoints", "store_addrs"])
266    }
267}
268
269impl MetasrvOptions {
270    fn sanitize_store_addrs(&self) -> Vec<String> {
271        self.store_addrs
272            .iter()
273            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
274            .collect()
275    }
276}
277
/// Holds the address of a metasrv.
/// NOTE(review): no user of this type is visible in this file — confirm its purpose at the usage site.
pub struct MetasrvInfo {
    /// The server address.
    pub server_addr: String,
}
/// A cloneable bundle of the metasrv's shared handles, as produced by
/// `Metasrv::new_ctx`.
#[derive(Clone)]
pub struct Context {
    /// The server address, taken from `grpc.server_addr` of the options.
    pub server_addr: String,
    /// The resettable in-memory kv backend.
    pub in_memory: ResettableKvBackendRef,
    /// The metadata kv backend.
    pub kv_backend: KvBackendRef,
    /// The leader-side cached kv backend.
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    /// Client used to talk to meta peers.
    pub meta_peer_client: MetaPeerClientRef,
    /// The mailbox handle.
    pub mailbox: MailboxRef,
    /// The election service; `None` when the metasrv runs without one.
    pub election: Option<ElectionRef>,
    /// Set to `false` by `Metasrv::new_ctx`.
    /// NOTE(review): the meaning of "infancy" is not visible in this file — confirm with the handlers that set it.
    pub is_infancy: bool,
    /// The table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The cache invalidator.
    pub cache_invalidator: CacheInvalidatorRef,
    /// The registry of leader regions.
    pub leader_region_registry: LeaderRegionRegistryRef,
}
295
296impl Context {
297    pub fn reset_in_memory(&self) {
298        self.in_memory.reset();
299        self.leader_region_registry.reset();
300    }
301}
302
/// The value of the leader. It is used to store the leader's address.
pub struct LeaderValue(pub String);

impl<T> From<T> for LeaderValue
where
    T: AsRef<[u8]>,
{
    /// Builds a `LeaderValue` from raw bytes, decoding them as UTF-8 and
    /// replacing invalid sequences with U+FFFD.
    fn from(value: T) -> Self {
        let bytes: &[u8] = value.as_ref();
        Self(String::from_utf8_lossy(bytes).into_owned())
    }
}
312
/// Descriptive information about a metasrv node: its address, build
/// version, git commit and start time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The metasrv's address.
    pub addr: String,
    /// The node build version.
    pub version: String,
    /// The node build git commit hash.
    pub git_commit: String,
    /// The node start timestamp in milliseconds.
    pub start_time_ms: u64,
}
324
325impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
326    fn from(node_info: MetasrvNodeInfo) -> Self {
327        Self {
328            peer: Some(api::v1::meta::Peer {
329                addr: node_info.addr,
330                ..Default::default()
331            }),
332            version: node_info.version,
333            git_commit: node_info.git_commit,
334            start_time_ms: node_info.start_time_ms,
335        }
336    }
337}
338
/// The kind of node a selection targets.
#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    /// Writes the lowercase name of the target: `datanode` or `flownode`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Datanode => "datanode",
            Self::Flownode => "flownode",
        };
        f.write_str(label)
    }
}
353
/// The context type that `SelectorRef` and `RegionStatAwareSelectorRef`
/// selectors are parameterized with.
#[derive(Clone)]
pub struct SelectorContext {
    /// The server address.
    pub server_addr: String,
    /// The datanode lease, in seconds.
    pub datanode_lease_secs: u64,
    /// The flownode lease, in seconds.
    pub flownode_lease_secs: u64,
    /// The metadata kv backend.
    pub kv_backend: KvBackendRef,
    /// Client used to talk to meta peers.
    pub meta_peer_client: MetaPeerClientRef,
    /// An optional table id.
    /// NOTE(review): presumably the table the selection is made for — confirm with the selectors.
    pub table_id: Option<TableId>,
}
363
/// Shared selector that works on a [`SelectorContext`] and outputs a list of peers.
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
/// Shared region-stat-aware selector that works on a [`SelectorContext`] and
/// outputs `(RegionId, Peer)` pairs.
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
/// Shared election service whose leader value is a [`LeaderValue`].
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
368
/// Applies the effects of a leadership change: it updates the metasrv
/// state, loads the leader cache, notifies the leadership listeners and
/// switches telemetry reporting on or off.
pub struct MetaStateHandler {
    /// Optional subscription manager; all of its subscriptions are removed when leadership is lost.
    subscribe_manager: Option<SubscriptionManagerRef>,
    /// Telemetry task whose reporting is enabled only while leading.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// Leader-side cached kv backend, loaded when leadership starts.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    /// Notifier that forwards leader start/stop events to its listeners.
    leadership_change_notifier: LeadershipChangeNotifier,
    /// The metasrv state, moved between leader and follower.
    state: StateRef,
}
376
377impl MetaStateHandler {
378    pub async fn on_leader_start(&self) {
379        self.state.write().unwrap().next_state(become_leader(false));
380
381        if let Err(e) = self.leader_cached_kv_backend.load().await {
382            error!(e; "Failed to load kv into leader cache kv store");
383        } else {
384            self.state.write().unwrap().next_state(become_leader(true));
385        }
386
387        self.leadership_change_notifier
388            .notify_on_leader_start()
389            .await;
390
391        self.greptimedb_telemetry_task.should_report(true);
392    }
393
394    pub async fn on_leader_stop(&self) {
395        self.state.write().unwrap().next_state(become_follower());
396
397        self.leadership_change_notifier
398            .notify_on_leader_stop()
399            .await;
400
401        // Suspends reporting.
402        self.greptimedb_telemetry_task.should_report(false);
403
404        if let Some(sub_manager) = self.subscribe_manager.clone() {
405            info!("Leader changed, un_subscribe all");
406            if let Err(e) = sub_manager.unsubscribe_all() {
407                error!(e; "Failed to un_subscribe all");
408            }
409        }
410    }
411}
412
/// The metasrv instance: it owns the options, storage backends, selectors,
/// election handle and the managers that `try_start` wires together.
pub struct Metasrv {
    /// Leader/follower state, updated on leadership changes.
    state: StateRef,
    /// Whether the server has been started; flipped by `try_start` and `shutdown`.
    started: Arc<AtomicBool>,
    /// Start timestamp in milliseconds, reported through `node_info`.
    start_time_ms: u64,
    /// The options this instance was built with.
    options: MetasrvOptions,
    // It is only valid at the leader node and is used to temporarily
    // store some data that will not be persisted.
    in_memory: ResettableKvBackendRef,
    /// The metadata kv backend.
    kv_backend: KvBackendRef,
    /// Leader-side cached kv backend; reset on every leader change.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    /// Client used to talk to meta peers.
    meta_peer_client: MetaPeerClientRef,
    // The selector is used to select a target datanode.
    selector: SelectorRef,
    /// The context handed to selectors.
    selector_ctx: SelectorContext,
    // The flow selector is used to select a target flownode.
    flow_selector: SelectorRef,
    /// The heartbeat handler group; `None` until `try_start` builds it.
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    /// Builder for the handler group; taken (and thus consumed) by `try_start`.
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    /// The election service; `None` when running without election.
    election: Option<ElectionRef>,
    /// The procedure manager, stopped in `shutdown`.
    procedure_manager: ProcedureManagerRef,
    /// The mailbox handle.
    mailbox: MailboxRef,
    /// The procedure executor.
    procedure_executor: ProcedureExecutorRef,
    /// The WAL options allocator; registered as a leadership listener, or
    /// started directly when there is no election.
    wal_options_allocator: WalOptionsAllocatorRef,
    /// The table metadata manager, initialized in `try_start`.
    table_metadata_manager: TableMetadataManagerRef,
    /// The runtime switch manager.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// The in-memory region keeper.
    memory_region_keeper: MemoryRegionKeeperRef,
    /// The telemetry task; started in `try_start` when an election is present.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// The region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// Optional region supervisor ticker, registered as a leadership listener.
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    /// The cache invalidator.
    cache_invalidator: CacheInvalidatorRef,
    /// The registry of leader regions.
    leader_region_registry: LeaderRegionRegistryRef,
    /// Optional WAL prune ticker, registered as a leadership listener.
    wal_prune_ticker: Option<WalPruneTickerRef>,

    /// Plugins; looked up for the publisher, the subscription manager and the
    /// leadership change notifier customizer.
    plugins: Plugins,
}
448
449impl Metasrv {
450    pub async fn try_start(&self) -> Result<()> {
451        if self
452            .started
453            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
454            .is_err()
455        {
456            warn!("Metasrv already started");
457            return Ok(());
458        }
459
460        let handler_group_builder =
461            self.handler_group_builder
462                .lock()
463                .unwrap()
464                .take()
465                .context(error::UnexpectedSnafu {
466                    violated: "expected heartbeat handler group builder",
467                })?;
468        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
469
470        // Creates default schema if not exists
471        self.table_metadata_manager
472            .init()
473            .await
474            .context(InitMetadataSnafu)?;
475
476        if let Some(election) = self.election() {
477            let procedure_manager = self.procedure_manager.clone();
478            let in_memory = self.in_memory.clone();
479            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
480            let subscribe_manager = self.subscription_manager();
481            let mut rx = election.subscribe_leader_change();
482            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
483            greptimedb_telemetry_task
484                .start()
485                .context(StartTelemetryTaskSnafu)?;
486
487            // Builds leadership change notifier.
488            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
489            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
490            leadership_change_notifier
491                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
492            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
493                self.options.node_max_idle_time,
494                self.in_memory.clone(),
495            )));
496            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
497                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
498            }
499            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
500                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
501            }
502            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
503                customizer.customize(&mut leadership_change_notifier);
504            }
505
506            let state_handler = MetaStateHandler {
507                greptimedb_telemetry_task,
508                subscribe_manager,
509                state: self.state.clone(),
510                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
511                leadership_change_notifier,
512            };
513            let _handle = common_runtime::spawn_global(async move {
514                loop {
515                    match rx.recv().await {
516                        Ok(msg) => {
517                            in_memory.reset();
518                            leader_cached_kv_backend.reset();
519                            info!("Leader's cache has bean cleared on leader change: {msg}");
520                            match msg {
521                                LeaderChangeMessage::Elected(_) => {
522                                    state_handler.on_leader_start().await;
523                                }
524                                LeaderChangeMessage::StepDown(leader) => {
525                                    error!("Leader :{:?} step down", leader);
526
527                                    state_handler.on_leader_stop().await;
528                                }
529                            }
530                        }
531                        Err(RecvError::Closed) => {
532                            error!("Not expected, is leader election loop still running?");
533                            break;
534                        }
535                        Err(RecvError::Lagged(_)) => {
536                            break;
537                        }
538                    }
539                }
540
541                state_handler.on_leader_stop().await;
542            });
543
544            // Register candidate and keep lease in background.
545            {
546                let election = election.clone();
547                let started = self.started.clone();
548                let node_info = self.node_info();
549                let _handle = common_runtime::spawn_global(async move {
550                    while started.load(Ordering::Acquire) {
551                        let res = election.register_candidate(&node_info).await;
552                        if let Err(e) = res {
553                            warn!(e; "Metasrv register candidate error");
554                        }
555                    }
556                });
557            }
558
559            // Campaign
560            {
561                let election = election.clone();
562                let started = self.started.clone();
563                let _handle = common_runtime::spawn_global(async move {
564                    while started.load(Ordering::Acquire) {
565                        let res = election.campaign().await;
566                        if let Err(e) = res {
567                            warn!(e; "Metasrv election error");
568                        }
569                        election.reset_campaign().await;
570                        info!("Metasrv re-initiate election");
571                    }
572                    info!("Metasrv stopped");
573                });
574            }
575        } else {
576            warn!(
577                "Ensure only one instance of Metasrv is running, as there is no election service."
578            );
579
580            if let Err(e) = self.wal_options_allocator.start().await {
581                error!(e; "Failed to start wal options allocator");
582            }
583            // Always load kv into cached kv store.
584            self.leader_cached_kv_backend
585                .load()
586                .await
587                .context(KvBackendSnafu)?;
588            self.procedure_manager
589                .start()
590                .await
591                .context(StartProcedureManagerSnafu)?;
592        }
593
594        info!("Metasrv started");
595
596        Ok(())
597    }
598
599    pub async fn shutdown(&self) -> Result<()> {
600        if self
601            .started
602            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
603            .is_err()
604        {
605            warn!("Metasrv already stopped");
606            return Ok(());
607        }
608
609        self.procedure_manager
610            .stop()
611            .await
612            .context(StopProcedureManagerSnafu)?;
613
614        info!("Metasrv stopped");
615
616        Ok(())
617    }
618
    /// Returns the start timestamp of this metasrv in milliseconds.
    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }
622
623    pub fn node_info(&self) -> MetasrvNodeInfo {
624        let build_info = common_version::build_info();
625        MetasrvNodeInfo {
626            addr: self.options().grpc.server_addr.clone(),
627            version: build_info.version.to_string(),
628            git_commit: build_info.commit_short.to_string(),
629            start_time_ms: self.start_time_ms(),
630        }
631    }
632
633    /// Looks up a datanode peer by peer_id, returning it only when it's alive.
634    /// A datanode is considered alive when it's still within the lease period.
635    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
636        lookup_datanode_peer(
637            peer_id,
638            &self.meta_peer_client,
639            distributed_time_constants::DATANODE_LEASE_SECS,
640        )
641        .await
642    }
643
    /// Returns the options this metasrv was built with.
    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }
647
    /// Returns the resettable in-memory kv backend.
    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }
651
    /// Returns the metadata kv backend.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
655
    /// Returns the meta peer client.
    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }
659
    /// Returns the selector used to select a target datanode.
    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }
663
    /// Returns the context handed to selectors.
    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }
667
    /// Returns the selector used to select a target flownode.
    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }
671
    /// Returns a clone of the heartbeat handler group, or `None` if it has
    /// not been built yet (it is set by `try_start`).
    ///
    /// # Panics
    ///
    /// Panics if the handler group lock is poisoned.
    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }
675
    /// Returns the election service, or `None` when running without one.
    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }
679
    /// Returns the mailbox handle.
    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }
683
    /// Returns the procedure executor.
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }
687
    /// Returns the procedure manager.
    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }
691
    /// Returns the table metadata manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }
695
    /// Returns the runtime switch manager.
    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }
699
    /// Returns the in-memory region keeper.
    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }
703
    /// Returns the region migration manager.
    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }
707
    /// Looks up a `PublisherRef` in the plugins; `None` if none is registered.
    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }
711
    /// Looks up a `SubscriptionManagerRef` in the plugins; `None` if none is registered.
    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }
715
    /// Returns the plugins of this metasrv.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }
719
720    #[inline]
721    pub fn new_ctx(&self) -> Context {
722        let server_addr = self.options().grpc.server_addr.clone();
723        let in_memory = self.in_memory.clone();
724        let kv_backend = self.kv_backend.clone();
725        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
726        let meta_peer_client = self.meta_peer_client.clone();
727        let mailbox = self.mailbox.clone();
728        let election = self.election.clone();
729        let table_metadata_manager = self.table_metadata_manager.clone();
730        let cache_invalidator = self.cache_invalidator.clone();
731        let leader_region_registry = self.leader_region_registry.clone();
732
733        Context {
734            server_addr,
735            in_memory,
736            kv_backend,
737            leader_cached_kv_backend,
738            meta_peer_client,
739            mailbox,
740            election,
741            is_infancy: false,
742            table_metadata_manager,
743            cache_invalidator,
744            leader_region_registry,
745        }
746    }
747}