Feature request : SearchAfterPublisher #2666
Description
Hi,
Would it be possible to add a publisher for search_after queries?
Starting from the existing ScrollPublisher, it would require only minor changes, since both approaches work the same way: one specific first query, then follow-up queries that are all built the same way.
In PublishActor, replace the scrollId var with a searchAfter one:
private var scrollId: String = _
with
private var searchAfter: Seq[Any] = _
In the ready function, replace the pattern match so that it distinguishes the first query from the following ones. The first is a plain query; the following ones include the search_after array:
Option(scrollId) match {
  case None => client.execute(query).onComplete(result => self ! result)
  case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result)
}
with
Option(searchAfter) match {
  case None => client.execute(query).onComplete(result => self ! result)
  case Some(sa) => client.execute(query.searchAfter(sa)).onComplete(result => self ! result)
}
In the fetching function, replace the last two cases of the pattern match (the next search_after array is extracted from the last hit):
// if we had no results from ES then we have nothing left to publish and our work here is done
case Success(resp: RequestSuccess[SearchResponse]) if resp.result.isEmpty =>
  logger.debug("Response from ES came back empty; this means no more items upstream so will complete subscription")
  scrollId = resp.result.scrollId.getOrElse(scrollId)
  s.onComplete()
  logger.debug("Stopping publisher actor")
  context.stop(self)
// more results and we can unleash the beast (stashed requests) and switch back to ready mode
case Success(resp: RequestSuccess[SearchResponse]) =>
  scrollId = resp.result.scrollId.getOrError("Response did not include a scroll id")
  queue ++= resp.result.hits.hits
  context become ready
  unstashAll()
with
// if we had no results from ES then we have nothing left to publish and our work here is done
case Success(resp: RequestSuccess[SearchResponse]) if resp.result.isEmpty =>
  logger.debug("Response from ES came back empty; this means no more items upstream so will complete subscription")
  s.onComplete()
  logger.debug("Stopping publisher actor")
  context.stop(self)
// more results and we can unleash the beast (stashed requests) and switch back to ready mode
case Success(resp: RequestSuccess[SearchResponse]) =>
  val searchHits = resp.result.hits.hits
  // the sort values of the last hit become the search_after array of the next query
  searchAfter = searchHits.last.sort
    .map(_.asInstanceOf[Seq[Any]])
    .getOrError("Last hit did not include sort values")
  queue ++= searchHits
  context become ready
  unstashAll()
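To illustrate the overall flow independently of elastic4s, here is a minimal, self-contained sketch of search_after pagination over an in-memory list. Hit, search and fetchAll are hypothetical names standing in for the real client calls, and the single numeric sort key is an assumption made to keep the example short. It only aims to show that the first request carries no cursor and that each following one reuses the sort values of the last hit:

```scala
// Hypothetical stand-in for a search hit: a document id plus its sort values.
final case class Hit(id: String, sort: Seq[Long])

object SearchAfterSketch {
  // Fake index, already ordered by its single sort key (assumption for this sketch).
  val index: Vector[Hit] = (1 to 7).map(i => Hit(s"doc-$i", Seq(i.toLong))).toVector

  // Stand-in for client.execute(query) and client.execute(query.searchAfter(sa)):
  // returns up to `size` hits whose sort key is strictly greater than the cursor.
  def search(size: Int, searchAfter: Option[Seq[Long]]): Vector[Hit] = {
    val remaining = searchAfter match {
      case None     => index                               // first query: no cursor
      case Some(sa) => index.filter(_.sort.head > sa.head) // next queries: resume after the cursor
    }
    remaining.take(size)
  }

  // Pages through the whole index, taking the next cursor from the last hit of each page.
  def fetchAll(size: Int): Vector[Hit] = {
    @annotation.tailrec
    def loop(cursor: Option[Seq[Long]], acc: Vector[Hit]): Vector[Hit] = {
      val page = search(size, cursor)
      if (page.isEmpty) acc // empty page: nothing left upstream
      else loop(Some(page.last.sort), acc ++ page)
    }
    loop(None, Vector.empty)
  }
}
```

Calling SearchAfterSketch.fetchAll(3) walks the fake index in pages of three and stops on the first empty page, which mirrors the two cases of the fetching function above.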
And in postStop, there is no need to clear the scroll.
I guess a single PaginatedPublisher class could handle both approaches, scroll and search_after, according to a mode parameter.
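As a rough sketch of what such a mode parameter might look like (all names below are hypothetical and none of them exist in elastic4s), the mode could decide both how the next cursor is extracted from a response and whether any cleanup is needed on stop:

```scala
// Hypothetical pagination modes; none of these names exist in elastic4s.
sealed trait PaginationMode
case object ScrollMode extends PaginationMode
case object SearchAfterMode extends PaginationMode

// Cursor carried between requests: a scroll id or the last hit's sort values.
sealed trait Cursor
final case class ScrollCursor(scrollId: String) extends Cursor
final case class SearchAfterCursor(sortValues: Seq[Any]) extends Cursor

// Simplified view of a response, holding only what cursor extraction needs.
final case class Page(scrollId: Option[String], lastHitSort: Option[Seq[Any]])

object PaginatedPublisherSketch {
  // Picks the cursor for the next request according to the mode.
  def nextCursor(mode: PaginationMode, page: Page): Either[String, Cursor] = mode match {
    case ScrollMode =>
      page.scrollId.map(ScrollCursor(_)).toRight("Response did not include a scroll id")
    case SearchAfterMode =>
      page.lastHitSort.map(SearchAfterCursor(_)).toRight("Last hit did not include sort values")
  }

  // Only scroll mode holds server-side state that postStop would need to clear.
  def needsCleanup(mode: PaginationMode): Boolean = mode == ScrollMode
}
```

With this shape, the actor would keep a single cursor field, and the ready, fetching and postStop functions would each branch on the mode in one place.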
What do you think?
Regards
Philippe