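How apt_task Works
apt is short for apr toolkit. It builds on the apr platform, wrapping it further and abstracting out a set of basic facilities. This article analyzes how apt_task works.
1. The base type apt_task_t
For the sake of encapsulation, the definition of apt_task_t lives in apt_task.c; the header contains only a declaration: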
/** Opaque task declaration */
typedef struct apt_task_t apt_task_t;
struct apt_task_t {
    APR_RING_ENTRY(apt_task_t) link;                   /* entry to parent task ring */
    APR_RING_HEAD(apt_task_head_t, apt_task_t) head;   /* head of child tasks ring */
    const char          *name;          /* name of the task */
    void                *obj;           /* external object associated with the task */
    apr_pool_t          *pool;          /* memory pool to allocate task data from */
    apt_task_msg_pool_t *msg_pool;      /* message pool to allocate task messages from */
    apr_thread_mutex_t  *data_guard;    /* mutex to protect task data */
    apr_thread_t        *thread_handle; /* thread handle */
    apt_task_state_e     state;         /* current task state */
    apt_task_vtable_t    vtable;        /* table of virtual methods */
    apt_task_t          *parent_task;   /* parent (master) task */
    apr_size_t           pending_start; /* number of pending start requests */
    apr_size_t           pending_term;  /* number of pending terminate requests */
    apr_size_t           pending_off;   /* number of pending taking-offline requests */
    apr_size_t           pending_on;    /* number of pending bringing-online requests */
    apt_bool_t           running;       /* task is running (TRUE if even terminate has already been requested) */
    apt_bool_t           auto_ready;    /* if TRUE, task is implicitly ready to process messages */
};
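The structure is built around an apr ring: link is the task's entry on its parent's ring, and head is the ring on which all of the task's child tasks are kept.
name: task name; the system shows it as the thread's cmd name
obj: private data
msg_pool: message pool
data_guard: mutex that protects the task data
thread_handle: handle of the thread that runs the task
state: current state of the task
vtable: table of the task's key lifecycle callbacks, described below.
parent_task: parent task.
pending_start: number of tasks on the ring whose start is still pending.
pending_term: number of tasks on the ring whose termination is still pending.
pending_off: number of tasks on the ring still waiting to be taken offline.
pending_on: number of tasks on the ring still waiting to be brought online.
running: set to TRUE when the task begins executing, at the same time as its state becomes TASK_STATE_RUNNING. According to the comment in the struct, it stays TRUE even after termination has been requested.
auto_ready: if TRUE, apt_task_run calls process_start automatically before run, so the task is implicitly ready to process messages.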
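The pending_* counters are easier to follow with a toy model. The sketch below is not the library's code: `toy_task_t`, `toy_start` and the other `toy_` names are hypothetical, the ring is replaced by a small array, and it assumes that a parent records one pending start per child and treats its own start as complete only once the counter drops to zero.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_CHILDREN 4

/* Hypothetical, simplified stand-in for apt_task_t (not the real layout). */
typedef struct toy_task_t {
    struct toy_task_t *parent;
    struct toy_task_t *children[TOY_MAX_CHILDREN];
    size_t child_count;
    size_t pending_start;   /* children that have not reported start yet */
    int start_complete;     /* set once this task and all its children started */
} toy_task_t;

static void toy_add_child(toy_task_t *parent, toy_task_t *child)
{
    assert(parent->child_count < TOY_MAX_CHILDREN);
    parent->children[parent->child_count++] = child;
    child->parent = parent;
}

static void toy_on_child_started(toy_task_t *task);

/* Mark a task as fully started and notify its parent, if any. */
static void toy_start_complete(toy_task_t *task)
{
    task->start_complete = 1;
    if (task->parent) {
        toy_on_child_started(task->parent);
    }
}

/* A child reported start-complete: one fewer pending request. */
static void toy_on_child_started(toy_task_t *task)
{
    assert(task->pending_start > 0);
    if (--task->pending_start == 0) {
        toy_start_complete(task);
    }
}

/* Start a task: request start of every child, or complete at once if none. */
static void toy_start(toy_task_t *task)
{
    size_t i;
    size_t n = task->child_count;
    if (n == 0) {
        toy_start_complete(task);
        return;
    }
    task->pending_start = n;
    for (i = 0; i < n; i++) {
        toy_start(task->children[i]);
    }
}
```

Calling `toy_start` on a root with two leaf children leaves every task with `start_complete` set and the root's `pending_start` back at zero. The same counting idea would apply to pending_term, pending_off and pending_on.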
1.1 apt_task_state_e
This enumeration is also defined in apt_task.c and has four values:
typedef enum {
    TASK_STATE_IDLE,               /**< no task activity */
    TASK_STATE_START_REQUESTED,    /**< start of the task has been requested, but it's not running yet */
    TASK_STATE_RUNNING,            /**< task is running */
    TASK_STATE_TERMINATE_REQUESTED /**< termination of the task has been requested, but it's still running */
} apt_task_state_e;
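One plausible reading of the four states is a small state machine. The sketch below is an illustration only: the `toy_` names and the event enumeration are hypothetical, and the transitions are inferred from the enum comments and from apt_task_run rather than copied from the library.

```c
#include <assert.h>

/* Same four states as apt_task_state_e, redeclared so the sketch stands alone. */
typedef enum {
    TOY_STATE_IDLE,
    TOY_STATE_START_REQUESTED,
    TOY_STATE_RUNNING,
    TOY_STATE_TERMINATE_REQUESTED
} toy_state_e;

/* Hypothetical events that drive the state. */
typedef enum {
    TOY_EV_START,        /* caller asks the task to start */
    TOY_EV_THREAD_UP,    /* worker thread has begun executing */
    TOY_EV_TERMINATE,    /* caller asks the task to terminate */
    TOY_EV_THREAD_DONE   /* worker thread has left its run function */
} toy_event_e;

/* Return the next state, or the current one if the event does not apply. */
static toy_state_e toy_next_state(toy_state_e state, toy_event_e ev)
{
    switch (state) {
    case TOY_STATE_IDLE:
        return ev == TOY_EV_START ? TOY_STATE_START_REQUESTED : state;
    case TOY_STATE_START_REQUESTED:
        return ev == TOY_EV_THREAD_UP ? TOY_STATE_RUNNING : state;
    case TOY_STATE_RUNNING:
        if (ev == TOY_EV_TERMINATE) return TOY_STATE_TERMINATE_REQUESTED;
        if (ev == TOY_EV_THREAD_DONE) return TOY_STATE_IDLE;
        return state;
    case TOY_STATE_TERMINATE_REQUESTED:
        return ev == TOY_EV_THREAD_DONE ? TOY_STATE_IDLE : state;
    }
    assert(!"unknown state");
    return state;
}
```

Events that make no sense in the current state, such as a second start request while the task is running, are ignored here; whether the real library rejects or ignores them is not shown in this article.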
1.2 apt_task_vtable_t
This acts as a virtual function table: it declares the abstract methods of a task, and the concrete callbacks are supplied by the implementer. It is defined in apt_task.h:
/** Table of task virtual methods */
struct apt_task_vtable_t {
    /** Virtual destroy method */
    apt_task_method_f destroy;
    /** Virtual start method*/
    apt_task_method_f start;
    /** Virtual terminate method */
    apt_task_method_f terminate;
    /** Virtual run method*/
    apt_task_method_f run;
    /** Virtual signal_msg method */
    apt_bool_t (*signal_msg)(apt_task_t *task, apt_task_msg_t *msg);
    /** Virtual process_msg method */
    apt_bool_t (*process_msg)(apt_task_t *task, apt_task_msg_t *msg);
    /** Virtual process_start method */
    apt_bool_t (*process_start)(apt_task_t *task);
    /** Virtual process_terminate method */
    apt_bool_t (*process_terminate)(apt_task_t *task);
    /** Virtual pre-run event handler */
    apt_task_event_f on_pre_run;
    /** Virtual post-run event handler */
    apt_task_event_f on_post_run;
    /** Virtual start-complete event handler */
    apt_task_event_f on_start_complete;
    /** Virtual terminate-complete event handler */
    apt_task_event_f on_terminate_complete;
    /** Virtual take-offline-complete event handler */
    apt_task_event_f on_offline_complete;
    /** Virtual bring-online-complete event handler */
    apt_task_event_f on_online_complete;
};
Function reference:
Member (function pointer) | Description | Related function |
---|---|---|
destroy | Destroys the task; the counterpart of a destructor | apt_task_destroy |
start | Starts the task | apt_task_start |
terminate | Terminates the task | apt_task_terminate |
run | The function that does the task's work; the task state is updated around the call. It rarely needs replacing; the library provides apt_consumer_task_run | apt_task_run |
signal_msg | Signals a message. In the "subclass" apt_consumer_task it takes a message and sends it through an apr_queue. It rarely needs replacing; the library provides apt_consumer_task_msg_signal | apt_task_msg_signal/apt_task_core_msg_signal |
process_msg | Handles messages of type USER | apt_task_msg_process |
process_start | Start processing, performed before any message is handled | apt_task_run/apt_task_ready |
process_terminate | Handles the CORE_TASK_MSG_TERMINATE_REQUEST message | apt_core_task_msg_process |
on_pre_run | Work to perform before run | apt_task_run |
on_post_run | Work to perform after run | apt_task_run |
on_start_complete | Start-complete callback; after it returns, apt_task_core_msg_signal(task->parent_task,task->msg_pool,CORE_TASK_MSG_START_COMPLETE); is called | apt_task_start_complete_raise |
on_terminate_complete | Terminate-complete callback | apt_task_terminate_complete_raise |
on_offline_complete | Offline-complete callback; after it returns, apt_task_core_msg_signal(task->parent_task,task->msg_pool,CORE_TASK_MSG_TAKEOFFLINE_COMPLETE); is called | apt_task_offline_complete_raise |
on_online_complete | Online-complete callback; after it returns, apt_task_core_msg_signal(task->parent_task,task->msg_pool,CORE_TASK_MSG_BRINGONLINE_COMPLETE); is called | apt_task_online_complete_raise |
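The table relies on a common C idiom: a struct of function pointers in which every slot is optional. The following is a minimal sketch of that idiom with only three slots. All `toy_` names are hypothetical; it is not the library's vtable code, and it assumes callers always check a slot for NULL before calling it, as apt_task_run does.

```c
#include <assert.h>
#include <stddef.h>

typedef struct toy_task_t toy_task_t;
typedef int (*toy_method_f)(toy_task_t *task);

/* Reduced vtable: only three of the slots listed in the table above. */
typedef struct {
    toy_method_f start;
    toy_method_f run;
    toy_method_f terminate;
} toy_vtable_t;

struct toy_task_t {
    toy_vtable_t vtable;
    int run_calls;       /* bumped by the custom run method below */
};

/* Clear every slot, so unset callbacks are NULL. */
static void toy_vtable_reset(toy_vtable_t *vtable)
{
    assert(vtable != NULL);
    vtable->start = NULL;
    vtable->run = NULL;
    vtable->terminate = NULL;
}

/* Call a slot only if it is set; a missing slot counts as "nothing to do". */
static int toy_call(toy_task_t *task, toy_method_f method)
{
    return method ? method(task) : 0;
}

/* A user-supplied implementation that overrides the empty run slot. */
static int toy_custom_run(toy_task_t *task)
{
    task->run_calls++;
    return 1;
}
```

After `toy_vtable_reset`, `toy_call(&task, task.vtable.run)` returns 0 without calling anything; once `task.vtable.run = toy_custom_run` is assigned, the same call dispatches to the override. Overriding one slot leaves the others untouched, which is why a "subclass" only has to set the pointers it cares about.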
1.3 Creating an apt_task
apt_task_create()
APT_DECLARE(apt_task_t*) apt_task_create(
                            void *obj,
                            apt_task_msg_pool_t *msg_pool,
                            apr_pool_t *pool)
{
    apt_task_t *task = apr_palloc(pool,sizeof(apt_task_t));
    task->obj = obj;
    task->pool = pool;
    task->msg_pool = msg_pool;
    if(!task->msg_pool) {
        task->msg_pool = apt_task_msg_pool_create_dynamic(0,pool);
    }
    task->state = TASK_STATE_IDLE;
    task->thread_handle = NULL;
    if(apr_thread_mutex_create(&task->data_guard, APR_THREAD_MUTEX_DEFAULT, task->pool) != APR_SUCCESS) {
        return NULL;
    }
    /* reset vtable */
    apt_task_vtable_reset(&task->vtable);
    task->vtable.terminate = apt_task_terminate_request;
    task->vtable.process_start = apt_task_start_process_internal;
    task->vtable.process_terminate = apt_task_terminate_process_internal;
    APR_RING_ELEM_INIT(task, link);
    APR_RING_INIT(&task->head, apt_task_t, link);
    task->parent_task = NULL;
    task->pending_start = 0;
    task->pending_term = 0;
    task->pending_off = 0;
    task->pending_on = 0;
    task->auto_ready = TRUE;
    task->name = "Task";
    return task;
}
Three vtable functions are installed here: terminate, process_start and process_terminate. Users almost never need to override them.
1.4 The apt_task_run() function
static void* APR_THREAD_FUNC apt_task_run(apr_thread_t *thread_handle, void *data)
{
    apt_task_t *task = data;
#if APR_HAS_SETTHREADNAME
    apr_thread_name_set(task->name);
#endif
    /* raise pre-run event */
    if(task->vtable.on_pre_run) {
        task->vtable.on_pre_run(task);
    }
    apr_thread_mutex_lock(task->data_guard);
    task->state = TASK_STATE_RUNNING;
    task->running = TRUE;
    apr_thread_mutex_unlock(task->data_guard);
    if(task->auto_ready == TRUE) {
        /* start child tasks (if any) */
        if(task->vtable.process_start) {
            task->vtable.process_start(task);
        }
    }
    /* run task */
    if(task->vtable.run) {
        task->vtable.run(task);
    }
    apr_thread_mutex_lock(task->data_guard);
    task->state = TASK_STATE_IDLE;
    task->running = FALSE;
    apr_thread_mutex_unlock(task->data_guard);
    /* raise post-run event */
    if(task->vtable.on_post_run) {
        task->vtable.on_post_run(task);
    }
    apr_thread_exit(thread_handle,APR_SUCCESS);
    return NULL;
}
apt_task_run is the default thread function inside apt_task, and it defines the lifecycle of an apt_task. It invokes the callback of each stage in order: on_pre_run, process_start, run, on_post_run.
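The ordering can be checked with a thread-free model. The sketch below keeps only the four hooks and the auto_ready flag and records the order in which they fire. The `toy_` names and the trace buffer are hypothetical, and the mutex, state changes and thread exit of the real function are deliberately left out.

```c
#include <assert.h>
#include <string.h>

typedef struct toy_task_t toy_task_t;
typedef void (*toy_hook_f)(toy_task_t *task);

struct toy_task_t {
    toy_hook_f on_pre_run;
    toy_hook_f process_start;
    toy_hook_f run;
    toy_hook_f on_post_run;
    int auto_ready;
    char trace[64];      /* records the order in which hooks fired */
};

/* Append "step;" to the trace, guarding against overflow. */
static void toy_trace(toy_task_t *task, const char *step)
{
    assert(strlen(task->trace) + strlen(step) + 1 < sizeof(task->trace));
    strcat(task->trace, step);
    strcat(task->trace, ";");
}

static void toy_pre(toy_task_t *t)   { toy_trace(t, "pre"); }
static void toy_start(toy_task_t *t) { toy_trace(t, "start"); }
static void toy_run(toy_task_t *t)   { toy_trace(t, "run"); }
static void toy_post(toy_task_t *t)  { toy_trace(t, "post"); }

/* Same ordering as apt_task_run, minus threads, mutexes and state changes. */
static void toy_task_run(toy_task_t *task)
{
    if (task->on_pre_run) task->on_pre_run(task);
    if (task->auto_ready && task->process_start) task->process_start(task);
    if (task->run) task->run(task);
    if (task->on_post_run) task->on_post_run(task);
}
```

With `auto_ready` set, the trace reads `pre;start;run;post;`. With `auto_ready` cleared, `process_start` is skipped and the trace reads `pre;run;post;`, which matches the `if(task->auto_ready == TRUE)` branch in the listing.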
2. APT messages: apt_task_msg_t
Messages are declared in apt_task_msg.h and implemented in apt_task_msg.c.
/** Enumeration of task message types */
typedef enum {
    TASK_MSG_CORE,  /**< core task message type */
    TASK_MSG_USER   /**< user defined task messages start from here */
} apt_task_msg_type_e;
Messages fall into two broad classes, CORE and USER, for the core side and the user side respectively. The following enumeration subdivides the core messages:
/** Enumeration of core task messages */
typedef enum {
    CORE_TASK_MSG_NONE,                /**< indefinite message */
    CORE_TASK_MSG_START_COMPLETE,      /**< start-complete message */
    CORE_TASK_MSG_TERMINATE_REQUEST,   /**< terminate-request message */
    CORE_TASK_MSG_TERMINATE_COMPLETE,  /**< terminate-complete message */
    CORE_TASK_MSG_TAKEOFFLINE_REQUEST, /**< take-offline-request message */
    CORE_TASK_MSG_TAKEOFFLINE_COMPLETE, /**< take-offline-complete message */
    CORE_TASK_MSG_BRINGONLINE_REQUEST, /**< bring-online-request message */
    CORE_TASK_MSG_BRINGONLINE_COMPLETE, /**< bring-online-complete message */
} apt_core_task_msg_type_e;
Message definition:
/** Task message is used for inter task communication */
struct apt_task_msg_t {
    /** Message pool the task message is allocated from */
    apt_task_msg_pool_t *msg_pool;
    /** Task msg type */
    int                  type;
    /** Task msg sub type */
    int                  sub_type;
    /** Context specific data */
    char                 data[1];
};
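The trailing `char data[1]` member is the classic C way of attaching a variable-size payload to a fixed header. The sketch below shows the idea with plain `calloc`. It is an assumption-laden illustration: `toy_msg_t`, `toy_payload_t` and the helper functions are hypothetical, the msg_pool member is dropped, and the real allocation is done by the message pool in apt_task_msg.c, which this article does not show.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Reduced copy of the message header; msg_pool is omitted for brevity. */
typedef struct {
    int type;
    int sub_type;
    char data[1];        /* first byte of the payload area */
} toy_msg_t;

/* Hypothetical user payload carried inside a message. */
typedef struct {
    int request_id;
    char text[16];
} toy_payload_t;

/* Allocate a zeroed message with room for payload_size bytes of payload. */
static toy_msg_t *toy_msg_alloc(size_t payload_size)
{
    assert(payload_size > 0);
    /* data[1] already accounts for one payload byte, hence the "- 1" */
    return calloc(1, sizeof(toy_msg_t) + payload_size - 1);
}

/* Copy a payload into the message's data area. */
static void toy_msg_set_payload(toy_msg_t *msg, const toy_payload_t *payload)
{
    memcpy(msg->data, payload, sizeof(*payload));
}

/* Copy the payload back out; memcpy avoids any alignment assumptions. */
static toy_payload_t toy_msg_get_payload(const toy_msg_t *msg)
{
    toy_payload_t payload;
    memcpy(&payload, msg->data, sizeof(payload));
    return payload;
}
```

A sender would allocate with `toy_msg_alloc(sizeof(toy_payload_t))`, fill in `type` and `sub_type`, and store its payload; the receiver reads it back with `toy_msg_get_payload` and the caller frees the message with `free`.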
2.1 The message handler apt_task_msg_process
This function is implemented in apt_task.c:
APT_DECLARE(apt_bool_t) apt_task_msg_process(apt_task_t *task, apt_task_msg_t *msg)
{
    apt_bool_t status = FALSE;
    apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Message [%s] [" APT_PTR_FMT ";%d;%d]",
        task->name, msg, msg->type, msg->sub_type);
    if(msg->type == TASK_MSG_CORE) {
        status = apt_core_task_msg_process(task,msg);
    }
    else {
        if(task->vtable.process_msg) {
            status = task->vtable.process_msg(task,msg);
        }
    }
    apt_task_msg_release(msg);
    return status;
}
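The logic is simple: call the handler that matches the message type. Core messages go to apt_core_task_msg_process(), and user messages go to the process_msg() callback. In both cases the message is released afterwards.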
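The dispatch-then-release pattern can be reproduced in a few lines. In this sketch the `toy_` types and counters are hypothetical stand-ins, and "release" merely sets a flag instead of returning the message to a pool.

```c
#include <assert.h>
#include <stddef.h>

enum { TOY_MSG_CORE, TOY_MSG_USER };

typedef struct {
    int type;
    int released;        /* set by toy_msg_release */
} toy_msg_t;

typedef struct toy_task_t toy_task_t;
struct toy_task_t {
    int (*process_msg)(toy_task_t *task, toy_msg_t *msg); /* user callback, may be NULL */
    int core_seen;
    int user_seen;
};

/* Stand-in for the built-in handling of core messages. */
static int toy_core_process(toy_task_t *task, toy_msg_t *msg)
{
    (void)msg;
    task->core_seen++;
    return 1;
}

/* Example of a user-supplied process_msg callback. */
static int toy_user_process(toy_task_t *task, toy_msg_t *msg)
{
    (void)msg;
    task->user_seen++;
    return 1;
}

static void toy_msg_release(toy_msg_t *msg)
{
    msg->released = 1;
}

/* Route by type, then release the message whatever the outcome. */
static int toy_msg_process(toy_task_t *task, toy_msg_t *msg)
{
    int status = 0;
    assert(task != NULL && msg != NULL);
    if (msg->type == TOY_MSG_CORE) {
        status = toy_core_process(task, msg);
    }
    else if (task->process_msg) {
        status = task->process_msg(task, msg);
    }
    toy_msg_release(msg);
    return status;
}
```

Two consequences carry over to the real function as listed above: a user message sent to a task without a process_msg callback is released and reported as unhandled, and a callback should not keep the message pointer after it returns, because the message is released right after the call.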
3. apt_consumer_task_t
This is effectively a subclass of apt_task. Its definition is in apt_consumer_task.c:
struct apt_consumer_task_t {
    void              *obj;
    apt_task_t        *base;
    apr_queue_t       *msg_queue;
#if APR_HAS_QUEUE_TIMEOUT
    apt_timer_queue_t *timer_queue;
#endif
};
obj: user-defined data
base: pointer to the base type
msg_queue: message queue
3.1 The creation function apt_consumer_task_create
APT_DECLARE(apt_consumer_task_t*) apt_consumer_task_create(
                                    void *obj,
                                    apt_task_msg_pool_t *msg_pool,
                                    apr_pool_t *pool)
{
    apt_task_vtable_t *vtable;
    apt_consumer_task_t *consumer_task = apr_palloc(pool,sizeof(apt_consumer_task_t));
    consumer_task->obj = obj;
    consumer_task->msg_queue = NULL;
    if(apr_queue_create(&consumer_task->msg_queue,1024,pool) != APR_SUCCESS) {
        return NULL;
    }
    consumer_task->base = apt_task_create(consumer_task,msg_pool,pool);
    if(!consumer_task->base) {
        return NULL;
    }
    vtable = apt_task_vtable_get(consumer_task->base);
    if(vtable) {
        vtable->run = apt_consumer_task_run;
        vtable->signal_msg = apt_consumer_task_msg_signal;
    }
#if APR_HAS_QUEUE_TIMEOUT
    consumer_task->timer_queue = apt_timer_queue_create(pool);
#endif
    return consumer_task;
}
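Two vtable functions are installed here, apt_consumer_task_run and apt_consumer_task_msg_signal. Together they implement the default lifecycle and internal state handling of a consumer task, so these two pointers should generally not be overridden.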
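The "subclass" relationship is built by composition: the consumer task holds a pointer to its base task, and the base task's obj points back at the consumer task (note that apt_task_create receives consumer_task as its obj argument). A minimal sketch of that two-way wiring follows; the `toy_` types are hypothetical and caller-provided storage replaces the apr pool.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the base task: it only stores the owner object and a name. */
typedef struct {
    void *obj;
    const char *name;
} toy_task_t;

/* Stand-in for the consumer task: user object plus a pointer to the base. */
typedef struct {
    void *obj;
    toy_task_t *base;
    int queue_depth;     /* placeholder for the real message queue */
} toy_consumer_t;

/* Wire both directions; storage is supplied by the caller to avoid allocation. */
static toy_consumer_t *toy_consumer_init(toy_consumer_t *consumer,
                                         toy_task_t *base, void *user_obj)
{
    assert(consumer != NULL && base != NULL);
    consumer->obj = user_obj;      /* what the application cares about */
    consumer->base = base;         /* "derived" to base */
    consumer->queue_depth = 0;
    base->obj = consumer;          /* base back to "derived" */
    base->name = "Task";
    return consumer;
}

/* What a base-level callback does to get back to the consumer task. */
static toy_consumer_t *toy_consumer_from_task(toy_task_t *task)
{
    return task->obj;
}
```

This is why the callbacks in the next two sections, which receive only an `apt_task_t *`, start by calling apt_task_object_get(task) to recover the consumer task and its queue.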
3.2 apt_consumer_task_run
static apt_bool_t apt_consumer_task_run(apt_task_t *task)
{
    apr_status_t rv;
    void *msg;
    apt_bool_t *running;
    apt_consumer_task_t *consumer_task;
#if APR_HAS_QUEUE_TIMEOUT
    apr_interval_time_t timeout;
    apr_uint32_t queue_timeout;
    apr_time_t time_now, time_last = 0;
#endif
    const char *task_name;
    consumer_task = apt_task_object_get(task);
    if(!consumer_task) {
        return FALSE;
    }
    task_name = apt_task_name_get(consumer_task->base),
    running = apt_task_running_flag_get(task);
    if(!running) {
        return FALSE;
    }
    while(*running) {
#if APR_HAS_QUEUE_TIMEOUT
        if(apt_timer_queue_timeout_get(consumer_task->timer_queue,&queue_timeout) == TRUE) {
            timeout = (apr_interval_time_t)queue_timeout * 1000;
            time_last = apr_time_now();
            apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s] timeout [%u]",
                task_name, queue_timeout);
            rv = apr_queue_timedpop(consumer_task->msg_queue,timeout,&msg);
        }
        else
        {
            timeout = -1;
            apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s]",task_name);
            rv = apr_queue_pop(consumer_task->msg_queue,&msg);
        }
#else
        apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s]",task_name);
        rv = apr_queue_pop(consumer_task->msg_queue,&msg);
#endif
        if(rv == APR_SUCCESS) {
            if(msg) {
                apt_task_msg_t *task_msg = msg;
                apt_task_msg_process(consumer_task->base,task_msg);
            }
        }
        else if(rv != APR_TIMEUP) {
            apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Pop Message [%s] status: %d",task_name,rv);
        }
#if APR_HAS_QUEUE_TIMEOUT
        if(timeout != -1) {
            time_now = apr_time_now();
            if(time_now > time_last) {
                apt_timer_queue_advance(consumer_task->timer_queue,(apr_uint32_t)((time_now - time_last)/1000));
            }
            else {
                /* If NTP has drifted the clock backwards, advance the queue based on the set timeout but not actual time difference */
                if(rv == APR_TIMEUP) {
                    apt_timer_queue_advance(consumer_task->timer_queue,queue_timeout);
                }
            }
        }
#endif
    }
    return TRUE;
}
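The timer bookkeeping at the end of the loop mixes two units: apr_time_now() yields microseconds, while the timer queue works in milliseconds. The helper below isolates that arithmetic, including the fallback for a clock that moved backwards. `toy_elapsed_ms` is a hypothetical name, and returning 0 is this sketch's way of saying "do not advance the timer queue".

```c
#include <assert.h>
#include <stdint.h>

/*
 * Decide how many milliseconds to advance the timer queue by.
 * Times are in microseconds, timeouts in milliseconds.
 * Returns 0 when the timer queue should be left alone.
 */
static uint32_t toy_elapsed_ms(int64_t time_last_us, int64_t time_now_us,
                               uint32_t queue_timeout_ms, int timed_out)
{
    assert(time_last_us >= 0 && time_now_us >= 0);
    if (time_now_us > time_last_us) {
        /* normal case: trust the measured difference */
        return (uint32_t)((time_now_us - time_last_us) / 1000);
    }
    /* clock did not move forward: fall back to the requested timeout,
       but only if the wait really ran until it timed out */
    return timed_out ? queue_timeout_ms : 0;
}
```

For example, a wait that started at 1,000,000 µs and ended at 1,250,000 µs advances the timers by 250 ms. If the clock reads 900,000 µs afterwards, the timers advance by the full requested timeout when the pop timed out, and not at all when a message arrived.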
3.3 apt_consumer_task_msg_signal
static apt_bool_t apt_consumer_task_msg_signal(apt_task_t *task, apt_task_msg_t *msg)
{
    apt_consumer_task_t *consumer_task = apt_task_object_get(task);
    return (apr_queue_push(consumer_task->msg_queue,msg) == APR_SUCCESS) ? TRUE : FALSE;
}
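Taken together, signal_msg is the producer side and run is the consumer side of one queue. The sketch below models that pairing with a small single-threaded FIFO. Everything named `toy_` is hypothetical, and there is one deliberate difference from the real code: apr_queue_push and apr_queue_pop block when the queue is full or empty, whereas these helpers return immediately.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_QUEUE_CAP 4

enum { TOY_MSG_USER, TOY_MSG_TERMINATE };

typedef struct {
    int type;
    int value;
} toy_msg_t;

/* Fixed-size FIFO of message pointers; single-threaded, so no locking. */
typedef struct {
    toy_msg_t *slots[TOY_QUEUE_CAP];
    size_t head;         /* index of the next message to pop */
    size_t count;        /* number of queued messages */
} toy_queue_t;

/* Producer side, the role played by signal_msg. Returns 0 if the queue is full. */
static int toy_signal(toy_queue_t *q, toy_msg_t *msg)
{
    assert(q != NULL && msg != NULL);
    if (q->count == TOY_QUEUE_CAP) {
        return 0;
    }
    q->slots[(q->head + q->count) % TOY_QUEUE_CAP] = msg;
    q->count++;
    return 1;
}

/* Remove and return the oldest message, or NULL if the queue is empty. */
static toy_msg_t *toy_pop(toy_queue_t *q)
{
    toy_msg_t *msg;
    if (q->count == 0) {
        return NULL;
    }
    msg = q->slots[q->head];
    q->head = (q->head + 1) % TOY_QUEUE_CAP;
    q->count--;
    return msg;
}

/* Consumer side, the role played by run: drain until told to stop.
   Returns the sum of the user message values it processed. */
static int toy_run(toy_queue_t *q)
{
    int running = 1;
    int sum = 0;
    toy_msg_t *msg;
    while (running && (msg = toy_pop(q)) != NULL) {
        if (msg->type == TOY_MSG_TERMINATE) {
            running = 0;
        }
        else {
            sum += msg->value;
        }
    }
    return sum;
}
```

Messages queued behind the terminate message stay in the queue, because the loop re-checks its running flag before each pop, just as `while(*running)` does in apt_consumer_task_run.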
4. Test example
The example in tests/apttest/src/consumer_task_suite.c shows how consumer_task is used.