
[Kubernetes] - Learning RabbitMQ

Yuan_xii 2024-07-01 11:59:31

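1. Message queues

  • Message: data passed between applications
  • Queue: first in, first out (FIFO)

1.2. Purpose

  • Benefits: decoupling, fault tolerance, peak shaving
  • Drawbacks: lower system availability (the broker is one more component that can fail), higher system complexity, and consistency issues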
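Decoupling and peak shaving can be sketched in-process. The example below is only an illustration under assumptions: `java.util.concurrent.LinkedBlockingQueue` stands in for a real broker, and the class name `PeakShavingSketch` and the `order-N` messages are hypothetical. The producer pushes a burst without waiting, and the consumer drains the queue one message at a time in FIFO order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PeakShavingSketch {
    /** Publishes a burst of `count` messages, then returns them in the order consumed. */
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for the broker
        List<String> processed = new ArrayList<>();

        // Consumer: takes one message at a time, at its own pace.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    processed.add(queue.take()); // blocks until a message is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: sends the whole burst without waiting for the consumer.
        for (int i = 1; i <= count; i++) {
            queue.add("order-" + i);
        }

        try {
            consumer.join(); // wait until the burst has been absorbed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [order-1, order-2, order-3, order-4, order-5]
    }
}
```

The producer never calls the consumer directly; the queue is the only thing both sides know about, which is the decoupling the list above refers to.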

RabbitMQ components: producer, consumer, queue, and exchange.

2. Installing and deploying RabbitMQ

---
apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-secret
  namespace: rabbitmq
data:
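  # base64 of "admin" and "123456"; encode with `echo -n ... | base64` so that
  # no trailing newline ends up in the value
  username: YWRtaW4=
  password: MTIzNDU2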
type: Opaque
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: rabbitmq
  labels:
    app: rabbitmq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
  serviceName: rabbitmq-headless
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: registry.cn-hangzhou.aliyuncs.com/yuanli123/rabbitmq:3.9.22-management
          ports:
            - name: tcp-5672
              containerPort: 5672
              protocol: TCP
            - name: tcp-15672
              containerPort: 15672
              protocol: TCP
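#              Commented out because the username came through with an extra newline character
#              and RabbitMQ did not recognize it. A Secret value encoded with
#              `echo admin | base64` (instead of `echo -n admin | base64`) carries that newline.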
#          env:
#            - name: RABBITMQ_DEFAULT_USER
#              valueFrom:
#                secretKeyRef:
#                  name: rabbitmq-secret
#                  key: username
#            - name: RABBITMQ_DEFAULT_PASS
#              valueFrom:
#                secretKeyRef:
#                  name: rabbitmq-secret
#                  key: password
          resources:
            limits:
              cpu: '1'
              memory: '2Gi'
            requests:
              cpu: '200m'
              memory: '500Mi'
      imagePullSecrets:
        - name: regcred

---
apiVersion: v1
kind: Service
metadata:
  # Despite its name, this Service is of type NodePort (see below), not headless.
  name: rabbitmq-headless
  namespace: rabbitmq
  labels:
    app: rabbitmq
spec:
  ports:
    - name: tcp-rabbitmq-5672
      port: 5672
      targetPort: 5672
      nodePort: 32672
  selector:
    app: rabbitmq
  type: NodePort
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-external
  namespace: rabbitmq
  labels:
    app: rabbitmq-external
spec:
  ports:
    - name: http-rabbitmq-external
      protocol: TCP
      port: 15672
      targetPort: 15672
  selector:
    app: rabbitmq
  type: ClusterIP

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: rabbitmq-ingress
  namespace: rabbitmq
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  ingressClassName: nginx
  rules:
    - host: rabbitmq.liyuan.com
      http:
        paths:
          - backend:
              service:
                name: rabbitmq-external
                port:
                  number: 15672
            pathType: Prefix
            path: /
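The trailing-newline problem noted in the commented-out `env` block can be checked offline. A small sketch (the class name `SecretEncoding` and its helper methods are hypothetical) that decodes `YWRtaW4K`, which is what `echo admin | base64` produces, and encodes the same strings without a newline:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class SecretEncoding {
    /** Base64-encodes the exact bytes of `value`, adding nothing. */
    static String encode(String value) {
        return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a Secret `data` value back to its string form. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The decoded value ends with '\n', so the username is "admin\n", not "admin".
        System.out.println(decode("YWRtaW4K").equals("admin\n")); // true
        // Encoding without the newline gives the values to store in the Secret.
        System.out.println(encode("admin"));  // YWRtaW4=
        System.out.println(encode("123456")); // MTIzNDU2
    }
}
```

Kubernetes decodes `data` values byte for byte, so whatever was encoded, newline included, is what the container sees in the environment variable.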

Apply the YAML above and add an /etc/hosts entry for rabbitmq.liyuan.com.
The management page can then be opened at http://rabbitmq.liyuan.com:30001/#/exchanges
The deployment also exposes 192.168.31.175:32672 for sending messages.

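2.1. Terminology

  • Broker: the application that receives and dispatches messages
  • Virtual Host: a Broker can host several Virtual Hosts, each with its own set of Exchanges and Queues
  • Connection: the TCP connection between a producer/consumer and the Broker
  • Channel: the path messages are sent over. A channel is a logical connection established inside a Connection; each AMQP method carries a channel ID so that the client and the Broker can identify the channel, which avoids the overhead of opening additional TCP connections
  • Exchange: the first stop for a message arriving at the Broker. Following its routing rules, it matches the routing key against its lookup table and delivers the message to the matching queues. Common types: direct, topic, fanout (multicast)
  • Queue: the queue that stores messages
  • Binding: a virtual link between an Exchange and a Queue. A binding can carry a routing key; binding information is stored in the Exchange's lookup table and is the basis for routing messages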
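How an exchange uses its binding table can be modelled in a few lines. This is a simplified in-memory sketch, not RabbitMQ's implementation; `ExchangeSketch`, `bind`, and `route` are hypothetical names. It assumes the usual AMQP conventions: a direct exchange matches the key exactly, a fanout exchange ignores it, and a topic exchange treats `*` as exactly one dot-separated word and `#` as zero or more.

```java
import java.util.ArrayList;
import java.util.List;

final class ExchangeSketch {
    enum Type { DIRECT, FANOUT, TOPIC }

    private final Type type;
    // The exchange's lookup table: one {queue, bindingKey} pair per binding.
    private final List<String[]> bindings = new ArrayList<>();

    ExchangeSketch(Type type) { this.type = type; }

    void bind(String queue, String bindingKey) {
        bindings.add(new String[] {queue, bindingKey});
    }

    /** Returns the queues a message with this routing key would be delivered to. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (String[] b : bindings) {
            boolean match;
            switch (type) {
                case FANOUT: match = true; break;                    // key is ignored
                case DIRECT: match = b[1].equals(routingKey); break; // exact match
                default:     match = topicMatch(b[1].split("\\."), 0,
                                                routingKey.split("\\."), 0);
            }
            if (match && !targets.contains(b[0])) targets.add(b[0]);
        }
        return targets;
    }

    // Topic rule: '*' stands for exactly one word, '#' for zero or more words.
    private static boolean topicMatch(String[] p, int i, String[] k, int j) {
        if (i == p.length) return j == k.length;
        if (p[i].equals("#")) {
            for (int n = j; n <= k.length; n++) {
                if (topicMatch(p, i + 1, k, n)) return true;
            }
            return false;
        }
        return j < k.length && (p[i].equals("*") || p[i].equals(k[j]))
                && topicMatch(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        ExchangeSketch direct = new ExchangeSketch(Type.DIRECT);
        direct.bind("xc_queue_name", "xc_queue_name");
        System.out.println(direct.route("xc_queue_name")); // [xc_queue_name]
        System.out.println(direct.route("other"));         // []

        ExchangeSketch topic = new ExchangeSketch(Type.TOPIC);
        topic.bind("all_logs", "log.#");
        topic.bind("errors", "*.error");
        System.out.println(topic.route("log.error"));      // [all_logs, errors]
    }
}
```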

3. Usage test

3.1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rabbitmq-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
    </dependencies>
</project>

3.2. Producer

// Producer.java
package com.liyuan.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.31.175");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(32672); // NodePort that maps to AMQP port 5672
        // try-with-resources closes the channel first, then the connection.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String exchangeName = "xc_exchange_name";
            // Direct exchange: durable = true, autoDelete = false.
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
            String queueName = "xc_queue_name";
            // Queue: durable = false, exclusive = false, autoDelete = false.
            channel.queueDeclare(queueName, false, false, false, null);
            // The queue name doubles as the binding key and the routing key.
            channel.queueBind(queueName, exchangeName, queueName);

            String message = "Hello, my name is liyuan.";

            // Encode explicitly instead of relying on the platform default charset.
            channel.basicPublish(exchangeName, queueName, null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
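A message body travels as raw bytes, so the producer and the consumer have to agree on a charset. On Java 8, which the pom targets, `String.getBytes()` without an argument uses the platform default. The sketch below (the class `BodyEncoding` and its two helpers are hypothetical) shows the round trip with UTF-8 named on both sides.

```java
import java.nio.charset.StandardCharsets;

final class BodyEncoding {
    /** What a producer would pass to basicPublish as the body. */
    static byte[] toBody(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** What a consumer would do with the bytes from Delivery.getBody(). */
    static String fromBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = toBody("Hello, my name is liyuan.");
        System.out.println(body.length);    // 25 (one byte per character for ASCII text)
        System.out.println(fromBody(body)); // Hello, my name is liyuan.
    }
}
```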

3.3. Consumer

// Consumer.java
package com.liyuan.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.31.175");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(32672);

        // Deliberately not in try-with-resources: deliveries arrive asynchronously,
        // so closing the channel or connection right after basicConsume would stop
        // the consumer before it receives anything. Stop the process to exit.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // The queue must already exist, e.g. because the Producer has run.
        String queueName = "xc_queue_name";

        DeliverCallback deliverCallback = (consumerTag, message) ->
                System.out.println("Delivered consuming: " + consumerTag + " "
                        + new String(message.getBody(), StandardCharsets.UTF_8));
        CancelCallback cancelCallback = consumerTag ->
                System.out.println("Canceled: " + consumerTag);

        // autoAck = true: a message counts as acknowledged as soon as it is delivered.
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
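`basicConsume` only registers the callbacks and returns; the deliveries themselves arrive later on another thread. The broker-free sketch below illustrates why the calling thread has to wait before closing anything. It rests on assumptions: a plain `ExecutorService` plays the role of the client's delivery thread, and `AsyncDeliverySketch` and `awaitOne` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncDeliverySketch {
    /** Registers a callback, then waits up to timeoutMillis for one delivery. */
    static String awaitOne(String payload, long timeoutMillis) {
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        CountDownLatch delivered = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        try {
            // Stand-in for basicConsume: returns immediately, the callback runs later.
            deliveryThread.submit(() -> {
                received.set(payload);
                delivered.countDown();
            });
            // Without this wait the caller could close everything before the callback ran.
            if (!delivered.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return null; // nothing arrived in time
            }
            return received.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            deliveryThread.shutdown(); // release the thread only after the wait
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitOne("Hello, my name is liyuan.", 1000));
    }
}
```

A real consumer could use the same latch idea to receive a fixed number of messages and then close the channel and connection cleanly.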

4. RabbitMQ exchanges

https://www.bilibili.com/video/BV1Am4y1z7Tu/?p=9&spm_id_from=pageDriver&vd_source=19a7becebd650259d15b703d8e74edef
