You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
271 lines
7.7 KiB
271 lines
7.7 KiB
9 years ago
|
// Copyright 2012-2015 Oliver Eilhard. All rights reserved.
|
||
|
// Use of this source code is governed by a MIT-license.
|
||
|
// See http://olivere.mit-license.org/license.txt for details.
|
||
|
|
||
|
package elastic
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
)
|
||
|
|
||
|
// Reindexer simplifies the process of reindexing an index. You typically
|
||
|
// reindex a source index to a target index. However, you can also specify
|
||
|
// a query that filters out documents from the source index before bulk
|
||
|
// indexing them into the target index. The caller may also specify a
|
||
|
// different client for the target, e.g. when copying indices from one
|
||
|
// Elasticsearch cluster to another.
|
||
|
//
|
||
|
// Internally, the Reindex users a scan and scroll operation on the source
|
||
|
// index and bulk indexing to push data into the target index.
|
||
|
//
|
||
|
// By default the reindexer fetches the _source, _parent, and _routing
|
||
|
// attributes from the source index, using the provided CopyToTargetIndex
|
||
|
// will copy those attributes into the destinationIndex.
|
||
|
// This behaviour can be overridden by setting the ScanFields and providing a
|
||
|
// custom ReindexerFunc.
|
||
|
//
|
||
|
// The caller is responsible for setting up and/or clearing the target index
|
||
|
// before starting the reindex process.
|
||
|
//
|
||
|
// See http://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html
|
||
|
// for more information about reindexing.
|
||
|
type Reindexer struct {
|
||
|
sourceClient, targetClient *Client
|
||
|
sourceIndex string
|
||
|
query Query
|
||
|
scanFields []string
|
||
|
bulkSize int
|
||
|
size int
|
||
|
scroll string
|
||
|
reindexerFunc ReindexerFunc
|
||
|
progress ReindexerProgressFunc
|
||
|
statsOnly bool
|
||
|
}
|
||
|
|
||
|
// A ReindexerFunc receives each hit from the sourceIndex.
|
||
|
// It can choose to add any number of BulkableRequests to the bulkService.
|
||
|
type ReindexerFunc func(hit *SearchHit, bulkService *BulkService) error
|
||
|
|
||
|
// CopyToTargetIndex returns a ReindexerFunc that copies the SearchHit's
|
||
|
// _source, _parent, and _routing attributes into the targetIndex
|
||
|
func CopyToTargetIndex(targetIndex string) ReindexerFunc {
|
||
|
return func(hit *SearchHit, bulkService *BulkService) error {
|
||
|
// TODO(oe) Do we need to deserialize here?
|
||
|
source := make(map[string]interface{})
|
||
|
if err := json.Unmarshal(*hit.Source, &source); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
req := NewBulkIndexRequest().Index(targetIndex).Type(hit.Type).Id(hit.Id).Doc(source)
|
||
|
if hit.Parent != "" {
|
||
|
req = req.Parent(hit.Parent)
|
||
|
}
|
||
|
if hit.Routing != "" {
|
||
|
req = req.Routing(hit.Routing)
|
||
|
}
|
||
|
bulkService.Add(req)
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ReindexerProgressFunc is a callback that can be used with Reindexer
|
||
|
// to report progress while reindexing data.
|
||
|
type ReindexerProgressFunc func(current, total int64)
|
||
|
|
||
|
// ReindexerResponse is returned from the Do func in a Reindexer.
|
||
|
// By default, it returns the number of succeeded and failed bulk operations.
|
||
|
// To return details about all failed items, set StatsOnly to false in
|
||
|
// Reindexer.
|
||
|
type ReindexerResponse struct {
|
||
|
Success int64
|
||
|
Failed int64
|
||
|
Errors []*BulkResponseItem
|
||
|
}
|
||
|
|
||
|
// NewReindexer returns a new Reindexer.
|
||
|
func NewReindexer(client *Client, source string, reindexerFunc ReindexerFunc) *Reindexer {
|
||
|
return &Reindexer{
|
||
|
sourceClient: client,
|
||
|
sourceIndex: source,
|
||
|
reindexerFunc: reindexerFunc,
|
||
|
statsOnly: true,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// TargetClient specifies a different client for the target. This is
|
||
|
// necessary when the target index is in a different Elasticsearch cluster.
|
||
|
// By default, the source and target clients are the same.
|
||
|
func (ix *Reindexer) TargetClient(c *Client) *Reindexer {
|
||
|
ix.targetClient = c
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// Query specifies the query to apply to the source. It filters out those
|
||
|
// documents to be indexed into target. A nil query does not filter out any
|
||
|
// documents.
|
||
|
func (ix *Reindexer) Query(q Query) *Reindexer {
|
||
|
ix.query = q
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// ScanFields specifies the fields the scan query should load.
|
||
|
// The default fields are _source, _parent, _routing.
|
||
|
func (ix *Reindexer) ScanFields(scanFields ...string) *Reindexer {
|
||
|
ix.scanFields = scanFields
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// BulkSize returns the number of documents to send to Elasticsearch per chunk.
|
||
|
// The default is 500.
|
||
|
func (ix *Reindexer) BulkSize(bulkSize int) *Reindexer {
|
||
|
ix.bulkSize = bulkSize
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// Size is the number of results to return per shard, not per request.
|
||
|
// So a size of 10 which hits 5 shards will return a maximum of 50 results
|
||
|
// per scan request.
|
||
|
func (ix *Reindexer) Size(size int) *Reindexer {
|
||
|
ix.size = size
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// Scroll specifies for how long the scroll operation on the source index
|
||
|
// should be maintained. The default is 5m.
|
||
|
func (ix *Reindexer) Scroll(timeout string) *Reindexer {
|
||
|
ix.scroll = timeout
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// Progress indicates a callback that will be called while indexing.
|
||
|
func (ix *Reindexer) Progress(f ReindexerProgressFunc) *Reindexer {
|
||
|
ix.progress = f
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// StatsOnly indicates whether the Do method should return details e.g. about
|
||
|
// the documents that failed while indexing. It is true by default, i.e. only
|
||
|
// the number of documents that succeeded/failed are returned. Set to false
|
||
|
// if you want all the details.
|
||
|
func (ix *Reindexer) StatsOnly(statsOnly bool) *Reindexer {
|
||
|
ix.statsOnly = statsOnly
|
||
|
return ix
|
||
|
}
|
||
|
|
||
|
// Do starts the reindexing process.
|
||
|
func (ix *Reindexer) Do() (*ReindexerResponse, error) {
|
||
|
if ix.sourceClient == nil {
|
||
|
return nil, errors.New("no source client")
|
||
|
}
|
||
|
if ix.sourceIndex == "" {
|
||
|
return nil, errors.New("no source index")
|
||
|
}
|
||
|
if ix.targetClient == nil {
|
||
|
ix.targetClient = ix.sourceClient
|
||
|
}
|
||
|
if ix.scanFields == nil {
|
||
|
ix.scanFields = []string{"_source", "_parent", "_routing"}
|
||
|
}
|
||
|
if ix.bulkSize <= 0 {
|
||
|
ix.bulkSize = 500
|
||
|
}
|
||
|
if ix.scroll == "" {
|
||
|
ix.scroll = "5m"
|
||
|
}
|
||
|
|
||
|
// Count total to report progress (if necessary)
|
||
|
var err error
|
||
|
var current, total int64
|
||
|
if ix.progress != nil {
|
||
|
total, err = ix.count()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Prepare scan and scroll to iterate through the source index
|
||
|
scanner := ix.sourceClient.Scan(ix.sourceIndex).Scroll(ix.scroll).Fields(ix.scanFields...)
|
||
|
if ix.query != nil {
|
||
|
scanner = scanner.Query(ix.query)
|
||
|
}
|
||
|
if ix.size > 0 {
|
||
|
scanner = scanner.Size(ix.size)
|
||
|
}
|
||
|
cursor, err := scanner.Do()
|
||
|
|
||
|
bulk := ix.targetClient.Bulk()
|
||
|
|
||
|
ret := &ReindexerResponse{
|
||
|
Errors: make([]*BulkResponseItem, 0),
|
||
|
}
|
||
|
|
||
|
// Main loop iterates through the source index and bulk indexes into target.
|
||
|
for {
|
||
|
docs, err := cursor.Next()
|
||
|
if err == EOS {
|
||
|
break
|
||
|
}
|
||
|
if err != nil {
|
||
|
return ret, err
|
||
|
}
|
||
|
|
||
|
if docs.TotalHits() > 0 {
|
||
|
for _, hit := range docs.Hits.Hits {
|
||
|
if ix.progress != nil {
|
||
|
current++
|
||
|
ix.progress(current, total)
|
||
|
}
|
||
|
|
||
|
err := ix.reindexerFunc(hit, bulk)
|
||
|
if err != nil {
|
||
|
return ret, err
|
||
|
}
|
||
|
|
||
|
if bulk.NumberOfActions() >= ix.bulkSize {
|
||
|
bulk, err = ix.commit(bulk, ret)
|
||
|
if err != nil {
|
||
|
return ret, err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Final flush
|
||
|
if bulk.NumberOfActions() > 0 {
|
||
|
bulk, err = ix.commit(bulk, ret)
|
||
|
if err != nil {
|
||
|
return ret, err
|
||
|
}
|
||
|
bulk = nil
|
||
|
}
|
||
|
|
||
|
return ret, nil
|
||
|
}
|
||
|
|
||
|
// count returns the number of documents in the source index.
|
||
|
// The query is taken into account, if specified.
|
||
|
func (ix *Reindexer) count() (int64, error) {
|
||
|
service := ix.sourceClient.Count(ix.sourceIndex)
|
||
|
if ix.query != nil {
|
||
|
service = service.Query(ix.query)
|
||
|
}
|
||
|
return service.Do()
|
||
|
}
|
||
|
|
||
|
// commit commits a bulk, updates the stats, and returns a fresh bulk service.
|
||
|
func (ix *Reindexer) commit(bulk *BulkService, ret *ReindexerResponse) (*BulkService, error) {
|
||
|
bres, err := bulk.Do()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
ret.Success += int64(len(bres.Succeeded()))
|
||
|
failed := bres.Failed()
|
||
|
ret.Failed += int64(len(failed))
|
||
|
if !ix.statsOnly {
|
||
|
ret.Errors = append(ret.Errors, failed...)
|
||
|
}
|
||
|
bulk = ix.targetClient.Bulk()
|
||
|
return bulk, nil
|
||
|
}
|