您现在的位置是:首页 >技术教程 >Golang Gin框架mqtt消费者网站首页技术教程

Golang Gin框架mqtt消费者

幸运黒锦鲤 2025-02-15 12:01:04
简介Golang Gin框架mqtt消费者

这里主要是展示一个非常简单的Gin框架下的mqtt消费者,在保持启动后持续轮训消费。简单实用是主旨。

viper全局调用yaml文件中的数据

redisClient 缓存客户端
doJYService 业务逻辑代码

package main

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"github.com/spf13/viper"
	"jieyu-gin/mqtt-service/config"
	"jieyu-gin/mqtt-service/doJYService"
	"jieyu-gin/mqtt-service/internal/redisClient"
)

func main() {
	router := gin.Default()
	gin.SetMode(gin.ReleaseMode)

	if err := config.InitConfig(); err != nil {
		fmt.Printf("Failed to initialize config: %v
", err)
		return
	}

	redisClient.InitRedisClient()

	doJYService.BatchProcessing()

	go func() {
		router.Run(viper.GetString("app.port"))
	}()
}

func BatchProcessing() {} 方法中 for{}中处理数据。

package doJYService

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"jieyu-gin/mqtt-service/internal/database"
	"jieyu-gin/mqtt-service/internal/log"
	"jieyu-gin/mqtt-service/internal/redisClient"
	"time"
)

var broker = "127.0.0.1" //你的broker ip 服务器消费地址
var port = 1883
var userName = "jieyu_mqtt"
var passwd = "jieyu_mqtt_1688"
var topic = "jieyu_online"

var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Clenit Received message: %s from topic: %s
", msg.Payload(), msg.Topic())
}

func BatchProcessing() string {

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("go_mqtt_consumer")
	opts.SetUsername(userName)
	opts.SetPassword(passwd)
	opts.SetKeepAlive(8 * time.Second)

	opts.SetDefaultPublishHandler(messageRecHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	sub(client, false)
	time.Sleep(30 * time.Second)

	for {
		time.Sleep(3 * time.Second)
		fmt.Println("目前没有数据,等待消息进场。")
	}

	return ""
}

// 初始化数据库和日志
func InitMysqlAndLog(name string, id string) {

	writer, err := log.New(id, name)
	if err != nil {
		panic(err)
	}

	database.Init(writer)

	redisClient.Set("log-"+name+"-"+id, writer.Name())
}

func sub(client mqtt.Client, producer bool) {
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	if producer {
		fmt.Printf("Producer subscribed to topic %s", topic)
	} else {
		fmt.Printf("Consumer subscribed to topic %s", topic)
	}
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v", err)
}

效果图如下

消息持续消费,具体处理业务的逻辑代码自己写。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。