Implement internal abstraction for iostreams

fopencookie/funclose is a non-standard API and should not be used
in portable software. Additionally, the way FILE's fd is used in
non-blocking mode is undefined behaviour and cannot be relied on.

This introduces internal abstraction for io streams, that allows
implementing the desired virtualization of read/write operations
with necessary timeout handling.

ASTERISK-24515 #close
ASTERISK-24517 #close

Change-Id: Id916aef418b665ced6a7489aef74908b6e376e85
This commit is contained in:
Timo Teräs
2016-06-02 22:10:06 +03:00
parent 0cc14597b2
commit 070a51bf7c
13 changed files with 1002 additions and 1103 deletions

View File

@@ -1549,8 +1549,7 @@ static void acl_change_stasis_unsubscribe(void)
struct mansession_session {
/*! \todo XXX need to document which fields it is protecting */
struct ast_sockaddr addr; /*!< address we are connecting from */
FILE *f; /*!< fdopen() on the underlying fd */
int fd; /*!< descriptor used for output. Either the socket (AMI) or a temporary file (HTTP) */
struct ast_iostream *stream; /*!< AMI stream */
int inuse; /*!< number of HTTP sessions using this entry */
int needdestroy; /*!< Whether an HTTP session should be destroyed */
pthread_t waiting_thread; /*!< Sleeping thread using this descriptor */
@@ -1592,9 +1591,8 @@ enum mansession_message_parsing {
*/
struct mansession {
struct mansession_session *session;
struct ast_iostream *stream;
struct ast_tcptls_session_instance *tcptls_session;
FILE *f;
int fd;
enum mansession_message_parsing parsing;
int write_error:1;
struct manager_custom_hook *hook;
@@ -2166,10 +2164,6 @@ static void session_destructor(void *obj)
ast_datastore_free(datastore);
}
if (session->f != NULL) {
fflush(session->f);
fclose(session->f);
}
if (eqe) {
ast_atomic_fetchadd_int(&eqe->usecount, -1);
}
@@ -2204,7 +2198,6 @@ static struct mansession_session *build_mansession(const struct ast_sockaddr *ad
return NULL;
}
newsession->fd = -1;
newsession->waiting_thread = AST_PTHREADT_NULL;
newsession->writetimeout = 100;
newsession->send_events = -1;
@@ -2617,7 +2610,7 @@ static char *handle_showmanconn(struct ast_cli_entry *e, int cmd, struct ast_cli
ast_sockaddr_stringify_addr(&session->addr),
(int) (session->sessionstart),
(int) (now - session->sessionstart),
session->fd,
session->stream ? ast_iostream_get_fd(session->stream) : -1,
session->inuse,
session->readperm,
session->writeperm);
@@ -2889,7 +2882,6 @@ int ast_hook_send_action(struct manager_custom_hook *hook, const char *msg)
* This is necessary to meet the previous design of manager.c
*/
s.hook = hook;
s.f = (void*)1; /* set this to something so our request will make it through all functions that test it*/
ao2_lock(act_found);
if (act_found->registered && act_found->func) {
@@ -2920,9 +2912,8 @@ int ast_hook_send_action(struct manager_custom_hook *hook, const char *msg)
*/
static int send_string(struct mansession *s, char *string)
{
int res;
FILE *f = s->f ? s->f : s->session->f;
int fd = s->f ? s->fd : s->session->fd;
struct ast_iostream *stream = s->stream ? s->stream : s->session->stream;
int len, res;
/* It's a result from one of the hook's action invocation */
if (s->hook) {
@@ -2934,7 +2925,12 @@ static int send_string(struct mansession *s, char *string)
return 0;
}
if ((res = ast_careful_fwrite(f, fd, string, strlen(string), s->session->writetimeout))) {
len = strlen(string);
ast_iostream_set_timeout_inactivity(stream, s->session->writetimeout);
res = ast_iostream_write(stream, string, len);
ast_iostream_set_timeout_disable(stream);
if (res < len) {
s->write_error = 1;
}
@@ -2975,10 +2971,10 @@ void astman_append(struct mansession *s, const char *fmt, ...)
return;
}
if (s->f != NULL || s->session->f != NULL) {
if (s->tcptls_session != NULL && s->tcptls_session->stream != NULL) {
send_string(s, ast_str_buffer(buf));
} else {
ast_verbose("fd == -1 in astman_append, should not happen\n");
ast_verbose("No connection stream in astman_append, should not happen\n");
}
}
@@ -4119,7 +4115,7 @@ static int action_waitevent(struct mansession *s, const struct message *m)
break;
}
if (s->session->managerid == 0) { /* AMI session */
if (ast_wait_for_input(s->session->fd, 1000)) {
if (ast_wait_for_input(ast_iostream_get_fd(s->session->stream), 1000)) {
break;
}
} else { /* HTTP session */
@@ -5924,7 +5920,7 @@ static int process_events(struct mansession *s)
int ret = 0;
ao2_lock(s->session);
if (s->session->f != NULL) {
if (s->session->stream != NULL) {
struct eventqent *eqe = s->session->last_ev;
while ((eqe = advance_event(eqe))) {
@@ -6466,7 +6462,7 @@ static int get_input(struct mansession *s, char *output)
s->session->waiting_thread = pthread_self();
ao2_unlock(s->session);
res = ast_wait_for_input(s->session->fd, timeout);
res = ast_wait_for_input(ast_iostream_get_fd(s->session->stream), timeout);
ao2_lock(s->session);
s->session->waiting_thread = AST_PTHREADT_NULL;
@@ -6484,7 +6480,7 @@ static int get_input(struct mansession *s, char *output)
}
ao2_lock(s->session);
res = fread(src + s->session->inlen, 1, maxlen - s->session->inlen, s->session->f);
res = ast_iostream_read(s->session->stream, src + s->session->inlen, maxlen - s->session->inlen);
if (res < 1) {
res = -1; /* error return */
} else {
@@ -6617,13 +6613,12 @@ static void *session_do(void *data)
struct mansession s = {
.tcptls_session = data,
};
int flags;
int res;
int arg = 1;
struct ast_sockaddr ser_remote_address_tmp;
struct protoent *p;
if (ast_atomic_fetchadd_int(&unauth_sessions, +1) >= authlimit) {
fclose(ser->f);
ast_iostream_close(ser->stream);
ast_atomic_fetchadd_int(&unauth_sessions, -1);
goto done;
}
@@ -6632,7 +6627,7 @@ static void *session_do(void *data)
session = build_mansession(&ser_remote_address_tmp);
if (session == NULL) {
fclose(ser->f);
ast_iostream_close(ser->stream);
ast_atomic_fetchadd_int(&unauth_sessions, -1);
goto done;
}
@@ -6640,20 +6635,10 @@ static void *session_do(void *data)
/* here we set TCP_NODELAY on the socket to disable Nagle's algorithm.
* This is necessary to prevent delays (caused by buffering) as we
* write to the socket in bits and pieces. */
p = getprotobyname("tcp");
if (p) {
int arg = 1;
if( setsockopt(ser->fd, p->p_proto, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0 ) {
ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno));
}
} else {
ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY, getprotobyname(\"tcp\") failed\nSome manager actions may be slow to respond.\n");
if (setsockopt(ast_iostream_get_fd(ser->stream), IPPROTO_TCP, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0) {
ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno));
}
/* make sure socket is non-blocking */
flags = fcntl(ser->fd, F_GETFL);
flags |= O_NONBLOCK;
fcntl(ser->fd, F_SETFL, flags);
ast_iostream_nonblock(ser->stream);
ao2_lock(session);
/* Hook to the tail of the event queue */
@@ -6662,8 +6647,7 @@ static void *session_do(void *data)
ast_mutex_init(&s.lock);
/* these fields duplicate those in the 'ser' structure */
session->fd = s.fd = ser->fd;
session->f = s.f = ser->f;
session->stream = s.stream = ser->stream;
ast_sockaddr_copy(&session->addr, &ser_remote_address_tmp);
s.session = session;
@@ -6682,9 +6666,9 @@ static void *session_do(void *data)
* We cannot let the stream exclusively wait for data to arrive.
* We have to wake up the task to send async events.
*/
ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0);
ast_iostream_set_exclusive_input(ser->stream, 0);
ast_tcptls_stream_set_timeout_sequence(ser->stream_cookie,
ast_iostream_set_timeout_sequence(ser->stream,
ast_tvnow(), authtimeout * 1000);
astman_append(&s, "Asterisk Call Manager/%s\r\n", AMI_VERSION); /* welcome prompt */
@@ -6693,7 +6677,7 @@ static void *session_do(void *data)
break;
}
if (session->authenticated) {
ast_tcptls_stream_set_timeout_disable(ser->stream_cookie);
ast_iostream_set_timeout_disable(ser->stream);
}
}
/* session is over, explain why and terminate */
@@ -7552,23 +7536,9 @@ static void xml_translate(struct ast_str **out, char *in, struct ast_variable *g
static void close_mansession_file(struct mansession *s)
{
if (s->f) {
if (fclose(s->f)) {
ast_log(LOG_ERROR, "fclose() failed: %s\n", strerror(errno));
}
s->f = NULL;
s->fd = -1;
} else if (s->fd != -1) {
/*
* Issuing shutdown() is necessary here to avoid a race
* condition where the last data written may not appear
* in the TCP stream. See ASTERISK-23548
*/
shutdown(s->fd, SHUT_RDWR);
if (close(s->fd)) {
ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno));
}
s->fd = -1;
if (s->stream) {
ast_iostream_close(s->stream);
s->stream = NULL;
} else {
ast_log(LOG_ERROR, "Attempted to close file/file descriptor on mansession without a valid file or file descriptor.\n");
}
@@ -7577,17 +7547,20 @@ static void close_mansession_file(struct mansession *s)
static void process_output(struct mansession *s, struct ast_str **out, struct ast_variable *params, enum output_format format)
{
char *buf;
size_t l;
off_t l;
int fd;
if (!s->f)
if (!s->stream)
return;
/* Ensure buffer is NULL-terminated */
fprintf(s->f, "%c", 0);
fflush(s->f);
ast_iostream_write(s->stream, "", 1);
if ((l = ftell(s->f)) > 0) {
if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, s->fd, 0))) {
fd = ast_iostream_get_fd(s->stream);
l = lseek(fd, SEEK_CUR, 0);
if (l > 0) {
if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0))) {
ast_log(LOG_WARNING, "mmap failed. Manager output was not processed\n");
} else {
if (format == FORMAT_XML || format == FORMAT_HTML) {
@@ -7614,6 +7587,7 @@ static int generic_http_callback(struct ast_tcptls_session_instance *ser,
struct mansession s = { .session = NULL, .tcptls_session = ser };
struct mansession_session *session = NULL;
uint32_t ident;
int fd;
int blastaway = 0;
struct ast_variable *v;
struct ast_variable *params = get_params;
@@ -7669,17 +7643,17 @@ static int generic_http_callback(struct ast_tcptls_session_instance *ser,
}
s.session = session;
s.fd = mkstemp(template); /* create a temporary file for command output */
fd = mkstemp(template); /* create a temporary file for command output */
unlink(template);
if (s.fd <= -1) {
if (fd <= -1) {
ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)");
goto generic_callback_out;
}
s.f = fdopen(s.fd, "w+");
if (!s.f) {
s.stream = ast_iostream_from_fd(&fd);
if (!s.stream) {
ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno));
ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)");
close(s.fd);
close(fd);
goto generic_callback_out;
}
@@ -7819,9 +7793,9 @@ generic_callback_out:
if (blastaway) {
session_destroy(session);
} else {
if (session->f) {
fclose(session->f);
session->f = NULL;
if (session->stream) {
ast_iostream_close(session->stream);
session->stream = NULL;
}
unref_mansession(session);
}
@@ -7846,6 +7820,7 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser,
struct message m = { 0 };
unsigned int idx;
size_t hdrlen;
int fd;
time_t time_now = time(NULL);
unsigned long nonce = 0, nc;
@@ -8024,17 +7999,17 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser,
ast_mutex_init(&s.lock);
s.session = session;
s.fd = mkstemp(template); /* create a temporary file for command output */
fd = mkstemp(template); /* create a temporary file for command output */
unlink(template);
if (s.fd <= -1) {
if (fd <= -1) {
ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)");
goto auth_callback_out;
}
s.f = fdopen(s.fd, "w+");
if (!s.f) {
s.stream = ast_iostream_from_fd(&fd);
if (!s.stream) {
ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno));
ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)");
close(s.fd);
close(fd);
goto auth_callback_out;
}
@@ -8085,7 +8060,7 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser,
m.headers[idx] = NULL;
}
result_size = ftell(s.f); /* Calculate approx. size of result */
result_size = lseek(ast_iostream_get_fd(s.stream), SEEK_CUR, 0); /* Calculate approx. size of result */
http_header = ast_str_create(80);
out = ast_str_create(result_size * 2 + 512);
@@ -8137,11 +8112,10 @@ auth_callback_out:
ast_free(out);
ao2_lock(session);
if (session->f) {
fclose(session->f);
if (session->stream) {
ast_iostream_close(session->stream);
session->stream = NULL;
}
session->f = NULL;
session->fd = -1;
ao2_unlock(session);
if (session->needdestroy) {