您现在的位置是:首页 >学无止境 >FLASHDB的tsdb时序数据库代码分析网站首页学无止境

FLASHDB的tsdb时序数据库代码分析

书中倦客 2024-06-17 10:22:23
简介FLASHDB的tsdb时序数据库代码分析

1、背景

好好看看FlashDB,在使用过程中读取时存在不太好用的地方,因此深入学习下FlashDB的TSDB的相关代码,看看如何解决工程中的问题。主要我每次取的数据是有上限的,而读取API不能读取条数,都是采用迭代方式,不适合我的项目工程。看看采集什么方式比较好,如果有大牛有好方法,请在评论区里指点下。

1.1 参考资料

FlashDB嵌入式数据库之TSDB数据存储解析_¥风笛¥的博客-CSDN博客

typedef struct fdb_db *fdb_db_t;
struct fdb_db {
    const char *name;                            /**< database name */
    fdb_db_type type;                            /**< database type */
    union {
#ifdef FDB_USING_FAL_MODE
        const struct fal_partition *part;        /**< flash partition for saving database */
#endif
#ifdef FDB_USING_FILE_MODE
        const char *dir;                         /**< directory path for saving database */
#endif
    } storage;
    uint32_t sec_size;                           /**< flash section size. It's a multiple of block size */
    uint32_t max_size;                           /**< database max size. It's a multiple of section size */
    bool init_ok;                                /**< initialized successfully */
    bool file_mode;                              /**< is file mode, default is false */
    bool not_formatable;                         /**< is can NOT be formated mode, default is false */
#ifdef FDB_USING_FILE_MODE
#if defined(FDB_USING_FILE_POSIX_MODE)
    int cur_file;                                /**< current file object */
#elif defined(FDB_USING_FILE_LIBC_MODE)
    FILE *cur_file;                              /**< current file object */
#endif
    uint32_t cur_sec;                            /**< current operate sector address  */
#endif
    void (*lock)(fdb_db_t db);                   /**< lock the database operate */
    void (*unlock)(fdb_db_t db);                 /**< unlock the database operate */

    void *user_data;
};

2、时间序列数据库分析

时间序列数据库三个重要结构体,分别为时间序数据库对象、时间序列数据库扇区信息、时间序列数据节点索引日志。

/* TSDB structure 时间序列数据库对象结构体*/
struct fdb_tsdb {
    struct fdb_db parent;                        /**< inherit from fdb_db */
    struct tsdb_sec_info cur_sec;                /**< current using sector 当前正在使用的扇区信息*/
    fdb_time_t last_time;                        /**< last TSL timestamp 最新数据时间戳*/
    fdb_get_time get_time;                       /**< the current timestamp get function */
    size_t max_len;                              /**< the maximum length of each log */
    uint32_t oldest_addr;                        /**< the oldest sector start address */
    bool rollover;                               /**< the oldest data will rollover by newest data, default is true */

    void *user_data;
};
typedef struct fdb_tsdb *fdb_tsdb_t;


/* TSDB section information 时间序列数据库扇区信息结构体*/
struct tsdb_sec_info {
    bool check_ok;                               /**< sector header check is OK */
    fdb_sector_store_status_t status;            /**< sector store status 第几个状态了@see fdb_sector_store_status_t */
    uint32_t addr;                               /**< sector start address */
    uint32_t magic;                              /**< magic word(`T`, `S`, `L`, `0`) */
    fdb_time_t start_time;                       /**< the first start node's timestamp, 0xFFFFFFFF: unused */
    fdb_time_t end_time;                         /**< the last end node's timestamp, 0xFFFFFFFF: unused */
    uint32_t end_idx;                            /**< the last end node's index, 0xFFFFFFFF: unused */
    fdb_tsl_status_t end_info_stat[2];           /**< the last end node's info status */
    size_t remain;                               /**< remain size */
    uint32_t empty_idx;                          /**< the next empty node index address */
    uint32_t empty_data;                         /**< the next empty node's data end address */
};
typedef struct tsdb_sec_info *tsdb_sec_info_t;
 
/*扇区头数据*/
struct sector_hdr_data {
    uint8_t status[FDB_STORE_STATUS_TABLE_SIZE]; /**< sector store status存储的状态4个,倒过来读取,看看最新到哪个状态了 @see fdb_sector_store_status_t */
    uint32_t magic;                              /**< magic word(`T`, `S`, `L`, `0`) */
    fdb_time_t start_time;                       /**< the first start node's timestamp */
    struct {
        fdb_time_t time;                         /**< the last end node's timestamp */
        uint32_t index;                          /**< the last end node's index */
        uint8_t status[TSL_STATUS_TABLE_SIZE];   /**< end node status, @see fdb_tsl_status_t */
    } end_info[2];
    uint32_t reserved;
};
typedef struct sector_hdr_data *sector_hdr_data_t;

/* time series log node index data 时间序列日志节点数据*/
struct log_idx_data {
    uint8_t status_table[TSL_STATUS_TABLE_SIZE]; /**< node status, @see fdb_tsl_status_t */
    fdb_time_t time;                             /**< node timestamp */
    uint32_t log_len;                            /**< node total length (header + name + value), must align by FDB_WRITE_GRAN */
    uint32_t log_addr;                           /**< node address */
};
typedef struct log_idx_data *log_idx_data_t;

 对于一个扇区的数据,前40个字节存放扇区信息--struct sector_hdr_data ,接着是存放节点log信息---struct log_idx_data,数据信息从扇区的结尾开始存放---长度由对应的存放节点log信息中的长度tsl.log_len确定。节点log信息向后依次存放,节点数据信息从扇区尾向前依次存放。

 主要研究数据的存储或读取。首先看存储数据

2.1 存储数据

fdb_err_t fdb_tsl_append(fdb_tsdb_t db, fdb_blob_t blob)

最重要的调用 tsl_append

fdb_err_t tsl_append(fdb_tsdb_t db, fdb_blob_t blob)
{
    fdb_err_t result = FDB_NO_ERR;
    fdb_time_t cur_time = db->get_time();

    FDB_ASSERT(blob->size <= db->max_len);

    /* check the current timestamp, MUST more than the last save timestamp */
    if (cur_time < db->last_time) {
        FDB_INFO("Warning: current timestamp (%" PRIdMAX ") is less than the last save timestamp (%" PRIdMAX "). This tsl will be dropped.
",
                (intmax_t )cur_time, (intmax_t )(db->last_time));
        return FDB_WRITE_ERR;
    }

    result = update_sec_status(db, &db->cur_sec, blob, cur_time);  //扇区信息更新
    if (result != FDB_NO_ERR) {
        return result;
    }

    /* write the TSL node */
    result = write_tsl(db, blob, cur_time);            //写时间序列数据
    if (result != FDB_NO_ERR) {
        return result;
    }

   //更新DB中关于当前扇区结束node的信息和其他信息
    /* recalculate the current using sector info */    
    db->cur_sec.end_idx = db->cur_sec.empty_idx;
    db->cur_sec.end_time = cur_time;
    db->cur_sec.empty_idx += LOG_IDX_DATA_SIZE;
    db->cur_sec.empty_data -= FDB_WG_ALIGN(blob->size);
    db->cur_sec.remain -= LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size);
    db->last_time = cur_time;  //更新最新数据时间戳

    return result;
}

时间序列数据库扇区信息对于扇区状态为

FDB_SECTOR_STORE_EMPTY:

        在更新扇区状态的函数中更新数据状态位FDB_SECTOR_STORE_USING,将flash中相关信息也更新一下,包括status_table[0]和start_time两个字段。

FDB_SECTOR_STORE_FULL:

        直接返回FDB_SAVED_FULL,表示扇区满了。

FDB_SECTOR_STORE_USING:

        扇区正在使用且剩余空间不足(小于LOG_IDX_DATA_SIZE+存储数据的大小),存储结束node的索引号和时间戳

static fdb_err_t update_sec_status(fdb_tsdb_t db, tsdb_sec_info_t sector, fdb_blob_t blob, fdb_time_t cur_time)

 result = update_sec_status(db, &db->cur_sec, blob, cur_time);

---》

uint8_t status[FDB_STORE_STATUS_TABLE_SIZE];

......

 _FDB_WRITE_STATUS(db, sector->addr, status, FDB_SECTOR_STORE_STATUS_NUM, FDB_SECTOR_STORE_USING, true);

 /* save the start timestamp */
        FLASH_WRITE(db, sector->addr + SECTOR_START_TIME_OFFSET, (uint32_t *)&cur_time, sizeof(fdb_time_t), true);

-----》

fdb_err_t _fdb_write_status(fdb_db_t db, uint32_t addr, uint8_t status_table[], size_t status_num, size_t status_index, bool sync)

....

 result = _fdb_flash_write(db, addr + byte_index, (uint32_t *) &status_table[byte_index], FDB_WRITE_GRAN / 8, sync); //实际上就是status[0] = 0x00;

2.1 读取数据

enum fdb_tsl_status {
    FDB_TSL_UNUSED,
    FDB_TSL_PRE_WRITE,
    FDB_TSL_WRITE,
    FDB_TSL_USER_STATUS1,
    FDB_TSL_DELETED,
    FDB_TSL_USER_STATUS2,
#define FDB_TSL_STATUS_NUM                       6
};
typedef enum fdb_tsl_status fdb_tsl_status_t;

/* time series log node object */
struct fdb_tsl {
    fdb_tsl_status_t status;                     /**< 节点状态node status, @see fdb_log_status_t */
    fdb_time_t time;                             /**< 节点时间戳node timestamp */
    uint32_t log_len;                            /**< 节点长度,必须对齐log length, must align by FDB_WRITE_GRAN */
    struct {
        uint32_t index;                          /**< node索引的地址node index address */
        uint32_t log;                            /**< log数据地址log data address */
    } addr;                                      /*地址信息*/
};
typedef struct fdb_tsl *fdb_tsl_t;

读取数据,从分析fdb_tsl_iter_by_time开始

/**
 * The TSDB iterator for each TSL by timestamp.
 *
 * @param db database object
 * @param from starting timestamp. It will be a reverse iterator when ending timestamp less than starting timestamp
 * @param to ending timestamp
 * @param cb callback
 * @param arg callback argument
 */
void fdb_tsl_iter_by_time(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl_cb cb, void *cb_arg)
{
    struct tsdb_sec_info sector;
    uint32_t sec_addr, start_addr, traversed_len = 0;
    struct fdb_tsl tsl;
    bool found_start_tsl = false;
   //定义了两个函数指针
    uint32_t (*get_sector_addr)(fdb_tsdb_t , tsdb_sec_info_t , uint32_t);
    uint32_t (*get_tsl_addr)(tsdb_sec_info_t , fdb_tsl_t);

    if (!db_init_ok(db)) {
        FDB_INFO("Error: TSL (%s) isn't initialize OK.
", db_name(db));
    }
    /*正序的*/
    if(from <= to) {
        start_addr = db->oldest_addr;  //开始就是最老数据
        get_sector_addr = get_next_sector_addr;    //函数指针赋值  下一个扇区
        get_tsl_addr = get_next_tsl_addr;    //函数指针赋值  下一个时间序列log地址
    } else {  //倒序
        start_addr = db->cur_sec.addr;    //开始就是最新的数据
        get_sector_addr = get_last_sector_addr;   //函数指针赋值--上一个扇区
        get_tsl_addr = get_last_tsl_addr;        //函数指针赋值---上一个时间序列log地址
    }

//    FDB_INFO("from %s", ctime((const time_t * )&from));
//    FDB_INFO("to %s", ctime((const time_t * )&to));

    if (cb == NULL) {
        return;
    }

    sec_addr = start_addr;   /*扇区内起始地址*/
    db_lock(db);
    /* search all sectors */
    do {
        traversed_len += db_sec_size(db); //加上一个扇区的大小
        /*根据提供的扇区地址获取扇区信息,填充到sector中去*/
        if (read_sector_info(db, sec_addr, &sector, false) != FDB_NO_ERR) {
            continue;
        }
        /* sector has TSL */
        if ((sector.status == FDB_SECTOR_STORE_USING || sector.status == FDB_SECTOR_STORE_FULL)) {
            if (sector.status == FDB_SECTOR_STORE_USING) {
                /* copy the current using sector status  */
                sector = db->cur_sec;
            }
            if ((found_start_tsl)
                    || (!found_start_tsl &&
                            ((from <= to && ((sec_addr == start_addr && from <= sector.start_time) || from <= sector.end_time)) ||
                             (from > to  && ((sec_addr == start_addr && from >= sector.end_time) || from >= sector.start_time)))
                             )) {
                //第一个节点索引地址                                索引的最后一个地址
                uint32_t start = sector.addr + SECTOR_HDR_DATA_SIZE, end = sector.end_idx;

                found_start_tsl = true;
                /* search the first start TSL address */
                tsl.addr.index = search_start_tsl_addr(db, start, end, from, to);
                /* search all TSL */
                do {
                    //读取节点索引log
                    read_tsl(db, &tsl);
                    if (tsl.status != FDB_TSL_UNUSED) {
                            //争取
                        if ((from <= to && tsl.time >= from && tsl.time <= to)
                                || (from > to && tsl.time <= from && tsl.time >= to)) {
                            /* iterator is interrupted when callback return true */
                            if (cb(&tsl, cb_arg)) {
                                goto __exit;
                            }
                        } else {
                            goto __exit;
                        }
                    }
                } while ((tsl.addr.index = get_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
            }
        } else if (sector.status == FDB_SECTOR_STORE_EMPTY) {
            goto __exit;
        }
    } while ((sec_addr = get_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);

__exit:
    db_unlock(db);
}

 重点来看search_start_tsl_addr这个查找函数


/*
 * Found the matched TSL address.
 */
static int search_start_tsl_addr(fdb_tsdb_t db, int start, int end, fdb_time_t from, fdb_time_t to)
{
    struct fdb_tsl tsl;
    while (true) {
        /*二分法*/
        tsl.addr.index = start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);
        read_tsl(db, &tsl);
        if (tsl.time < from) {
            start = tsl.addr.index + LOG_IDX_DATA_SIZE;
        } else if (tsl.time > from) {
            end = tsl.addr.index - LOG_IDX_DATA_SIZE;
        } else {
            return tsl.addr.index;
        }

        if (start > end) {
            if (from > to) {
                tsl.addr.index = start;
                read_tsl(db, &tsl);
                if (tsl.time > from) {
                    start -= LOG_IDX_DATA_SIZE;
                }
            }
            break;
        }
    }
    return start;
}

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。