您现在的位置是:首页 >技术教程 >通过logstash实现mysql与es的双向数据同步网站首页技术教程

通过logstash实现mysql与es的双向数据同步

延锋L 2023-06-10 04:00:02
简介通过logstash实现mysql与es的双向数据同步

参考题目

  1. 一种基于MySQL和Elasticsearch的数据同步方法及系统
  2. 基于MySQL和Elasticsearch的数据同步方法
  3. 一种基于MySQL和Elasticsearch的数据同步系统
  4. 基于MySQL和Elasticsearch的数据同步技术

目录

1【理论调研】

方案1:使用Logstash实现数据同步

方案2:使用Canal实现数据同步

方案3:使用Debezium实现数据同步

使用其他工具

2【使用Logstash实现MySQL和ES之间的双向数据同步】

2.0【MySQL测试数据库sql导入代码】

2.1【Logstash实现MySQL数据同步至ES】

2.2【Logstash实现ES数据同步至MySQL】

2.2.1【Bug记录】

2.2.2【参考文章】


1【理论调研】

实现MySQL和ES的双向数据同步,可以考虑使用以下几种解决方案:

实现MySQL和Elasticsearch(ES)之间的双向数据同步,需要使用一些工具和技术。以下是一些可能的方法:

方案1:使用Logstash实现数据同步

Logstash是一种流处理工具,可以从不同的来源获取数据并将其转换为指定格式输出到目标存储中,它支持从MySQL数据库读取数据,并将数据写入ES中,也可以从ES中读取数据并将数据写入MySQL数据库中。使用Logstash实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Logstash;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Logstash的输入和输出插件,从MySQL中读取数据并写入ES中,同时从ES中读取数据并写入MySQL中;
  4. 启动Logstash并监控同步过程。

Logstash是一个流处理引擎,可以轻松地将数据从MySQL和ES之间传输。使用Logstash,您可以轻松地将MySQL表的数据导入到ES中,也可以将ES中的数据写回MySQL表中。您可以使用以下配置文件将数据从MySQL同步到ES:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * FROM mytable"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "myindex"
    document_id => "%{id}"
  }
}

这将从MySQL的“mytable”表中选择所有行,并将它们写入名为“myindex”的ES索引中。

如果您想将ES中的数据写回MySQL表中,您可以使用类似以下的配置文件:

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "myindex"
    query => '{"query": {"match_all": {}}}'
    scroll => "5m"
    docinfo => true
  }
}

output {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "UPDATE mytable SET myfield = ? WHERE id = ?"
    prepared_statement_bind_values => ["%{myfield}", "%{[@metadata][_id]}"]
  }
}

这将从名为“myindex”的ES索引中选择所有文档,并将它们写回名为“mytable”的MySQL表中。

方案2:使用Canal实现数据同步

Canal是阿里巴巴开源的一款基于数据库增量日志解析,提供增量数据订阅和消费的组件,它支持从MySQL中读取增量数据,并将数据写入ES中,同时支持从ES中读取数据并将数据写入MySQL中。使用Canal实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Canal;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Canal的实例,包括MySQL的binlog信息、ES的索引信息等;
  4. 启动Canal并监控同步过程。

需要注意的是,在使用Logstash或Canal进行数据同步时,可能会出现数据类型不匹配、数据格式错误、数据丢失等问题,需要根据具体情况进行调整和优化。同时,为了确保数据同步的实时性和准确性,可以考虑增加监控和告警机制。

方案3:使用Debezium实现数据同步

Debezium是一个开源的分布式平台,可在数据源和目标之间实现实时数据流。它支持MySQL和ES之间的数据同步,并支持双向同步。使用Debezium,您可以在MySQL和ES之间实时同步数据更改。您可以按照以下步骤使用Debezium进行双向数据同步:

  • 下载并安装Debezium
  • 配置Debezium以监视MySQL表的更改
  • 配置Debezium以将更改写入ES
  • 配置Debezium以监视ES的更改
  • 配置Debezium以将更改写回MySQL

使用其他工具

除了Logstash和Debezium之外,还有一些其他工具可用于MySQL和ES之间的数据同步。例如,您可以使用StreamSets Data Collector或Apache Nifi来将数据从MySQL导入到ES,并将数据从ES写回MySQL。您还可以编写自己的脚本来执行此操作。无论您选择哪种方法,确保您的同步逻辑能够处理。

2【使用Logstash实现MySQL和ES之间的双向数据同步】

软件版本:

  1. logstash -f ../config/newsManager/mysql2es.conf
  2. logstash -f ../config/newsManager/es2mysql.conf

2.0【MySQL测试数据库sql导入代码】

  1. MySQL数据库名称:news_manager
  2. MySQL数据库版本:5.5.40
/*
SQLyog Ultimate v12.09 (64 bit)
MySQL - 5.5.40 : Database - news_manager
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`news_manager` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `news_manager`;

/*Table structure for table `item_user` */

DROP TABLE IF EXISTS `item_user`;

CREATE TABLE `item_user` (
  `item_user_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `item_id` int(11) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`item_user_id`),
  KEY `FK_Reference_2` (`user_id`),
  KEY `FK_Reference_3` (`item_id`),
  CONSTRAINT `FK_Reference_2` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`),
  CONSTRAINT `FK_Reference_3` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `item_user` */

insert  into `item_user`(`item_user_id`,`user_id`,`item_id`,`create_time`,`update_time`,`status`) values (1,1,2,'2020-11-23 11:24:16','2020-11-25 10:27:54',1),(2,2,4,NULL,'2020-11-25 09:38:17',1),(3,1,1,'2020-11-24 09:19:58','2020-11-25 09:38:21',1),(5,1,18,NULL,'2020-11-25 09:44:16',1),(6,1,27,'2020-11-25 11:11:35','2020-11-25 11:11:35',1),(7,1,28,'2020-11-25 11:17:59','2020-11-25 11:17:59',1),(8,1,29,'2020-11-25 11:29:14','2020-11-25 11:29:14',1),(9,1,30,'2020-11-25 11:30:54','2020-11-25 11:30:54',1),(10,1,31,'2020-11-25 11:36:51','2020-11-25 11:36:51',1),(11,1,32,'2020-11-25 16:26:23','2020-11-25 16:26:23',1),(12,1,33,'2020-11-25 16:26:37','2020-11-25 16:26:37',1),(13,1,34,'2020-11-26 10:01:29','2020-11-26 10:01:29',1),(14,1,35,'2020-11-26 10:28:53','2020-11-26 10:28:53',1);

/*Table structure for table `logs_info` */

DROP TABLE IF EXISTS `logs_info`;

CREATE TABLE `logs_info` (
  `logs_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `logs_content` text COLLATE utf8mb4_hungarian_ci,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`logs_id`),
  KEY `FK_Reference_1` (`user_id`),
  CONSTRAINT `FK_Reference_1` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `logs_info` */

insert  into `logs_info`(`logs_id`,`user_id`,`logs_content`,`create_time`,`update_time`) values (1,1,NULL,NULL,'2020-11-24 09:27:05'),(2,2,NULL,NULL,'2020-11-24 09:27:12'),(3,4,NULL,NULL,'2020-11-23 11:29:06'),(14,1,'woshishenren','2020-11-24 09:24:52','2020-11-24 09:24:52'),(15,1,'woshishenren','2020-11-24 09:25:58','2020-11-24 09:25:58');

/*Table structure for table `news_info` */

DROP TABLE IF EXISTS `news_info`;

CREATE TABLE `news_info` (
  `new_id` int(11) NOT NULL AUTO_INCREMENT,
  `item_id` int(11) DEFAULT NULL,
  `news_title` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `news_image` varchar(255) COLLATE utf8mb4_hungarian_ci DEFAULT NULL,
  `news_content` text COLLATE utf8mb4_hungarian_ci,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`new_id`),
  KEY `FK_Reference_4` (`item_id`),
  CONSTRAINT `FK_Reference_4` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `news_info` */

insert  into `news_info`(`new_id`,`item_id`,`news_title`,`news_image`,`news_content`,`create_time`,`update_time`) values (1,2,'蓝桥杯比赛',NULL,NULL,NULL,'2020-11-23 09:27:39'),(2,3,'新学期学费',NULL,NULL,NULL,'2020-11-23 09:28:10'),(3,1,'拔河比赛',NULL,'拔河比赛要使劲!!!','2020-11-25 14:57:28','2020-11-25 14:57:32'),(4,18,'街舞比赛',NULL,'一起摇摆~','2020-11-25 15:54:09','2020-11-25 15:54:11'),(10,27,'数学建模',NULL,'一起加油!','2020-11-25 16:10:02','2020-11-25 22:17:19'),(11,29,'班班唱',NULL,'《走向复兴》','2020-11-25 16:12:23','2020-11-25 16:12:23'),(12,1,'篮球比赛',NULL,'冲冲冲~','2020-11-25 16:13:04','2020-11-25 16:13:04'),(13,1,'NECCS',NULL,'冲呀~','2020-11-25 16:27:22','2020-11-26 08:38:03'),(14,18,'卓见杯',NULL,'啦啦啦~','2020-11-25 17:41:32','2020-11-25 22:17:56'),(15,33,'动则升阳',NULL,'年轻不养生,年老养医生!','2020-11-26 00:12:42','2020-11-26 00:12:42'),(16,33,'11月26日',NULL,'筑基修士','2020-11-26 10:02:20','2020-11-26 10:02:20'),(17,35,'大家好',NULL,'333','2020-11-26 10:29:35','2020-11-26 10:29:35'),(18,35,'大家好!!!',NULL,'333','2020-11-26 10:29:45','2020-11-26 10:29:45'),(19,35,'我是新增数据!',NULL,'我是新增数据!','2023-03-15 16:43:01','2023-03-15 16:43:02');

/*Table structure for table `news_item` */

DROP TABLE IF EXISTS `news_item`;

CREATE TABLE `news_item` (
  `item_id` int(11) NOT NULL AUTO_INCREMENT,
  `item_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=36 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `news_item` */

insert  into `news_item`(`item_id`,`item_name`,`create_time`,`update_time`,`status`) values (1,'呵呵哒','2020-11-24 15:47:00','2020-11-26 10:28:39',1),(2,'党支部','2020-11-24 15:47:03','2020-11-25 14:44:31',0),(3,'分团委','2020-11-24 15:47:05','2020-11-25 14:43:54',1),(4,'院团委','2020-11-24 15:47:08','2020-11-25 14:44:38',1),(5,'111','2020-11-23 15:22:54','2020-11-25 14:45:55',1),(6,'学生会','2020-11-24 09:27:36','2020-11-25 14:46:01',1),(8,'党支部','2020-11-24 13:51:13','2020-11-25 14:46:07',1),(18,'党支部','2020-11-25 09:11:51','2020-11-25 15:49:06',1),(19,'院团委','2020-11-25 10:42:54','2020-11-25 14:46:16',1),(20,'111','2020-11-25 10:54:12','2020-11-25 14:46:19',1),(21,'学生会','2020-11-25 10:56:21','2020-11-25 14:46:35',1),(22,'党支部','2020-11-25 10:57:35','2020-11-25 14:46:43',1),(23,'分团委','2020-11-25 11:00:20','2020-11-25 14:46:48',1),(24,'院团委','2020-11-25 11:00:47','2020-11-25 14:46:55',1),(25,'qweqwe','2020-11-25 11:01:37','2020-11-25 11:01:37',1),(26,'eqweqweqwe','2020-11-25 11:01:53','2020-11-25 11:01:53',1),(27,'分团委','2020-11-25 11:11:35','2020-11-25 15:49:18',1),(28,'sadsads','2020-11-25 11:17:59','2020-11-25 11:18:40',0),(29,'院团委','2020-11-25 11:29:13','2020-11-25 15:49:25',1),(30,'789','2020-11-25 11:30:54','2020-11-25 11:37:19',0),(31,'zyk','2020-11-25 11:36:51','2020-11-25 11:37:19',0),(32,'委员会','2020-11-25 16:26:23','2020-11-26 08:37:48',0),(33,'委员会~~~','2020-11-25 16:26:37','2020-11-26 10:01:40',1),(34,'演示~','2020-11-26 10:01:29','2020-11-26 10:01:33',0),(35,'筑基修士!!!修改!','2020-11-26 10:28:53','2023-03-15 16:44:39',1);

/*Table structure for table `user_info` */

DROP TABLE IF EXISTS `user_info`;

CREATE TABLE `user_info` (
  `user_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `user_pwd` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=121 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `user_info` */

insert  into `user_info`(`user_id`,`user_name`,`user_pwd`,`create_time`,`update_time`,`status`) values (1,'宋书航','1','2020-11-23 09:30:16','2020-11-25 22:16:25',1),(2,'雨柔子','1','2020-11-23 11:25:41','2020-11-25 22:16:25',1),(4,'王五','1','2020-11-23 11:25:58','2020-11-25 22:16:26',0),(5,'赵柳','1','2020-11-23 11:26:12','2020-11-25 22:16:26',0),(8,'田七','1','2020-11-23 11:26:29','2020-11-25 22:16:27',0),(9,'田七','1','2020-11-23 15:03:23','2020-11-25 22:16:28',0),(10,'田七','1','2020-11-23 15:03:43','2020-11-25 22:16:28',0),(11,'戴沐白','1','2020-11-24 10:45:06','2020-11-25 22:16:29',1),(12,'张小凡','1','2020-11-24 10:45:29','2020-11-25 22:16:29',1),(13,'userName2','1','2020-11-24 10:45:29','2020-11-25 22:16:30',0),(15,'碧瑶','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(16,'赵恋凡','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(17,'李长寿','1','2020-11-24 10:45:29','2020-11-25 22:16:32',1),(18,'蓝梦娥','1','2020-11-24 10:45:29','2020-11-25 22:16:33',1),(22,'路明非','123456','2020-11-24 10:45:29','2020-11-25 17:44:31',1),(23,'楚子航','123456','2020-11-24 10:45:29','2020-11-25 22:14:26',1),(33,'乔微尼','123456','2020-11-24 10:45:29','2020-11-25 23:05:48',1),(97,'userName86','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(98,'userName87','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(99,'userName88','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(100,'userName89','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(101,'2020年好运来~','123456','2020-11-24 10:45:32','2020-11-26 10:01:06',1),(102,'333','123456','2020-11-24 10:45:32','2020-11-26 10:28:14',1),(103,'666','888','2020-11-24 10:45:32','2020-11-26 10:28:26',1),(104,'userName93','123456','2020-11-24 10:45:32','2020-11-26 10:00:54',0),(105,'userName94','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(106,'userName95','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(107,'userName96','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(108,'userName97','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(109,'userName98','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(110,'userName99','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(111,'userName100','123456','2020-11-24 10:45:33','2020-11-25 10:12:58',0),(115,'萧潜','1','2020-11-25 23:00:16','2020-11-25 23:00:16',1),(116,'演示视频','haha','2020-11-26 10:00:33','2020-11-26 10:27:37',0),(117,'啦啦啦','1','2020-11-26 10:27:14','2020-11-26 10:27:37',0),(118,'演示视频','222','2020-11-26 10:27:23','2020-11-26 10:27:37',0),(119,'实训小组hyy','111','2020-11-26 14:37:14','2020-11-26 14:37:14',1),(120,'我是新增数据!修改!','1111','2023-04-18 20:38:41','2023-04-18 20:48:13',1);

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

2.1【Logstash实现MySQL数据同步至ES】

先启动es,在启动logstash。

大数据周会-本周学习内容总结06【Linux启动ELK步骤】

input {
	stdin {
    }

    jdbc { # 01
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from item_user"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "item_user"
	}

    jdbc { # 02
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from logs_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "logs_info"
	}

    jdbc { # 03
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from news_item"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "news_item"
	}

    jdbc { # 04
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from news_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "news_info"
	}

    jdbc { # 05
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from user_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "user_info"
	}
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
	if[type] == "item_user" { # 01
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_item_user"
			# document_id => "%{id}"
		}
	}

	if[type] == "logs_info" { # 02
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_logs_info"
			# document_id => "%{id}"
		}
	}

	if[type] == "news_item" { # 03
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_news_item"
			# document_id => "%{id}"
		}
	}

	if[type] == "news_info" { # 04
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_news_info"
			# document_id => "%{id}"
		}
	}

	if[type] == "user_info" { # 05
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_user_info"
			# document_id => "%{id}"
		}
	}

    stdout {
        codec => json_lines
    }
}

2.2【Logstash实现ES数据同步至MySQL】

  1. logstash-plugin install --no-verify logstash-output-jdbc   # Logstash安装插件logstash-output-jdbc
  2. logstash-plugin list   # 查看Logstash已安装的插件

【es与mysql双向同步-通过logstash将es同步至mysql】功能已实现,但是只进行了简单测试。问题包括但不限于:中文乱码、时间戳字段插入错误等。

input {
	elasticsearch {
		hosts => ["hadoop100:9200"]
		index => "test_user_info"
		query => '{ "query": { "match_all": {} } }'
		schedule => "* * * * *"
	}
}

output {
	jdbc {
		driver_jar_path => "/opt/jar/mysql-connector-java-5.1.31.jar"
		driver_class => "com.mysql.jdbc.Driver"
		# user => "root"
		# password => "root"
		# "jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/douyin?autoReconnect=true&user=xxxx@xxxx&password=xxxxx"
		connection_string => "jdbc:mysql://1.2.3.4:3306/school_matriculate?autoReconnect=true&user=root&password=root&useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"
		# statement => ["insert into tb_videos(md5, Id, view, timestamp) values(?,?,?,?)","[md5]", "[Id]", "[view]", "[timestamp]"]
		# statement => ["INSERT INTO user_info (user_name, user_pwd, create_time, update_time, status) VALUES (?, ?, ?, ?, ?)", "[user_name]", "[user_pwd]", "[create_time]", "[update_time]", "[status]"]
		statement => ["INSERT INTO user_info (user_name, user_pwd) VALUES (?, ?)", "[user_name]", "[user_pwd]"]
	}
}

2.2.1【Bug记录】

Unknown setting 'jdbc_user' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'prepared_statement_bind_values' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_password' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_library' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_connection_string' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_class' for jdbc

[2023-04-19T20:27:23,669][ERROR][logstash.agent           ] Failed to execute action

java.sql.SQLException: Access denied for user ''@'upward' (using password: NO)

        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1094) ~[mysql-connector-java-5.1.31.jar:?]

[2023-04-19T21:48:53,751][ERROR][com.zaxxer.hikari.pool.HikariPool][main] HikariPool-1 - Exception during pool initialization.

com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up.

2.2.2【参考文章】

  1. Java:Logstash如何安装插件logstash-output-jdbc_netyeaxi的博客-CSDN博客
  2. logstash的logstash-output-jdbc插件安装_logstash output jdbc_&捕风的汉子&的博客-CSDN博客
  3. logstash-output-jdbc使用
  4. https://github.com/theangryangel/logstash-output-jdbc

  5. https://www.elastic.co/guide/en/logstash/current/index.html

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。