// Copyright 2012-2015 Oliver Eilhard. All rights reserved. // Use of this source code is governed by a MIT-license. // See http://olivere.mit-license.org/license.txt for details. package elastic import ( "errors" "fmt" "net/url" "strings" "gopkg.in/olivere/elastic.v3/uritemplates" ) const ( defaultKeepAlive = "5m" ) var ( // End of stream (or scan) EOS = errors.New("EOS") // No ScrollId ErrNoScrollId = errors.New("no scrollId") ) // ScanService manages a cursor through documents in Elasticsearch. type ScanService struct { client *Client indices []string types []string keepAlive string searchSource *SearchSource pretty bool routing string preference string size *int } // NewScanService creates a new service to iterate through the results // of a query. func NewScanService(client *Client) *ScanService { builder := &ScanService{ client: client, searchSource: NewSearchSource().Query(NewMatchAllQuery()), } return builder } // Index sets the name(s) of the index to use for scan. func (s *ScanService) Index(indices ...string) *ScanService { if s.indices == nil { s.indices = make([]string, 0) } s.indices = append(s.indices, indices...) return s } // Types allows to restrict the scan to a list of types. func (s *ScanService) Type(types ...string) *ScanService { if s.types == nil { s.types = make([]string, 0) } s.types = append(s.types, types...) return s } // Scroll is an alias for KeepAlive, the time to keep // the cursor alive (e.g. "5m" for 5 minutes). func (s *ScanService) Scroll(keepAlive string) *ScanService { s.keepAlive = keepAlive return s } // KeepAlive sets the maximum time the cursor will be // available before expiration (e.g. "5m" for 5 minutes). func (s *ScanService) KeepAlive(keepAlive string) *ScanService { s.keepAlive = keepAlive return s } // Fields tells Elasticsearch to only load specific fields from a search hit. // See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-fields.html. 
func (s *ScanService) Fields(fields ...string) *ScanService { s.searchSource = s.searchSource.Fields(fields...) return s } // SearchSource sets the search source builder to use with this service. func (s *ScanService) SearchSource(searchSource *SearchSource) *ScanService { s.searchSource = searchSource if s.searchSource == nil { s.searchSource = NewSearchSource().Query(NewMatchAllQuery()) } return s } // Routing allows for (a comma-separated) list of specific routing values. func (s *ScanService) Routing(routings ...string) *ScanService { s.routing = strings.Join(routings, ",") return s } // Preference specifies the node or shard the operation should be // performed on (default: "random"). func (s *ScanService) Preference(preference string) *ScanService { s.preference = preference return s } // Query sets the query to perform, e.g. MatchAllQuery. func (s *ScanService) Query(query Query) *ScanService { s.searchSource = s.searchSource.Query(query) return s } // PostFilter is executed as the last filter. It only affects the // search hits but not facets. See // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-post-filter.html // for details. func (s *ScanService) PostFilter(postFilter Query) *ScanService { s.searchSource = s.searchSource.PostFilter(postFilter) return s } // FetchSource indicates whether the response should contain the stored // _source for every hit. func (s *ScanService) FetchSource(fetchSource bool) *ScanService { s.searchSource = s.searchSource.FetchSource(fetchSource) return s } // FetchSourceContext indicates how the _source should be fetched. func (s *ScanService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *ScanService { s.searchSource = s.searchSource.FetchSourceContext(fetchSourceContext) return s } // Version can be set to true to return a version for each search hit. // See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-version.html. 
func (s *ScanService) Version(version bool) *ScanService { s.searchSource = s.searchSource.Version(version) return s } // Sort the results by the given field, in the given order. // Use the alternative SortWithInfo to use a struct to define the sorting. // See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-sort.html // for detailed documentation of sorting. func (s *ScanService) Sort(field string, ascending bool) *ScanService { s.searchSource = s.searchSource.Sort(field, ascending) return s } // SortWithInfo defines how to sort results. // Use the Sort func for a shortcut. // See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-sort.html // for detailed documentation of sorting. func (s *ScanService) SortWithInfo(info SortInfo) *ScanService { s.searchSource = s.searchSource.SortWithInfo(info) return s } // SortBy defines how to sort results. // Use the Sort func for a shortcut. // See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-sort.html // for detailed documentation of sorting. func (s *ScanService) SortBy(sorter ...Sorter) *ScanService { s.searchSource = s.searchSource.SortBy(sorter...) return s } // Pretty enables the caller to indent the JSON output. func (s *ScanService) Pretty(pretty bool) *ScanService { s.pretty = pretty return s } // Size is the number of results to return per shard, not per request. // So a size of 10 which hits 5 shards will return a maximum of 50 results // per scan request. func (s *ScanService) Size(size int) *ScanService { s.size = &size return s } // Do executes the query and returns a "server-side cursor". 
func (s *ScanService) Do() (*ScanCursor, error) { // Build url path := "/" // Indices part indexPart := make([]string, 0) for _, index := range s.indices { index, err := uritemplates.Expand("{index}", map[string]string{ "index": index, }) if err != nil { return nil, err } indexPart = append(indexPart, index) } if len(indexPart) > 0 { path += strings.Join(indexPart, ",") } // Types typesPart := make([]string, 0) for _, typ := range s.types { typ, err := uritemplates.Expand("{type}", map[string]string{ "type": typ, }) if err != nil { return nil, err } typesPart = append(typesPart, typ) } if len(typesPart) > 0 { path += "/" + strings.Join(typesPart, ",") } // Search path += "/_search" // Parameters params := make(url.Values) if !s.searchSource.hasSort() { // TODO: ES 2.1 deprecates search_type=scan. See https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking_21_search_changes.html#_literal_search_type_scan_literal_deprecated. params.Set("search_type", "scan") } if s.pretty { params.Set("pretty", fmt.Sprintf("%v", s.pretty)) } if s.keepAlive != "" { params.Set("scroll", s.keepAlive) } else { params.Set("scroll", defaultKeepAlive) } if s.size != nil && *s.size > 0 { params.Set("size", fmt.Sprintf("%d", *s.size)) } if s.routing != "" { params.Set("routing", s.routing) } // Get response body, err := s.searchSource.Source() if err != nil { return nil, err } res, err := s.client.PerformRequest("POST", path, params, body) if err != nil { return nil, err } // Return result searchResult := new(SearchResult) if err := s.client.decoder.Decode(res.Body, searchResult); err != nil { return nil, err } cursor := NewScanCursor(s.client, s.keepAlive, s.pretty, searchResult) return cursor, nil } // scanCursor represents a single page of results from // an Elasticsearch Scan operation. type ScanCursor struct { Results *SearchResult client *Client keepAlive string pretty bool currentPage int } // newScanCursor returns a new initialized instance // of scanCursor. 
func NewScanCursor(client *Client, keepAlive string, pretty bool, searchResult *SearchResult) *ScanCursor {
	return &ScanCursor{
		client:    client,
		keepAlive: keepAlive,
		pretty:    pretty,
		Results:   searchResult,
	}
}

// TotalHits is a convenience method that returns the number
// of hits the cursor will iterate through. It returns 0 when the
// cursor holds no results or the results carry no hits section.
func (c *ScanCursor) TotalHits() int64 {
	if c.Results == nil || c.Results.Hits == nil {
		return 0
	}
	return c.Results.Hits.TotalHits
}

// Next returns the next search result, or the error EOS when all
// documents have been scanned.
//
// Usage:
//
//	for {
//		res, err := cursor.Next()
//		if err == elastic.EOS {
//			// End of stream (or scan)
//			break
//		}
//		if err != nil {
//			// Handle error
//		}
//		// Work with res
//	}
func (c *ScanCursor) Next() (*SearchResult, error) {
	// A cursor without results has no scroll id to continue from.
	if c.Results == nil {
		return nil, EOS
	}
	// After the first page, an empty page means the scan is exhausted.
	if c.currentPage > 0 {
		if c.Results.Hits == nil || len(c.Results.Hits.Hits) == 0 || c.Results.Hits.TotalHits == 0 {
			return nil, EOS
		}
	}
	if c.Results.ScrollId == "" {
		return nil, EOS
	}

	// Build url
	path := "/_search/scroll"

	// Parameters
	params := make(url.Values)
	if c.pretty {
		params.Set("pretty", fmt.Sprintf("%v", c.pretty))
	}
	if c.keepAlive != "" {
		params.Set("scroll", c.keepAlive)
	} else {
		params.Set("scroll", defaultKeepAlive)
	}

	// The scroll id of the previous page is sent as the request body.
	body := c.Results.ScrollId

	// Get response
	res, err := c.client.PerformRequest("POST", path, params, body)
	if err != nil {
		return nil, err
	}

	// Seed the new result with the current scroll id before decoding
	// the response into it.
	c.Results = &SearchResult{ScrollId: body}
	if err := c.client.decoder.Decode(res.Body, c.Results); err != nil {
		return nil, err
	}

	c.currentPage++

	return c.Results, nil
}