// File: meta_srv/metasrv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use api::v1::meta::{HeartbeatConfig, Role};
23use clap::ValueEnum;
24use common_base::Plugins;
25use common_base::readable_size::ReadableSize;
26use common_config::{Configurable, DEFAULT_DATA_HOME};
27use common_event_recorder::EventRecorderOptions;
28use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
29use common_meta::cache_invalidator::CacheInvalidatorRef;
30use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
31use common_meta::ddl_manager::DdlManagerRef;
32use common_meta::distributed_time_constants::{
33    self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
34};
35use common_meta::election::LeaderChangeMessage;
36pub use common_meta::election::{ElectionRef, MetasrvNodeInfo};
37use common_meta::key::TableMetadataManagerRef;
38use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
39use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
40use common_meta::leadership_notifier::{
41    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
42};
43use common_meta::node_expiry_listener::NodeExpiryListener;
44use common_meta::peer::{Peer, PeerDiscoveryRef};
45use common_meta::reconciliation::manager::ReconciliationManagerRef;
46use common_meta::region_keeper::MemoryRegionKeeperRef;
47use common_meta::region_registry::LeaderRegionRegistryRef;
48use common_meta::stats::topic::TopicStatsRegistryRef;
49use common_meta::wal_provider::WalProviderRef;
50use common_options::datanode::DatanodeClientOptions;
51use common_options::memory::MemoryOptions;
52use common_procedure::ProcedureManagerRef;
53use common_procedure::options::ProcedureConfig;
54use common_stat::ResourceStatRef;
55use common_telemetry::logging::{LoggingOptions, TracingOptions};
56use common_telemetry::{error, info, warn};
57use common_time::util::DefaultSystemTimer;
58use common_wal::config::MetasrvWalConfig;
59use serde::{Deserialize, Serialize};
60use servers::grpc::GrpcOptions;
61use servers::http::HttpOptions;
62use servers::tls::TlsOption;
63use snafu::{OptionExt, ResultExt};
64use store_api::storage::RegionId;
65use tokio::sync::broadcast::error::RecvError;
66
67use crate::cluster::MetaPeerClientRef;
68use crate::discovery;
69use crate::error::{
70    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
71    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
72};
73use crate::failure_detector::PhiAccrualFailureDetectorOptions;
74use crate::gc::{GcSchedulerOptions, GcTickerRef};
75use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
76use crate::procedure::ProcedureManagerListenerAdapter;
77use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
78use crate::procedure::wal_prune::manager::WalPruneTickerRef;
79use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
80use crate::region::flush_trigger::RegionFlushTickerRef;
81use crate::region::supervisor::RegionSupervisorTickerRef;
82use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
83use crate::service::mailbox::MailboxRef;
84use crate::service::store::cached_kv::LeaderCachedKvBackend;
85use crate::state::{StateRef, become_follower, become_leader};
86use crate::utils::database::DatabaseOperatorRef;
87
/// Name of the table id sequence.
/// NOTE(review): presumably the key under which table ids are allocated; the
/// allocator wiring is outside this view — confirm.
pub const TABLE_ID_SEQ: &str = "table_id";
/// Name of the flow id sequence (same caveat as [`TABLE_ID_SEQ`]).
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Directory name used for metasrv data.
/// NOTE(review): presumably resolved relative to `MetasrvOptions::data_home` — confirm.
pub const METASRV_DATA_DIR: &str = "metasrv";
91
// The datastores that implement the metadata kvbackend.
//
// Serialized in `snake_case` (e.g. `etcd_store`); `EtcdStore` is the default.
//
// NOTE(review): plain `//` comments are kept on purpose. clap's `ValueEnum`
// derive presumably turns `///` doc comments on variants into CLI help text,
// so converting them would change user-visible output — confirm before changing.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    // Etcd as metadata storage.
    #[default]
    EtcdStore,
    // In memory metadata storage - mostly used for testing.
    MemoryStore,
    // Postgres as metadata storage.
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    // MySql as metadata storage.
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}
108
/// Configuration options for the stats persistence.
///
/// Both durations are (de)serialized through `humantime_serde`, i.e. as
/// human-readable duration strings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// TTL for the stats table that will be used to store the stats.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// The interval to persist the stats.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}
119
120impl Default for StatsPersistenceOptions {
121    fn default() -> Self {
122        Self {
123            ttl: Duration::ZERO,
124            interval: Duration::from_mins(10),
125        }
126    }
127}
128
/// Heartbeat configuration for a single node type.
///
/// With the container-level `#[serde(default)]`, fields missing from the
/// input fall back to this type's `Default` implementation. Durations are
/// (de)serialized through `humantime_serde`.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct HeartbeatOptions {
    /// Heartbeat interval.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
    /// Retry interval when heartbeat connection fails.
    #[serde(with = "humantime_serde")]
    pub retry_interval: Duration,
}
140
141impl Default for HeartbeatOptions {
142    fn default() -> Self {
143        Self {
144            interval: BASE_HEARTBEAT_INTERVAL,
145            retry_interval: BASE_HEARTBEAT_INTERVAL,
146        }
147    }
148}
149
150impl HeartbeatOptions {
151    pub fn datanode_from(base_interval: Duration) -> Self {
152        Self {
153            interval: base_interval,
154            retry_interval: base_interval,
155        }
156    }
157
158    pub fn frontend_from(base_interval: Duration) -> Self {
159        Self {
160            interval: frontend_heartbeat_interval(base_interval),
161            retry_interval: base_interval,
162        }
163    }
164
165    pub fn flownode_from(base_interval: Duration) -> Self {
166        Self {
167            interval: base_interval,
168            retry_interval: base_interval,
169        }
170    }
171}
172
173impl From<HeartbeatOptions> for HeartbeatConfig {
174    fn from(opts: HeartbeatOptions) -> Self {
175        Self {
176            heartbeat_interval_ms: opts.interval.as_millis() as u64,
177            retry_interval_ms: opts.retry_interval.as_millis() as u64,
178        }
179    }
180}
181
/// Options of the client used to talk to the metadata store backend.
///
/// With the container-level `#[serde(default)]`, fields missing from the
/// input fall back to this type's `Default` implementation. Durations are
/// (de)serialized through `humantime_serde`.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
    /// Keep-alive timeout.
    /// NOTE(review): presumably how long to wait for a keep-alive response — confirm.
    #[serde(with = "humantime_serde")]
    pub keep_alive_timeout: Duration,
    /// Keep-alive interval.
    /// NOTE(review): presumably the period between keep-alive probes — confirm.
    #[serde(with = "humantime_serde")]
    pub keep_alive_interval: Duration,
    /// Timeout for establishing a connection to the backend.
    #[serde(with = "humantime_serde")]
    pub connect_timeout: Duration,
}
192
193impl Default for BackendClientOptions {
194    fn default() -> Self {
195        Self {
196            keep_alive_interval: Duration::from_secs(10),
197            keep_alive_timeout: Duration::from_secs(3),
198            connect_timeout: Duration::from_secs(3),
199        }
200    }
201}
202
/// Configuration of a metasrv node.
///
/// The container-level `#[serde(default)]` fills fields missing from the
/// input from this struct's `Default` implementation. `Debug` is implemented
/// by hand so that `store_addrs` is printed in sanitized form.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the server listens on.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the server advertises to the clients.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The address of the store, e.g., etcd.
    pub store_addrs: Vec<String>,
    /// TLS configuration for kv store backend (PostgreSQL/MySQL)
    /// Only applicable when using PostgreSQL or MySQL as the metadata store
    ///
    /// NOTE(review): the field-level `#[serde(default)]` presumably takes
    /// precedence over the container-level one, so an omitted `backend_tls`
    /// would deserialize to `None` rather than to the `Some(TlsOption::prefer())`
    /// set by `Default for MetasrvOptions` — confirm this is intended.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// The backend client options.
    /// Currently, only applicable when using etcd as the metadata store.
    #[serde(default)]
    pub backend_client: BackendClientOptions,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The base heartbeat interval.
    ///
    /// This value is used to calculate the distributed time constants for components.
    /// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
    #[serde(with = "humantime_serde")]
    pub heartbeat_interval: Duration,
    /// The delay before starting region failure detection.
    /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
    /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover on local WAL.
    ///
    /// If it's true, the region failover will be allowed even if the local WAL is used.
    /// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
    pub allow_region_failover_on_local_wal: bool,
    /// The gRPC server options.
    pub grpc: GrpcOptions,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
    /// This is useful when multiple metasrv clusters share the same store.
    pub store_key_prefix: String,
    /// The max operations per txn
    ///
    /// This value is usually limited by which store is used for the `KvBackend`.
    /// For example, if using etcd, this value should ensure that it is less than
    /// or equal to the `--max-txn-ops` option value of etcd.
    ///
    /// TODO(jeremy): Currently, this option only affects the etcd store, but it may
    /// also affect other stores in the future. In other words, each store needs to
    /// limit the number of operations in a txn because an infinitely large txn could
    /// potentially block other operations.
    pub max_txn_ops: usize,
    /// The factor that determines how often statistics should be flushed,
    /// based on the number of received heartbeats. When the number of heartbeats
    /// reaches this factor, a flush operation is triggered.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The memory options.
    pub memory: MemoryOptions,
    /// The datastore for kv metadata.
    pub backend: BackendImpl,
    /// Table name of rds kv backend.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// Lock id for meta kv election. Only takes effect when using pg_kvbackend.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// Optional PostgreSQL schema for metadata table (defaults to current search_path if empty).
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    /// Automatically create PostgreSQL schema if it doesn't exist (default: true).
    #[cfg(feature = "pg_kvbackend")]
    pub auto_create_schema: bool,
    /// Maximum idle time of a node; handed to the `NodeExpiryListener` that
    /// `Metasrv::try_start` registers on leadership changes.
    /// NOTE(review): presumably nodes idle for longer are treated as expired — confirm.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// The stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// The GC scheduler options.
    pub gc: GcSchedulerOptions,
}
304
305impl fmt::Debug for MetasrvOptions {
306    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307        let mut debug_struct = f.debug_struct("MetasrvOptions");
308        debug_struct
309            .field("store_addrs", &self.sanitize_store_addrs())
310            .field("backend_tls", &self.backend_tls)
311            .field("selector", &self.selector)
312            .field("enable_region_failover", &self.enable_region_failover)
313            .field(
314                "allow_region_failover_on_local_wal",
315                &self.allow_region_failover_on_local_wal,
316            )
317            .field("grpc", &self.grpc)
318            .field("http", &self.http)
319            .field("logging", &self.logging)
320            .field("procedure", &self.procedure)
321            .field("failure_detector", &self.failure_detector)
322            .field("datanode", &self.datanode)
323            .field("enable_telemetry", &self.enable_telemetry)
324            .field("data_home", &self.data_home)
325            .field("wal", &self.wal)
326            .field("store_key_prefix", &self.store_key_prefix)
327            .field("max_txn_ops", &self.max_txn_ops)
328            .field("flush_stats_factor", &self.flush_stats_factor)
329            .field("tracing", &self.tracing)
330            .field("backend", &self.backend)
331            .field("event_recorder", &self.event_recorder)
332            .field("stats_persistence", &self.stats_persistence)
333            .field("heartbeat_interval", &self.heartbeat_interval)
334            .field("backend_client", &self.backend_client);
335
336        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
337        debug_struct.field("meta_table_name", &self.meta_table_name);
338
339        #[cfg(feature = "pg_kvbackend")]
340        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
341        #[cfg(feature = "pg_kvbackend")]
342        debug_struct.field("meta_schema_name", &self.meta_schema_name);
343
344        debug_struct
345            .field("node_max_idle_time", &self.node_max_idle_time)
346            .finish()
347    }
348}
349
350const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
351
352impl Default for MetasrvOptions {
353    fn default() -> Self {
354        Self {
355            #[allow(deprecated)]
356            bind_addr: String::new(),
357            #[allow(deprecated)]
358            server_addr: String::new(),
359            store_addrs: vec!["127.0.0.1:2379".to_string()],
360            backend_tls: Some(TlsOption::prefer()),
361            selector: SelectorType::default(),
362            enable_region_failover: false,
363            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
364            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
365            allow_region_failover_on_local_wal: false,
366            grpc: GrpcOptions {
367                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
368                ..Default::default()
369            },
370            http: HttpOptions::default(),
371            logging: LoggingOptions::default(),
372            procedure: ProcedureConfig {
373                max_retry_times: 12,
374                retry_delay: Duration::from_millis(500),
375                // The etcd the maximum size of any request is 1.5 MiB
376                // 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
377                max_metadata_value_size: Some(ReadableSize::kb(1500)),
378                max_running_procedures: 128,
379            },
380            failure_detector: PhiAccrualFailureDetectorOptions::default(),
381            datanode: DatanodeClientOptions::default(),
382            enable_telemetry: true,
383            data_home: DEFAULT_DATA_HOME.to_string(),
384            wal: MetasrvWalConfig::default(),
385            store_key_prefix: String::new(),
386            max_txn_ops: 128,
387            flush_stats_factor: 3,
388            tracing: TracingOptions::default(),
389            memory: MemoryOptions::default(),
390            backend: BackendImpl::EtcdStore,
391            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
392            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
393            #[cfg(feature = "pg_kvbackend")]
394            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
395            #[cfg(feature = "pg_kvbackend")]
396            meta_schema_name: None,
397            #[cfg(feature = "pg_kvbackend")]
398            auto_create_schema: true,
399            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
400            event_recorder: EventRecorderOptions::default(),
401            stats_persistence: StatsPersistenceOptions::default(),
402            gc: GcSchedulerOptions::default(),
403            backend_client: BackendClientOptions::default(),
404        }
405    }
406}
407
408impl Configurable for MetasrvOptions {
409    fn env_list_keys() -> Option<&'static [&'static str]> {
410        Some(&["wal.broker_endpoints", "store_addrs"])
411    }
412}
413
414impl MetasrvOptions {
415    fn sanitize_store_addrs(&self) -> Vec<String> {
416        self.store_addrs
417            .iter()
418            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
419            .collect()
420    }
421}
422
/// Basic information about a metasrv instance.
pub struct MetasrvInfo {
    /// Server address of the metasrv.
    pub server_addr: String,
}
/// Bundle of shared metasrv handles and per-request flags.
/// NOTE(review): presumably handed to heartbeat handlers — the consumers are
/// outside this view.
#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    /// In-memory kv backend; cleared by [`Context::reset_in_memory`].
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    /// Election service, if any.
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    /// Leader region registry; cleared by [`Context::reset_in_memory`].
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
    /// Base heartbeat interval from which per-role heartbeat options are derived.
    pub heartbeat_interval: Duration,
    /// Handshake flag, set through [`Context::with_handshake`].
    pub is_handshake: bool,
}
443
444impl Context {
445    pub fn reset_in_memory(&self) {
446        self.in_memory.reset();
447        self.leader_region_registry.reset();
448    }
449
450    pub fn with_handshake(mut self, is_handshake: bool) -> Self {
451        self.is_handshake = is_handshake;
452        self
453    }
454
455    pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions {
456        match role {
457            Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval),
458            Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval),
459            Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval),
460        }
461    }
462}
463
/// The kind of node a selector is asked to pick.
#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    /// Writes the lowercase node kind: `datanode` or `flownode`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Datanode => "datanode",
            Self::Flownode => "flownode",
        };
        f.write_str(label)
    }
}
478
/// Context handed to a selector when it runs.
#[derive(Clone)]
pub struct SelectorContext {
    /// Peer discovery handle.
    /// NOTE(review): presumably used to look up candidate peers — confirm against `Selector` impls.
    pub peer_discovery: PeerDiscoveryRef,
}
483
/// Shared, type-erased selector that takes a [`SelectorContext`] and yields a list of peers.
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
485
/// Context passed to a selector factory during metasrv bootstrap.
///
/// The factory runs after bootstrap has constructed the selector configured by
/// [`MetasrvOptions::selector`], so plugins can either decorate `base_selector` or
/// build a completely different selector using bootstrap-only dependencies like
/// [`MetaPeerClientRef`].
pub struct SelectorFactoryContext {
    /// The metasrv options in effect during bootstrap.
    pub metasrv_options: MetasrvOptions,
    /// Bootstrap-only meta peer client.
    pub meta_peer_client: MetaPeerClientRef,
    /// The in-memory kv backend.
    pub in_memory: ResettableKvBackendRef,
    /// The election service, if any.
    pub election: Option<ElectionRef>,
    /// The selector already built from [`MetasrvOptions::selector`].
    pub base_selector: SelectorRef,
}
499
/// Builds the final datanode selector metasrv should use.
pub trait SelectorFactory: Send + Sync {
    /// Consumes the bootstrap context and returns the selector to install.
    fn build(&self, ctx: SelectorFactoryContext) -> SelectorRef;
}
504
/// Shared selector factory plugin registered through [`common_base::Plugins`].
pub type SelectorFactoryRef = Arc<dyn SelectorFactory>;
/// Shared, type-erased region-stat-aware selector that takes a
/// [`SelectorContext`] and yields `(RegionId, Peer)` pairs.
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
509
/// Reacts to leadership changes of this metasrv: updates the shared state,
/// notifies the registered leadership listeners and toggles telemetry reporting.
pub struct MetaStateHandler {
    /// When present, all subscriptions are dropped once leadership is lost.
    subscribe_manager: Option<SubscriptionManagerRef>,
    /// Telemetry task whose reporting is enabled only while leading.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// Leader-side kv cache, loaded when leadership starts.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    /// Fan-out of leader start/stop events to the registered listeners.
    leadership_change_notifier: LeadershipChangeNotifier,
    /// Shared leader/follower state.
    state: StateRef,
}
517
518impl MetaStateHandler {
519    pub async fn on_leader_start(&self) {
520        self.state.write().unwrap().next_state(become_leader(false));
521
522        if let Err(e) = self.leader_cached_kv_backend.load().await {
523            error!(e; "Failed to load kv into leader cache kv store");
524        } else {
525            self.state.write().unwrap().next_state(become_leader(true));
526        }
527
528        self.leadership_change_notifier
529            .notify_on_leader_start()
530            .await;
531
532        self.greptimedb_telemetry_task.should_report(true);
533    }
534
535    pub async fn on_leader_stop(&self) {
536        self.state.write().unwrap().next_state(become_follower());
537
538        self.leadership_change_notifier
539            .notify_on_leader_stop()
540            .await;
541
542        // Suspends reporting.
543        self.greptimedb_telemetry_task.should_report(false);
544
545        if let Some(sub_manager) = self.subscribe_manager.clone() {
546            info!("Leader changed, un_subscribe all");
547            if let Err(e) = sub_manager.unsubscribe_all() {
548                error!(e; "Failed to un_subscribe all");
549            }
550        }
551    }
552}
553
/// The metasrv server: owns the metadata backends, selectors, election handle
/// and the background managers/tickers wired together in `try_start`.
pub struct Metasrv {
    /// Shared leader/follower state.
    state: StateRef,
    /// Flipped to `true` by `try_start` and back to `false` by `shutdown`;
    /// also polled by the background election loops.
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    // It is only valid at the leader node and is used to temporarily
    // store some data that will not be persisted.
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    // The selector is used to select a target datanode.
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    // The flow selector is used to select a target flownode.
    flow_selector: SelectorRef,
    // Populated by `try_start` from `handler_group_builder`.
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    // Taken (left as `None`) by `try_start` when the handler group is built.
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    // `None` means no election service is configured.
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_provider: WalProviderRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    // The optional tickers below are registered as leadership listeners in `try_start`.
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_allocator: ResourceIdAllocatorRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,
    database_operator: DatabaseOperatorRef,

    plugins: Plugins,
}
596
597impl Metasrv {
598    pub async fn try_start(&self) -> Result<()> {
599        if self
600            .started
601            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
602            .is_err()
603        {
604            warn!("Metasrv already started");
605            return Ok(());
606        }
607
608        let handler_group_builder =
609            self.handler_group_builder
610                .lock()
611                .unwrap()
612                .take()
613                .context(error::UnexpectedSnafu {
614                    violated: "expected heartbeat handler group builder",
615                })?;
616        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
617
618        // Creates default schema if not exists
619        self.table_metadata_manager
620            .init()
621            .await
622            .context(InitMetadataSnafu)?;
623
624        if let Some(election) = self.election() {
625            let procedure_manager = self.procedure_manager.clone();
626            let in_memory = self.in_memory.clone();
627            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
628            let subscribe_manager = self.subscription_manager();
629            let mut rx = election.subscribe_leader_change();
630            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
631            greptimedb_telemetry_task
632                .start()
633                .context(StartTelemetryTaskSnafu)?;
634
635            // Builds leadership change notifier.
636            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
637            leadership_change_notifier.add_listener(self.wal_provider.clone());
638            leadership_change_notifier
639                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
640            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
641                self.options.node_max_idle_time,
642                self.in_memory.clone(),
643            )));
644            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
645                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
646            }
647            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
648                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
649            }
650            if let Some(region_flush_trigger) = &self.region_flush_ticker {
651                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
652            }
653            if let Some(gc_ticker) = &self.gc_ticker {
654                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
655            }
656            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
657                customizer.customize(&mut leadership_change_notifier);
658            }
659
660            let state_handler = MetaStateHandler {
661                greptimedb_telemetry_task,
662                subscribe_manager,
663                state: self.state.clone(),
664                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
665                leadership_change_notifier,
666            };
667            let _handle = common_runtime::spawn_global(async move {
668                loop {
669                    match rx.recv().await {
670                        Ok(msg) => {
671                            in_memory.reset();
672                            leader_cached_kv_backend.reset();
673                            info!("Leader's cache has bean cleared on leader change: {msg}");
674                            match msg {
675                                LeaderChangeMessage::Elected(_) => {
676                                    state_handler.on_leader_start().await;
677                                }
678                                LeaderChangeMessage::StepDown(leader) => {
679                                    error!("Leader :{:?} step down", leader);
680
681                                    state_handler.on_leader_stop().await;
682                                }
683                            }
684                        }
685                        Err(RecvError::Closed) => {
686                            error!("Not expected, is leader election loop still running?");
687                            break;
688                        }
689                        Err(RecvError::Lagged(_)) => {
690                            break;
691                        }
692                    }
693                }
694
695                state_handler.on_leader_stop().await;
696            });
697
698            // Register candidate and keep lease in background.
699            {
700                let election = election.clone();
701                let started = self.started.clone();
702                let node_info = self.node_info();
703                let _handle = common_runtime::spawn_global(async move {
704                    while started.load(Ordering::Acquire) {
705                        let res = election.register_candidate(&node_info).await;
706                        if let Err(e) = res {
707                            warn!(e; "Metasrv register candidate error");
708                        }
709                    }
710                });
711            }
712
713            // Campaign
714            {
715                let election = election.clone();
716                let started = self.started.clone();
717                let _handle = common_runtime::spawn_global(async move {
718                    while started.load(Ordering::Acquire) {
719                        let res = election.campaign().await;
720                        if let Err(e) = res {
721                            warn!(e; "Metasrv election error");
722                        }
723                        election.reset_campaign().await;
724                        info!("Metasrv re-initiate election");
725                    }
726                    info!("Metasrv stopped");
727                });
728            }
729        } else {
730            warn!(
731                "Ensure only one instance of Metasrv is running, as there is no election service."
732            );
733
734            if let Err(e) = self.wal_provider.start().await {
735                error!(e; "Failed to start wal provider");
736            }
737            // Always load kv into cached kv store.
738            self.leader_cached_kv_backend
739                .load()
740                .await
741                .context(KvBackendSnafu)?;
742            self.procedure_manager
743                .start()
744                .await
745                .context(StartProcedureManagerSnafu)?;
746        }
747
748        info!("Metasrv started");
749
750        Ok(())
751    }
752
753    pub async fn shutdown(&self) -> Result<()> {
754        if self
755            .started
756            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
757            .is_err()
758        {
759            warn!("Metasrv already stopped");
760            return Ok(());
761        }
762
763        self.procedure_manager
764            .stop()
765            .await
766            .context(StopProcedureManagerSnafu)?;
767
768        info!("Metasrv stopped");
769
770        Ok(())
771    }
772
773    pub fn start_time_ms(&self) -> u64 {
774        self.start_time_ms
775    }
776
777    pub fn resource_stat(&self) -> &ResourceStatRef {
778        &self.resource_stat
779    }
780
781    pub fn node_info(&self) -> MetasrvNodeInfo {
782        let build_info = common_version::build_info();
783        MetasrvNodeInfo {
784            addr: self.options().grpc.server_addr.clone(),
785            version: build_info.version.to_string(),
786            git_commit: build_info.commit_short.to_string(),
787            start_time_ms: self.start_time_ms(),
788            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
789            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
790            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
791            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
792            hostname: hostname::get()
793                .unwrap_or_default()
794                .to_string_lossy()
795                .to_string(),
796        }
797    }
798
799    /// Looks up a datanode peer by peer_id, returning it only when it's alive.
800    /// A datanode is considered alive when it's still within the lease period.
801    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
802        discovery::utils::alive_datanode(
803            &DefaultSystemTimer,
804            self.meta_peer_client.as_ref(),
805            peer_id,
806            default_distributed_time_constants().datanode_lease,
807        )
808        .await
809    }
810
811    pub fn options(&self) -> &MetasrvOptions {
812        &self.options
813    }
814
815    pub fn in_memory(&self) -> &ResettableKvBackendRef {
816        &self.in_memory
817    }
818
819    pub fn kv_backend(&self) -> &KvBackendRef {
820        &self.kv_backend
821    }
822
823    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
824        &self.meta_peer_client
825    }
826
827    pub fn selector(&self) -> &SelectorRef {
828        &self.selector
829    }
830
831    pub fn selector_ctx(&self) -> &SelectorContext {
832        &self.selector_ctx
833    }
834
835    pub fn flow_selector(&self) -> &SelectorRef {
836        &self.flow_selector
837    }
838
839    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
840        self.handler_group.read().unwrap().clone()
841    }
842
843    pub fn election(&self) -> Option<&ElectionRef> {
844        self.election.as_ref()
845    }
846
847    pub fn mailbox(&self) -> &MailboxRef {
848        &self.mailbox
849    }
850
851    pub fn ddl_manager(&self) -> &DdlManagerRef {
852        &self.ddl_manager
853    }
854
855    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
856        &self.procedure_manager
857    }
858
859    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
860        &self.table_metadata_manager
861    }
862
863    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
864        &self.runtime_switch_manager
865    }
866
867    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
868        &self.memory_region_keeper
869    }
870
871    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
872        &self.region_migration_manager
873    }
874
875    pub fn publish(&self) -> Option<PublisherRef> {
876        self.plugins.get::<PublisherRef>()
877    }
878
879    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
880        self.plugins.get::<SubscriptionManagerRef>()
881    }
882
883    pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef {
884        &self.table_id_allocator
885    }
886
887    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
888        &self.reconciliation_manager
889    }
890
891    pub fn database_operator(&self) -> &DatabaseOperatorRef {
892        &self.database_operator
893    }
894
895    pub fn plugins(&self) -> &Plugins {
896        &self.plugins
897    }
898
899    pub fn started(&self) -> Arc<AtomicBool> {
900        self.started.clone()
901    }
902
903    pub fn gc_ticker(&self) -> Option<GcTickerRef> {
904        self.gc_ticker.as_ref().cloned()
905    }
906
907    #[inline]
908    pub fn new_ctx(&self) -> Context {
909        let server_addr = self.options().grpc.server_addr.clone();
910        let in_memory = self.in_memory.clone();
911        let kv_backend = self.kv_backend.clone();
912        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
913        let meta_peer_client = self.meta_peer_client.clone();
914        let mailbox = self.mailbox.clone();
915        let election = self.election.clone();
916        let table_metadata_manager = self.table_metadata_manager.clone();
917        let cache_invalidator = self.cache_invalidator.clone();
918        let leader_region_registry = self.leader_region_registry.clone();
919        let topic_stats_registry = self.topic_stats_registry.clone();
920
921        Context {
922            server_addr,
923            in_memory,
924            kv_backend,
925            leader_cached_kv_backend,
926            meta_peer_client,
927            mailbox,
928            election,
929            is_infancy: false,
930            table_metadata_manager,
931            cache_invalidator,
932            leader_region_registry,
933            topic_stats_registry,
934            heartbeat_interval: self.options().heartbeat_interval,
935            is_handshake: false,
936        }
937    }
938}
939
#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    /// A JSON payload carrying only `addr`, `version`, `git_commit` and
    /// `start_time_ms` must deserialize into a `MetasrvNodeInfo` with those
    /// four values intact.
    #[test]
    fn test_deserialize_metasrv_node_info() {
        let payload = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;

        let parsed: MetasrvNodeInfo = serde_json::from_str(payload).unwrap();

        assert_eq!(parsed.addr, "127.0.0.1:4002");
        assert_eq!(parsed.version, "0.1.0");
        assert_eq!(parsed.git_commit, "1234567890");
        assert_eq!(parsed.start_time_ms, 1715145600);
    }
}