0
点赞
收藏
分享

微信扫一扫

module declares its path as: github.com/IBM/sarama but was required as: gith

看来在您当前的项目中已经存在一个模块,声明其路径为 github.com/IBM/sarama,但我们需要使用的是 github.com/Shopify/sarama。这可能会导致依赖冲突。要解决这个问题,您可以采取以下步骤:

  1. 移除或更新现有的 go.mod 文件: 确保您的项目使用的是正确的 sarama 库。
  2. 更新 go.mod 文件: 确保您的 go.mod 文件引用的是 github.com/Shopify/sarama。您可以手动编辑 go.mod 文件,或者使用 go get 命令。

首先,删除现有的 go.modgo.sum 文件:

rm go.mod go.sum

然后,重新初始化 Go 模块并获取正确的 sarama 库:

go mod init your_module_name
go get github.com/Shopify/sarama

以下是一个完整的示例,包括 go.mod 文件和 main.go 文件。

1. 创建 go.mod 文件

go mod init your_module_name

然后在 go.mod 文件中确保引用 github.com/Shopify/sarama

module your_module_name

go 1.18

require github.com/Shopify/sarama v1.34.1

2. 创建 main.go 文件

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

func main() {
	// Kafka broker addresses
	brokers := []string{"localhost:9092"}
	// Kafka topic
	topic := "your_topic"
	// Consumer group ID
	group := "your_group"

	// Sarama logger configuration
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	// Sarama configuration
	config := sarama.NewConfig()
	config.Version = sarama.V0_9_0_0 // Kafka version
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Create new consumer group
	consumerGroup, err := sarama.NewConsumerGroup(brokers, group, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer func() {
		if err := consumerGroup.Close(); err != nil {
			log.Fatalf("Error closing consumer group: %v", err)
		}
	}()

	// Create a new consumer instance
	consumer := Consumer{
		ready: make(chan bool),
	}

	// Handle termination signals
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Run the consumer group in a separate goroutine
	go func() {
		for {
			if err := consumerGroup.Consume(ctx, []string{topic}, &consumer); err != nil {
				log.Fatalf("Error consuming messages: %v", err)
			}
			// Check if context was canceled, signaling termination
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	// Wait for consumer to be ready
	<-consumer.ready
	log.Println("Sarama consumer up and running! Press Ctrl+C to stop.")

	// Wait for termination signal
	<-ctx.Done()
	log.Println("Terminating Sarama consumer...")
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}
	return nil
}

运行代码

确保 Kafka 服务器已启动,并且指定的主题存在,然后运行以上 Go 程序来消费 Kafka 消息。

go run main.go

这段代码将在控制台中打印消费到的 Kafka 消息的值、时间戳和主题信息。这样就可以避免模块路径冲突,并确保使用正确的 sarama 库。

举报

相关推荐

0 条评论