用 Go 实现从 Kafka 导出、以及向 Kafka 导入一个 channel 的数据:
- 导出一个channel
// Command export dumps every message of one Kafka topic partition into the
// file "<topic>.dat" in the current directory.
//
// File format: a sequence of records, each record being the message key
// followed by the message value. Every field is written as a little-endian
// int32 byte length followed by that many raw bytes (nothing follows a zero
// length).
package main

import (
	"encoding/binary"
	"flag"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

// idleTimeout is how long the exporter waits for a further message before it
// considers the partition fully drained.
const idleTimeout = 5 * time.Second

var (
	brokers   string
	topic     string
	partition int
)

func main() {
	flag.StringVar(&brokers, "brokers", "localhost:9093", "Kafka brokers")
	flag.StringVar(&topic, "topic", "topic", "Kafka topic")
	flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
	flag.Parse()

	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Unable to create kafka client, error: %v\n", err)
	}

	err = exportTopic(client, topic, partition)
	if err != nil {
		log.Fatalf("Unabled to export topic, error: %v\n", err)
	}
}

// exportTopic consumes the given partition of topic from the oldest available
// offset and appends every message to "<topic>.dat". It returns nil once no
// message has arrived for idleTimeout, and an error if consuming, decoding or
// writing fails.
func exportTopic(client sarama.Client, topic string, partition int) error {
	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	defer consumer.Close()

	// Use sarama.OffsetNewest instead to export only messages produced from
	// now on.
	partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
	if err != nil {
		return err
	}
	defer partitionConsumer.Close()

	// O_TRUNC matters: without it a shorter export written over an older,
	// longer file would keep the old file's tail as stale records.
	file, err := os.OpenFile(fmt.Sprintf("%s.dat", topic), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}

	exportErr := drainPartition(partitionConsumer, file)
	// A failed Close on a written file can mean lost data, so report it
	// unless an earlier error already explains the failure.
	if closeErr := file.Close(); exportErr == nil {
		exportErr = closeErr
	}
	return exportErr
}

// drainPartition writes every message received from pc to file until the
// partition has been idle for idleTimeout, then prints a summary.
func drainPartition(pc sarama.PartitionConsumer, file *os.File) error {
	var countConnect, countTimeToCut, countRegular int64

	msg := new(ab.KafkaMessage)
	for {
		select {
		case consumerErr, ok := <-pc.Errors():
			// A closed channel yields a nil pointer; returning it directly
			// would produce a non-nil error wrapping a nil pointer.
			if !ok || consumerErr == nil {
				return fmt.Errorf("kafka consumer closed")
			}
			return consumerErr

		case in, ok := <-pc.Messages():
			if !ok {
				return fmt.Errorf("kafka consumer closed")
			}
			if err := proto.Unmarshal(in.Value, msg); err != nil {
				return err
			}

			// Export the raw key/value bytes exactly as received.
			if err := exportMessage(file, in.Key, in.Value); err != nil {
				return err
			}

			switch msg.Type.(type) {
			case *ab.KafkaMessage_Connect:
				countConnect++
			case *ab.KafkaMessage_TimeToCut:
				countTimeToCut++
			case *ab.KafkaMessage_Regular:
				countRegular++
			default:
				return fmt.Errorf("unknown kafka message")
			}

		case <-time.After(idleTimeout):
			// Report the number of messages actually written rather than
			// lastOffset+1, which is wrong for an empty partition or one
			// whose first retained offset is not 0.
			total := countConnect + countTimeToCut + countRegular
			fmt.Printf("export summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n",
				total, countConnect, countTimeToCut, countRegular)
			return nil
		}
	}
}

// exportMessage writes one record (key field, then value field) to file.
func exportMessage(file *os.File, key []byte, value []byte) error {
	if err := exportField(file, key); err != nil {
		return err
	}
	return exportField(file, value)
}

// exportField writes data as a little-endian int32 length followed by the
// bytes themselves. An empty or nil slice is written as a zero length only.
func exportField(file *os.File, data []byte) error {
	l := len(data)
	if err := binary.Write(file, binary.LittleEndian, int32(l)); err != nil {
		return err
	}
	if l == 0 {
		return nil
	}

	n, err := file.Write(data)
	if err != nil {
		return err
	}
	if n != l {
		return fmt.Errorf("incorrect bytes written expect %d, but %d", l, n)
	}
	return nil
}
- 导入一个channel
// Command import reads the file "<topic>.dat" from the current directory and
// produces every record it contains to the Kafka topic of the same name.
//
// File format: a sequence of records, each record being the message key
// followed by the message value. Every field is stored as a little-endian
// int32 byte length followed by that many raw bytes (nothing follows a zero
// length).
package main

import (
	"encoding/binary"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"strings"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

var (
	brokers string
	topic   string
)

// importCounts tallies the imported messages by KafkaMessage type.
type importCounts struct {
	connect   int64
	timeToCut int64
	regular   int64
}

func main() {
	flag.StringVar(&brokers, "brokers", "localhost:9093", "Kafka brokers")
	flag.StringVar(&topic, "topic", "topic", "Kafka topic")
	flag.Parse()

	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Unable to create kafka client, error: %v\n", err)
	}

	err = importTopic(client, topic)
	if err != nil {
		// The original message said "export" here, which is misleading in
		// the import tool.
		log.Fatalf("Unable to import topic, error: %v\n", err)
	}
}

// importTopic produces every record stored in "<topic>.dat" to topic and
// prints a summary once the producer has been closed without error. It
// returns an error if the file is malformed, a record cannot be decoded as a
// KafkaMessage, or the producer reports a delivery failure.
func importTopic(client sarama.Client, topic string) error {
	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}

	file, err := os.Open(fmt.Sprintf("%s.dat", topic))
	if err != nil {
		// Nothing has been queued yet, so a Close failure here carries no
		// information beyond the open error being returned.
		_ = producer.Close()
		return err
	}
	defer file.Close()

	counts, err := produceRecords(producer, file, topic)

	// Close waits for buffered messages and returns the delivery failures it
	// collected; ignoring its result would hide messages that never reached
	// Kafka.
	closeErr := producer.Close()
	if err != nil {
		return err
	}
	if closeErr != nil {
		return closeErr
	}

	fmt.Printf("import summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n",
		counts.connect+counts.timeToCut+counts.regular, counts.connect, counts.timeToCut, counts.regular)
	return nil
}

// produceRecords reads records from file until a clean end of file and queues
// each one on producer. It stops at the first malformed record or the first
// delivery error reported by the producer.
func produceRecords(producer sarama.AsyncProducer, file *os.File, topic string) (importCounts, error) {
	var counts importCounts

	msg := new(ab.KafkaMessage)
	for {
		key, value, err := importMessage(file)
		if err == io.EOF {
			return counts, nil
		}
		if err != nil {
			return importCounts{}, err
		}

		if err := proto.Unmarshal(value, msg); err != nil {
			return importCounts{}, err
		}
		switch msg.Type.(type) {
		case *ab.KafkaMessage_Connect:
			counts.connect++
		case *ab.KafkaMessage_TimeToCut:
			counts.timeToCut++
		case *ab.KafkaMessage_Regular:
			counts.regular++
		default:
			return importCounts{}, fmt.Errorf("unknown kakfa message")
		}

		record := &sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.ByteEncoder(key),
			Value: sarama.ByteEncoder(value),
		}
		// The Errors channel must be serviced while sending: per the sarama
		// documentation an unread Errors channel can deadlock the producer
		// when error reporting is enabled (the default configuration).
		select {
		case producer.Input() <- record:
		case produceErr := <-producer.Errors():
			if produceErr == nil {
				return importCounts{}, fmt.Errorf("kafka producer closed")
			}
			return importCounts{}, produceErr
		}
	}
}

// importMessage reads one record (key field, then value field) from file.
// It returns io.EOF only when the file ends exactly on a record boundary.
func importMessage(file *os.File) ([]byte, []byte, error) {
	key, err := importField(file)
	if err != nil {
		return nil, nil, err
	}

	value, err := importField(file)
	if err == io.EOF {
		// The key was present but the value is missing entirely.
		return nil, nil, fmt.Errorf("invalid EOF meet")
	}
	if err != nil {
		return nil, nil, err
	}
	return key, value, nil
}

// importField reads a little-endian int32 length and then that many bytes.
// A zero length yields a nil slice. io.EOF is returned only when the file
// ends before the first byte of the length header; any later truncation is
// reported as io.ErrUnexpectedEOF.
func importField(file *os.File) ([]byte, error) {
	var l int32
	if err := binary.Read(file, binary.LittleEndian, &l); err != nil {
		return nil, err
	}
	if l == 0 {
		return nil, nil
	}
	if l < 0 {
		// A negative length would make the allocation below panic.
		return nil, fmt.Errorf("invalid field length %d", l)
	}

	data := make([]byte, l)
	// io.ReadFull keeps reading until the buffer is full; a single Read call
	// is allowed to return fewer bytes than requested.
	if _, err := io.ReadFull(file, data); err != nil {
		if err == io.EOF {
			// The header promised data, so hitting EOF here is a truncated
			// file and must not be mistaken for a clean end of input.
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	return data, nil
}
文章来源:智云一二三科技
文章标题:导入和导出kafka based channel的数据
文章地址:https://www.zhihuclub.com/2302.shtml