您现在的位置是:首页 >学无止境 >FLASHDB的tsdb时序数据库代码分析网站首页学无止境
FLASHDB的tsdb时序数据库代码分析
1、背景
好好看看FlashDB,在使用过程中读取时存在不太好用的地方,因此深入学习下FlashDB的TSDB的相关代码,看看如何解决工程中的问题。主要我每次取的数据是有上限的,而读取API不能读取条数,都是采用迭代方式,不适合我的项目工程。看看采集什么方式比较好,如果有大牛有好方法,请在评论区里指点下。
1.1 参考资料
FlashDB嵌入式数据库之TSDB数据存储解析_¥风笛¥的博客-CSDN博客
typedef struct fdb_db *fdb_db_t;
struct fdb_db {
const char *name; /**< database name */
fdb_db_type type; /**< database type */
union {
#ifdef FDB_USING_FAL_MODE
const struct fal_partition *part; /**< flash partition for saving database */
#endif
#ifdef FDB_USING_FILE_MODE
const char *dir; /**< directory path for saving database */
#endif
} storage;
uint32_t sec_size; /**< flash section size. It's a multiple of block size */
uint32_t max_size; /**< database max size. It's a multiple of section size */
bool init_ok; /**< initialized successfully */
bool file_mode; /**< is file mode, default is false */
bool not_formatable; /**< is can NOT be formated mode, default is false */
#ifdef FDB_USING_FILE_MODE
#if defined(FDB_USING_FILE_POSIX_MODE)
int cur_file; /**< current file object */
#elif defined(FDB_USING_FILE_LIBC_MODE)
FILE *cur_file; /**< current file object */
#endif
uint32_t cur_sec; /**< current operate sector address */
#endif
void (*lock)(fdb_db_t db); /**< lock the database operate */
void (*unlock)(fdb_db_t db); /**< unlock the database operate */
void *user_data;
};
2、时间序列数据库分析
时间序列数据库三个重要结构体,分别为时间序数据库对象、时间序列数据库扇区信息、时间序列数据节点索引日志。
/* TSDB structure 时间序列数据库对象结构体*/
struct fdb_tsdb {
struct fdb_db parent; /**< inherit from fdb_db */
struct tsdb_sec_info cur_sec; /**< current using sector 当前正在使用的扇区信息*/
fdb_time_t last_time; /**< last TSL timestamp 最新数据时间戳*/
fdb_get_time get_time; /**< the current timestamp get function */
size_t max_len; /**< the maximum length of each log */
uint32_t oldest_addr; /**< the oldest sector start address */
bool rollover; /**< the oldest data will rollover by newest data, default is true */
void *user_data;
};
typedef struct fdb_tsdb *fdb_tsdb_t;
/* TSDB section information 时间序列数据库扇区信息结构体*/
struct tsdb_sec_info {
bool check_ok; /**< sector header check is OK */
fdb_sector_store_status_t status; /**< sector store status 第几个状态了@see fdb_sector_store_status_t */
uint32_t addr; /**< sector start address */
uint32_t magic; /**< magic word(`T`, `S`, `L`, `0`) */
fdb_time_t start_time; /**< the first start node's timestamp, 0xFFFFFFFF: unused */
fdb_time_t end_time; /**< the last end node's timestamp, 0xFFFFFFFF: unused */
uint32_t end_idx; /**< the last end node's index, 0xFFFFFFFF: unused */
fdb_tsl_status_t end_info_stat[2]; /**< the last end node's info status */
size_t remain; /**< remain size */
uint32_t empty_idx; /**< the next empty node index address */
uint32_t empty_data; /**< the next empty node's data end address */
};
typedef struct tsdb_sec_info *tsdb_sec_info_t;
/*扇区头数据*/
struct sector_hdr_data {
uint8_t status[FDB_STORE_STATUS_TABLE_SIZE]; /**< sector store status存储的状态4个,倒过来读取,看看最新到哪个状态了 @see fdb_sector_store_status_t */
uint32_t magic; /**< magic word(`T`, `S`, `L`, `0`) */
fdb_time_t start_time; /**< the first start node's timestamp */
struct {
fdb_time_t time; /**< the last end node's timestamp */
uint32_t index; /**< the last end node's index */
uint8_t status[TSL_STATUS_TABLE_SIZE]; /**< end node status, @see fdb_tsl_status_t */
} end_info[2];
uint32_t reserved;
};
typedef struct sector_hdr_data *sector_hdr_data_t;
/* time series log node index data 时间序列日志节点数据*/
struct log_idx_data {
uint8_t status_table[TSL_STATUS_TABLE_SIZE]; /**< node status, @see fdb_tsl_status_t */
fdb_time_t time; /**< node timestamp */
uint32_t log_len; /**< node total length (header + name + value), must align by FDB_WRITE_GRAN */
uint32_t log_addr; /**< node address */
};
typedef struct log_idx_data *log_idx_data_t;
对于一个扇区的数据,前40个字节存放扇区信息--struct sector_hdr_data ,接着是存放节点log信息---struct log_idx_data,数据信息从扇区的结尾开始存放---长度由对应的存放节点log信息中的长度tsl.log_len确定。节点log信息向后依次存放,节点数据信息从扇区尾向前依次存放。
主要研究数据的存储或读取。首先看存储数据
2.1 存储数据
fdb_err_t fdb_tsl_append(fdb_tsdb_t db, fdb_blob_t blob)
最重要的调用 tsl_append
fdb_err_t tsl_append(fdb_tsdb_t db, fdb_blob_t blob)
{
fdb_err_t result = FDB_NO_ERR;
fdb_time_t cur_time = db->get_time();
FDB_ASSERT(blob->size <= db->max_len);
/* check the current timestamp, MUST more than the last save timestamp */
if (cur_time < db->last_time) {
FDB_INFO("Warning: current timestamp (%" PRIdMAX ") is less than the last save timestamp (%" PRIdMAX "). This tsl will be dropped.
",
(intmax_t )cur_time, (intmax_t )(db->last_time));
return FDB_WRITE_ERR;
}
result = update_sec_status(db, &db->cur_sec, blob, cur_time); //扇区信息更新
if (result != FDB_NO_ERR) {
return result;
}
/* write the TSL node */
result = write_tsl(db, blob, cur_time); //写时间序列数据
if (result != FDB_NO_ERR) {
return result;
}
//更新DB中关于当前扇区结束node的信息和其他信息
/* recalculate the current using sector info */
db->cur_sec.end_idx = db->cur_sec.empty_idx;
db->cur_sec.end_time = cur_time;
db->cur_sec.empty_idx += LOG_IDX_DATA_SIZE;
db->cur_sec.empty_data -= FDB_WG_ALIGN(blob->size);
db->cur_sec.remain -= LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size);
db->last_time = cur_time; //更新最新数据时间戳
return result;
}
时间序列数据库扇区信息对于扇区状态为
FDB_SECTOR_STORE_EMPTY:
在更新扇区状态的函数中更新数据状态位FDB_SECTOR_STORE_USING,将flash中相关信息也更新一下,包括status_table[0]和start_time两个字段。
FDB_SECTOR_STORE_FULL:
直接返回FDB_SAVED_FULL,表示扇区满了。
FDB_SECTOR_STORE_USING:
扇区正在使用且剩余空间不足(小于LOG_IDX_DATA_SIZE+存储数据的大小),存储结束node的索引号和时间戳
static fdb_err_t update_sec_status(fdb_tsdb_t db, tsdb_sec_info_t sector, fdb_blob_t blob, fdb_time_t cur_time)
result = update_sec_status(db, &db->cur_sec, blob, cur_time);
---》
uint8_t status[FDB_STORE_STATUS_TABLE_SIZE];
......
_FDB_WRITE_STATUS(db, sector->addr, status, FDB_SECTOR_STORE_STATUS_NUM, FDB_SECTOR_STORE_USING, true);
/* save the start timestamp */
FLASH_WRITE(db, sector->addr + SECTOR_START_TIME_OFFSET, (uint32_t *)&cur_time, sizeof(fdb_time_t), true);-----》
fdb_err_t _fdb_write_status(fdb_db_t db, uint32_t addr, uint8_t status_table[], size_t status_num, size_t status_index, bool sync)
....
result = _fdb_flash_write(db, addr + byte_index, (uint32_t *) &status_table[byte_index], FDB_WRITE_GRAN / 8, sync); //实际上就是status[0] = 0x00;
2.1 读取数据
enum fdb_tsl_status {
FDB_TSL_UNUSED,
FDB_TSL_PRE_WRITE,
FDB_TSL_WRITE,
FDB_TSL_USER_STATUS1,
FDB_TSL_DELETED,
FDB_TSL_USER_STATUS2,
#define FDB_TSL_STATUS_NUM 6
};
typedef enum fdb_tsl_status fdb_tsl_status_t;
/* time series log node object */
struct fdb_tsl {
fdb_tsl_status_t status; /**< 节点状态node status, @see fdb_log_status_t */
fdb_time_t time; /**< 节点时间戳node timestamp */
uint32_t log_len; /**< 节点长度,必须对齐log length, must align by FDB_WRITE_GRAN */
struct {
uint32_t index; /**< node索引的地址node index address */
uint32_t log; /**< log数据地址log data address */
} addr; /*地址信息*/
};
typedef struct fdb_tsl *fdb_tsl_t;
读取数据,从分析fdb_tsl_iter_by_time开始
/**
* The TSDB iterator for each TSL by timestamp.
*
* @param db database object
* @param from starting timestamp. It will be a reverse iterator when ending timestamp less than starting timestamp
* @param to ending timestamp
* @param cb callback
* @param arg callback argument
*/
void fdb_tsl_iter_by_time(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl_cb cb, void *cb_arg)
{
struct tsdb_sec_info sector;
uint32_t sec_addr, start_addr, traversed_len = 0;
struct fdb_tsl tsl;
bool found_start_tsl = false;
//定义了两个函数指针
uint32_t (*get_sector_addr)(fdb_tsdb_t , tsdb_sec_info_t , uint32_t);
uint32_t (*get_tsl_addr)(tsdb_sec_info_t , fdb_tsl_t);
if (!db_init_ok(db)) {
FDB_INFO("Error: TSL (%s) isn't initialize OK.
", db_name(db));
}
/*正序的*/
if(from <= to) {
start_addr = db->oldest_addr; //开始就是最老数据
get_sector_addr = get_next_sector_addr; //函数指针赋值 下一个扇区
get_tsl_addr = get_next_tsl_addr; //函数指针赋值 下一个时间序列log地址
} else { //倒序
start_addr = db->cur_sec.addr; //开始就是最新的数据
get_sector_addr = get_last_sector_addr; //函数指针赋值--上一个扇区
get_tsl_addr = get_last_tsl_addr; //函数指针赋值---上一个时间序列log地址
}
// FDB_INFO("from %s", ctime((const time_t * )&from));
// FDB_INFO("to %s", ctime((const time_t * )&to));
if (cb == NULL) {
return;
}
sec_addr = start_addr; /*扇区内起始地址*/
db_lock(db);
/* search all sectors */
do {
traversed_len += db_sec_size(db); //加上一个扇区的大小
/*根据提供的扇区地址获取扇区信息,填充到sector中去*/
if (read_sector_info(db, sec_addr, §or, false) != FDB_NO_ERR) {
continue;
}
/* sector has TSL */
if ((sector.status == FDB_SECTOR_STORE_USING || sector.status == FDB_SECTOR_STORE_FULL)) {
if (sector.status == FDB_SECTOR_STORE_USING) {
/* copy the current using sector status */
sector = db->cur_sec;
}
if ((found_start_tsl)
|| (!found_start_tsl &&
((from <= to && ((sec_addr == start_addr && from <= sector.start_time) || from <= sector.end_time)) ||
(from > to && ((sec_addr == start_addr && from >= sector.end_time) || from >= sector.start_time)))
)) {
//第一个节点索引地址 索引的最后一个地址
uint32_t start = sector.addr + SECTOR_HDR_DATA_SIZE, end = sector.end_idx;
found_start_tsl = true;
/* search the first start TSL address */
tsl.addr.index = search_start_tsl_addr(db, start, end, from, to);
/* search all TSL */
do {
//读取节点索引log
read_tsl(db, &tsl);
if (tsl.status != FDB_TSL_UNUSED) {
//争取
if ((from <= to && tsl.time >= from && tsl.time <= to)
|| (from > to && tsl.time <= from && tsl.time >= to)) {
/* iterator is interrupted when callback return true */
if (cb(&tsl, cb_arg)) {
goto __exit;
}
} else {
goto __exit;
}
}
} while ((tsl.addr.index = get_tsl_addr(§or, &tsl)) != FAILED_ADDR);
}
} else if (sector.status == FDB_SECTOR_STORE_EMPTY) {
goto __exit;
}
} while ((sec_addr = get_sector_addr(db, §or, traversed_len)) != FAILED_ADDR);
__exit:
db_unlock(db);
}
重点来看search_start_tsl_addr这个查找函数
/*
* Found the matched TSL address.
*/
static int search_start_tsl_addr(fdb_tsdb_t db, int start, int end, fdb_time_t from, fdb_time_t to)
{
struct fdb_tsl tsl;
while (true) {
/*二分法*/
tsl.addr.index = start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);
read_tsl(db, &tsl);
if (tsl.time < from) {
start = tsl.addr.index + LOG_IDX_DATA_SIZE;
} else if (tsl.time > from) {
end = tsl.addr.index - LOG_IDX_DATA_SIZE;
} else {
return tsl.addr.index;
}
if (start > end) {
if (from > to) {
tsl.addr.index = start;
read_tsl(db, &tsl);
if (tsl.time > from) {
start -= LOG_IDX_DATA_SIZE;
}
}
break;
}
}
return start;
}