cmd/
flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
20use catalog::information_extension::DistributedInformationExtension;
21use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
22use clap::Parser;
23use client::client_manager::NodeClients;
24use common_base::Plugins;
25use common_config::{Configurable, DEFAULT_DATA_HOME};
26use common_grpc::channel_manager::ChannelConfig;
27use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
28use common_meta::heartbeat::handler::HandlerGroupExecutor;
29use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
30use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
31use common_meta::key::TableMetadataManager;
32use common_meta::key::flow::FlowMetadataManager;
33use common_stat::ResourceStatImpl;
34use common_telemetry::info;
35use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
36use common_version::{short_version, verbose_version};
37use flow::{
38    FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
39    get_flow_auth_options,
40};
41use meta_client::{MetaClientOptions, MetaClientType};
42use snafu::{OptionExt, ResultExt, ensure};
43use tracing_appender::non_blocking::WorkerGuard;
44
45use crate::error::{
46    BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
47    MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
48};
49use crate::options::{GlobalOptions, GreptimeOptions};
50use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
51
/// Application name of the flownode: returned by `App::name` and passed to the
/// logging, version-logging and resource-limit-metrics setup in `StartCommand::build`.
52pub const APP_NAME: &str = "greptime-flownode";
53
/// Full option set handled by this command: `flow::FlownodeOptions` wrapped in
/// `GreptimeOptions`, whose `runtime`, `plugins` and `component` parts are read
/// when the instance is built.
54type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
55
/// The flownode application: wraps a `FlownodeInstance` and implements `App`
/// (name / start / stop) on top of it.
56pub struct Instance {
    // The flownode that `App::start` and `App::stop` operate on.
57    flownode: FlownodeInstance,
58
59    // The components of flownode, which make it easier to expand based
60    // on the components.
61    #[cfg(feature = "enterprise")]
62    components: Components,
63
64    // Keep the logging guard to prevent the worker from being dropped.
65    _guard: Vec<WorkerGuard>,
66}
67
/// Handles to building blocks created in `StartCommand::build`, exposed through
/// `Instance::components`. Only compiled with the `enterprise` feature.
68#[cfg(feature = "enterprise")]
69pub struct Components {
    /// The catalog manager that is also handed to the flownode builder.
70    pub catalog_manager: catalog::CatalogManagerRef,
    /// The frontend client created from the meta client.
71    pub fe_client: Arc<FrontendClient>,
    /// The cached kv backend layered over `MetaKvBackend`.
72    pub kv_backend: common_meta::kv_backend::KvBackendRef,
73}
74
75impl Instance {
76    pub fn new(
77        flownode: FlownodeInstance,
78        #[cfg(feature = "enterprise")] components: Components,
79        guard: Vec<WorkerGuard>,
80    ) -> Self {
81        Self {
82            flownode,
83            #[cfg(feature = "enterprise")]
84            components,
85            _guard: guard,
86        }
87    }
88
89    pub fn flownode(&self) -> &FlownodeInstance {
90        &self.flownode
91    }
92
93    /// allow customizing flownode for downstream projects
94    pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
95        &mut self.flownode
96    }
97
98    #[cfg(feature = "enterprise")]
99    pub fn components(&self) -> &Components {
100        &self.components
101    }
102}
103
104#[async_trait::async_trait]
105impl App for Instance {
106    fn name(&self) -> &str {
107        APP_NAME
108    }
109
110    async fn start(&mut self) -> Result<()> {
111        plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
112            .await
113            .context(StartFlownodeSnafu)?;
114
115        self.flownode.start().await.context(StartFlownodeSnafu)
116    }
117
118    async fn stop(&mut self) -> Result<()> {
119        self.flownode
120            .shutdown()
121            .await
122            .context(ShutdownFlownodeSnafu)
123    }
124}
125
// Top-level flownode command line; all work is delegated to its subcommand.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text.
126#[derive(Parser)]
127pub struct Command {
    // The selected subcommand; `build` and `load_options` forward to it.
128    #[clap(subcommand)]
129    subcmd: SubCommand,
130}
131
132impl Command {
133    pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
134        self.subcmd.build(opts).await
135    }
136
137    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
138        match &self.subcmd {
139            SubCommand::Start(cmd) => cmd.load_options(global_options),
140        }
141    }
142}
143
// Subcommands of the flownode command; `Start` is the only one defined here.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text.
144#[derive(Parser)]
145enum SubCommand {
    // Carries the arguments of the start command.
146    Start(StartCommand),
147}
148
149impl SubCommand {
150    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
151        match self {
152            SubCommand::Start(cmd) => cmd.build(opts).await,
153        }
154    }
155}
156
157#[derive(Debug, Parser, Default)]
158struct StartCommand {
159    /// Flownode's id
160    #[clap(long)]
161    node_id: Option<u64>,
162    /// Bind address for the gRPC server.
163    #[clap(long, alias = "rpc-addr")]
164    rpc_bind_addr: Option<String>,
165    /// The address advertised to the metasrv, and used for connections from outside the host.
166    /// If left empty or unset, the server will automatically use the IP address of the first network interface
167    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
168    #[clap(long, alias = "rpc-hostname")]
169    rpc_server_addr: Option<String>,
170    /// Metasrv address list;
171    #[clap(long, value_delimiter = ',', num_args = 1..)]
172    metasrv_addrs: Option<Vec<String>>,
173    /// The configuration file for flownode
174    #[clap(short, long)]
175    config_file: Option<String>,
176    /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
177    #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
178    env_prefix: String,
179    #[clap(long)]
180    http_addr: Option<String>,
181    /// HTTP request timeout in seconds.
182    #[clap(long)]
183    http_timeout: Option<u64>,
184    /// User Provider cfg, for auth, currently only support static user provider
185    #[clap(long)]
186    user_provider: Option<String>,
187}
188
// Option loading and instance construction for the start command.
189impl StartCommand {
    /// Loads the layered options for `config_file` / `env_prefix` and then
    /// applies the command-line overrides on top of them.
    ///
    /// # Errors
    /// A loading failure is wrapped with `LoadLayeredConfigSnafu`; any error
    /// from `merge_with_cli_options` is returned as-is.
190    fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
191        let mut opts = FlownodeOptions::load_layered_options(
192            self.config_file.as_deref(),
193            self.env_prefix.as_ref(),
194        )
195        .context(LoadLayeredConfigSnafu)?;
196
197        self.merge_with_cli_options(global_options, &mut opts)?;
198
199        Ok(opts)
200    }
201
    /// Overrides `opts.component` with the global options and with every CLI
    /// flag that was explicitly given; flags that are `None` leave the loaded
    /// value untouched.
    ///
    /// # Errors
    /// Returns a `MissingConfig` error when no node id is set after merging.
202    // The precedence order is: cli > config file > environment variables > default values.
203    fn merge_with_cli_options(
204        &self,
205        global_options: &GlobalOptions,
206        opts: &mut FlownodeOptions,
207    ) -> Result<()> {
        // Only the flownode-specific part of the options is touched below.
208        let opts = &mut opts.component;
209
210        if let Some(dir) = &global_options.log_dir {
211            opts.logging.dir.clone_from(dir);
212        }
213
214        // If the logging dir is not set, use the default logs dir in the data home.
215        if opts.logging.dir.is_empty() {
216            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
217                .join(DEFAULT_LOGGING_DIR)
218                .to_string_lossy()
219                .to_string();
220        }
221
        // Keep the loaded log level unless one was given on the command line.
222        if global_options.log_level.is_some() {
223            opts.logging.level.clone_from(&global_options.log_level);
224        }
225
        // Tracing options are replaced unconditionally; the only field set here
        // exists with the `tokio-console` feature.
226        opts.tracing = TracingOptions {
227            #[cfg(feature = "tokio-console")]
228            tokio_console_addr: global_options.tokio_console_addr.clone(),
229        };
230
231        if let Some(addr) = &self.rpc_bind_addr {
232            opts.grpc.bind_addr.clone_from(addr);
233        }
234
235        if let Some(server_addr) = &self.rpc_server_addr {
236            opts.grpc.server_addr.clone_from(server_addr);
237        }
238
239        if let Some(node_id) = self.node_id {
240            opts.node_id = Some(node_id);
241        }
242
        // Creates default meta client options first if none were loaded.
243        if let Some(metasrv_addrs) = &self.metasrv_addrs {
244            opts.meta_client
245                .get_or_insert_with(MetaClientOptions::default)
246                .metasrv_addrs
247                .clone_from(metasrv_addrs);
248        }
249
250        if let Some(http_addr) = &self.http_addr {
251            opts.http.addr.clone_from(http_addr);
252        }
253
        // The flag is given in whole seconds.
254        if let Some(http_timeout) = self.http_timeout {
255            opts.http.timeout = Duration::from_secs(http_timeout);
256        }
257
258        if let Some(user_provider) = &self.user_provider {
259            opts.user_provider = Some(user_provider.clone());
260        }
261
        // The node id may come from any layer, but it must be present by now.
262        ensure!(
263            opts.node_id.is_some(),
264            MissingConfigSnafu {
265                msg: "Missing node id option"
266            }
267        );
268
269        Ok(())
270    }
271
    /// Builds a ready-to-start flownode [`Instance`] from fully merged options.
    ///
    /// In order: initializes the global runtimes and logging, sets up plugins,
    /// creates the meta client, builds the cached kv backend and cache
    /// registries, creates the catalog / table-metadata / flow-metadata
    /// managers, the heartbeat task and the frontend client, builds the
    /// flownode with its gRPC and HTTP services, and finally installs the
    /// frontend invoker on the streaming engine.
    ///
    /// # Errors
    /// Returns `MissingConfig` when `node_id` or `meta_client` is absent; other
    /// failures are wrapped with `StartFlownodeSnafu`, `MetaClientInitSnafu`,
    /// `BuildCacheRegistrySnafu` or `InitMetadataSnafu`, matching the failing step.
272    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
273        common_runtime::init_global_runtimes(&opts.runtime);
274
        // The returned guard is stored in the `Instance` at the end.
275        let guard = common_telemetry::init_global_logging(
276            APP_NAME,
277            &opts.component.logging,
278            &opts.component.tracing,
279            opts.component.node_id.map(|x| x.to_string()),
280            None,
281        );
282
283        log_versions(verbose_version(), short_version(), APP_NAME);
284        maybe_activate_heap_profile(&opts.component.memory);
285        create_resource_limit_metrics(APP_NAME);
286
287        info!("Flownode start command: {:#?}", self);
288        info!("Flownode options: {:#?}", opts);
289
        // From here on `opts` shadows the full option set with its flownode part.
290        let plugin_opts = opts.plugins;
291        let mut opts = opts.component;
        // NOTE(review): presumably fills in the advertised gRPC address when it is
        // unset (see the `rpc_server_addr` help text); confirm in `detect_server_addr`.
292        opts.grpc.detect_server_addr();
293
294        let mut plugins = Plugins::new();
295        plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
296            .await
297            .context(StartFlownodeSnafu)?;
298
        // The node id is used as the member id of the meta client.
299        let member_id = opts
300            .node_id
301            .context(MissingConfigSnafu { msg: "'node_id'" })?;
302
303        let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
304            msg: "'meta_client_options'",
305        })?;
306
307        let meta_client = meta_client::create_meta_client(
308            MetaClientType::Flownode { member_id },
309            meta_config,
310            None,
311            None,
312        )
313        .await
314        .context(MetaClientInitSnafu)?;
315
        // Cache parameters for the metadata kv backend come from the meta client options.
316        let cache_max_capacity = meta_config.metadata_cache_max_capacity;
317        let cache_ttl = meta_config.metadata_cache_ttl;
318        let cache_tti = meta_config.metadata_cache_tti;
319
320        // TODO(discord9): add helper function to ease the creation of cache registry&such
321        let cached_meta_backend =
322            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
323                .cache_max_capacity(cache_max_capacity)
324                .cache_ttl(cache_ttl)
325                .cache_tti(cache_tti)
326                .build();
327        let cached_meta_backend = Arc::new(cached_meta_backend);
328
329        // Builds cache registry
        // Layers: the cached kv backend itself, the fundamental registry, then the
        // default composite registry added by `with_default_composite_cache_registry`.
330        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
331            CacheRegistryBuilder::default()
332                .add_cache(cached_meta_backend.clone())
333                .build(),
334        );
        // Note: the fundamental registry gets its own uncached `MetaKvBackend`.
335        let fundamental_cache_registry =
336            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
337        let layered_cache_registry = Arc::new(
338            with_default_composite_cache_registry(
339                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
340            )
341            .context(BuildCacheRegistrySnafu)?
342            .build(),
343        );
344
345        // flownode's frontend to datanode need not timeout.
346        // Some queries are expected to take long time.
347        let channel_config = ChannelConfig {
348            timeout: None,
349            ..Default::default()
350        };
351        let client = Arc::new(NodeClients::new(channel_config));
352
353        let information_extension = Arc::new(DistributedInformationExtension::new(
354            meta_client.clone(),
355            client.clone(),
356        ));
357        let catalog_manager = KvBackendCatalogManagerBuilder::new(
358            information_extension,
359            cached_meta_backend.clone(),
360            layered_cache_registry.clone(),
361        )
362        .build();
363
364        let table_metadata_manager =
365            Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
366        table_metadata_manager
367            .init()
368            .await
369            .context(InitMetadataSnafu)?;
370
        // Heartbeat handlers: mailbox-message parsing and cache invalidation
        // against the layered cache registry.
371        let executor = HandlerGroupExecutor::new(vec![
372            Arc::new(ParseMailboxMessageHandler),
373            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
374        ]);
375
        // CPU usage collection is started before the stat is handed to the heartbeat task.
376        let mut resource_stat = ResourceStatImpl::default();
377        resource_stat.start_collect_cpu_usage();
378
379        let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
380            &opts,
381            meta_client.clone(),
382            opts.heartbeat.clone(),
383            Arc::new(executor),
384            Arc::new(resource_stat),
385        );
386
387        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
388        let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
389        let frontend_client = FrontendClient::from_meta_client(
390            meta_client.clone(),
391            flow_auth_header,
392            opts.query.clone(),
393            opts.flow.batching_mode.clone(),
394        )
395        .context(StartFlownodeSnafu)?;
396        let frontend_client = Arc::new(frontend_client);
397        let flownode_builder = FlownodeBuilder::new(
398            opts.clone(),
399            plugins,
400            table_metadata_manager,
401            catalog_manager.clone(),
402            flow_metadata_manager,
403            frontend_client.clone(),
404        )
405        .with_heartbeat_task(heartbeat_task);
406
        // The services need the built flownode's server, so they are attached
        // after `build()`.
407        let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
408        let services = FlownodeServiceBuilder::new(&opts)
409            .with_default_grpc_server(flownode.flownode_server())
410            .enable_http_service()
411            .build()
412            .context(StartFlownodeSnafu)?;
413        flownode.setup_services(services);
        // Rebind immutably: no further mutation of the flownode happens below.
414        let flownode = flownode;
415
        // The invoker is built from the streaming engine and then installed
        // back into that same engine (see the TODO below).
416        let invoker = FrontendInvoker::build_from(
417            flownode.flow_engine().streaming_engine(),
418            catalog_manager.clone(),
419            cached_meta_backend.clone(),
420            layered_cache_registry.clone(),
421            meta_client.clone(),
422            client,
423        )
424        .await
425        .context(StartFlownodeSnafu)?;
426        flownode
427            .flow_engine()
428            .streaming_engine()
429            // TODO(discord9): refactor and avoid circular reference
430            .set_frontend_invoker(invoker)
431            .await;
432
433        #[cfg(feature = "enterprise")]
434        let components = Components {
435            catalog_manager: catalog_manager.clone(),
436            fe_client: frontend_client,
437            kv_backend: cached_meta_backend,
438        };
439
        // Exactly one of the two returns below is compiled, depending on the
        // `enterprise` feature.
440        #[cfg(not(feature = "enterprise"))]
441        return Ok(Instance::new(flownode, guard));
442        #[cfg(feature = "enterprise")]
443        Ok(Instance::new(flownode, components, guard))
444    }
445}
443        Ok(Instance::new(flownode, components, guard))
444    }
445}