// meta_srv/metasrv.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use api::v1::meta::{HeartbeatConfig, Role};
23use clap::ValueEnum;
24use common_base::Plugins;
25use common_base::readable_size::ReadableSize;
26use common_config::{Configurable, DEFAULT_DATA_HOME};
27use common_event_recorder::EventRecorderOptions;
28use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
29use common_meta::cache_invalidator::CacheInvalidatorRef;
30use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
31use common_meta::ddl_manager::DdlManagerRef;
32use common_meta::distributed_time_constants::{
33    self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
34};
35use common_meta::election::LeaderChangeMessage;
36pub use common_meta::election::{ElectionRef, MetasrvNodeInfo};
37use common_meta::key::TableMetadataManagerRef;
38use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
39use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
40use common_meta::leadership_notifier::{
41    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
42};
43use common_meta::node_expiry_listener::NodeExpiryListener;
44use common_meta::peer::{Peer, PeerDiscoveryRef};
45use common_meta::reconciliation::manager::ReconciliationManagerRef;
46use common_meta::region_keeper::MemoryRegionKeeperRef;
47use common_meta::region_registry::LeaderRegionRegistryRef;
48use common_meta::stats::topic::TopicStatsRegistryRef;
49use common_meta::wal_provider::WalProviderRef;
50use common_options::datanode::DatanodeClientOptions;
51use common_options::memory::MemoryOptions;
52use common_procedure::ProcedureManagerRef;
53use common_procedure::options::ProcedureConfig;
54use common_stat::ResourceStatRef;
55use common_telemetry::logging::{LoggingOptions, TracingOptions};
56use common_telemetry::{error, info, warn};
57use common_time::util::DefaultSystemTimer;
58use common_wal::config::MetasrvWalConfig;
59use serde::{Deserialize, Serialize};
60use servers::grpc::GrpcOptions;
61use servers::http::HttpOptions;
62use servers::tls::TlsOption;
63use snafu::{OptionExt, ResultExt};
64use store_api::storage::RegionId;
65use tokio::sync::broadcast::error::RecvError;
66
67use crate::cluster::MetaPeerClientRef;
68use crate::discovery;
69use crate::error::{
70    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
71    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
72};
73use crate::failure_detector::PhiAccrualFailureDetectorOptions;
74use crate::gc::{GcSchedulerOptions, GcTickerRef};
75use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
76use crate::procedure::ProcedureManagerListenerAdapter;
77use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
78use crate::procedure::wal_prune::manager::WalPruneTickerRef;
79use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
80use crate::region::flush_trigger::RegionFlushTickerRef;
81use crate::region::supervisor::RegionSupervisorTickerRef;
82use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
83use crate::service::mailbox::MailboxRef;
84use crate::service::store::cached_kv::LeaderCachedKvBackend;
85use crate::state::{StateRef, become_follower, become_leader};
86
/// Sequence name used to allocate table ids.
pub const TABLE_ID_SEQ: &str = "table_id";
/// Sequence name used to allocate flow ids.
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Subdirectory (under the data home) reserved for metasrv data.
pub const METASRV_DATA_DIR: &str = "metasrv";
90
/// The datastores that implement the metadata kv backend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Etcd as metadata storage (the default).
    #[default]
    EtcdStore,
    /// In-memory metadata storage - mostly used for testing.
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    /// Postgres as metadata storage.
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    /// MySql as metadata storage.
    MysqlStore,
}
107
/// Configuration options for the stats persistence.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// TTL for the stats table that will be used to store the stats.
    // NOTE(review): the default is `Duration::ZERO` — presumably "no TTL"; confirm
    // with the consumer of this option.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// The interval to persist the stats.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}
118
119impl Default for StatsPersistenceOptions {
120    fn default() -> Self {
121        Self {
122            ttl: Duration::ZERO,
123            interval: Duration::from_mins(10),
124        }
125    }
126}
127
/// Heartbeat configuration for a single node type.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct HeartbeatOptions {
    /// Heartbeat interval.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
    /// Retry interval when heartbeat connection fails.
    #[serde(with = "humantime_serde")]
    pub retry_interval: Duration,
}
139
140impl Default for HeartbeatOptions {
141    fn default() -> Self {
142        Self {
143            interval: BASE_HEARTBEAT_INTERVAL,
144            retry_interval: BASE_HEARTBEAT_INTERVAL,
145        }
146    }
147}
148
149impl HeartbeatOptions {
150    pub fn datanode_from(base_interval: Duration) -> Self {
151        Self {
152            interval: base_interval,
153            retry_interval: base_interval,
154        }
155    }
156
157    pub fn frontend_from(base_interval: Duration) -> Self {
158        Self {
159            interval: frontend_heartbeat_interval(base_interval),
160            retry_interval: base_interval,
161        }
162    }
163
164    pub fn flownode_from(base_interval: Duration) -> Self {
165        Self {
166            interval: base_interval,
167            retry_interval: base_interval,
168        }
169    }
170}
171
172impl From<HeartbeatOptions> for HeartbeatConfig {
173    fn from(opts: HeartbeatOptions) -> Self {
174        Self {
175            heartbeat_interval_ms: opts.interval.as_millis() as u64,
176            retry_interval_ms: opts.retry_interval.as_millis() as u64,
177        }
178    }
179}
180
/// Client options for the metadata store backend connection.
/// Currently only applicable when using etcd as the metadata store.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
    /// How long to wait for a keep-alive response before the connection is
    /// considered dead.
    #[serde(with = "humantime_serde")]
    pub keep_alive_timeout: Duration,
    /// How often to send keep-alive probes.
    #[serde(with = "humantime_serde")]
    pub keep_alive_interval: Duration,
    /// Timeout for establishing a connection to the backend.
    #[serde(with = "humantime_serde")]
    pub connect_timeout: Duration,
}
191
192impl Default for BackendClientOptions {
193    fn default() -> Self {
194        Self {
195            keep_alive_interval: Duration::from_secs(10),
196            keep_alive_timeout: Duration::from_secs(3),
197            connect_timeout: Duration::from_secs(3),
198        }
199    }
200}
201
/// Configuration options for the metasrv.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the server listens on.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the server advertises to the clients.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The address of the store, e.g., etcd.
    pub store_addrs: Vec<String>,
    /// TLS configuration for kv store backend (PostgreSQL/MySQL)
    /// Only applicable when using PostgreSQL or MySQL as the metadata store
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// The backend client options.
    /// Currently, only applicable when using etcd as the metadata store.
    #[serde(default)]
    pub backend_client: BackendClientOptions,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The base heartbeat interval.
    ///
    /// This value is used to calculate the distributed time constants for components.
    /// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
    #[serde(with = "humantime_serde")]
    pub heartbeat_interval: Duration,
    /// The delay before starting region failure detection.
    /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
    /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover on local WAL.
    ///
    /// If it's true, the region failover will be allowed even if the local WAL is used.
    /// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
    pub allow_region_failover_on_local_wal: bool,
    /// The gRPC server options.
    pub grpc: GrpcOptions,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
    /// This is useful when multiple metasrv clusters share the same store.
    pub store_key_prefix: String,
    /// The max operations per txn
    ///
    /// This value is usually limited by which store is used for the `KvBackend`.
    /// For example, if using etcd, this value should ensure that it is less than
    /// or equal to the `--max-txn-ops` option value of etcd.
    ///
    /// TODO(jeremy): Currently, this option only affects the etcd store, but it may
    /// also affect other stores in the future. In other words, each store needs to
    /// limit the number of operations in a txn because an infinitely large txn could
    /// potentially block other operations.
    pub max_txn_ops: usize,
    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The memory options.
    pub memory: MemoryOptions,
    /// The datastore for kv metadata.
    pub backend: BackendImpl,
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    /// Table name of rds kv backend.
    pub meta_table_name: String,
    #[cfg(feature = "pg_kvbackend")]
    /// Lock id for meta kv election. Only effect when using pg_kvbackend.
    pub meta_election_lock_id: u64,
    #[cfg(feature = "pg_kvbackend")]
    /// Optional PostgreSQL schema for metadata table (defaults to current search_path if empty).
    pub meta_schema_name: Option<String>,
    #[cfg(feature = "pg_kvbackend")]
    /// Automatically create PostgreSQL schema if it doesn't exist (default: true).
    pub auto_create_schema: bool,
    /// The maximum idle time before a node is treated as expired
    /// (passed to the node expiry listener at startup).
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// The stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// The GC scheduler options.
    pub gc: GcSchedulerOptions,
}
303
304impl fmt::Debug for MetasrvOptions {
305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306        let mut debug_struct = f.debug_struct("MetasrvOptions");
307        debug_struct
308            .field("store_addrs", &self.sanitize_store_addrs())
309            .field("backend_tls", &self.backend_tls)
310            .field("selector", &self.selector)
311            .field("enable_region_failover", &self.enable_region_failover)
312            .field(
313                "allow_region_failover_on_local_wal",
314                &self.allow_region_failover_on_local_wal,
315            )
316            .field("grpc", &self.grpc)
317            .field("http", &self.http)
318            .field("logging", &self.logging)
319            .field("procedure", &self.procedure)
320            .field("failure_detector", &self.failure_detector)
321            .field("datanode", &self.datanode)
322            .field("enable_telemetry", &self.enable_telemetry)
323            .field("data_home", &self.data_home)
324            .field("wal", &self.wal)
325            .field("store_key_prefix", &self.store_key_prefix)
326            .field("max_txn_ops", &self.max_txn_ops)
327            .field("flush_stats_factor", &self.flush_stats_factor)
328            .field("tracing", &self.tracing)
329            .field("backend", &self.backend)
330            .field("event_recorder", &self.event_recorder)
331            .field("stats_persistence", &self.stats_persistence)
332            .field("heartbeat_interval", &self.heartbeat_interval)
333            .field("backend_client", &self.backend_client);
334
335        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
336        debug_struct.field("meta_table_name", &self.meta_table_name);
337
338        #[cfg(feature = "pg_kvbackend")]
339        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
340        #[cfg(feature = "pg_kvbackend")]
341        debug_struct.field("meta_schema_name", &self.meta_schema_name);
342
343        debug_struct
344            .field("node_max_idle_time", &self.node_max_idle_time)
345            .finish()
346    }
347}
348
/// Default port for the metasrv gRPC server.
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
350
impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            // Deprecated fields default to empty; `grpc` carries the real addresses.
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            // TLS "prefer" mode for the backend connection by default.
            backend_tls: Some(TlsOption::prefer()),
            selector: SelectorType::default(),
            enable_region_failover: false,
            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
            // Give datanodes 10 minutes to come up before failure detection starts.
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // In etcd, the maximum size of any request is 1.5 MiB.
                // 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            #[cfg(feature = "pg_kvbackend")]
            auto_create_schema: true,
            // 24 hours before an idle node is considered expired.
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
            gc: GcSchedulerOptions::default(),
            backend_client: BackendClientOptions::default(),
        }
    }
}
406
impl Configurable for MetasrvOptions {
    /// Keys whose values are parsed as lists when supplied via environment
    /// variables.
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}
412
413impl MetasrvOptions {
414    fn sanitize_store_addrs(&self) -> Vec<String> {
415        self.store_addrs
416            .iter()
417            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
418            .collect()
419    }
420}
421
/// Basic information about a metasrv instance.
pub struct MetasrvInfo {
    /// The address the server advertises to clients.
    pub server_addr: String,
}
/// Shared, cloneable context handed to metasrv services and handlers.
#[derive(Clone)]
pub struct Context {
    /// The address this server advertises.
    pub server_addr: String,
    /// Leader-local, resettable in-memory kv backend.
    pub in_memory: ResettableKvBackendRef,
    /// The persistent metadata kv backend.
    pub kv_backend: KvBackendRef,
    /// Leader-local cache over the persistent kv backend.
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    /// The election service; `None` when running without one.
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
    /// Base heartbeat interval used to derive per-role heartbeat options.
    pub heartbeat_interval: Duration,
    /// Set via [`Context::with_handshake`].
    pub is_handshake: bool,
}
442
443impl Context {
444    pub fn reset_in_memory(&self) {
445        self.in_memory.reset();
446        self.leader_region_registry.reset();
447    }
448
449    pub fn with_handshake(mut self, is_handshake: bool) -> Self {
450        self.is_handshake = is_handshake;
451        self
452    }
453
454    pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions {
455        match role {
456            Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval),
457            Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval),
458            Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval),
459        }
460    }
461}
462
/// The kind of node a selection targets.
#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    /// Renders the target as its lowercase node-type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            SelectTarget::Datanode => "datanode",
            SelectTarget::Flownode => "flownode",
        };
        f.write_str(name)
    }
}
477
/// Context handed to selectors when picking peers.
#[derive(Clone)]
pub struct SelectorContext {
    /// Peer discovery service selectors draw candidates from.
    pub peer_discovery: PeerDiscoveryRef,
}
482
/// Selector that picks a set of peers under a [`SelectorContext`].
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
/// Selector that is additionally region-stat aware, mapping regions to peers.
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
486
/// Applies the side effects of leadership changes: state transitions, leader
/// cache loading, listener notification, and telemetry toggling.
pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    mailbox: MailboxRef,
    state: StateRef,
}
495
impl MetaStateHandler {
    /// Transitions this node into the leader state.
    ///
    /// The node first becomes a leader with the cache marked not-ready; only
    /// after the leader cache loads successfully does it transition again with
    /// the cache marked ready. Listeners are then notified and telemetry
    /// reporting is enabled.
    pub async fn on_leader_start(&self) {
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            // Stay leader with an unloaded cache; the error is logged, not fatal.
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    /// Transitions this node into the follower state, tearing down
    /// leader-only activity (mailbox pushers, telemetry, subscriptions).
    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        // Enforces the mailbox to clear all pushers.
        // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
        self.mailbox.reset().await;
        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        // Suspends reporting.
        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, un_subscribe all");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to un_subscribe all");
            }
        }
    }
}
534
/// The metasrv instance: owns the metadata backends, election, procedure
/// machinery, and all leader-driven background components.
pub struct Metasrv {
    state: StateRef,
    /// Whether the service has been started; guards idempotent start/stop.
    started: Arc<AtomicBool>,
    /// Start time of this instance, in milliseconds.
    start_time_ms: u64,
    options: MetasrvOptions,
    // It is only valid at the leader node and is used to temporarily
    // store some data that will not be persisted.
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    /// Leader-local cache over `kv_backend`; loaded on leader start.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    // The selector is used to select a target datanode.
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    // The flow selector is used to select a target flownode.
    flow_selector: SelectorRef,
    /// Installed by `try_start` from `handler_group_builder`.
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    /// Consumed (taken) exactly once by `try_start`.
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    /// `None` when running without an election service (single instance).
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_provider: WalProviderRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_allocator: ResourceIdAllocatorRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,

    plugins: Plugins,
}
576
577impl Metasrv {
578    pub async fn try_start(&self) -> Result<()> {
579        if self
580            .started
581            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
582            .is_err()
583        {
584            warn!("Metasrv already started");
585            return Ok(());
586        }
587
588        let handler_group_builder =
589            self.handler_group_builder
590                .lock()
591                .unwrap()
592                .take()
593                .context(error::UnexpectedSnafu {
594                    violated: "expected heartbeat handler group builder",
595                })?;
596        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
597
598        // Creates default schema if not exists
599        self.table_metadata_manager
600            .init()
601            .await
602            .context(InitMetadataSnafu)?;
603
604        if let Some(election) = self.election() {
605            let procedure_manager = self.procedure_manager.clone();
606            let in_memory = self.in_memory.clone();
607            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
608            let subscribe_manager = self.subscription_manager();
609            let mut rx = election.subscribe_leader_change();
610            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
611            greptimedb_telemetry_task
612                .start()
613                .context(StartTelemetryTaskSnafu)?;
614
615            // Builds leadership change notifier.
616            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
617            leadership_change_notifier.add_listener(self.wal_provider.clone());
618            leadership_change_notifier
619                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
620            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
621                self.options.node_max_idle_time,
622                self.in_memory.clone(),
623            )));
624            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
625                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
626            }
627            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
628                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
629            }
630            if let Some(region_flush_trigger) = &self.region_flush_ticker {
631                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
632            }
633            if let Some(gc_ticker) = &self.gc_ticker {
634                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
635            }
636            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
637                customizer.customize(&mut leadership_change_notifier);
638            }
639
640            let state_handler = MetaStateHandler {
641                greptimedb_telemetry_task,
642                subscribe_manager,
643                state: self.state.clone(),
644                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
645                leadership_change_notifier,
646                mailbox: self.mailbox.clone(),
647            };
648            let _handle = common_runtime::spawn_global(async move {
649                loop {
650                    match rx.recv().await {
651                        Ok(msg) => {
652                            in_memory.reset();
653                            leader_cached_kv_backend.reset();
654                            info!("Leader's cache has bean cleared on leader change: {msg}");
655                            match msg {
656                                LeaderChangeMessage::Elected(_) => {
657                                    state_handler.on_leader_start().await;
658                                }
659                                LeaderChangeMessage::StepDown(leader) => {
660                                    error!("Leader :{:?} step down", leader);
661
662                                    state_handler.on_leader_stop().await;
663                                }
664                            }
665                        }
666                        Err(RecvError::Closed) => {
667                            error!("Not expected, is leader election loop still running?");
668                            break;
669                        }
670                        Err(RecvError::Lagged(_)) => {
671                            break;
672                        }
673                    }
674                }
675
676                state_handler.on_leader_stop().await;
677            });
678
679            // Register candidate and keep lease in background.
680            {
681                let election = election.clone();
682                let started = self.started.clone();
683                let node_info = self.node_info();
684                let _handle = common_runtime::spawn_global(async move {
685                    while started.load(Ordering::Acquire) {
686                        let res = election.register_candidate(&node_info).await;
687                        if let Err(e) = res {
688                            warn!(e; "Metasrv register candidate error");
689                        }
690                    }
691                });
692            }
693
694            // Campaign
695            {
696                let election = election.clone();
697                let started = self.started.clone();
698                let _handle = common_runtime::spawn_global(async move {
699                    while started.load(Ordering::Acquire) {
700                        let res = election.campaign().await;
701                        if let Err(e) = res {
702                            warn!(e; "Metasrv election error");
703                        }
704                        election.reset_campaign().await;
705                        info!("Metasrv re-initiate election");
706                    }
707                    info!("Metasrv stopped");
708                });
709            }
710        } else {
711            warn!(
712                "Ensure only one instance of Metasrv is running, as there is no election service."
713            );
714
715            if let Err(e) = self.wal_provider.start().await {
716                error!(e; "Failed to start wal provider");
717            }
718            // Always load kv into cached kv store.
719            self.leader_cached_kv_backend
720                .load()
721                .await
722                .context(KvBackendSnafu)?;
723            self.procedure_manager
724                .start()
725                .await
726                .context(StartProcedureManagerSnafu)?;
727        }
728
729        info!("Metasrv started");
730
731        Ok(())
732    }
733
734    pub async fn shutdown(&self) -> Result<()> {
735        if self
736            .started
737            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
738            .is_err()
739        {
740            warn!("Metasrv already stopped");
741            return Ok(());
742        }
743
744        self.procedure_manager
745            .stop()
746            .await
747            .context(StopProcedureManagerSnafu)?;
748
749        info!("Metasrv stopped");
750
751        Ok(())
752    }
753
    /// Returns the start time of this metasrv, in milliseconds.
    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    /// Returns the resource statistics provider for this node.
    pub fn resource_stat(&self) -> &ResourceStatRef {
        &self.resource_stat
    }
761
762    pub fn node_info(&self) -> MetasrvNodeInfo {
763        let build_info = common_version::build_info();
764        MetasrvNodeInfo {
765            addr: self.options().grpc.server_addr.clone(),
766            version: build_info.version.to_string(),
767            git_commit: build_info.commit_short.to_string(),
768            start_time_ms: self.start_time_ms(),
769            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
770            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
771            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
772            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
773            hostname: hostname::get()
774                .unwrap_or_default()
775                .to_string_lossy()
776                .to_string(),
777        }
778    }
779
    /// Looks up a datanode peer by peer_id, returning it only when it's alive.
    /// A datanode is considered alive when it's still within the lease period.
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        // Liveness is judged via the meta peer client against the configured
        // datanode lease duration, using the system clock as the time source.
        discovery::utils::alive_datanode(
            &DefaultSystemTimer,
            self.meta_peer_client.as_ref(),
            peer_id,
            default_distributed_time_constants().datanode_lease,
        )
        .await
    }
791
    /// Returns the metasrv configuration options.
    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }
795
    /// Returns the in-memory, resettable kv backend.
    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }
799
    /// Returns the persistent kv backend.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
803
    /// Returns the client used to communicate with meta peers.
    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }
807
    /// Returns the peer selector (distinct from [`Self::flow_selector`]).
    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }
811
    /// Returns the context passed to selectors when picking peers.
    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }
815
    /// Returns the selector used for flownodes.
    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }
819
    /// Returns the heartbeat handler group, or `None` if it has not been set yet.
    ///
    /// # Panics
    /// Panics if the internal `RwLock` is poisoned.
    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }
823
    /// Returns the election service, or `None` when running without one
    /// (single-instance deployment).
    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }
827
    /// Returns the mailbox used for messaging with other nodes.
    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }
831
    /// Returns the DDL manager.
    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }
835
    /// Returns the procedure manager.
    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }
839
    /// Returns the table metadata manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }
843
    /// Returns the runtime switch manager.
    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }
847
    /// Returns the in-memory region keeper.
    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }
851
    /// Returns the region migration manager.
    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }
855
    /// Returns the publisher from plugins, if one is registered.
    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }
859
    /// Returns the subscription manager from plugins, if one is registered.
    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }
863
    /// Returns the table id allocator.
    pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef {
        &self.table_id_allocator
    }
867
    /// Returns the reconciliation manager.
    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }
871
    /// Returns the plugin registry.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }
875
876    pub fn started(&self) -> Arc<AtomicBool> {
877        self.started.clone()
878    }
879
880    pub fn gc_ticker(&self) -> Option<GcTickerRef> {
881        self.gc_ticker.as_ref().cloned()
882    }
883
884    #[inline]
885    pub fn new_ctx(&self) -> Context {
886        let server_addr = self.options().grpc.server_addr.clone();
887        let in_memory = self.in_memory.clone();
888        let kv_backend = self.kv_backend.clone();
889        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
890        let meta_peer_client = self.meta_peer_client.clone();
891        let mailbox = self.mailbox.clone();
892        let election = self.election.clone();
893        let table_metadata_manager = self.table_metadata_manager.clone();
894        let cache_invalidator = self.cache_invalidator.clone();
895        let leader_region_registry = self.leader_region_registry.clone();
896        let topic_stats_registry = self.topic_stats_registry.clone();
897
898        Context {
899            server_addr,
900            in_memory,
901            kv_backend,
902            leader_cached_kv_backend,
903            meta_peer_client,
904            mailbox,
905            election,
906            is_infancy: false,
907            table_metadata_manager,
908            cache_invalidator,
909            leader_region_registry,
910            topic_stats_registry,
911            heartbeat_interval: self.options().heartbeat_interval,
912            is_handshake: false,
913        }
914    }
915}
916
#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    /// A JSON payload containing only the core fields must deserialize into
    /// [`MetasrvNodeInfo`] with each field preserved.
    #[test]
    fn test_deserialize_metasrv_node_info() {
        let json = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let MetasrvNodeInfo {
            addr,
            version,
            git_commit,
            start_time_ms,
            ..
        } = serde_json::from_str(json).unwrap();
        assert_eq!(addr, "127.0.0.1:4002");
        assert_eq!(version, "0.1.0");
        assert_eq!(git_commit, "1234567890");
        assert_eq!(start_time_ms, 1715145600);
    }
}