296 lines
7.5 KiB
Go
296 lines
7.5 KiB
Go
// Copyright (C) 2015 The GoHBase Authors. All rights reserved.
|
|
// This file is part of GoHBase.
|
|
// Use of this source code is governed by the Apache License 2.0
|
|
// that can be found in the COPYING file.
|
|
|
|
package gohbase
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cznic/b"
|
|
"github.com/golang/protobuf/proto"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/tsuna/gohbase/hrpc"
|
|
"github.com/tsuna/gohbase/pb"
|
|
"github.com/tsuna/gohbase/region"
|
|
"github.com/tsuna/gohbase/zk"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
const (
|
|
standardClient = iota
|
|
adminClient
|
|
defaultRPCQueueSize = 100
|
|
defaultFlushInterval = 20 * time.Millisecond
|
|
defaultZkRoot = "/hbase"
|
|
defaultZkTimeout = 30 * time.Second
|
|
defaultEffectiveUser = "root"
|
|
// metaBurst is maximum number of request allowed at once.
|
|
metaBurst = 10
|
|
// metaLimit is rate at which to throttle requests to hbase:meta table.
|
|
metaLimit = rate.Limit(100)
|
|
)
|
|
|
|
// Client a regular HBase client
|
|
type Client interface {
|
|
Scan(s *hrpc.Scan) hrpc.Scanner
|
|
Get(g *hrpc.Get) (*hrpc.Result, error)
|
|
Put(p *hrpc.Mutate) (*hrpc.Result, error)
|
|
Delete(d *hrpc.Mutate) (*hrpc.Result, error)
|
|
Append(a *hrpc.Mutate) (*hrpc.Result, error)
|
|
Increment(i *hrpc.Mutate) (int64, error)
|
|
CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
|
|
expectedValue []byte) (bool, error)
|
|
Close()
|
|
}
|
|
|
|
// RPCClient is core client of gohbase. It's exposed for testing.
|
|
type RPCClient interface {
|
|
SendRPC(rpc hrpc.Call) (proto.Message, error)
|
|
}
|
|
|
|
// Option is a function used to configure optional config items for a Client.
|
|
type Option func(*client)
|
|
|
|
// A Client provides access to an HBase cluster.
|
|
type client struct {
|
|
clientType int
|
|
|
|
regions keyRegionCache
|
|
|
|
// Maps a hrpc.RegionInfo to the *region.Client that we think currently
|
|
// serves it.
|
|
clients clientRegionCache
|
|
|
|
metaRegionInfo hrpc.RegionInfo
|
|
|
|
adminRegionInfo hrpc.RegionInfo
|
|
|
|
// The maximum size of the RPC queue in the region client
|
|
rpcQueueSize int
|
|
|
|
// zkClient is zookeeper for retrieving meta and admin information
|
|
zkClient zk.Client
|
|
|
|
// The root zookeeper path for Hbase. By default, this is usually "/hbase".
|
|
zkRoot string
|
|
|
|
// The zookeeper session timeout
|
|
zkTimeout time.Duration
|
|
|
|
// The timeout before flushing the RPC queue in the region client
|
|
flushInterval time.Duration
|
|
|
|
// The user used when accessing regions.
|
|
effectiveUser string
|
|
|
|
// metaLookupLimiter is used to throttle lookups to hbase:meta table
|
|
metaLookupLimiter *rate.Limiter
|
|
|
|
// How long to wait for a region lookup (either meta lookup or finding
|
|
// meta in ZooKeeper). Should be greater than or equal to the ZooKeeper
|
|
// session timeout.
|
|
regionLookupTimeout time.Duration
|
|
|
|
// regionReadTimeout is the maximum amount of time to wait for regionserver reply
|
|
regionReadTimeout time.Duration
|
|
|
|
done chan struct{}
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
// NewClient creates a new HBase client.
|
|
func NewClient(zkquorum string, options ...Option) Client {
|
|
return newClient(zkquorum, options...)
|
|
}
|
|
|
|
func newClient(zkquorum string, options ...Option) *client {
|
|
log.WithFields(log.Fields{
|
|
"Host": zkquorum,
|
|
}).Debug("Creating new client.")
|
|
c := &client{
|
|
clientType: standardClient,
|
|
regions: keyRegionCache{regions: b.TreeNew(region.CompareGeneric)},
|
|
clients: clientRegionCache{
|
|
regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
|
|
},
|
|
rpcQueueSize: defaultRPCQueueSize,
|
|
flushInterval: defaultFlushInterval,
|
|
metaRegionInfo: region.NewInfo(
|
|
0,
|
|
[]byte("hbase"),
|
|
[]byte("meta"),
|
|
[]byte("hbase:meta,,1"),
|
|
nil,
|
|
nil),
|
|
zkRoot: defaultZkRoot,
|
|
zkTimeout: defaultZkTimeout,
|
|
effectiveUser: defaultEffectiveUser,
|
|
metaLookupLimiter: rate.NewLimiter(metaLimit, metaBurst),
|
|
regionLookupTimeout: region.DefaultLookupTimeout,
|
|
regionReadTimeout: region.DefaultReadTimeout,
|
|
done: make(chan struct{}),
|
|
}
|
|
for _, option := range options {
|
|
option(c)
|
|
}
|
|
|
|
//Have to create the zkClient after the Options have been set
|
|
//since the zkTimeout could be changed as an option
|
|
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)
|
|
|
|
return c
|
|
}
|
|
|
|
// RpcQueueSize will return an option that will set the size of the RPC queues
|
|
// used in a given client
|
|
func RpcQueueSize(size int) Option {
|
|
return func(c *client) {
|
|
c.rpcQueueSize = size
|
|
}
|
|
}
|
|
|
|
// ZookeeperRoot will return an option that will set the zookeeper root path used in a given client.
|
|
func ZookeeperRoot(root string) Option {
|
|
return func(c *client) {
|
|
c.zkRoot = root
|
|
}
|
|
}
|
|
|
|
// ZookeeperTimeout will return an option that will set the zookeeper session timeout.
|
|
func ZookeeperTimeout(to time.Duration) Option {
|
|
return func(c *client) {
|
|
c.zkTimeout = to
|
|
}
|
|
}
|
|
|
|
// RegionLookupTimeout will return an option that sets the region lookup timeout
|
|
func RegionLookupTimeout(to time.Duration) Option {
|
|
return func(c *client) {
|
|
c.regionLookupTimeout = to
|
|
}
|
|
}
|
|
|
|
// RegionReadTimeout will return an option that sets the region read timeout
|
|
func RegionReadTimeout(to time.Duration) Option {
|
|
return func(c *client) {
|
|
c.regionReadTimeout = to
|
|
}
|
|
}
|
|
|
|
// EffectiveUser will return an option that will set the user used when accessing regions.
|
|
func EffectiveUser(user string) Option {
|
|
return func(c *client) {
|
|
c.effectiveUser = user
|
|
}
|
|
}
|
|
|
|
// FlushInterval will return an option that will set the timeout for flushing
|
|
// the RPC queues used in a given client
|
|
func FlushInterval(interval time.Duration) Option {
|
|
return func(c *client) {
|
|
c.flushInterval = interval
|
|
}
|
|
}
|
|
|
|
// Close closes connections to hbase master and regionservers
|
|
func (c *client) Close() {
|
|
c.closeOnce.Do(func() {
|
|
close(c.done)
|
|
if c.clientType == adminClient {
|
|
if ac := c.adminRegionInfo.Client(); ac != nil {
|
|
ac.Close()
|
|
}
|
|
}
|
|
c.clients.closeAll()
|
|
})
|
|
}
|
|
|
|
func (c *client) Scan(s *hrpc.Scan) hrpc.Scanner {
|
|
return newScanner(c, s)
|
|
}
|
|
|
|
func (c *client) Get(g *hrpc.Get) (*hrpc.Result, error) {
|
|
pbmsg, err := c.SendRPC(g)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r, ok := pbmsg.(*pb.GetResponse)
|
|
if !ok {
|
|
return nil, fmt.Errorf("sendRPC returned not a GetResponse")
|
|
}
|
|
|
|
return hrpc.ToLocalResult(r.Result), nil
|
|
}
|
|
|
|
func (c *client) Put(p *hrpc.Mutate) (*hrpc.Result, error) {
|
|
return c.mutate(p)
|
|
}
|
|
|
|
func (c *client) Delete(d *hrpc.Mutate) (*hrpc.Result, error) {
|
|
return c.mutate(d)
|
|
}
|
|
|
|
func (c *client) Append(a *hrpc.Mutate) (*hrpc.Result, error) {
|
|
return c.mutate(a)
|
|
}
|
|
|
|
func (c *client) Increment(i *hrpc.Mutate) (int64, error) {
|
|
r, err := c.mutate(i)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(r.Cells) != 1 {
|
|
return 0, fmt.Errorf("increment returned %d cells, but we expected exactly one",
|
|
len(r.Cells))
|
|
}
|
|
|
|
val := binary.BigEndian.Uint64(r.Cells[0].Value)
|
|
return int64(val), nil
|
|
}
|
|
|
|
func (c *client) mutate(m *hrpc.Mutate) (*hrpc.Result, error) {
|
|
pbmsg, err := c.SendRPC(m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r, ok := pbmsg.(*pb.MutateResponse)
|
|
if !ok {
|
|
return nil, fmt.Errorf("sendRPC returned not a MutateResponse")
|
|
}
|
|
|
|
return hrpc.ToLocalResult(r.Result), nil
|
|
}
|
|
|
|
func (c *client) CheckAndPut(p *hrpc.Mutate, family string,
|
|
qualifier string, expectedValue []byte) (bool, error) {
|
|
cas, err := hrpc.NewCheckAndPut(p, family, qualifier, expectedValue)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
pbmsg, err := c.SendRPC(cas)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
r, ok := pbmsg.(*pb.MutateResponse)
|
|
if !ok {
|
|
return false, fmt.Errorf("sendRPC returned a %T instead of MutateResponse", pbmsg)
|
|
}
|
|
|
|
if r.Processed == nil {
|
|
return false, fmt.Errorf("protobuf in the response didn't contain the field "+
|
|
"indicating whether the CheckAndPut was successful or not: %s", r)
|
|
}
|
|
|
|
return r.GetProcessed(), nil
|
|
}
|