package themis

import (
	"fmt"
	"sync"
	"time"

	"github.com/juju/errors"
	"github.com/ngaut/log"
	"github.com/pingcap/go-hbase"
	"github.com/pingcap/go-themis/oracle"
)

// TxnConfig holds the options of a themis transaction.
//
// ConcurrentPrewriteAndCommit selects the batched, per-region goroutine path
// for prewriting and committing secondary rows instead of the sequential
// row-by-row path. WaitSecondaryCommit is passed to the batched secondary
// commit and makes it wait for its goroutines before returning.
// MaxRowsInOneTxn is the row count above which Commit returns ErrTooManyRows.
// TTLInMs is not read in this file; judging by its name and the default value
// it is the transaction/lock TTL in milliseconds (presumably consumed by the
// RPC layer the config is handed to — confirm there).
type TxnConfig struct {
	ConcurrentPrewriteAndCommit bool
	WaitSecondaryCommit         bool
	TTLInMs                     uint64
	MaxRowsInOneTxn             int
	// The options below are for debugging and testing only: each one swaps a
	// commit phase for a "broken" variant that simulates a failure.
	brokenPrewriteSecondaryTest            bool
	brokenPrewriteSecondaryAndRollbackTest bool
	brokenCommitPrimaryTest                bool
	brokenCommitSecondaryTest              bool
}

// defaultTxnConf is the configuration NewTxn hands to NewTxnWithConf.
// Fields that are not listed (WaitSecondaryCommit and every broken*Test
// switch) keep their zero value, false.
var defaultTxnConf = TxnConfig{
	ConcurrentPrewriteAndCommit: true,
	TTLInMs:                     5 * 1000, // default txn TTL: 5s
	MaxRowsInOneTxn:             50000,
}

// themisTxn is the Txn implementation of this package.
//
// Writes are buffered in mutationCache until Commit. startTs is fetched from
// the oracle when the transaction is created and commitTs is fetched during
// Commit. primary/primaryRow/primaryRowOffset describe the primary column
// selected by selectPrimaryAndSecondaries (primaryRowOffset is -1 until a
// primary has been chosen); secondary lists every other mutated column and
// secondaryRows every row that does not contain the primary column.
// secondaryLockBytes is the encoded secondary lock sent with prewrites, or
// nil when no secondary lock is needed. hooks are test interception points.
type themisTxn struct {
	client             hbase.HBaseClient
	rpc                *themisRPC
	lockCleaner        LockManager
	oracle             oracle.Oracle
	mutationCache      *columnMutationCache
	startTs            uint64
	commitTs           uint64
	primaryRow         *rowMutation
	primary            *hbase.ColumnCoordinate
	secondaryRows      []*rowMutation
	secondary          []*hbase.ColumnCoordinate
	primaryRowOffset   int
	singleRowTxn       bool
	secondaryLockBytes []byte
	conf               TxnConfig
	hooks              *txnHook
}

// Compile-time check that *themisTxn satisfies the Txn interface.
var _ Txn = (*themisTxn)(nil)

// maxCleanLockRetryCount bounds the number of attempts cleanLockWithRetry
// makes on one lock, and pauseTime is how long it sleeps between attempts
// when the lock has not expired yet.
var (
	// ErrSimulated is returned by the test-only brokenPrewriteSecondary path
	// to simulate a failed secondary prewrite.
	ErrSimulated           = errors.New("simulated error")
	maxCleanLockRetryCount = 30
	pauseTime              = 300 * time.Millisecond
)

// NewTxn creates a transaction on client c with the default configuration
// (defaultTxnConf) by delegating to NewTxnWithConf.
func NewTxn(c hbase.HBaseClient, oracle oracle.Oracle) (Txn, error) {
	return NewTxnWithConf(c, defaultTxnConf, oracle)
}

// NewTxnWithConf creates a transaction on client c that uses conf and takes
// its timestamps from oracle. The start timestamp is fetched from the oracle
// right away; if that fails the (traced) error is returned and no transaction
// is created.
func NewTxnWithConf(c hbase.HBaseClient, conf TxnConfig, oracle oracle.Oracle) (Txn, error) {
	t := new(themisTxn)
	t.client = c
	t.mutationCache = newColumnMutationCache()
	t.oracle = oracle
	// -1 marks "no primary column chosen yet".
	t.primaryRowOffset = -1
	t.conf = conf
	t.rpc = newThemisRPC(c, oracle, conf)
	t.hooks = newHook()

	ts, err := t.oracle.GetTimestamp()
	if err != nil {
		return nil, errors.Trace(err)
	}
	t.startTs = ts
	t.lockCleaner = newThemisLockManager(t.rpc, c)
	return t, nil
}

// setHook replaces the transaction's hook set; the hooks are consulted at
// several points of the commit path (see the "hook for test" call sites).
func (txn *themisTxn) setHook(hooks *txnHook) {
	txn.hooks = hooks
}

// Gets reads several rows of table tbl at the transaction's start timestamp.
//
// If any returned row is a lock result, the locks of every such row are
// cleaned first and the whole batch is then read a second time with the
// ignore-lock flag set; that second result is what gets returned. Any RPC or
// lock-cleaning error aborts the call and is returned traced.
func (txn *themisTxn) Gets(tbl string, gets []*hbase.Get) ([]*hbase.ResultRow, error) {
	table := []byte(tbl)
	rows, err := txn.rpc.themisBatchGet(table, gets, txn.startTs, false)
	if err != nil {
		return nil, errors.Trace(err)
	}

	sawLock := false
	for _, row := range rows {
		if !isLockResult(row) {
			continue
		}
		// this row is locked: try to clean the lock, the batch is re-read below
		sawLock = true
		if cleanErr := txn.constructLockAndClean(table, row.SortedColumns); cleanErr != nil {
			// TODO if it's a conflict error, it means this lock
			// isn't expired, maybe we can retry or return partial results.
			return nil, errors.Trace(cleanErr)
		}
	}

	if !sawLock {
		// themisBatchGet doesn't return nil values, so the rows can be
		// handed back as they are (in a fresh slice).
		return append([]*hbase.ResultRow(nil), rows...), nil
	}

	// after the locks were cleaned, get again, this time ignoring locks
	reread, err := txn.rpc.themisBatchGet(table, gets, txn.startTs, true)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return reread, nil
}

// Get reads one row of table tbl at the transaction's start timestamp. When
// the result is a lock result, the locks are cleaned and the row is read
// again with locks ignored. The returned row may be nil when the RPC returned
// nil.
func (txn *themisTxn) Get(tbl string, g *hbase.Get) (*hbase.ResultRow, error) {
	table := []byte(tbl)
	row, err := txn.rpc.themisGet(table, g, txn.startTs, false)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if row == nil || !isLockResult(row) {
		return row, nil
	}
	// the result carries locks: clean them and fetch the row once more
	row, err = txn.tryToCleanLockAndGetAgain(table, g, row.SortedColumns)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return row, nil
}

// Put buffers every entry of p as a mutation of table tbl in the
// transaction's mutation cache. Nothing is sent to the server here.
func (txn *themisTxn) Put(tbl string, p *hbase.Put) {
	entries := getEntriesFromPut(p)
	for _, entry := range entries {
		txn.mutationCache.addMutation([]byte(tbl), p.Row, entry.Column, entry.typ, entry.value, false)
	}
}

// Delete buffers every entry of p as a mutation of table tbl in the
// transaction's mutation cache. It fails, without buffering anything, when
// the entries cannot be extracted from p.
func (txn *themisTxn) Delete(tbl string, p *hbase.Delete) error {
	dels, err := getEntriesFromDel(p)
	if err != nil {
		return errors.Trace(err)
	}
	for _, entry := range dels {
		txn.mutationCache.addMutation([]byte(tbl), p.Row, entry.Column, entry.typ, entry.value, false)
	}
	return nil
}

// Commit runs the commit protocol for the buffered mutations: select the
// primary column, prewrite the primary row, prewrite the secondary rows,
// fetch a commit timestamp, commit the primary row and finally commit the
// secondary rows.
//
// It returns nil when nothing is buffered, ErrTooManyRows when the buffered
// row count exceeds conf.MaxRowsInOneTxn, and ErrRetryable when any step up
// to and including the primary commit fails. Results of the secondary commit
// are not reported.
func (txn *themisTxn) Commit() error {
	cache := txn.mutationCache
	if cache.getMutationCount() == 0 {
		return nil
	}
	if cache.getRowCount() > txn.conf.MaxRowsInOneTxn {
		return ErrTooManyRows
	}

	txn.selectPrimaryAndSecondaries()

	if err := txn.prewritePrimary(); err != nil {
		// no need to check wrong region here, hbase client will retry when
		// a single row NotInRegion error occurs.
		log.Error(errors.ErrorStack(err))
		// it's safe to retry, because this transaction is not committed.
		return ErrRetryable
	}

	if err := txn.prewriteSecondary(); err != nil {
		if isWrongRegionErr(err) {
			log.Warn("region info outdated")
			// reset hbase client buffered region info
			txn.client.CleanAllRegionCache()
		}
		return ErrRetryable
	}

	commitTs, err := txn.oracle.GetTimestamp()
	txn.commitTs = commitTs
	if err != nil {
		log.Error(errors.ErrorStack(err))
		return ErrRetryable
	}

	if err = txn.commitPrimary(); err != nil {
		// commit primary error: roll back the primary row and all secondary rows
		log.Error("commit primary row failed", txn.startTs, err)
		txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
		txn.rollbackSecondaryRow(len(txn.secondaryRows) - 1)
		return ErrRetryable
	}

	txn.commitSecondary()
	log.Debug("themis txn commit successfully", txn.startTs, txn.commitTs)
	return nil
}

// commitSecondary commits the secondary rows after the primary row has been
// committed. The beforeCommitSecondary test hook may short-circuit it, and
// conf.brokenCommitSecondaryTest replaces it with a simulated failure.
// Otherwise the rows are committed per region in goroutines
// (conf.ConcurrentPrewriteAndCommit) or one by one.
//
// Failures are only logged: this function has no error result and Commit
// reports success once the primary row is committed.
func (txn *themisTxn) commitSecondary() {
	if bypass, _, _ := txn.hooks.beforeCommitSecondary(txn, nil); !bypass {
		return
	}
	if txn.conf.brokenCommitSecondaryTest {
		txn.brokenCommitSecondary()
		return
	}
	if !txn.conf.ConcurrentPrewriteAndCommit {
		txn.commitSecondarySync()
		return
	}
	// batchCommitSecondary fails when the rows cannot be grouped by region;
	// log that instead of dropping it silently.
	if err := txn.batchCommitSecondary(txn.conf.WaitSecondaryCommit); err != nil {
		log.Warning(err)
	}
}

// commitSecondarySync commits the secondary rows one after another. A failed
// row is logged and does not stop the commits of the rows after it.
func (txn *themisTxn) commitSecondarySync() {
	for i := range txn.secondaryRows {
		rm := txn.secondaryRows[i]
		if err := txn.rpc.commitSecondaryRow(rm.tbl, rm.row, rm.mutationList(false), txn.startTs, txn.commitTs); err != nil {
			log.Warning(err)
		}
	}
}

// batchCommitSecondary commits the secondary rows grouped by region, one
// goroutine per group. When wait is true it blocks until every goroutine has
// finished; otherwise it returns right after starting them.
//
// The only error returned is a failure to group the rows by region. A failed
// group commit does not stop the other groups; it is logged, and a
// wrong-region failure additionally resets the client's region cache.
func (txn *themisTxn) batchCommitSecondary(wait bool) error {
	// will batch commit all rows in a region
	rsRowMap, err := txn.groupByRegion()
	if err != nil {
		return errors.Trace(err)
	}

	var wg sync.WaitGroup
	for _, regionRowMap := range rsRowMap {
		wg.Add(1)
		// every row of a group is handed to the RPC under the table of the
		// group's first row
		_, firstRowM := getFirstEntity(regionRowMap)
		go func(cli *themisRPC, tbl string, rMap map[string]*rowMutation, startTs, commitTs uint64) {
			defer wg.Done()
			err := cli.batchCommitSecondaryRows([]byte(tbl), rMap, startTs, commitTs)
			if err == nil {
				return
			}
			// failure of one secondary commit will not stop the commits of
			// the other groups
			if isWrongRegionErr(err) {
				txn.client.CleanAllRegionCache()
				log.Warn("region info outdated when committing secondary rows, don't panic")
				return
			}
			log.Warnf("batch commit secondary rows failed, txn: %d, err: %v", startTs, err)
		}(txn.rpc, string(firstRowM.tbl), regionRowMap, txn.startTs, txn.commitTs)
	}
	if wait {
		wg.Wait()
	}
	return nil
}

// groupByRegion buckets the secondary rows by the region they live in. The
// outer key comes from getBatchGroupKey, the inner key is the row key. It
// fails as soon as the region of one row cannot be located.
func (txn *themisTxn) groupByRegion() (map[string]map[string]*rowMutation, error) {
	groups := map[string]map[string]*rowMutation{}
	for _, rm := range txn.secondaryRows {
		region, err := txn.client.LocateRegion(rm.tbl, rm.row, true)
		if err != nil {
			return nil, errors.Trace(err)
		}
		key := getBatchGroupKey(region, string(rm.tbl))
		rows, ok := groups[key]
		if !ok {
			rows = make(map[string]*rowMutation)
			groups[key] = rows
		}
		rows[string(rm.row)] = rm
	}
	return groups, nil
}

// commitPrimary commits the primary row at commitTs. With
// conf.brokenCommitPrimaryTest set, the simulated variant runs instead.
func (txn *themisTxn) commitPrimary() error {
	if txn.conf.brokenCommitPrimaryTest {
		return txn.brokenCommitPrimary()
	}
	table, row := txn.primary.Table, txn.primary.Row
	mutations := txn.primaryRow.mutationList(false)
	return txn.rpc.commitRow(table, row, mutations, txn.startTs, txn.commitTs, txn.primaryRowOffset)
}

// selectPrimaryAndSecondaries walks every buffered mutation and splits the
// columns into one primary column and the secondary columns.
//
// It fills txn.primary, txn.primaryRowOffset and txn.primaryRow for the
// primary, appends every other column to txn.secondary and every row that
// does not contain the primary column to txn.secondaryRows. Afterwards it
// sets txn.singleRowTxn and txn.secondaryLockBytes.
//
// NOTE(review): txn.secondary is reset at the top but txn.secondaryRows is
// only appended to — this looks like it assumes the method runs once per
// transaction; confirm against callers.
// NOTE(review): the outer loop ranges over what appears to be a map, so when
// no primary was set beforehand, which column becomes the primary depends on
// iteration order — confirm whether that is intended.
func (txn *themisTxn) selectPrimaryAndSecondaries() {
	txn.secondary = nil
	for tblName, rowMutations := range txn.mutationCache.mutations {
		for _, rowMutation := range rowMutations {
			row := rowMutation.row
			findPrimaryInRow := false
			for i, mutation := range rowMutation.mutationList(true) {
				colcord := hbase.NewColumnCoordinate([]byte(tblName), row, mutation.Family, mutation.Qual)
				// A primary is only chosen while primaryRowOffset is still -1:
				// either the first column seen (no primary set by the user) or
				// the column equal to the primary that was set beforehand.
				if txn.primaryRowOffset == -1 &&
					(txn.primary == nil || txn.primary.Equal(colcord)) {
					txn.primary = colcord
					txn.primaryRowOffset = i
					txn.primaryRow = rowMutation
					findPrimaryInRow = true
				} else {
					txn.secondary = append(txn.secondary, colcord)
				}
			}
			// rows holding the primary column are not secondary rows, even
			// though their other columns are listed in txn.secondary
			if !findPrimaryInRow {
				txn.secondaryRows = append(txn.secondaryRows, rowMutation)
			}
		}
	}

	// hook for test; when the hook does not ask to continue, singleRowTxn and
	// secondaryLockBytes are left untouched
	if bypass, _, _ := txn.hooks.afterChoosePrimaryAndSecondary(txn, nil); !bypass {
		return
	}

	if len(txn.secondaryRows) == 0 {
		txn.singleRowTxn = true
	}
	// construct secondary lock
	secondaryLock := txn.constructSecondaryLock(hbase.TypePut)
	if secondaryLock != nil {
		txn.secondaryLockBytes = secondaryLock.Encode()
	} else {
		txn.secondaryLockBytes = nil
	}
}

// constructSecondaryLock builds the secondary lock that points back at the
// primary column and carries the start timestamp. It returns nil when the
// primary row holds at most one column and there are no secondary rows, i.e.
// when there is nothing a secondary lock would protect. typ is not used.
func (txn *themisTxn) constructSecondaryLock(typ hbase.Type) *themisSecondaryLock {
	hasSecondaries := txn.primaryRow.getSize() > 1 || len(txn.secondaryRows) != 0
	if !hasSecondaries {
		return nil
	}
	lock := newThemisSecondaryLock()
	lock.primaryCoordinate = txn.primary
	lock.ts = txn.startTs
	// TODO set client addr
	return lock
}

// constructPrimaryLock builds the primary lock: it carries the mutation type
// of the primary column, the start timestamp, and one entry (coordinate plus
// mutation type) for every secondary column.
func (txn *themisTxn) constructPrimaryLock() *themisPrimaryLock {
	lock := newThemisPrimaryLock()
	lock.typ = txn.primaryRow.getType(txn.primary.Column)
	lock.ts = txn.startTs
	for i := range txn.secondary {
		coord := txn.secondary[i]
		lock.addSecondary(coord, txn.mutationCache.getMutation(coord).typ)
	}
	return lock
}

// constructLockAndClean decodes the locks contained in lockKvs (cells of
// table tbl) and cleans them one by one with cleanLockWithRetry, stopping at
// the first failure.
func (txn *themisTxn) constructLockAndClean(tbl []byte, lockKvs []*hbase.Kv) error {
	found, err := getLocksFromResults(tbl, lockKvs, txn.rpc)
	if err != nil {
		return errors.Trace(err)
	}
	for _, l := range found {
		if cleanErr := txn.cleanLockWithRetry(l); cleanErr != nil {
			return errors.Trace(cleanErr)
		}
	}
	return nil
}

// tryToCleanLockAndGetAgain cleans the locks found in lockKvs and then reads
// g again at the start timestamp, this time with the ignore-lock flag set.
func (txn *themisTxn) tryToCleanLockAndGetAgain(tbl []byte, g *hbase.Get, lockKvs []*hbase.Kv) (*hbase.ResultRow, error) {
	// try to clean locks
	if err := txn.constructLockAndClean(tbl, lockKvs); err != nil {
		return nil, errors.Trace(err)
	}
	// get again, ignore lock
	row, err := txn.rpc.themisGet(tbl, g, txn.startTs, true)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return row, nil
}

// commitSecondaryAndCleanLock commits the single column guarded by the given
// secondary lock at commitTs, using the lock's own timestamp as the prewrite
// timestamp and the lock's type as the mutation type.
func (txn *themisTxn) commitSecondaryAndCleanLock(lock *themisSecondaryLock, commitTs uint64) error {
	coord := lock.Coordinate()
	mutations := []*columnMutation{{
		Column:            &coord.Column,
		mutationValuePair: &mutationValuePair{typ: lock.typ},
	}}
	// errors.Trace passes a nil error through unchanged
	return errors.Trace(txn.rpc.commitSecondaryRow(coord.Table, coord.Row, mutations, lock.Timestamp(), commitTs))
}

// cleanLockWithRetry tries to get rid of lock, making at most
// maxCleanLockRetryCount attempts. Each attempt first checks whether the lock
// still exists (returning nil if it is gone) and then tries to clean it. An
// attempt that fails with ErrLockNotExpired sleeps for pauseTime and retries;
// any other error is returned at once. ErrCleanLockFailed is returned when
// all attempts are used up.
func (txn *themisTxn) cleanLockWithRetry(lock Lock) error {
	for attempt := 0; attempt < maxCleanLockRetryCount; attempt++ {
		exists, err := txn.lockCleaner.IsLockExists(lock.Coordinate(), 0, lock.Timestamp())
		if err != nil {
			return errors.Trace(err)
		}
		if !exists {
			return nil
		}
		log.Warnf("lock exists txn: %v lock-txn: %v row: %q", txn.startTs, lock.Timestamp(), lock.Coordinate().Row)

		// try clean lock
		err = txn.tryToCleanLock(lock)
		switch {
		case errorEqual(err, ErrLockNotExpired):
			log.Warn("sleep a while, and retry clean lock", txn.startTs)
			// TODO(dongxu) use cleverer retry sleep time interval
			time.Sleep(pauseTime)
		case err != nil:
			return errors.Trace(err)
		default:
			// lock cleaned successfully
			return nil
		}
	}
	return ErrCleanLockFailed
}

// tryToCleanLock makes one attempt to resolve lock, which was left behind by
// another transaction.
//
// For a secondary lock whose primary lock no longer exists and whose
// transaction has a commit timestamp, the locked column is simply committed.
// In every other case the lock must be expired: if it is not,
// ErrLockNotExpired is returned. For an expired lock the primary lock is
// cleaned and then every secondary column recorded in the primary lock is
// either erased (CleanLock reported commit timestamp 0) or committed at the
// reported commit timestamp.
func (txn *themisTxn) tryToCleanLock(lock Lock) error {
	// if it's secondary lock, first we'll check if its primary lock has been released.
	if lock.Role() == RoleSecondary {
		// get primary lock
		pl := lock.Primary()
		// check whether the primary lock still exists
		exists, err := txn.lockCleaner.IsLockExists(pl.Coordinate(), 0, pl.Timestamp())
		if err != nil {
			return errors.Trace(err)
		}
		if !exists {
			// the primary lock is gone; look up whether its transaction
			// was committed
			cc := pl.Coordinate()
			commitTs, err := txn.lockCleaner.GetCommitTimestamp(cc, pl.Timestamp())
			if err != nil {
				return errors.Trace(err)
			}
			if commitTs > 0 {
				// this transaction has been committed
				log.Info("txn has been committed, ts:", commitTs, "prewriteTs:", pl.Timestamp())
				// commit secondary rows
				err := txn.commitSecondaryAndCleanLock(lock.(*themisSecondaryLock), commitTs)
				if err != nil {
					return errors.Trace(err)
				}
				return nil
			}
			// commitTs == 0: fall through to the expiry check below
		}
	}
	expired, err := txn.rpc.checkAndSetLockIsExpired(lock)
	if err != nil {
		return errors.Trace(err)
	}
	// only clean expired lock
	if expired {
		// try to clean primary lock
		pl := lock.Primary()
		commitTs, cleanedLock, err := txn.lockCleaner.CleanLock(pl.Coordinate(), pl.Timestamp())
		if err != nil {
			return errors.Trace(err)
		}
		// prefer the lock returned by CleanLock, when there is one, as the
		// source of the secondary list
		if cleanedLock != nil {
			pl = cleanedLock
		}
		log.Info("try clean secondary locks", pl.Timestamp())
		// clean secondary locks
		// erase lock and data if commitTs is 0; otherwise, commit it.
		for k, v := range pl.(*themisPrimaryLock).secondaries {
			// k is the string form of the secondary's column coordinate,
			// v its mutation type
			cc := &hbase.ColumnCoordinate{}
			if err = cc.ParseFromString(k); err != nil {
				return errors.Trace(err)
			}
			if commitTs == 0 {
				// commitTs == 0 means the primary lock was cleaned and the
				// expired transaction hasn't committed yet, so we must
				// delete the lock and the dirty data
				err = txn.lockCleaner.EraseLockAndData(cc, pl.Timestamp())
				if err != nil {
					return errors.Trace(err)
				}
			} else {
				// primary row is committed, so we must commit other
				// secondary rows
				mutation := &columnMutation{
					Column: &cc.Column,
					mutationValuePair: &mutationValuePair{
						typ: v,
					},
				}
				err = txn.rpc.commitSecondaryRow(cc.Table, cc.Row,
					[]*columnMutation{mutation}, pl.Timestamp(), commitTs)
				if err != nil {
					return errors.Trace(err)
				}
			}
		}
	} else {
		return ErrLockNotExpired
	}
	return nil
}

// batchPrewriteSecondaryRowsWithLockClean prewrites the given secondary rows
// of table tbl in one batch. When the prewrite reports conflicting locks, it
// cleans every one of them and then retries the batch prewrite exactly once.
//
// It returns ErrRetryable when the retry still reports conflicting locks, and
// a traced error when a prewrite or a lock clean fails. The
// onSecondaryOccursLock test hook can short-circuit the lock handling.
func (txn *themisTxn) batchPrewriteSecondaryRowsWithLockClean(tbl []byte, rowMs map[string]*rowMutation) error {
	locks, err := txn.batchPrewriteSecondaryRows(tbl, rowMs)
	if err != nil {
		return errors.Trace(err)
	}
	if len(locks) == 0 {
		return nil
	}

	// hook for test
	if bypass, _, err := txn.hooks.onSecondaryOccursLock(txn, locks); !bypass {
		return errors.Trace(err)
	}

	// Clean all conflicting locks before retrying. Retrying after each single
	// clean would fail on the locks that have not been cleaned yet.
	for _, lock := range locks {
		if err := txn.cleanLockWithRetry(lock); err != nil {
			return errors.Trace(err)
		}
	}

	// try one more time after the locks were cleaned successfully
	locks, err = txn.batchPrewriteSecondaryRows(tbl, rowMs)
	if err != nil {
		return errors.Trace(err)
	}
	if len(locks) > 0 {
		for _, l := range locks {
			log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", l.Coordinate(), l, l.Timestamp())
		}
		return ErrRetryable
	}
	return nil
}

// prewriteRowWithLockClean prewrites one row. If the prewrite reports a
// conflicting lock, the lock is cleaned and the prewrite is tried once more;
// a conflict on that second try yields ErrRetryable. containPrimary tells
// whether the row holds the primary column. The beforePrewriteLockClean test
// hook can short-circuit the lock handling.
func (txn *themisTxn) prewriteRowWithLockClean(tbl []byte, mutation *rowMutation, containPrimary bool) error {
	lock, err := txn.prewriteRow(tbl, mutation, containPrimary)
	if err != nil {
		return errors.Trace(err)
	}
	if lock == nil {
		// no conflict, the row is prewritten
		return nil
	}

	// hook for test
	if bypass, _, hookErr := txn.hooks.beforePrewriteLockClean(txn, lock); !bypass {
		return errors.Trace(hookErr)
	}
	if err = txn.cleanLockWithRetry(lock); err != nil {
		return errors.Trace(err)
	}

	// try one more time after clean lock successfully
	lock, err = txn.prewriteRow(tbl, mutation, containPrimary)
	if err != nil {
		return errors.Trace(err)
	}
	if lock == nil {
		return nil
	}
	log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", lock.Coordinate(), lock, lock.Timestamp())
	return ErrRetryable
}

// batchPrewriteSecondaryRows sends one batch prewrite for rowMs of table tbl,
// using the transaction's start timestamp and encoded secondary lock. It
// returns the locks reported by the RPC.
func (txn *themisTxn) batchPrewriteSecondaryRows(tbl []byte, rowMs map[string]*rowMutation) (map[string]Lock, error) {
	startTs, lockBytes := txn.startTs, txn.secondaryLockBytes
	return txn.rpc.batchPrewriteSecondaryRows(tbl, rowMs, startTs, lockBytes)
}

// prewriteRow sends the prewrite for one row of table tbl and returns the
// conflicting lock reported by the RPC, if any. A row that holds the primary
// column (containPrimary) is prewritten together with the encoded primary
// lock and the primary column's offset; any other row only carries the
// secondary lock.
//
// The onPrewriteRow test hook can replace the RPC entirely. Its result value
// may be nil (for example when it only reports an error), in which case a nil
// Lock is returned.
func (txn *themisTxn) prewriteRow(tbl []byte, mutation *rowMutation, containPrimary bool) (Lock, error) {
	// hook for test
	if bypass, ret, err := txn.hooks.onPrewriteRow(txn, []interface{}{mutation, containPrimary}); !bypass {
		if ret == nil {
			// asserting a nil interface value to Lock would panic
			return nil, errors.Trace(err)
		}
		return ret.(Lock), errors.Trace(err)
	}
	if containPrimary {
		// try to get lock
		return txn.rpc.prewriteRow(tbl, mutation.row,
			mutation.mutationList(true),
			txn.startTs,
			txn.constructPrimaryLock().Encode(),
			txn.secondaryLockBytes, txn.primaryRowOffset)
	}
	return txn.rpc.prewriteSecondaryRow(tbl, mutation.row,
		mutation.mutationList(true),
		txn.startTs,
		txn.secondaryLockBytes)
}

// prewritePrimary prewrites the primary row, cleaning a conflicting lock if
// one is met. The beforePrewritePrimary test hook can short-circuit it, in
// which case the hook's error is returned as is.
func (txn *themisTxn) prewritePrimary() error {
	// hook for test
	if bypass, _, hookErr := txn.hooks.beforePrewritePrimary(txn, nil); !bypass {
		return hookErr
	}
	if err := txn.prewriteRowWithLockClean(txn.primary.Table, txn.primaryRow, true); err != nil {
		log.Debugf("prewrite primary %v %q failed: %v", txn.startTs, txn.primaryRow.row, err.Error())
		return errors.Trace(err)
	}
	log.Debugf("prewrite primary %v %q successfully", txn.startTs, txn.primaryRow.row)
	return nil
}

// prewriteSecondary prewrites all secondary rows, picking the implementation
// from the configuration: the simulated failure for tests, the per-region
// batched path, or the sequential path. The beforePrewriteSecondary test hook
// can short-circuit it, in which case the hook's error is returned as is.
func (txn *themisTxn) prewriteSecondary() error {
	// hook for test
	if bypass, _, hookErr := txn.hooks.beforePrewriteSecondary(txn, nil); !bypass {
		return hookErr
	}
	switch {
	case txn.conf.brokenPrewriteSecondaryTest:
		return txn.brokenPrewriteSecondary()
	case txn.conf.ConcurrentPrewriteAndCommit:
		return txn.batchPrewriteSecondaries()
	default:
		return txn.prewriteSecondarySync()
	}
}

// prewriteSecondarySync prewrites the secondary rows one after another. On
// the first failure it rolls back the primary row and the secondary rows up
// to and including the failing one, then returns the error.
func (txn *themisTxn) prewriteSecondarySync() error {
	for idx := range txn.secondaryRows {
		rm := txn.secondaryRows[idx]
		err := txn.prewriteRowWithLockClean(rm.tbl, rm, false)
		if err == nil {
			continue
		}
		// rollback
		txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
		txn.rollbackSecondaryRow(idx)
		return errors.Trace(err)
	}
	return nil
}

// brokenCommitPrimary is used only in tests (conf.brokenCommitPrimaryTest):
// it stands in for the primary commit, sends nothing to the server, logs a
// warning and reports success.
func (txn *themisTxn) brokenCommitPrimary() error {
	// do nothing
	log.Warn("Simulating primary commit failed")
	return nil
}

// brokenCommitSecondary is used only in tests
// (conf.brokenCommitSecondaryTest): it stands in for the secondary commit,
// sends nothing to the server and only logs a warning.
func (txn *themisTxn) brokenCommitSecondary() {
	// do nothing
	log.Warn("Simulating secondary commit failed")
}

// brokenPrewriteSecondary is used only in tests
// (conf.brokenPrewriteSecondaryTest). It prewrites every secondary row except
// the last one and then fails with ErrSimulated instead of prewriting the
// last row. Unless conf.brokenPrewriteSecondaryAndRollbackTest is set, the
// primary row and the secondary rows are rolled back before failing. With no
// secondary rows it does nothing and returns nil.
func (txn *themisTxn) brokenPrewriteSecondary() error {
	log.Warn("Simulating prewrite secondary failed")
	// The loop index counts rows, so the last index must come from
	// secondaryRows; txn.secondary counts columns and can be longer.
	last := len(txn.secondaryRows) - 1
	for i, rm := range txn.secondaryRows {
		if i == last {
			if !txn.conf.brokenPrewriteSecondaryAndRollbackTest {
				// simulating prewrite failed, need rollback
				txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
				txn.rollbackSecondaryRow(i)
			}
			// maybe rollback occurs error too
			return ErrSimulated
		}
		if err := txn.prewriteRowWithLockClean(rm.tbl, rm, false); err != nil {
			// keep going: the simulated failure is reported on the last row
			log.Warning(err)
		}
	}
	return nil
}

// batchPrewriteSecondaries prewrites the secondary rows grouped by region,
// one goroutine per group, and waits for all of them.
//
// If any group fails, the primary row and the rows of every group that
// succeeded are rolled back and one of the group errors is returned.
//
// NOTE(review): when groupByRegion fails, the function returns without
// rolling back the already prewritten primary row, unlike the group-failure
// path below — confirm whether that is intended.
func (txn *themisTxn) batchPrewriteSecondaries() error {
	wg := sync.WaitGroup{}
	//will batch prewrite all rows in a region
	rsRowMap, err := txn.groupByRegion()
	if err != nil {
		return errors.Trace(err)
	}

	// Both channels are buffered with one slot per group, so no goroutine
	// blocks on its send and the results can be inspected after wg.Wait.
	errChan := make(chan error, len(rsRowMap))
	defer close(errChan)
	successChan := make(chan map[string]*rowMutation, len(rsRowMap))
	defer close(successChan)

	for _, regionRowMap := range rsRowMap {
		wg.Add(1)
		// every row of a group is prewritten under the table of the group's
		// first row
		_, firstRowM := getFirstEntity(regionRowMap)
		go func(tbl []byte, rMap map[string]*rowMutation) {
			defer wg.Done()
			err := txn.batchPrewriteSecondaryRowsWithLockClean(tbl, rMap)
			if err != nil {
				errChan <- err
			} else {
				successChan <- rMap
			}
		}(firstRowM.tbl, regionRowMap)
	}
	wg.Wait()

	// all senders are done, so len() of the channels is stable here
	if len(errChan) != 0 {
		// occur error, clean success prewrite mutations
		log.Warnf("batch prewrite secondary rows error, rolling back %d %d", len(successChan), txn.startTs)
		txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
		// drain successChan without blocking; rollback errors are not checked
	L:
		for {
			select {
			case succMutMap := <-successChan:
				{
					for _, rowMut := range succMutMap {
						txn.rollbackRow(rowMut.tbl, rowMut)
					}
				}
			default:
				break L
			}
		}

		// only the first queued error is reported to the caller
		err := <-errChan
		if err != nil {
			log.Error("batch prewrite secondary rows error, txn:", txn.startTs, err)
		}
		return errors.Trace(err)
	}
	return nil
}

// getFirstEntity returns one entry of rowMap: whichever the map iteration
// yields first. For an empty (or nil) map it returns "" and nil.
func getFirstEntity(rowMap map[string]*rowMutation) (string, *rowMutation) {
	var (
		key string
		rm  *rowMutation
	)
	for key, rm = range rowMap {
		// one entry is all we need
		break
	}
	return key, rm
}

// getBatchGroupKey builds the key under which rows of one region are batched
// together: the region's server and name joined by "_". tblName is not used.
func getBatchGroupKey(rInfo *hbase.RegionInfo, tblName string) string {
	server, region := rInfo.Server, rInfo.Name
	return server + "_" + region
}

// rollbackRow undoes the prewrite of one row of table tbl: after logging the
// affected columns it erases the lock and data written at the start timestamp
// for every column of mutation. It stops at the first column that fails.
func (txn *themisTxn) rollbackRow(tbl []byte, mutation *rowMutation) error {
	// log what is about to be rolled back
	msg := fmt.Sprintf("\nrolling back %q %d {\n", mutation.row, txn.startTs)
	for _, c := range mutation.getColumns() {
		msg += fmt.Sprintf("\t%s:%s\n", string(c.Family), string(c.Qual))
	}
	msg += "}\n"
	log.Warn(msg)

	for _, c := range mutation.getColumns() {
		coord := &hbase.ColumnCoordinate{Table: tbl, Row: mutation.row, Column: c}
		if err := txn.lockCleaner.EraseLockAndData(coord, txn.startTs); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// rollbackSecondaryRow rolls back the secondary rows with index successIndex
// down to 0, in that order, stopping at the first failure. A negative
// successIndex rolls back nothing.
func (txn *themisTxn) rollbackSecondaryRow(successIndex int) error {
	idx := successIndex
	for idx >= 0 {
		rm := txn.secondaryRows[idx]
		if err := txn.rollbackRow(rm.tbl, rm); err != nil {
			return errors.Trace(err)
		}
		idx--
	}
	return nil
}

// GetScanner creates a scanner over table tbl that is bound to this
// transaction and its client. startKey and endKey set the scan's start and
// stop row; a nil key leaves the corresponding bound unset.
func (txn *themisTxn) GetScanner(tbl []byte, startKey, endKey []byte, batchSize int) *ThemisScanner {
	s := newThemisScanner(tbl, txn, batchSize, txn.client)
	if hasStart := startKey != nil; hasStart {
		s.setStartRow(startKey)
	}
	if hasStop := endKey != nil; hasStop {
		s.setStopRow(endKey)
	}
	return s
}

// Release drops the transaction's primary/secondary selections and zeroes
// both timestamps. Other fields, including the mutation cache, are untouched.
func (txn *themisTxn) Release() {
	txn.primary, txn.primaryRow = nil, nil
	txn.secondary, txn.secondaryRows = nil, nil
	txn.startTs, txn.commitTs = 0, 0
}

// String returns the transaction's start timestamp in decimal.
func (txn *themisTxn) String() string {
	return fmt.Sprint(txn.startTs)
}

// GetCommitTS returns the commit timestamp. It is set while Commit runs and
// is 0 before that (and after Release).
func (txn *themisTxn) GetCommitTS() uint64 {
	return txn.commitTs
}

// GetStartTS returns the start timestamp, which is fetched from the oracle
// when the transaction is created (and reset to 0 by Release).
func (txn *themisTxn) GetStartTS() uint64 {
	return txn.startTs
}

// LockRow reads the row rowkey of table tbl and buffers a mutation of type
// hbase.TypeMinimum, without a value, for each column the read returned. When
// the read returns no row, nothing is buffered and nil is returned.
func (txn *themisTxn) LockRow(tbl string, rowkey []byte) error {
	row, err := txn.Get(tbl, hbase.NewGet(rowkey))
	if err != nil {
		log.Warnf("get row error, table:%s, row:%q, error:%v", tbl, rowkey, err)
		return errors.Trace(err)
	}
	if row == nil {
		log.Warnf("has not data to lock, table:%s, row:%q", tbl, rowkey)
		return nil
	}
	for _, kv := range row.Columns {
		txn.mutationCache.addMutation([]byte(tbl), rowkey, &kv.Column, hbase.TypeMinimum, nil, true)
	}
	return nil
}