go-common/vendor/github.com/tsuna/gohbase/client.go
2019-04-22 02:59:20 +00:00

296 lines
7.5 KiB
Go

// Copyright (C) 2015 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.
package gohbase
import (
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/cznic/b"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
"github.com/tsuna/gohbase/region"
"github.com/tsuna/gohbase/zk"
"golang.org/x/time/rate"
)
// Kinds of client, stored in client.clientType.
const (
	standardClient = iota
	adminClient
)

// Defaults that newClient applies before any Option runs.
const (
	defaultRPCQueueSize  = 100
	defaultFlushInterval = 20 * time.Millisecond
	defaultZkRoot        = "/hbase"
	defaultZkTimeout     = 30 * time.Second
	defaultEffectiveUser = "root"
)

// Settings of the rate limiter that throttles lookups to the hbase:meta table.
const (
	// metaBurst is the maximum number of requests allowed at once.
	metaBurst = 10
	// metaLimit is the rate at which requests to hbase:meta are throttled.
	metaLimit = rate.Limit(100)
)
// Client is a regular HBase client. The method notes below describe the
// behaviour of the implementation in this file.
type Client interface {
// Scan returns a Scanner for the given scan request.
Scan(s *hrpc.Scan) hrpc.Scanner
// Get sends the given get request and returns its result.
Get(g *hrpc.Get) (*hrpc.Result, error)
// Put sends the given mutation and returns its result.
Put(p *hrpc.Mutate) (*hrpc.Result, error)
// Delete sends the given mutation and returns its result.
Delete(d *hrpc.Mutate) (*hrpc.Result, error)
// Append sends the given mutation and returns its result.
Append(a *hrpc.Mutate) (*hrpc.Result, error)
// Increment sends the given mutation and returns the value of the single
// cell in the reply, decoded as a big-endian 64-bit integer.
Increment(i *hrpc.Mutate) (int64, error)
// CheckAndPut wraps p, family, qualifier and expectedValue in a
// check-and-put request and returns the "processed" flag of the reply.
CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
expectedValue []byte) (bool, error)
// Close closes connections to the HBase master and region servers.
Close()
}
// RPCClient is the core client of gohbase. It is exposed for testing.
type RPCClient interface {
// SendRPC sends the given call and returns the protobuf message received
// in reply, or an error.
SendRPC(rpc hrpc.Call) (proto.Message, error)
}
// Option is a function used to configure optional config items for a Client.
// newClient applies the options in the order given, after the defaults are
// set and before the ZooKeeper client is created.
type Option func(*client)
// client provides access to an HBase cluster.
type client struct {
// clientType is standardClient or adminClient.
clientType int
// regions caches regions in a b-tree ordered by region.CompareGeneric.
regions keyRegionCache
// Maps a hrpc.RegionInfo to the *region.Client that we think currently
// serves it.
clients clientRegionCache
// metaRegionInfo describes the hbase:meta region.
metaRegionInfo hrpc.RegionInfo
// adminRegionInfo holds the region info whose client Close closes when
// clientType is adminClient.
adminRegionInfo hrpc.RegionInfo
// The maximum size of the RPC queue in the region client
rpcQueueSize int
// zkClient is zookeeper for retrieving meta and admin information
zkClient zk.Client
// The root zookeeper path for Hbase. By default, this is usually "/hbase".
zkRoot string
// The zookeeper session timeout
zkTimeout time.Duration
// The timeout before flushing the RPC queue in the region client
flushInterval time.Duration
// The user used when accessing regions.
effectiveUser string
// metaLookupLimiter is used to throttle lookups to hbase:meta table
metaLookupLimiter *rate.Limiter
// How long to wait for a region lookup (either meta lookup or finding
// meta in ZooKeeper). Should be greater than or equal to the ZooKeeper
// session timeout.
regionLookupTimeout time.Duration
// regionReadTimeout is the maximum amount of time to wait for regionserver reply
regionReadTimeout time.Duration
// done is closed by Close.
done chan struct{}
// closeOnce makes the body of Close run at most once.
closeOnce sync.Once
}
// NewClient creates a new HBase client for the given ZooKeeper quorum,
// configured by the supplied options.
func NewClient(zkquorum string, options ...Option) Client {
	var cl Client = newClient(zkquorum, options...)
	return cl
}
// newClient builds a standard client with the default settings, applies the
// given options in order, and then creates the ZooKeeper client.
func newClient(zkquorum string, options ...Option) *client {
	log.WithFields(log.Fields{"Host": zkquorum}).Debug("Creating new client.")

	cl := new(client)
	cl.clientType = standardClient
	cl.regions = keyRegionCache{regions: b.TreeNew(region.CompareGeneric)}
	cl.clients = clientRegionCache{
		regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
	}
	cl.rpcQueueSize = defaultRPCQueueSize
	cl.flushInterval = defaultFlushInterval
	cl.metaRegionInfo = region.NewInfo(
		0, []byte("hbase"), []byte("meta"), []byte("hbase:meta,,1"), nil, nil)
	cl.zkRoot = defaultZkRoot
	cl.zkTimeout = defaultZkTimeout
	cl.effectiveUser = defaultEffectiveUser
	cl.metaLookupLimiter = rate.NewLimiter(metaLimit, metaBurst)
	cl.regionLookupTimeout = region.DefaultLookupTimeout
	cl.regionReadTimeout = region.DefaultReadTimeout
	cl.done = make(chan struct{})

	for i := range options {
		options[i](cl)
	}

	// The ZooKeeper client is created only after the options have run,
	// because an option may have changed zkTimeout.
	cl.zkClient = zk.NewClient(zkquorum, cl.zkTimeout)
	return cl
}
// RpcQueueSize returns an Option that sets the size of the RPC queues used
// in a given client.
func RpcQueueSize(size int) Option {
	setQueueSize := func(cl *client) {
		cl.rpcQueueSize = size
	}
	return setQueueSize
}
// ZookeeperRoot returns an Option that sets the ZooKeeper root path used in
// a given client.
func ZookeeperRoot(root string) Option {
	return Option(func(cl *client) { cl.zkRoot = root })
}
// ZookeeperTimeout returns an Option that sets the ZooKeeper session timeout.
func ZookeeperTimeout(to time.Duration) Option {
	setTimeout := func(cl *client) {
		cl.zkTimeout = to
	}
	return setTimeout
}
// RegionLookupTimeout returns an Option that sets the region lookup timeout.
func RegionLookupTimeout(to time.Duration) Option {
	return Option(func(cl *client) { cl.regionLookupTimeout = to })
}
// RegionReadTimeout returns an Option that sets the region read timeout.
func RegionReadTimeout(to time.Duration) Option {
	setTimeout := func(cl *client) {
		cl.regionReadTimeout = to
	}
	return setTimeout
}
// EffectiveUser returns an Option that sets the user used when accessing
// regions.
func EffectiveUser(user string) Option {
	setUser := func(cl *client) {
		cl.effectiveUser = user
	}
	return setUser
}
// FlushInterval returns an Option that sets the timeout for flushing the RPC
// queues used in a given client.
func FlushInterval(interval time.Duration) Option {
	return Option(func(cl *client) { cl.flushInterval = interval })
}
// Close closes connections to the HBase master and region servers.
//
// It closes the done channel, then, for an admin client, closes the admin
// region's client if one is set, and finally calls closeAll on the cache of
// region clients. The work is guarded by closeOnce, so only the first call
// has any effect.
func (c *client) Close() {
	shutdown := func() {
		close(c.done)

		if c.clientType == adminClient {
			adminConn := c.adminRegionInfo.Client()
			if adminConn != nil {
				adminConn.Close()
			}
		}

		c.clients.closeAll()
	}
	c.closeOnce.Do(shutdown)
}
// Scan returns a scanner, created by newScanner, that runs the given scan
// request through this client.
func (c *client) Scan(s *hrpc.Scan) hrpc.Scanner {
	var sc hrpc.Scanner = newScanner(c, s)
	return sc
}
// Get sends the given get request through SendRPC and converts the reply
// into a local Result.
//
// It returns the SendRPC error unchanged, or an error naming the unexpected
// message type if the reply is not a *pb.GetResponse.
func (c *client) Get(g *hrpc.Get) (*hrpc.Result, error) {
	pbmsg, err := c.SendRPC(g)
	if err != nil {
		return nil, err
	}

	r, ok := pbmsg.(*pb.GetResponse)
	if !ok {
		// Report the concrete type received, in the same form CheckAndPut
		// uses, so a mismatched reply can be diagnosed from the error alone.
		return nil, fmt.Errorf("sendRPC returned a %T instead of GetResponse", pbmsg)
	}
	return hrpc.ToLocalResult(r.Result), nil
}
// Put sends the given mutation via mutate and returns its result.
func (c *client) Put(p *hrpc.Mutate) (*hrpc.Result, error) {
	result, err := c.mutate(p)
	return result, err
}
// Delete sends the given mutation via mutate and returns its result.
func (c *client) Delete(d *hrpc.Mutate) (*hrpc.Result, error) {
	result, err := c.mutate(d)
	return result, err
}
// Append sends the given mutation via mutate and returns its result.
func (c *client) Append(a *hrpc.Mutate) (*hrpc.Result, error) {
	result, err := c.mutate(a)
	return result, err
}
// Increment sends the given mutation via mutate and returns the resulting
// value, decoded as a big-endian 64-bit integer from the single cell of the
// reply.
//
// It returns an error if the mutation fails, if the reply does not contain
// exactly one cell, or if that cell's value is shorter than 8 bytes.
func (c *client) Increment(i *hrpc.Mutate) (int64, error) {
	r, err := c.mutate(i)
	if err != nil {
		return 0, err
	}

	if len(r.Cells) != 1 {
		return 0, fmt.Errorf("increment returned %d cells, but we expected exactly one",
			len(r.Cells))
	}

	// binary.BigEndian.Uint64 panics on a slice shorter than 8 bytes, so a
	// truncated value is reported as an error instead of crashing the caller.
	value := r.Cells[0].Value
	if len(value) < 8 {
		return 0, fmt.Errorf("increment returned a %d-byte value, but we expected 8 bytes",
			len(value))
	}
	return int64(binary.BigEndian.Uint64(value)), nil
}
// mutate sends the given mutation through SendRPC and converts the reply
// into a local Result.
//
// It returns the SendRPC error unchanged, or an error naming the unexpected
// message type if the reply is not a *pb.MutateResponse.
func (c *client) mutate(m *hrpc.Mutate) (*hrpc.Result, error) {
	pbmsg, err := c.SendRPC(m)
	if err != nil {
		return nil, err
	}

	r, ok := pbmsg.(*pb.MutateResponse)
	if !ok {
		// Report the concrete type received, in the same form CheckAndPut
		// uses, so a mismatched reply can be diagnosed from the error alone.
		return nil, fmt.Errorf("sendRPC returned a %T instead of MutateResponse", pbmsg)
	}
	return hrpc.ToLocalResult(r.Result), nil
}
// CheckAndPut wraps p in a check-and-put request built from family,
// qualifier and expectedValue, sends it through SendRPC, and returns the
// "processed" flag of the reply.
//
// It returns an error if the request cannot be built, if SendRPC fails, if
// the reply is not a *pb.MutateResponse, or if the reply does not carry the
// processed field.
func (c *client) CheckAndPut(p *hrpc.Mutate, family string,
	qualifier string, expectedValue []byte) (bool, error) {
	cas, err := hrpc.NewCheckAndPut(p, family, qualifier, expectedValue)
	if err != nil {
		return false, err
	}

	reply, err := c.SendRPC(cas)
	if err != nil {
		return false, err
	}

	resp, isMutate := reply.(*pb.MutateResponse)
	switch {
	case !isMutate:
		return false, fmt.Errorf("sendRPC returned a %T instead of MutateResponse", reply)
	case resp.Processed == nil:
		return false, fmt.Errorf("protobuf in the response didn't contain the field "+
			"indicating whether the CheckAndPut was successful or not: %s", resp)
	}
	return resp.GetProcessed(), nil
}