看来在您当前的项目中已经存在一个模块,声明其路径为 github.com/IBM/sarama
,但我们需要使用的是 github.com/Shopify/sarama
。这可能会导致依赖冲突。要解决这个问题,您可以采取以下步骤:
- 移除或更新现有的
go.mod
文件: 确保您的项目使用的是正确的sarama
库。 - 更新
go.mod
文件: 确保您的go.mod
文件引用的是github.com/Shopify/sarama
。您可以手动编辑go.mod
文件,或者使用go get
命令。
首先,删除现有的 go.mod
和 go.sum
文件:
rm go.mod go.sum
然后,重新初始化 Go 模块并获取正确的 sarama
库:
go mod init your_module_name
go get github.com/Shopify/sarama
以下是一个完整的示例,包括 go.mod
文件和 main.go
文件。
1. 创建 go.mod
文件
go mod init your_module_name
然后在 go.mod
文件中确保引用 github.com/Shopify/sarama
:
module your_module_name
go 1.18
require github.com/Shopify/sarama v1.34.1
2. 创建 main.go
文件
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
func main() {
// Kafka broker addresses
brokers := []string{"localhost:9092"}
// Kafka topic
topic := "your_topic"
// Consumer group ID
group := "your_group"
// Sarama logger configuration
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
// Sarama configuration
config := sarama.NewConfig()
config.Version = sarama.V0_9_0_0 // Kafka version
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Create new consumer group
consumerGroup, err := sarama.NewConsumerGroup(brokers, group, config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer func() {
if err := consumerGroup.Close(); err != nil {
log.Fatalf("Error closing consumer group: %v", err)
}
}()
// Create a new consumer instance
consumer := Consumer{
ready: make(chan bool),
}
// Handle termination signals
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
// Run the consumer group in a separate goroutine
go func() {
for {
if err := consumerGroup.Consume(ctx, []string{topic}, &consumer); err != nil {
log.Fatalf("Error consuming messages: %v", err)
}
// Check if context was canceled, signaling termination
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
// Wait for consumer to be ready
<-consumer.ready
log.Println("Sarama consumer up and running! Press Ctrl+C to stop.")
// Wait for termination signal
<-ctx.Done()
log.Println("Terminating Sarama consumer...")
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
}
return nil
}
运行代码
确保 Kafka 服务器已启动,并且指定的主题存在,然后运行以上 Go 程序来消费 Kafka 消息。
go run main.go
这段代码将在控制台中打印消费到的 Kafka 消息的值、时间戳和主题信息。这样就可以避免模块路径冲突,并确保使用正确的 sarama
库。