summaryrefslogtreecommitdiff
path: root/libs/all_in_one/src/ffi.rs
blob: b4c853de9ae6cd95463f084396e8a42abda07f22 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#![allow(clippy::missing_safety_doc)]
use std::{
    ffi::{c_char, c_int, CString},
    mem::take,
    ptr,
    str::FromStr,
    time::Duration,
};

use gateway::GatewayServer;
use opentelemetry::{global::set_text_map_propagator, sdk::propagation::TraceContextPropagator};
use ratelimit::RatelimiterServerComponent;
use rest::ReverseProxyServer;
use tokio::{runtime::Runtime, sync::mpsc};
use tracing::{debug, error};
use tracing_subscriber::{
    filter::Directive, fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
    EnvFilter,
};
use webhook::WebhookServer;

use crate::{
    errors::{handle_error, wrap_result, ERROR_HANDLER},
    utils::{load_config_file, start_component, AllInOneInstance},
};

/// Registers `func` as the callback stored in `ERROR_HANDLER`.
///
/// Any handler stored earlier in the same slot is overwritten.
/// NOTE(review): `ERROR_HANDLER.with(..)` looks like a thread-local, which would make the
/// registration per-thread — confirm against `crate::errors`.
#[no_mangle]
pub unsafe extern "C" fn set_error_handler(func: unsafe extern "C" fn(c_int, *mut c_char)) {
    debug!("Setting error handler");
    ERROR_HANDLER.with(|slot| {
        let mut handler = slot.borrow_mut();
        *handler = Some(func);
    });
}

/// Loads the configuration using the nova shared config loader and returns it as a JSON string.
///
/// The returned pointer comes from `CString::into_raw`, so ownership of the allocation is
/// released to the caller. A null pointer is returned when `wrap_result` yields no value
/// (loading, serializing or building the C string failed).
#[no_mangle]
pub extern "C" fn load_config() -> *mut c_char {
    let serialized = wrap_result(move || {
        let loaded = load_config_file()?;
        let json = serde_json::to_string(&loaded)?;
        Ok(CString::new(json)?.into_raw())
    });

    match serialized {
        Some(raw) => raw,
        None => ptr::null_mut(),
    }
}

/// Stops every component of `instance`, shuts its runtime down and frees the instance.
///
/// Components are halted one after another: each is sent its stop signal and its task is
/// awaited before moving on to the next one. The runtime is then given up to five seconds
/// to shut down. A null `instance` is ignored.
///
/// # Safety
/// `instance` must be either null or a pointer obtained from `Box::into_raw` (which is what
/// `create_instance` returns) that has not already been passed to this function. The pointer
/// must not be used after this call.
#[no_mangle]
pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) {
    // `create_instance` returns null on failure, and rebuilding a `Box` from a null pointer
    // is undefined behaviour, so bail out early instead.
    if instance.is_null() {
        debug!("Ignoring request to stop a null instance");
        return;
    }

    wrap_result(move || {
        // Take ownership back so the instance is dropped when this closure returns.
        let mut instance = Box::from_raw(instance);
        let handles = take(&mut instance.handles);
        instance.runtime.block_on(async move {
            for (name, sender, join) in handles {
                debug!("Halting component {}", name);
                // A failed send is only logged; the task is still awaited below.
                let _ = sender
                    .send(())
                    .map_err(|_| error!("Component {} is not online", name));
                match join.await {
                    Ok(_) => {}
                    Err(error) => error!("Task for component {} panic'ed {}", name, error),
                };
                debug!("Component {} halted", name);
            }
        });

        instance.runtime.shutdown_timeout(Duration::from_secs(5));

        Ok(())
    });
}

/// # Panics
/// Panics if an incorrect `RUST_LOG` variables is specified.
#[no_mangle]
pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneInstance {
    // Returning a null pointer (unaligned) is expected.
    #[allow(clippy::cast_ptr_alignment)]
    wrap_result(move || {
        let value = CString::from_raw(config);
        let json = value.to_str()?;

        // Main stop signal for this instance
        let (error_sender, mut errors) = mpsc::channel(50);
        let mut handles = vec![];

        let runtime = Runtime::new()?;

        // Setup the tracing system
        set_text_map_propagator(TraceContextPropagator::new());
        tracing_subscriber::registry()
            .with(fmt::layer())
            .with(
                EnvFilter::builder()
                    .with_default_directive(Directive::from_str("info").expect(""))
                    .from_env()
                    .unwrap(),
            )
            .init();

        // Error handling task
        runtime.spawn(async move {
            while let Some(error) = errors.recv().await {
                handle_error(&error);
            }
        });

        handles.push(start_component::<GatewayServer>(
            json,
            error_sender.clone(),
            &runtime,
        )?);

        std::thread::sleep(Duration::from_secs(1));

        handles.push(start_component::<RatelimiterServerComponent>(
            json,
            error_sender.clone(),
            &runtime,
        )?);

        std::thread::sleep(Duration::from_secs(1));

        handles.push(start_component::<ReverseProxyServer>(
            json,
            error_sender.clone(),
            &runtime,
        )?);

        std::thread::sleep(Duration::from_secs(1));

        handles.push(start_component::<WebhookServer>(
            json,
            error_sender,
            &runtime,
        )?);

        let all_in_one = Box::into_raw(Box::new(AllInOneInstance { runtime, handles }));

        Ok(all_in_one)
    })
    .or(Some(ptr::null::<AllInOneInstance>() as *mut AllInOneInstance))
    .expect("something has gone terribly wrong")
}