mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-31 10:47:18 +00:00 
			
		
		
		
	Unify manager behind a single event queue
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@16957 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
		| @@ -59,11 +59,6 @@ | ||||
| #define AST_MAX_MANHEADERS 80 | ||||
| #define AST_MAX_MANHEADER_LEN 256 | ||||
|  | ||||
| struct eventqent { | ||||
| 	struct eventqent *next; | ||||
| 	char eventdata[1]; | ||||
| }; | ||||
|  | ||||
| struct mansession; | ||||
|  | ||||
| struct message { | ||||
|   | ||||
							
								
								
									
										242
									
								
								manager.c
									
									
									
									
									
								
							
							
						
						
									
										242
									
								
								manager.c
									
									
									
									
									
								
							| @@ -83,6 +83,14 @@ struct fast_originate_helper { | ||||
| 	struct ast_variable *vars; | ||||
| }; | ||||
|  | ||||
| struct eventqent { | ||||
| 	int usecount; | ||||
| 	int category; | ||||
| 	ast_mutex_t lock; | ||||
| 	struct eventqent *next; | ||||
| 	char eventdata[1]; | ||||
| }; | ||||
|  | ||||
| static int enabled = 0; | ||||
| static int portno = DEFAULT_MANAGER_PORT; | ||||
| static int asock = -1; | ||||
| @@ -93,6 +101,8 @@ static int httptimeout = 60; | ||||
| static pthread_t t; | ||||
| AST_MUTEX_DEFINE_STATIC(sessionlock); | ||||
| static int block_sockets = 0; | ||||
| static int num_sessions = 0; | ||||
| struct eventqent *master_eventq = NULL; | ||||
|  | ||||
| static struct permalias { | ||||
| 	int num; | ||||
| @@ -472,6 +482,23 @@ static int handle_showmanconn(int fd, int argc, char *argv[]) | ||||
| 	return RESULT_SUCCESS; | ||||
| } | ||||
|  | ||||
| /*! \brief  handle_showmanconn: CLI command show manager connected */ | ||||
| /* Should change to "manager show connected" */ | ||||
| static int handle_showmaneventq(int fd, int argc, char *argv[]) | ||||
| { | ||||
| 	struct eventqent *s; | ||||
| 	ast_mutex_lock(&sessionlock); | ||||
| 	s = master_eventq; | ||||
| 	while (s) { | ||||
| 		ast_cli(fd, "Usecount: %d\n",s->usecount); | ||||
| 		ast_cli(fd, "Category: %d\n", s->category); | ||||
| 		ast_cli(fd, "Event:\n%s", s->eventdata); | ||||
| 		s = s->next; | ||||
| 	} | ||||
| 	ast_mutex_unlock(&sessionlock); | ||||
| 	return RESULT_SUCCESS; | ||||
| } | ||||
|  | ||||
| static char showmancmd_help[] =  | ||||
| "Usage: show manager command <actionname>\n" | ||||
| "	Shows the detailed description for a specific Asterisk manager interface command.\n"; | ||||
| @@ -485,6 +512,11 @@ static char showmanconn_help[] = | ||||
| "	Prints a listing of the users that are currently connected to the\n" | ||||
| "Asterisk manager interface.\n"; | ||||
|  | ||||
| static char showmaneventq_help[] =  | ||||
| "Usage: show manager eventq\n" | ||||
| "	Prints a listing of all events pending in the Asterisk manger\n" | ||||
| "event queue.\n"; | ||||
|  | ||||
| static struct ast_cli_entry show_mancmd_cli = | ||||
| 	{ { "show", "manager", "command", NULL }, | ||||
| 	handle_showmancmd, "Show a manager interface command", showmancmd_help, complete_show_mancmd }; | ||||
| @@ -497,6 +529,24 @@ static struct ast_cli_entry show_manconn_cli = | ||||
| 	{ { "show", "manager", "connected", NULL }, | ||||
| 	handle_showmanconn, "Show connected manager interface users", showmanconn_help }; | ||||
|  | ||||
| static struct ast_cli_entry show_maneventq_cli = | ||||
| 	{ { "show", "manager", "eventq", NULL }, | ||||
| 	handle_showmaneventq, "Show manager interface queued events", showmaneventq_help }; | ||||
|  | ||||
| static void unuse_eventqent(struct eventqent *e) | ||||
| { | ||||
| 	/* XXX Need to atomically decrement the users.  Change this to atomic_dec | ||||
| 	       one day when we have such a beast XXX */ | ||||
| 	int val; | ||||
| 	ast_mutex_lock(&e->lock); | ||||
| 	e->usecount--; | ||||
| 	val = e->usecount && e->next; | ||||
| 	ast_mutex_unlock(&e->lock); | ||||
| 	/* Wake up sleeping beauty */ | ||||
| 	if (val) | ||||
| 		pthread_kill(t, SIGURG); | ||||
| } | ||||
|  | ||||
| static void free_session(struct mansession *s) | ||||
| { | ||||
| 	struct eventqent *eqe; | ||||
| @@ -508,7 +558,7 @@ static void free_session(struct mansession *s) | ||||
| 	while(s->eventq) { | ||||
| 		eqe = s->eventq; | ||||
| 		s->eventq = s->eventq->next; | ||||
| 		free(eqe); | ||||
| 		unuse_eventqent(eqe); | ||||
| 	} | ||||
| 	free(s); | ||||
| } | ||||
| @@ -530,6 +580,7 @@ static void destroy_session(struct mansession *s) | ||||
| 		else | ||||
| 			sessions = cur->next; | ||||
| 		free_session(s); | ||||
| 		num_sessions--; | ||||
| 	} else | ||||
| 		ast_log(LOG_WARNING, "Trying to delete nonexistent session %p?\n", s); | ||||
| 	ast_mutex_unlock(&sessionlock); | ||||
| @@ -876,7 +927,7 @@ static int action_waitevent(struct mansession *s, struct message *m) | ||||
| 		ast_log(LOG_DEBUG, "Starting waiting for an event!\n"); | ||||
| 	for (x=0;((x<timeout) || (timeout < 0)); x++) { | ||||
| 		ast_mutex_lock(&s->__lock); | ||||
| 		if (s->eventq) | ||||
| 		if (s->eventq && s->eventq->next) | ||||
| 			needexit = 1; | ||||
| 		if (s->waiting_thread != pthread_self()) | ||||
| 			needexit = 1; | ||||
| @@ -898,11 +949,14 @@ static int action_waitevent(struct mansession *s, struct message *m) | ||||
| 	if (s->waiting_thread == pthread_self()) { | ||||
| 		astman_send_response(s, m, "Success", "Waiting for Event..."); | ||||
| 		/* Only show events if we're the most recent waiter */ | ||||
| 		while(s->eventq) { | ||||
| 			astman_append(s, "%s", s->eventq->eventdata); | ||||
| 			eqe = s->eventq; | ||||
| 			s->eventq = s->eventq->next; | ||||
| 			free(eqe); | ||||
| 		while(s->eventq->next) { | ||||
| 			eqe = s->eventq->next; | ||||
| 			if (((s->readperm & eqe->category) == eqe->category) && | ||||
| 			    ((s->send_events & eqe->category) == eqe->category)) { | ||||
| 				astman_append(s, "%s", eqe->eventdata); | ||||
| 			} | ||||
| 			unuse_eventqent(s->eventq); | ||||
| 			s->eventq = eqe; | ||||
| 		} | ||||
| 		astman_append(s, | ||||
| 			"Event: WaitEventComplete\r\n" | ||||
| @@ -1566,6 +1620,30 @@ static int action_timeout(struct mansession *s, struct message *m) | ||||
| 	return 0; | ||||
| } | ||||
|  | ||||
| static int process_events(struct mansession *s) | ||||
| { | ||||
| 	struct eventqent *eqe; | ||||
| 	int ret = 0; | ||||
| 	ast_mutex_lock(&s->__lock); | ||||
| 	if (s->fd > -1) { | ||||
| 		s->busy--; | ||||
| 		if (!s->eventq) | ||||
| 			s->eventq = master_eventq; | ||||
| 		while(s->eventq->next) { | ||||
| 			eqe = s->eventq->next; | ||||
| 			if ((s->authenticated && (s->readperm & eqe->category) == eqe->category) && | ||||
| 			    ((s->send_events & eqe->category) == eqe->category)) { | ||||
| 				if (!ret && ast_carefulwrite(s->fd, eqe->eventdata, strlen(eqe->eventdata), s->writetimeout) < 0) | ||||
| 					ret = -1; | ||||
| 			} | ||||
| 			unuse_eventqent(s->eventq); | ||||
| 			s->eventq = eqe; | ||||
| 		} | ||||
| 	} | ||||
| 	ast_mutex_unlock(&s->__lock); | ||||
| 	return ret; | ||||
| } | ||||
|  | ||||
| static int process_message(struct mansession *s, struct message *m) | ||||
| { | ||||
| 	char action[80] = ""; | ||||
| @@ -1573,6 +1651,7 @@ static int process_message(struct mansession *s, struct message *m) | ||||
| 	char *id = astman_get_header(m,"ActionID"); | ||||
| 	char idText[256] = ""; | ||||
| 	char iabuf[INET_ADDRSTRLEN]; | ||||
| 	int ret = 0; | ||||
|  | ||||
| 	ast_copy_string(action, astman_get_header(m, "Action"), sizeof(action)); | ||||
| 	ast_log( LOG_DEBUG, "Manager received command '%s'\n", action ); | ||||
| @@ -1581,9 +1660,9 @@ static int process_message(struct mansession *s, struct message *m) | ||||
| 		astman_send_error(s, m, "Missing action in request"); | ||||
| 		return 0; | ||||
| 	} | ||||
|         if (!ast_strlen_zero(id)) { | ||||
|                 snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id); | ||||
|         } | ||||
| 	if (!ast_strlen_zero(id)) { | ||||
| 		snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id); | ||||
| 	} | ||||
| 	if (!s->authenticated) { | ||||
| 		if (!strcasecmp(action, "Challenge")) { | ||||
| 			char *authtype; | ||||
| @@ -1623,8 +1702,6 @@ static int process_message(struct mansession *s, struct message *m) | ||||
| 		} else | ||||
| 			astman_send_error(s, m, "Authentication Required"); | ||||
| 	} else { | ||||
| 		int ret=0; | ||||
| 		struct eventqent *eqe; | ||||
| 		ast_mutex_lock(&s->__lock); | ||||
| 		s->busy++; | ||||
| 		ast_mutex_unlock(&s->__lock); | ||||
| @@ -1642,23 +1719,10 @@ static int process_message(struct mansession *s, struct message *m) | ||||
| 		} | ||||
| 		if (!tmp) | ||||
| 			astman_send_error(s, m, "Invalid/unknown command"); | ||||
| 		ast_mutex_lock(&s->__lock); | ||||
| 		if (s->fd > -1) { | ||||
| 			s->busy--; | ||||
| 			while(s->eventq) { | ||||
| 				if (ast_carefulwrite(s->fd, s->eventq->eventdata, strlen(s->eventq->eventdata), s->writetimeout) < 0) { | ||||
| 					ret = -1; | ||||
| 					break; | ||||
| 				} | ||||
| 				eqe = s->eventq; | ||||
| 				s->eventq = s->eventq->next; | ||||
| 				free(eqe); | ||||
| 			} | ||||
| 		} | ||||
| 		ast_mutex_unlock(&s->__lock); | ||||
| 		return ret; | ||||
| 	} | ||||
| 	return 0; | ||||
| 	if (ret) | ||||
| 		return ret; | ||||
| 	return process_events(s); | ||||
| } | ||||
|  | ||||
| static int get_input(struct mansession *s, char *output) | ||||
| @@ -1687,12 +1751,20 @@ static int get_input(struct mansession *s, char *output) | ||||
| 	fds[0].fd = s->fd; | ||||
| 	fds[0].events = POLLIN; | ||||
| 	do { | ||||
| 		ast_mutex_lock(&s->__lock); | ||||
| 		s->waiting_thread = pthread_self(); | ||||
| 		ast_mutex_unlock(&s->__lock); | ||||
|  | ||||
| 		res = poll(fds, 1, -1); | ||||
|  | ||||
| 		ast_mutex_lock(&s->__lock); | ||||
| 		s->waiting_thread = AST_PTHREADT_NULL; | ||||
| 		ast_mutex_unlock(&s->__lock); | ||||
| 		if (res < 0) { | ||||
| 			if (errno == EINTR) { | ||||
| 				if (s->dead) | ||||
| 					return -1; | ||||
| 				continue; | ||||
| 				return 0; | ||||
| 			} | ||||
| 			ast_log(LOG_WARNING, "Select returned error: %s\n", strerror(errno)); | ||||
| 	 		return -1; | ||||
| @@ -1734,8 +1806,12 @@ static void *session_do(void *data) | ||||
| 				memset(&m, 0, sizeof(m)); | ||||
| 			} else if (m.hdrcount < AST_MAX_MANHEADERS - 1) | ||||
| 				m.hdrcount++; | ||||
| 		} else if (res < 0) | ||||
| 		} else if (res < 0) { | ||||
| 			break; | ||||
| 		} else if (s->eventq->next) { | ||||
| 			if (process_events(s)) | ||||
| 				break; | ||||
| 		} | ||||
| 	} | ||||
| 	if (s->authenticated) { | ||||
| 		if (option_verbose > 1) { | ||||
| @@ -1759,6 +1835,7 @@ static void *accept_thread(void *ignore) | ||||
| 	int as; | ||||
| 	struct sockaddr_in sin; | ||||
| 	socklen_t sinlen; | ||||
| 	struct eventqent *eqe; | ||||
| 	struct mansession *s, *prev=NULL, *next; | ||||
| 	struct protoent *p; | ||||
| 	int arg = 1; | ||||
| @@ -1779,6 +1856,7 @@ static void *accept_thread(void *ignore) | ||||
| 		while(s) { | ||||
| 			next = s->next; | ||||
| 			if (s->sessiontimeout && (now > s->sessiontimeout) && !s->inuse) { | ||||
| 				num_sessions--; | ||||
| 				if (prev) | ||||
| 					prev->next = next; | ||||
| 				else | ||||
| @@ -1792,6 +1870,14 @@ static void *accept_thread(void *ignore) | ||||
| 				prev = s; | ||||
| 			s = next; | ||||
| 		} | ||||
| 		/* Purge master event queue of old, unused events, but make sure we | ||||
| 		   always keep at least one in the queue */ | ||||
| 		eqe = master_eventq; | ||||
| 		while (master_eventq->next && !master_eventq->usecount) { | ||||
| 			eqe = master_eventq; | ||||
| 			master_eventq = master_eventq->next; | ||||
| 			free(eqe); | ||||
| 		} | ||||
| 		ast_mutex_unlock(&sessionlock); | ||||
|  | ||||
| 		sinlen = sizeof(sin); | ||||
| @@ -1831,8 +1917,17 @@ static void *accept_thread(void *ignore) | ||||
| 		s->fd = as; | ||||
| 		s->send_events = -1; | ||||
| 		ast_mutex_lock(&sessionlock); | ||||
| 		num_sessions++; | ||||
| 		s->next = sessions; | ||||
| 		sessions = s; | ||||
| 		/* Find the last place in the master event queue and hook ourselves | ||||
| 		   in there */ | ||||
| 		s->eventq = master_eventq; | ||||
| 		while(s->eventq->next) | ||||
| 			s->eventq = s->eventq->next; | ||||
| 		ast_mutex_lock(&s->eventq->lock); | ||||
| 		s->eventq->usecount++; | ||||
| 		ast_mutex_unlock(&s->eventq->lock); | ||||
| 		ast_mutex_unlock(&sessionlock); | ||||
| 		if (ast_pthread_create(&s->t, &attr, session_do, s)) | ||||
| 			destroy_session(s); | ||||
| @@ -1841,21 +1936,24 @@ static void *accept_thread(void *ignore) | ||||
| 	return NULL; | ||||
| } | ||||
|  | ||||
| static int append_event(struct mansession *s, const char *str) | ||||
| static int append_event(const char *str, int category) | ||||
| { | ||||
| 	struct eventqent *tmp, *prev=NULL; | ||||
| 	tmp = malloc(sizeof(struct eventqent) + strlen(str)); | ||||
| 	if (tmp) { | ||||
| 		ast_mutex_init(&tmp->lock); | ||||
| 		tmp->next = NULL; | ||||
| 		tmp->category = category; | ||||
| 		strcpy(tmp->eventdata, str); | ||||
| 		if (s->eventq) { | ||||
| 			prev = s->eventq; | ||||
| 		if (master_eventq) { | ||||
| 			prev = master_eventq; | ||||
| 			while(prev->next)  | ||||
| 				prev = prev->next; | ||||
| 			prev->next = tmp; | ||||
| 		} else { | ||||
| 			s->eventq = tmp; | ||||
| 			master_eventq = tmp; | ||||
| 		} | ||||
| 		tmp->usecount = num_sessions; | ||||
| 		return 0; | ||||
| 	} | ||||
| 	return -1; | ||||
| @@ -1870,45 +1968,33 @@ int manager_event(int category, const char *event, const char *fmt, ...) | ||||
| 	char *tmp_next = tmp; | ||||
| 	size_t tmp_left = sizeof(tmp) - 2; | ||||
| 	va_list ap; | ||||
| 	struct timeval now; | ||||
|  | ||||
| 	/* Abort if there aren't any manager sessions */ | ||||
| 	if (!num_sessions) | ||||
| 		return 0; | ||||
|  | ||||
| 	ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n", | ||||
| 			 event, authority_to_str(category, auth, sizeof(auth))); | ||||
| 	if (timestampevents) { | ||||
| 		now = ast_tvnow(); | ||||
| 		ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n", | ||||
| 				 now.tv_sec, (unsigned long) now.tv_usec); | ||||
| 	} | ||||
| 	va_start(ap, fmt); | ||||
| 	ast_build_string_va(&tmp_next, &tmp_left, fmt, ap); | ||||
| 	va_end(ap); | ||||
| 	*tmp_next++ = '\r'; | ||||
| 	*tmp_next++ = '\n'; | ||||
| 	*tmp_next = '\0'; | ||||
| 	 | ||||
| 	ast_mutex_lock(&sessionlock); | ||||
| 	/* Append even to master list and wake up any sleeping sessions */ | ||||
| 	append_event(tmp, category); | ||||
| 	for (s = sessions; s; s = s->next) { | ||||
| 		if ((s->readperm & category) != category) | ||||
| 			continue; | ||||
|  | ||||
| 		if ((s->send_events & category) != category) | ||||
| 			continue; | ||||
|  | ||||
| 		if (ast_strlen_zero(tmp)) { | ||||
| 			struct timeval now; | ||||
|  | ||||
| 			ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n", | ||||
| 					 event, authority_to_str(category, auth, sizeof(auth))); | ||||
| 			if (timestampevents) { | ||||
| 				now = ast_tvnow(); | ||||
| 				ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n", | ||||
| 						 now.tv_sec, (unsigned long) now.tv_usec); | ||||
| 			} | ||||
| 			va_start(ap, fmt); | ||||
| 			ast_build_string_va(&tmp_next, &tmp_left, fmt, ap); | ||||
| 			va_end(ap); | ||||
| 			*tmp_next++ = '\r'; | ||||
| 			*tmp_next++ = '\n'; | ||||
| 			*tmp_next = '\0'; | ||||
| 		} | ||||
|  | ||||
| 		ast_mutex_lock(&s->__lock); | ||||
| 		if (s->busy) { | ||||
| 			append_event(s, tmp); | ||||
| 			if (s->waiting_thread != AST_PTHREADT_NULL) | ||||
| 				pthread_kill(s->waiting_thread, SIGURG); | ||||
| 		} else if (!s->dead && !s->sessiontimeout) { | ||||
| 			if (ast_carefulwrite(s->fd, tmp, tmp_next - tmp, s->writetimeout) < 0) { | ||||
| 				ast_log(LOG_WARNING, "Disconnecting slow (or gone) manager session!\n"); | ||||
| 				s->dead = 1; | ||||
| 				pthread_kill(s->t, SIGURG); | ||||
| 			} | ||||
| 		} | ||||
| 		if (s->waiting_thread != AST_PTHREADT_NULL) | ||||
| 			pthread_kill(s->waiting_thread, SIGURG); | ||||
| 		ast_mutex_unlock(&s->__lock); | ||||
| 	} | ||||
| 	ast_mutex_unlock(&sessionlock); | ||||
| @@ -2084,12 +2170,23 @@ static char *generic_http_callback(int format, struct sockaddr_in *requestor, co | ||||
| 		s->managerid = rand() | (unsigned long)s; | ||||
| 		s->next = sessions; | ||||
| 		sessions = s; | ||||
| 		num_sessions++; | ||||
| 		/* Hook into the last spot in the event queue */ | ||||
| 		s->eventq = master_eventq; | ||||
| 		while(s->eventq->next) | ||||
| 			s->eventq = s->eventq->next; | ||||
| 		ast_mutex_lock(&s->eventq->lock); | ||||
| 		s->eventq->usecount++; | ||||
| 		ast_mutex_unlock(&s->eventq->lock); | ||||
| 		ast_mutex_unlock(&sessionlock); | ||||
| 	} | ||||
|  | ||||
| 	/* Reset HTTP timeout */ | ||||
| 	/* Reset HTTP timeout.  If we're not yet authenticated, keep it extremely short */ | ||||
| 	time(&s->sessiontimeout); | ||||
| 	s->sessiontimeout += httptimeout; | ||||
| 	if (!s->authenticated && (httptimeout > 5)) | ||||
| 		s->sessiontimeout += 5; | ||||
| 	else | ||||
| 		s->sessiontimeout += httptimeout; | ||||
| 	ast_mutex_unlock(&s->__lock); | ||||
| 	 | ||||
| 	memset(&m, 0, sizeof(m)); | ||||
| @@ -2248,8 +2345,11 @@ int init_manager(void) | ||||
| 		ast_cli_register(&show_mancmd_cli); | ||||
| 		ast_cli_register(&show_mancmds_cli); | ||||
| 		ast_cli_register(&show_manconn_cli); | ||||
| 		ast_cli_register(&show_maneventq_cli); | ||||
| 		ast_extension_state_add(NULL, NULL, manager_state_cb, NULL); | ||||
| 		registered = 1; | ||||
| 		/* Append placeholder event so master_eventq never runs dry */ | ||||
| 		append_event("Event: Placeholder\r\n\r\n", 0); | ||||
| 	} | ||||
| 	portno = DEFAULT_MANAGER_PORT; | ||||
| 	displayconnects = 1; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user