您现在的位置是:首页 >技术交流 >suricata6 workers 流程网站首页技术交流

suricata6 workers 流程

唐装鼠 2024-09-02 00:01:02
简介suricata6 workers 流程

结构体

  1. TmModule
typedef struct TmModule_ {
    const char *name;

    /** thread handling */
    TmEcode (*ThreadInit)(ThreadVars *, const void *, void **);
    void (*ThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*ThreadDeinit)(ThreadVars *, void *);

    /** the packet processing function */
    TmEcode (*Func)(ThreadVars *, Packet *, void *);

    TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);

    /** terminates the capture loop in PktAcqLoop */
    TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);

    TmEcode (*Management)(ThreadVars *, void *);

    /** global Init/DeInit */
    TmEcode (*Init)(void);
    TmEcode (*DeInit)(void);
#ifdef UNITTESTS
    void (*RegisterTests)(void);
#endif
    uint8_t cap_flags;   /**< Flags to indicate the capability requierment of
                             the given TmModule */
    /* Other flags used by the module */
    uint8_t flags;
} TmModule;

  1. TmSlot
typedef struct TmSlot_ {
    /* function pointers */
    union {
        TmSlotFunc SlotFunc;
        TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
        TmEcode (*Management)(ThreadVars *, void *);
    };
    /** linked list of slots, used when a pipeline has multiple slots
     *  in a single thread. */
    struct TmSlot_ *slot_next;

    SC_ATOMIC_DECLARE(void *, slot_data);

    TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
    void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);

    /* data storage */
    const void *slot_initdata;
    /* store the thread module id */
    int tm_id;

} TmSlot;
  1. ThreadVars
typedef struct ThreadVars_ {
    pthread_t t;
    /** function pointer to the function that runs the packet pipeline for
     *  this thread. It is passed directly to pthread_create(), hence the
     *  void pointers in and out. */
    void *(*tm_func)(void *);

    char name[16];
    char *printable_name;
    char *thread_group_name;

    uint8_t thread_setup_flags;

    /** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
    uint8_t type;

    uint16_t cpu_affinity; /** cpu or core number to set affinity to */
    int thread_priority; /** priority (real time) for this thread. Look at threads.h */


    /** TmModule::flags for each module part of this thread */
    uint8_t tmm_flags;

    uint8_t cap_flags; /**< Flags to indicate the capabilities of all the
                            TmModules resgitered under this thread */
    uint8_t inq_id;
    uint8_t outq_id;

    /** local id */
    int id;

    /** incoming queue and handler */
    Tmq *inq;
    struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);

    SC_ATOMIC_DECLARE(uint32_t, flags);

    /** list of of TmSlot objects together forming the packet pipeline. */
    struct TmSlot_ *tm_slots;

    /** pointer to the flowworker in the pipeline. Used as starting point
     *  for injected packets. Can be NULL if the flowworker is not part
     *  of this thread. */
    struct TmSlot_ *tm_flowworker;

    /** outgoing queue and handler */
    Tmq *outq;
    void *outctx;
    void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);

    /** queue for decoders to temporarily store extra packets they
     *  generate. */
    PacketQueueNoLock decode_pq;

    /** Stream packet queue for flow time out injection. Either a pointer to the
     *  workers input queue or to stream_pq_local */
    struct PacketQueue_ *stream_pq;
    struct PacketQueue_ *stream_pq_local;

    /* counters */

    /** private counter store: counter updates modify this */
    StatsPrivateThreadContext perf_private_ctx;

    /** pointer to the next thread */
    struct ThreadVars_ *next;

    /** public counter store: counter syncs update this */
    StatsPublicThreadContext perf_public_ctx;

    /* mutex and condition used by management threads */

    SCCtrlMutex *ctrl_mutex;
    SCCtrlCondT *ctrl_cond;

    struct FlowQueue_ *flow_queue;
    bool break_loop;
    int socketfd;
    struct sockaddr_in servaddr;

} ThreadVars;

注册流程

  1. 运行模式注册
main(int argc, char ** argv) (main.c:26)
SuricataMain(int argc, char ** argv) (suricata.c:2940)
InitGlobal() (suricata.c:2928)
RunModeRegisterRunModes() (runmodes.c:224)
RunModeIdsPcapRegister() (runmode-pcap.c:45)
  1. 模块注册
main(int argc, char ** argv) (main.c:26)
SuricataMain(int argc, char ** argv) (suricata.c:3008)
PostConfLoadedSetup(SCInstance * suri) (suricata.c:2777)
RegisterAllModules() (suricata.c:893)

启动运行模式

如下pcap-workers模式调用堆栈:

main(int argc, char ** argv) (main.c:26)
SuricataMain(int argc, char ** argv) (suricata.c:3065)
RunModeDispatch(int runmode, const char * custom_mode, const char * capture_plugin_name, const char * capture_plugin_args) (runmodes.c:406)
RunModeIdsPcapWorkers() (runmode-pcap.c:319)
RunModeSetLiveCaptureWorkers(ConfigIfaceParserFunc ConfigParser, ConfigIfaceThreadsCountFunc ModThreadsCount, const char * recv_mod_name, const char * decode_mod_name, const char * thread_name, const char * live_dev) (util-runmodes.c:353)
//创建数据包处理线程
RunModeSetLiveCaptureWorkersForDevice(ConfigIfaceThreadsCountFunc ModThreadsCount, const char * recv_mod_name, const char * decode_mod_name, const char * thread_name, const char * live_dev, void * aconf, unsigned char single_mode) (util-runmodes.c:288)

数据包处理线程创建流程:

//创建ThreadVars存储线程信息
//成员struct TmSlot_ *tm_slots;是一个链表,一个slot对应一个处理模块
//tm_func是线程运行函数,TmThreadCreatePacketHandler最后一个参数指定线程功能,TmThreadCreatePacketHandler调用TmThreadSetSlots通过线程功能名称来设置tm_func
//功能名称与tm_func对应关系:
//varslot:TmThreadsSlotVar
//pktacqloop:TmThreadsSlotPktAcqLoop,收包线程
//management:TmThreadsManagement,管理线程
//command:TmThreadsManagement,命令线程
//custom:用户自定义函数,自定义线程
ThreadVars *tv = TmThreadCreatePacketHandler(tname,
                "packetpool", "packetpool",
                "packetpool", "packetpool",
                "pktacqloop");
//从注册的模块中获取收包模块
tm_module = TmModuleGetByName(recv_mod_name);
//用模块初始化Slot,slot和模块成员基本是一一对应的,主要有:
//线程操作函数:InitFunc(线程初始化)、DeinitFunc(线程销毁)、ExitPrintStatsFunc(退出线程时获取线程状态)
//工作函数:PktAcqLoop(收包slot工作函数)、Management(管理slot工作函数)、Func(其他slot工作函数,如解码、流重组等)
//收包模块独有函数:PktAcqBreakLoop(停止收包)
//slot和模块通过slot->tm_id进行关联:在注销slot时,如果slot是收包slot,通过slot->tm_id找到收包模块,调用模块PktAcqBreakLoop停止收包
TmSlotSetFuncAppend(tv, tm_module, aconf);
//从注册的模块中获取解码模块
tm_module = TmModuleGetByName(decode_mod_name);
TmSlotSetFuncAppend(tv, tm_module, NULL);
//从注册的模块中获取流重组模块
tm_module = TmModuleGetByName("FlowWorker");
TmSlotSetFuncAppend(tv, tm_module, NULL);
//从注册的模块中获取阻断模块
tm_module = TmModuleGetByName("RespondReject");
TmSlotSetFuncAppend(tv, tm_module, NULL);
//创建数据包处理线程,线程函数tv->tm_func,线程参数tv
//在tv->tm_func,会将tv传给每个slot
TmThreadSpawn(tv);

数据包处理线程工作流程

1.工作流程

    ThreadVars *tv = (ThreadVars *)td;
    TmSlot *s = tv->tm_slots;
    //初始化所有slot
    for (slot = s; slot != NULL; slot = slot->slot_next) {
        if (slot->SlotThreadInit != NULL) {
            void *slot_data = NULL;
            //用slot_initdata初始化slot,创建slot_data设置slot->slot_data
            r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
            ......
            (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
        }
    }
    while(run) {
    	//启动第一个slot进行收包,传入slot链表
        r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
        ......
    }
  1. slot初始化函数与工作函数的关系
    初始化函数原型:
/*
* param tv pointer to ThreadVars,即启动模式时创建的ThreadVars,
* param initdata slot初始化参数,只有收包slot传了此参数,其他slot都是NULL,收包slot传入的是网卡信息
* param data pointer 每个slot创建自己的slot数据,如收包slot创建PcapThreadVars(用于保存收包统计信息、网卡信息等)
*/
TmEcode (*SlotThreadInit)(ThreadVars *tv, const void *initdata, void **data);

工作函数原型:

//解码、流程组slot工作函数:
//参数data:slot初始化时创建的slot_data
TmEcode (*Func)(ThreadVars *tv, Packet *p, void *data);
//收包slot工作函数:
//参数data:即初始化创建的slot数据
//参数slot:slot链表
//收包slot从data获取网卡信息,从网卡收到包后,调用TmThreadsSlotProcessPkt开始解码、流重组
TmEcode (*PktAcqLoop)(ThreadVars *tv, void *data, void *slot);
TmEcode (*Management)(ThreadVars *, void *);
  1. TmThreadsSlotVarRun
    收包后调用TmThreadsSlotProcessPkt处理数据包,
    TmThreadsSlotProcessPkt调用TmThreadsSlotVarRun
for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
        TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
    }

slot_data

  1. 收包slot
    主要存储网卡配置、收包统计等信息
    如下libpcap收包的slot data
typedef struct PcapThreadVars_
{
    /* thread specific handle */
    pcap_t *pcap_handle;
    /* handle state */
    unsigned char pcap_state;
    /* thread specific bpf */
    struct bpf_program filter;
    /* ptr to string from config */
    const char *bpf_filter;

    time_t last_stats_dump;

    /* data link type for the thread */
    int datalink;

    /* counters */
    uint64_t pkts;
    uint64_t bytes;

    uint16_t capture_kernel_packets;
    uint16_t capture_kernel_drops;
    uint16_t capture_kernel_ifdrops;

    ThreadVars *tv;
    TmSlot *slot;

    /** callback result -- set if one of the thread module failed. */
    int cb_result;

    /* pcap buffer size */
    int pcap_buffer_size;
    int pcap_snaplen;

    ChecksumValidationMode checksum_mode;

    LiveDevice *livedev;

    PcapStats64 last_stats64;
} PcapThreadVars;
  1. 解码slot
    主要存储解码的数据包统计信息,比如ipv4包数、ipv6包数、tcp包数、udp包数等。
typedef struct DecodeThreadVars_
{
    /** Specific context for udp protocol detection (here atm) */
    AppLayerThreadCtx *app_tctx;

    /** stats/counters */
    uint16_t counter_pkts;
    uint16_t counter_bytes;
    uint16_t counter_avg_pkt_size;
    uint16_t counter_max_pkt_size;
    uint16_t counter_max_mac_addrs_src;
    uint16_t counter_max_mac_addrs_dst;

    uint16_t counter_invalid;

    uint16_t counter_eth;
    uint16_t counter_chdlc;
    uint16_t counter_ipv4;
    uint16_t counter_ipv6;
    uint16_t counter_tcp;
    uint16_t counter_udp;
    uint16_t counter_icmpv4;
    uint16_t counter_icmpv6;

    uint16_t counter_sll;
    uint16_t counter_raw;
    uint16_t counter_null;
    uint16_t counter_sctp;
    uint16_t counter_ppp;
    uint16_t counter_geneve;
    uint16_t counter_gre;
    uint16_t counter_vlan;
    uint16_t counter_vlan_qinq;
    uint16_t counter_vxlan;
    uint16_t counter_vntag;
    uint16_t counter_ieee8021ah;
    uint16_t counter_pppoe;
    uint16_t counter_teredo;
    uint16_t counter_mpls;
    uint16_t counter_ipv4inipv6;
    uint16_t counter_ipv6inipv6;
    uint16_t counter_erspan;

    /** frag stats - defrag runs in the context of the decoder. */
    uint16_t counter_defrag_ipv4_fragments;
    uint16_t counter_defrag_ipv4_reassembled;
    uint16_t counter_defrag_ipv4_timeouts;
    uint16_t counter_defrag_ipv6_fragments;
    uint16_t counter_defrag_ipv6_reassembled;
    uint16_t counter_defrag_ipv6_timeouts;
    uint16_t counter_defrag_max_hit;

    uint16_t counter_flow_memcap;

    uint16_t counter_flow_tcp;
    uint16_t counter_flow_udp;
    uint16_t counter_flow_icmp4;
    uint16_t counter_flow_icmp6;
    uint16_t counter_flow_tcp_reuse;
    uint16_t counter_flow_get_used;
    uint16_t counter_flow_get_used_eval;
    uint16_t counter_flow_get_used_eval_reject;
    uint16_t counter_flow_get_used_eval_busy;
    uint16_t counter_flow_get_used_failed;

    uint16_t counter_flow_spare_sync;
    uint16_t counter_flow_spare_sync_empty;
    uint16_t counter_flow_spare_sync_incomplete;
    uint16_t counter_flow_spare_sync_avg;

    uint16_t counter_engine_events[DECODE_EVENT_MAX];

    /* thread data for flow logging api: only used at forced
     * flow recycle during lookups */
    void *output_flow_thread_data;

} DecodeThreadVars;
  1. 流重组slot
    主要存储流信息

typedef struct FlowWorkerThreadData_ {
    DecodeThreadVars *dtv;

    union {
        StreamTcpThread *stream_thread;
        void *stream_thread_ptr;
    };

    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

    void *output_thread; /* Output thread data. */
    void *output_thread_flow; /* Output thread data. */

    uint16_t local_bypass_pkts;
    uint16_t local_bypass_bytes;
    uint16_t both_bypass_pkts;
    uint16_t both_bypass_bytes;

    PacketQueueNoLock pq;
    FlowLookupStruct fls;

    struct {
        uint16_t flows_injected;
        uint16_t flows_removed;
        uint16_t flows_aside_needs_work;
        uint16_t flows_aside_pkt_inject;
    } cnt;

} FlowWorkerThreadData;

FlowWorkerThreadData主要的数据是FlowLookupStruct fls;
如下是FlowLookupStruct结构体:


typedef struct FlowLookupStruct_ // TODO name
{
    /** thread store of spare queues */
    FlowQueuePrivate spare_queue;
    DecodeThreadVars *dtv;
    FlowQueuePrivate work_queue;
    uint32_t emerg_spare_sync_stamp;
} FlowLookupStruct;
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。