From f1d5120f63d7a3b36ba5940c1631a431795c819c Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 7 Dec 2011 11:27:50 -0600 Subject: [PATCH] FS-3735 --- src/mod/endpoints/mod_rtmp/mod_rtmp.c | 3 ++- src/mod/endpoints/mod_rtmp/mod_rtmp.h | 1 + src/mod/endpoints/mod_rtmp/rtmp.c | 16 +++++++++++++--- src/mod/endpoints/mod_rtmp/rtmp_tcp.c | 11 +++++++++-- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/mod/endpoints/mod_rtmp/mod_rtmp.c b/src/mod/endpoints/mod_rtmp/mod_rtmp.c index ae3b3a6279..e00fcd0b6f 100644 --- a/src/mod/endpoints/mod_rtmp/mod_rtmp.c +++ b/src/mod/endpoints/mod_rtmp/mod_rtmp.c @@ -407,6 +407,7 @@ switch_status_t rtmp_read_frame(switch_core_session_t *session, switch_frame_t * return SWITCH_STATUS_SUCCESS; cng: + data = (switch_byte_t *) tech_pvt->read_frame.data; data[0] = 65; data[1] = 0; @@ -414,7 +415,7 @@ cng: tech_pvt->read_frame.flags = SFF_CNG; tech_pvt->read_frame.codec = &tech_pvt->read_codec; - switch_core_timer_sync(&tech_pvt->timer); + //switch_core_timer_sync(&tech_pvt->timer); *frame = &tech_pvt->read_frame; diff --git a/src/mod/endpoints/mod_rtmp/mod_rtmp.h b/src/mod/endpoints/mod_rtmp/mod_rtmp.h index aec0430b56..388afeacb5 100644 --- a/src/mod/endpoints/mod_rtmp/mod_rtmp.h +++ b/src/mod/endpoints/mod_rtmp/mod_rtmp.h @@ -520,6 +520,7 @@ struct rtmp_private { const char *auth; uint16_t maxlen; + int over_size; }; struct rtmp_reg; diff --git a/src/mod/endpoints/mod_rtmp/rtmp.c b/src/mod/endpoints/mod_rtmp/rtmp.c index 24900a408c..b2e5481466 100644 --- a/src/mod/endpoints/mod_rtmp/rtmp.c +++ b/src/mod/endpoints/mod_rtmp/rtmp.c @@ -105,7 +105,9 @@ void rtmp_handle_control(rtmp_session_t *rsession, int amfnumber) void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber) { rtmp_state_t *state = &rsession->amfstate[amfnumber]; - //amf0_data *dump; +#ifdef RTMP_DEBUG_IO + amf0_data *dump; +#endif int i = 0; buffer_helper_t helper = { state->buf, 0, state->origlen }; int64_t transaction_id; @@ -885,10 +887,18 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession) uint16_t len = state->origlen; switch_mutex_lock(rsession->tech_pvt->readbuf_mutex); - if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > rsession->tech_pvt->maxlen * 3) { + if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > rsession->tech_pvt->maxlen * 5) { + rsession->tech_pvt->over_size++; + } else { + rsession->tech_pvt->over_size = 0; + } + if (rsession->tech_pvt->over_size > 10) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "%s buffer > %u for 10 consecutive packets... Flushing buffer\n", + switch_core_session_get_name(rsession->tech_pvt->session), rsession->tech_pvt->maxlen * 5); switch_buffer_zero(rsession->tech_pvt->readbuf); #ifdef RTMP_DEBUG_IO - fprintf(rsession->io_debug_in, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] FLUSH BUFFER [exceeded %u]\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, rsession->tech_pvt->maxlen * 3); + fprintf(rsession->io_debug_in, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] FLUSH BUFFER [exceeded %u]\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, rsession->tech_pvt->maxlen * 5); #endif } switch_buffer_write(rsession->tech_pvt->readbuf, &len, 2); diff --git a/src/mod/endpoints/mod_rtmp/rtmp_tcp.c b/src/mod/endpoints/mod_rtmp/rtmp_tcp.c index 62bb709b01..239a358dcf 100644 --- a/src/mod/endpoints/mod_rtmp/rtmp_tcp.c +++ b/src/mod/endpoints/mod_rtmp/rtmp_tcp.c @@ -197,10 +197,10 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj) switch_mutex_unlock(io->mutex); if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_TIMEOUT) { - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "pollset_poll failed\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "pollset_poll failed\n"); continue; } else if (status == SWITCH_STATUS_TIMEOUT) { - switch_yield(1); + switch_cond_next(); } for (i = 0; i < numfds; i++) { @@ -219,6 +219,10 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj) if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n"); } + + if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n"); + } if (rtmp_session_request(io->base.profile, &newsession) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTMP session request failed\n"); @@ -314,6 +318,9 @@ switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtm if (switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_REUSEADDR, 1)) { goto fail; } + if (switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_TCP_NODELAY, 1)) { + goto fail; + } if (switch_socket_bind(io_tcp->listen_socket, sa)) { goto fail; }