1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 13:29:11 +01:00

bump(*): kubernetes release-1.16.0 dependencies

This commit is contained in:
Mike Dame
2019-10-12 11:11:43 -04:00
parent 5af668e89a
commit 1652ba7976
28121 changed files with 3491095 additions and 2280257 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
// Copyright 2015 Google Inc. All Rights Reserved.
// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,33 +15,367 @@
package bigtable
import (
"reflect"
"context"
"fmt"
"math"
"sort"
"strings"
"testing"
"time"
"cloud.google.com/go/bigtable/bttest"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/grpc"
"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/proto"
"google.golang.org/api/iterator"
btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
)
func TestAdminIntegration(t *testing.T) {
srv, err := bttest.NewServer("127.0.0.1:0")
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatal(err)
t.Fatalf("IntegrationEnv: %v", err)
}
defer srv.Close()
t.Logf("bttest.Server running on %s", srv.Addr)
defer testEnv.Close()
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, _ := context.WithTimeout(context.Background(), timeout)
conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("grpc.Dial: %v", err)
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
if iAdminClient != nil {
defer iAdminClient.Close()
iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
if err != nil {
t.Errorf("InstanceInfo: %v", err)
}
if iInfo.Name != adminClient.instance {
t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
}
}
adminClient, err := NewAdminClient(ctx, "proj", "instance", option.WithGRPCConn(conn))
list := func() []string {
tbls, err := adminClient.Tables(ctx)
if err != nil {
t.Fatalf("Fetching list of tables: %v", err)
}
sort.Strings(tbls)
return tbls
}
containsAll := func(got, want []string) bool {
gotSet := make(map[string]bool)
for _, s := range got {
gotSet[s] = true
}
for _, s := range want {
if !gotSet[s] {
return false
}
}
return true
}
defer adminClient.DeleteTable(ctx, "mytable")
if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
t.Fatalf("Creating table: %v", err)
}
defer adminClient.DeleteTable(ctx, "myothertable")
if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
t.Fatalf("Creating table: %v", err)
}
if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
}
must(adminClient.WaitForReplication(ctx, "mytable"))
if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
t.Fatalf("Deleting table: %v", err)
}
tables := list()
if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
}
if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
}
tblConf := TableConf{
TableID: "conftable",
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
defer adminClient.DeleteTable(ctx, tblConf.TableID)
tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
sort.Strings(tblInfo.Families)
wantFams := []string{"fam1", "fam2"}
if !testutil.Equal(tblInfo.Families, wantFams) {
t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
}
// Populate mytable and drop row ranges
if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
t.Fatalf("Creating column family: %v", err)
}
client, err := testEnv.NewClient()
if err != nil {
t.Fatalf("NewClient: %v", err)
}
defer client.Close()
tbl := client.Open("mytable")
prefixes := []string{"a", "b", "c"}
for _, prefix := range prefixes {
for i := 0; i < 5; i++ {
mut := NewMutation()
mut.Set("cf", "col", 1000, []byte("1"))
if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
t.Fatalf("Mutating row: %v", err)
}
}
}
if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
t.Errorf("DropRowRange a: %v", err)
}
if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
t.Errorf("DropRowRange c: %v", err)
}
if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
t.Errorf("DropRowRange x: %v", err)
}
var gotRowCount int
must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
gotRowCount++
if !strings.HasPrefix(row.Key(), "b") {
t.Errorf("Invalid row after dropping range: %v", row)
}
return true
}))
if gotRowCount != 5 {
t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
}
}
func TestInstanceUpdate(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
if iAdminClient == nil {
return
}
defer iAdminClient.Close()
iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
if err != nil {
t.Errorf("InstanceInfo: %v", err)
}
if iInfo.Name != adminClient.instance {
t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
}
if iInfo.DisplayName != adminClient.instance {
t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
}
const numNodes = 4
// update cluster nodes
if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
t.Errorf("UpdateCluster: %v", err)
}
// get cluster after updating
cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
if err != nil {
t.Errorf("GetCluster %v", err)
}
if cis.ServeNodes != int(numNodes) {
t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
}
}
func TestAdminSnapshotIntegration(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support snapshots")
}
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, _ := context.WithTimeout(context.Background(), timeout)
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
table := testEnv.Config().Table
cluster := testEnv.Config().Cluster
list := func(cluster string) ([]*SnapshotInfo, error) {
infos := []*SnapshotInfo(nil)
it := adminClient.Snapshots(ctx, cluster)
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
infos = append(infos, s)
}
return infos, err
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer adminClient.DeleteTable(ctx, table)
if err := adminClient.CreateTable(ctx, table); err != nil {
t.Fatalf("Creating table: %v", err)
}
// Precondition: no snapshots
snapshots, err := list(cluster)
if err != nil {
t.Fatalf("Initial snapshot list: %v", err)
}
if got, want := len(snapshots), 0; got != want {
t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
}
// Create snapshot
defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")
if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
t.Fatalf("Creating snaphot: %v", err)
}
// List snapshot
snapshots, err = list(cluster)
if err != nil {
t.Fatalf("Listing snapshots: %v", err)
}
if got, want := len(snapshots), 1; got != want {
t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
}
if got, want := snapshots[0].Name, "mysnapshot"; got != want {
t.Fatalf("Snapshot name: %s, want: %s", got, want)
}
if got, want := snapshots[0].SourceTable, table; got != want {
t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
}
if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
}
// Get snapshot
snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
if err != nil {
t.Fatalf("SnapshotInfo: %v", snapshot)
}
if got, want := *snapshot, *snapshots[0]; got != want {
t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
}
// Restore
restoredTable := table + "-restored"
defer adminClient.DeleteTable(ctx, restoredTable)
if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
t.Fatalf("CreateTableFromSnapshot: %v", err)
}
if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
t.Fatalf("Restored TableInfo: %v", err)
}
// Delete snapshot
if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
t.Fatalf("DeleteSnapshot: %v", err)
}
snapshots, err = list(cluster)
if err != nil {
t.Fatalf("List after Delete: %v", err)
}
if got, want := len(snapshots), 0; got != want {
t.Fatalf("List after delete len: %d, want: %d", got, want)
}
}
func TestGranularity(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, _ := context.WithTimeout(context.Background(), timeout)
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
@@ -55,19 +389,190 @@ func TestAdminIntegration(t *testing.T) {
sort.Strings(tbls)
return tbls
}
containsAll := func(got, want []string) bool {
gotSet := make(map[string]bool)
for _, s := range got {
gotSet[s] = true
}
for _, s := range want {
if !gotSet[s] {
return false
}
}
return true
}
defer adminClient.DeleteTable(ctx, "mytable")
if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
t.Fatalf("Creating table: %v", err)
}
if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
t.Fatalf("Creating table: %v", err)
}
if got, want := list(), []string{"myothertable", "mytable"}; !reflect.DeepEqual(got, want) {
tables := list()
if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
}
if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
t.Fatalf("Deleting table: %v", err)
// calling ModifyColumnFamilies to check the granularity of table
prefix := adminClient.instancePrefix()
req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + "mytable",
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
}},
}
if got, want := list(), []string{"mytable"}; !reflect.DeepEqual(got, want) {
t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
if err != nil {
t.Fatalf("Creating column family: %v", err)
}
if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
}
}
func TestInstanceAdminClient_AppProfile(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
if iAdminClient == nil {
return
}
defer iAdminClient.Close()
profile := ProfileConf{
ProfileID: "app_profile1",
InstanceID: adminClient.instance,
ClusterID: testEnv.Config().Cluster,
Description: "creating new app profile 1",
RoutingPolicy: SingleClusterRouting,
}
createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
if err != nil {
t.Fatalf("Creating app profile: %v", err)
}
gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
if err != nil {
t.Fatalf("Get app profile: %v", err)
}
if !proto.Equal(createdProfile, gotProfile) {
t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
}
list := func(instanceID string) ([]*btapb.AppProfile, error) {
profiles := []*btapb.AppProfile(nil)
it := iAdminClient.ListAppProfiles(ctx, instanceID)
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
profiles = append(profiles, s)
}
return profiles, err
}
profiles, err := list(adminClient.instance)
if err != nil {
t.Fatalf("List app profile: %v", err)
}
if got, want := len(profiles), 1; got != want {
t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
}
for _, test := range []struct {
desc string
uattrs ProfileAttrsToUpdate
want *btapb.AppProfile // nil means error
}{
{
desc: "empty update",
uattrs: ProfileAttrsToUpdate{},
want: nil,
},
{
desc: "empty description update",
uattrs: ProfileAttrsToUpdate{Description: ""},
want: &btapb.AppProfile{
Name: gotProfile.Name,
Description: "",
RoutingPolicy: gotProfile.RoutingPolicy,
Etag: gotProfile.Etag},
},
{
desc: "routing update",
uattrs: ProfileAttrsToUpdate{
RoutingPolicy: SingleClusterRouting,
ClusterID: testEnv.Config().Cluster,
},
want: &btapb.AppProfile{
Name: gotProfile.Name,
Description: "",
Etag: gotProfile.Etag,
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
}},
},
},
} {
err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
if err != nil {
if test.want != nil {
t.Errorf("%s: %v", test.desc, err)
}
continue
}
if err == nil && test.want == nil {
t.Errorf("%s: got nil, want error", test.desc)
continue
}
got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
if !proto.Equal(got, test.want) {
t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want)
}
}
err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
if err != nil {
t.Fatalf("Delete app profile: %v", err)
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@ limitations under the License.
package bigtable // import "cloud.google.com/go/bigtable"
import (
"context"
"errors"
"fmt"
"io"
@@ -25,14 +26,15 @@ import (
"cloud.google.com/go/bigtable/internal/gax"
btopt "cloud.google.com/go/bigtable/internal/option"
"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/api/transport"
gtransport "google.golang.org/api/transport/grpc"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const prodAddr = "bigtable.googleapis.com:443"
@@ -44,24 +46,48 @@ type Client struct {
conn *grpc.ClientConn
client btpb.BigtableClient
project, instance string
appProfile string
}
// ClientConfig has configurations for the client.
type ClientConfig struct {
// The id of the app profile to associate with all data operations sent from this client.
// If unspecified, the default app profile for the instance will be used.
AppProfile string
}
// NewClient creates a new Client for a given project and instance.
// The default ClientConfig will be used.
func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) {
return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...)
}
// NewClientWithConfig creates a new client with the given config.
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
o, err := btopt.DefaultClientOptions(prodAddr, Scope, clientUserAgent)
if err != nil {
return nil, err
}
// Default to a small connection pool that can be overridden.
o = append(o,
option.WithGRPCConnectionPool(4),
// Set the max size to correspond to server-side limits.
option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))),
// TODO(grpc/grpc-go#1388) using connection pool without WithBlock
// can cause RPCs to fail randomly. We can delete this after the issue is fixed.
option.WithGRPCDialOption(grpc.WithBlock()))
o = append(o, opts...)
conn, err := transport.DialGRPC(ctx, o...)
conn, err := gtransport.Dial(ctx, o...)
if err != nil {
return nil, fmt.Errorf("dialing: %v", err)
}
return &Client{
conn: conn,
client: btpb.NewBigtableClient(conn),
project: project,
instance: instance,
conn: conn,
client: btpb.NewBigtableClient(conn),
project: project,
instance: instance,
appProfile: config.AppProfile,
}, nil
}
@@ -117,14 +143,24 @@ func (c *Client) Open(table string) *Table {
//
// By default, the yielded rows will contain all values in all cells.
// Use RowFilter to limit the cells returned.
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error {
ctx = metadata.NewContext(ctx, t.md)
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
defer func() { trace.EndSpan(ctx, err) }()
var prevRowKey string
err := gax.Invoke(ctx, func(ctx context.Context) error {
attrMap := make(map[string]interface{})
err = gax.Invoke(ctx, func(ctx context.Context) error {
if !arg.valid() {
// Empty row set, no need to make an API call.
// NOTE: we must return early if arg == RowList{} because reading
// an empty RowList from bigtable returns all rows from that table.
return nil
}
req := &btpb.ReadRowsRequest{
TableName: t.c.fullTableName(t.table),
Rows: arg.proto(),
TableName: t.c.fullTableName(t.table),
AppProfileId: t.c.appProfile,
Rows: arg.proto(),
}
for _, opt := range opts {
opt.set(req)
@@ -132,6 +168,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
ctx, cancel := context.WithCancel(ctx) // for aborting the stream
defer cancel()
startTime := time.Now()
stream, err := t.c.client.ReadRows(ctx, req)
if err != nil {
return err
@@ -145,8 +182,15 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
if err != nil {
// Reset arg for next Invoke call.
arg = arg.retainRowsAfter(prevRowKey)
attrMap["rowKey"] = prevRowKey
attrMap["error"] = err.Error()
attrMap["time_secs"] = time.Since(startTime).Seconds()
trace.TracePrintf(ctx, attrMap, "Retry details in ReadRows")
return err
}
attrMap["time_secs"] = time.Since(startTime).Seconds()
attrMap["rowCount"] = len(res.Chunks)
trace.TracePrintf(ctx, attrMap, "Details in ReadRows")
for _, cc := range res.Chunks {
row, err := cr.Process(cc)
@@ -208,13 +252,17 @@ func decodeFamilyProto(r Row, row string, f *btpb.Family) {
}
}
// RowSet is a set of rows to be read. It is satisfied by RowList and RowRange.
// RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList.
// The serialized size of the RowSet must be no larger than 1MiB.
type RowSet interface {
proto() *btpb.RowSet
// retainRowsAfter returns a new RowSet that does not include the
// given row key or any row key lexicographically less than it.
retainRowsAfter(lastRowKey string) RowSet
// Valid reports whether this set can cover at least one row.
valid() bool
}
// RowList is a sequence of row keys.
@@ -238,6 +286,10 @@ func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
return retryKeys
}
func (r RowList) valid() bool {
return len(r) > 0
}
// A RowRange is a half-open interval [Start, Limit) encompassing
// all the rows with keys at least as large as Start, and less than Limit.
// (Bigtable string comparison is the same as Go's.)
@@ -275,8 +327,9 @@ func (r RowRange) String() string {
}
func (r RowRange) proto() *btpb.RowSet {
var rr *btpb.RowRange
rr = &btpb.RowRange{StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}}
rr := &btpb.RowRange{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
}
if !r.Unbounded() {
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
}
@@ -284,6 +337,9 @@ func (r RowRange) proto() *btpb.RowSet {
}
func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
if lastRowKey == "" || lastRowKey < r.start {
return r
}
// Set the beginning of the range to the row after the last scanned.
start := lastRowKey + "\x00"
if r.Unbounded() {
@@ -292,12 +348,49 @@ func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
return NewRange(start, r.limit)
}
// SingleRow returns a RowRange for reading a single row.
func SingleRow(row string) RowRange {
return RowRange{
start: row,
limit: row + "\x00",
func (r RowRange) valid() bool {
return r.Unbounded() || r.start < r.limit
}
// RowRangeList is a sequence of RowRanges representing the union of the ranges.
type RowRangeList []RowRange
func (r RowRangeList) proto() *btpb.RowSet {
ranges := make([]*btpb.RowRange, len(r))
for i, rr := range r {
// RowRange.proto() returns a RowSet with a single element RowRange array
ranges[i] = rr.proto().RowRanges[0]
}
return &btpb.RowSet{RowRanges: ranges}
}
func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
if lastRowKey == "" {
return r
}
// Return a list of any range that has not yet been completely processed
var ranges RowRangeList
for _, rr := range r {
retained := rr.retainRowsAfter(lastRowKey)
if retained.valid() {
ranges = append(ranges, retained.(RowRange))
}
}
return ranges
}
func (r RowRangeList) valid() bool {
for _, rr := range r {
if rr.valid() {
return true
}
}
return false
}
// SingleRow returns a RowSet for reading a single row.
func SingleRow(row string) RowSet {
return RowList{row}
}
// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
@@ -341,6 +434,9 @@ type ReadOption interface {
}
// RowFilter returns a ReadOption that applies f to the contents of read rows.
//
// If multiple RowFilters are provided, only the last is used. To combine filters,
// use ChainFilters or InterleaveFilters instead.
func RowFilter(f Filter) ReadOption { return rowFilter{f} }
type rowFilter struct{ f Filter }
@@ -368,21 +464,27 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool {
return true
}
// Apply applies a Mutation to a specific row.
func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error {
ctx = metadata.NewContext(ctx, t.md)
const maxMutations = 100000
// Apply mutates a row atomically. A mutation must contain at least one
// operation and at most 100000 operations.
func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) {
ctx = mergeOutgoingMetadata(ctx, t.md)
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
}
}
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
defer func() { trace.EndSpan(ctx, err) }()
var callOptions []gax.CallOption
if m.cond == nil {
req := &btpb.MutateRowRequest{
TableName: t.c.fullTableName(t.table),
RowKey: []byte(row),
Mutations: m.ops,
TableName: t.c.fullTableName(t.table),
AppProfileId: t.c.appProfile,
RowKey: []byte(row),
Mutations: m.ops,
}
if mutationsAreRetryable(m.ops) {
callOptions = retryOptions
@@ -401,20 +503,27 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
req := &btpb.CheckAndMutateRowRequest{
TableName: t.c.fullTableName(t.table),
AppProfileId: t.c.appProfile,
RowKey: []byte(row),
PredicateFilter: m.cond.proto(),
}
if m.mtrue != nil {
if m.mtrue.cond != nil {
return errors.New("bigtable: conditional mutations cannot be nested")
}
req.TrueMutations = m.mtrue.ops
}
if m.mfalse != nil {
if m.mfalse.cond != nil {
return errors.New("bigtable: conditional mutations cannot be nested")
}
req.FalseMutations = m.mfalse.ops
}
if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) {
callOptions = retryOptions
}
var cmRes *btpb.CheckAndMutateRowResponse
err := gax.Invoke(ctx, func(ctx context.Context) error {
err = gax.Invoke(ctx, func(ctx context.Context) error {
var err error
cmRes, err = t.c.client.CheckAndMutateRow(ctx, req)
return err
@@ -468,25 +577,20 @@ func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation {
}
// Set sets a value in a specified column, with the given timestamp.
// The timestamp will be truncated to millisecond resolution.
// The timestamp will be truncated to millisecond granularity.
// A timestamp of ServerTime means to use the server timestamp.
func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) {
if ts != ServerTime {
// Truncate to millisecond resolution, since that's the default table config.
// TODO(dsymonds): Provide a way to override this behaviour.
ts -= ts % 1000
}
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{&btpb.Mutation_SetCell{
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
FamilyName: family,
ColumnQualifier: []byte(column),
TimestampMicros: int64(ts),
TimestampMicros: int64(ts.TruncateToMilliseconds()),
Value: value,
}}})
}
// DeleteCellsInColumn will delete all the cells whose columns are family:column.
func (m *Mutation) DeleteCellsInColumn(family, column string) {
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{&btpb.Mutation_DeleteFromColumn{
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
FamilyName: family,
ColumnQualifier: []byte(column),
}}})
@@ -495,27 +599,28 @@ func (m *Mutation) DeleteCellsInColumn(family, column string) {
// DeleteTimestampRange deletes all cells whose columns are family:column
// and whose timestamps are in the half-open interval [start, end).
// If end is zero, it will be interpreted as infinity.
// The timestamps will be truncated to millisecond granularity.
func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) {
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{&btpb.Mutation_DeleteFromColumn{
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
FamilyName: family,
ColumnQualifier: []byte(column),
TimeRange: &btpb.TimestampRange{
StartTimestampMicros: int64(start),
EndTimestampMicros: int64(end),
StartTimestampMicros: int64(start.TruncateToMilliseconds()),
EndTimestampMicros: int64(end.TruncateToMilliseconds()),
},
}}})
}
// DeleteCellsInFamily will delete all the cells whose columns are family:*.
func (m *Mutation) DeleteCellsInFamily(family string) {
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{&btpb.Mutation_DeleteFromFamily{
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{
FamilyName: family,
}}})
}
// DeleteRow deletes the entire row.
func (m *Mutation) DeleteRow() {
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{&btpb.Mutation_DeleteFromRow{}}})
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}})
}
// entryErr is a container that combines an entry with the error that was returned for it.
@@ -525,7 +630,7 @@ type entryErr struct {
Err error
}
// ApplyBulk applies multiple Mutations.
// ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
// Each mutation is individually applied atomically,
// but the set of mutations may be applied in any order.
//
@@ -535,8 +640,11 @@ type entryErr struct {
// will correspond to the relevant rowKeys/muts arguments.
//
// Conditional mutations cannot be applied in bulk and providing one will result in an error.
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) {
ctx = metadata.NewContext(ctx, t.md)
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
defer func() { trace.EndSpan(ctx, err) }()
if len(rowKeys) != len(muts) {
return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
}
@@ -550,31 +658,31 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
}
// entries will be reduced after each invocation to just what needs to be retried.
entries := make([]*entryErr, len(rowKeys))
copy(entries, origEntries)
err := gax.Invoke(ctx, func(ctx context.Context) error {
err := t.doApplyBulk(ctx, entries, opts...)
for _, group := range groupEntries(origEntries, maxMutations) {
attrMap := make(map[string]interface{})
err = gax.Invoke(ctx, func(ctx context.Context) error {
attrMap["rowCount"] = len(group)
trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
err := t.doApplyBulk(ctx, group, opts...)
if err != nil {
// We want to retry the entire request with the current group
return err
}
group = t.getApplyBulkRetries(group)
if len(group) > 0 && len(idempotentRetryCodes) > 0 {
// We have at least one mutation that needs to be retried.
// Return an arbitrary error that is retryable according to callOptions.
return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
}
return nil
}, retryOptions...)
if err != nil {
// We want to retry the entire request with the current entries
return err
return nil, err
}
entries = t.getApplyBulkRetries(entries)
if len(entries) > 0 && len(idempotentRetryCodes) > 0 {
// We have at least one mutation that needs to be retried.
// Return an arbitrary error that is retryable according to callOptions.
return grpc.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
}
return nil
}, retryOptions...)
if err != nil {
return nil, err
}
// Accumulate all of the errors into an array to return, interspersed with nils for successful
// All the errors are accumulated into an array and returned, interspersed with nils for successful
// entries. The absence of any errors means we should return nil.
var errs []error
var foundErr bool
for _, entry := range origEntries {
if entry.Err != nil {
@@ -614,8 +722,9 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...
entries[i] = entryErr.Entry
}
req := &btpb.MutateRowsRequest{
TableName: t.c.fullTableName(t.table),
Entries: entries,
TableName: t.c.fullTableName(t.table),
AppProfileId: t.c.appProfile,
Entries: entries,
}
stream, err := t.c.client.MutateRows(ctx, req)
if err != nil {
@@ -631,11 +740,11 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...
}
for i, entry := range res.Entries {
status := entry.Status
if status.Code == int32(codes.OK) {
s := entry.Status
if s.Code == int32(codes.OK) {
entryErrs[i].Err = nil
} else {
entryErrs[i].Err = grpc.Errorf(codes.Code(status.Code), status.Message)
entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message)
}
}
after(res)
@@ -643,6 +752,32 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...
return nil
}
// groupEntries groups entries into groups of a specified size without breaking up
// individual entries.
func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
var (
res [][]*entryErr
start int
gmuts int
)
addGroup := func(end int) {
if end-start > 0 {
res = append(res, entries[start:end])
start = end
gmuts = 0
}
}
for i, e := range entries {
emuts := len(e.Entry.Mutations)
if gmuts+emuts > maxSize {
addGroup(i)
}
gmuts += emuts
}
addGroup(len(entries))
return res
}
// Timestamp is in units of microseconds since 1 January 1970.
type Timestamp int64
@@ -659,19 +794,32 @@ func Now() Timestamp { return Time(time.Now()) }
// Time converts a Timestamp into a time.Time.
func (ts Timestamp) Time() time.Time { return time.Unix(0, int64(ts)*1e3) }
// TruncateToMilliseconds truncates a Timestamp to millisecond granularity,
// which is currently the only granularity supported.
func (ts Timestamp) TruncateToMilliseconds() Timestamp {
if ts == ServerTime {
return ts
}
return ts - ts%1000
}
// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
// It returns the newly written cells.
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
ctx = metadata.NewContext(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)
req := &btpb.ReadModifyWriteRowRequest{
TableName: t.c.fullTableName(t.table),
RowKey: []byte(row),
Rules: m.ops,
TableName: t.c.fullTableName(t.table),
AppProfileId: t.c.appProfile,
RowKey: []byte(row),
Rules: m.ops,
}
res, err := t.c.client.ReadModifyWriteRow(ctx, req)
if err != nil {
return nil, err
}
if res.Row == nil {
return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil")
}
r := make(Row)
for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
decodeFamilyProto(r, row, fam)
@@ -700,7 +848,7 @@ func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) {
m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
FamilyName: family,
ColumnQualifier: []byte(column),
Rule: &btpb.ReadModifyWriteRule_AppendValue{v},
Rule: &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v},
})
}
@@ -712,6 +860,52 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
FamilyName: family,
ColumnQualifier: []byte(column),
Rule: &btpb.ReadModifyWriteRule_IncrementAmount{delta},
Rule: &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta},
})
}
// mergeOutgoingMetadata returns a context populated by the existing outgoing metadata,
// if any, joined with internal metadata.
func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
mdCopy, _ := metadata.FromOutgoingContext(ctx)
return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md))
}
// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of
// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces.
func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
ctx = mergeOutgoingMetadata(ctx, t.md)
var sampledRowKeys []string
err := gax.Invoke(ctx, func(ctx context.Context) error {
sampledRowKeys = nil
req := &btpb.SampleRowKeysRequest{
TableName: t.c.fullTableName(t.table),
AppProfileId: t.c.appProfile,
}
ctx, cancel := context.WithCancel(ctx) // for aborting the stream
defer cancel()
stream, err := t.c.client.SampleRowKeys(ctx, req)
if err != nil {
return err
}
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
key := string(res.RowKey)
if key == "" {
continue
}
sampledRowKeys = append(sampledRowKeys, key)
}
return nil
}, retryOptions...)
return sampledRowKeys, err
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
/*
Copyright 2016 Google Inc. All Rights Reserved.
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,19 +16,19 @@ limitations under the License.
package bttest_test
import (
"context"
"fmt"
"log"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/bttest"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
func ExampleNewServer() {
srv, err := bttest.NewServer("127.0.0.1:0")
srv, err := bttest.NewServer("localhost:0")
if err != nil {
log.Fatalln(err)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,106 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bttest
import (
"context"
"github.com/golang/protobuf/ptypes/empty"
btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
iampb "google.golang.org/genproto/googleapis/iam/v1"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var _ btapb.BigtableInstanceAdminServer = (*server)(nil)
var errUnimplemented = status.Error(codes.Unimplemented, "unimplemented feature")
func (s *server) CreateInstance(ctx context.Context, req *btapb.CreateInstanceRequest) (*longrunning.Operation, error) {
return nil, errUnimplemented
}
func (s *server) GetInstance(ctx context.Context, req *btapb.GetInstanceRequest) (*btapb.Instance, error) {
return nil, errUnimplemented
}
func (s *server) ListInstances(ctx context.Context, req *btapb.ListInstancesRequest) (*btapb.ListInstancesResponse, error) {
return nil, errUnimplemented
}
func (s *server) UpdateInstance(ctx context.Context, req *btapb.Instance) (*btapb.Instance, error) {
return nil, errUnimplemented
}
func (s *server) PartialUpdateInstance(ctx context.Context, req *btapb.PartialUpdateInstanceRequest) (*longrunning.Operation, error) {
return nil, errUnimplemented
}
func (s *server) DeleteInstance(ctx context.Context, req *btapb.DeleteInstanceRequest) (*empty.Empty, error) {
return nil, errUnimplemented
}
func (s *server) CreateCluster(ctx context.Context, req *btapb.CreateClusterRequest) (*longrunning.Operation, error) {
return nil, errUnimplemented
}
func (s *server) GetCluster(ctx context.Context, req *btapb.GetClusterRequest) (*btapb.Cluster, error) {
return nil, errUnimplemented
}
func (s *server) ListClusters(ctx context.Context, req *btapb.ListClustersRequest) (*btapb.ListClustersResponse, error) {
return nil, errUnimplemented
}
func (s *server) UpdateCluster(ctx context.Context, req *btapb.Cluster) (*longrunning.Operation, error) {
return nil, errUnimplemented
}
func (s *server) DeleteCluster(ctx context.Context, req *btapb.DeleteClusterRequest) (*empty.Empty, error) {
return nil, errUnimplemented
}
func (s *server) CreateAppProfile(ctx context.Context, req *btapb.CreateAppProfileRequest) (*btapb.AppProfile, error) {
return nil, errUnimplemented
}
func (s *server) GetAppProfile(ctx context.Context, req *btapb.GetAppProfileRequest) (*btapb.AppProfile, error) {
return nil, errUnimplemented
}
func (s *server) ListAppProfiles(ctx context.Context, req *btapb.ListAppProfilesRequest) (*btapb.ListAppProfilesResponse, error) {
return nil, errUnimplemented
}
func (s *server) UpdateAppProfile(ctx context.Context, req *btapb.UpdateAppProfileRequest) (*longrunning.Operation, error) {
return nil, errUnimplemented
}
func (s *server) DeleteAppProfile(ctx context.Context, req *btapb.DeleteAppProfileRequest) (*empty.Empty, error) {
return nil, errUnimplemented
}
func (s *server) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest) (*iampb.Policy, error) {
return nil, errUnimplemented
}
func (s *server) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest) (*iampb.Policy, error) {
return nil, errUnimplemented
}
func (s *server) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest) (*iampb.TestIamPermissionsResponse, error) {
return nil, errUnimplemented
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
// Copyright 2016 Google Inc. All Rights Reserved.
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -17,6 +17,10 @@ package main
import (
"testing"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
)
func TestParseDuration(t *testing.T) {
@@ -57,3 +61,113 @@ func TestParseDuration(t *testing.T) {
}
}
}
func TestParseArgs(t *testing.T) {
got, err := parseArgs([]string{"a=1", "b=2"}, []string{"a", "b"})
if err != nil {
t.Fatal(err)
}
want := map[string]string{"a": "1", "b": "2"}
if !testutil.Equal(got, want) {
t.Fatalf("got %v, want %v", got, want)
}
if _, err := parseArgs([]string{"a1"}, []string{"a1"}); err == nil {
t.Error("malformed: got nil, want error")
}
if _, err := parseArgs([]string{"a=1"}, []string{"b"}); err == nil {
t.Error("invalid: got nil, want error")
}
}
func TestParseColumnsFilter(t *testing.T) {
tests := []struct {
in string
out bigtable.Filter
fail bool
}{
{
in: "columnA",
out: bigtable.ColumnFilter("columnA"),
},
{
in: "familyA:columnA",
out: bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
},
{
in: "columnA,columnB",
out: bigtable.InterleaveFilters(bigtable.ColumnFilter("columnA"), bigtable.ColumnFilter("columnB")),
},
{
in: "familyA:columnA,columnB",
out: bigtable.InterleaveFilters(
bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
bigtable.ColumnFilter("columnB"),
),
},
{
in: "columnA,familyB:columnB",
out: bigtable.InterleaveFilters(
bigtable.ColumnFilter("columnA"),
bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
),
},
{
in: "familyA:columnA,familyB:columnB",
out: bigtable.InterleaveFilters(
bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
),
},
{
in: "familyA:",
out: bigtable.FamilyFilter("familyA"),
},
{
in: ":columnA",
out: bigtable.ColumnFilter("columnA"),
},
{
in: ",:columnA,,familyB:columnB,",
out: bigtable.InterleaveFilters(
bigtable.ColumnFilter("columnA"),
bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
),
},
{
in: "familyA:columnA:cellA",
fail: true,
},
{
in: "familyA::columnA",
fail: true,
},
}
for _, tc := range tests {
got, err := parseColumnsFilter(tc.in)
if !tc.fail && err != nil {
t.Errorf("parseColumnsFilter(%q) unexpectedly failed: %v", tc.in, err)
continue
}
if tc.fail && err == nil {
t.Errorf("parseColumnsFilter(%q) did not fail", tc.in)
continue
}
if tc.fail {
continue
}
var cmpOpts cmp.Options
cmpOpts =
append(
cmpOpts,
cmp.AllowUnexported(bigtable.ChainFilters([]bigtable.Filter{}...)),
cmp.AllowUnexported(bigtable.InterleaveFilters([]bigtable.Filter{}...)))
if !cmp.Equal(got, tc.out, cmpOpts) {
t.Errorf("parseColumnsFilter(%q) = %v, want %v", tc.in, got, tc.out)
}
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 Google Inc. All Rights Reserved.
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -14,10 +14,12 @@
// DO NOT EDIT. THIS IS AUTOMATICALLY GENERATED.
// Run "go generate" to regenerate.
//go:generate go run cbt.go -o cbtdoc.go doc
//go:generate go run cbt.go gcpolicy.go -o cbtdoc.go doc
/*
Cbt is a tool for doing basic interactions with Cloud Bigtable.
Cbt is a tool for doing basic interactions with Cloud Bigtable. To learn how to
install the cbt tool, see the
[cbt overview](https://cloud.google.com/bigtable/docs/cbt-overview).
Usage:
@@ -26,33 +28,74 @@ Usage:
The commands are:
count Count rows in a table
createinstance Create an instance with an initial cluster
createcluster Create a cluster in the configured instance
createfamily Create a column family
createtable Create a table
updatecluster Update a cluster in the configured instance
deleteinstance Delete an instance
deletecluster Delete a cluster from the configured instance
deletecolumn Delete all cells in a column
deletefamily Delete a column family
deleterow Delete a row
deletetable Delete a table
doc Print godoc-suitable documentation for cbt
help Print help text
listinstances List instances in a project
listclusters List clusters in an instance
lookup Read from a single row
ls List tables and column families
mddoc Print documentation for cbt in Markdown format
read Read rows
set Set value of a cell
setgcpolicy Set the GC policy for a column family
waitforreplication Block until all the completed writes have been replicated to all the clusters
createtablefromsnapshot Create a table from a snapshot (snapshots alpha)
createsnapshot Create a snapshot from a source table (snapshots alpha)
listsnapshots List snapshots in a cluster (snapshots alpha)
getsnapshot Get snapshot info (snapshots alpha)
deletesnapshot Delete snapshot in a cluster (snapshots alpha)
version Print the current cbt version
createappprofile Creates app profile for an instance
getappprofile Reads app profile for an instance
listappprofile Lists app profile for an instance
updateappprofile Updates app profile for an instance
deleteappprofile Deletes app profile for an instance
Use "cbt help <command>" for more information about a command.
The options are:
-project string
project ID
project ID, if unset uses gcloud configured project
-instance string
Cloud Bigtable instance
-creds string
if set, use application credentials in this file
Alpha features are not currently available to most Cloud Bigtable customers. The
features might be changed in backward-incompatible ways and are not recommended
for production use. They are not subject to any SLA or deprecation policy.
Note: cbt does not support specifying arbitrary bytes on the command line for
any value that Bigtable otherwise supports (e.g., row key, column qualifier,
etc.).
For convenience, values of the -project, -instance, -creds,
-admin-endpoint and -data-endpoint flags may be specified in
~/.cbtrc in this format:
project = my-project-123
instance = my-instance
creds = path-to-account-key.json
admin-endpoint = hostname:port
data-endpoint = hostname:port
All values are optional, and all will be overridden by flags.
Count rows in a table
Usage:
@@ -61,6 +104,34 @@ Usage:
Create an instance with an initial cluster
Usage:
cbt createinstance <instance-id> <display-name> <cluster-id> <zone> <num-nodes> <storage type>
instance-id Permanent, unique id for the instance
display-name Description of the instance
cluster-id Permanent, unique id for the cluster in the instance
zone The zone in which to create the cluster
num-nodes The number of nodes to create
storage-type SSD or HDD
Create a cluster in the configured instance
Usage:
cbt createcluster <cluster-id> <zone> <num-nodes> <storage type>
cluster-id Permanent, unique id for the cluster in the instance
zone The zone in which to create the cluster
num-nodes The number of nodes to create
storage-type SSD or HDD
Create a column family
Usage:
@@ -72,7 +143,47 @@ Usage:
Create a table
Usage:
cbt createtable <table>
cbt createtable <table> [families=family[:gcpolicy],...] [splits=split,...]
families: Column families and their associated GC policies. For gcpolicy,
see "setgcpolicy".
Example: families=family1:maxage=1w,family2:maxversions=1
splits: Row key to be used to initially split the table
Update a cluster in the configured instance
Usage:
cbt updatecluster <cluster-id> [num-nodes=num-nodes]
cluster-id Permanent, unique id for the cluster in the instance
num-nodes The number of nodes to update to
Delete an instance
Usage:
cbt deleteinstance <instance>
Delete a cluster from the configured instance
Usage:
cbt deletecluster <cluster>
Delete all cells in a column
Usage:
cbt deletecolumn <table> <row> <family> <column> [app-profile=<app profile id>]
app-profile=<app profile id> The app profile id to use for the request
@@ -88,7 +199,9 @@ Usage:
Delete a row
Usage:
cbt deleterow <table> <row>
cbt deleterow <table> <row> [app-profile=<app profile id>]
app-profile=<app profile id> The app profile id to use for the request
@@ -125,10 +238,22 @@ Usage:
List clusters in an instance
Usage:
cbt listclusters
Read from a single row
Usage:
cbt lookup <table> <row>
cbt lookup <table> <row> [columns=[family]:[qualifier],...] [cells-per-column=<n>] [app-profile=<app profile id>]
columns=[family]:[qualifier],... Read only these columns, comma-separated
cells-per-column=<n> Read only this many cells per column
app-profile=<app profile id> The app profile id to use for the request
@@ -153,11 +278,15 @@ Usage:
Read rows
Usage:
cbt read <table> [start=<row>] [end=<row>] [prefix=<prefix>] [count=<n>]
start=<row> Start reading at this row
end=<row> Stop reading before this row
prefix=<prefix> Read rows with this prefix
count=<n> Read only this many rows
cbt read <table> [start=<row>] [end=<row>] [prefix=<prefix>] [regex=<regex>] [columns=[family]:[qualifier],...] [count=<n>] [cells-per-column=<n>] [app-profile=<app profile id>]
start=<row> Start reading at this row
end=<row> Stop reading before this row
prefix=<prefix> Read rows with this prefix
regex=<regex> Read rows with keys matching this regex
columns=[family]:[qualifier],... Read only these columns, comma-separated
count=<n> Read only this many rows
cells-per-column=<n> Read only this many cells per column
app-profile=<app profile id> The app profile id to use for the request
@@ -166,7 +295,8 @@ Usage:
Set value of a cell
Usage:
cbt set <table> <row> family:column=val[@ts] ...
cbt set <table> <row> [app-profile=<app profile id>] family:column=val[@ts] ...
app-profile=<app profile id> The app profile id to use for the request
family:column=val[@ts] may be repeated to set multiple cells.
ts is an optional integer timestamp.
@@ -179,7 +309,7 @@ Usage:
Set the GC policy for a column family
Usage:
cbt setgcpolicy <table> <family> ( maxage=<d> | maxversions=<n> )
cbt setgcpolicy <table> <family> ((maxage=<d> | maxversions=<n>) [(and|or) (maxage=<d> | maxversions=<n>),...] | never)
maxage=<d> Maximum timestamp age to preserve (e.g. "1h", "4d")
maxversions=<n> Maximum number of versions to preserve
@@ -187,5 +317,109 @@ Usage:
Block until all the completed writes have been replicated to all the clusters
Usage:
cbt waitforreplication <table>
Create a table from a snapshot (snapshots alpha)
Usage:
cbt createtablefromsnapshot <table> <cluster> <snapshot>
table The name of the table to create
cluster The cluster where the snapshot is located
snapshot The snapshot to restore
Create a snapshot from a source table (snapshots alpha)
Usage:
cbt createsnapshot <cluster> <snapshot> <table> [ttl=<d>]
[ttl=<d>] Lifespan of the snapshot (e.g. "1h", "4d")
List snapshots in a cluster (snapshots alpha)
Usage:
cbt listsnapshots [<cluster>]
Get snapshot info (snapshots alpha)
Usage:
cbt getsnapshot <cluster> <snapshot>
Delete snapshot in a cluster (snapshots alpha)
Usage:
cbt deletesnapshot <cluster> <snapshot>
Print the current cbt version
Usage:
cbt version
Creates app profile for an instance
Usage:
usage: cbt createappprofile <instance-id> <profile-id> <description> (route-any | [ route-to=<cluster-id> : transactional-writes]) [optional flag]
optional flags may be `force`
Reads app profile for an instance
Usage:
cbt getappprofile <instance-id> <profile-id>
Lists app profile for an instance
Usage:
cbt listappprofile <instance-id>
Updates app profile for an instance
Usage:
usage: cbt updateappprofile <instance-id> <profile-id> <description>(route-any | [ route-to=<cluster-id> : transactional-writes]) [optional flag]
optional flags may be `force`
Deletes app profile for an instance
Usage:
cbt deleteappprofile <instance-id> <profile-id>
*/
package main

215
vendor/cloud.google.com/go/bigtable/cmd/cbt/gcpolicy.go generated vendored Normal file
View File

@@ -0,0 +1,215 @@
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"errors"
"fmt"
"io"
"strconv"
"strings"
"unicode"
"cloud.google.com/go/bigtable"
)
// Parse a GC policy. Valid policies include
// never
// maxage = 5d
// maxversions = 3
// maxage = 5d || maxversions = 3
// maxage=30d || (maxage=3d && maxversions=100)
func parseGCPolicy(s string) (bigtable.GCPolicy, error) {
if strings.TrimSpace(s) == "never" {
return bigtable.NoGcPolicy(), nil
}
r := strings.NewReader(s)
p, err := parsePolicyExpr(r)
if err != nil {
return nil, fmt.Errorf("invalid GC policy: %v", err)
}
tok, err := getToken(r)
if err != nil {
return nil, err
}
if tok != "" {
return nil, fmt.Errorf("invalid GC policy: want end of input, got %q", tok)
}
return p, nil
}
// expr ::= term (op term)*
// op ::= "and" | "or" | "&&" | "||"
func parsePolicyExpr(r io.RuneScanner) (bigtable.GCPolicy, error) {
policy, err := parsePolicyTerm(r)
if err != nil {
return nil, err
}
for {
tok, err := getToken(r)
if err != nil {
return nil, err
}
var f func(...bigtable.GCPolicy) bigtable.GCPolicy
switch tok {
case "and", "&&":
f = bigtable.IntersectionPolicy
case "or", "||":
f = bigtable.UnionPolicy
default:
ungetToken(tok)
return policy, nil
}
p2, err := parsePolicyTerm(r)
if err != nil {
return nil, err
}
policy = f(policy, p2)
}
}
// term ::= "maxage" "=" duration | "maxversions" "=" int | "(" policy ")"
func parsePolicyTerm(r io.RuneScanner) (bigtable.GCPolicy, error) {
tok, err := getToken(r)
if err != nil {
return nil, err
}
switch tok {
case "":
return nil, errors.New("empty GC policy term")
case "maxage", "maxversions":
if err := expectToken(r, "="); err != nil {
return nil, err
}
tok2, err := getToken(r)
if err != nil {
return nil, err
}
if tok2 == "" {
return nil, errors.New("expected a token after '='")
}
if tok == "maxage" {
dur, err := parseDuration(tok2)
if err != nil {
return nil, err
}
return bigtable.MaxAgePolicy(dur), nil
}
n, err := strconv.ParseUint(tok2, 10, 16)
if err != nil {
return nil, err
}
return bigtable.MaxVersionsPolicy(int(n)), nil
case "(":
p, err := parsePolicyExpr(r)
if err != nil {
return nil, err
}
if err := expectToken(r, ")"); err != nil {
return nil, err
}
return p, nil
default:
return nil, fmt.Errorf("unexpected token: %q", tok)
}
}
func expectToken(r io.RuneScanner, want string) error {
got, err := getToken(r)
if err != nil {
return err
}
if got != want {
return fmt.Errorf("expected %q, saw %q", want, got)
}
return nil
}
const noToken = "_" // empty token is valid, so use "_" instead
// If not noToken, getToken will return this instead of reading a new token
// from the input.
var ungotToken = noToken
// getToken extracts the first token from the input. Valid tokens include
// any sequence of letters and digits, and these symbols: &&, ||, =, ( and ).
// getToken returns ("", nil) at end of input.
func getToken(r io.RuneScanner) (string, error) {
if ungotToken != noToken {
t := ungotToken
ungotToken = noToken
return t, nil
}
var err error
// Skip leading whitespace.
c := ' '
for unicode.IsSpace(c) {
c, _, err = r.ReadRune()
if err == io.EOF {
return "", nil
}
if err != nil {
return "", err
}
}
switch {
case c == '=' || c == '(' || c == ')':
return string(c), nil
case c == '&' || c == '|':
c2, _, err := r.ReadRune()
if err != nil && err != io.EOF {
return "", err
}
if c != c2 {
return "", fmt.Errorf("expected %c%c", c, c)
}
return string([]rune{c, c}), nil
case unicode.IsLetter(c) || unicode.IsDigit(c):
// Collect an alphanumeric token.
var b bytes.Buffer
for unicode.IsLetter(c) || unicode.IsDigit(c) {
b.WriteRune(c)
c, _, err = r.ReadRune()
if err == io.EOF {
break
}
if err != nil {
return "", err
}
}
r.UnreadRune()
return b.String(), nil
default:
return "", fmt.Errorf("bad rune %q", c)
}
}
// "unget" a token so the next call to getToken will return it.
func ungetToken(tok string) {
if ungotToken != noToken {
panic("ungetToken called twice")
}
ungotToken = tok
}

View File

@@ -0,0 +1,196 @@
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"strings"
"testing"
"time"
"cloud.google.com/go/bigtable"
"github.com/google/go-cmp/cmp"
)
func TestParseGCPolicy(t *testing.T) {
for _, test := range []struct {
in string
want bigtable.GCPolicy
}{
{
"never",
bigtable.NoGcPolicy(),
},
{
"maxage=3h",
bigtable.MaxAgePolicy(3 * time.Hour),
},
{
"maxversions=2",
bigtable.MaxVersionsPolicy(2),
},
{
"maxversions=2 and maxage=1h",
bigtable.IntersectionPolicy(bigtable.MaxVersionsPolicy(2), bigtable.MaxAgePolicy(time.Hour)),
},
{
"(((maxversions=2 and (maxage=1h))))",
bigtable.IntersectionPolicy(bigtable.MaxVersionsPolicy(2), bigtable.MaxAgePolicy(time.Hour)),
},
{
"maxversions=7 or maxage=8h",
bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(7), bigtable.MaxAgePolicy(8*time.Hour)),
},
{
"maxversions = 7||maxage = 8h",
bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(7), bigtable.MaxAgePolicy(8*time.Hour)),
},
{
"maxversions=7||maxage=8h",
bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(7), bigtable.MaxAgePolicy(8*time.Hour)),
},
{
"maxage=30d || (maxage=3d && maxversions=100)",
bigtable.UnionPolicy(
bigtable.MaxAgePolicy(30*24*time.Hour),
bigtable.IntersectionPolicy(
bigtable.MaxAgePolicy(3*24*time.Hour),
bigtable.MaxVersionsPolicy(100))),
},
{
"maxage=30d || (maxage=3d && maxversions=100) || maxversions=7",
bigtable.UnionPolicy(
bigtable.UnionPolicy(
bigtable.MaxAgePolicy(30*24*time.Hour),
bigtable.IntersectionPolicy(
bigtable.MaxAgePolicy(3*24*time.Hour),
bigtable.MaxVersionsPolicy(100))),
bigtable.MaxVersionsPolicy(7)),
},
{
// && and || have same precedence, left associativity
"maxage=1h && maxage=2h || maxage=3h",
bigtable.UnionPolicy(
bigtable.IntersectionPolicy(
bigtable.MaxAgePolicy(1*time.Hour),
bigtable.MaxAgePolicy(2*time.Hour)),
bigtable.MaxAgePolicy(3*time.Hour)),
},
} {
got, err := parseGCPolicy(test.in)
if err != nil {
t.Errorf("%s: %v", test.in, err)
continue
}
if !cmp.Equal(got, test.want, cmp.AllowUnexported(bigtable.IntersectionPolicy(), bigtable.UnionPolicy())) {
t.Errorf("%s: got %+v, want %+v", test.in, got, test.want)
}
}
}
func TestParseGCPolicyErrors(t *testing.T) {
for _, in := range []string{
"",
"a",
"b = 1h",
"c = 1",
"maxage=1", // need duration
"maxversions=1h", // need int
"maxage",
"maxversions",
"never=never",
"maxversions=1 && never",
"(((maxage=1h))",
"((maxage=1h)))",
"maxage=30d || ((maxage=3d && maxversions=100)",
"maxversions = 3 and",
} {
_, err := parseGCPolicy(in)
if err == nil {
t.Errorf("%s: got nil, want error", in)
}
}
}
func TestTokenizeGCPolicy(t *testing.T) {
for _, test := range []struct {
in string
want []string
}{
{
"maxage=5d",
[]string{"maxage", "=", "5d"},
},
{
"maxage = 5d",
[]string{"maxage", "=", "5d"},
},
{
"maxage=5d or maxversions=5",
[]string{"maxage", "=", "5d", "or", "maxversions", "=", "5"},
},
{
"maxage=5d || (maxversions=5)",
[]string{"maxage", "=", "5d", "||", "(", "maxversions", "=", "5", ")"},
},
{
"maxage=5d||( maxversions=5 )",
[]string{"maxage", "=", "5d", "||", "(", "maxversions", "=", "5", ")"},
},
} {
got, err := tokenizeGCPolicy(test.in)
if err != nil {
t.Errorf("%s: %v", test.in, err)
continue
}
if diff := cmp.Diff(got, test.want); diff != "" {
t.Errorf("%s: %s", test.in, diff)
}
}
}
func TestTokenizeGCPolicyErrors(t *testing.T) {
for _, in := range []string{
"a &",
"a & b",
"a &x b",
"a |",
"a | b",
"a |& b",
"a % b",
} {
_, err := tokenizeGCPolicy(in)
if err == nil {
t.Errorf("%s: got nil, want error", in)
}
}
}
func tokenizeGCPolicy(s string) ([]string, error) {
var tokens []string
r := strings.NewReader(s)
for {
tok, err := getToken(r)
if err != nil {
return nil, err
}
if tok == "" {
break
}
tokens = append(tokens, tok)
}
return tokens, nil
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 Google Inc. All Rights Reserved.
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import (
"log"
"cloud.google.com/go/bigtable/bttest"
"google.golang.org/grpc"
)
var (
@@ -30,9 +31,18 @@ var (
port = flag.Int("port", 9000, "the port number to bind to on the local machine")
)
const (
maxMsgSize = 256 * 1024 * 1024 // 256 MiB
)
func main() {
grpc.EnableTracing = false
flag.Parse()
srv, err := bttest.NewServer(fmt.Sprintf("%s:%d", *host, *port))
opts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxMsgSize),
grpc.MaxSendMsgSize(maxMsgSize),
}
srv, err := bttest.NewServer(fmt.Sprintf("%s:%d", *host, *port), opts...)
if err != nil {
log.Fatalf("failed to start emulator: %v", err)
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -21,45 +21,48 @@ package main
import (
"bytes"
"context"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/internal/cbtrc"
"cloud.google.com/go/bigtable/internal/cbtconfig"
"cloud.google.com/go/bigtable/internal/stat"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
var (
runFor = flag.Duration("run_for", 5*time.Second, "how long to run the load test for")
runFor = flag.Duration("run_for", 5*time.Second,
"how long to run the load test for; 0 to run forever until SIGTERM")
scratchTable = flag.String("scratch_table", "loadtest-scratch", "name of table to use; should not already exist")
csvOutput = flag.String("csv_output", "",
"output path for statistics in .csv format. If this file already exists it will be overwritten.")
poolSize = flag.Int("pool_size", 1, "size of the gRPC connection pool to use for the data client")
reqCount = flag.Int("req_count", 100, "number of concurrent requests")
config *cbtrc.Config
config *cbtconfig.Config
client *bigtable.Client
adminClient *bigtable.AdminClient
)
func main() {
var err error
config, err = cbtrc.Load()
config, err = cbtconfig.Load()
if err != nil {
log.Fatal(err)
}
config.RegisterFlags()
flag.Parse()
if err := config.CheckFlags(); err != nil {
if err := config.CheckFlags(cbtconfig.ProjectAndInstanceRequired); err != nil {
log.Fatal(err)
}
if config.Creds != "" {
@@ -72,7 +75,12 @@ func main() {
var options []option.ClientOption
if *poolSize > 1 {
options = append(options, option.WithGRPCConnectionPool(*poolSize))
options = append(options,
option.WithGRPCConnectionPool(*poolSize),
// TODO(grpc/grpc-go#1388) using connection pool without WithBlock
// can cause RPCs to fail randomly. We can delete this after the issue is fixed.
option.WithGRPCDialOption(grpc.WithBlock()))
}
var csvFile *os.File
@@ -99,22 +107,33 @@ func main() {
// Create a scratch table.
log.Printf("Setting up scratch table...")
if err := adminClient.CreateTable(context.Background(), *scratchTable); err != nil {
log.Fatalf("Making scratch table %q: %v", *scratchTable, err)
tblConf := bigtable.TableConf{
TableID: *scratchTable,
Families: map[string]bigtable.GCPolicy{"f": bigtable.MaxVersionsPolicy(1)},
}
if err := adminClient.CreateColumnFamily(context.Background(), *scratchTable, "f"); err != nil {
log.Fatalf("Making scratch table column family: %v", err)
if err := adminClient.CreateTableFromConf(context.Background(), &tblConf); err != nil {
log.Fatalf("Making scratch table %q: %v", *scratchTable, err)
}
// Upon a successful run, delete the table. Don't bother checking for errors.
defer adminClient.DeleteTable(context.Background(), *scratchTable)
// Also delete the table on SIGTERM.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
s := <-c
log.Printf("Caught %v, cleaning scratch table.", s)
_ = adminClient.DeleteTable(context.Background(), *scratchTable)
os.Exit(1)
}()
log.Printf("Starting load test... (run for %v)", *runFor)
tbl := client.Open(*scratchTable)
sem := make(chan int, *reqCount) // limit the number of requests happening at once
var reads, writes stats
stopTime := time.Now().Add(*runFor)
var wg sync.WaitGroup
for time.Now().Before(stopTime) {
for time.Now().Before(stopTime) || *runFor == 0 {
sem <- 1
wg.Add(1)
go func() {

View File

@@ -1,5 +1,5 @@
/*
Copyright 2016 Google Inc. All Rights Reserved.
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ package main
import (
"bytes"
"context"
"flag"
"fmt"
"log"
@@ -33,9 +34,8 @@ import (
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/internal/cbtrc"
"cloud.google.com/go/bigtable/internal/cbtconfig"
"cloud.google.com/go/bigtable/internal/stat"
"golang.org/x/net/context"
)
var (
@@ -43,7 +43,7 @@ var (
numScans = flag.Int("concurrent_scans", 1, "number of concurrent scans")
rowLimit = flag.Int("row_limit", 10000, "max number of records per scan")
config *cbtrc.Config
config *cbtconfig.Config
client *bigtable.Client
)
@@ -54,14 +54,14 @@ func main() {
}
var err error
config, err = cbtrc.Load()
config, err = cbtconfig.Load()
if err != nil {
log.Fatal(err)
}
config.RegisterFlags()
flag.Parse()
if err := config.CheckFlags(); err != nil {
if err := config.CheckFlags(cbtconfig.ProjectAndInstanceRequired); err != nil {
log.Fatal(err)
}
if config.Creds != "" {

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,6 +19,10 @@ Package bigtable is an API to Google Cloud Bigtable.
See https://cloud.google.com/bigtable/docs/ for general product documentation.
See https://godoc.org/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Setup and Credentials
Use NewClient or NewAdminClient to create a client that can be used to access
@@ -31,7 +35,7 @@ is the simplest option. Those credentials will be used by default when NewClient
To use alternate credentials, pass them to NewClient or NewAdminClient using option.WithTokenSource.
For instance, you can use service account credentials by visiting
https://cloud.google.com/console/project/MYPROJECT/apiui/credential,
https://cloud.google.com/console/project/_/apiui/credential,
creating a new OAuth "Client ID", storing the JSON key somewhere accessible, and writing
jsonKey, err := ioutil.ReadFile(pathToKeyFile)
...
@@ -112,7 +116,7 @@ const (
// clientUserAgent identifies the version of this package.
// It should be bumped upon significant changes only.
const clientUserAgent = "cbt-go/20160628"
const clientUserAgent = "cbt-go/20180601"
// resourcePrefixHeader is the name of the metadata header used to indicate
// the resource being operated on.

215
vendor/cloud.google.com/go/bigtable/export_test.go generated vendored Normal file
View File

@@ -0,0 +1,215 @@
/*
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"context"
"errors"
"flag"
"fmt"
"strings"
"time"
"cloud.google.com/go/bigtable/bttest"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
var legacyUseProd string
var integrationConfig IntegrationTestConfig
func init() {
c := &integrationConfig
flag.BoolVar(&c.UseProd, "it.use-prod", false, "Use remote bigtable instead of local emulator")
flag.StringVar(&c.AdminEndpoint, "it.admin-endpoint", "", "Admin api host and port")
flag.StringVar(&c.DataEndpoint, "it.data-endpoint", "", "Data api host and port")
flag.StringVar(&c.Project, "it.project", "", "Project to use for integration test")
flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use")
flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use")
flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create")
// Backwards compat
flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`)
}
// IntegrationTestConfig contains parameters to pick and setup a IntegrationEnv for testing
type IntegrationTestConfig struct {
UseProd bool
AdminEndpoint string
DataEndpoint string
Project string
Instance string
Cluster string
Table string
}
// IntegrationEnv represents a testing environment.
// The environment can be implemented using production or an emulator
type IntegrationEnv interface {
Config() IntegrationTestConfig
NewAdminClient() (*AdminClient, error)
// NewInstanceAdminClient will return nil if instance administration is unsupported in this environment
NewInstanceAdminClient() (*InstanceAdminClient, error)
NewClient() (*Client, error)
Close()
}
// NewIntegrationEnv creates a new environment based on the command line args
func NewIntegrationEnv() (IntegrationEnv, error) {
c := integrationConfig
if legacyUseProd != "" {
fmt.Println("WARNING: using legacy commandline arg -use_prod, please switch to -it.*")
parts := strings.SplitN(legacyUseProd, ",", 3)
c.UseProd = true
c.Project = parts[0]
c.Instance = parts[1]
c.Table = parts[2]
}
if integrationConfig.UseProd {
return NewProdEnv(c)
}
return NewEmulatedEnv(c)
}
// EmulatedEnv encapsulates the state of an emulator
type EmulatedEnv struct {
config IntegrationTestConfig
server *bttest.Server
}
// NewEmulatedEnv builds and starts the emulator based environment
func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) {
srv, err := bttest.NewServer("localhost:0", grpc.MaxRecvMsgSize(200<<20), grpc.MaxSendMsgSize(100<<20))
if err != nil {
return nil, err
}
if config.Project == "" {
config.Project = "project"
}
if config.Instance == "" {
config.Instance = "instance"
}
if config.Table == "" {
config.Table = "mytable"
}
config.AdminEndpoint = srv.Addr
config.DataEndpoint = srv.Addr
env := &EmulatedEnv{
config: config,
server: srv,
}
return env, nil
}
// Close stops & cleans up the emulator
func (e *EmulatedEnv) Close() {
e.server.Close()
}
// Config gets the config used to build this environment
func (e *EmulatedEnv) Config() IntegrationTestConfig {
return e.config
}
// NewAdminClient builds a new connected admin client for this environment
func (e *EmulatedEnv) NewAdminClient() (*AdminClient, error) {
timeout := 20 * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)
conn, err := grpc.Dial(e.server.Addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, err
}
return NewAdminClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn))
}
// NewInstanceAdminClient returns nil for the emulated environment since the API is not implemented.
func (e *EmulatedEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {
return nil, nil
}
// NewClient builds a new connected data client for this environment
func (e *EmulatedEnv) NewClient() (*Client, error) {
timeout := 20 * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)
conn, err := grpc.Dial(e.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)))
if err != nil {
return nil, err
}
return NewClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn))
}
// ProdEnv encapsulates the state necessary to connect to the external Bigtable service
type ProdEnv struct {
config IntegrationTestConfig
}
// NewProdEnv builds the environment representation
func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) {
if config.Project == "" {
return nil, errors.New("Project not set")
}
if config.Instance == "" {
return nil, errors.New("Instance not set")
}
if config.Table == "" {
return nil, errors.New("Table not set")
}
return &ProdEnv{config}, nil
}
// Close is a no-op for production environments
func (e *ProdEnv) Close() {}
// Config gets the config used to build this environment
func (e *ProdEnv) Config() IntegrationTestConfig {
return e.config
}
// NewAdminClient builds a new connected admin client for this environment
func (e *ProdEnv) NewAdminClient() (*AdminClient, error) {
var clientOpts []option.ClientOption
if endpoint := e.config.AdminEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}
return NewAdminClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
}
// NewInstanceAdminClient returns a new connected instance admin client for this environment
func (e *ProdEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {
var clientOpts []option.ClientOption
if endpoint := e.config.AdminEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}
return NewInstanceAdminClient(context.Background(), e.config.Project, clientOpts...)
}
// NewClient builds a connected data client for this environment
func (e *ProdEnv) NewClient() (*Client, error) {
var clientOpts []option.ClientOption
if endpoint := e.config.DataEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}
return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package bigtable
import (
"fmt"
"strings"
"time"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
)
@@ -50,7 +51,7 @@ func (cf chainFilter) proto() *btpb.RowFilter {
chain.Filters = append(chain.Filters, sf.proto())
}
return &btpb.RowFilter{
Filter: &btpb.RowFilter_Chain_{chain},
Filter: &btpb.RowFilter_Chain_{Chain: chain},
}
}
@@ -76,7 +77,7 @@ func (ilf interleaveFilter) proto() *btpb.RowFilter {
inter.Filters = append(inter.Filters, sf.proto())
}
return &btpb.RowFilter{
Filter: &btpb.RowFilter_Interleave_{inter},
Filter: &btpb.RowFilter_Interleave_{Interleave: inter},
}
}
@@ -90,7 +91,7 @@ type rowKeyFilter string
func (rkf rowKeyFilter) String() string { return fmt.Sprintf("row(%s)", string(rkf)) }
func (rkf rowKeyFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{[]byte(rkf)}}
return &btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{RowKeyRegexFilter: []byte(rkf)}}
}
// FamilyFilter returns a filter that matches cells whose family name
@@ -103,7 +104,7 @@ type familyFilter string
func (ff familyFilter) String() string { return fmt.Sprintf("col(%s:)", string(ff)) }
func (ff familyFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{string(ff)}}
return &btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{FamilyNameRegexFilter: string(ff)}}
}
// ColumnFilter returns a filter that matches cells whose column name
@@ -116,7 +117,7 @@ type columnFilter string
func (cf columnFilter) String() string { return fmt.Sprintf("col(.*:%s)", string(cf)) }
func (cf columnFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{[]byte(cf)}}
return &btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{ColumnQualifierRegexFilter: []byte(cf)}}
}
// ValueFilter returns a filter that matches cells whose value
@@ -129,7 +130,7 @@ type valueFilter string
func (vf valueFilter) String() string { return fmt.Sprintf("value_match(%s)", string(vf)) }
func (vf valueFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{[]byte(vf)}}
return &btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{ValueRegexFilter: []byte(vf)}}
}
// LatestNFilter returns a filter that matches the most recent N cells in each column.
@@ -140,7 +141,7 @@ type latestNFilter int32
func (lnf latestNFilter) String() string { return fmt.Sprintf("col(*,%d)", lnf) }
func (lnf latestNFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerColumnLimitFilter{int32(lnf)}}
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerColumnLimitFilter{CellsPerColumnLimitFilter: int32(lnf)}}
}
// StripValueFilter returns a filter that replaces each value with the empty string.
@@ -150,7 +151,180 @@ type stripValueFilter struct{}
func (stripValueFilter) String() string { return "strip_value()" }
func (stripValueFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_StripValueTransformer{true}}
return &btpb.RowFilter{Filter: &btpb.RowFilter_StripValueTransformer{StripValueTransformer: true}}
}
// TODO(dsymonds): More filters: cond, col/ts/value range, sampling
// TimestampRangeFilter returns a filter that matches any cells whose timestamp is within the given time bounds. A zero
// time means no bound.
// The timestamp will be truncated to millisecond granularity.
func TimestampRangeFilter(startTime time.Time, endTime time.Time) Filter {
trf := timestampRangeFilter{}
if !startTime.IsZero() {
trf.startTime = Time(startTime)
}
if !endTime.IsZero() {
trf.endTime = Time(endTime)
}
return trf
}
// TimestampRangeFilterMicros returns a filter that matches any cells whose timestamp is within the given time bounds,
// specified in units of microseconds since 1 January 1970. A zero value for the end time is interpreted as no bound.
// The timestamp will be truncated to millisecond granularity.
func TimestampRangeFilterMicros(startTime Timestamp, endTime Timestamp) Filter {
return timestampRangeFilter{startTime, endTime}
}
type timestampRangeFilter struct {
startTime Timestamp
endTime Timestamp
}
func (trf timestampRangeFilter) String() string {
return fmt.Sprintf("timestamp_range(%v,%v)", trf.startTime, trf.endTime)
}
func (trf timestampRangeFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{
Filter: &btpb.RowFilter_TimestampRangeFilter{TimestampRangeFilter: &btpb.TimestampRange{
StartTimestampMicros: int64(trf.startTime.TruncateToMilliseconds()),
EndTimestampMicros: int64(trf.endTime.TruncateToMilliseconds()),
},
}}
}
// ColumnRangeFilter returns a filter that matches a contiguous range of columns within a single
// family, as specified by an inclusive start qualifier and exclusive end qualifier.
func ColumnRangeFilter(family, start, end string) Filter {
return columnRangeFilter{family, start, end}
}
type columnRangeFilter struct {
family string
start string
end string
}
func (crf columnRangeFilter) String() string {
return fmt.Sprintf("columnRangeFilter(%s,%s,%s)", crf.family, crf.start, crf.end)
}
func (crf columnRangeFilter) proto() *btpb.RowFilter {
r := &btpb.ColumnRange{FamilyName: crf.family}
if crf.start != "" {
r.StartQualifier = &btpb.ColumnRange_StartQualifierClosed{StartQualifierClosed: []byte(crf.start)}
}
if crf.end != "" {
r.EndQualifier = &btpb.ColumnRange_EndQualifierOpen{EndQualifierOpen: []byte(crf.end)}
}
return &btpb.RowFilter{Filter: &btpb.RowFilter_ColumnRangeFilter{ColumnRangeFilter: r}}
}
// ValueRangeFilter returns a filter that matches cells with values that fall within
// the given range, as specified by an inclusive start value and exclusive end value.
func ValueRangeFilter(start, end []byte) Filter {
return valueRangeFilter{start, end}
}
type valueRangeFilter struct {
start []byte
end []byte
}
func (vrf valueRangeFilter) String() string {
return fmt.Sprintf("valueRangeFilter(%s,%s)", vrf.start, vrf.end)
}
func (vrf valueRangeFilter) proto() *btpb.RowFilter {
r := &btpb.ValueRange{}
if vrf.start != nil {
r.StartValue = &btpb.ValueRange_StartValueClosed{StartValueClosed: vrf.start}
}
if vrf.end != nil {
r.EndValue = &btpb.ValueRange_EndValueOpen{EndValueOpen: vrf.end}
}
return &btpb.RowFilter{Filter: &btpb.RowFilter_ValueRangeFilter{ValueRangeFilter: r}}
}
// ConditionFilter returns a filter that evaluates to one of two possible filters depending
// on whether or not the given predicate filter matches at least one cell.
// If the matched filter is nil then no results will be returned.
// IMPORTANT NOTE: The predicate filter does not execute atomically with the
// true and false filters, which may lead to inconsistent or unexpected
// results. Additionally, condition filters have poor performance, especially
// when filters are set for the false condition.
func ConditionFilter(predicateFilter, trueFilter, falseFilter Filter) Filter {
return conditionFilter{predicateFilter, trueFilter, falseFilter}
}
type conditionFilter struct {
predicateFilter Filter
trueFilter Filter
falseFilter Filter
}
func (cf conditionFilter) String() string {
return fmt.Sprintf("conditionFilter(%s,%s,%s)", cf.predicateFilter, cf.trueFilter, cf.falseFilter)
}
func (cf conditionFilter) proto() *btpb.RowFilter {
var tf *btpb.RowFilter
var ff *btpb.RowFilter
if cf.trueFilter != nil {
tf = cf.trueFilter.proto()
}
if cf.falseFilter != nil {
ff = cf.falseFilter.proto()
}
return &btpb.RowFilter{
Filter: &btpb.RowFilter_Condition_{Condition: &btpb.RowFilter_Condition{
PredicateFilter: cf.predicateFilter.proto(),
TrueFilter: tf,
FalseFilter: ff,
}}}
}
// CellsPerRowOffsetFilter returns a filter that skips the first N cells of each row, matching all subsequent cells.
func CellsPerRowOffsetFilter(n int) Filter {
return cellsPerRowOffsetFilter(n)
}
type cellsPerRowOffsetFilter int32
func (cof cellsPerRowOffsetFilter) String() string {
return fmt.Sprintf("cells_per_row_offset(%d)", cof)
}
func (cof cellsPerRowOffsetFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerRowOffsetFilter{CellsPerRowOffsetFilter: int32(cof)}}
}
// CellsPerRowLimitFilter returns a filter that matches only the first N cells of each row.
func CellsPerRowLimitFilter(n int) Filter {
return cellsPerRowLimitFilter(n)
}
type cellsPerRowLimitFilter int32
func (clf cellsPerRowLimitFilter) String() string {
return fmt.Sprintf("cells_per_row_limit(%d)", clf)
}
func (clf cellsPerRowLimitFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerRowLimitFilter{CellsPerRowLimitFilter: int32(clf)}}
}
// RowSampleFilter returns a filter that matches a row with a probability of p (must be in the interval (0, 1)).
func RowSampleFilter(p float64) Filter {
return rowSampleFilter(p)
}
type rowSampleFilter float64
func (rsf rowSampleFilter) String() string {
return fmt.Sprintf("filter(%f)", rsf)
}
func (rsf rowSampleFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_RowSampleFilter{RowSampleFilter: float64(rsf)}}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ func (ip intersectionPolicy) proto() *bttdpb.GcRule {
inter.Rules = append(inter.Rules, sp.proto())
}
return &bttdpb.GcRule{
Rule: &bttdpb.GcRule_Intersection_{inter},
Rule: &bttdpb.GcRule_Intersection_{Intersection: inter},
}
}
@@ -77,7 +77,7 @@ func (up unionPolicy) proto() *bttdpb.GcRule {
union.Rules = append(union.Rules, sp.proto())
}
return &bttdpb.GcRule{
Rule: &bttdpb.GcRule_Union_{union},
Rule: &bttdpb.GcRule_Union_{Union: union},
}
}
@@ -90,7 +90,7 @@ type maxVersionsPolicy int
func (mvp maxVersionsPolicy) String() string { return fmt.Sprintf("versions() > %d", int(mvp)) }
func (mvp maxVersionsPolicy) proto() *bttdpb.GcRule {
return &bttdpb.GcRule{Rule: &bttdpb.GcRule_MaxNumVersions{int32(mvp)}}
return &bttdpb.GcRule{Rule: &bttdpb.GcRule_MaxNumVersions{MaxNumVersions: int32(mvp)}}
}
// MaxAgePolicy returns a GC policy that applies to all cells
@@ -123,9 +123,45 @@ func (ma maxAgePolicy) proto() *bttdpb.GcRule {
// Fix this if people care about GC policies over 290 years.
ns := time.Duration(ma).Nanoseconds()
return &bttdpb.GcRule{
Rule: &bttdpb.GcRule_MaxAge{&durpb.Duration{
Rule: &bttdpb.GcRule_MaxAge{MaxAge: &durpb.Duration{
Seconds: ns / 1e9,
Nanos: int32(ns % 1e9),
}},
}
}
type noGCPolicy struct{}
func (n noGCPolicy) String() string { return "" }
func (n noGCPolicy) proto() *bttdpb.GcRule { return &bttdpb.GcRule{Rule: nil} }
// NoGcPolicy applies to all cells setting maxage and maxversions to nil implies no gc policies
func NoGcPolicy() GCPolicy { return noGCPolicy{} }
// GCRuleToString converts the given GcRule proto to a user-visible string.
func GCRuleToString(rule *bttdpb.GcRule) string {
if rule == nil {
return "<never>"
}
switch r := rule.Rule.(type) {
case *bttdpb.GcRule_MaxNumVersions:
return MaxVersionsPolicy(int(r.MaxNumVersions)).String()
case *bttdpb.GcRule_MaxAge:
return MaxAgePolicy(time.Duration(r.MaxAge.Seconds) * time.Second).String()
case *bttdpb.GcRule_Intersection_:
return joinRules(r.Intersection.Rules, " && ")
case *bttdpb.GcRule_Union_:
return joinRules(r.Union.Rules, " || ")
default:
return ""
}
}
func joinRules(rules []*bttdpb.GcRule, sep string) string {
var chunks []string
for _, r := range rules {
chunks = append(chunks, GCRuleToString(r))
}
return "(" + strings.Join(chunks, sep) + ")"
}

46
vendor/cloud.google.com/go/bigtable/gc_test.go generated vendored Normal file
View File

@@ -0,0 +1,46 @@
/*
Copyright 2017 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"testing"
"time"
bttdpb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
)
func TestGcRuleToString(t *testing.T) {
intersection := IntersectionPolicy(MaxVersionsPolicy(5), MaxVersionsPolicy(10), MaxAgePolicy(16*time.Hour))
var tests = []struct {
proto *bttdpb.GcRule
want string
}{
{MaxAgePolicy(72 * time.Hour).proto(), "age() > 3d"},
{MaxVersionsPolicy(5).proto(), "versions() > 5"},
{intersection.proto(), "(versions() > 5 && versions() > 10 && age() > 16h)"},
{UnionPolicy(intersection, MaxAgePolicy(72*time.Hour)).proto(),
"((versions() > 5 && versions() > 10 && age() > 16h) || age() > 3d)"},
}
for _, test := range tests {
got := GCRuleToString(test.proto)
if got != test.want {
t.Errorf("got gc rule string: %v, wanted: %v", got, test.want)
}
}
}

View File

@@ -0,0 +1,262 @@
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cbtconfig encapsulates common code for reading configuration from .cbtrc and gcloud.
package cbtconfig
import (
"bufio"
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"
"golang.org/x/oauth2"
"google.golang.org/grpc/credentials"
)
// Config represents a configuration.
type Config struct {
Project, Instance string // required
Creds string // optional
AdminEndpoint string // optional
DataEndpoint string // optional
CertFile string // optional
UserAgent string // optional
TokenSource oauth2.TokenSource // derived
TLSCreds credentials.TransportCredentials // derived
}
// RequiredFlags describes the flag requirements for a cbt command.
type RequiredFlags uint
const (
// NoneRequired specifies that not flags are required.
NoneRequired RequiredFlags = 0
// ProjectRequired specifies that the -project flag is required.
ProjectRequired RequiredFlags = 1 << iota
// InstanceRequired specifies that the -instance flag is required.
InstanceRequired
// ProjectAndInstanceRequired specifies that both -project and -instance is required.
ProjectAndInstanceRequired = ProjectRequired | InstanceRequired
)
// RegisterFlags registers a set of standard flags for this config.
// It should be called before flag.Parse.
func (c *Config) RegisterFlags() {
flag.StringVar(&c.Project, "project", c.Project, "project ID, if unset uses gcloud configured project")
flag.StringVar(&c.Instance, "instance", c.Instance, "Cloud Bigtable instance")
flag.StringVar(&c.Creds, "creds", c.Creds, "if set, use application credentials in this file")
flag.StringVar(&c.AdminEndpoint, "admin-endpoint", c.AdminEndpoint, "Override the admin api endpoint")
flag.StringVar(&c.DataEndpoint, "data-endpoint", c.DataEndpoint, "Override the data api endpoint")
flag.StringVar(&c.CertFile, "cert-file", c.CertFile, "Override the TLS certificates file")
flag.StringVar(&c.UserAgent, "user-agent", c.UserAgent, "Override the user agent string")
}
// CheckFlags checks that the required config values are set.
func (c *Config) CheckFlags(required RequiredFlags) error {
var missing []string
if c.CertFile != "" {
b, err := ioutil.ReadFile(c.CertFile)
if err != nil {
return fmt.Errorf("Failed to load certificates from %s: %v", c.CertFile, err)
}
cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(b) {
return fmt.Errorf("Failed to append certificates from %s", c.CertFile)
}
c.TLSCreds = credentials.NewTLS(&tls.Config{RootCAs: cp})
}
if required != NoneRequired {
c.SetFromGcloud()
}
if required&ProjectRequired != 0 && c.Project == "" {
missing = append(missing, "-project")
}
if required&InstanceRequired != 0 && c.Instance == "" {
missing = append(missing, "-instance")
}
if len(missing) > 0 {
return fmt.Errorf("Missing %s", strings.Join(missing, " and "))
}
return nil
}
// Filename returns the filename consulted for standard configuration.
func Filename() string {
// TODO(dsymonds): Might need tweaking for Windows.
return filepath.Join(os.Getenv("HOME"), ".cbtrc")
}
// Load loads a .cbtrc file.
// If the file is not present, an empty config is returned.
func Load() (*Config, error) {
filename := Filename()
data, err := ioutil.ReadFile(filename)
if err != nil {
// silent fail if the file isn't there
if os.IsNotExist(err) {
return &Config{}, nil
}
return nil, fmt.Errorf("Reading %s: %v", filename, err)
}
c := new(Config)
s := bufio.NewScanner(bytes.NewReader(data))
for s.Scan() {
line := s.Text()
i := strings.Index(line, "=")
if i < 0 {
return nil, fmt.Errorf("Bad line in %s: %q", filename, line)
}
key, val := strings.TrimSpace(line[:i]), strings.TrimSpace(line[i+1:])
switch key {
default:
return nil, fmt.Errorf("Unknown key in %s: %q", filename, key)
case "project":
c.Project = val
case "instance":
c.Instance = val
case "creds":
c.Creds = val
case "admin-endpoint":
c.AdminEndpoint = val
case "data-endpoint":
c.DataEndpoint = val
case "cert-file":
c.CertFile = val
case "user-agent":
c.UserAgent = val
}
}
return c, s.Err()
}
// GcloudCredential holds gcloud credential information.
type GcloudCredential struct {
AccessToken string `json:"access_token"`
Expiry time.Time `json:"token_expiry"`
}
// Token creates an oauth2 token using gcloud credentials.
func (cred *GcloudCredential) Token() *oauth2.Token {
return &oauth2.Token{AccessToken: cred.AccessToken, TokenType: "Bearer", Expiry: cred.Expiry}
}
// GcloudConfig holds gcloud configuration values.
type GcloudConfig struct {
Configuration struct {
Properties struct {
Core struct {
Project string `json:"project"`
} `json:"core"`
} `json:"properties"`
} `json:"configuration"`
Credential GcloudCredential `json:"credential"`
}
// GcloudCmdTokenSource holds the comamnd arguments. It is only intended to be set by the program.
// TODO(deklerk) Can this be unexported?
type GcloudCmdTokenSource struct {
Command string
Args []string
}
// Token implements the oauth2.TokenSource interface
func (g *GcloudCmdTokenSource) Token() (*oauth2.Token, error) {
gcloudConfig, err := LoadGcloudConfig(g.Command, g.Args)
if err != nil {
return nil, err
}
return gcloudConfig.Credential.Token(), nil
}
// LoadGcloudConfig retrieves the gcloud configuration values we need use via the
// 'config-helper' command
func LoadGcloudConfig(gcloudCmd string, gcloudCmdArgs []string) (*GcloudConfig, error) {
out, err := exec.Command(gcloudCmd, gcloudCmdArgs...).Output()
if err != nil {
return nil, fmt.Errorf("Could not retrieve gcloud configuration")
}
var gcloudConfig GcloudConfig
if err := json.Unmarshal(out, &gcloudConfig); err != nil {
return nil, fmt.Errorf("Could not parse gcloud configuration")
}
return &gcloudConfig, nil
}
// SetFromGcloud retrieves and sets any missing config values from the gcloud
// configuration if possible possible
func (c *Config) SetFromGcloud() error {
if c.Creds == "" {
c.Creds = os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
if c.Creds == "" {
log.Printf("-creds flag unset, will use gcloud credential")
}
} else {
os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", c.Creds)
}
if c.Project == "" {
log.Printf("-project flag unset, will use gcloud active project")
}
if c.Creds != "" && c.Project != "" {
return nil
}
gcloudCmd := "gcloud"
if runtime.GOOS == "windows" {
gcloudCmd = gcloudCmd + ".cmd"
}
gcloudCmdArgs := []string{"config", "config-helper",
"--format=json(configuration.properties.core.project,credential)"}
gcloudConfig, err := LoadGcloudConfig(gcloudCmd, gcloudCmdArgs)
if err != nil {
return err
}
if c.Project == "" && gcloudConfig.Configuration.Properties.Core.Project != "" {
log.Printf("gcloud active project is \"%s\"",
gcloudConfig.Configuration.Properties.Core.Project)
c.Project = gcloudConfig.Configuration.Properties.Core.Project
}
if c.Creds == "" {
c.TokenSource = oauth2.ReuseTokenSource(
gcloudConfig.Credential.Token(),
&GcloudCmdTokenSource{Command: gcloudCmd, Args: gcloudCmdArgs})
}
return nil
}

View File

@@ -1,99 +0,0 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cbtrc encapsulates common code for reading .cbtrc files.
package cbtrc
import (
"bufio"
"bytes"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
)
// Config represents a configuration.
type Config struct {
Project, Instance string // required
Creds string // optional
}
// RegisterFlags registers a set of standard flags for this config.
// It should be called before flag.Parse.
func (c *Config) RegisterFlags() {
flag.StringVar(&c.Project, "project", c.Project, "project ID")
flag.StringVar(&c.Instance, "instance", c.Instance, "Cloud Bigtable instance")
flag.StringVar(&c.Creds, "creds", c.Creds, "if set, use application credentials in this file")
}
// CheckFlags checks that the required config values are set.
func (c *Config) CheckFlags() error {
var missing []string
if c.Project == "" {
missing = append(missing, "-project")
}
if c.Instance == "" {
missing = append(missing, "-instance")
}
if len(missing) > 0 {
return fmt.Errorf("Missing %s", strings.Join(missing, " and "))
}
return nil
}
// Filename returns the filename consulted for standard configuration.
func Filename() string {
// TODO(dsymonds): Might need tweaking for Windows.
return filepath.Join(os.Getenv("HOME"), ".cbtrc")
}
// Load loads a .cbtrc file.
// If the file is not present, an empty config is returned.
func Load() (*Config, error) {
filename := Filename()
data, err := ioutil.ReadFile(filename)
if err != nil {
// silent fail if the file isn't there
if os.IsNotExist(err) {
return &Config{}, nil
}
return nil, fmt.Errorf("Reading %s: %v", filename, err)
}
c := new(Config)
s := bufio.NewScanner(bytes.NewReader(data))
for s.Scan() {
line := s.Text()
i := strings.Index(line, "=")
if i < 0 {
return nil, fmt.Errorf("Bad line in %s: %q", filename, line)
}
key, val := strings.TrimSpace(line[:i]), strings.TrimSpace(line[i+1:])
switch key {
default:
return nil, fmt.Errorf("Unknown key in %s: %q", filename, key)
case "project":
c.Project = val
case "instance":
c.Instance = val
case "creds":
c.Creds = val
}
}
return c, s.Err()
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2016 Google Inc. All Rights Reserved.
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// This is ia snapshot from github.com/googleapis/gax-go with minor modifications.
// Package gax is a snapshot from github.com/googleapis/gax-go/v2 with minor modifications.
package gax
import (
@@ -23,12 +23,14 @@ import (
"google.golang.org/grpc/codes"
)
// CallOption is a generic interface for modifying the behavior of outbound calls.
type CallOption interface {
Resolve(*CallSettings)
}
type callOptions []CallOption
// Resolve resolves all call options individually.
func (opts callOptions) Resolve(s *CallSettings) *CallSettings {
for _, opt := range opts {
opt.Resolve(s)
@@ -36,30 +38,32 @@ func (opts callOptions) Resolve(s *CallSettings) *CallSettings {
return s
}
// Encapsulates the call settings for a particular API call.
// CallSettings encapsulates the call settings for a particular API call.
type CallSettings struct {
Timeout time.Duration
RetrySettings RetrySettings
}
// Per-call configurable settings for retrying upon transient failure.
// RetrySettings are per-call configurable settings for retrying upon transient failure.
type RetrySettings struct {
RetryCodes map[codes.Code]bool
BackoffSettings BackoffSettings
}
// Parameters to the exponential backoff algorithm for retrying.
// BackoffSettings are parameters to the exponential backoff algorithm for retrying.
type BackoffSettings struct {
DelayTimeoutSettings MultipliableDuration
RPCTimeoutSettings MultipliableDuration
}
// MultipliableDuration defines parameters for backoff settings.
type MultipliableDuration struct {
Initial time.Duration
Max time.Duration
Multiplier float64
}
// Resolve merges the receiver CallSettings into the given CallSettings.
func (w CallSettings) Resolve(s *CallSettings) {
s.Timeout = w.Timeout
s.RetrySettings = w.RetrySettings

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,23 +14,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// This is ia snapshot from github.com/googleapis/gax-go with minor modifications.
// Package gax is a snapshot from github.com/googleapis/gax-go/v2 with minor modifications.
package gax
import (
"math/rand"
"context"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"log"
"os"
)
var logger *log.Logger = log.New(os.Stderr, "", log.LstdFlags)
// A user defined call stub.
// APICall is a user defined call stub.
type APICall func(context.Context) error
// scaleDuration returns the product of a and mult.
@@ -60,10 +55,7 @@ func invokeWithRetry(ctx context.Context, stub APICall, callSettings CallSetting
return err
}
// Sleep a random amount up to the current delay
d := time.Duration(rand.Int63n(int64(delay)))
delayCtx, _ := context.WithTimeout(ctx, delay)
logger.Printf("Retryable error: %v, retrying in %v", err, d)
<-delayCtx.Done()
delay = scaleDuration(delay, backoffSettings.DelayTimeoutSettings.Multiplier)

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,12 +16,12 @@ limitations under the License.
package gax
import (
"context"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestRandomizedDelays(t *testing.T) {
@@ -34,16 +34,16 @@ func TestRandomizedDelays(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, _ := context.WithDeadline(context.Background(), deadline)
var invokeTime time.Time
Invoke(ctx, func(childCtx context.Context) error {
_ = Invoke(ctx, func(childCtx context.Context) error {
// Keep failing, make sure we never slept more than max (plus a fudge factor)
if !invokeTime.IsZero() {
if time.Since(invokeTime) > (max + 20*time.Millisecond) {
t.Fatalf("Slept too long: %v", max)
if got, want := time.Since(invokeTime), max; got > (want + 20*time.Millisecond) {
t.Logf("Slept too long. Got: %v, want: %v", got, max)
}
}
invokeTime = time.Now()
// Workaround for `go vet`: https://github.com/grpc/grpc-go/issues/90
errf := grpc.Errorf
errf := status.Errorf
return errf(codes.Unavailable, "")
}, settings...)
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2015 Google Inc. All Rights Reserved.
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
// Copyright 2016 Google Inc. All Rights Reserved.
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -70,6 +70,7 @@ func quantile(data []time.Duration, k, q int) (quantile time.Duration, ok bool)
return time.Duration(weightLower*float64(data[lower]) + weightUpper*float64(data[upper])), true
}
// Aggregate is an aggregate of latencies.
type Aggregate struct {
Name string
Count, Errors int
@@ -123,10 +124,15 @@ func (agg *Aggregate) String() string {
// WriteCSV writes a csv file to the given Writer,
// with a header row and one row per aggregate.
func WriteCSV(aggs []*Aggregate, iow io.Writer) error {
func WriteCSV(aggs []*Aggregate, iow io.Writer) (err error) {
w := csv.NewWriter(iow)
defer w.Flush()
err := w.Write([]string{"name", "count", "errors", "min", "median", "max", "p75", "p90", "p95", "p99"})
defer func() {
w.Flush()
if err == nil {
err = w.Error()
}
}()
err = w.Write([]string{"name", "count", "errors", "min", "median", "max", "p75", "p90", "p95", "p99"})
if err != nil {
return err
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2016 Google Inc. All Rights Reserved.
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -152,9 +152,8 @@ func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row
if cc.GetCommitRow() {
return cr.commitRow()
} else {
cr.state = rowInProgress
}
cr.state = rowInProgress
}
return nil
@@ -163,7 +162,7 @@ func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row
func (cr *chunkReader) finishCell() {
ri := ReadItem{
Row: string(cr.curKey),
Column: fmt.Sprintf("%s:%s", cr.curFam, cr.curQual),
Column: string(cr.curFam) + ":" + string(cr.curQual),
Timestamp: Timestamp(cr.curTS),
Value: cr.curVal,
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2016 Google Inc. All Rights Reserved.
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -20,10 +20,10 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"strings"
"testing"
"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/wrappers"
btspb "google.golang.org/genproto/googleapis/bigtable/v2"
@@ -48,7 +48,7 @@ func TestSingleCell(t *testing.T) {
t.Fatalf("Family name length mismatch %d, %d", 1, len(row["fm"]))
}
want := []ReadItem{ri("rk", "fm", "col", 1, "value")}
if !reflect.DeepEqual(row["fm"], want) {
if !testutil.Equal(row["fm"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm"], want)
}
if err := cr.Close(); err != nil {
@@ -59,10 +59,10 @@ func TestSingleCell(t *testing.T) {
func TestMultipleCells(t *testing.T) {
cr := newChunkReader()
cr.Process(cc("rs", "fm1", "col1", 0, "val1", 0, false))
cr.Process(cc("rs", "fm1", "col1", 1, "val2", 0, false))
cr.Process(cc("rs", "fm1", "col2", 0, "val3", 0, false))
cr.Process(cc("rs", "fm2", "col1", 0, "val4", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false))
mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false))
row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
@@ -76,14 +76,14 @@ func TestMultipleCells(t *testing.T) {
ri("rs", "fm1", "col1", 1, "val2"),
ri("rs", "fm1", "col2", 0, "val3"),
}
if !reflect.DeepEqual(row["fm1"], want) {
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
want = []ReadItem{
ri("rs", "fm2", "col1", 0, "val4"),
ri("rs", "fm2", "col2", 1, "extralongval5"),
}
if !reflect.DeepEqual(row["fm2"], want) {
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
if err := cr.Close(); err != nil {
@@ -94,8 +94,8 @@ func TestMultipleCells(t *testing.T) {
func TestSplitCells(t *testing.T) {
cr := newChunkReader()
cr.Process(cc("rs", "fm1", "col1", 0, "hello ", 11, false))
cr.Process(ccData("world", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false))
mustProcess(t, cr, ccData("world", 0, false))
row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
@@ -108,7 +108,7 @@ func TestSplitCells(t *testing.T) {
ri("rs", "fm1", "col1", 0, "hello world"),
ri("rs", "fm1", "col2", 0, "val2"),
}
if !reflect.DeepEqual(row["fm1"], want) {
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
if err := cr.Close(); err != nil {
@@ -124,7 +124,7 @@ func TestMultipleRows(t *testing.T) {
t.Fatalf("Processing chunk: %v", err)
}
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")}
if !reflect.DeepEqual(row["fm1"], want) {
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
@@ -133,7 +133,7 @@ func TestMultipleRows(t *testing.T) {
t.Fatalf("Processing chunk: %v", err)
}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")}
if !reflect.DeepEqual(row["fm2"], want) {
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
@@ -150,7 +150,7 @@ func TestBlankQualifier(t *testing.T) {
t.Fatalf("Processing chunk: %v", err)
}
want := []ReadItem{ri("rs1", "fm1", "", 1, "val1")}
if !reflect.DeepEqual(row["fm1"], want) {
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
@@ -159,7 +159,7 @@ func TestBlankQualifier(t *testing.T) {
t.Fatalf("Processing chunk: %v", err)
}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")}
if !reflect.DeepEqual(row["fm2"], want) {
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
@@ -170,14 +170,13 @@ func TestBlankQualifier(t *testing.T) {
func TestReset(t *testing.T) {
cr := newChunkReader()
cr.Process(cc("rs", "fm1", "col1", 0, "val1", 0, false))
cr.Process(cc("rs", "fm1", "col1", 1, "val2", 0, false))
cr.Process(cc("rs", "fm1", "col2", 0, "val3", 0, false))
cr.Process(ccReset())
row, _ := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false))
mustProcess(t, cr, ccReset())
row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true))
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")}
if !reflect.DeepEqual(row["fm1"], want) {
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Reset: got: %v\nwant: %v\n", row["fm1"], want)
}
if err := cr.Close(); err != nil {
@@ -188,13 +187,21 @@ func TestReset(t *testing.T) {
func TestNewFamEmptyQualifier(t *testing.T) {
cr := newChunkReader()
cr.Process(cc("rs", "fm1", "col1", 0, "val1", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
_, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true))
if err == nil {
t.Fatalf("Expected error on second chunk with no qualifier set")
}
}
func mustProcess(t *testing.T, cr *chunkReader, cc *btspb.ReadRowsResponse_CellChunk) Row {
row, err := cr.Process(cc)
if err != nil {
t.Fatal(err)
}
return row
}
// The read rows acceptance test reads a json file specifying a number of tests,
// each consisting of one or more cell chunk text protos and one or more resulting
// cells or errors.
@@ -218,13 +225,13 @@ type TestResult struct {
}
func TestAcceptance(t *testing.T) {
testJson, err := ioutil.ReadFile("./testdata/read-rows-acceptance-test.json")
testJSON, err := ioutil.ReadFile("./testdata/read-rows-acceptance-test.json")
if err != nil {
t.Fatalf("could not open acceptance test file %v", err)
}
var accTest AcceptanceTest
err = json.Unmarshal(testJson, &accTest)
err = json.Unmarshal(testJSON, &accTest)
if err != nil {
t.Fatalf("could not parse acceptance test file: %v", err)
}
@@ -279,7 +286,7 @@ func runTestCase(t *testing.T, test TestCase) {
got := toSet(results)
want := toSet(test.Results)
if !reflect.DeepEqual(got, want) {
if !testutil.Equal(got, want) {
t.Fatalf("[%s]: got: %v\nwant: %v\n", test.Name, got, want)
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2016 Google Inc. All Rights Reserved.
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,37 +16,39 @@ limitations under the License.
package bigtable
import (
"reflect"
"context"
"strings"
"testing"
"time"
"cloud.google.com/go/bigtable/bttest"
"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/net/context"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
rpcpb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func setupFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err error) {
srv, err := bttest.NewServer("127.0.0.1:0", opt...)
srv, err := bttest.NewServer("localhost:0", opt...)
if err != nil {
return nil, nil, err
}
conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, nil, err
}
client, err := NewClient(context.Background(), "client", "instance", option.WithGRPCConn(conn))
client, err := NewClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock()))
if err != nil {
return nil, nil, err
}
adminClient, err := NewAdminClient(context.Background(), "client", "instance", option.WithGRPCConn(conn))
adminClient, err := NewAdminClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock()))
if err != nil {
return nil, nil, err
}
@@ -75,18 +77,18 @@ func TestRetryApply(t *testing.T) {
errInjector := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if strings.HasSuffix(info.FullMethod, "MutateRow") && errCount < 3 {
errCount++
return nil, grpc.Errorf(code, "")
return nil, status.Errorf(code, "")
}
return handler(ctx, req)
}
tbl, cleanup, err := setupFakeServer(grpc.UnaryInterceptor(errInjector))
defer cleanup()
if err != nil {
t.Fatalf("fake server setup: %v", err)
}
defer cleanup()
mut := NewMutation()
mut.Set("cf", "col", 1, []byte("val"))
mut.Set("cf", "col", 1000, []byte("val"))
if err := tbl.Apply(ctx, "row1", mut); err != nil {
t.Errorf("applying single mutation with retries: %v", err)
}
@@ -108,8 +110,8 @@ func TestRetryApply(t *testing.T) {
mutTrue := NewMutation()
mutTrue.DeleteRow()
mutFalse := NewMutation()
mutFalse.Set("cf", "col", 1, []byte("val"))
condMut := NewCondMutation(ValueFilter("."), mutTrue, mutFalse)
mutFalse.Set("cf", "col", 1000, []byte("val"))
condMut := NewCondMutation(ValueFilter(".*"), mutTrue, mutFalse)
errCount = 0
code = codes.Unavailable // Will be retried
@@ -155,7 +157,7 @@ func TestRetryApplyBulk(t *testing.T) {
f = func(ss grpc.ServerStream) error {
if errCount < 3 {
errCount++
return grpc.Errorf(codes.Aborted, "")
return status.Errorf(codes.Aborted, "")
}
return nil
}
@@ -177,28 +179,28 @@ func TestRetryApplyBulk(t *testing.T) {
f = func(ss grpc.ServerStream) error {
var err error
req := new(btpb.MutateRowsRequest)
ss.RecvMsg(req)
must(ss.RecvMsg(req))
switch errCount {
case 0:
// Retryable request failure
err = grpc.Errorf(codes.Unavailable, "")
err = status.Errorf(codes.Unavailable, "")
case 1:
// Two mutations fail
writeMutateRowsResponse(ss, codes.Unavailable, codes.OK, codes.Aborted)
must(writeMutateRowsResponse(ss, codes.Unavailable, codes.OK, codes.Aborted))
err = nil
case 2:
// Two failures were retried. One will succeed.
if want, got := 2, len(req.Entries); want != got {
t.Errorf("2 bulk retries, got: %d, want %d", got, want)
}
writeMutateRowsResponse(ss, codes.OK, codes.Aborted)
must(writeMutateRowsResponse(ss, codes.OK, codes.Aborted))
err = nil
case 3:
// One failure was retried and will succeed.
if want, got := 1, len(req.Entries); want != got {
t.Errorf("1 bulk retry, got: %d, want %d", got, want)
}
writeMutateRowsResponse(ss, codes.OK)
must(writeMutateRowsResponse(ss, codes.OK))
err = nil
}
errCount++
@@ -216,12 +218,12 @@ func TestRetryApplyBulk(t *testing.T) {
f = func(ss grpc.ServerStream) error {
var err error
req := new(btpb.MutateRowsRequest)
ss.RecvMsg(req)
must(ss.RecvMsg(req))
switch errCount {
case 0:
// Give non-idempotent mutation a retryable error code.
// Nothing should be retried.
writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.Aborted)
must(writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.Aborted))
err = nil
case 1:
t.Errorf("unretryable errors: got one retry, want no retries")
@@ -231,20 +233,19 @@ func TestRetryApplyBulk(t *testing.T) {
}
errors, err = tbl.ApplyBulk(ctx, []string{"row1", "row2"}, []*Mutation{m1, niMut})
if err != nil {
t.Errorf("unretryable errors: request failed %v")
t.Errorf("unretryable errors: request failed %v", err)
}
want := []error{
grpc.Errorf(codes.FailedPrecondition, ""),
grpc.Errorf(codes.Aborted, ""),
status.Errorf(codes.FailedPrecondition, ""),
status.Errorf(codes.Aborted, ""),
}
if !reflect.DeepEqual(want, errors) {
if !testutil.Equal(want, errors) {
t.Errorf("unretryable errors: got: %v, want: %v", errors, want)
}
// Test individual errors and a deadline exceeded
f = func(ss grpc.ServerStream) error {
writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.OK, codes.Aborted)
return nil
return writeMutateRowsResponse(ss, codes.FailedPrecondition, codes.OK, codes.Aborted)
}
ctx, _ = context.WithTimeout(ctx, 100*time.Millisecond)
errors, err = tbl.ApplyBulk(ctx, []string{"row1", "row2", "row3"}, []*Mutation{m1, m2, m3})
@@ -273,15 +274,23 @@ func TestRetainRowsAfter(t *testing.T) {
prevRowKey := "m"
want := NewRange("m\x00", "z")
got := prevRowRange.retainRowsAfter(prevRowKey)
if !reflect.DeepEqual(want, got) {
if !testutil.Equal(want, got, cmp.AllowUnexported(RowRange{})) {
t.Errorf("range retry: got %v, want %v", got, want)
}
prevRowRangeList := RowRangeList{NewRange("a", "d"), NewRange("e", "g"), NewRange("h", "l")}
prevRowKey = "f"
wantRowRangeList := RowRangeList{NewRange("f\x00", "g"), NewRange("h", "l")}
got = prevRowRangeList.retainRowsAfter(prevRowKey)
if !testutil.Equal(wantRowRangeList, got, cmp.AllowUnexported(RowRange{})) {
t.Errorf("range list retry: got %v, want %v", got, wantRowRangeList)
}
prevRowList := RowList{"a", "b", "c", "d", "e", "f"}
prevRowKey = "b"
wantList := RowList{"c", "d", "e", "f"}
got = prevRowList.retainRowsAfter(prevRowKey)
if !reflect.DeepEqual(wantList, got) {
if !testutil.Equal(wantList, got) {
t.Errorf("list retry: got %v, want %v", got, wantList)
}
}
@@ -310,24 +319,27 @@ func TestRetryReadRows(t *testing.T) {
f = func(ss grpc.ServerStream) error {
var err error
req := new(btpb.ReadRowsRequest)
ss.RecvMsg(req)
must(ss.RecvMsg(req))
switch errCount {
case 0:
// Retryable request failure
err = grpc.Errorf(codes.Unavailable, "")
err = status.Errorf(codes.Unavailable, "")
case 1:
// Write two rows then error
writeReadRowsResponse(ss, "a", "b")
err = grpc.Errorf(codes.Unavailable, "")
if want, got := "a", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
t.Errorf("first retry, no data received yet: got %q, want %q", got, want)
}
must(writeReadRowsResponse(ss, "a", "b"))
err = status.Errorf(codes.Unavailable, "")
case 2:
// Retryable request failure
if want, got := "b\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
t.Errorf("2 range retries: got %q, want %q", got, want)
}
err = grpc.Errorf(codes.Unavailable, "")
err = status.Errorf(codes.Unavailable, "")
case 3:
// Write two more rows
writeReadRowsResponse(ss, "c", "d")
must(writeReadRowsResponse(ss, "c", "d"))
err = nil
}
errCount++
@@ -335,12 +347,12 @@ func TestRetryReadRows(t *testing.T) {
}
var got []string
tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool {
must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool {
got = append(got, r.Key())
return true
})
}))
want := []string{"a", "b", "c", "d"}
if !reflect.DeepEqual(got, want) {
if !testutil.Equal(got, want) {
t.Errorf("retry range integration: got %v, want %v", got, want)
}
}
@@ -357,3 +369,9 @@ func writeReadRowsResponse(ss grpc.ServerStream, rowKeys ...string) error {
}
return ss.SendMsg(&btpb.ReadRowsResponse{Chunks: chunks})
}
func must(err error) {
if err != nil {
panic(err)
}
}