Add new Drain feature (#1529)

Add new Drain feature

-when coturn server is in drain mode
  -current allocations will continue to work as usual
  -new allocations will be rejected with a 403 (Forbidden) response
  -when all allocations go away, then coturn will shutdown
-Enable drain mode with either
  -signaling SIGUSR1
  -turn_admin_server "drain" CLI command

This contribution is from Wire. https://wire.com/
This commit is contained in:
Scott Godin 2024-10-27 21:56:58 -04:00 committed by GitHub
parent 8e3a03d2da
commit edcdfc8b02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 95 additions and 5 deletions

View File

@ -188,7 +188,8 @@ turn_params_t turn_params = {
{NULL, 0, {0, NULL}}, /*tls_alternate_servers_list*/
/////////////// stop server ////////////////
0, /*stop_turn_server*/
false, /*drain_turn_server*/
false, /*stop_turn_server*/
/////////////// MISC PARAMS ////////////////
0, /* stun_only */
@ -262,6 +263,7 @@ static void read_config_file(int argc, char **argv, int pass);
static void reload_ssl_certs(evutil_socket_t sock, short events, void *args);
static void shutdown_handler(evutil_socket_t sock, short events, void *args);
static void drain_handler(evutil_socket_t sock, short events, void *args);
//////////////////////////////////////////////////
@ -3344,6 +3346,8 @@ int main(int argc, char **argv) {
event_add(ev, NULL);
ev = evsignal_new(turn_params.listener.event_base, SIGINT, shutdown_handler, NULL);
event_add(ev, NULL);
ev = evsignal_new(turn_params.listener.event_base, SIGUSR1, drain_handler, NULL);
event_add(ev, NULL);
#endif
drop_privileges();
@ -3987,7 +3991,15 @@ static void reload_ssl_certs(evutil_socket_t sock, short events, void *args) {
static void shutdown_handler(evutil_socket_t sock, short events, void *args) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "Terminating on signal %d\n", sock);
turn_params.stop_turn_server = 1;
turn_params.stop_turn_server = true;
UNUSED_ARG(events);
UNUSED_ARG(args);
}
static void drain_handler(evutil_socket_t sock, short events, void *args) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "Draining then terminating on signal %d\n", sock);
enable_drain_mode();
UNUSED_ARG(events);
UNUSED_ARG(args);

View File

@ -286,8 +286,9 @@ typedef struct _turn_params_ {
turn_server_addrs_list_t alternate_servers_list;
turn_server_addrs_list_t tls_alternate_servers_list;
/////////////// stop server ////////////////
int stop_turn_server;
/////////////// stop/drain server ////////////////
bool drain_turn_server;
bool stop_turn_server;
////////////// MISC PARAMS ////////////////
@ -377,6 +378,7 @@ void send_auth_message_to_auth_server(struct auth_message *am);
void init_listener(void);
void setup_server(void);
void run_listener_server(struct listener_server *ls);
void enable_drain_mode(void);
////////// BPS ////////////////

View File

@ -1605,6 +1605,11 @@ void run_listener_server(struct listener_server *ls) {
}
}
if (turn_params.drain_turn_server && global_allocation_count == 0) {
turn_params.stop_turn_server = true;
TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "Drain complete, shutting down now...\n");
}
run_events(ls->event_base, ls->ioa_eng);
rollover_logfile();
@ -1899,4 +1904,18 @@ void setup_server(void) {
void init_listener(void) { memset(&turn_params.listener, 0, sizeof(struct listener_server)); }
void enable_drain_mode(void) {
// Tell each turn_server we are draining
for (size_t i = 0; i < get_real_general_relay_servers_number(); i++) {
if (general_relay_servers[i]) {
general_relay_servers[i]->server.is_draining = true;
}
}
for (size_t i = 0; i < get_real_udp_relay_servers_number(); i++) {
if (udp_relay_servers[i]) {
udp_relay_servers[i]->server.is_draining = true;
}
}
turn_params.drain_turn_server = true;
}
///////////////////////////////

View File

@ -137,6 +137,9 @@ static const char *CLI_HELP_STR[] = {"",
"",
" quit, q, exit, bye - end CLI session",
"",
" drain - drain TURN Server, then shutdown",
" (wait for all allocations to go away, reject new)",
"",
" stop, shutdown, halt - shutdown TURN Server",
"",
" pc - print configuration",
@ -980,12 +983,18 @@ static int run_cli_input(struct cli_session *cs, const char *buf0, unsigned int
myprintf(cs, "%s\n", str);
close_cli_session(cs);
ret = -1;
} else if (strcmp(cmd, "drain") == 0) {
addr_debug_print(1, &(cs->addr), "Drain command received from CLI user");
const char *str = "TURN server is draining then shutting down";
myprintf(cs, "%s\n", str);
close_cli_session(cs);
enable_drain_mode();
} else if ((strcmp(cmd, "halt") == 0) || (strcmp(cmd, "shutdown") == 0) || (strcmp(cmd, "stop") == 0)) {
addr_debug_print(1, &(cs->addr), "Shutdown command received from CLI user");
const char *str = "TURN server is shutting down";
myprintf(cs, "%s\n", str);
close_cli_session(cs);
turn_params.stop_turn_server = 1;
turn_params.stop_turn_server = true;
sleep(10);
exit(0);
} else if ((strcmp(cmd, "?") == 0) || (strcmp(cmd, "h") == 0) || (strcmp(cmd, "help") == 0)) {

View File

@ -68,6 +68,13 @@ static TURN_MUTEX_DECLARE(o_to_realm_mutex);
static ur_string_map *o_to_realm = NULL;
static secrets_list_t realms_list;
#ifndef _MSC_VER
_Atomic
#else
volatile
#endif
size_t global_allocation_count = 0; // used for drain mode, to know when all allocations have gone away
static char userdb_type_unknown[] = "Unknown";
static char userdb_type_sqlite[] = "SQLite";
static char userdb_type_postgresql[] = "PostgreSQL";
@ -683,6 +690,15 @@ int check_new_allocation_quota(uint8_t *user, int oauth, uint8_t *realm) {
free(username);
ur_string_map_unlock(rp->status.alloc_counters);
}
#ifndef _MSC_VER
global_allocation_count++;
TURN_LOG_FUNC(TURN_LOG_LEVEL_DEBUG, "Global turn allocation count incremented, now %ld\n", global_allocation_count);
#else
size_t cur_count = (size_t)InterlockedIncrement((volatile LONG *)&global_allocation_count);
TURN_LOG_FUNC(TURN_LOG_LEVEL_DEBUG, "Global turn allocation count incremented, now %ld\n", cur_count);
#endif
return ret;
}
@ -709,6 +725,18 @@ void release_allocation_quota(uint8_t *user, int oauth, uint8_t *realm) {
ur_string_map_unlock(rp->status.alloc_counters);
free(username);
}
int log_level = TURN_LOG_LEVEL_DEBUG;
if (turn_params.drain_turn_server) {
log_level = TURN_LOG_LEVEL_INFO;
}
#ifndef _MSC_VER
global_allocation_count--;
TURN_LOG_FUNC(log_level, "Global turn allocation count decremented, now %ld\n", global_allocation_count);
#else
size_t cur_count = (size_t)InterlockedDecrement((volatile LONG *)&global_allocation_count);
TURN_LOG_FUNC(log_level, "Global turn allocation count decremented, now %ld\n", cur_count);
#endif
}
//////////////////////////////////

View File

@ -46,6 +46,14 @@
extern "C" {
#endif
#ifndef _MSC_VER
#include <stdatomic.h>
extern _Atomic
#else
extern volatile
#endif
size_t global_allocation_count;
//////////// REALM //////////////
struct _realm_status_t;

View File

@ -1288,6 +1288,13 @@ static int handle_turn_allocate(turn_turnserver *server, ts_ur_super_session *ss
}
}
if (server->is_draining) {
// Don't allow new allocations if we are draining
*err_code = 403; // 403 (Forbidden): RFC8656 - The request is valid, but the server is refusing to perform it,
// likely due to administrative restrictions....
*reason = (const uint8_t *)"Server is draining, then will shutdown, please try another server";
}
if (!(*err_code)) {
if (!af4 && !af6) {
switch (server->allocation_default_address_family) {
@ -5000,6 +5007,8 @@ void init_turn_server(turn_turnserver *server, turnserver_id id, int verbose, io
server->response_origin_only_with_rfc5780 = response_origin_only_with_rfc5780;
server->respond_http_unsupported = respond_http_unsupported;
server->is_draining = false;
}
ioa_engine_handle turn_server_get_engine(turn_turnserver *s) {

View File

@ -203,6 +203,9 @@ struct _turn_turnserver {
/* Return an HTTP 400 response to HTTP connections made to ports not
otherwise handling HTTP. */
vintp respond_http_unsupported;
/* Set to true on SIGUSR1 */
bool is_draining;
};
const char *get_version(turn_turnserver *server);