相对于关系型 sql 数据库,nosql 数据库今年来兴起,典型的像 redis、 mongodb、leveldb、hbase、cassandra 等,nosql 横向扩展更加便捷,灵活的数据类型,无需预先建立存储字段,可自定义格式,在很多系统中除了 sql 关系型的数据,还需要部分 nosql 数据库来配合,甚至有些项目只用 nosql 库,因此熟悉一下 nosql 的访问非常有必要。
redis 提供了丰富的数据结构,除了简单的 kv 键值对,还有 list、hash、set、zset 等,支持事务和原子操作,还能使用 lua 来实现执行脚本,基本上每个项目必用。首先加入 redis 的依赖包:
go get github.com/go-redis/redis
该包对 redis 的调用主要定义了 redis.Client 对象,使用 NewClient 获得一个 redis 的客户端,值得注意的使用完以后需要调用其 Close 方法,内部使用了 baseClient 对象维护状态和连接池。
func GetRedisClient() *redis.Client {
return GetRedisClientWithDB(0)
}
func GetRedisClientWithDB(db int) *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: db,
})}
有了 client 对象,就可以像 redis-cli 一样向 redis 服务器发送各种命令了。比如最简单的 set/set、setnx 操作:
func keyValue() {
client := session.GetRedisClient()
defer client.Close()
client.Set("one", 1, 0)
val, _ := client.Get("one").Result()
fmt.Println(val)
oldVal, _ := client.GetSet("one", "cos(0)").Result()
fmt.Println(oldVal)
val, _ = client.Get("one").Result()
fmt.Println(val)
client.Set("two", 2, 0)
client.MSet("three", 3, "four", 4, "five", 5)
vals, _ := client.MGet("one", "two", "three", "four", "five").Result()
fmt.Println(vals)
client.SetNX("once", "atomic", 0).Err()
did, _ := client.SetNX("once", "atomic", 0).Result()
fmt.Println(did)
}
对 redis 的调用结构都被封装成了 *Cmd 结构,比如 IntCmd、StringCmd、SliceCmd 等,这些 *Cmd 都嵌入了 baseCmd 结构,实现了 Cmder 接口,使用 Result() 方法获得调用结果,这个结果返回 cmd 中的 err 对象来表明错误,对应 redis 的 nil,这个包使用 redis.Nil,比如你 GET 一个不存在的 key,redis 会返回 Nil,对应包内的定义是:
const Nil = RedisError("redis: nil")
对其他数据结构类型的调用示例请参考本章节 github 上的代码,下面的例子是发布订阅模型:
func pubsub() {
client := session.GetRedisClient()
defer client.Close()
sub := client.Subscribe("channel")
go func() {
time.Sleep(1 * time.Second)
client.Publish("channel", "a message")
time.Sleep(1 * time.Second)
sub.Close()
}()
for message := range sub.Channel() {
fmt.Println(message.Channel, message.Payload)
}
}
使用 pipeline 和基于 watch 的事务:
func pipe() {
client := session.GetRedisClient()
defer client.Close()
// MULTI-DISCARD-EXEC 整体提交
// MULTI
pipe := client.TxPipeline()
pipe.Set("counter", 1, 0)
pipe.IncrBy("counter", 1)
pipe.Expire("counter", 10*time.Second)
_, err := pipe.Exec()
// EXEC
if err != nil {
fmt.Println(err)
}
}
func watch() {
client := session.GetRedisClient()
defer client.Close()
// WATCH-CAS 监测如改动则 EXEC 放弃
doFunc := func(tx *redis.Tx) error {
// MULTI
flag, _ := tx.Get("flag").Result()
val, _ := strconv.ParseInt(flag, 10, 0)
//pipe := tx.Pipeline()
//pipe.Set("flag", val+1, 0)
//pipe.Exec()
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Set("flag", val+1, 0)
return nil
})
// EXEC
return err
}
//client.Watch(doFunc, "flag1", "flag2", "flag3")
client.Watch(doFunc, "flag")
}
leveldb 也是一个 kv 数据库引擎,它的设计和代码都算的上非常优雅,由谷歌的工程师发明,可以很方便的内嵌到程序里,默认按照 key 的字典顺序来存储访问,而且自动压缩,key 和 value 都支持任意长度的字节数。首先安装 leveldb 的依赖包:
go get github.com/syndtr/goleveldb
要使用 leveldb 接口,首先也得获得 level.DB 对象,其内部维持了 session 会话,和 redis 访问不同的是,它的接口需要使用 []byte 类型:
func keyValue() {
db := session.GetLevelDB()
defer db.Close()
db.Put([]byte("1st"), []byte("1"), nil)
data, _ := db.Get([]byte("1st"), nil)
fmt.Println(string(data))
}
迭代的时候可以按照 key 的顺序访问,这是 leveldb 的特点:
func iterAll() {
db := session.GetLevelDB()
defer db.Close()
db.Put([]byte("3rd"), []byte("3"), nil)
db.Put([]byte("2nd"), []byte("2"), nil)
db.Put([]byte("1st"), []byte("1"), nil)
// 会按照 1st 2nd 3rd 的顺序输出
iter := db.NewIterator(nil, nil)
for iter.Next() {
key := iter.Key()
value := iter.Value()
fmt.Printf("%v %v\n", string(key), string(value))
}
iter.Release()
}
遍历的时候也支持范围 range 和步进 seek 操作:
func iterRange() {
db := session.GetLevelDB()
defer db.Close()
iter := db.NewIterator(&util.Range{Start: []byte("2nd"), Limit: []byte("3rd")}, nil)
for iter.Next() {
key := iter.Key()
value := iter.Value()
fmt.Printf("%v %v\n", string(key), string(value))
}
iter.Release()
fmt.Println("------")
}
func seek() {
db := session.GetLevelDB()
defer db.Close()
iter := db.NewIterator(nil, nil)
for ok := iter.Seek([]byte("2nd")); ok; ok = iter.Next() {
key := iter.Key()
value := iter.Value()
fmt.Printf("%v %v\n", string(key), string(value))
}
iter.Release()
fmt.Println("------")
}
相对于 redis、leveldb 的访问,mongodb 会稍显复杂,除了有相似的 mongo.Client 对象,还需要了解一下 bson 的数据类型定义,先看如何获取 mongo.Client 对象:
func GetMongoClient() *mongo.Client {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
if err != nil {
panic(err)
}
return client
}
时间操作数据的时候常用的是 Database、 Collection、Cursor 3 个对象,可以随意切换数据库,下面是一个插入的例子:
func insert() {
client := session.GetMongoClient()
defer client.Disconnect(nil)
persons := client.Database("test").Collection("persons")
persons.InsertOne(nil, bson.M{
"username": "zhangsan",
"nickname": "张三",
"age": 20,
"create_time": time.Now(),
})
}
这里的 bson.M 的定义:
type M = primitive.M
// bson.M{"foo": "bar", "hello": "world", "pi": 3.14159}.
type M map[string]interface{}
可以见 bson.M 和 map[string]interface{} 一样,类似的 bson.A 是个数组:
// bson.A{"bar", "world", 3.14159, bson.D{{"qux", 12345}}}
type A []interface{}
查询的例子如下:
func findOne() {
client := session.GetMongoClient()
defer client.Disconnect(nil)
persons := client.Database("test").Collection("persons")
var result bson.M
persons.FindOne(nil, bson.D{}).Decode(&result)
fmt.Println(result)
}
这里的查询条件使用 bson.D 就不太好理解了,F12 进去看看定义:
// bson.D{{"foo", "bar"}, {"hello", "world"}, {"pi", 3.14159}}
type D = primitive.D
type D []E
type E = primitive.E
// E represents a BSON element for a D. It is usually used inside a D.
type E struct {
Key string
Value interface{}
}
原来 bson.D 是一个 key、value 对象的数组,所以查询 userame=zhangsan 要这么写:
persons.FindOne(nil, bson.D{{"username","zhangsan"}}).Decode(&result)
为什么不是直接弄成 bson.M 呢? 主要是因为查询条件是有顺序的,特别是 mongodb 的 pipeline,顺序不同过滤数据的结果不同,数组才能保证顺序,而 bson.M 不能。
对查询的结果使用 cursor 遍历,要得到遍历的错误和标准的 sql 库 Scan() 函数类似,需要使用 cur.Err() 获取:
func find() {
client := session.GetMongoClient()
defer client.Disconnect(nil)
persons := client.Database("test").Collection("persons")
cur, _ := persons.Find(nil, bson.D{})
for cur.Next(nil) {
var result bson.M
cur.Decode(&result)
fmt.Println(result)
}
// cur.Err()
}
本章节的代码