您的位置 首页 golang

导入和导出kafka based channel的数据

golang实现从kafka导入导出一个channel的内容:

  1. 导出一个channel
// This program exports every message of one partition of a Kafka topic into a
// local file named "<topic>.dat".
//
// Each message is stored as two length-prefixed fields, key first and value
// second. A field is a little-endian 32-bit byte count followed by that many
// bytes. Message values are expected to decode as orderer KafkaMessage
// protobufs; anything else aborts the export.
package main

import (
	"bufio"
	"encoding/binary"
	"flag"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

// idleTimeout is how long the exporter waits for a further message before it
// treats the partition as fully drained.
const idleTimeout = 5 * time.Second

// Command-line options.
var (
	brokers   string
	topic     string
	partition int
)

func main() {
	flag.StringVar(&brokers, "brokers", "localhost:9093", "Kafka brokers")
	flag.StringVar(&topic, "topic", "topic", "Kafka topic")
	flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
	flag.Parse()

	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Unable to create kafka client, error: %v\n", err)
	}

	err = exportTopic(client, topic, partition)
	// Close explicitly instead of deferring: log.Fatalf exits the process
	// without running deferred calls.
	if cerr := client.Close(); cerr != nil {
		log.Printf("Unable to close kafka client, error: %v\n", cerr)
	}
	if err != nil {
		log.Fatalf("Unable to export topic, error: %v\n", err)
	}
}

// exportTopic reads the given partition of topic from its oldest offset and
// appends every message to "<topic>.dat", replacing any previous contents of
// that file. It returns nil once no message has arrived for idleTimeout, and
// prints a summary of the exported message types at that point.
//
// It returns an error if the consumer cannot be created, the file cannot be
// written, a message value is not a valid KafkaMessage, or the message type is
// not one of Connect, TimeToCut or Regular.
func exportTopic(client sarama.Client, topic string, partition int) (err error) {
	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
	if err != nil {
		return err
	}
	defer partitionConsumer.Close()

	// O_TRUNC: without it, exporting over an existing, longer file would leave
	// stale records after the newly written ones.
	file, err := os.OpenFile(fmt.Sprintf("%s.dat", topic), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	out := bufio.NewWriter(file)
	// Flush and close on every exit path so that a failed export still leaves
	// the records written so far, and so that write-back errors are reported.
	defer func() {
		if ferr := out.Flush(); ferr != nil && err == nil {
			err = ferr
		}
		if cerr := file.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	var countConnect, countTimeToCut, countRegular int64
	msg := new(ab.KafkaMessage)
	for {
		select {
		case cerr, ok := <-partitionConsumer.Errors():
			if !ok {
				// A closed channel yields a nil pointer; returning it as an
				// error would produce a non-nil error wrapping nil.
				return fmt.Errorf("kafka consumer closed")
			}
			return cerr
		case in, ok := <-partitionConsumer.Messages():
			if !ok {
				return fmt.Errorf("kafka consumer closed")
			}
			// Decode first so that undecodable payloads are never exported.
			if err := proto.Unmarshal(in.Value, msg); err != nil {
				return err
			}
			if err := exportMessage(out, in.Key, in.Value); err != nil {
				return err
			}
			switch msg.Type.(type) {
			case *ab.KafkaMessage_Connect:
				countConnect++
			case *ab.KafkaMessage_TimeToCut:
				countTimeToCut++
			case *ab.KafkaMessage_Regular:
				countRegular++
			default:
				return fmt.Errorf("unknown kafka message")
			}
		// A fresh timer per iteration: the export ends only after idleTimeout
		// has passed without any new message.
		case <-time.After(idleTimeout):
			// The total is the number of messages actually written, which is
			// correct for an empty topic and for topics whose oldest offset is
			// not zero.
			total := countConnect + countTimeToCut + countRegular
			fmt.Printf("export summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", total, countConnect, countTimeToCut, countRegular)
			return nil
		}
	}
}

// exportMessage writes one message to w as two consecutive fields: the key
// followed by the value.
func exportMessage(w io.Writer, key []byte, value []byte) error {
	if err := exportField(w, key); err != nil {
		return err
	}
	return exportField(w, value)
}

// exportField writes data to w as a little-endian 32-bit length followed by
// the raw bytes. An empty or nil field is written as a zero length with no
// payload. It returns an error if data is too large for the length prefix or
// if a write fails.
func exportField(w io.Writer, data []byte) error {
	if len(data) > math.MaxInt32 {
		return fmt.Errorf("field of %d bytes does not fit the 32-bit length prefix", len(data))
	}
	var header [4]byte
	binary.LittleEndian.PutUint32(header[:], uint32(len(data)))
	if _, err := w.Write(header[:]); err != nil {
		return err
	}
	if len(data) == 0 {
		return nil
	}
	// io.Writer guarantees a non-nil error on a short write, so the byte count
	// needs no separate check.
	_, err := w.Write(data)
	return err
}
  2. 导入一个channel
// This program replays the messages stored in a local "<topic>.dat" file into
// a Kafka topic.
//
// The file is a sequence of messages, each stored as two length-prefixed
// fields, key first and value second. A field is a little-endian 32-bit byte
// count followed by that many bytes. Message values are expected to decode as
// orderer KafkaMessage protobufs; anything else aborts the import.
package main

import (
	"bufio"
	"encoding/binary"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"strings"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

// Command-line options.
var (
	brokers string
	topic   string
)

// messageCounts tallies the messages handed to the producer, by type.
type messageCounts struct {
	connect   int64
	timeToCut int64
	regular   int64
}

// total returns the number of messages counted across all types.
func (c messageCounts) total() int64 {
	return c.connect + c.timeToCut + c.regular
}

func main() {
	flag.StringVar(&brokers, "brokers", "localhost:9093", "Kafka brokers")
	flag.StringVar(&topic, "topic", "topic", "Kafka topic")
	flag.Parse()

	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Unable to create kafka client, error: %v\n", err)
	}

	err = importTopic(client, topic)
	// Close explicitly instead of deferring: log.Fatalf exits the process
	// without running deferred calls.
	if cerr := client.Close(); cerr != nil {
		log.Printf("Unable to close kafka client, error: %v\n", cerr)
	}
	if err != nil {
		log.Fatalf("Unable to import topic, error: %v\n", err)
	}
}

// importTopic reads every message from "<topic>.dat" and produces it to topic
// through an asynchronous producer built on client. The summary is printed
// only after the producer has been closed without error, i.e. after buffered
// messages have been flushed.
//
// It returns an error if the file cannot be opened or is malformed, if a value
// is not a valid KafkaMessage of a known type, or if the producer reports a
// delivery failure.
func importTopic(client sarama.Client, topic string) error {
	file, err := os.Open(fmt.Sprintf("%s.dat", topic))
	if err != nil {
		return err
	}
	defer file.Close()

	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}

	counts, err := produceAll(producer, bufio.NewReader(file), topic)
	// Close exactly once and keep its error: it reports messages that failed
	// while the producer was flushing.
	if cerr := producer.Close(); cerr != nil && err == nil {
		err = cerr
	}
	if err != nil {
		return err
	}

	fmt.Printf("import summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", counts.total(), counts.connect, counts.timeToCut, counts.regular)
	return nil
}

// produceAll reads messages from r until a clean end of input and queues each
// one on producer for topic. It returns the per-type counts of the messages
// queued so far together with the first error encountered.
func produceAll(producer sarama.AsyncProducer, r io.Reader, topic string) (messageCounts, error) {
	var counts messageCounts
	msg := new(ab.KafkaMessage)
	for {
		key, value, err := importMessage(r)
		if err == io.EOF {
			return counts, nil
		}
		if err != nil {
			return counts, err
		}
		if err := proto.Unmarshal(value, msg); err != nil {
			return counts, err
		}
		switch msg.Type.(type) {
		case *ab.KafkaMessage_Connect:
			counts.connect++
		case *ab.KafkaMessage_TimeToCut:
			counts.timeToCut++
		case *ab.KafkaMessage_Regular:
			counts.regular++
		default:
			return counts, fmt.Errorf("unknown kafka message")
		}

		out := &sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.ByteEncoder(key),
			Value: sarama.ByteEncoder(value),
		}
		// Watch Errors while sending: if nobody reads delivery failures the
		// producer can stall, and the send on Input would then block forever.
		select {
		case producer.Input() <- out:
		case perr := <-producer.Errors():
			if perr == nil {
				return counts, fmt.Errorf("kafka producer closed")
			}
			return counts, fmt.Errorf("producing to kafka: %v", perr.Err)
		}
	}
}

// importMessage reads one message (key field, then value field) from r.
//
// It returns io.EOF only when the input ends exactly on a message boundary.
// Input that ends between the key and the value is reported as an error.
func importMessage(r io.Reader) ([]byte, []byte, error) {
	key, err := importField(r)
	if err != nil {
		return nil, nil, err
	}
	value, err := importField(r)
	if err == io.EOF {
		return nil, nil, fmt.Errorf("invalid EOF meet")
	}
	if err != nil {
		return nil, nil, err
	}
	return key, value, nil
}

// importField reads one length-prefixed field from r: a little-endian 32-bit
// length followed by that many bytes. A zero length yields a nil slice.
//
// It returns io.EOF only when r is exhausted before the first byte of the
// length prefix. A truncated prefix or payload yields io.ErrUnexpectedEOF, and
// a negative length yields an error.
func importField(r io.Reader) ([]byte, error) {
	var header [4]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return nil, err
	}
	size := int32(binary.LittleEndian.Uint32(header[:]))
	if size < 0 {
		return nil, fmt.Errorf("invalid field length %d", size)
	}
	if size == 0 {
		return nil, nil
	}

	data := make([]byte, size)
	// ReadFull keeps reading until data is full; a single Read may legally
	// return fewer bytes than requested.
	if _, err := io.ReadFull(r, data); err != nil {
		if err == io.EOF {
			// The prefix promised a payload, so running out of input here is a
			// truncation, not a clean end of file.
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	return data, nil
}

文章来源:智云一二三科技

文章标题:导入和导出kafka based channel的数据

文章地址:https://www.zhihuclub.com/2302.shtml

关于作者: 智云科技

热门文章

发表评论

您的电子邮箱地址不会被公开。

网站地图