From 53f09acbecc6627592647fd53b9c5eb30001a387 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 4 Sep 2009 18:34:52 +0000 Subject: [PATCH] add mutex to deal with small race git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@14769 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_conference/mod_conference.c | 150 ++++++++++-------- 1 file changed, 86 insertions(+), 64 deletions(-) diff --git a/src/mod/applications/mod_conference/mod_conference.c b/src/mod/applications/mod_conference/mod_conference.c index 942bdbdd95..b4fbdb20bc 100644 --- a/src/mod/applications/mod_conference/mod_conference.c +++ b/src/mod/applications/mod_conference/mod_conference.c @@ -313,9 +313,10 @@ struct conference_member { uint32_t score; uint32_t score_iir; switch_mutex_t *flag_mutex; - switch_mutex_t *control_mutex; + switch_mutex_t *write_mutex; switch_mutex_t *audio_in_mutex; switch_mutex_t *audio_out_mutex; + switch_mutex_t *read_mutex; switch_codec_implementation_t orig_read_impl; switch_codec_t read_codec; switch_codec_t write_codec; @@ -419,6 +420,10 @@ static switch_status_t conf_api_sub_undeaf(conference_member_t *member, switch_s static switch_status_t conference_add_event_data(conference_obj_t *conference, switch_event_t *event); static switch_status_t conference_add_event_member_data(conference_member_t *member, switch_event_t *event); + +#define lock_member(_member) switch_mutex_lock(_member->write_mutex); switch_mutex_lock(_member->read_mutex) +#define unlock_member(_member) switch_mutex_unlock(_member->read_mutex); switch_mutex_unlock(_member->write_mutex) + static switch_status_t conference_add_event_data(conference_obj_t *conference, switch_event_t *event) { switch_status_t status = SWITCH_STATUS_SUCCESS; @@ -478,8 +483,8 @@ static conference_relationship_t *member_get_relationship(conference_member_t *m if (member == NULL || other_member == NULL || member->relationships == NULL) return NULL; - switch_mutex_lock(member->control_mutex); - switch_mutex_lock(other_member->control_mutex); + lock_member(member); + lock_member(other_member); for (rel = member->relationships; rel; rel = rel->next) { if (rel->id == other_member->id) { @@ -492,8 +497,8 @@ static conference_relationship_t *member_get_relationship(conference_member_t *m } } - switch_mutex_unlock(other_member->control_mutex); - switch_mutex_unlock(member->control_mutex); + unlock_member(other_member); + unlock_member(member); return rel ? rel : global; } @@ -557,10 +562,10 @@ static conference_relationship_t *member_add_relationship(conference_member_t *m rel->id = id; - switch_mutex_lock(member->control_mutex); + lock_member(member); rel->next = member->relationships; member->relationships = rel; - switch_mutex_unlock(member->control_mutex); + unlock_member(member); return rel; } @@ -574,7 +579,7 @@ static switch_status_t member_del_relationship(conference_member_t *member, uint if (member == NULL || id == 0) return status; - switch_mutex_lock(member->control_mutex); + lock_member(member); for (rel = member->relationships; rel; rel = rel->next) { if (rel->id == id) { /* we just forget about rel here cos it was allocated by the member's pool @@ -588,7 +593,7 @@ static switch_status_t member_del_relationship(conference_member_t *member, uint } last = rel; } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); return status; } @@ -608,7 +613,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe switch_mutex_lock(conference->mutex); switch_mutex_lock(member->audio_in_mutex); switch_mutex_lock(member->audio_out_mutex); - switch_mutex_lock(member->control_mutex); + lock_member(member); switch_mutex_lock(conference->member_mutex); switch_clear_flag(conference, CFLAG_DESTRUCT); @@ -703,7 +708,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe } } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_in_mutex); @@ -725,18 +730,18 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe switch_assert(conference != NULL); switch_assert(member != NULL); - switch_mutex_lock(member->control_mutex); + lock_member(member); member_fnode = member->fnode; member_sh = member->sh; member->fnode = NULL; member->sh = NULL; - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_mutex_lock(conference->mutex); switch_mutex_lock(conference->member_mutex); switch_mutex_lock(member->audio_in_mutex); switch_mutex_lock(member->audio_out_mutex); - switch_mutex_lock(member->control_mutex); + lock_member(member); switch_clear_flag(member, MFLAG_INTREE); for (imember = conference->members; imember; imember = imember->next) { @@ -832,7 +837,7 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe } } switch_mutex_unlock(conference->member_mutex); - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_in_mutex); switch_mutex_unlock(conference->mutex); @@ -1407,7 +1412,7 @@ static void conference_loop_fn_energy_up(conference_member_t *member, caller_con if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->energy_level += 200; if (member->energy_level > 3000) { member->energy_level = 3000; @@ -1420,7 +1425,7 @@ static void conference_loop_fn_energy_up(conference_member_t *member, caller_con switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->energy_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Energy level %d", member->energy_level); conference_member_say(member, msg, 0); @@ -1434,7 +1439,7 @@ static void conference_loop_fn_energy_equ_conf(conference_member_t *member, call if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->energy_level = member->conference->energy_level; if (test_eflag(member->conference, EFLAG_ENERGY_LEVEL) && @@ -1444,7 +1449,7 @@ static void conference_loop_fn_energy_equ_conf(conference_member_t *member, call switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->energy_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Energy level %d", member->energy_level); conference_member_say(member, msg, 0); @@ -1458,7 +1463,7 @@ static void conference_loop_fn_energy_dn(conference_member_t *member, caller_con if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->energy_level -= 100; if (member->energy_level < 0) { member->energy_level = 0; @@ -1471,7 +1476,7 @@ static void conference_loop_fn_energy_dn(conference_member_t *member, caller_con switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->energy_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Energy level %d", member->energy_level); conference_member_say(member, msg, 0); @@ -1485,7 +1490,7 @@ static void conference_loop_fn_volume_talk_up(conference_member_t *member, calle if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_out_level++; switch_normalize_volume(member->volume_out_level); @@ -1496,7 +1501,7 @@ static void conference_loop_fn_volume_talk_up(conference_member_t *member, calle switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_out_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Volume level %d", member->volume_out_level); conference_member_say(member, msg, 0); @@ -1510,7 +1515,7 @@ static void conference_loop_fn_volume_talk_zero(conference_member_t *member, cal if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_out_level = 0; if (test_eflag(member->conference, EFLAG_VOLUME_LEVEL) && @@ -1520,7 +1525,7 @@ static void conference_loop_fn_volume_talk_zero(conference_member_t *member, cal switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_out_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Volume level %d", member->volume_out_level); conference_member_say(member, msg, 0); @@ -1534,7 +1539,7 @@ static void conference_loop_fn_volume_talk_dn(conference_member_t *member, calle if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_out_level--; switch_normalize_volume(member->volume_out_level); @@ -1545,7 +1550,7 @@ static void conference_loop_fn_volume_talk_dn(conference_member_t *member, calle switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_out_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Volume level %d", member->volume_out_level); conference_member_say(member, msg, 0); @@ -1559,7 +1564,7 @@ static void conference_loop_fn_volume_listen_up(conference_member_t *member, cal if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_in_level++; switch_normalize_volume(member->volume_in_level); @@ -1570,7 +1575,7 @@ static void conference_loop_fn_volume_listen_up(conference_member_t *member, cal switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_in_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Gain level %d", member->volume_in_level); conference_member_say(member, msg, 0); @@ -1584,7 +1589,7 @@ static void conference_loop_fn_volume_listen_zero(conference_member_t *member, c if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_in_level = 0; if (test_eflag(member->conference, EFLAG_GAIN_LEVEL) && @@ -1594,7 +1599,7 @@ static void conference_loop_fn_volume_listen_zero(conference_member_t *member, c switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_in_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Gain level %d", member->volume_in_level); conference_member_say(member, msg, 0); @@ -1608,7 +1613,7 @@ static void conference_loop_fn_volume_listen_dn(conference_member_t *member, cal if (member == NULL) return; - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_in_level--; switch_normalize_volume(member->volume_in_level); @@ -1619,7 +1624,7 @@ static void conference_loop_fn_volume_listen_dn(conference_member_t *member, cal switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_in_level); switch_event_fire(&event); } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); switch_snprintf(msg, sizeof(msg), "Gain level %d", member->volume_in_level); conference_member_say(member, msg, 0); @@ -1766,9 +1771,11 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v while (switch_test_flag(member, MFLAG_RUNNING) && switch_channel_ready(channel)) { - if (switch_channel_test_app_flag(channel, CF_APP_TAGGED)) { + switch_mutex_lock(member->read_mutex); + + if (switch_channel_ready(channel) && switch_channel_test_app_flag(channel, CF_APP_TAGGED)) { switch_yield(100000); - continue; + goto do_continue; } @@ -1777,7 +1784,7 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v /* end the loop, if appropriate */ if (!SWITCH_READ_ACCEPTABLE(status) || !switch_test_flag(member, MFLAG_RUNNING)) { - break; + goto do_break; } if (switch_test_flag(read_frame, SFF_CNG)) { @@ -1798,7 +1805,7 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v } } } - continue; + goto do_continue; } energy_level = member->energy_level; @@ -1940,12 +1947,21 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v ok = switch_buffer_write(member->audio_buffer, data, datalen); switch_mutex_unlock(member->audio_in_mutex); if (!ok) { - break; + goto do_break; } } } + + do_continue: + + switch_mutex_unlock(member->read_mutex); + } + do_break: + + switch_mutex_unlock(member->read_mutex); + switch_resample_destroy(&member->read_resampler); switch_clear_flag_locked(member, MFLAG_ITHREAD); @@ -2123,12 +2139,14 @@ static void conference_loop_output(conference_member_t *member) switch_size_t file_sample_len = csamples; switch_size_t file_data_len = file_sample_len * 2; + + switch_mutex_lock(member->write_mutex); + if (switch_test_flag(member, MFLAG_RESTART)) { + switch_mutex_unlock(member->write_mutex); goto top; } - switch_mutex_lock(member->control_mutex); - if (switch_core_session_dequeue_event(member->session, &event, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS) { if (event->event_id == SWITCH_EVENT_MESSAGE) { char *from = switch_event_get_header(event, "from"); @@ -2330,7 +2348,7 @@ static void conference_loop_output(conference_member_t *member) switch_clear_flag_locked(member, MFLAG_FLUSH_BUFFER); } - switch_mutex_unlock(member->control_mutex); + switch_mutex_unlock(member->write_mutex); if (switch_core_session_private_event_count(member->session)) { @@ -2347,7 +2365,8 @@ static void conference_loop_output(conference_member_t *member) switch_cond_next(); } - } /* Rinse ... Repeat */ + } /* Rinse ... Repeat */ + if (member->digit_stream != NULL) { switch_ivr_digit_stream_destroy(&member->digit_stream); @@ -2415,10 +2434,11 @@ static void *SWITCH_THREAD_FUNC conference_record_thread_run(switch_thread_t *th member->mux_frame = switch_core_alloc(member->pool, member->frame_size); - switch_mutex_init(&member->control_mutex, SWITCH_MUTEX_NESTED, rec->pool); + switch_mutex_init(&member->write_mutex, SWITCH_MUTEX_NESTED, rec->pool); switch_mutex_init(&member->flag_mutex, SWITCH_MUTEX_NESTED, rec->pool); switch_mutex_init(&member->audio_in_mutex, SWITCH_MUTEX_NESTED, rec->pool); switch_mutex_init(&member->audio_out_mutex, SWITCH_MUTEX_NESTED, rec->pool); + switch_mutex_init(&member->read_mutex, SWITCH_MUTEX_NESTED, rec->pool); /* Setup an audio buffer for the incoming audio */ if (switch_buffer_create_dynamic(&member->audio_buffer, CONF_DBLOCK_SIZE, CONF_DBUFFER_SIZE, 0) != SWITCH_STATUS_SUCCESS) { @@ -2575,7 +2595,7 @@ static uint32_t conference_member_stop_file(conference_member_t *member, file_st if (member == NULL) return count; - switch_mutex_lock(member->control_mutex); + lock_member(member); if (stop == FILE_STOP_ALL) { for (nptr = member->fnode; nptr; nptr = nptr->next) { @@ -2589,7 +2609,7 @@ static uint32_t conference_member_stop_file(conference_member_t *member, file_st } } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); return count; } @@ -2610,10 +2630,10 @@ static void conference_send_all_dtmf(conference_member_t *member, conference_obj const char *p; for (p = dtmf; p && *p; p++) { switch_dtmf_t digit = { *p, SWITCH_DEFAULT_DTMF_DURATION}; - switch_mutex_lock(imember->control_mutex); + lock_member(imember); switch_core_session_kill_channel(imember->session, SWITCH_SIG_BREAK); switch_core_session_send_dtmf(imember->session, &digit); - switch_mutex_unlock(imember->control_mutex); + unlock_member(imember); } } } @@ -2802,14 +2822,14 @@ static switch_status_t conference_member_play_file(conference_member_t *member, fnode->file = switch_core_strdup(fnode->pool, file); /* Queue the node */ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(member->session), SWITCH_LOG_DEBUG, "Queueing file '%s' for play\n", file); - switch_mutex_lock(member->control_mutex); + lock_member(member); for (nptr = member->fnode; nptr && nptr->next; nptr = nptr->next); if (nptr) { nptr->next = fnode; } else { member->fnode = fnode; } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); status = SWITCH_STATUS_SUCCESS; done: @@ -2867,7 +2887,7 @@ static switch_status_t conference_member_say(conference_member_t *member, char * } /* Queue the node */ - switch_mutex_lock(member->control_mutex); + lock_member(member); for (nptr = member->fnode; nptr && nptr->next; nptr = nptr->next); if (nptr) { @@ -2893,7 +2913,7 @@ static switch_status_t conference_member_say(conference_member_t *member, char * } switch_core_speech_feed_tts(fnode->sh, text, &flags); - switch_mutex_unlock(member->control_mutex); + unlock_member(member); status = SWITCH_STATUS_SUCCESS; @@ -3196,12 +3216,12 @@ static switch_status_t conf_api_sub_kick(conference_member_t *member, switch_str if (member == NULL) return SWITCH_STATUS_GENERR; - switch_mutex_lock(member->control_mutex); + lock_member(member); switch_clear_flag(member, MFLAG_RUNNING); switch_set_flag_locked(member, MFLAG_KICKED); switch_core_session_kill_channel(member->session, SWITCH_SIG_BREAK); - switch_mutex_unlock(member->control_mutex); + unlock_member(member); if (stream != NULL) { stream->write_function(stream, "OK kicked %u\n", member->id); } @@ -3231,10 +3251,10 @@ static switch_status_t conf_api_sub_dtmf(conference_member_t *member, switch_str return SWITCH_STATUS_GENERR; } - switch_mutex_lock(member->control_mutex); + lock_member(member); switch_core_session_kill_channel(member->session, SWITCH_SIG_BREAK); switch_core_session_send_dtmf_string(member->session, (char *) data); - switch_mutex_unlock(member->control_mutex); + unlock_member(member); if (stream != NULL) { stream->write_function(stream, "OK sent %s to %u\n", (char *) data, member->id); @@ -3259,9 +3279,9 @@ static switch_status_t conf_api_sub_energy(conference_member_t *member, switch_s return SWITCH_STATUS_GENERR; if (data) { - switch_mutex_lock(member->control_mutex); + lock_member(member); member->energy_level = atoi((char *) data); - switch_mutex_unlock(member->control_mutex); + unlock_member(member); } if (stream != NULL) { stream->write_function(stream, "Energy %u = %d\n", member->id, member->energy_level); @@ -3285,10 +3305,10 @@ static switch_status_t conf_api_sub_volume_in(conference_member_t *member, switc return SWITCH_STATUS_GENERR; if (data) { - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_in_level = atoi((char *) data); switch_normalize_volume(member->volume_in_level); - switch_mutex_unlock(member->control_mutex); + unlock_member(member); } if (stream != NULL) { stream->write_function(stream, "Volume IN %u = %d\n", member->id, member->volume_in_level); @@ -3312,10 +3332,10 @@ static switch_status_t conf_api_sub_volume_out(conference_member_t *member, swit return SWITCH_STATUS_GENERR; if (data) { - switch_mutex_lock(member->control_mutex); + lock_member(member); member->volume_out_level = atoi((char *) data); switch_normalize_volume(member->volume_out_level); - switch_mutex_unlock(member->control_mutex); + unlock_member(member); } if (stream != NULL) { stream->write_function(stream, "Volume OUT %u = %d\n", member->id, member->volume_out_level); @@ -3986,7 +4006,8 @@ static switch_status_t conf_api_sub_transfer(conference_obj_t *conference, switc } /* move the member from the old conference to the new one */ - switch_mutex_lock(member->control_mutex); + lock_member(member); + if (conference != new_conference) { conference_del_member(conference, member); conference_add_member(new_conference, member); @@ -4001,7 +4022,7 @@ static switch_status_t conf_api_sub_transfer(conference_obj_t *conference, switc } } - switch_mutex_unlock(member->control_mutex); + unlock_member(member); stream->write_function(stream, "OK Member '%d' sent to conference %s.\n", member->id, argv[2]); @@ -5252,7 +5273,8 @@ SWITCH_STANDARD_APP(conference_function) /* Prepare MUTEXS */ member.id = next_member_id(); switch_mutex_init(&member.flag_mutex, SWITCH_MUTEX_NESTED, member.pool); - switch_mutex_init(&member.control_mutex, SWITCH_MUTEX_NESTED, member.pool); + switch_mutex_init(&member.write_mutex, SWITCH_MUTEX_NESTED, member.pool); + switch_mutex_init(&member.read_mutex, SWITCH_MUTEX_NESTED, member.pool); switch_mutex_init(&member.audio_in_mutex, SWITCH_MUTEX_NESTED, member.pool); switch_mutex_init(&member.audio_out_mutex, SWITCH_MUTEX_NESTED, member.pool);