您现在的位置是:首页 >技术交流 >suricata6 workers 流程网站首页技术交流
suricata6 workers 流程
简介:suricata6 workers 流程
结构体
- TmModule
/**
 * Thread module descriptor: a named bundle of callbacks plus capability
 * and behaviour flags. Each work-function pointer (Func, PktAcqLoop,
 * Management) has its own signature.
 */
typedef struct TmModule_ {
/** module name */
const char *name;
/** thread handling */
TmEcode (*ThreadInit)(ThreadVars *, const void *, void **);
void (*ThreadExitPrintStats)(ThreadVars *, void *);
TmEcode (*ThreadDeinit)(ThreadVars *, void *);
/** the packet processing function */
TmEcode (*Func)(ThreadVars *, Packet *, void *);
/** packet acquisition (capture) loop */
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
/** terminates the capture loop in PktAcqLoop */
TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);
/** work function of a management module */
TmEcode (*Management)(ThreadVars *, void *);
/** global Init/DeInit */
TmEcode (*Init)(void);
TmEcode (*DeInit)(void);
/* RegisterTests is only declared when UNITTESTS is defined */
#ifdef UNITTESTS
void (*RegisterTests)(void);
#endif
uint8_t cap_flags; /**< Flags to indicate the capability requirement of
the given TmModule */
/* Other flags used by the module */
uint8_t flags;
} TmModule;
- TmSlot
/**
 * One stage of a thread's packet pipeline. Slots are chained through
 * slot_next; each slot carries its work function, its per-thread
 * init/deinit callbacks and the data produced by its init callback.
 */
typedef struct TmSlot_ {
/* function pointers */
/* The three work functions share storage (anonymous union), so a slot
 * holds exactly one of them at a time. */
union {
TmSlotFunc SlotFunc;
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
TmEcode (*Management)(ThreadVars *, void *);
};
/** linked list of slots, used when a pipeline has multiple slots
* in a single thread. */
struct TmSlot_ *slot_next;
/** per-slot data pointer, accessed through the SC_ATOMIC_* macros; it is
 * the value passed as the data argument of the work function. */
SC_ATOMIC_DECLARE(void *, slot_data);
/** per-thread init callback: takes slot_initdata and returns the slot
 * data through its last argument. */
TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);
/* data storage */
/* init-time argument handed to SlotThreadInit */
const void *slot_initdata;
/* store the thread module id */
int tm_id;
} TmSlot;
- ThreadVars
/**
 * Per-thread state: thread identity and scheduling settings, the slot
 * pipeline the thread runs, its input/output queues and handlers, and its
 * private/public counter stores.
 */
typedef struct ThreadVars_ {
/** pthread identifier of this thread */
pthread_t t;
/** function pointer to the function that runs the packet pipeline for
* this thread. It is passed directly to pthread_create(), hence the
* void pointers in and out. */
void *(*tm_func)(void *);
/** thread name, fixed 16-byte buffer */
char name[16];
char *printable_name;
char *thread_group_name;
uint8_t thread_setup_flags;
/** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
uint8_t type;
uint16_t cpu_affinity; /** cpu or core number to set affinity to */
int thread_priority; /** priority (real time) for this thread. Look at threads.h */
/** TmModule::flags for each module part of this thread */
uint8_t tmm_flags;
uint8_t cap_flags; /**< Flags to indicate the capabilities of all the
TmModules registered under this thread */
uint8_t inq_id;
uint8_t outq_id;
/** local id */
int id;
/** incoming queue and handler */
Tmq *inq;
struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
/** thread flags, accessed through the SC_ATOMIC_* macros */
SC_ATOMIC_DECLARE(uint32_t, flags);
/** list of TmSlot objects together forming the packet pipeline. */
struct TmSlot_ *tm_slots;
/** pointer to the flowworker in the pipeline. Used as starting point
* for injected packets. Can be NULL if the flowworker is not part
* of this thread. */
struct TmSlot_ *tm_flowworker;
/** outgoing queue and handler */
Tmq *outq;
void *outctx;
void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);
/** queue for decoders to temporarily store extra packets they
* generate. */
PacketQueueNoLock decode_pq;
/** Stream packet queue for flow time out injection. Either a pointer to the
* workers input queue or to stream_pq_local */
struct PacketQueue_ *stream_pq;
struct PacketQueue_ *stream_pq_local;
/* counters */
/** private counter store: counter updates modify this */
StatsPrivateThreadContext perf_private_ctx;
/** pointer to the next thread */
struct ThreadVars_ *next;
/** public counter store: counter syncs update this */
StatsPublicThreadContext perf_public_ctx;
/* mutex and condition used by management threads */
SCCtrlMutex *ctrl_mutex;
SCCtrlCondT *ctrl_cond;
struct FlowQueue_ *flow_queue;
/* NOTE(review): presumably requests the thread's loop to stop -- confirm
 * against the code that reads it. */
bool break_loop;
/* NOTE(review): socketfd/servaddr are not documented in this excerpt;
 * they look like a per-thread socket and its peer address -- confirm
 * where they are set and used. */
int socketfd;
struct sockaddr_in servaddr;
} ThreadVars;
注册流程
- 运行模式注册
main(int argc, char ** argv) (main.c:26)
SuricataMain(int argc, char ** argv) (suricata.c:2940)
InitGlobal() (suricata.c:2928)
RunModeRegisterRunModes() (runmodes.c:224)
RunModeIdsPcapRegister() (runmode-pcap.c:45)
- 模块注册
main(int argc, char ** argv) (main.c:26)
SuricataMain(int argc, char ** argv) (suricata.c:3008)
PostConfLoadedSetup(SCInstance * suri) (suricata.c:2777)
RegisterAllModules() (suricata.c:893)
启动运行模式
如下pcap-workers模式调用堆栈:
main(int argc, char ** argv) (main.c:26)
SuricataMain(int argc, char ** argv) (suricata.c:3065)
RunModeDispatch(int runmode, const char * custom_mode, const char * capture_plugin_name, const char * capture_plugin_args) (runmodes.c:406)
RunModeIdsPcapWorkers() (runmode-pcap.c:319)
RunModeSetLiveCaptureWorkers(ConfigIfaceParserFunc ConfigParser, ConfigIfaceThreadsCountFunc ModThreadsCount, const char * recv_mod_name, const char * decode_mod_name, const char * thread_name, const char * live_dev) (util-runmodes.c:353)
//创建数据包处理线程
RunModeSetLiveCaptureWorkersForDevice(ConfigIfaceThreadsCountFunc ModThreadsCount, const char * recv_mod_name, const char * decode_mod_name, const char * thread_name, const char * live_dev, void * aconf, unsigned char single_mode) (util-runmodes.c:288)
数据包处理线程创建流程:
//创建ThreadVars存储线程信息
//成员struct TmSlot_ *tm_slots;是一个链表,一个slot对应一个处理模块
//tm_func是线程运行函数,TmThreadCreatePacketHandler最后一个参数指定线程功能,TmThreadCreatePacketHandler调用TmThreadSetSlots通过线程功能名称来设置tm_func
//功能名称与tm_func对应关系:
//varslot:TmThreadsSlotVar
//pktacqloop:TmThreadsSlotPktAcqLoop,收包线程
//management:TmThreadsManagement,管理线程
//command:TmThreadsManagement,命令线程
//custom:用户自定义函数,自定义线程
ThreadVars *tv = TmThreadCreatePacketHandler(tname,
"packetpool", "packetpool",
"packetpool", "packetpool",
"pktacqloop");
//从注册的模块中获取收包模块
tm_module = TmModuleGetByName(recv_mod_name);
//用模块初始化Slot,slot和模块成员基本是一一对应的,主要有:
//线程操作函数:ThreadInit(线程初始化)、ThreadDeinit(线程销毁)、ThreadExitPrintStats(退出线程时输出线程统计信息),在slot中分别对应SlotThreadInit、SlotThreadDeinit、SlotThreadExitPrintStats
//工作函数:PktAcqLoop(收包slot工作函数)、Management(管理slot工作函数)、Func(其他slot工作函数,如解码、流重组等)
//收包模块独有函数:PktAcqBreakLoop(停止收包)
//slot和模块通过slot->tm_id进行关联:在注销slot时,如果slot是收包slot,通过slot->tm_id找到收包模块,调用模块PktAcqBreakLoop停止收包
TmSlotSetFuncAppend(tv, tm_module, aconf);
//从注册的模块中获取解码模块
tm_module = TmModuleGetByName(decode_mod_name);
TmSlotSetFuncAppend(tv, tm_module, NULL);
//从注册的模块中获取流重组模块
tm_module = TmModuleGetByName("FlowWorker");
TmSlotSetFuncAppend(tv, tm_module, NULL);
//从注册的模块中获取阻断模块
tm_module = TmModuleGetByName("RespondReject");
TmSlotSetFuncAppend(tv, tm_module, NULL);
//创建数据包处理线程,线程函数tv->tm_func,线程参数tv
//在tv->tm_func中,会将tv传给每个slot
TmThreadSpawn(tv);
数据包处理线程工作流程
1.工作流程
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
//初始化所有slot
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadInit != NULL) {
void *slot_data = NULL;
//用slot_initdata初始化slot,创建slot_data设置slot->slot_data
r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
......
(void)SC_ATOMIC_SET(slot->slot_data, slot_data);
}
}
while(run) {
//启动第一个slot进行收包,传入slot链表
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
......
}
- slot初始化函数与工作函数的关系
初始化函数原型:
/*
* param tv pointer to ThreadVars,即启动模式时创建的ThreadVars,
* param initdata slot初始化参数,只有收包slot传了此参数,其他slot都是NULL,收包slot传入的是网卡信息
* param data pointer 每个slot创建自己的slot数据,如收包slot创建PcapThreadVars(用于保存收包统计信息、网卡信息等)
*/
TmEcode (*SlotThreadInit)(ThreadVars *tv, const void *initdata, void **data);
工作函数原型:
//解码、流重组slot工作函数:
//参数data:slot初始化时创建的slot_data
TmEcode (*Func)(ThreadVars *tv, Packet *p, void *data);
//收包slot工作函数:
//参数data:即初始化创建的slot数据
//参数slot:slot链表
//收包slot从data获取网卡信息,从网卡收到包后,调用TmThreadsSlotProcessPkt开始解码、流重组
TmEcode (*PktAcqLoop)(ThreadVars *tv, void *data, void *slot);
TmEcode (*Management)(ThreadVars *, void *);
- TmThreadsSlotVarRun
收包后调用TmThreadsSlotProcessPkt处理数据包,
TmThreadsSlotProcessPkt调用TmThreadsSlotVarRun
for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
}
slot_data
- 收包slot
主要存储网卡配置、收包统计等信息
如下libpcap收包的slot data
/**
 * Per-thread data of the libpcap capture slot: the pcap handle and its
 * state, BPF filter, capture statistics, buffer/snaplen settings and
 * back-pointers to the owning thread, slot and live device.
 */
typedef struct PcapThreadVars_
{
/* thread specific handle */
pcap_t *pcap_handle;
/* handle state */
unsigned char pcap_state;
/* thread specific bpf */
struct bpf_program filter;
/* ptr to string from config */
const char *bpf_filter;
time_t last_stats_dump;
/* data link type for the thread */
int datalink;
/* counters */
uint64_t pkts;
uint64_t bytes;
/* NOTE(review): the 16-bit capture_kernel_* fields are presumably stats
 * counter ids rather than the counts themselves -- confirm. */
uint16_t capture_kernel_packets;
uint16_t capture_kernel_drops;
uint16_t capture_kernel_ifdrops;
/* owning thread and slot */
ThreadVars *tv;
TmSlot *slot;
/** callback result -- set if one of the thread modules failed. */
int cb_result;
/* pcap buffer size */
int pcap_buffer_size;
int pcap_snaplen;
ChecksumValidationMode checksum_mode;
LiveDevice *livedev;
PcapStats64 last_stats64;
} PcapThreadVars;
- 解码slot
主要存储解码的数据包统计信息,比如ipv4包数、ipv6包数、tcp包数、udp包数等。
/**
 * Per-thread data of the decode slot: the app-layer thread context plus
 * the decoder, defrag and flow statistics fields.
 *
 * NOTE(review): the uint16_t counter_* members are presumably stats
 * counter ids rather than the counts themselves, given their width --
 * confirm.
 */
typedef struct DecodeThreadVars_
{
/** Specific context for udp protocol detection (here atm) */
AppLayerThreadCtx *app_tctx;
/** stats/counters */
/* overall packet/byte totals and size statistics */
uint16_t counter_pkts;
uint16_t counter_bytes;
uint16_t counter_avg_pkt_size;
uint16_t counter_max_pkt_size;
uint16_t counter_max_mac_addrs_src;
uint16_t counter_max_mac_addrs_dst;
/* per-protocol / per-encapsulation counters */
uint16_t counter_invalid;
uint16_t counter_eth;
uint16_t counter_chdlc;
uint16_t counter_ipv4;
uint16_t counter_ipv6;
uint16_t counter_tcp;
uint16_t counter_udp;
uint16_t counter_icmpv4;
uint16_t counter_icmpv6;
uint16_t counter_sll;
uint16_t counter_raw;
uint16_t counter_null;
uint16_t counter_sctp;
uint16_t counter_ppp;
uint16_t counter_geneve;
uint16_t counter_gre;
uint16_t counter_vlan;
uint16_t counter_vlan_qinq;
uint16_t counter_vxlan;
uint16_t counter_vntag;
uint16_t counter_ieee8021ah;
uint16_t counter_pppoe;
uint16_t counter_teredo;
uint16_t counter_mpls;
uint16_t counter_ipv4inipv6;
uint16_t counter_ipv6inipv6;
uint16_t counter_erspan;
/** frag stats - defrag runs in the context of the decoder. */
uint16_t counter_defrag_ipv4_fragments;
uint16_t counter_defrag_ipv4_reassembled;
uint16_t counter_defrag_ipv4_timeouts;
uint16_t counter_defrag_ipv6_fragments;
uint16_t counter_defrag_ipv6_reassembled;
uint16_t counter_defrag_ipv6_timeouts;
uint16_t counter_defrag_max_hit;
/* flow-related counters */
uint16_t counter_flow_memcap;
uint16_t counter_flow_tcp;
uint16_t counter_flow_udp;
uint16_t counter_flow_icmp4;
uint16_t counter_flow_icmp6;
uint16_t counter_flow_tcp_reuse;
uint16_t counter_flow_get_used;
uint16_t counter_flow_get_used_eval;
uint16_t counter_flow_get_used_eval_reject;
uint16_t counter_flow_get_used_eval_busy;
uint16_t counter_flow_get_used_failed;
uint16_t counter_flow_spare_sync;
uint16_t counter_flow_spare_sync_empty;
uint16_t counter_flow_spare_sync_incomplete;
uint16_t counter_flow_spare_sync_avg;
/* one entry per decoder/engine event, DECODE_EVENT_MAX entries in total */
uint16_t counter_engine_events[DECODE_EVENT_MAX];
/* thread data for flow logging api: only used at forced
* flow recycle during lookups */
void *output_flow_thread_data;
} DecodeThreadVars;
- 流重组slot
主要存储流信息
/**
 * Per-thread data of the FlowWorker slot: decoder thread vars, stream and
 * detect thread contexts, output thread data, bypass statistics, a local
 * packet queue and the flow lookup state (fls).
 */
typedef struct FlowWorkerThreadData_ {
DecodeThreadVars *dtv;
/* the same pointer, viewed either as a typed StreamTcpThread pointer or
 * as an untyped void pointer */
union {
StreamTcpThread *stream_thread;
void *stream_thread_ptr;
};
/* detect thread context pointer, accessed through the SC_ATOMIC_* macros */
SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
void *output_thread; /* Output thread data. */
void *output_thread_flow; /* Output thread data; presumably for the flow output path -- confirm. */
/* NOTE(review): 16-bit bypass fields are presumably stats counter ids
 * rather than counts -- confirm. */
uint16_t local_bypass_pkts;
uint16_t local_bypass_bytes;
uint16_t both_bypass_pkts;
uint16_t both_bypass_bytes;
PacketQueueNoLock pq;
/* flow lookup state: spare/work flow queues, see FlowLookupStruct */
FlowLookupStruct fls;
/* flow injection/removal statistics */
struct {
uint16_t flows_injected;
uint16_t flows_removed;
uint16_t flows_aside_needs_work;
uint16_t flows_aside_pkt_inject;
} cnt;
} FlowWorkerThreadData;
FlowWorkerThreadData主要的数据是FlowLookupStruct fls;
如下是FlowLookupStruct结构体:
/**
 * Thread-local flow lookup state: private spare and work flow queues, a
 * pointer to the decoder thread vars, and a sync stamp.
 */
typedef struct FlowLookupStruct_ // TODO name
{
/** thread store of spare queues */
FlowQueuePrivate spare_queue;
DecodeThreadVars *dtv;
/* NOTE(review): presumably flows queued for this thread to process --
 * confirm against the flow worker code. */
FlowQueuePrivate work_queue;
/* NOTE(review): presumably a timestamp of the last spare-queue sync in
 * emergency mode -- confirm. */
uint32_t emerg_spare_sync_stamp;
} FlowLookupStruct;
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。