1#![feature(assert_matches, let_chains)]
16
17use async_trait::async_trait;
18use common_error::ext::ErrorExt;
19use common_error::status_code::StatusCode;
20use common_mem_prof::activate_heap_profile;
21use common_telemetry::{error, info, warn};
22use stat::{get_cpu_limit, get_memory_limit};
23
24use crate::error::Result;
25
26pub mod cli;
27pub mod datanode;
28pub mod error;
29pub mod flownode;
30pub mod frontend;
31pub mod metasrv;
32pub mod options;
33pub mod standalone;
34
35lazy_static::lazy_static! {
36 static ref APP_VERSION: prometheus::IntGaugeVec =
37 prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
38
39 static ref CPU_LIMIT: prometheus::IntGaugeVec =
40 prometheus::register_int_gauge_vec!("greptime_cpu_limit_in_millicores", "cpu limit in millicores", &["app"]).unwrap();
41
42 static ref MEMORY_LIMIT: prometheus::IntGaugeVec =
43 prometheus::register_int_gauge_vec!("greptime_memory_limit_in_bytes", "memory limit in bytes", &["app"]).unwrap();
44}
45
46#[cfg(unix)]
48async fn start_wait_for_close_signal() -> std::io::Result<()> {
49 use tokio::signal::unix::{signal, SignalKind};
50 let mut sigint = signal(SignalKind::interrupt())?;
51 let mut sigterm = signal(SignalKind::terminate())?;
52
53 tokio::select! {
54 _ = sigint.recv() => {
55 info!("Received SIGINT, shutting down");
56 }
57 _ = sigterm.recv() => {
58 info!("Received SIGTERM, shutting down");
59 }
60 }
61
62 Ok(())
63}
64
65#[cfg(not(unix))]
67async fn start_wait_for_close_signal() -> std::io::Result<()> {
68 tokio::signal::ctrl_c().await
69}
70
71#[async_trait]
72pub trait App: Send {
73 fn name(&self) -> &str;
74
75 async fn pre_start(&mut self) -> Result<()> {
77 Ok(())
78 }
79
80 async fn start(&mut self) -> Result<()>;
81
82 fn wait_signal(&self) -> bool {
84 true
85 }
86
87 async fn stop(&mut self) -> Result<()>;
88
89 async fn run(&mut self) -> Result<()> {
90 info!("Starting app: {}", self.name());
91
92 self.pre_start().await?;
93
94 self.start().await?;
95
96 if self.wait_signal() {
97 if let Err(e) = start_wait_for_close_signal().await {
98 error!(e; "Failed to listen for close signal");
99 }
103 }
104
105 self.stop().await?;
106 info!("Goodbye!");
107 Ok(())
108 }
109}
110
111pub fn log_versions(version: &str, short_version: &str, app: &str) {
116 APP_VERSION
118 .with_label_values(&[common_version::version(), short_version, app])
119 .inc();
120
121 info!("GreptimeDB version: {}", version);
123
124 log_env_flags();
125}
126
127pub fn create_resource_limit_metrics(app: &str) {
128 if let Some(cpu_limit) = get_cpu_limit() {
129 info!(
130 "GreptimeDB start with cpu limit in millicores: {}",
131 cpu_limit
132 );
133 CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
134 }
135
136 if let Some(memory_limit) = get_memory_limit() {
137 info!(
138 "GreptimeDB start with memory limit in bytes: {}",
139 memory_limit
140 );
141 MEMORY_LIMIT.with_label_values(&[app]).set(memory_limit);
142 }
143}
144
145fn log_env_flags() {
146 info!("command line arguments");
147 for argument in std::env::args() {
148 info!("argument: {}", argument);
149 }
150}
151
152pub fn maybe_activate_heap_profile(memory_options: &common_options::memory::MemoryOptions) {
153 if memory_options.enable_heap_profiling {
154 match activate_heap_profile() {
155 Ok(()) => {
156 info!("Heap profile is active");
157 }
158 Err(err) => {
159 if err.status_code() == StatusCode::Unsupported {
160 info!("Heap profile is not supported");
161 } else {
162 warn!(err; "Failed to activate heap profile");
163 }
164 }
165 }
166 }
167}