您现在的位置是:首页 >技术教程 >基于Canal实现Mysql数据实时同步到Elasticsearch(Docker版)网站首页技术教程

基于Canal实现Mysql数据实时同步到Elasticsearch(Docker版)

姠惢荇者 2024-06-17 10:22:23
简介基于Canal实现Mysql数据实时同步到Elasticsearch(Docker版)

1、Canal简介

  Canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。

  Canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向MySQL主库发送dump协议,MySQL主库收到dump请求会向canal推送binlog,canal通过解析binlog将数据同步到其他存储中去。
在这里插入图片描述

官方文档:《传送门》

2、基于Docker实现Mysql5.7的安装并开启binlog日志

2.1、Mysql安装
[root@localhost /]# docker pull mysql:5.7

[root@localhost /]# docker run --name mysql5.7 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
2.2、开启Mysql5.7的binlog配置
#进入docker中的 Mysql5.7
[root@localhost /]# docker exec -it mysql5.7 /bin/bash

#在docker环境内安装vim工具,方便修改文件
bash-4.2# yum install vim

#修改my.cnf配置,修改内容如下:
bash-4.2# vim /etc/my.cnf

#退出docker
bash-4.2# exit

#重启docker mysql
[root@localhost /]# docker restart mysql5.7

  在/etc/my.cnf配置文件中添加如下配置:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

  在Mysql重新启动后,然后可以在客户端中执行show VARIABLES like 'log_bin’如果发现其中的value中为“ON”则说明配置生效了。

在这里插入图片描述

2.3、创建授权用户

  创建Mysql用户,方便后续使用。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、基于docker实现Elasticsearch的安装和运行

#拉去镜像
docker pull elasticsearch:7.12.0

#创建docker容器挂在的目录
mkdir -p /usr/local/soft/es/config
mkdir -p /usr/local/soft/es/data
mkdir -p /usr/local/soft/es/plugins

#配置文件,注意:echo “http.host: 0.0.0.0”;“:”后面有个空格!
echo "http.host: 0.0.0.0" >> /usr/local/soft/es/config/elasticsearch.yml

#创建容器并运行
docker run --name elasticsearch -p 9200:9200  -p 9300:9300 
-e "discovery.type=single-node" 
-e ES_JAVA_OPTS="-Xms84m -Xmx512m" 
-v /usr/local/soft/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml 
-v /usr/local/soft/es/data:/usr/share/elasticsearch/data 
-v /usr/local/soft/es/plugins:/usr/share/elasticsearch/plugins 
-d elasticsearch:7.12.0
  • -p 端口映射
  • -e discovery.type=single-node 单点模式启动
  • -e ES_JAVA_OPTS=“-Xms84m -Xmx512m”:设置启动占用的内存范围
  • -v 目录挂载
  • -d 后台运行

  启动后,测试正常启动页面访问http://127.0.0.1:9200,出现如下页面说明启动成功。

在这里插入图片描述

4、安装并配置canal服务

#拉取镜像
docker pull canal/canal-server:v1.1.6
#运行镜像
docker run --name canal -d canal/canal-server:v1.1.6
#找到文件位置后 exit退出容器 将容器内部文件copy到外部
docker cp canal:/home/admin/canal-server/conf/canal.properties /usr/local/soft/canal-conf/
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /usr/local/soft/canal-conf/
#修改配置文件instance.properties 和 canal.properties,实际上只修改了instance.properties的部分配置,canal.properties使用默认配置

#重新运行镜像,运行前,需要把前面运行的容器停止并删除
docker run --name canal -p 11111:11111 -d -v /usr/local/soft/canal-conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /usr/local/soft/canal-conf/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server:v1.1.6

#查看canal服务日志
#仅能看到canal服务的启动日志,比较简单
docker logs canal
#查看数据同步相关日志
#进入canal服务
[root@localhost ~]# docker exec -it canal /bin/bash
#查看日志,数据同步日志
[root@908acdb7f259 canal-server]# cat canal-server/logs/example/example.log

  instance.properties需要修改的配置如下:

#Mysql数据库地址
canal.instance.master.address=192.168.1.236:3306
#初始化时的日志文件名称,可以不设置
canal.instance.master.journal.name=mysql-bin.000002
#初始化时的日志文件的当前位置,可以不设置
canal.instance.master.position=2071
canal.instance.master.timestamp=
canal.instance.master.gtid=

#Mysql数据库用户名和密码,前面创建的Mysql用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

版本问题:在搭建canal服务的时候,我最初使用的是v1.1.6版本,同步数据的时候出现错误,后改成v1.1.5后,正常。

5、安装并配置canal-adapter

#拉取镜像
docker pull slpcat/canal-adapter:v1.1.5
#创建配置文件
mkdir -p /usr/local/soft/canal-adapter/conf/
#创建配置文件,在/usr/local/soft/canal-adapter/conf/目录下创建
touch application.yml
#创建连接数据库文件es7.yml
mkdir -p /usr/local/soft/canal-adapter/conf/es7
touch es7.yml

#启动
docker run --name canal-adapter -p 8081:8081 -v /usr/local/soft/canal-adapter/conf:/opt/canal-adapter/conf  -d slpcat/canal-adapter:v1.1.5

#查看是否报错
docker logs canal-adapter

  application.yml配置文件内容如下:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.1.236:11111  #canal服务地址
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.236:3306/my_test?useUnicode=true #数据库地址及用户名密码
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7 # 该版本发现只能是es7/es6
        hosts: 192.168.1.236:9200 # 127.0.0.1:9200 for rest mode,ES链接,使用9200 是mode就需要修改成rest
        properties:
          mode: rest # transport # or rest
          cluster.name: elasticsearch

  es7.yml配置文件,主要实现Mysql表字段与ES索引的对应关系,具体内容如下:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: ceshi
  _id: _id
  _type: _doc
  upsert: true
  #  pk: id 
  #SQL 字段映射
  sql: "SELECT 
    a.id AS _id,
    a.username AS username,
    a.age AS age,
    a.test AS test
  from 
    test a "
  #  objFields:
  #    _labels: array:;
  etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数
  commitBatch: 1

  my_test.test表结构:

CREATE TABLE `my_test`.`Untitled`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,
  `age` varbinary(50) NULL DEFAULT NULL,
  `test` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,
  `c_time` datetime(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 18 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

6、其他

  完成上述配置,就完成了Mysql数据实时同步到Elasticsearch的配置,在实际环境中,还需要考虑服务防火墙、查看docker服务运行状态等问题,需要的命令如下:

  Docker命令:

#docker命令
docker ps -a #查看是否启动
docker logs elasticsearch  #启动日志查询,查询什么服务,elasticsearch修改成对应服务名即可
docker restart elasticsearch   #重启
docker exec -it elasticsearch bash #进入docker服务

  Centos7防火墙相关命令:

一、防火墙的开启、关闭、禁用命令

  • 设置开机启用防火墙:systemctl enable firewalld.service
  • 设置开机禁用防火墙:systemctl disable firewalld.service
  • 启动防火墙:systemctl start firewalld
  • 关闭防火墙:systemctl stop firewalld
  • 检查防火墙状态:systemctl status firewalld

二、使用firewall-cmd配置端口

  • 查看防火墙状态:firewall-cmd --state
  • 重新加载配置:firewall-cmd --reload
  • 查看开放的端口:firewall-cmd --list-ports
  • 开启防火墙端口:firewall-cmd --zone=public–add-port=9200/tcp --permanent
  • 关闭防火墙端口:firewall-cmd --zone=public --remove-port=9200/tcp --permanent

命令含义:
–zone #作用域
–add-port=9200/tcp #添加端口,格式为:端口/通讯协议
–permanent #永久生效,没有此参数重启后失效
注意:添加端口后,必须用命令firewall-cmd --reload重新加载一遍才会生效

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。