您现在的位置是:首页 >技术教程 >SpringBoot整合RabbitMQ实现RPC远程调用功能网站首页技术教程
SpringBoot整合RabbitMQ实现RPC远程调用功能
简介SpringBoot整合RabbitMQ实现RPC远程调用功能
在分布式系统中,RPC(Remote Procedure Call)是一种常用的通信机制,它可以让不同的节点之间像调用本地函数一样进行函数调用,隐藏了底层的网络通信细节。通过本教程,你可以了解RPC的基本原理以及如何使用Java实现一个简单的RPC客户端和服务端。
1. 交互过程
- 启动 RPC 服务端和客户端,创建连接和通道。
- 声明请求队列和回复队列,确保使用相同的队列名称。
- 客户端发送请求:客户端将请求消息发送到指定的
请求队列
中。 - 服务端监听请求队列:服务端在指定的
请求队列
上监听请求消息。 - 服务端接收请求:服务端接收到客户端发送的请求消息。
- 服务端处理请求:服务端根据请求消息中的参数,执行相应的业务逻辑,并得到处理结果。
- 服务端发送响应:服务端将处理结果作为响应消息发送到客户端指定的
回复队列
中。 - 客户端监听响应队列:客户端在指定的
回复队列
上监听响应消息。 - 客户端接收响应:客户端接收到服务端发送的响应消息。
- 客户端处理响应:客户端根据响应消息中的结果进行相应的处理。
2. 导入依赖
创建一个SpringBoot项目并导入依赖坐标
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.10.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
</dependencies>
application.yml
spring:
rabbitmq:
host: localhost # 主机名
port: 5672 # 端口
username: guest # 用户名
password: guest # 密码
virtual-host: /
template:
receive-timeout: 2000
reply-timeout: 2000
listener:
simple:
concurrency: 1
max-concurrency: 3
prefetch: 1 # 消费者每次只能预取1条数据到内存并处理,默认为250条
acknowledge-mode: manual # 确定机制 manual:手动确认
publisher-returns: true
publisher-confirm-type: correlated
注意:需要提前开启
RabbitMQ服务
,否则项目运行会报错
3. RPC 服务端
首先,我们来看一下RPC服务端的代码。
package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* rpc服务端
*/
public class RPCServer {
// 定义请求队列常量
private final static String REQUEST_QUEUE_NAME = "rpc_queue";
// 定义回复队列常量
private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
/**
* 服务端启动入口
*/
public static void main(String[] args) {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明请求队列和回复队列
channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
// 每次仅接收一条未经确认的消息
channel.basicQos(1);
// 构建消费者属性
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 获取消息
String message = new String(body, StandardCharsets.UTF_8);
// 进行业务处理构建响应数据 (这里做字符串拼接模拟响应数据)
String response = message + ":::";
// 构造响应基本属性
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()) // 设置关联id
.build();
// 发送响应数据到回复队列 (rpc_reply_queue)
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
// 手动回执消息确认消费
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 消费rpc_queue队列的消息
channel.basicConsume(REQUEST_QUEUE_NAME, false, consumer);
// 持续监听请求消息
while (true) {
Thread.sleep(50);
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
}
在RPC服务端的代码中,我们首先创建了一个连接和一个通道,然后声明了请求队列
和回复队列
。我们设置每一次只接收一条未确认的消息,并创建了一个消费者对象,用于处理接收到的消息。
在handleDelivery
方法中,我们从消息中获取请求数据,并进行业务处理,然后构造响应数据并发送到回复队列。最后,我们手动确认消费,并继续监听请求消息。
4. RPC 客户端
接下来,我们看一下RPC客户端的代码。
package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* rpc客户端
*/
public class RPCClient {
// 定义请求队列常量
private final static String REQUEST_QUEUE_NAME = "rpc_queue";
// 定义回复队列常量
private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
/**
* 客户端启动入口
*/
public static void main(String[] args) {
System.out.println("Response1: " + call("Hello, RPC Server1"));
System.out.println("Response2: " + call("Hello, RPC Server2"));
System.out.println("Response3: " + call("Hello, RPC Server3"));
}
/**
* 发送请求到队列并返回响应数据
*
* @param message 请求消息
* @return 响应数据
*/
public static String call(String message) {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("localhost");
String result = "";
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明请求队列和回复队列
channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
// 每次仅接收一条未经确认的消息
channel.basicQos(1);
// 生成关联id
String correlationId = UUID.randomUUID().toString();
// 构造请求基本属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(correlationId) // 设置关联id
.replyTo(REPLY_QUEUE_NAME) // 设置回复队列 (rpc_reply_queue)
.build();
// 发送请求消息到rpc_queue队列
channel.basicPublish("", REQUEST_QUEUE_NAME, props, message.getBytes(StandardCharsets.UTF_8));
// 用于保存响应消息的阻塞队列
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
// 监听回复队列接收响应消息
String consumerTag = channel.basicConsume(REPLY_QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// 通过关联id获取到响应数据
if (properties.getCorrelationId().equals(correlationId)) {
response.offer(new String(body, StandardCharsets.UTF_8));
}
}
});
// 清空阻塞队列
response.clear();
// 等待接收响应消息
result = response.take();
// 取消消费者的监听
channel.basicCancel(consumerTag);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
在RPC客户端的代码中,我们同样创建了一个连接和一个通信,并声明了请求队列
和回复队列
。然后,我们设置每次只接收一条未确认的消息,并创建了一个唯一的关联id。
在call
方法中,我们构建了请求消息的基础属性,并将请求消息发送到请求队列
。接下来,我们创建了一个阻塞队列
,用于保存response
响应消息。
通过监听回复队列
接收响应消息,并通过关联id接收到对应的响应数据,然后将其放入response
队列中。最后,通过阻塞队列
接收响应消息,并返回响应数据。
5. 运行代码
现在,我们可以运行RPC服务端和客户端的代码了。首先运行服务端代码,它会启动一个监听请求的进程。然后,运行客户端代码,它会发送请求消息并等待接收响应消息。
最终会看到客户端发出了三次请求,并打印了对应的响应数据。
项目地址:项目Git地址
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。