您现在的位置是:首页 >其他 >【Rust代码实践】在NCEI逐日数据处理当中使用的Rust代码及其数据分享网站首页其他
【Rust代码实践】在NCEI逐日数据处理当中使用的Rust代码及其数据分享
简介【Rust代码实践】在NCEI逐日数据处理当中使用的Rust代码及其数据分享
我的工作和气象没有任何关系,但是对于怀揣着这份热爱希望能交到更多的好朋友,无论是气象分析还是Rust的
这次处理的是NCEI的1929年至2024年的逐日数据,数据量很大,我写了一些代码顺利将数据写入到mysql当中,Rust的学习断断续续,希望各位朋友能批判我工作当中不到位的地方,共同进步!!!
use csv::{ReaderBuilder, StringRecord};
use indicatif::{ProgressBar, ProgressStyle};
use mysql::prelude::*;
use mysql::*;
use std::error::Error;
use std::path::{Path, PathBuf};
use walkdir::WalkDir;
use chrono::NaiveDate;
// 定义一个别名
type StdResult<T> = std::result::Result<T, Box<dyn Error>>;
/// ---------------------------
/// 数据库连接信息
/// ---------------------------
const MYSQL_USER: &str = "";
const MYSQL_PASSWORD: &str = "";
const MYSQL_HOST: &str = "localhost";
const MYSQL_PORT: u16 = 3306;
const MYSQL_DATABASE: &str = "ncei_day_weather_data";
/// ---------------------------
/// CSV 文件中需要的每日气象列 (除去 STATION, LAT, LON, ELEVATION, NAME)
/// 下面将它们存储到更合适的数据库类型 (DATE, DOUBLE, INT, 等)
///
/// 这里列的顺序要和代码里解析逻辑对应
/// ---------------------------
static DAILY_FIELD_NAMES: &[&str] = &[
"date",
"temp",
"temp_attributes",
"dewp",
"dewp_attributes",
"slp",
"slp_attributes",
"stp",
"stp_attributes",
"visib",
"visib_attributes",
"wdsp",
"wdsp_attributes",
"mxspd",
"gust",
"max",
"max_attributes",
"min",
"min_attributes",
"prcp",
"prcp_attributes",
"sndp",
"frshtt",
];
///
/// main 函数
///
fn main() -> StdResult<()> {
// ---------------------------
// 要处理的 CSV 文件所在目录
// ---------------------------
let input_dir = Path::new(r"G:气象数据美国国家环境信息中心(NCEI)逐日气象数据");
let base_url = format!("mysql://{}:{}@{}:{}", MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT);
let base_opts = Opts::from_url(&base_url)?;
let mut base_conn = Conn::new(base_opts)?;
create_database_if_not_exists(&mut base_conn, MYSQL_DATABASE)?;
// 连接到具体数据库
let url_with_db = format!(
"mysql://{}:{}@{}:{}/{}",
MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE
);
let opts = Opts::from_url(&url_with_db)?;
let pool = Pool::new(opts)?;
let mut conn = pool.get_conn()?;
// 确保站点信息表存在
create_station_info_table_if_not_exists(&mut conn)?;
// 收集所有 .csv 文件路径
let files = collect_csv_files(input_dir);
let total_files = files.len();
println!("共找到 {} 个 CSV 文件,开始处理...", total_files);
// 进度条
let pb = ProgressBar::new(total_files as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} 文件 ({eta})")
.unwrap()
.progress_chars("#>-"),
);
let mut success_count = 0_usize;
let mut error_count = 0_usize;
for path in files {
let station_code = match path.file_stem() {
Some(os_str) => os_str.to_string_lossy().to_string(),
None => {
eprintln!("无法获取文件名: {}", path.display());
error_count += 1;
pb.inc(1);
continue;
}
};
match process_single_csv(&mut conn, &path, &station_code) {
Ok(_) => success_count += 1,
Err(e) => {
eprintln!("
❌ 处理失败: {},错误: {}", path.display(), e);
error_count += 1;
}
}
pb.inc(1);
}
pb.finish_with_message("所有文件处理结束");
println!("
处理结果:");
println!("✅ 成功: {} 个文件", success_count);
println!("❌ 失败: {} 个文件", error_count);
Ok(())
}
/// ---------------------------
/// 创建数据库(如果不存在)
/// ---------------------------
fn create_database_if_not_exists(conn: &mut Conn, db_name: &str) -> StdResult<()> {
let create_db_sql = format!("CREATE DATABASE IF NOT EXISTS `{}`", db_name);
conn.query_drop(create_db_sql)?;
Ok(())
}
/// ---------------------------
/// station_info 表
/// ---------------------------
fn create_station_info_table_if_not_exists(conn: &mut PooledConn) -> StdResult<()> {
let create_sql = r#"
CREATE TABLE IF NOT EXISTS `station_info` (
`station_code` VARCHAR(20) NOT NULL,
`latitude` DOUBLE DEFAULT NULL,
`longitude` DOUBLE DEFAULT NULL,
`elevation` DOUBLE DEFAULT NULL,
`station_name` VARCHAR(255) DEFAULT NULL,
PRIMARY KEY (`station_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"#;
conn.exec_drop(create_sql, ())?;
Ok(())
}
/// ---------------------------
/// 收集目录下所有 .csv 文件
/// ---------------------------
fn collect_csv_files(input_dir: &Path) -> Vec<PathBuf> {
WalkDir::new(input_dir)
.into_iter()
.filter_map(|e| e.ok()) // 过滤异常
.filter(|e| e.file_type().is_file()) // 只要文件
.filter(|e| {
e.path()
.extension()
.and_then(|ext| ext.to_str())
.map(|s| s.eq_ignore_ascii_case("csv"))
.unwrap_or(false)
})
.map(|e| e.path().to_path_buf())
.collect()
}
/// ---------------------------
/// 处理单个 CSV
/// 1) 抽取/更新站点信息到 station_info
/// 2) 创建 (或升级) weather_data_{station} 表
/// 3) 批量插入每日数据,使用更细的列类型
/// ---------------------------
fn process_single_csv(conn: &mut PooledConn, csv_path: &Path, station_code: &str) -> StdResult<()> {
let mut reader = ReaderBuilder::new().flexible(true).from_path(csv_path)?;
// 读取表头
let headers = reader.headers()?.clone();
if headers.len() < 2 {
return Err(format!("表头列数太少: {}", headers.len()).into());
}
// 全部读入内存(示例做法)
let mut all_records = vec![];
for rec in reader.records() {
let record = rec?;
all_records.push(record);
}
// 先更新 station_info (这里假设第一行足够代表该站点信息)
if !all_records.is_empty() {
let first = &all_records[0];
upsert_station_info(conn, station_code, first)?;
}
// 为该站点创建 / 升级数据表
let table_name = format!("weather_data_{}", station_code);
create_daily_table_if_not_exists(conn, &table_name)?;
// 把每日数据解析为 Rust 结构,然后批量插入
let mut parsed_rows = Vec::new();
for record in &all_records {
if let Some(row) = parse_csv_record_to_daily_row(record) {
parsed_rows.push(row);
} else {
// 说明此行校验/解析失败或属于异常范围,直接跳过
continue;
}
}
batch_insert_daily_rows(conn, &table_name, &parsed_rows)?;
Ok(())
}
/// ---------------------------
/// station_info 插入或更新
/// (ON DUPLICATE KEY UPDATE)
/// ---------------------------
fn upsert_station_info(conn: &mut PooledConn, station_code: &str, rec: &StringRecord) -> StdResult<()> {
// 原先的 CSV 顺序: [0:STATION, 1:DATE, 2:LAT, 3:LON, 4:ELEV, 5:NAME, 6:TEMP...]
let lat = parse_f64_or_null(rec.get(2));
let lon = parse_f64_or_null(rec.get(3));
let elev = parse_f64_or_null(rec.get(4));
let name = rec.get(5).unwrap_or("").trim();
let sql = r#"
INSERT INTO station_info (station_code, latitude, longitude, elevation, station_name)
VALUES (:code, :lat, :lon, :elev, :sname)
ON DUPLICATE KEY UPDATE
latitude = VALUES(latitude),
longitude = VALUES(longitude),
elevation = VALUES(elevation),
station_name = VALUES(station_name)
"#;
conn.exec_drop(
sql,
params! {
"code" => station_code,
"lat" => lat,
"lon" => lon,
"elev" => elev,
"sname" => name,
},
)?;
Ok(())
}
/// ---------------------------
/// 创建每日数据表 (更细分的类型)
/// ---------------------------
fn create_daily_table_if_not_exists(conn: &mut PooledConn, table_name: &str) -> StdResult<()> {
// 下面每个字段都给了更具体的类型:
// date -> DATE
// temp, dewp, slp, stp, visib, wdsp, mxspd, gust, max, min, prcp, sndp -> DOUBLE
// *_attributes -> VARCHAR(50)
// frshtt -> INT
// 可根据实际数据分布再做细调
let create_sql = format!(
r#"
CREATE TABLE IF NOT EXISTS `{table_name}` (
`date` DATE NOT NULL,
`temp` DOUBLE DEFAULT NULL,
`temp_attributes` VARCHAR(50) DEFAULT NULL,
`dewp` DOUBLE DEFAULT NULL,
`dewp_attributes` VARCHAR(50) DEFAULT NULL,
`slp` DOUBLE DEFAULT NULL,
`slp_attributes` VARCHAR(50) DEFAULT NULL,
`stp` DOUBLE DEFAULT NULL,
`stp_attributes` VARCHAR(50) DEFAULT NULL,
`visib` DOUBLE DEFAULT NULL,
`visib_attributes` VARCHAR(50) DEFAULT NULL,
`wdsp` DOUBLE DEFAULT NULL,
`wdsp_attributes` VARCHAR(50) DEFAULT NULL,
`mxspd` DOUBLE DEFAULT NULL,
`gust` DOUBLE DEFAULT NULL,
`max` DOUBLE DEFAULT NULL,
`max_attributes` VARCHAR(50) DEFAULT NULL,
`min` DOUBLE DEFAULT NULL,
`min_attributes` VARCHAR(50) DEFAULT NULL,
`prcp` DOUBLE DEFAULT NULL,
`prcp_attributes` VARCHAR(50) DEFAULT NULL,
`sndp` DOUBLE DEFAULT NULL,
`frshtt` INT DEFAULT NULL,
UNIQUE KEY idx_unique_date(`date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"#,
);
conn.exec_drop(create_sql, ())?;
Ok(())
}
/// ---------------------------
/// 代表每日气象的解析后结构
/// 注意:对应 DAILY_FIELD_NAMES 的顺序
/// ---------------------------
#[derive(Debug)]
struct DailyRow {
date: NaiveDate, // DATE
temp: Option<f64>, // DOUBLE
temp_attr: Option<String>,
dewp: Option<f64>,
dewp_attr: Option<String>,
slp: Option<f64>,
slp_attr: Option<String>,
stp: Option<f64>,
stp_attr: Option<String>,
visib: Option<f64>,
visib_attr: Option<String>,
wdsp: Option<f64>,
wdsp_attr: Option<String>,
mxspd: Option<f64>,
gust: Option<f64>,
max: Option<f64>,
max_attr: Option<String>,
min: Option<f64>,
min_attr: Option<String>,
prcp: Option<f64>,
prcp_attr: Option<String>,
sndp: Option<f64>,
frshtt: Option<i32>,
}
/// ---------------------------
/// 从 CSV 的一行 StringRecord 中解析出一个 DailyRow
/// 如果解析/校验失败,返回 None
/// ---------------------------
fn parse_csv_record_to_daily_row(rec: &StringRecord) -> Option<DailyRow> {
// 这里注意对应 CSV 的字段顺序:
// 0 STATION
// 1 DATE
// 2 LAT
// 3 LON
// 4 ELEV
// 5 NAME
// 6 TEMP
// 7 TEMP_ATTRIBUTES
// 8 DEWP
// 9 DEWP_ATTRIBUTES
// 10 SLP
// 11 SLP_ATTRIBUTES
// 12 STP
// 13 STP_ATTRIBUTES
// 14 VISIB
// 15 VISIB_ATTRIBUTES
// 16 WDSP
// 17 WDSP_ATTRIBUTES
// 18 MXSPD
// 19 GUST
// 20 MAX
// 21 MAX_ATTRIBUTES
// 22 MIN
// 23 MIN_ATTRIBUTES
// 24 PRCP
// 25 PRCP_ATTRIBUTES
// 26 SNDP
// 27 FRSHTT
//
// 这里要小心索引越界,如果 CSV 某些列缺失,会导致 rec.get(...) = None。
// 为简化,先检查长度,如果不够就返回 None。
if rec.len() < 28 {
eprintln!("行列数不足,无法解析: {:?}", rec);
return None;
}
// 1) 解析 date (第 1 列)
let date_str = rec.get(1).unwrap().trim();
let date_parsed = match parse_date_ymd(date_str) {
Some(d) => d,
None => {
eprintln!("日期格式不符或无法解析: {}", date_str);
return None;
}
};
// 2) 依次解析其他字段
// 对数值进行 parse_f64_or_null,并做简单的范围过滤(可选)
// 也可以把过于离谱的数据直接当 None 或者整行舍弃。
let temp = parse_f64_or_null(rec.get(6));
// 例如,若温度超出 -200 ~ 190 范围,我们视为离谱 -> 跳过整行
if let Some(t) = temp {
if t < -200.0 || t > 190.0 {
eprintln!("温度超出合理范围 ({:.1}), 跳过行: {:?}", t, rec);
return None;
}
}
let temp_attr = parse_str_or_null(rec.get(7));
let dewp = parse_f64_or_null(rec.get(8));
let dewp_attr = parse_str_or_null(rec.get(9));
let slp = parse_f64_or_null(rec.get(10));
let slp_attr = parse_str_or_null(rec.get(11));
let stp = parse_f64_or_null(rec.get(12));
let stp_attr = parse_str_or_null(rec.get(13));
let visib = parse_f64_or_null(rec.get(14));
let visib_attr = parse_str_or_null(rec.get(15));
let wdsp = parse_f64_or_null(rec.get(16));
let wdsp_attr = parse_str_or_null(rec.get(17));
let mxspd = parse_f64_or_null(rec.get(18));
let gust = parse_f64_or_null(rec.get(19));
let max = parse_f64_or_null(rec.get(20));
let max_attr = parse_str_or_null(rec.get(21));
let min = parse_f64_or_null(rec.get(22));
let min_attr = parse_str_or_null(rec.get(23));
let prcp = parse_f64_or_null(rec.get(24));
let prcp_attr = parse_str_or_null(rec.get(25));
let sndp = parse_f64_or_null(rec.get(26));
// 解析 frshtt 为 int
let frshtt = parse_i32_or_null(rec.get(27));
Some(DailyRow {
date: date_parsed,
temp,
temp_attr,
dewp,
dewp_attr,
slp,
slp_attr,
stp,
stp_attr,
visib,
visib_attr,
wdsp,
wdsp_attr,
mxspd,
gust,
max,
max_attr,
min,
min_attr,
prcp,
prcp_attr,
sndp,
frshtt,
})
}
/// ---------------------------
/// 辅助函数:解析日期字符串 (yyyy/MM/dd) -> NaiveDate
/// 也可以加更多格式兼容
/// ---------------------------
fn parse_date_ymd(s: &str) -> Option<NaiveDate> {
let s = s.trim();
if s.is_empty() {
return None;
}
// 尝试用 yyyy-MM-dd
if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
return Some(d);
}
// 尝试用 yyyy/MM/dd
if let Ok(d) = NaiveDate::parse_from_str(s, "%Y/%m/%d") {
return Some(d);
}
None
}
/// ---------------------------
/// 辅助函数:解析浮点数,失败或空时返回 None
/// ---------------------------
/// 将字符串解析为 f64;若为空字符串、"9999.9" 或解析失败则返回 None
fn parse_f64_or_null(s: Option<&str>) -> Option<f64> {
match s {
Some(txt) => {
let trimmed = txt.trim();
// 如果本身就为空字符串
if trimmed.is_empty() {
return None;
}
// 如果等于 "9999.9",也视为无效 => 返回 None
if trimmed == "9999.9" {
return None;
}
// 正常解析
trimmed.parse::<f64>().ok()
}
None => None,
}
}
/// ---------------------------
/// 辅助函数:解析整型,失败或空时返回 None
/// ---------------------------
fn parse_i32_or_null(s: Option<&str>) -> Option<i32> {
match s {
Some(txt) => {
let trimmed = txt.trim();
if trimmed.is_empty() {
None
} else {
trimmed.parse::<i32>().ok()
}
}
None => None,
}
}
/// ---------------------------
/// 辅助函数:若字符串非空则返回Some,否则None
/// ---------------------------
fn parse_str_or_null(s: Option<&str>) -> Option<String> {
s.map(|v| {
let t = v.trim();
if t.is_empty() {
None
} else {
Some(t.to_string())
}
}).flatten()
}
/// ---------------------------
/// 批量插入解析后的 DailyRow
/// ---------------------------
fn batch_insert_daily_rows(
conn: &mut PooledConn,
table_name: &str,
rows: &[DailyRow],
) -> StdResult<()> {
if rows.is_empty() {
return Ok(());
}
// 构造 INSERT IGNORE 语句
let columns_str = DAILY_FIELD_NAMES.join(", ");
// 生成单行占位符,如 "(?, ?, ?, ...)"
let placeholders_single_row = {
let mut ph = String::new();
for (i, _) in DAILY_FIELD_NAMES.iter().enumerate() {
if i == 0 {
ph.push('?');
} else {
ph.push_str(", ?");
}
}
format!("({})", ph)
};
// 一次插入多少行
let chunk_size = 500;
for chunk in rows.chunks(chunk_size) {
let mut sql = format!("INSERT IGNORE INTO `{}` ({}) VALUES ", table_name, columns_str);
let placeholders_joined = vec![placeholders_single_row.clone(); chunk.len()].join(", ");
sql.push_str(&placeholders_joined);
// 收集所有参数
let mut params: Vec<Value> = Vec::new();
for row in chunk {
// 将 struct 字段一个个转成 Value
// 注意顺序要与 DAILY_FIELD_NAMES 对应
// 1) date (DATE)
params.push(Value::from(row.date.format("%Y-%m-%d").to_string()));
// 2) temp (DOUBLE)
params.push(opt_f64_to_value(row.temp));
// 3) temp_attributes (VARCHAR)
params.push(opt_str_to_value(&row.temp_attr));
// 4) dewp
params.push(opt_f64_to_value(row.dewp));
// 5) dewp_attr
params.push(opt_str_to_value(&row.dewp_attr));
// 6) slp
params.push(opt_f64_to_value(row.slp));
// 7) slp_attr
params.push(opt_str_to_value(&row.slp_attr));
// 8) stp
params.push(opt_f64_to_value(row.stp));
// 9) stp_attr
params.push(opt_str_to_value(&row.stp_attr));
// 10) visib
params.push(opt_f64_to_value(row.visib));
// 11) visib_attr
params.push(opt_str_to_value(&row.visib_attr));
// 12) wdsp
params.push(opt_f64_to_value(row.wdsp));
// 13) wdsp_attr
params.push(opt_str_to_value(&row.wdsp_attr));
// 14) mxspd
params.push(opt_f64_to_value(row.mxspd));
// 15) gust
params.push(opt_f64_to_value(row.gust));
// 16) max
params.push(opt_f64_to_value(row.max));
// 17) max_attr
params.push(opt_str_to_value(&row.max_attr));
// 18) min
params.push(opt_f64_to_value(row.min));
// 19) min_attr
params.push(opt_str_to_value(&row.min_attr));
// 20) prcp
params.push(opt_f64_to_value(row.prcp));
// 21) prcp_attr
params.push(opt_str_to_value(&row.prcp_attr));
// 22) sndp
params.push(opt_f64_to_value(row.sndp));
// 23) frshtt
params.push(opt_i32_to_value(row.frshtt));
}
conn.exec_drop(sql, params)?;
}
Ok(())
}
/// ---------------------------
/// 将 Option<f64> 转为 mysql::Value
/// ---------------------------
fn opt_f64_to_value(opt: Option<f64>) -> Value {
match opt {
Some(v) => Value::from(v),
None => Value::NULL,
}
}
/// ---------------------------
/// 将 Option<i32> 转为 mysql::Value
/// ---------------------------
fn opt_i32_to_value(opt: Option<i32>) -> Value {
match opt {
Some(v) => Value::from(v),
None => Value::NULL,
}
}
/// ---------------------------
/// 将 Option<String> 转为 mysql::Value
/// ---------------------------
fn opt_str_to_value(opt: &Option<String>) -> Value {
match opt {
Some(s) => Value::from(s.as_str()),
None => Value::NULL,
}
}
Cargo.toml
[package]
name = "NCEI_day"
version = "0.1.0"
edition = "2021"
[dependencies]
csv = "1.3.1"
walkdir = "2.5.0"
indicatif = "0.17.11"
mysql = "23.0.1"
chrono = "0.4.39"
这个代码的作用是将逐日数据首先进行简单的判断,数据是否合理,太过于离谱的数据会被筛选掉,另外这个温度应该是华氏度,为了便于理解,我也把代码写上了详细的注释,我把数据库导出为sql,需要的朋友可以自行探索一下
通过网盘分享的文件:ncei_day_weather_data.zip
链接: https://pan.baidu.com/s/17NobkMbMoKPp2pv0jdUdeA?pwd=ca7c
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。