diff --git a/src/switch_pgsql.c b/src/switch_pgsql.c index 5a0ca21809..4264f86e91 100644 --- a/src/switch_pgsql.c +++ b/src/switch_pgsql.c @@ -43,7 +43,7 @@ struct switch_pgsql_handle { char *dsn; - const char *sql; + char *sql; PGconn* con; int sock; switch_pgsql_state_t state; @@ -96,250 +96,6 @@ SWITCH_DECLARE(switch_pgsql_handle_t *) switch_pgsql_handle_new(const char *dsn) return NULL; } -SWITCH_DECLARE(void) switch_pgsql_set_num_retries(switch_pgsql_handle_t *handle, int num_retries) -{ -#ifdef SWITCH_HAVE_PGSQL - if (handle) { - handle->num_retries = num_retries; - } -#endif -} - -SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_disconnect(switch_pgsql_handle_t *handle) -{ -#ifdef SWITCH_HAVE_PGSQL - - if (!handle) { - return SWITCH_PGSQL_FAIL; - } - - if (handle->state == SWITCH_PGSQL_STATE_CONNECTED) { - PQfinish(handle->con); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Disconnected from [%s]\n", handle->dsn); - } - - handle->state = SWITCH_PGSQL_STATE_DOWN; - - return SWITCH_PGSQL_SUCCESS; -#else - return SWITCH_PGSQL_FAIL; -#endif -} - -SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_send_query(switch_pgsql_handle_t *handle, const char* sql) -{ -#ifdef SWITCH_HAVE_PGSQL - char *err_str; - - if (!PQsendQuery(handle->con, sql)) { - err_str = switch_pgsql_handle_get_error(handle); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to send query (%s) to database: %s\n", sql, err_str); - switch_pgsql_finish_results(handle); - goto error; - } - handle->sql = sql; - - return SWITCH_PGSQL_SUCCESS; - error: -#endif - return SWITCH_PGSQL_FAIL; -} - -SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_cancel_real(const char *file, const char *func, int line, switch_pgsql_handle_t *handle) -{ - switch_pgsql_status_t ret = SWITCH_PGSQL_SUCCESS; -#ifdef SWITCH_HAVE_PGSQL - char err_buf[256]; - PGcancel *cancel = NULL; - - memset(err_buf, 0, 256); - cancel = PQgetCancel(handle->con); - if(!PQcancel(cancel, err_buf, 256)) { - switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_CRIT, "Failed to cancel long-running query (%s): %s\n", handle->sql, err_buf); - ret = SWITCH_PGSQL_FAIL; - } - PQfreeCancel(cancel); - - /* Make sure the query is fully cancelled */ - while (PQgetResult(handle->con) != NULL); - -#endif - - return ret; -} - - -SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_next_result_timed(switch_pgsql_handle_t *handle, switch_pgsql_result_t **result_out, int msec) -{ -#ifdef SWITCH_HAVE_PGSQL - switch_pgsql_result_t *res; - switch_time_t start; - switch_time_t ctime; - unsigned int usec = msec * 1000; - char *err_str; - struct pollfd fds[2] = { {0} }; - int poll_res = 0; - - if(!handle) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "**BUG** Null handle passed to switch_pgsql_next_result.\n"); - return SWITCH_PGSQL_FAIL; - } - - /* Try to consume input that might be waiting right away */ - if (PQconsumeInput(handle->con)) { - /* And check to see if we have a full result ready for reading */ - if (PQisBusy(handle->con)) { - - /* Wait for a result to become available, up to msec milliseconds */ - start = switch_time_now(); - while((ctime = switch_micro_time_now()) - start <= usec) { - int wait_time = (usec - (ctime - start)) / 1000; - fds[0].fd = handle->sock; - fds[0].events |= POLLIN; - fds[0].events |= POLLERR; - - /* Wait for the PostgreSQL socket to be ready for data reads. */ - if ((poll_res = poll(&fds[0], 1, wait_time)) > -1 ) { - if (fds[0].revents & POLLIN) { - /* Then try to consume any input waiting. */ - if (PQconsumeInput(handle->con)) { - /* And check to see if we have a full result ready for reading */ - if (!PQisBusy(handle->con)) { - /* If we can pull a full result without blocking, then break this loop */ - break; - } - } else { - /* If we had an error trying to consume input, report it and cancel the query. */ - err_str = switch_pgsql_handle_get_error(handle); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str); - switch_safe_free(err_str); - switch_pgsql_cancel(handle); - goto error; - } - } else if (fds[0].revents & POLLERR) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql); - goto error; - } - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll failed trying to read PGSQL socket for query (%s)\n", handle->sql); - goto error; - } - } - - /* If we broke the loop above because of a timeout, report that and cancel the query. */ - if (ctime - start > usec) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Query (%s) took too long to complete or database not responding.\n", handle->sql); - switch_pgsql_cancel(handle); - goto error; - } - - - - } - } else { - /* If we had an error trying to consume input, report it and cancel the query. */ - err_str = switch_pgsql_handle_get_error(handle); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str); - switch_safe_free(err_str); - /* switch_pgsql_cancel(handle); */ - goto error; - } - - - /* At this point, we know we can read a full result without blocking. */ - if(!(res = malloc(sizeof(switch_pgsql_result_t)))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Malloc failed!\n"); - goto error; - } - memset(res, 0, sizeof(switch_pgsql_result_t)); - - - res->result = PQgetResult(handle->con); - if (res->result) { - *result_out = res; - res->status = PQresultStatus(res->result); - switch(res->status) { -#if POSTGRESQL_MAJOR_VERSION >= 9 && POSTGRESQL_MINOR_VERSION >= 2 - case PGRES_SINGLE_TUPLE: - /* Added in PostgreSQL 9.2 */ -#endif - case PGRES_TUPLES_OK: - { - res->rows = PQntuples(res->result); - handle->affected_rows = res->rows; - res->cols = PQnfields(res->result); - } - break; -#if POSTGRESQL_MAJOR_VERSION >= 9 && POSTGRESQL_MINOR_VERSION >= 1 - case PGRES_COPY_BOTH: - /* Added in PostgreSQL 9.1 */ -#endif - case PGRES_COPY_OUT: - case PGRES_COPY_IN: - case PGRES_COMMAND_OK: - break; - case PGRES_EMPTY_QUERY: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_EMPTY_QUERY\n", handle->sql); - case PGRES_BAD_RESPONSE: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_BAD_RESPONSE\n", handle->sql); - case PGRES_NONFATAL_ERROR: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_NONFATAL_ERROR\n", handle->sql); - case PGRES_FATAL_ERROR: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_FATAL_ERROR\n", handle->sql); - res->err = PQresultErrorMessage(res->result); - goto error; - break; - } - } else { - free(res); - res = NULL; - *result_out = NULL; - } - - return SWITCH_PGSQL_SUCCESS; - error: -#endif - return SWITCH_PGSQL_FAIL; -} - -SWITCH_DECLARE(void) switch_pgsql_free_result(switch_pgsql_result_t **result) -{ -#ifdef SWITCH_HAVE_PGSQL - - if (!*result) { - return; - } - - if ((*result)->result) { - PQclear((*result)->result); - } - free(*result); - *result = NULL; -#else - return; -#endif -} - -SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_finish_results_real(const char* file, const char* func, int line, switch_pgsql_handle_t *handle) -{ -#ifdef SWITCH_HAVE_PGSQL - switch_pgsql_result_t *res = NULL; - switch_pgsql_status_t final_status = SWITCH_PGSQL_SUCCESS; - int done = 0; - do { - switch_pgsql_next_result(handle, &res); - if (res && res->err) { - switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "Error executing query:\n%s\n", res->err); - final_status = SWITCH_PGSQL_FAIL; - } - if (!res) done = 1; - switch_pgsql_free_result(&res); - } while (!done); - return final_status; -#else - return SWITCH_PGSQL_FAIL; -#endif -} #ifdef SWITCH_HAVE_PGSQL static int db_is_up(switch_pgsql_handle_t *handle) @@ -367,8 +123,8 @@ static int db_is_up(switch_pgsql_handle_t *handle) goto done; } - /* Try a non-blocking read on the connection to gobble up any EOF from a closed connection and mark the connection BAD if it is closed. */ - PQconsumeInput(handle->con); + /* Try a non-blocking read on the connection to gobble up any EOF from a closed connection and mark the connection BAD if it is closed. */ + PQconsumeInput(handle->con); if (PQstatus(handle->con) == CONNECTION_BAD) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "PQstatus returned bad connection; reconnecting...\n"); @@ -454,6 +210,273 @@ static int db_is_up(switch_pgsql_handle_t *handle) #endif +SWITCH_DECLARE(void) switch_pgsql_set_num_retries(switch_pgsql_handle_t *handle, int num_retries) +{ +#ifdef SWITCH_HAVE_PGSQL + if (handle) { + handle->num_retries = num_retries; + } +#endif +} + +SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_disconnect(switch_pgsql_handle_t *handle) +{ +#ifdef SWITCH_HAVE_PGSQL + + if (!handle) { + return SWITCH_PGSQL_FAIL; + } + + if (handle->state == SWITCH_PGSQL_STATE_CONNECTED) { + PQfinish(handle->con); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Disconnected from [%s]\n", handle->dsn); + } + switch_safe_free(handle->sql); + handle->state = SWITCH_PGSQL_STATE_DOWN; + + return SWITCH_PGSQL_SUCCESS; +#else + return SWITCH_PGSQL_FAIL; +#endif +} + +SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_send_query(switch_pgsql_handle_t *handle, const char* sql) +{ +#ifdef SWITCH_HAVE_PGSQL + char *err_str; + + switch_safe_free(handle->sql); + handle->sql = strdup(sql); + if (!PQsendQuery(handle->con, sql)) { + err_str = switch_pgsql_handle_get_error(handle); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to send query (%s) to database: %s\n", sql, err_str); + switch_pgsql_finish_results(handle); + goto error; + } + + return SWITCH_PGSQL_SUCCESS; + error: +#endif + return SWITCH_PGSQL_FAIL; +} + +SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_cancel_real(const char *file, const char *func, int line, switch_pgsql_handle_t *handle) +{ + switch_pgsql_status_t ret = SWITCH_PGSQL_SUCCESS; +#ifdef SWITCH_HAVE_PGSQL + char err_buf[256]; + PGcancel *cancel = NULL; + + memset(err_buf, 0, 256); + cancel = PQgetCancel(handle->con); + if(!PQcancel(cancel, err_buf, 256)) { + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_CRIT, "Failed to cancel long-running query (%s): %s\n", handle->sql, err_buf); + ret = SWITCH_PGSQL_FAIL; + } + PQfreeCancel(cancel); + { + PGresult *tmp = NULL; + /* Make sure the query is fully cancelled */ + while ((tmp = PQgetResult(handle->con)) != NULL) PQclear(tmp); + } +#endif + return ret; +} + + +SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_next_result_timed(switch_pgsql_handle_t *handle, switch_pgsql_result_t **result_out, int msec) +{ +#ifdef SWITCH_HAVE_PGSQL + switch_pgsql_result_t *res; + switch_time_t start; + switch_time_t ctime; + unsigned int usec = msec * 1000; + char *err_str; + struct pollfd fds[2] = { {0} }; + int poll_res = 0; + + if(!handle) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "**BUG** Null handle passed to switch_pgsql_next_result.\n"); + return SWITCH_PGSQL_FAIL; + } + + /* Try to consume input that might be waiting right away */ + if (PQconsumeInput(handle->con)) { + /* And check to see if we have a full result ready for reading */ + if (PQisBusy(handle->con)) { + + /* Wait for a result to become available, up to msec milliseconds */ + start = switch_time_now(); + while((ctime = switch_micro_time_now()) - start <= usec) { + int wait_time = (usec - (ctime - start)) / 1000; + fds[0].fd = handle->sock; + fds[0].events |= POLLIN; + fds[0].events |= POLLERR; + fds[0].events |= POLLNVAL; + fds[0].events |= POLLHUP; + fds[0].events |= POLLPRI; + fds[0].events |= POLLRDNORM; + fds[0].events |= POLLRDBAND; + + /* Wait for the PostgreSQL socket to be ready for data reads. */ + if ((poll_res = poll(&fds[0], 1, wait_time)) > 0 ) { + if (fds[0].revents & POLLHUP || fds[0].revents & POLLNVAL) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PGSQL socket closed or invalid while waiting for result for query (%s)\n", handle->sql); + goto error; + } else if (fds[0].revents & POLLERR) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql); + goto error; + } else if (fds[0].revents & POLLIN || fds[0].revents & POLLPRI || fds[0].revents & POLLRDNORM || fds[0].revents & POLLRDBAND) { + /* Then try to consume any input waiting. */ + if (PQconsumeInput(handle->con)) { + if (PQstatus(handle->con) == CONNECTION_BAD) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection terminated while waiting for result.\n"); + handle->state = SWITCH_PGSQL_STATE_ERROR; + goto error; + } + + /* And check to see if we have a full result ready for reading */ + if (!PQisBusy(handle->con)) { + /* If we can pull a full result without blocking, then break this loop */ + break; + } + } else { + /* If we had an error trying to consume input, report it and cancel the query. */ + err_str = switch_pgsql_handle_get_error(handle); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str); + switch_safe_free(err_str); + switch_pgsql_cancel(handle); + goto error; + } + } + } else if (poll_res == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll failed trying to read PGSQL socket for query (%s)\n", handle->sql); + goto error; + } + } + + /* If we broke the loop above because of a timeout, report that and cancel the query. */ + if (ctime - start > usec) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Query (%s) took too long to complete or database not responding.\n", handle->sql); + switch_pgsql_cancel(handle); + goto error; + } + + } + } else { + /* If we had an error trying to consume input, report it and cancel the query. */ + err_str = switch_pgsql_handle_get_error(handle); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str); + switch_safe_free(err_str); + /* switch_pgsql_cancel(handle); */ + goto error; + } + + + /* At this point, we know we can read a full result without blocking. */ + if(!(res = malloc(sizeof(switch_pgsql_result_t)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Malloc failed!\n"); + goto error; + } + memset(res, 0, sizeof(switch_pgsql_result_t)); + + + res->result = PQgetResult(handle->con); + if (res->result) { + *result_out = res; + res->status = PQresultStatus(res->result); + switch(res->status) { +#if POSTGRESQL_MAJOR_VERSION >= 9 && POSTGRESQL_MINOR_VERSION >= 2 + case PGRES_SINGLE_TUPLE: + /* Added in PostgreSQL 9.2 */ +#endif + case PGRES_TUPLES_OK: + { + res->rows = PQntuples(res->result); + handle->affected_rows = res->rows; + res->cols = PQnfields(res->result); + } + break; +#if POSTGRESQL_MAJOR_VERSION >= 9 && POSTGRESQL_MINOR_VERSION >= 1 + case PGRES_COPY_BOTH: + /* Added in PostgreSQL 9.1 */ +#endif + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_COMMAND_OK: + break; + case PGRES_EMPTY_QUERY: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_EMPTY_QUERY\n", handle->sql); + case PGRES_BAD_RESPONSE: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_BAD_RESPONSE\n", handle->sql); + case PGRES_NONFATAL_ERROR: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_NONFATAL_ERROR\n", handle->sql); + case PGRES_FATAL_ERROR: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_FATAL_ERROR\n", handle->sql); + res->err = PQresultErrorMessage(res->result); + goto error; + break; + } + } else { + free(res); + res = NULL; + *result_out = NULL; + } + + return SWITCH_PGSQL_SUCCESS; + error: + { + PGresult *tmp = NULL; + /* Make sure the failed connection does not have any transactions marked as in progress */ + while ((tmp = PQgetResult(handle->con)) != NULL) PQclear(tmp); + + /* Try to reconnect to the DB if we were dropped */ + db_is_up(handle); + } +#endif + return SWITCH_PGSQL_FAIL; +} + +SWITCH_DECLARE(void) switch_pgsql_free_result(switch_pgsql_result_t **result) +{ +#ifdef SWITCH_HAVE_PGSQL + + if (!*result) { + return; + } + + if ((*result)->result) { + PQclear((*result)->result); + } + free(*result); + *result = NULL; +#else + return; +#endif +} + +SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_finish_results_real(const char* file, const char* func, int line, switch_pgsql_handle_t *handle) +{ +#ifdef SWITCH_HAVE_PGSQL + switch_pgsql_result_t *res = NULL; + switch_pgsql_status_t final_status = SWITCH_PGSQL_SUCCESS; + int done = 0; + do { + switch_pgsql_next_result(handle, &res); + if (res && res->err) { + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "Error executing query:\n%s\n", res->err); + final_status = SWITCH_PGSQL_FAIL; + } + if (!res) done = 1; + switch_pgsql_free_result(&res); + } while (!done); + return final_status; +#else + return SWITCH_PGSQL_FAIL; +#endif +} + + SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_connect(switch_pgsql_handle_t *handle) { #ifdef SWITCH_HAVE_PGSQL @@ -557,11 +580,14 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_exec_base_detailed(con if (handle->auto_commit == SWITCH_FALSE && handle->in_txn == SWITCH_FALSE) { if (switch_pgsql_send_query(handle, "BEGIN") != SWITCH_PGSQL_SUCCESS) { er = strdup("Error sending BEGIN!"); - switch_pgsql_finish_results(handle); + if (switch_pgsql_finish_results(handle) != SWITCH_PGSQL_SUCCESS) { + db_is_up(handle); /* If finish_results failed, maybe the db went dead */ + } goto error; } if (switch_pgsql_finish_results(handle) != SWITCH_PGSQL_SUCCESS) { + db_is_up(handle); er = strdup("Error sending BEGIN!"); goto error; } @@ -570,7 +596,9 @@ SWITCH_DECLARE(switch_pgsql_status_t) switch_pgsql_handle_exec_base_detailed(con if (switch_pgsql_send_query(handle, sql) != SWITCH_PGSQL_SUCCESS) { er = strdup("Error sending query!"); - switch_pgsql_finish_results(handle); + if (switch_pgsql_finish_results(handle) != SWITCH_PGSQL_SUCCESS) { + db_is_up(handle); + } goto error; }