MySQL配置
开启MySQL的binlog写入,并指定binlog为ROW模式
my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
添加Canal使用的用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Canal配置
安装Canal
先把默认配置信息复制出来
mkdir -p /opt/volume/canal
docker run --name canal --rm -d canal/canal-server
docker cp canal:/home/admin/canal-server/conf /opt/volume/canal
docker stop canal
配置文件
/opt/volume/canal/example/instance.properties
canal.instance.master.address=192.168.88.171:3306
# binlog日志名
canal.instance.master.journal.name=
# binlog起始偏移量
canal.instance.master.position=
# binlog起始时间戳
canal.instance.master.timestamp=
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 监听所有表,指定库写法为db\..*
canal.instance.filter.regex=.*\\..*
# 路由键
canal.mq.topic=example
查看binlog信息
show master status;
/opt/volume/canal/canal.properties
# 关联到rabbitMQ配置
canal.serverMode = rabbitMQ
rabbitmq.host =192.168.88.141
rabbitmq.virtual.host = /
rabbitmq.exchange = canalExchange
rabbitmq.username = root
rabbitmq.password = rabbitmqroot
rabbitmq.deliveryMode = fanout
启动Canal
docker run -p 11111:11111 --name canal \
--restart always \
-v /etc/timezone:/etc/timezone:ro \
-v /etc/localtime:/etc/localtime:ro \
-v /opt/volume/canal/conf:/home/admin/canal-server/conf \
-d canal/canal-server
RabbitMQ 配置
创建名为canalExchage的交换机
创建名为canalQueue的队列
绑定交换机和队列
测试队列消息
在数据库里执行一些语句,查看对应队列已有新的消息
消费队列消息
Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ配置
spring:
rabbitmq:
host: 192.168.88.141
port: 5672
username: root
password: rabbitmqroot
virtual-host: /
listener:
simple:
acknowledge-mode: manual
消费队列消息,将数据同步到Elasticsearch
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
</dependency>
@Component
public class CanalMessageConsumer {
private static final String MOVIES_INDEX_NAME = "movies";
@SneakyThrows
@RabbitListener(queues = "canalQueue")
public void movieItemsConsumer(@Payload String message, @Header(value = AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
JSONObject jsonObject = JSON.parseObject(message);
Boolean isDdl = jsonObject.getBoolean("isDdl");
if(!isDdl) {
String type = jsonObject.getString("type");
if ("INSERT".equals(type) || "UPDATE".equals(type)) {
RestClient restClient = RestClient.builder(new HttpHost("192.168.88.161", 9200)).build();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
ElasticsearchTransport elasticsearchTransport = new RestClientTransport(restClient,
new JacksonJsonpMapper(objectMapper));
ElasticsearchClient elasticsearchClient = new ElasticsearchClient(elasticsearchTransport);
String table = jsonObject.getString("table");
if ("t_movie".equals(table)) {
JSONArray dataArray = jsonObject.getJSONArray("data");
for (int i = 0; i < dataArray.size(); i++) {
Movie movie = dataArray.getObject(i, Movie.class);
Integer id = movie.getId();
SearchResponse<Movie> searchResponse = elasticsearchClient
.search(builder -> builder.index(MOVIES_INDEX_NAME)
.query(queryBuilder -> queryBuilder
.constantScore(constantScoreQueryBuilder -> constantScoreQueryBuilder
.filter(filterQueryBuilder -> filterQueryBuilder
.term(termQueryBuilder -> termQueryBuilder
.field("id").value(id))))), Movie.class);
HitsMetadata<Movie> hits = searchResponse.hits();
TotalHits totalHits = hits.total();
long value = 0;
if (totalHits != null) {
value = totalHits.value();
}
if (value == 0) {
Snowflake snowflake = IdUtil.getSnowflake();
elasticsearchClient.create(builder -> builder
.index(MOVIES_INDEX_NAME).id(snowflake.nextIdStr()).document(movie));
} else {
List<Hit<Movie>> hitList = hits.hits();
for(Hit<Movie> hit: hitList) {
Movie source = hit.source();
if (source != null) {
movie.setCelebrityId(source.getCelebrityId());
movie.setCelebrityName(source.getCelebrityName());
movie.setMovieCelebrityType(source.getMovieCelebrityType());
movie.setCelebrityChineseName(source.getCelebrityChineseName());
String hitId = hit.id();
elasticsearchClient.update(builder -> builder
.index(MOVIES_INDEX_NAME).id(hitId).doc(movie), Movie.class);
}
}
}
}
}
elasticsearchClient.shutdown();
}
}
channel.basicAck(deliveryTag, true);
}
}
PREVIOUSaxios模块化使用
NEXT缓存模式与实现