
Feature request: SearchAfterPublisher #2666

@philkpler


Hi,

Would it be possible to add a publisher for search_after queries?
Building on the existing ScrollPublisher, this would require only minor changes, since both approaches work the same way: one specific initial query, followed by subsequent queries that are all built the same way.
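The shared pattern (one initial query, then follow-up queries built the same way until a page comes back empty) can be modelled without Elasticsearch at all. This is a minimal in-memory sketch, assuming a hypothetical `Doc` type whose sort key is `(timestamp, id)`; `search` only emulates what a sorted query with `search_after` returns and is not the elastic4s API.

```scala
// Hypothetical document; the (timestamp, id) pair is its sort key.
// The id acts as a tiebreaker so every key is unique.
final case class Doc(timestamp: Long, id: String) {
  def sortKey: (Long, String) = (timestamp, id)
}

object SearchAfterModel {
  private val ord = Ordering[(Long, String)]

  // Emulates one request: hits sorted by key, strictly after `after`, at most `size` of them.
  def search(index: Seq[Doc], size: Int, after: Option[(Long, String)]): Seq[Doc] =
    index
      .sortBy(_.sortKey)(ord)
      .filter(d => after.forall(a => ord.gt(d.sortKey, a)))
      .take(size)

  // First request has no cursor; each following one reuses the last hit's sort key.
  def fetchAll(index: Seq[Doc], size: Int): Seq[Seq[Doc]] = {
    @annotation.tailrec
    def loop(after: Option[(Long, String)], acc: Vector[Seq[Doc]]): Vector[Seq[Doc]] = {
      val page = search(index, size, after)
      if (page.isEmpty) acc // empty page: nothing left, complete the stream
      else loop(Some(page.last.sortKey), acc :+ page)
    }
    loop(None, Vector.empty)
  }
}
```

The tiebreaker matters: if several documents shared the last sort key of a page, a strictly-greater comparison would skip the ones not yet returned, which is why search_after queries are usually sorted on a unique field as the final sort criterion.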

In PublishActor, replace the scrollId var with a searchAfter one:

private var scrollId: String                = _

with

private var searchAfter: Seq[Any] = _

In the ready function, change the pattern match that distinguishes the first query from the following ones: the first is a plain query, and each following one includes the search_after array:

Option(scrollId) match {
  case None     => client.execute(query).onComplete(result => self ! result)
  case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result)
}

to

Option(searchAfter) match {
  case None     => client.execute(query).onComplete(result => self ! result)
  case Some(sa) => client.execute(query.searchAfter(sa)).onComplete(result => self ! result)
}
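The request selection can also be written as a pure function, which keeps the actor thin and makes the first-vs-next logic testable. This is a sketch under assumptions: `SearchReq` and `withSearchAfter` are hypothetical stand-ins for the elastic4s request and its `searchAfter` method. It also holds the cursor as `Option[Seq[Any]]` starting at `None` instead of a `var` initialised to `_` (null), which would remove the need for the `Option(...)` wrapper.

```scala
// Hypothetical stand-in for a search request; only the fields needed here.
final case class SearchReq(index: String, size: Int, searchAfter: Option[Seq[Any]] = None) {
  def withSearchAfter(values: Seq[Any]): SearchReq = copy(searchAfter = Some(values))
}

object ReadyStep {
  // No cursor yet: send the base query unchanged.
  // Cursor present: attach the sort values of the previous page's last hit.
  def nextRequest(base: SearchReq, cursor: Option[Seq[Any]]): SearchReq =
    cursor match {
      case None         => base
      case Some(values) => base.withSearchAfter(values)
    }
}
```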

In the fetching function, replace the last two cases of the pattern match (the next search_after array is taken from the sort values of the last hit):

    // if we had no results from ES then we have nothing left to publish and our work here is done
    case Success(resp: RequestSuccess[SearchResponse]) if resp.result.isEmpty =>
      logger.debug("Response from ES came back empty; this means no more items upstream so will complete subscription")
      scrollId = resp.result.scrollId.getOrElse(scrollId)
      s.onComplete()
      logger.debug("Stopping publisher actor")
      context.stop(self)
    // more results and we can unleash the beast (stashed requests) and switch back to ready mode
    case Success(resp: RequestSuccess[SearchResponse]) =>
      scrollId = resp.result.scrollId.getOrError("Response did not include a scroll id")
      queue ++= resp.result.hits.hits
      context become ready
      unstashAll()

to

    // if we had no results from ES then we have nothing left to publish and our work here is done
    case Success(resp: RequestSuccess[SearchResponse]) if resp.result.isEmpty =>
      logger.debug("Response from ES came back empty; this means no more items upstream so will complete subscription")
      s.onComplete()
      logger.debug("Stopping publisher actor")
      context.stop(self)
    // more results and we can unleash the beast (stashed requests) and switch back to ready mode
    case Success(resp: RequestSuccess[SearchResponse]) =>
      val searchHits = resp.result.hits.hits
      // the sort values of the last hit become the search_after cursor for the next request;
      // the empty case is handled above, so searchHits.last is safe here
      searchAfter = searchHits.last.sort
        .getOrError("Last hit did not include sort values; search_after requires a sorted query")
      queue ++= searchHits
      context become ready
      unstashAll()
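The cursor extraction can be isolated as well, so that a query without a sort clause fails with a clear message instead of an exception inside the actor. A minimal sketch, assuming a hypothetical `Hit` type that mirrors a hit's optional sort values; `NextCursor.next` is illustrative and not part of elastic4s.

```scala
// Hypothetical hit model: an id plus the optional "sort" values returned by ES.
final case class Hit(id: String, sort: Option[Seq[Any]])

object NextCursor {
  // Right(None)         -> empty page, nothing left to fetch
  // Right(Some(values)) -> sort values of the last hit, to send as search_after
  // Left(message)       -> hits have no sort values (the query was not sorted)
  def next(hits: Seq[Hit]): Either[String, Option[Seq[Any]]] =
    hits.lastOption match {
      case None => Right(None)
      case Some(last) =>
        last.sort match {
          case Some(values) => Right(Some(values))
          case None         => Left(s"Hit ${last.id} has no sort values; search_after requires a sorted query")
        }
    }
}
```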

And in postStop, there is no need to clear the scroll, since plain search_after requests keep no scroll context open on the server.

I guess a single PaginatedPublisher class could handle both approaches, scroll and search_after, selected by a mode parameter.
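One way the mode parameter could look is sketched below. Everything here is hypothetical (`PaginationMode`, `PageCursor`, `Pagination`), and strings stand in for the real elastic4s requests; the point is that the actor would keep a single cursor value instead of separate `scrollId`/`searchAfter` vars, and that only scroll mode needs cleanup in postStop.

```scala
import scala.concurrent.duration._

// Hypothetical mode parameter for a combined PaginatedPublisher.
sealed trait PaginationMode
object PaginationMode {
  final case class Scroll(keepAlive: FiniteDuration) extends PaginationMode
  case object SearchAfter extends PaginationMode
}

// Cursor state the actor would hold in place of scrollId or searchAfter.
sealed trait PageCursor
object PageCursor {
  case object Initial extends PageCursor
  final case class ScrollId(id: String) extends PageCursor
  final case class SortValues(values: Seq[Any]) extends PageCursor
}

object Pagination {
  // Only an open scroll holds server-side state that postStop should clear.
  def needsCleanup(mode: PaginationMode, cursor: PageCursor): Boolean = (mode, cursor) match {
    case (_: PaginationMode.Scroll, PageCursor.ScrollId(_)) => true
    case _                                                  => false
  }

  // Describes the next request; a real implementation would build it instead.
  def describeNext(mode: PaginationMode, cursor: PageCursor): String = (mode, cursor) match {
    case (_, PageCursor.Initial)                                => "initial query"
    case (PaginationMode.Scroll(ka), PageCursor.ScrollId(id))   => s"searchScroll($id) keepAlive $ka"
    case (PaginationMode.SearchAfter, PageCursor.SortValues(v)) => s"query.searchAfter(${v.mkString(", ")})"
    case (m, c) => throw new IllegalStateException(s"cursor $c does not match mode $m")
  }
}
```

Keeping the mode and cursor as sealed types lets the compiler flag a missing case if a third pagination style (for example point in time) were added later.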

What do you think?

Regards

Philippe
