您现在的位置是:首页 >技术交流 >apt_task的工作原理网站首页技术交流

apt_task的工作原理

云梦谭 2024-09-19 00:01:06
简介apt_task的工作原理

        apt是apr toolkit的缩写,它在apr平台基础上,进一步封装,抽象提炼了一些基础功能。本文分析apt_task的工作原理。

1. 基础类apt_task_t

        出于封装的目的,apt_task_t的定义放在apt_task.c文件里,头文件中只有一个声明:

/** Opaque task declaration */
typedef struct apt_task_t apt_task_t;
/**
 * Task object. A task is both an element of its parent's ring of child
 * tasks (via link) and the owner of its own ring of child tasks (via head).
 */
struct apt_task_t {
	APR_RING_ENTRY(apt_task_t) link;                 /* entry to parent task ring */
	APR_RING_HEAD(apt_task_head_t, apt_task_t) head; /* head of child tasks ring */

	const char          *name;          /* name of the task */
	void                *obj;           /* external object associated with the task */
	apr_pool_t          *pool;          /* memory pool to allocate task data from */
	apt_task_msg_pool_t *msg_pool;      /* message pool to allocate task messages from */
	apr_thread_mutex_t  *data_guard;    /* mutex to protect task data (state and running are updated under it) */
	apr_thread_t        *thread_handle; /* thread handle */
	apt_task_state_e     state;         /* current task state */
	apt_task_vtable_t    vtable;        /* table of virtual methods */
	apt_task_t          *parent_task;   /* parent (master) task */
	apr_size_t           pending_start; /* number of pending start requests */
	apr_size_t           pending_term;  /* number of pending terminate requests */
	apr_size_t           pending_off;   /* number of pending taking-offline requests */
	apr_size_t           pending_on;    /* number of pending bringing-online requests */
	apt_bool_t           running;       /* task is running (stays TRUE even if terminate has already been requested) */
	apt_bool_t           auto_ready;    /* if TRUE, task is implicitly ready to process messages */
};

它内部包含APR环(ring)的两个成员:link是把自己挂到父task环上的节点,head是本task的子task环的环头,该task的所有子task都保存在这个环上。

name:task名字,系统中展现为线程的cmd name

obj: 私有数据

msg_pool: 消息池

data_guard:保护锁

thread_handle:运行task的线程句柄

state:一个task的状态

vtable:task中重要生命周期回调表,下面会有介绍。

parent_task:父task。

pending_start:环上等待开始的task个数。

pending_term:环上等待结束的task个数。

pending_off:环上等待下线的task个数。

pending_on:环上等待上线的task个数。

running:如果一个task开始执行了,它的状态会变成TASK_STATE_RUNNING,同时这个标志位也会为True

auto_ready:如果是TRUE,则apt_task_run在调用run之前会自动调用process_start(即task隐式地进入就绪状态)。

1.1 apt_task_state_e

这个枚举的定义也在apt_task.c里,定义了四个值:

/** Lifecycle states of a task */
typedef enum {
	/** no task activity */
	TASK_STATE_IDLE = 0,
	/** start of the task has been requested, but it's not running yet */
	TASK_STATE_START_REQUESTED = 1,
	/** task is running */
	TASK_STATE_RUNNING = 2,
	/** termination of the task has been requested, but it's still running */
	TASK_STATE_TERMINATE_REQUESTED = 3
} apt_task_state_e;

1.2 apt_task_vtable_t

相当于虚函数表,定义了task相关的方法抽象,具体回调函数由实现方实现。定义在apt_task.h里:

/**
 * Table of task virtual methods.
 * Any entry may be left NULL; the callers shown in this file test each
 * pointer before invoking it.
 */
struct apt_task_vtable_t {
	/** Virtual destroy method */
	apt_task_method_f destroy;
	/** Virtual start method*/
	apt_task_method_f start;
	/** Virtual terminate method */
	apt_task_method_f terminate;
	/** Virtual run method (the body of the task thread, invoked by apt_task_run) */
	apt_task_method_f run;

	/** Virtual signal_msg method  */
	apt_bool_t (*signal_msg)(apt_task_t *task, apt_task_msg_t *msg);
	/** Virtual process_msg method (receives every message whose type is not TASK_MSG_CORE) */
	apt_bool_t (*process_msg)(apt_task_t *task, apt_task_msg_t *msg);

	/** Virtual process_start method (invoked by apt_task_run before run when auto_ready is TRUE) */
	apt_bool_t (*process_start)(apt_task_t *task);
	/** Virtual process_terminate method */
	apt_bool_t (*process_terminate)(apt_task_t *task);

	/** Virtual pre-run event handler */
	apt_task_event_f on_pre_run;
	/** Virtual post-run event handler */
	apt_task_event_f on_post_run;
	/** Virtual start-complete event handler */
	apt_task_event_f on_start_complete;
	/** Virtual terminate-complete event handler */
	apt_task_event_f on_terminate_complete;
	/** Virtual take-offline-complete event handler */
	apt_task_event_f on_offline_complete;
	/** Virtual bring-online-complete event handler */
	apt_task_event_f on_online_complete;
};

函数说明:

| 变量(函数指针) | 说明 | 相关函数 |
| --- | --- | --- |
| destroy | task销毁函数,相当于析构函数 | apt_task_destroy |
| start | task开始 | apt_task_start |
| terminate | task终止 | apt_task_terminate |
| run | task执行任务函数,同时会改变对应的task状态;一般不需要修改,内部有函数apt_consumer_task_run | apt_task_run |
| signal_msg | 发送信号消息,在其“子类”apt_consumer_task中生产一个消息,并通过apr_queue发送;一般不需要修改,内部有函数apt_consumer_task_msg_signal | apt_task_msg_signal/apt_task_core_msg_signal |
| process_msg | USER类型消息的处理 | apt_task_msg_process |
| process_start | 信号消息前处理 | apt_task_run/apt_task_ready |
| process_terminate | 处理CORE_TASK_MSG_TERMINATE_REQUEST消息 | apt_core_task_msg_process |
| on_pre_run | 在run之前需要执行的内容 | apt_task_terminate |
| on_post_run | 在run之后需要执行的内容 | apt_task_terminate |
| on_start_complete | start完成回调,结束后调用apt_task_core_msg_signal(task->parent_task,task->msg_pool,CORE_TASK_MSG_START_COMPLETE); | apt_task_start_complete_raise |
| on_terminate_complete | terminate完成 | apt_task_terminate_complete_raise |
| on_offline_complete | offline完成回调,结束后调用apt_task_core_msg_signal(task->parent_task,task->msg_pool,CORE_TASK_MSG_TAKEOFFLINE_COMPLETE); | apt_task_offline_complete_raise |
| on_online_complete | online完成回调,结束后调用apt_task_core_msg_signal(task->parent_task,task->msg_pool,CORE_TASK_MSG_BRINGONLINE_COMPLETE); | apt_task_online_complete_raise |

1.3 apt_task的创建

apt_task_create()

/**
 * Create a task in the idle state.
 *
 * @param obj      external object to associate with the task
 * @param msg_pool message pool to allocate task messages from; when NULL a
 *                 dynamic message pool is created from @a pool
 * @param pool     memory pool the task and its mutex are allocated from
 * @return the new task, or NULL if the data-guard mutex cannot be created
 */
APT_DECLARE(apt_task_t*) apt_task_create(
								void *obj,
								apt_task_msg_pool_t *msg_pool,
								apr_pool_t *pool)
{
	apt_task_t *task = apr_palloc(pool,sizeof(apt_task_t));
	task->obj = obj;
	task->pool = pool;
	task->msg_pool = msg_pool;

	if(!task->msg_pool) {
		task->msg_pool = apt_task_msg_pool_create_dynamic(0,pool);
	}

	task->state = TASK_STATE_IDLE;
	task->thread_handle = NULL;
	if(apr_thread_mutex_create(&task->data_guard, APR_THREAD_MUTEX_DEFAULT, task->pool) != APR_SUCCESS) {
		return NULL;
	}

	/* reset vtable, then install the default lifecycle handlers */
	apt_task_vtable_reset(&task->vtable);
	task->vtable.terminate = apt_task_terminate_request;
	task->vtable.process_start = apt_task_start_process_internal;
	task->vtable.process_terminate = apt_task_terminate_process_internal;
	
	/* the task starts detached from any parent and with no children */
	APR_RING_ELEM_INIT(task, link);
	APR_RING_INIT(&task->head, apt_task_t, link);

	task->parent_task = NULL;
	task->pending_start = 0;
	task->pending_term = 0;
	task->pending_off = 0;
	task->pending_on = 0;
	/* apr_palloc() does not zero the allocation, so give the running flag
	 * a defined value until apt_task_run() sets it */
	task->running = FALSE;
	task->auto_ready = TRUE;
	task->name = "Task";
	return task;
}

这里初始化了三个函数:terminate、process_start和process_terminate,这三个函数几乎不需要用户自定义修改。

1.4 apt_task_run ()函数

/**
 * Thread entry point of a task.
 * Drives the task through its lifecycle in a fixed order:
 * on_pre_run, mark running, process_start (only when auto_ready is TRUE),
 * run, mark idle, on_post_run. Every vtable entry is optional.
 *
 * @param thread_handle handle of the thread executing this function
 * @param data          the apt_task_t to run
 * @return NULL (the thread is exited with APR_SUCCESS)
 */
static void* APR_THREAD_FUNC apt_task_run(apr_thread_t *thread_handle, void *data)
{
	apt_task_t *self = data;

#if APR_HAS_SETTHREADNAME
	apr_thread_name_set(self->name);
#endif
	/* notify that the task is about to run */
	if(self->vtable.on_pre_run) {
		self->vtable.on_pre_run(self);
	}

	/* state and running flag are changed together under the data guard */
	apr_thread_mutex_lock(self->data_guard);
	self->state = TASK_STATE_RUNNING;
	self->running = TRUE;
	apr_thread_mutex_unlock(self->data_guard);

	/* an auto-ready task starts its child tasks (if any) right away */
	if(self->auto_ready == TRUE && self->vtable.process_start) {
		self->vtable.process_start(self);
	}

	/* the actual task body */
	if(self->vtable.run) {
		self->vtable.run(self);
	}

	/* back to idle once the body has returned */
	apr_thread_mutex_lock(self->data_guard);
	self->state = TASK_STATE_IDLE;
	self->running = FALSE;
	apr_thread_mutex_unlock(self->data_guard);

	/* notify that the task has finished running */
	if(self->vtable.on_post_run) {
		self->vtable.on_post_run(self);
	}

	apr_thread_exit(thread_handle,APR_SUCCESS);
	return NULL;
}

 apt_task_run是apt_task内部默认的执行函数,它定义了apt_task的生命周期。按顺序执行各个阶段的回调,比如:on_pre_run、process_start、run、on_post_run。

2. APT消息apt_task_msg_t

消息声明及实现文件分别为apt_task_msg.h和apt_task_msg.c。

/** Enumeration of task message types */
/** Top-level classification of task messages */
typedef enum {
	/** core task message type */
	TASK_MSG_CORE = 0,
	/** user defined task messages start from here */
	TASK_MSG_USER = 1
} apt_task_msg_type_e;

消息分为两大类,CORE和USER分别对应内核侧和用户侧。下面这个枚举对内核侧的消息类别进行细分:

/** Enumeration of core task messages */
/** Sub-types of core (TASK_MSG_CORE) task messages */
typedef enum {
	/** indefinite message */
	CORE_TASK_MSG_NONE = 0,
	/** start-complete message */
	CORE_TASK_MSG_START_COMPLETE = 1,
	/** terminate-request message */
	CORE_TASK_MSG_TERMINATE_REQUEST = 2,
	/** terminate-complete message */
	CORE_TASK_MSG_TERMINATE_COMPLETE = 3,
	/** take-offline-request message */
	CORE_TASK_MSG_TAKEOFFLINE_REQUEST = 4,
	/** take-offline-complete message */
	CORE_TASK_MSG_TAKEOFFLINE_COMPLETE = 5,
	/** bring-online-request message */
	CORE_TASK_MSG_BRINGONLINE_REQUEST = 6,
	/** bring-online-complete message */
	CORE_TASK_MSG_BRINGONLINE_COMPLETE = 7,
} apt_core_task_msg_type_e;

 消息定义:

/** Task message is used for inter task communication */
struct apt_task_msg_t {
	/** Message pool the task message is allocated from */
	apt_task_msg_pool_t *msg_pool;
	/** Task msg type */
	int                  type;
	/** Task msg sub type */
	int                  sub_type;
	/** Context specific data */
	char                 data[1];
};

2.1 消息处理函数apt_task_msg_process

这个函数的实现在apt_task.c里:

/**
 * Dispatch a single message to a task and release it afterwards.
 * Core messages go to apt_core_task_msg_process(); every other message goes
 * to the task's process_msg virtual method, if one is installed.
 *
 * @param task the task the message is addressed to
 * @param msg  the message to process; always released before returning
 * @return the handler's result, or FALSE when no handler was invoked
 */
APT_DECLARE(apt_bool_t) apt_task_msg_process(apt_task_t *task, apt_task_msg_t *msg)
{
	apt_bool_t handled = FALSE;

	apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Message [%s] [" APT_PTR_FMT ";%d;%d]",
		task->name, msg, msg->type, msg->sub_type);

	if(msg->type != TASK_MSG_CORE) {
		/* user message: optional handler supplied by the task implementation */
		if(task->vtable.process_msg) {
			handled = task->vtable.process_msg(task,msg);
		}
	}
	else {
		/* core message: handled internally */
		handled = apt_core_task_msg_process(task,msg);
	}

	/* the message is released whether or not it was handled */
	apt_task_msg_release(msg);
	return handled;
}

逻辑很简单,根据消息类型调用对应的处理函数。内核消息调用apt_core_task_msg_process(),用户消息调用回调函数process_msg()。

3. apt_consumer_task_t

这相当于apt_task的一个子类。其定义放在apt_consumer_task.c里面:

/** Consumer task: a task that pops messages from a queue and processes them */
struct apt_consumer_task_t {
	void              *obj;         /* external object supplied at creation */
	apt_task_t        *base;        /* underlying base task */
	apr_queue_t       *msg_queue;   /* queue the task messages are pushed to and popped from */
#if APR_HAS_QUEUE_TIMEOUT
	apt_timer_queue_t *timer_queue; /* timer queue, present only when timed pops are available */
#endif
};

obj: 自定义数据

base:基类指针

msg_queue: 消息队列

3.1 创建函数apt_consumer_task_create

/**
 * Create a consumer task together with its message queue and base task.
 *
 * @param obj      external object to associate with the consumer task
 * @param msg_pool message pool handed to the base task
 * @param pool     memory pool everything is allocated from
 * @return the new consumer task, or NULL if the queue or the base task
 *         cannot be created
 */
APT_DECLARE(apt_consumer_task_t*) apt_consumer_task_create(
									void *obj,
									apt_task_msg_pool_t *msg_pool,
									apr_pool_t *pool)
{
	apt_consumer_task_t *consumer;
	apt_task_vtable_t *methods;

	consumer = apr_palloc(pool,sizeof(apt_consumer_task_t));
	consumer->obj = obj;

	/* message queue with room for 1024 entries */
	consumer->msg_queue = NULL;
	if(apr_queue_create(&consumer->msg_queue,1024,pool) != APR_SUCCESS) {
		return NULL;
	}

	/* the base task keeps the consumer task as its associated object */
	consumer->base = apt_task_create(consumer,msg_pool,pool);
	if(consumer->base == NULL) {
		return NULL;
	}

	/* plug the consumer loop and the queue-based signalling into the base task */
	methods = apt_task_vtable_get(consumer->base);
	if(methods != NULL) {
		methods->run = apt_consumer_task_run;
		methods->signal_msg = apt_consumer_task_msg_signal;
	}

#if APR_HAS_QUEUE_TIMEOUT
	consumer->timer_queue = apt_timer_queue_create(pool);
#endif

	return consumer;
}

这里初始化了两个vtable函数:apt_consumer_task_run和apt_consumer_task_msg_signal。这两个函数构建了默认的consumer_task的生命周期与内部状态运转逻辑,一般不要覆盖这两个指针。

3.2 apt_consumer_task_run

/**
 * Main loop of a consumer task (installed as the base task's run method).
 * While the task's running flag is set, waits for the next message on the
 * queue and hands it to apt_task_msg_process(). When timed pops are
 * available (APR_HAS_QUEUE_TIMEOUT) and the timer queue reports a pending
 * timeout, the wait is bounded by it and the timer queue is advanced by the
 * time that has elapsed.
 *
 * @param task the base task of the consumer task
 * @return TRUE when the loop ends normally, FALSE if the consumer task
 *         object or the running flag cannot be obtained
 */
static apt_bool_t apt_consumer_task_run(apt_task_t *task)
{
	apr_status_t rv;
	void *msg;
	apt_bool_t *running;
	apt_consumer_task_t *consumer_task;
#if APR_HAS_QUEUE_TIMEOUT
	apr_interval_time_t timeout;
	apr_uint32_t queue_timeout;
	apr_time_t time_now, time_last = 0;
#endif
	const char *task_name;

	consumer_task = apt_task_object_get(task);
	if(!consumer_task) {
		return FALSE;
	}
	/* terminated with ';' - the original used ',' here, which silently
	 * chained this statement to the next one via the comma operator */
	task_name = apt_task_name_get(consumer_task->base);

	running = apt_task_running_flag_get(task);
	if(!running) {
		return FALSE;
	}

	while(*running) {
#if APR_HAS_QUEUE_TIMEOUT
		if(apt_timer_queue_timeout_get(consumer_task->timer_queue,&queue_timeout) == TRUE) {
			/* a timer is pending: wait no longer than its timeout
			 * (scaled by 1000; presumably milliseconds to microseconds) */
			timeout = (apr_interval_time_t)queue_timeout * 1000;
			time_last = apr_time_now();
			apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s] timeout [%u]",
				task_name, queue_timeout);
			rv = apr_queue_timedpop(consumer_task->msg_queue,timeout,&msg);
		}
		else
		{
			/* no timer pending: -1 marks an unbounded wait */
			timeout = -1;
			apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s]",task_name);
			rv = apr_queue_pop(consumer_task->msg_queue,&msg);
		}
#else
		apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s]",task_name);
		rv = apr_queue_pop(consumer_task->msg_queue,&msg);
#endif
		if(rv == APR_SUCCESS) {
			if(msg) {
				apt_task_msg_t *task_msg = msg;
				apt_task_msg_process(consumer_task->base,task_msg);
			}
		}
		else if(rv != APR_TIMEUP) {
			/* a timeout is expected; anything else is worth a warning */
			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Pop Message [%s] status: %d",task_name,rv);
		}

#if APR_HAS_QUEUE_TIMEOUT
		if(timeout != -1) {
			/* the wait was bounded: account for the elapsed time */
			time_now = apr_time_now();
			if(time_now > time_last) {
				apt_timer_queue_advance(consumer_task->timer_queue,(apr_uint32_t)((time_now - time_last)/1000));
			}
			else {
				/* If NTP has drifted the clock backwards, advance the queue based on the set timeout but not actual time difference */
				if(rv == APR_TIMEUP) {
					apt_timer_queue_advance(consumer_task->timer_queue,queue_timeout);
				}
			}
		}
#endif
	}
	return TRUE;
}

 3.3 apt_consumer_task_msg_signal

/**
 * Signal a message to a consumer task by pushing it onto the task's queue
 * (installed as the base task's signal_msg method).
 *
 * @param task the base task of the consumer task
 * @param msg  the message to enqueue
 * @return TRUE if the message was pushed, FALSE otherwise
 */
static apt_bool_t apt_consumer_task_msg_signal(apt_task_t *task, apt_task_msg_t *msg)
{
	apt_consumer_task_t *consumer = apt_task_object_get(task);
	apr_status_t push_status = apr_queue_push(consumer->msg_queue,msg);

	if(push_status == APR_SUCCESS) {
		return TRUE;
	}
	return FALSE;
}

4. 测试实例

tests/apttest/src/consumer_task_suite.c这个实例展示了consumer_task的用法。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。