meta_srv/
metasrv.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_stat::ResourceStatRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_time::util::DefaultSystemTimer;
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::TlsOption;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::discovery;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::gc::{GcSchedulerOptions, GcTickerRef};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{StateRef, become_follower, become_leader};

pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_DATA_DIR: &str = "metasrv";

/// The datastores that implement the metadata kv backend.
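///
/// Variant names (de)serialize in `snake_case`. A minimal sketch, assuming
/// `serde_json` is available as a dev-dependency (`ignore`d so it is not run
/// as a doctest):
///
/// ```ignore
/// let backend: BackendImpl = serde_json::from_str("\"etcd_store\"").unwrap();
/// assert_eq!(backend, BackendImpl::EtcdStore);
/// ```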
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Etcd as metadata storage.
    #[default]
    EtcdStore,
    /// In-memory metadata storage - mostly used for testing.
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    /// Postgres as metadata storage.
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    /// MySQL as metadata storage.
    MysqlStore,
}

/// Configuration options for stats persistence.
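///
/// Both durations (de)serialize via `humantime_serde`, so human-readable
/// strings work in configuration. A minimal sketch, assuming `serde_json`
/// is available (`ignore`d so it is not run as a doctest):
///
/// ```ignore
/// let opts: StatsPersistenceOptions =
///     serde_json::from_str(r#"{"ttl": "0s", "interval": "10m"}"#).unwrap();
/// assert_eq!(opts.interval, std::time::Duration::from_secs(600));
/// ```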
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// TTL of the table that stores the stats.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// The interval at which the stats are persisted.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}

impl Default for StatsPersistenceOptions {
    fn default() -> Self {
        Self {
            ttl: Duration::ZERO,
            interval: Duration::from_mins(10),
        }
    }
}

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the server listens on.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the server advertises to clients.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The addresses of the metadata store, e.g., etcd.
    pub store_addrs: Vec<String>,
    /// TLS configuration for the kv store backend (PostgreSQL/MySQL).
    /// Only applicable when PostgreSQL or MySQL is used as the metadata store.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to use the memory store.
    pub use_memory_store: bool,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The delay before starting region failure detection.
    /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
    /// It is especially useful when the cluster is not deployed with the GreptimeDB Operator and maintenance mode is not enabled.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover on the local WAL.
    ///
    /// If true, region failover is allowed even when the local WAL is used.
    /// Note that enabling this option is not recommended, because it may lead to data loss during failover.
    pub allow_region_failover_on_local_wal: bool,
    /// The gRPC server options.
    pub grpc: GrpcOptions,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
    /// This is useful when multiple metasrv clusters share the same store.
    pub store_key_prefix: String,
    /// The maximum number of operations permitted in a transaction.
    ///
    /// This value is usually limited by the store backing the `KvBackend`.
    /// For example, when using etcd, this value should be less than or equal
    /// to the value of etcd's `--max-txn-ops` option.
    ///
    /// TODO(jeremy): Currently, this option only affects the etcd store, but it may
    /// also affect other stores in the future. In other words, each store needs to
    /// limit the number of operations in a txn because an infinitely large txn could
    /// potentially block other operations.
    pub max_txn_ops: usize,
    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The memory options.
    pub memory: MemoryOptions,
    /// The datastore for kv metadata.
    pub backend: BackendImpl,
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    /// Table name of the RDS kv backend.
    pub meta_table_name: String,
    #[cfg(feature = "pg_kvbackend")]
    /// Lock id for the meta kv election. Only takes effect when using pg_kvbackend.
    pub meta_election_lock_id: u64,
    #[cfg(feature = "pg_kvbackend")]
    /// Optional PostgreSQL schema for the metadata table (defaults to the current search_path if unset).
    pub meta_schema_name: Option<String>,
    /// The maximum idle time of a node; used by the node expiry listener.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// The stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// The GC scheduler options.
    pub gc: GcSchedulerOptions,
}

impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("backend_tls", &self.backend_tls)
            .field("selector", &self.selector)
            .field("use_memory_store", &self.use_memory_store)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("grpc", &self.grpc)
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend)
            .field("event_recorder", &self.event_recorder)
            .field("stats_persistence", &self.stats_persistence);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_schema_name", &self.meta_schema_name);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            backend_tls: None,
            selector: SelectorType::default(),
            use_memory_store: false,
            enable_region_failover: false,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // In etcd, the maximum size of any request is 1.5 MiB.
                // 1500 KiB = 1536 KiB (1.5 MiB) - 36 KiB (reserved for the key)
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
            gc: GcSchedulerOptions::default(),
        }
    }
}

impl Configurable for MetasrvOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
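        // These keys accept comma-separated lists when provided via
        // environment variables (the exact variable naming is defined by the
        // `Configurable` trait's env mapping).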
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}

impl MetasrvOptions {
    fn sanitize_store_addrs(&self) -> Vec<String> {
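        // Redacts credentials that may be embedded in the connection strings
        // (e.g. Postgres/MySQL DSNs) so the `Debug` output above does not
        // leak secrets.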
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}

pub struct MetasrvInfo {
    pub server_addr: String,
}

/// Shared, cloneable context passed to Metasrv services and handlers.
#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
}

impl Context {
    /// Resets the in-memory store and the leader region registry.
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }
}

/// The value of the leader. It is used to store the leader's address.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let string = String::from_utf8_lossy(value.as_ref());
        Self(string.to_string())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The metasrv's address.
    pub addr: String,
    /// The node build version.
    pub version: String,
    /// The node build git commit hash.
    pub git_commit: String,
    /// The node start timestamp in milliseconds.
    pub start_time_ms: u64,
    /// The node's total CPU in millicores.
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// The node's total memory in bytes.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// The node's CPU usage in millicores.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// The node's memory usage in bytes.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// The node hostname.
    #[serde(default)]
    pub hostname: String,
}

// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated top-level fields are removed from the proto.
#[allow(deprecated)]
impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
    fn from(node_info: MetasrvNodeInfo) -> Self {
        Self {
            peer: Some(api::v1::meta::Peer {
                addr: node_info.addr,
                ..Default::default()
            }),
            // TODO(zyy17): The following top-level fields are deprecated. They are kept for backward compatibility and will be removed in a future version.
            // New code should use the fields in `info.NodeInfo` instead.
            version: node_info.version.clone(),
            git_commit: node_info.git_commit.clone(),
            start_time_ms: node_info.start_time_ms,
            cpus: node_info.total_cpu_millicores as u32,
            memory_bytes: node_info.total_memory_bytes as u64,
            // The canonical location for node information.
            info: Some(api::v1::meta::NodeInfo {
                version: node_info.version,
                git_commit: node_info.git_commit,
                start_time_ms: node_info.start_time_ms,
                total_cpu_millicores: node_info.total_cpu_millicores,
                total_memory_bytes: node_info.total_memory_bytes,
                cpu_usage_millicores: node_info.cpu_usage_millicores,
                memory_usage_bytes: node_info.memory_usage_bytes,
                cpus: node_info.total_cpu_millicores as u32,
                memory_bytes: node_info.total_memory_bytes as u64,
                hostname: node_info.hostname,
            }),
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

#[derive(Clone)]
pub struct SelectorContext {
    pub peer_discovery: PeerDiscoveryRef,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
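        // Enters the leader state with the cache disabled first; it is enabled
        // (`become_leader(true)`) only after the leader cache loads successfully.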
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        // Suspends reporting.
        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribing all subscribers");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all subscribers");
            }
        }
    }
}

pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    // It is only valid at the leader node and is used to temporarily
    // store some data that will not be persisted.
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    // The selector is used to select a target datanode.
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    // The flow selector is used to select a target flownode.
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_sequence: SequenceRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,

    plugins: Plugins,
}

impl Metasrv {
    pub async fn try_start(&self) -> Result<()> {
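        // Atomically flips `started` from `false` to `true`; if another caller
        // has already started the server, this becomes a no-op.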
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        // Creates the default schema if it does not exist.
        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            // Builds the leadership change notifier.
            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(region_flush_trigger) = &self.region_flush_ticker {
                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
            }
            if let Some(gc_ticker) = &self.gc_ticker {
                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
            };
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Leader change channel closed unexpectedly; is the leader election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
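                            // The receiver missed messages, so the observed
                            // leadership state may be stale; exit the loop and
                            // step down below.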
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

            // Registers the candidate and keeps the lease in the background.
            {
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

            // Campaigns for leadership in the background.
            {
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiates election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_options_allocator.start().await {
                error!(e; "Failed to start wal options allocator");
            }
            // Always loads kv into the cached kv store.
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

    pub async fn shutdown(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already stopped");
            return Ok(());
        }

        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)?;

        info!("Metasrv stopped");

        Ok(())
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn resource_stat(&self) -> &ResourceStatRef {
        &self.resource_stat
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().grpc.server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
            hostname: hostname::get()
                .unwrap_or_default()
                .to_string_lossy()
                .to_string(),
        }
    }

    /// Looks up a datanode peer by peer_id, returning it only when it's alive.
    /// A datanode is considered alive when it's still within the lease period.
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        discovery::utils::alive_datanode(
            &DefaultSystemTimer,
            self.meta_peer_client.as_ref(),
            peer_id,
            Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS),
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn table_id_sequence(&self) -> &SequenceRef {
        &self.table_id_sequence
    }

    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn started(&self) -> Arc<AtomicBool> {
        self.started.clone()
    }

    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().grpc.server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();
        let topic_stats_registry = self.topic_stats_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
            topic_stats_registry,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    #[test]
    fn test_deserialize_metasrv_node_info() {
        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
        assert_eq!(node_info.addr, "127.0.0.1:4002");
        assert_eq!(node_info.version, "0.1.0");
        assert_eq!(node_info.git_commit, "1234567890");
        assert_eq!(node_info.start_time_ms, 1715145600);
    }
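
    // A small companion test demonstrating the `From<impl AsRef<[u8]>>`
    // conversion on `LeaderValue` defined above: leader values received from
    // the election backend are raw bytes holding the leader's address.
    #[test]
    fn test_leader_value_from_bytes() {
        let leader = crate::metasrv::LeaderValue::from(b"127.0.0.1:3002");
        assert_eq!(leader.0, "127.0.0.1:3002");
    }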
}