// Copyright (C) 2017 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.

package gohbase

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"

	"github.com/golang/protobuf/proto"
	"github.com/tsuna/gohbase/hrpc"
	"github.com/tsuna/gohbase/pb"
)

const noScannerID = math.MaxUint64

// rowPadding is used to pad a decremented row key to produce the greatest
// row key that still sorts before the current region's start key; reversed
// scans use it to continue into the preceding region.
var rowPadding = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}

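// scanner walks a table scan region by region: it opens a scanner on a
// region, drains it, and advances startRow past the region boundary until
// the whole requested range has been scanned.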
type scanner struct {
	RPCClient
	// rpc is the original scan query
	rpc *hrpc.Scan
	// scannerID is the id of the scanner on the current region
	scannerID uint64
	// startRow is the start row in the current region
	startRow []byte
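	// results buffers rows fetched from the current region that have not
	// yet been consumed by the caller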
	results []*pb.Result
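	// closed is set once the scan has been exhausted or Close has been called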
	closed bool
}

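// Close releases the scanner. If a region scanner is still open, a close
// request is sent to the region server in the background.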
func (s *scanner) Close() error {
	if s.closed {
		return errors.New("scanner has already been closed")
	}
	s.closed = true
	if s.scannerID != noScannerID {
		go func() {
			// if we are closing in the middle of scanning a region,
			// send a close scanner request
			// TODO: add a deadline
			rpc, err := hrpc.NewScanRange(context.Background(),
				s.rpc.Table(), s.startRow, nil,
				hrpc.ScannerID(s.scannerID),
				hrpc.CloseScanner(),
				hrpc.NumberOfRows(0))
			if err != nil {
				panic(fmt.Sprintf("should not happen: %s", err))
			}

			// If the request fails, the scanner lease will expire and
			// hbase will close the scanner automatically.
			// No need to bother clients about that.
			s.SendRPC(rpc)
		}()
	}
	return nil
}

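// fetch keeps sending scan requests until it receives a non-empty batch of
// results, hits an error, or the scanner is closed.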
func (s *scanner) fetch() ([]*pb.Result, error) {
	// keep looping until we get an error, some non-empty results,
	// or until the scanner is closed
	for {
		resp, region, err := s.request()
		if err != nil {
			s.Close()
			return nil, err
		}

		s.update(resp, region)

		if s.shouldClose(resp, region) {
			s.Close()
		}

		if rs := resp.Results; len(rs) > 0 {
			return rs, nil
		} else if s.closed {
			return nil, io.EOF
		}
	}
}

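// peek returns the first buffered result without consuming it, fetching a
// new batch from the server if the buffer is empty.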
func (s *scanner) peek() (*pb.Result, error) {
	if len(s.results) == 0 {
		if s.closed {
			// done scanning
			return nil, io.EOF
		}

		rs, err := s.fetch()
		if err != nil {
			return nil, err
		}

		// fetch cannot return zero results
		s.results = rs
	}
	return s.results[0], nil
}

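// shift discards the first buffered result.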
func (s *scanner) shift() {
	if len(s.results) == 0 {
		return
	}
	// set to nil so that the GC isn't blocked from cleaning up the result
	s.results[0] = nil
	s.results = s.results[1:]
}

// coalesce combines result with partial if they belong to the same row
// and returns the coalesced result and whether coalescing happened
func (s *scanner) coalesce(result, partial *pb.Result) (*pb.Result, bool) {
	if result == nil {
		return partial, true
	}
	if !result.GetPartial() {
		// result is not partial, shouldn't coalesce
		return result, false
	}

	if len(partial.Cell) > 0 && !bytes.Equal(result.Cell[0].Row, partial.Cell[0].Row) {
		// new row
		result.Partial = proto.Bool(false)
		return result, false
	}

	// same row, add the partial
	result.Cell = append(result.Cell, partial.Cell...)
	if partial.GetStale() {
		result.Stale = proto.Bool(partial.GetStale())
	}
	return result, true
}

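// newScanner creates a new scanner for the given scan request, positioned
// at the request's start row. A minimal usage sketch (assuming c is an
// RPCClient and scan is an *hrpc.Scan):
//
//	scanner := newScanner(c, scan)
//	for {
//		res, err := scanner.Next()
//		if err == io.EOF {
//			break // scan exhausted
//		}
//		if err != nil {
//			// handle the error
//		}
//		// process res
//	}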
func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner {
	return &scanner{
		RPCClient: c,
		rpc:       rpc,
		startRow:  rpc.StartRow(),
		scannerID: noScannerID,
	}
}

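// toLocalResult is a nil-safe wrapper around hrpc.ToLocalResult.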
func toLocalResult(r *pb.Result) *hrpc.Result {
	if r == nil {
		return nil
	}
	return hrpc.ToLocalResult(r)
}

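// Next returns the next row of the scan. Unless the scan allows partial
// results, partial rows coming from the server are coalesced into whole
// rows before being returned. It returns io.EOF once the scan is exhausted.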
func (s *scanner) Next() (*hrpc.Result, error) {
	var (
		result, partial *pb.Result
		err             error
	)

	select {
	case <-s.rpc.Context().Done():
		s.Close()
		return nil, s.rpc.Context().Err()
	default:
	}

	if s.rpc.AllowPartialResults() {
		// if the client handles partials, just return the next result as-is
		result, err := s.peek()
		if err != nil {
			return nil, err
		}
		s.shift()
		return toLocalResult(result), nil
	}

	for {
		partial, err = s.peek()
		if err == io.EOF && result != nil {
			// no more results, return what we have;
			// the next call to Next() will return io.EOF
			result.Partial = proto.Bool(false)
			return toLocalResult(result), nil
		}
		if err != nil {
			// return whatever we have so far along with the error
			return toLocalResult(result), err
		}

		var done bool
		result, done = s.coalesce(result, partial)
		if done {
			s.shift()
		}
		if !result.GetPartial() {
			// the result is no longer partial, return it
			return toLocalResult(result), nil
		}
	}
}

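// request sends the next scan RPC: it opens a scanner on a new region when
// none is open (scannerID == noScannerID), and otherwise continues the
// scanner on the current region. It returns the scan response and the
// region that the RPC was routed to.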
func (s *scanner) request() (*pb.ScanResponse, hrpc.RegionInfo, error) {
	var (
		rpc *hrpc.Scan
		err error
	)

	if s.scannerID == noScannerID {
		// starting to scan on a new region
		rpc, err = hrpc.NewScanRange(
			s.rpc.Context(),
			s.rpc.Table(),
			s.startRow,
			s.rpc.StopRow(),
			s.rpc.Options()...)
	} else {
		// continuing to scan the current region
		rpc, err = hrpc.NewScanRange(s.rpc.Context(),
			s.rpc.Table(),
			s.startRow,
			nil,
			hrpc.ScannerID(s.scannerID),
			hrpc.NumberOfRows(s.rpc.NumberOfRows()))
	}
	if err != nil {
		return nil, nil, err
	}

	res, err := s.SendRPC(rpc)
	if err != nil {
		return nil, nil, err
	}
	scanres, ok := res.(*pb.ScanResponse)
	if !ok {
		return nil, nil, errors.New("got non-ScanResponse for scan request")
	}
	return scanres, rpc.Region(), nil
}

// update updates the scanner for the next scan request
func (s *scanner) update(resp *pb.ScanResponse, region hrpc.RegionInfo) {
	if resp.GetMoreResultsInRegion() {
		if resp.ScannerId != nil {
			s.scannerID = resp.GetScannerId()
		}
	} else {
		// we are done with this region, prepare the scan for the next region
		s.scannerID = noScannerID

		// Normal scan
		if !s.rpc.Reversed() {
			s.startRow = region.StopKey()
			return
		}

		// Reversed scan
		// return if we are at the end: an empty start key means
		// this was the table's first region
		if len(region.StartKey()) == 0 {
			s.startRow = region.StartKey()
			return
		}

		// create the nearest value lower than the current region's start key
		rsk := region.StartKey()
		// if the last byte is 0x0, just shorten the slice
		if rsk[len(rsk)-1] == 0x0 {
			s.startRow = rsk[:len(rsk)-1]
			return
		}

		// otherwise decrement the last byte and pad with 0xffs, e.g. a start
		// key of {0x01, 0x02} becomes
		// {0x01, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
		tmp := make([]byte, len(rsk), len(rsk)+len(rowPadding))
		copy(tmp, rsk)
		tmp[len(tmp)-1] = tmp[len(tmp)-1] - 1
		s.startRow = append(tmp, rowPadding...)
	}
}

// shouldClose checks if this scanner should be closed and stop fetching new results
func (s *scanner) shouldClose(resp *pb.ScanResponse, region hrpc.RegionInfo) bool {
	if resp.MoreResults != nil && !*resp.MoreResults {
		// the filter for the whole scan has been exhausted, close the scanner
		return true
	}

	if s.scannerID != noScannerID {
		// not done with this region yet
		return false
	}

	// Check to see if this region is the last we should scan because:
	// (1) it's the last region
	if len(region.StopKey()) == 0 && !s.rpc.Reversed() {
		return true
	}
	if s.rpc.Reversed() && len(region.StartKey()) == 0 {
		return true
	}
	// (3) because its stop_key is greater than or equal to the stop_key of this scanner,
	// provided that (2) we're not trying to scan until the end of the table
	if !s.rpc.Reversed() {
		return len(s.rpc.StopRow()) != 0 && // (2)
			bytes.Compare(s.rpc.StopRow(), region.StopKey()) <= 0 // (3)
	}

	// Reversed scanner
	return len(s.rpc.StopRow()) != 0 && // (2)
		bytes.Compare(s.rpc.StopRow(), region.StartKey()) >= 0 // (3)
}