meta_srv/
metasrv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use api::v1::meta::{HeartbeatConfig, Role};
23use clap::ValueEnum;
24use common_base::Plugins;
25use common_base::readable_size::ReadableSize;
26use common_config::{Configurable, DEFAULT_DATA_HOME};
27use common_event_recorder::EventRecorderOptions;
28use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
29use common_meta::cache_invalidator::CacheInvalidatorRef;
30use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
31use common_meta::ddl_manager::DdlManagerRef;
32use common_meta::distributed_time_constants::{
33    self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
34};
35use common_meta::key::TableMetadataManagerRef;
36use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
37use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
38use common_meta::leadership_notifier::{
39    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
40};
41use common_meta::node_expiry_listener::NodeExpiryListener;
42use common_meta::peer::{Peer, PeerDiscoveryRef};
43use common_meta::reconciliation::manager::ReconciliationManagerRef;
44use common_meta::region_keeper::MemoryRegionKeeperRef;
45use common_meta::region_registry::LeaderRegionRegistryRef;
46use common_meta::stats::topic::TopicStatsRegistryRef;
47use common_meta::wal_provider::WalProviderRef;
48use common_options::datanode::DatanodeClientOptions;
49use common_options::memory::MemoryOptions;
50use common_procedure::ProcedureManagerRef;
51use common_procedure::options::ProcedureConfig;
52use common_stat::ResourceStatRef;
53use common_telemetry::logging::{LoggingOptions, TracingOptions};
54use common_telemetry::{error, info, warn};
55use common_time::util::DefaultSystemTimer;
56use common_wal::config::MetasrvWalConfig;
57use serde::{Deserialize, Serialize};
58use servers::grpc::GrpcOptions;
59use servers::http::HttpOptions;
60use servers::tls::TlsOption;
61use snafu::{OptionExt, ResultExt};
62use store_api::storage::RegionId;
63use tokio::sync::broadcast::error::RecvError;
64
65use crate::cluster::MetaPeerClientRef;
66use crate::discovery;
67use crate::election::{Election, LeaderChangeMessage};
68use crate::error::{
69    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
70    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
71};
72use crate::failure_detector::PhiAccrualFailureDetectorOptions;
73use crate::gc::{GcSchedulerOptions, GcTickerRef};
74use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
75use crate::procedure::ProcedureManagerListenerAdapter;
76use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
77use crate::procedure::wal_prune::manager::WalPruneTickerRef;
78use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
79use crate::region::flush_trigger::RegionFlushTickerRef;
80use crate::region::supervisor::RegionSupervisorTickerRef;
81use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
82use crate::service::mailbox::MailboxRef;
83use crate::service::store::cached_kv::LeaderCachedKvBackend;
84use crate::state::{StateRef, become_follower, become_leader};
85
86pub const TABLE_ID_SEQ: &str = "table_id";
87pub const FLOW_ID_SEQ: &str = "flow_id";
88pub const METASRV_DATA_DIR: &str = "metasrv";
89
// The datastores that implement the metadata kvbackend.
91#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
92#[serde(rename_all = "snake_case")]
93pub enum BackendImpl {
94    // Etcd as metadata storage.
95    #[default]
96    EtcdStore,
97    // In memory metadata storage - mostly used for testing.
98    MemoryStore,
99    #[cfg(feature = "pg_kvbackend")]
100    // Postgres as metadata storage.
101    PostgresStore,
102    #[cfg(feature = "mysql_kvbackend")]
103    // MySql as metadata storage.
104    MysqlStore,
105}
106
/// Configuration options for stats persistence.
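///
/// # Example (illustrative)
///
/// A hypothetical TOML fragment, assuming this struct is exposed as a
/// `stats_persistence` section of the metasrv configuration file (the field
/// name in [`MetasrvOptions`]); durations are humantime strings because of
/// `humantime_serde`:
///
/// ```toml
/// [stats_persistence]
/// ttl = "30d"
/// interval = "10m"
/// ```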
108#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
109pub struct StatsPersistenceOptions {
    /// TTL of the stats table used to store the persisted stats.
111    #[serde(with = "humantime_serde")]
112    pub ttl: Duration,
113    /// The interval to persist the stats.
114    #[serde(with = "humantime_serde")]
115    pub interval: Duration,
116}
117
118impl Default for StatsPersistenceOptions {
119    fn default() -> Self {
120        Self {
121            ttl: Duration::ZERO,
122            interval: Duration::from_mins(10),
123        }
124    }
125}
126
127/// Heartbeat configuration for a single node type.
128#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
129#[serde(default)]
130pub struct HeartbeatOptions {
131    /// Heartbeat interval.
132    #[serde(with = "humantime_serde")]
133    pub interval: Duration,
134    /// Retry interval when heartbeat connection fails.
135    #[serde(with = "humantime_serde")]
136    pub retry_interval: Duration,
137}
138
139impl Default for HeartbeatOptions {
140    fn default() -> Self {
141        Self {
142            interval: BASE_HEARTBEAT_INTERVAL,
143            retry_interval: BASE_HEARTBEAT_INTERVAL,
144        }
145    }
146}
147
148impl HeartbeatOptions {
149    pub fn datanode_from(base_interval: Duration) -> Self {
150        Self {
151            interval: base_interval,
152            retry_interval: base_interval,
153        }
154    }
155
156    pub fn frontend_from(base_interval: Duration) -> Self {
157        Self {
158            interval: frontend_heartbeat_interval(base_interval),
159            retry_interval: base_interval,
160        }
161    }
162
163    pub fn flownode_from(base_interval: Duration) -> Self {
164        Self {
165            interval: base_interval,
166            retry_interval: base_interval,
167        }
168    }
169}
170
171impl From<HeartbeatOptions> for HeartbeatConfig {
172    fn from(opts: HeartbeatOptions) -> Self {
173        Self {
174            heartbeat_interval_ms: opts.interval.as_millis() as u64,
175            retry_interval_ms: opts.retry_interval.as_millis() as u64,
176        }
177    }
178}
179
180#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
181#[serde(default)]
182pub struct BackendClientOptions {
183    #[serde(with = "humantime_serde")]
184    pub keep_alive_timeout: Duration,
185    #[serde(with = "humantime_serde")]
186    pub keep_alive_interval: Duration,
187    #[serde(with = "humantime_serde")]
188    pub connect_timeout: Duration,
189}
190
191impl Default for BackendClientOptions {
192    fn default() -> Self {
193        Self {
194            keep_alive_interval: Duration::from_secs(10),
195            keep_alive_timeout: Duration::from_secs(3),
196            connect_timeout: Duration::from_secs(3),
197        }
198    }
199}
200
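/// Configuration options for the metasrv.
///
/// # Example (illustrative)
///
/// A hypothetical TOML fragment, assuming this struct is the root of the
/// metasrv configuration file; the `backend` value uses the snake_case
/// variant names of [`BackendImpl`], and durations are humantime strings:
///
/// ```toml
/// store_addrs = ["127.0.0.1:2379"]
/// backend = "etcd_store"
/// heartbeat_interval = "3s"
/// max_txn_ops = 128
///
/// [backend_client]
/// keep_alive_interval = "10s"
/// keep_alive_timeout = "3s"
/// connect_timeout = "3s"
/// ```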
201#[derive(Clone, PartialEq, Serialize, Deserialize)]
202#[serde(default)]
203pub struct MetasrvOptions {
204    /// The address the server listens on.
205    #[deprecated(note = "Use grpc.bind_addr instead")]
206    pub bind_addr: String,
207    /// The address the server advertises to the clients.
208    #[deprecated(note = "Use grpc.server_addr instead")]
209    pub server_addr: String,
210    /// The address of the store, e.g., etcd.
211    pub store_addrs: Vec<String>,
    /// TLS configuration for the kv store backend (PostgreSQL/MySQL).
    /// Only applicable when using PostgreSQL or MySQL as the metadata store.
214    #[serde(default)]
215    pub backend_tls: Option<TlsOption>,
216    /// The backend client options.
217    /// Currently, only applicable when using etcd as the metadata store.
218    #[serde(default)]
219    pub backend_client: BackendClientOptions,
220    /// The type of selector.
221    pub selector: SelectorType,
222    /// Whether to enable region failover.
223    pub enable_region_failover: bool,
224    /// The base heartbeat interval.
225    ///
226    /// This value is used to calculate the distributed time constants for components.
    /// For example, the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
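    ///
    /// Concretely (illustrative, assuming a hypothetical base interval of 3s):
    ///
    /// ```text
    /// region lease = heartbeat_interval * 3 + 1s = 3s * 3 + 1s = 10s
    /// ```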
228    #[serde(with = "humantime_serde")]
229    pub heartbeat_interval: Duration,
230    /// The delay before starting region failure detection.
231    /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
    /// This is especially useful when the cluster is not deployed with the GreptimeDB Operator and maintenance mode is not enabled.
233    #[serde(with = "humantime_serde")]
234    pub region_failure_detector_initialization_delay: Duration,
235    /// Whether to allow region failover on local WAL.
236    ///
    /// If true, region failover is allowed even when the local WAL is used.
    /// Note that enabling this is not recommended, because it may lead to data loss during failover.
239    pub allow_region_failover_on_local_wal: bool,
    /// The gRPC server options.
    pub grpc: GrpcOptions,
241    /// The HTTP server options.
242    pub http: HttpOptions,
243    /// The logging options.
244    pub logging: LoggingOptions,
245    /// The procedure options.
246    pub procedure: ProcedureConfig,
247    /// The failure detector options.
248    pub failure_detector: PhiAccrualFailureDetectorOptions,
249    /// The datanode options.
250    pub datanode: DatanodeClientOptions,
251    /// Whether to enable telemetry.
252    pub enable_telemetry: bool,
253    /// The data home directory.
254    pub data_home: String,
255    /// The WAL options.
256    pub wal: MetasrvWalConfig,
257    /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
258    /// This is useful when multiple metasrv clusters share the same store.
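    ///
    /// For example (illustrative), with `store_key_prefix = "cluster-a/"`, a key
    /// `some_key` is stored as `cluster-a/some_key`.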
259    pub store_key_prefix: String,
260    /// The max operations per txn
261    ///
262    /// This value is usually limited by which store is used for the `KvBackend`.
263    /// For example, if using etcd, this value should ensure that it is less than
264    /// or equal to the `--max-txn-ops` option value of etcd.
265    ///
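    ///
    /// For example (illustrative), if etcd is started with `--max-txn-ops=128`,
    /// this value should be at most 128.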
266    /// TODO(jeremy): Currently, this option only affects the etcd store, but it may
267    /// also affect other stores in the future. In other words, each store needs to
268    /// limit the number of operations in a txn because an infinitely large txn could
269    /// potentially block other operations.
270    pub max_txn_ops: usize,
271    /// The factor that determines how often statistics should be flushed,
272    /// based on the number of received heartbeats. When the number of heartbeats
273    /// reaches this factor, a flush operation is triggered.
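    ///
    /// For example (illustrative), with `flush_stats_factor = 3`, statistics are
    /// flushed once every 3 received heartbeats.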
274    pub flush_stats_factor: usize,
275    /// The tracing options.
276    pub tracing: TracingOptions,
277    /// The memory options.
278    pub memory: MemoryOptions,
279    /// The datastore for kv metadata.
280    pub backend: BackendImpl,
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    /// Table name of the RDS kv backend.
    pub meta_table_name: String,
    #[cfg(feature = "pg_kvbackend")]
    /// Lock id for the meta kv election. Only takes effect when using pg_kvbackend.
    pub meta_election_lock_id: u64,
    #[cfg(feature = "pg_kvbackend")]
    /// Optional PostgreSQL schema for the metadata table (defaults to the current search_path if empty).
    pub meta_schema_name: Option<String>,
    #[cfg(feature = "pg_kvbackend")]
    /// Automatically create the PostgreSQL schema if it doesn't exist (default: true).
    pub auto_create_schema: bool,
293    #[serde(with = "humantime_serde")]
294    pub node_max_idle_time: Duration,
295    /// The event recorder options.
296    pub event_recorder: EventRecorderOptions,
297    /// The stats persistence options.
298    pub stats_persistence: StatsPersistenceOptions,
299    /// The GC scheduler options.
300    pub gc: GcSchedulerOptions,
301}
302
303impl fmt::Debug for MetasrvOptions {
304    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305        let mut debug_struct = f.debug_struct("MetasrvOptions");
306        debug_struct
307            .field("store_addrs", &self.sanitize_store_addrs())
308            .field("backend_tls", &self.backend_tls)
309            .field("selector", &self.selector)
310            .field("enable_region_failover", &self.enable_region_failover)
311            .field(
312                "allow_region_failover_on_local_wal",
313                &self.allow_region_failover_on_local_wal,
314            )
315            .field("grpc", &self.grpc)
316            .field("http", &self.http)
317            .field("logging", &self.logging)
318            .field("procedure", &self.procedure)
319            .field("failure_detector", &self.failure_detector)
320            .field("datanode", &self.datanode)
321            .field("enable_telemetry", &self.enable_telemetry)
322            .field("data_home", &self.data_home)
323            .field("wal", &self.wal)
324            .field("store_key_prefix", &self.store_key_prefix)
325            .field("max_txn_ops", &self.max_txn_ops)
326            .field("flush_stats_factor", &self.flush_stats_factor)
327            .field("tracing", &self.tracing)
328            .field("backend", &self.backend)
329            .field("event_recorder", &self.event_recorder)
330            .field("stats_persistence", &self.stats_persistence)
331            .field("heartbeat_interval", &self.heartbeat_interval)
332            .field("backend_client", &self.backend_client);
333
334        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
335        debug_struct.field("meta_table_name", &self.meta_table_name);
336
337        #[cfg(feature = "pg_kvbackend")]
338        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
339        #[cfg(feature = "pg_kvbackend")]
340        debug_struct.field("meta_schema_name", &self.meta_schema_name);
341
342        debug_struct
343            .field("node_max_idle_time", &self.node_max_idle_time)
344            .finish()
345    }
346}
347
348const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
349
350impl Default for MetasrvOptions {
351    fn default() -> Self {
352        Self {
353            #[allow(deprecated)]
354            bind_addr: String::new(),
355            #[allow(deprecated)]
356            server_addr: String::new(),
357            store_addrs: vec!["127.0.0.1:2379".to_string()],
358            backend_tls: Some(TlsOption::prefer()),
359            selector: SelectorType::default(),
360            enable_region_failover: false,
361            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
362            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
363            allow_region_failover_on_local_wal: false,
364            grpc: GrpcOptions {
365                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
366                ..Default::default()
367            },
368            http: HttpOptions::default(),
369            logging: LoggingOptions::default(),
370            procedure: ProcedureConfig {
371                max_retry_times: 12,
372                retry_delay: Duration::from_millis(500),
                // etcd limits the maximum size of any request to 1.5 MiB.
                // 1500 KiB = 1536 KiB (1.5 MiB) - 36 KiB (reserved for the key).
375                max_metadata_value_size: Some(ReadableSize::kb(1500)),
376                max_running_procedures: 128,
377            },
378            failure_detector: PhiAccrualFailureDetectorOptions::default(),
379            datanode: DatanodeClientOptions::default(),
380            enable_telemetry: true,
381            data_home: DEFAULT_DATA_HOME.to_string(),
382            wal: MetasrvWalConfig::default(),
383            store_key_prefix: String::new(),
384            max_txn_ops: 128,
385            flush_stats_factor: 3,
386            tracing: TracingOptions::default(),
387            memory: MemoryOptions::default(),
388            backend: BackendImpl::EtcdStore,
389            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
390            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
391            #[cfg(feature = "pg_kvbackend")]
392            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
393            #[cfg(feature = "pg_kvbackend")]
394            meta_schema_name: None,
395            #[cfg(feature = "pg_kvbackend")]
396            auto_create_schema: true,
397            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
398            event_recorder: EventRecorderOptions::default(),
399            stats_persistence: StatsPersistenceOptions::default(),
400            gc: GcSchedulerOptions::default(),
401            backend_client: BackendClientOptions::default(),
402        }
403    }
404}
405
406impl Configurable for MetasrvOptions {
407    fn env_list_keys() -> Option<&'static [&'static str]> {
408        Some(&["wal.broker_endpoints", "store_addrs"])
409    }
410}
411
412impl MetasrvOptions {
413    fn sanitize_store_addrs(&self) -> Vec<String> {
414        self.store_addrs
415            .iter()
416            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
417            .collect()
418    }
419}
420
421pub struct MetasrvInfo {
422    pub server_addr: String,
423}
424#[derive(Clone)]
425pub struct Context {
426    pub server_addr: String,
427    pub in_memory: ResettableKvBackendRef,
428    pub kv_backend: KvBackendRef,
429    pub leader_cached_kv_backend: ResettableKvBackendRef,
430    pub meta_peer_client: MetaPeerClientRef,
431    pub mailbox: MailboxRef,
432    pub election: Option<ElectionRef>,
433    pub is_infancy: bool,
434    pub table_metadata_manager: TableMetadataManagerRef,
435    pub cache_invalidator: CacheInvalidatorRef,
436    pub leader_region_registry: LeaderRegionRegistryRef,
437    pub topic_stats_registry: TopicStatsRegistryRef,
438    pub heartbeat_interval: Duration,
439    pub is_handshake: bool,
440}
441
442impl Context {
443    pub fn reset_in_memory(&self) {
444        self.in_memory.reset();
445        self.leader_region_registry.reset();
446    }
447
448    pub fn with_handshake(mut self, is_handshake: bool) -> Self {
449        self.is_handshake = is_handshake;
450        self
451    }
452
453    pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions {
454        match role {
455            Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval),
456            Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval),
457            Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval),
458        }
459    }
460}
461
/// The leader value. It stores the leader's address.
463pub struct LeaderValue(pub String);
464
465impl<T: AsRef<[u8]>> From<T> for LeaderValue {
466    fn from(value: T) -> Self {
467        let string = String::from_utf8_lossy(value.as_ref());
468        Self(string.to_string())
469    }
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize)]
473pub struct MetasrvNodeInfo {
    /// The metasrv's address.
    pub addr: String,
    /// The node build version.
    pub version: String,
    /// The node build git commit hash.
    pub git_commit: String,
    /// The node start timestamp in milliseconds.
    pub start_time_ms: u64,
    /// The node total cpu millicores.
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// The node total memory bytes.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// The node cpu usage in millicores.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// The node memory usage in bytes.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// The node hostname.
    #[serde(default)]
    pub hostname: String,
497}
498
499// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated top-level fields are removed from the proto.
500#[allow(deprecated)]
501impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
502    fn from(node_info: MetasrvNodeInfo) -> Self {
503        Self {
504            peer: Some(api::v1::meta::Peer {
505                addr: node_info.addr,
506                ..Default::default()
507            }),
508            // TODO(zyy17): The following top-level fields are deprecated. They are kept for backward compatibility and will be removed in a future version.
509            // New code should use the fields in `info.NodeInfo` instead.
510            version: node_info.version.clone(),
511            git_commit: node_info.git_commit.clone(),
512            start_time_ms: node_info.start_time_ms,
513            cpus: node_info.total_cpu_millicores as u32,
514            memory_bytes: node_info.total_memory_bytes as u64,
515            // The canonical location for node information.
516            info: Some(api::v1::meta::NodeInfo {
517                version: node_info.version,
518                git_commit: node_info.git_commit,
519                start_time_ms: node_info.start_time_ms,
520                total_cpu_millicores: node_info.total_cpu_millicores,
521                total_memory_bytes: node_info.total_memory_bytes,
522                cpu_usage_millicores: node_info.cpu_usage_millicores,
523                memory_usage_bytes: node_info.memory_usage_bytes,
524                cpus: node_info.total_cpu_millicores as u32,
525                memory_bytes: node_info.total_memory_bytes as u64,
526                hostname: node_info.hostname,
527            }),
528        }
529    }
530}
531
532#[derive(Clone, Copy)]
533pub enum SelectTarget {
534    Datanode,
535    Flownode,
536}
537
538impl Display for SelectTarget {
539    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
540        match self {
541            SelectTarget::Datanode => write!(f, "datanode"),
542            SelectTarget::Flownode => write!(f, "flownode"),
543        }
544    }
545}
546
547#[derive(Clone)]
548pub struct SelectorContext {
549    pub peer_discovery: PeerDiscoveryRef,
550}
551
552pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
553pub type RegionStatAwareSelectorRef =
554    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
555pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
556
557pub struct MetaStateHandler {
558    subscribe_manager: Option<SubscriptionManagerRef>,
559    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
560    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
561    leadership_change_notifier: LeadershipChangeNotifier,
562    mailbox: MailboxRef,
563    state: StateRef,
564}
565
566impl MetaStateHandler {
567    pub async fn on_leader_start(&self) {
568        self.state.write().unwrap().next_state(become_leader(false));
569
570        if let Err(e) = self.leader_cached_kv_backend.load().await {
571            error!(e; "Failed to load kv into leader cache kv store");
572        } else {
573            self.state.write().unwrap().next_state(become_leader(true));
574        }
575
576        self.leadership_change_notifier
577            .notify_on_leader_start()
578            .await;
579
580        self.greptimedb_telemetry_task.should_report(true);
581    }
582
583    pub async fn on_leader_stop(&self) {
584        self.state.write().unwrap().next_state(become_follower());
585
        // Forces the mailbox to clear all pushers.
587        // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
588        self.mailbox.reset().await;
589        self.leadership_change_notifier
590            .notify_on_leader_stop()
591            .await;
592
593        // Suspends reporting.
594        self.greptimedb_telemetry_task.should_report(false);
595
596        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribing all");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all");
600            }
601        }
602    }
603}
604
605pub struct Metasrv {
606    state: StateRef,
607    started: Arc<AtomicBool>,
608    start_time_ms: u64,
609    options: MetasrvOptions,
    // It is only valid on the leader node and is used to temporarily
    // store data that will not be persisted.
612    in_memory: ResettableKvBackendRef,
613    kv_backend: KvBackendRef,
614    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
615    meta_peer_client: MetaPeerClientRef,
616    // The selector is used to select a target datanode.
617    selector: SelectorRef,
618    selector_ctx: SelectorContext,
619    // The flow selector is used to select a target flownode.
620    flow_selector: SelectorRef,
621    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
622    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
623    election: Option<ElectionRef>,
624    procedure_manager: ProcedureManagerRef,
625    mailbox: MailboxRef,
626    ddl_manager: DdlManagerRef,
627    wal_provider: WalProviderRef,
628    table_metadata_manager: TableMetadataManagerRef,
629    runtime_switch_manager: RuntimeSwitchManagerRef,
630    memory_region_keeper: MemoryRegionKeeperRef,
631    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
632    region_migration_manager: RegionMigrationManagerRef,
633    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
634    cache_invalidator: CacheInvalidatorRef,
635    leader_region_registry: LeaderRegionRegistryRef,
636    topic_stats_registry: TopicStatsRegistryRef,
637    wal_prune_ticker: Option<WalPruneTickerRef>,
638    region_flush_ticker: Option<RegionFlushTickerRef>,
639    table_id_allocator: ResourceIdAllocatorRef,
640    reconciliation_manager: ReconciliationManagerRef,
641    resource_stat: ResourceStatRef,
642    gc_ticker: Option<GcTickerRef>,
643
644    plugins: Plugins,
645}
646
647impl Metasrv {
648    pub async fn try_start(&self) -> Result<()> {
649        if self
650            .started
651            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
652            .is_err()
653        {
654            warn!("Metasrv already started");
655            return Ok(());
656        }
657
658        let handler_group_builder =
659            self.handler_group_builder
660                .lock()
661                .unwrap()
662                .take()
663                .context(error::UnexpectedSnafu {
664                    violated: "expected heartbeat handler group builder",
665                })?;
666        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
667
        // Creates the default schema if it does not exist.
669        self.table_metadata_manager
670            .init()
671            .await
672            .context(InitMetadataSnafu)?;
673
674        if let Some(election) = self.election() {
675            let procedure_manager = self.procedure_manager.clone();
676            let in_memory = self.in_memory.clone();
677            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
678            let subscribe_manager = self.subscription_manager();
679            let mut rx = election.subscribe_leader_change();
680            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
681            greptimedb_telemetry_task
682                .start()
683                .context(StartTelemetryTaskSnafu)?;
684
685            // Builds leadership change notifier.
686            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
687            leadership_change_notifier.add_listener(self.wal_provider.clone());
688            leadership_change_notifier
689                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
690            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
691                self.options.node_max_idle_time,
692                self.in_memory.clone(),
693            )));
694            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
695                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
696            }
697            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
698                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
699            }
700            if let Some(region_flush_trigger) = &self.region_flush_ticker {
701                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
702            }
703            if let Some(gc_ticker) = &self.gc_ticker {
704                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
705            }
706            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
707                customizer.customize(&mut leadership_change_notifier);
708            }
709
710            let state_handler = MetaStateHandler {
711                greptimedb_telemetry_task,
712                subscribe_manager,
713                state: self.state.clone(),
714                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
715                leadership_change_notifier,
716                mailbox: self.mailbox.clone(),
717            };
718            let _handle = common_runtime::spawn_global(async move {
719                loop {
720                    match rx.recv().await {
721                        Ok(msg) => {
722                            in_memory.reset();
723                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
725                            match msg {
726                                LeaderChangeMessage::Elected(_) => {
727                                    state_handler.on_leader_start().await;
728                                }
729                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);
731
732                                    state_handler.on_leader_stop().await;
733                                }
734                            }
735                        }
736                        Err(RecvError::Closed) => {
                            error!("Leader change channel closed unexpectedly, is the leader election loop still running?");
738                            break;
739                        }
740                        Err(RecvError::Lagged(_)) => {
741                            break;
742                        }
743                    }
744                }
745
746                state_handler.on_leader_stop().await;
747            });
748
749            // Register candidate and keep lease in background.
750            {
751                let election = election.clone();
752                let started = self.started.clone();
753                let node_info = self.node_info();
754                let _handle = common_runtime::spawn_global(async move {
755                    while started.load(Ordering::Acquire) {
756                        let res = election.register_candidate(&node_info).await;
757                        if let Err(e) = res {
758                            warn!(e; "Metasrv register candidate error");
759                        }
760                    }
761                });
762            }
763
764            // Campaign
765            {
766                let election = election.clone();
767                let started = self.started.clone();
768                let _handle = common_runtime::spawn_global(async move {
769                    while started.load(Ordering::Acquire) {
770                        let res = election.campaign().await;
771                        if let Err(e) = res {
772                            warn!(e; "Metasrv election error");
773                        }
774                        election.reset_campaign().await;
                        info!("Metasrv re-initiating election");
776                    }
777                    info!("Metasrv stopped");
778                });
779            }
780        } else {
781            warn!(
782                "Ensure only one instance of Metasrv is running, as there is no election service."
783            );
784
785            if let Err(e) = self.wal_provider.start().await {
786                error!(e; "Failed to start wal provider");
787            }
788            // Always load kv into cached kv store.
789            self.leader_cached_kv_backend
790                .load()
791                .await
792                .context(KvBackendSnafu)?;
793            self.procedure_manager
794                .start()
795                .await
796                .context(StartProcedureManagerSnafu)?;
797        }
798
799        info!("Metasrv started");
800
801        Ok(())
802    }
803
804    pub async fn shutdown(&self) -> Result<()> {
805        if self
806            .started
807            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
808            .is_err()
809        {
810            warn!("Metasrv already stopped");
811            return Ok(());
812        }
813
814        self.procedure_manager
815            .stop()
816            .await
817            .context(StopProcedureManagerSnafu)?;
818
819        info!("Metasrv stopped");
820
821        Ok(())
822    }
823
824    pub fn start_time_ms(&self) -> u64 {
825        self.start_time_ms
826    }
827
828    pub fn resource_stat(&self) -> &ResourceStatRef {
829        &self.resource_stat
830    }
831
832    pub fn node_info(&self) -> MetasrvNodeInfo {
833        let build_info = common_version::build_info();
834        MetasrvNodeInfo {
835            addr: self.options().grpc.server_addr.clone(),
836            version: build_info.version.to_string(),
837            git_commit: build_info.commit_short.to_string(),
838            start_time_ms: self.start_time_ms(),
839            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
840            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
841            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
842            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
843            hostname: hostname::get()
844                .unwrap_or_default()
845                .to_string_lossy()
846                .to_string(),
847        }
848    }
849
850    /// Looks up a datanode peer by peer_id, returning it only when it's alive.
851    /// A datanode is considered alive when it's still within the lease period.
852    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
853        discovery::utils::alive_datanode(
854            &DefaultSystemTimer,
855            self.meta_peer_client.as_ref(),
856            peer_id,
857            default_distributed_time_constants().datanode_lease,
858        )
859        .await
860    }
861
862    pub fn options(&self) -> &MetasrvOptions {
863        &self.options
864    }
865
866    pub fn in_memory(&self) -> &ResettableKvBackendRef {
867        &self.in_memory
868    }
869
870    pub fn kv_backend(&self) -> &KvBackendRef {
871        &self.kv_backend
872    }
873
874    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
875        &self.meta_peer_client
876    }
877
878    pub fn selector(&self) -> &SelectorRef {
879        &self.selector
880    }
881
882    pub fn selector_ctx(&self) -> &SelectorContext {
883        &self.selector_ctx
884    }
885
886    pub fn flow_selector(&self) -> &SelectorRef {
887        &self.flow_selector
888    }
889
890    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
891        self.handler_group.read().unwrap().clone()
892    }
893
894    pub fn election(&self) -> Option<&ElectionRef> {
895        self.election.as_ref()
896    }
897
898    pub fn mailbox(&self) -> &MailboxRef {
899        &self.mailbox
900    }
901
902    pub fn ddl_manager(&self) -> &DdlManagerRef {
903        &self.ddl_manager
904    }
905
906    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
907        &self.procedure_manager
908    }
909
910    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
911        &self.table_metadata_manager
912    }
913
914    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
915        &self.runtime_switch_manager
916    }
917
918    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
919        &self.memory_region_keeper
920    }
921
922    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
923        &self.region_migration_manager
924    }
925
926    pub fn publish(&self) -> Option<PublisherRef> {
927        self.plugins.get::<PublisherRef>()
928    }
929
930    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
931        self.plugins.get::<SubscriptionManagerRef>()
932    }
933
934    pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef {
935        &self.table_id_allocator
936    }
937
938    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
939        &self.reconciliation_manager
940    }
941
942    pub fn plugins(&self) -> &Plugins {
943        &self.plugins
944    }
945
946    pub fn started(&self) -> Arc<AtomicBool> {
947        self.started.clone()
948    }
949
950    pub fn gc_ticker(&self) -> Option<GcTickerRef> {
951        self.gc_ticker.as_ref().cloned()
952    }
953
954    #[inline]
955    pub fn new_ctx(&self) -> Context {
956        let server_addr = self.options().grpc.server_addr.clone();
957        let in_memory = self.in_memory.clone();
958        let kv_backend = self.kv_backend.clone();
959        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
960        let meta_peer_client = self.meta_peer_client.clone();
961        let mailbox = self.mailbox.clone();
962        let election = self.election.clone();
963        let table_metadata_manager = self.table_metadata_manager.clone();
964        let cache_invalidator = self.cache_invalidator.clone();
965        let leader_region_registry = self.leader_region_registry.clone();
966        let topic_stats_registry = self.topic_stats_registry.clone();
967
968        Context {
969            server_addr,
970            in_memory,
971            kv_backend,
972            leader_cached_kv_backend,
973            meta_peer_client,
974            mailbox,
975            election,
976            is_infancy: false,
977            table_metadata_manager,
978            cache_invalidator,
979            leader_region_registry,
980            topic_stats_registry,
981            heartbeat_interval: self.options().heartbeat_interval,
982            is_handshake: false,
983        }
984    }
985}
986
987#[cfg(test)]
988mod tests {
989    use crate::metasrv::MetasrvNodeInfo;
990
991    #[test]
992    fn test_deserialize_metasrv_node_info() {
993        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
994        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
995        assert_eq!(node_info.addr, "127.0.0.1:4002");
996        assert_eq!(node_info.version, "0.1.0");
997        assert_eq!(node_info.git_commit, "1234567890");
998        assert_eq!(node_info.start_time_ms, 1715145600);
999    }
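
    // The following tests are minimal sketches added for illustration. They only
    // exercise behavior visible in this module: the `HeartbeatOptions` ->
    // `HeartbeatConfig` conversion, the `LeaderValue` conversion from bytes, and
    // the snake_case serde naming of `BackendImpl`.

    #[test]
    fn test_heartbeat_options_into_config() {
        use std::time::Duration;

        use api::v1::meta::HeartbeatConfig;

        use crate::metasrv::HeartbeatOptions;

        let opts = HeartbeatOptions {
            interval: Duration::from_secs(3),
            retry_interval: Duration::from_secs(1),
        };
        let config: HeartbeatConfig = opts.into();
        // The conversion simply turns the durations into milliseconds.
        assert_eq!(config.heartbeat_interval_ms, 3000);
        assert_eq!(config.retry_interval_ms, 1000);
    }

    #[test]
    fn test_leader_value_from_bytes() {
        use crate::metasrv::LeaderValue;

        // Any `AsRef<[u8]>` converts into a `LeaderValue` holding the leader's
        // address as a (lossily decoded) UTF-8 string.
        let leader = LeaderValue::from("127.0.0.1:3002");
        assert_eq!(leader.0, "127.0.0.1:3002");
    }

    #[test]
    fn test_backend_impl_serde_naming() {
        use crate::metasrv::BackendImpl;

        // `BackendImpl` uses `rename_all = "snake_case"`, so variants serialize
        // to values like "etcd_store" and "memory_store".
        assert_eq!(
            serde_json::to_string(&BackendImpl::EtcdStore).unwrap(),
            "\"etcd_store\""
        );
        assert_eq!(
            serde_json::to_string(&BackendImpl::MemoryStore).unwrap(),
            "\"memory_store\""
        );
    }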
1000}