1use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
20use catalog::information_extension::DistributedInformationExtension;
21use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
22use clap::Parser;
23use client::client_manager::NodeClients;
24use common_base::Plugins;
25use common_config::{Configurable, DEFAULT_DATA_HOME};
26use common_grpc::channel_manager::ChannelConfig;
27use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
28use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
29use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_meta::key::flow::FlowMetadataManager;
32use common_meta::key::TableMetadataManager;
33use common_telemetry::info;
34use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
35use common_version::{short_version, verbose_version};
36use flow::{
37 get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
38 FrontendClient, FrontendInvoker,
39};
40use meta_client::{MetaClientOptions, MetaClientType};
41use snafu::{ensure, OptionExt, ResultExt};
42use tracing_appender::non_blocking::WorkerGuard;
43
44use crate::error::{
45 BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
46 MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
47};
48use crate::options::{GlobalOptions, GreptimeOptions};
49use crate::{create_resource_limit_metrics, log_versions, App};
50
51pub const APP_NAME: &str = "greptime-flownode";
52
53type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
54
/// A built flownode application: the flownode itself plus the values that
/// have to stay alive for as long as it does.
///
/// Field order is significant because Rust drops fields in declaration
/// order: `_guard` is declared last and is therefore dropped last.
pub struct Instance {
    /// The flownode that the `App` implementation starts and shuts down.
    flownode: FlownodeInstance,

    /// Handles returned by `components()`; the field only exists when the
    /// `enterprise` feature is enabled.
    #[cfg(feature = "enterprise")]
    components: Components,

    /// Guards handed to `Instance::new`. `StartCommand::build` passes the
    /// value returned by `common_telemetry::init_global_logging`. The field
    /// is never read; it is only kept alive together with the instance.
    _guard: Vec<WorkerGuard>,
}
66
/// Handles to internals of a built flownode, compiled only with the
/// `enterprise` feature.
///
/// The field descriptions below reflect how `StartCommand::build` fills the
/// struct; other constructors may populate it differently.
#[cfg(feature = "enterprise")]
pub struct Components {
    /// The catalog manager that was also handed to the flownode builder.
    pub catalog_manager: catalog::CatalogManagerRef,
    /// The frontend client that was also handed to the flownode builder.
    pub fe_client: Arc<FrontendClient>,
    /// The cached KV backend that wraps the meta-client-backed store.
    pub kv_backend: common_meta::kv_backend::KvBackendRef,
}
73
74impl Instance {
75 pub fn new(
76 flownode: FlownodeInstance,
77 #[cfg(feature = "enterprise")] components: Components,
78 guard: Vec<WorkerGuard>,
79 ) -> Self {
80 Self {
81 flownode,
82 #[cfg(feature = "enterprise")]
83 components,
84 _guard: guard,
85 }
86 }
87
88 pub fn flownode(&self) -> &FlownodeInstance {
89 &self.flownode
90 }
91
92 pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
94 &mut self.flownode
95 }
96
97 #[cfg(feature = "enterprise")]
98 pub fn components(&self) -> &Components {
99 &self.components
100 }
101}
102
103#[async_trait::async_trait]
104impl App for Instance {
105 fn name(&self) -> &str {
106 APP_NAME
107 }
108
109 async fn start(&mut self) -> Result<()> {
110 plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
111 .await
112 .context(StartFlownodeSnafu)?;
113
114 self.flownode.start().await.context(StartFlownodeSnafu)
115 }
116
117 async fn stop(&mut self) -> Result<()> {
118 self.flownode
119 .shutdown()
120 .await
121 .context(ShutdownFlownodeSnafu)
122 }
123}
124
// Top-level flownode command line: a thin wrapper around its subcommand.
//
// NOTE: this is deliberately a plain `//` comment. clap's derive macro turns
// `///` doc comments into help text, so using one here would change the
// program's `--help` output.
#[derive(Parser)]
pub struct Command {
    // The selected subcommand; `build` and `load_options` both delegate to it.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
130
131impl Command {
132 pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
133 self.subcmd.build(opts).await
134 }
135
136 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
137 match &self.subcmd {
138 SubCommand::Start(cmd) => cmd.load_options(global_options),
139 }
140 }
141}
142
// Subcommands understood by the flownode binary.
//
// NOTE: plain `//` comments on purpose. clap's derive macro would turn `///`
// doc comments into subcommand help text.
#[derive(Parser)]
enum SubCommand {
    // Build (and later run) a flownode from a `StartCommand`.
    Start(StartCommand),
}
147
148impl SubCommand {
149 async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
150 match self {
151 SubCommand::Start(cmd) => cmd.build(opts).await,
152 }
153 }
154}
155
// Flags of the `start` subcommand. Every optional flag that is present
// overrides the corresponding value loaded from the layered configuration
// (see `merge_with_cli_options`).
//
// NOTE: the comments in this struct are plain `//` comments on purpose.
// clap's derive macro turns `///` doc comments into help text, so doc
// comments here would change the program's `--help` output.
#[derive(Debug, Parser, Default)]
struct StartCommand {
    // Overrides `node_id`; merging fails if no node id is set afterwards.
    #[clap(long)]
    node_id: Option<u64>,
    // Overrides `grpc.bind_addr`; also accepted as `--rpc-addr`.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr`; also accepted as `--rpc-hostname`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Overrides `meta_client.metasrv_addrs`; one or more values, which may be
    // comma-separated.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Configuration file handed to `load_layered_options`.
    #[clap(short, long)]
    config_file: Option<String>,
    // Environment-variable prefix handed to `load_layered_options`.
    #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
    env_prefix: String,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
}
187
188impl StartCommand {
189 fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
190 let mut opts = FlownodeOptions::load_layered_options(
191 self.config_file.as_deref(),
192 self.env_prefix.as_ref(),
193 )
194 .context(LoadLayeredConfigSnafu)?;
195
196 self.merge_with_cli_options(global_options, &mut opts)?;
197
198 Ok(opts)
199 }
200
201 fn merge_with_cli_options(
203 &self,
204 global_options: &GlobalOptions,
205 opts: &mut FlownodeOptions,
206 ) -> Result<()> {
207 let opts = &mut opts.component;
208
209 if let Some(dir) = &global_options.log_dir {
210 opts.logging.dir.clone_from(dir);
211 }
212
213 if opts.logging.dir.is_empty() {
215 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
216 .join(DEFAULT_LOGGING_DIR)
217 .to_string_lossy()
218 .to_string();
219 }
220
221 if global_options.log_level.is_some() {
222 opts.logging.level.clone_from(&global_options.log_level);
223 }
224
225 opts.tracing = TracingOptions {
226 #[cfg(feature = "tokio-console")]
227 tokio_console_addr: global_options.tokio_console_addr.clone(),
228 };
229
230 if let Some(addr) = &self.rpc_bind_addr {
231 opts.grpc.bind_addr.clone_from(addr);
232 }
233
234 if let Some(server_addr) = &self.rpc_server_addr {
235 opts.grpc.server_addr.clone_from(server_addr);
236 }
237
238 if let Some(node_id) = self.node_id {
239 opts.node_id = Some(node_id);
240 }
241
242 if let Some(metasrv_addrs) = &self.metasrv_addrs {
243 opts.meta_client
244 .get_or_insert_with(MetaClientOptions::default)
245 .metasrv_addrs
246 .clone_from(metasrv_addrs);
247 }
248
249 if let Some(http_addr) = &self.http_addr {
250 opts.http.addr.clone_from(http_addr);
251 }
252
253 if let Some(http_timeout) = self.http_timeout {
254 opts.http.timeout = Duration::from_secs(http_timeout);
255 }
256
257 if let Some(user_provider) = &self.user_provider {
258 opts.user_provider = Some(user_provider.clone());
259 }
260
261 ensure!(
262 opts.node_id.is_some(),
263 MissingConfigSnafu {
264 msg: "Missing node id option"
265 }
266 );
267
268 Ok(())
269 }
270
271 async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
272 common_runtime::init_global_runtimes(&opts.runtime);
273
274 let guard = common_telemetry::init_global_logging(
275 APP_NAME,
276 &opts.component.logging,
277 &opts.component.tracing,
278 opts.component.node_id.map(|x| x.to_string()),
279 None,
280 );
281
282 log_versions(verbose_version(), short_version(), APP_NAME);
283 create_resource_limit_metrics(APP_NAME);
284
285 info!("Flownode start command: {:#?}", self);
286 info!("Flownode options: {:#?}", opts);
287
288 let plugin_opts = opts.plugins;
289 let mut opts = opts.component;
290 opts.grpc.detect_server_addr();
291
292 let mut plugins = Plugins::new();
293 plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
294 .await
295 .context(StartFlownodeSnafu)?;
296
297 let member_id = opts
298 .node_id
299 .context(MissingConfigSnafu { msg: "'node_id'" })?;
300
301 let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
302 msg: "'meta_client_options'",
303 })?;
304
305 let meta_client = meta_client::create_meta_client(
306 MetaClientType::Flownode { member_id },
307 meta_config,
308 None,
309 None,
310 )
311 .await
312 .context(MetaClientInitSnafu)?;
313
314 let cache_max_capacity = meta_config.metadata_cache_max_capacity;
315 let cache_ttl = meta_config.metadata_cache_ttl;
316 let cache_tti = meta_config.metadata_cache_tti;
317
318 let cached_meta_backend =
320 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
321 .cache_max_capacity(cache_max_capacity)
322 .cache_ttl(cache_ttl)
323 .cache_tti(cache_tti)
324 .build();
325 let cached_meta_backend = Arc::new(cached_meta_backend);
326
327 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
329 CacheRegistryBuilder::default()
330 .add_cache(cached_meta_backend.clone())
331 .build(),
332 );
333 let fundamental_cache_registry =
334 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
335 let layered_cache_registry = Arc::new(
336 with_default_composite_cache_registry(
337 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
338 )
339 .context(BuildCacheRegistrySnafu)?
340 .build(),
341 );
342
343 let information_extension =
344 Arc::new(DistributedInformationExtension::new(meta_client.clone()));
345 let catalog_manager = KvBackendCatalogManagerBuilder::new(
346 information_extension,
347 cached_meta_backend.clone(),
348 layered_cache_registry.clone(),
349 )
350 .build();
351
352 let table_metadata_manager =
353 Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
354 table_metadata_manager
355 .init()
356 .await
357 .context(InitMetadataSnafu)?;
358
359 let executor = HandlerGroupExecutor::new(vec![
360 Arc::new(ParseMailboxMessageHandler),
361 Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
362 ]);
363
364 let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
365 &opts,
366 meta_client.clone(),
367 opts.heartbeat.clone(),
368 Arc::new(executor),
369 );
370
371 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
372 let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
373 let frontend_client = FrontendClient::from_meta_client(
374 meta_client.clone(),
375 flow_auth_header,
376 opts.query.clone(),
377 );
378 let frontend_client = Arc::new(frontend_client);
379 let flownode_builder = FlownodeBuilder::new(
380 opts.clone(),
381 plugins,
382 table_metadata_manager,
383 catalog_manager.clone(),
384 flow_metadata_manager,
385 frontend_client.clone(),
386 )
387 .with_heartbeat_task(heartbeat_task);
388
389 let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
390 let services = FlownodeServiceBuilder::new(&opts)
391 .with_default_grpc_server(flownode.flownode_server())
392 .enable_http_service()
393 .build()
394 .context(StartFlownodeSnafu)?;
395 flownode.setup_services(services);
396 let flownode = flownode;
397
398 let channel_config = ChannelConfig {
401 timeout: None,
402 ..Default::default()
403 };
404 let client = Arc::new(NodeClients::new(channel_config));
405
406 let invoker = FrontendInvoker::build_from(
407 flownode.flow_engine().streaming_engine(),
408 catalog_manager.clone(),
409 cached_meta_backend.clone(),
410 layered_cache_registry.clone(),
411 meta_client.clone(),
412 client,
413 )
414 .await
415 .context(StartFlownodeSnafu)?;
416 flownode
417 .flow_engine()
418 .streaming_engine()
419 .set_frontend_invoker(invoker)
421 .await;
422
423 #[cfg(feature = "enterprise")]
424 let components = Components {
425 catalog_manager: catalog_manager.clone(),
426 fe_client: frontend_client,
427 kv_backend: cached_meta_backend,
428 };
429
430 #[cfg(not(feature = "enterprise"))]
431 return Ok(Instance::new(flownode, guard));
432 #[cfg(feature = "enterprise")]
433 Ok(Instance::new(flownode, components, guard))
434 }
435}