您现在的位置是:首页 >技术教程 >使用golang实现日志收集系统的logagent网站首页技术教程

使用golang实现日志收集系统的logagent

摔跤吧儿 2023-05-31 12:00:01
简介使用golang实现日志收集系统的logagent

整体架构

参考 七米老师的日志收集项目
在这里插入图片描述
主要用go实现logagent的部分,logagent的作用主要是实时监控日志追加的变化,并将变化发送到kafka中。
之前我们已经实现了 用go连接kafka并向其中发送数据,也实现了使用tail库监控日志追加操作
我们把这两部分结合起来实现监控日志追加并发送到kafka。

使用github.com/go-ini/ini配置参数

// 读取配置参数
	cfg, err:=ini.Load("config/config.ini")
	if err!=nil {
		logrus.Error((" load config error"))
		return
	}
[kafka]
address = 127.0.0.1:9092
chan_size = 1000

[collect]
logfile_path= D:/learn/go/log-collector-lmh/log_agent/config_version/log_file/xx.log

配置参数主要包括,kafka的启动端口,存储的数据大小限制,日志文件的路径。

初始化kafka

kafka.go

package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"
)

var (
	Client sarama.SyncProducer
	MsgChan chan *sarama.ProducerMessage //占用的字节数少,传递的指针
)

func InitKafka(kafkaAddr string, chanSize int64) (err error){
	config:=sarama.NewConfig()
	// 生产者配置
	config.Producer.RequiredAcks=sarama.WaitForAll
	config.Producer.Partitioner=sarama.NewRandomPartitioner
	config.Producer.Return.Successes=true
	// 连接kafka
	Client,err=sarama.NewSyncProducer([]string{kafkaAddr}, config)
	if err!=nil {
		logrus.Error("producer closed", err)
		return
	}
	// 从管道中读取日志并发送到kafka
	MsgChan = make(chan *sarama.ProducerMessage, chanSize)
	go sendMsg()
	return
}

func sendMsg(){
	for {
		select {
			case msg := <- MsgChan:
				pid, offset, err := Client.SendMessage(msg)
				if err != nil {
					logrus.Warning("send msg failed, err:", err)
					return
				}
				logrus.Infof("send msg to kafka success. pid:%v offset:%v", pid, offset)
		}
	}
}

这里实现了连接kafka,并使用协程不断地读取MsgChan,读取到数据后向kafka发送,这里MsgChan通道的数据由tail监控到的日志变化写入。
main.go中调用

// 初始化kafka
	kafkaAddr:=cfg.Section("kafka").Key("address").String()
	chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)

	err=kafka.InitKafka(kafkaAddr, chanSize)
	if err!=nil {
		logrus.Error("kafka init failed")
	}
	logrus.Info("Kafka init success")

初始化tailf,并将日志数据写入ChanMsg

tailF.go

package tailF
import (
	"github.com/hpcloud/tail"
	"fmt"
)
var (
	TailObj *tail.Tail
)
func InitTail(filename string) (err error) {
	config := tail.Config{
		ReOpen: true,
		Follow: true,
		Location: &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll: true,
	}

	// 打开文件开始读取数据
	TailObj, err =  tail.TailFile(filename, config)
	if err != nil {
		fmt.Printf("create tail %s failed, err:%v
", filename, err)
		return
	}
	return
}

main.go中对应

// 初始化tailf
	fileName:=cfg.Section("collect").Key("logfile_path").String()
	err=tailF.InitTail(fileName)
	if err!=nil {
		logrus.Error(" tailf init failed")
	}
	logrus.Info("Init tail success")
	// 把读取的日志发往kafka
	err=run()
	if err!=nil {
		logrus.Error(" run error%s", err)
		return
	}
	logrus.Info("run success")

main.go中实现的run函数,读取tailF的数据,并写入ChanMsg

func run () (err error){
	for {
		line,ok:=<-tailF.TailObj.Lines
		if !ok {
			logrus.Warn("tail file %s close reopen
", tailF.TailObj.Filename)
			// 读取出错等一秒
			time.Sleep(time.Second)
			continue
		}
		// 使用通道将传输日志改为异步
		// 读取的日志封装为ProducerMessage
		msg:=&sarama.ProducerMessage{}
		msg.Topic="web_log"
		msg.Value=sarama.StringEncoder(line.Text)
		// 放到channel中
		kafka.MsgChan<-msg
	}
}

完整main.go

package main

import (
	"config_version/kafka"
	"config_version/tailF"
	"time"
	"github.com/Shopify/sarama"
	"github.com/go-ini/ini"
	"github.com/sirupsen/logrus"
)

func main() {
	// 读取配置参数
	cfg, err:=ini.Load("config/config.ini")
	if err!=nil {
		logrus.Error((" load config error"))
		return
	}
	// 初始化kafka
	kafkaAddr:=cfg.Section("kafka").Key("address").String()
	chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)

	err=kafka.InitKafka(kafkaAddr, chanSize)
	if err!=nil {
		logrus.Error("kafka init failed")
	}
	logrus.Info("Kafka init success")
	// 初始化tailf
	fileName:=cfg.Section("collect").Key("logfile_path").String()
	err=tailF.InitTail(fileName)
	if err!=nil {
		logrus.Error(" tailf init failed")
	}
	logrus.Info("Init tail success")
	// 把读取的日志发往kafka
	err=run()
	if err!=nil {
		logrus.Error(" run error%s", err)
		return
	}
	logrus.Info("run success")

}

func run () (err error){
	for {
		line,ok:=<-tailF.TailObj.Lines
		if !ok {
			logrus.Warn("tail file %s close reopen
", tailF.TailObj.Filename)
			// 读取出错等一秒
			time.Sleep(time.Second)
			continue
		}
		// 使用通道将传输日志改为异步
		// 读取的日志封装为ProducerMessage
		msg:=&sarama.ProducerMessage{}
		msg.Topic="web_log"
		msg.Value=sarama.StringEncoder(line.Text)
		// 放到channel中
		kafka.MsgChan<-msg
	}
}

至此, 我们实现了简化版的日志收集系统的logagent功能,目前日志的路径还需要手动写入配置文件中,修改的话还需重启项目,之后可以使用ETCD实现日志路径的自动配置。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。