go-common/vendor/github.com/tsuna/gohbase/scanner.go
2019-04-22 02:59:20 +00:00

312 lines
7.1 KiB
Go

// Copyright (C) 2017 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.
package gohbase
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"github.com/golang/protobuf/proto"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
)
const noScannerID = math.MaxUint64
// rowPadding used to pad the row key when constructing a row before
var rowPadding = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
type scanner struct {
RPCClient
// rpc is original scan query
rpc *hrpc.Scan
// scannerID is the id of scanner on current region
scannerID uint64
// startRow is the start row in the current region
startRow []byte
results []*pb.Result
closed bool
}
func (s *scanner) Close() error {
if s.closed {
return errors.New("scanner has already been closed")
}
s.closed = true
if s.scannerID != noScannerID {
go func() {
// if we are closing in the middle of scanning a region,
// send a close scanner request
// TODO: add a deadline
rpc, err := hrpc.NewScanRange(context.Background(),
s.rpc.Table(), s.startRow, nil,
hrpc.ScannerID(s.scannerID),
hrpc.CloseScanner(),
hrpc.NumberOfRows(0))
if err != nil {
panic(fmt.Sprintf("should not happen: %s", err))
}
// If the request fails, the scanner lease will be expired
// and it will be closed automatically by hbase.
// No need to bother clients about that.
s.SendRPC(rpc)
}()
}
return nil
}
func (s *scanner) fetch() ([]*pb.Result, error) {
// keep looping until we have error, some non-empty result or until close
for {
resp, region, err := s.request()
if err != nil {
s.Close()
return nil, err
}
s.update(resp, region)
if s.shouldClose(resp, region) {
s.Close()
}
if rs := resp.Results; len(rs) > 0 {
return rs, nil
} else if s.closed {
return nil, io.EOF
}
}
}
func (s *scanner) peek() (*pb.Result, error) {
if len(s.results) == 0 {
if s.closed {
// done scanning
return nil, io.EOF
}
rs, err := s.fetch()
if err != nil {
return nil, err
}
// fetch cannot return zero results
s.results = rs
}
return s.results[0], nil
}
func (s *scanner) shift() {
if len(s.results) == 0 {
return
}
// set to nil so that GC isn't blocked to clean up the result
s.results[0] = nil
s.results = s.results[1:]
}
// coalesce combines result with partial if they belong to the same row
// and returns the coalesced result and whether coalescing happened
func (s *scanner) coalesce(result, partial *pb.Result) (*pb.Result, bool) {
if result == nil {
return partial, true
}
if !result.GetPartial() {
// results is not partial, shouldn't coalesce
return result, false
}
if len(partial.Cell) > 0 && !bytes.Equal(result.Cell[0].Row, partial.Cell[0].Row) {
// new row
result.Partial = proto.Bool(false)
return result, false
}
// same row, add the partial
result.Cell = append(result.Cell, partial.Cell...)
if partial.GetStale() {
result.Stale = proto.Bool(partial.GetStale())
}
return result, true
}
func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner {
return &scanner{
RPCClient: c,
rpc: rpc,
startRow: rpc.StartRow(),
scannerID: noScannerID,
}
}
func toLocalResult(r *pb.Result) *hrpc.Result {
if r == nil {
return nil
}
return hrpc.ToLocalResult(r)
}
func (s *scanner) Next() (*hrpc.Result, error) {
var (
result, partial *pb.Result
err error
)
select {
case <-s.rpc.Context().Done():
s.Close()
return nil, s.rpc.Context().Err()
default:
}
if s.rpc.AllowPartialResults() {
// if client handles partials, just return it
result, err := s.peek()
if err != nil {
return nil, err
}
s.shift()
return toLocalResult(result), nil
}
for {
partial, err = s.peek()
if err == io.EOF && result != nil {
// no more results, return what we have. Next call to the Next() will get EOF
result.Partial = proto.Bool(false)
return toLocalResult(result), nil
}
if err != nil {
// return whatever we have so far and the error
return toLocalResult(result), err
}
var done bool
result, done = s.coalesce(result, partial)
if done {
s.shift()
}
if !result.GetPartial() {
// if not partial anymore, return it
return toLocalResult(result), nil
}
}
}
func (s *scanner) request() (*pb.ScanResponse, hrpc.RegionInfo, error) {
var (
rpc *hrpc.Scan
err error
)
if s.scannerID == noScannerID {
// starting to scan on a new region
rpc, err = hrpc.NewScanRange(
s.rpc.Context(),
s.rpc.Table(),
s.startRow,
s.rpc.StopRow(),
s.rpc.Options()...)
} else {
// continuing to scan current region
rpc, err = hrpc.NewScanRange(s.rpc.Context(),
s.rpc.Table(),
s.startRow,
nil,
hrpc.ScannerID(s.scannerID),
hrpc.NumberOfRows(s.rpc.NumberOfRows()))
}
if err != nil {
return nil, nil, err
}
res, err := s.SendRPC(rpc)
if err != nil {
return nil, nil, err
}
scanres, ok := res.(*pb.ScanResponse)
if !ok {
return nil, nil, errors.New("got non-ScanResponse for scan request")
}
return scanres, rpc.Region(), nil
}
// update updates the scanner for the next scan request
func (s *scanner) update(resp *pb.ScanResponse, region hrpc.RegionInfo) {
if resp.GetMoreResultsInRegion() {
if resp.ScannerId != nil {
s.scannerID = resp.GetScannerId()
}
} else {
// we are done with this region, prepare scan for next region
s.scannerID = noScannerID
// Normal Scan
if !s.rpc.Reversed() {
s.startRow = region.StopKey()
return
}
// Reversed Scan
// return if we are at the end
if len(region.StartKey()) == 0 {
s.startRow = region.StartKey()
return
}
// create the nearest value lower than the current region startKey
rsk := region.StartKey()
// if last element is 0x0, just shorten the slice
if rsk[len(rsk)-1] == 0x0 {
s.startRow = rsk[:len(rsk)-1]
return
}
// otherwise lower the last element byte value by 1 and pad with 0xffs
tmp := make([]byte, len(rsk), len(rsk)+len(rowPadding))
copy(tmp, rsk)
tmp[len(tmp)-1] = tmp[len(tmp)-1] - 1
s.startRow = append(tmp, rowPadding...)
}
}
// shouldClose check if this scanner should be closed and should stop fetching new results
func (s *scanner) shouldClose(resp *pb.ScanResponse, region hrpc.RegionInfo) bool {
if resp.MoreResults != nil && !*resp.MoreResults {
// the filter for the whole scan has been exhausted, close the scanner
return true
}
if s.scannerID != noScannerID {
// not done with this region yet
return false
}
// Check to see if this region is the last we should scan because:
// (1) it's the last region
if len(region.StopKey()) == 0 && !s.rpc.Reversed() {
return true
}
if s.rpc.Reversed() && len(region.StartKey()) == 0 {
return true
}
// (3) because its stop_key is greater than or equal to the stop_key of this scanner,
// provided that (2) we're not trying to scan until the end of the table.
if !s.rpc.Reversed() {
return len(s.rpc.StopRow()) != 0 && // (2)
bytes.Compare(s.rpc.StopRow(), region.StopKey()) <= 0 // (3)
}
// Reversed Scanner
return len(s.rpc.StopRow()) != 0 && // (2)
bytes.Compare(s.rpc.StopRow(), region.StartKey()) >= 0 // (3)
}