meta_srv/metasrv.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_stat::ResourceStatRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_time::util::DefaultSystemTimer;
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::TlsOption;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::discovery;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::gc::{GcSchedulerOptions, GcTickerRef};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{StateRef, become_follower, become_leader};

pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_DATA_DIR: &str = "metasrv";

/// The datastores that implement the metadata kv backend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Etcd as metadata storage.
    #[default]
    EtcdStore,
    /// In-memory metadata storage - mostly used for testing.
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    /// Postgres as metadata storage.
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    /// MySql as metadata storage.
    MysqlStore,
}
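
// Configuration sketch: with `rename_all = "snake_case"`, the serde-accepted
// values for the `backend` field of `MetasrvOptions` are "etcd_store",
// "memory_store", "postgres_store" and "mysql_store", e.g. in TOML
// (an illustrative snippet, not taken from this file):
//
//     backend = "etcd_store"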

/// Configuration options for stats persistence.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// TTL for the stats table used to store the stats.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// The interval at which stats are persisted.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}

impl Default for StatsPersistenceOptions {
    fn default() -> Self {
        Self {
            ttl: Duration::ZERO,
            interval: Duration::from_mins(10),
        }
    }
}
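
// A minimal TOML sketch (assumption: this struct maps to a `stats_persistence`
// section and durations use humantime strings, per the serde attributes above):
//
//     [stats_persistence]
//     ttl = "30d"      # retention for persisted stats; the default is 0s
//     interval = "10m" # how often stats are persisted; matches the default above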

#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
    #[serde(with = "humantime_serde")]
    pub keep_alive_timeout: Duration,
    #[serde(with = "humantime_serde")]
    pub keep_alive_interval: Duration,
    #[serde(with = "humantime_serde")]
    pub connect_timeout: Duration,
}

impl Default for BackendClientOptions {
    fn default() -> Self {
        Self {
            keep_alive_interval: Duration::from_secs(10),
            keep_alive_timeout: Duration::from_secs(3),
            connect_timeout: Duration::from_secs(3),
        }
    }
}
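
// A TOML sketch mirroring the defaults above (assumption: the options live under
// a `backend_client` section, matching the `MetasrvOptions::backend_client` field):
//
//     [backend_client]
//     keep_alive_interval = "10s"
//     keep_alive_timeout = "3s"
//     connect_timeout = "3s"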

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the server listens on.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the server advertises to the clients.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The address of the store, e.g., etcd.
    pub store_addrs: Vec<String>,
    /// TLS configuration for the kv store backend (PostgreSQL/MySQL).
    /// Only applicable when using PostgreSQL or MySQL as the metadata store.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// The backend client options.
    /// Currently, only applicable when using etcd as the metadata store.
    #[serde(default)]
    pub backend_client: BackendClientOptions,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The base heartbeat interval.
    ///
    /// This value is used to calculate the distributed time constants for components.
    /// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
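    /// For illustration (arithmetic only, not a statement about the default value):
    /// a 3-second heartbeat interval yields a region lease of `3s * 3 + 1s = 10s`.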
    #[serde(with = "humantime_serde")]
    pub heartbeat_interval: Duration,
    /// The delay before starting region failure detection.
    /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
    /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover on local WAL.
    ///
    /// If true, region failover is allowed even when the local WAL is used.
    /// Note that setting this option to true is not recommended, because it may lead to data loss during failover.
    pub allow_region_failover_on_local_wal: bool,
    pub grpc: GrpcOptions,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
    /// This is useful when multiple metasrv clusters share the same store.
    pub store_key_prefix: String,
    /// The maximum number of operations permitted in a single txn.
    ///
    /// This value is usually limited by which store is used for the `KvBackend`.
    /// For example, if using etcd, this value should be less than or equal to the
    /// `--max-txn-ops` option value of etcd.
    ///
    /// TODO(jeremy): Currently, this option only affects the etcd store, but it may
    /// also affect other stores in the future. In other words, each store needs to
    /// limit the number of operations in a txn because an infinitely large txn could
    /// potentially block other operations.
    pub max_txn_ops: usize,
    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The memory options.
    pub memory: MemoryOptions,
    /// The datastore for kv metadata.
    pub backend: BackendImpl,
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    /// Table name of the RDS kv backend.
    pub meta_table_name: String,
    #[cfg(feature = "pg_kvbackend")]
    /// Lock id for the meta kv election. Only takes effect when using pg_kvbackend.
    pub meta_election_lock_id: u64,
    #[cfg(feature = "pg_kvbackend")]
    /// Optional PostgreSQL schema for the metadata table (defaults to the current search_path if empty).
    pub meta_schema_name: Option<String>,
    #[cfg(feature = "pg_kvbackend")]
    /// Automatically create the PostgreSQL schema if it doesn't exist (default: true).
    pub auto_create_schema: bool,
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// The stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// The GC scheduler options.
    pub gc: GcSchedulerOptions,
}

impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("backend_tls", &self.backend_tls)
            .field("selector", &self.selector)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("grpc", &self.grpc)
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend)
            .field("event_recorder", &self.event_recorder)
            .field("stats_persistence", &self.stats_persistence)
            .field("heartbeat_interval", &self.heartbeat_interval)
            .field("backend_client", &self.backend_client);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_schema_name", &self.meta_schema_name);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            backend_tls: Some(TlsOption::prefer()),
            selector: SelectorType::default(),
            enable_region_failover: false,
            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // In etcd, the maximum size of any request is 1.5 MiB.
                // 1500 KiB = 1536 KiB (1.5 MiB) - 36 KiB (reserved for the key)
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            #[cfg(feature = "pg_kvbackend")]
            auto_create_schema: true,
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
            gc: GcSchedulerOptions::default(),
            backend_client: BackendClientOptions::default(),
        }
    }
}

impl Configurable for MetasrvOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}
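
// Usage sketch (the exact env-variable naming is an assumption; it is decided by
// the `Configurable` implementation, not by this file): list-valued keys such as
// `store_addrs` are typically supplied as a single comma-separated value, e.g.
//
//     GREPTIMEDB_METASRV__STORE_ADDRS=10.0.0.1:2379,10.0.0.2:2379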

impl MetasrvOptions {
    fn sanitize_store_addrs(&self) -> Vec<String> {
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}
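
// Note: `sanitize_store_addrs` exists so that the `Debug` impl above never logs
// raw connection strings. Assuming the helper masks embedded credentials, a DSN
// like `postgres://user:secret@10.0.0.1/meta` would be printed with the secret
// redacted rather than verbatim.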

pub struct MetasrvInfo {
    pub server_addr: String,
}

#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
}

impl Context {
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }
}

/// The value of the leader. It is used to store the leader's address.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let string = String::from_utf8_lossy(value.as_ref());
        Self(string.to_string())
    }
}
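
// Usage sketch: anything byte-slice-like converts into a `LeaderValue`, and the
// conversion is lossy rather than fallible (invalid UTF-8 is replaced):
//
//     let leader: LeaderValue = b"10.0.0.1:3002".into();
//     assert_eq!(leader.0, "10.0.0.1:3002");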

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The metasrv's address
    pub addr: String,
    /// The node build version
    pub version: String,
    /// The node build git commit hash
    pub git_commit: String,
    /// The node start timestamp in milliseconds
    pub start_time_ms: u64,
    /// The node's total CPU in millicores
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// The node's total memory in bytes
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// The node's current CPU usage in millicores
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// The node's current memory usage in bytes
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// The node hostname
    #[serde(default)]
    pub hostname: String,
}

// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated top-level fields are removed from the proto.
#[allow(deprecated)]
impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
    fn from(node_info: MetasrvNodeInfo) -> Self {
        Self {
            peer: Some(api::v1::meta::Peer {
                addr: node_info.addr,
                ..Default::default()
            }),
            // TODO(zyy17): The following top-level fields are deprecated. They are kept for backward compatibility and will be removed in a future version.
            // New code should use the fields in `info.NodeInfo` instead.
            version: node_info.version.clone(),
            git_commit: node_info.git_commit.clone(),
            start_time_ms: node_info.start_time_ms,
            cpus: node_info.total_cpu_millicores as u32,
            memory_bytes: node_info.total_memory_bytes as u64,
            // The canonical location for node information.
            info: Some(api::v1::meta::NodeInfo {
                version: node_info.version,
                git_commit: node_info.git_commit,
                start_time_ms: node_info.start_time_ms,
                total_cpu_millicores: node_info.total_cpu_millicores,
                total_memory_bytes: node_info.total_memory_bytes,
                cpu_usage_millicores: node_info.cpu_usage_millicores,
                memory_usage_bytes: node_info.memory_usage_bytes,
                cpus: node_info.total_cpu_millicores as u32,
                memory_bytes: node_info.total_memory_bytes as u64,
                hostname: node_info.hostname,
            }),
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}
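
// e.g. `SelectTarget::Datanode.to_string()` yields "datanode".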

#[derive(Clone)]
pub struct SelectorContext {
    pub peer_discovery: PeerDiscoveryRef,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    mailbox: MailboxRef,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
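        // Two-phase transition: first become leader with the cache marked as not
        // yet loaded, then flip to the "cache ready" leader state below only if
        // loading the leader cache succeeds.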
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        // Enforces the mailbox to clear all pushers.
        // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
        self.mailbox.reset().await;
        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        // Suspends reporting.
        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribe all");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all");
            }
        }
    }
}

pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    // It is only valid at the leader node and is used to temporarily
    // store some data that will not be persisted.
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    // The selector is used to select a target datanode.
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    // The flow selector is used to select a target flownode.
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_sequence: SequenceRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,

    plugins: Plugins,
}

impl Metasrv {
    pub async fn try_start(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        // Creates the default schema if it does not exist.
        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            // Builds the leadership change notifier.
            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(region_flush_trigger) = &self.region_flush_ticker {
                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
            }
            if let Some(gc_ticker) = &self.gc_ticker {
                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
                mailbox: self.mailbox.clone(),
            };
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Leader change channel closed unexpectedly; is the leader election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

            // Register candidate and keep lease in background.
            {
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

            // Campaign for leadership in a loop; after each round (success or error),
            // reset the campaign and start over.
            {
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiates election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_options_allocator.start().await {
                error!(e; "Failed to start wal options allocator");
            }
            // Always load kv into cached kv store.
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

    pub async fn shutdown(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already stopped");
            return Ok(());
        }

        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)?;

        info!("Metasrv stopped");

        Ok(())
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn resource_stat(&self) -> &ResourceStatRef {
        &self.resource_stat
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().grpc.server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
            hostname: hostname::get()
                .unwrap_or_default()
                .to_string_lossy()
                .to_string(),
        }
    }

    /// Looks up a datanode peer by peer_id, returning it only when it's alive.
    /// A datanode is considered alive when it's still within the lease period.
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        discovery::utils::alive_datanode(
            &DefaultSystemTimer,
            self.meta_peer_client.as_ref(),
            peer_id,
            default_distributed_time_constants().datanode_lease,
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn table_id_sequence(&self) -> &SequenceRef {
        &self.table_id_sequence
    }

    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn started(&self) -> Arc<AtomicBool> {
        self.started.clone()
    }

    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().grpc.server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();
        let topic_stats_registry = self.topic_stats_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
            topic_stats_registry,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    #[test]
    fn test_deserialize_metasrv_node_info() {
        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
        assert_eq!(node_info.addr, "127.0.0.1:4002");
        assert_eq!(node_info.version, "0.1.0");
        assert_eq!(node_info.git_commit, "1234567890");
        assert_eq!(node_info.start_time_ms, 1715145600);
    }
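
    #[test]
    fn test_leader_value_from_bytes() {
        // An added sketch exercising the `From<T: AsRef<[u8]>>` impl above:
        // conversion is lossy, so any byte slice is accepted.
        let leader = crate::metasrv::LeaderValue::from(b"127.0.0.1:3002");
        assert_eq!(leader.0, "127.0.0.1:3002");
    }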
}