Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.sksamuel.elastic4s.akka.reactivestreams
import akka.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash}
import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse}
import com.sksamuel.elastic4s.akka.reactivestreams.PublishActor.Ready
import com.sksamuel.elastic4s.akka.reactivestreams.ReactiveElastic.{Impl, Scroll, SearchAfter}
import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption
import org.reactivestreams.{Publisher, Subscriber, Subscription}
Expand All @@ -12,9 +13,9 @@ import scala.collection.mutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

/** An implementation of the reactive API Publisher, that publishes documents using an elasticsearch scroll cursor. The
* initial query must be provided to the publisher, and there are helpers to create a query for all documents in an
* index (and type).
/** An implementation of the reactive API Publisher, that publishes documents using an elasticsearch scroll/search_after
* cursor. The initial query must be provided to the publisher, and there are helpers to create a query for all
* documents in an index (and type).
*
* @param client
* a client for the cluster
Expand All @@ -25,15 +26,25 @@ import scala.util.{Failure, Success}
* @param actorRefFactory
* an Actor reference factory required by the publisher
*/
class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], search: SearchRequest, maxItems: Long)(
class PaginatedPublisher private[reactivestreams] (
client: ElasticClient[Future],
search: SearchRequest,
maxItems: Long,
mode: Impl = Scroll
)(
implicit actorRefFactory: ActorRefFactory
) extends Publisher[SearchHit] {
require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher")
require(
mode != Scroll || search.keepAlive.isDefined,
"Scroll mode requires a scroll/keepAlive to be used as Publisher"
)
require(mode != SearchAfter || search.sorts.nonEmpty, "SearchAfter mode requires at least one sort field")
require(mode != SearchAfter || search.keepAlive.isEmpty, "SearchAfter mode cannot be used with scroll")

override def subscribe(s: Subscriber[_ >: SearchHit]): Unit = {
// Rule 1.9 subscriber cannot be null
if (s == null) throw new NullPointerException("Rule 1.9: Subscriber cannot be null")
val subscription = new ScrollSubscription(client, search, s, maxItems)
val subscription = new PaginatedSubscription(client, search, s, maxItems, mode)
s.onSubscribe(subscription)
// rule 1.03 the subscription should not invoke any onNext's until the onSubscribe call has returned
// even though the user might call request in the onSubscribe, we can't start sending the results yet.
Expand All @@ -42,11 +53,17 @@ class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], s
}
}

class ScrollSubscription(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)(
class PaginatedSubscription(
client: ElasticClient[Future],
query: SearchRequest,
s: Subscriber[_ >: SearchHit],
max: Long,
mode: Impl
)(
implicit actorRefFactory: ActorRefFactory
) extends Subscription {

private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max)))
private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max, mode)))

private[reactivestreams] def ready(): Unit =
actor ! PublishActor.Ready
Expand All @@ -70,8 +87,13 @@ object PublishActor {
case class Request(n: Long)
}

class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)
extends Actor
class PublishActor(
client: ElasticClient[Future],
query: SearchRequest,
s: Subscriber[_ >: SearchHit],
max: Long,
mode: Impl
) extends Actor
with Stash {

import com.sksamuel.elastic4s.ElasticDsl._
Expand All @@ -80,6 +102,7 @@ class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subsc
protected val logger: Logger = LoggerFactory.getLogger(getClass.getName)

private var scrollId: String = _
private var searchAfter: Seq[Any] = _
private var processed: Long = 0L
private val queue: mutable.Queue[SearchHit] = mutable.Queue.empty

Expand All @@ -91,7 +114,7 @@ class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subsc
// this ready method signals to the actor that its ok to start sending data. In the meantime we just stash requests.
override def receive: PartialFunction[Any, Unit] = {
case Ready =>
context become ready
context become ready(mode)
logger.info("Scroll publisher has become 'Ready'")
unstashAll()
case _ =>
Expand All @@ -112,21 +135,31 @@ class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subsc
}

// ready is the standard state, we can service requests and request more from upstream as well
private def ready: Actor.Receive = {
private def ready(mode: Impl): Actor.Receive = {
// if a request comes in for more than is currently available,
// we will send a request for more while sending what we can now
case PublishActor.Request(n) if n > queue.size =>
val toRequest = n - queue.size
logger.debug(
s"Request for $n items, but only ${queue.size} available; sending ${queue.size} now, requesting $toRequest from upstream"
)
Option(scrollId) match {
case None => client.execute(query).onComplete(result => self ! result)
case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result)
mode match {
case Scroll =>
Option(scrollId) match {
case None => client.execute(query).onComplete(result => self ! result)
case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result)
}
case SearchAfter =>
Option(searchAfter) match {
case None => client.execute(query).onComplete(result => self ! result)
case Some(sa) =>
client.execute(query.searchAfter(sa)).onComplete(result => self ! result)
}
}

// we switch state while we're waiting on elasticsearch, so we know not to send another request to ES
// because we are using a scroll and can only have one active request at a time.
context become fetching
context become fetching(mode)
logger.info("Scroll publisher has become 'Fetching'")
// queue up a new request to handle the remaining ones required when the ES response comes in
self ! PublishActor.Request(toRequest)
Expand All @@ -138,7 +171,7 @@ class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subsc
}

// fetching state is when we're waiting for a reply from es for a request we sent
private def fetching: Actor.Receive = {
private def fetching(mode: Impl): Actor.Receive = {
// if we're in fetching mode, its because we ran out of results to send
// so any requests must be stashed until a fresh batch arrives
case PublishActor.Request(n) =>
Expand All @@ -162,20 +195,33 @@ class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subsc
// if we had no results from ES then we have nothing left to publish and our work here is done
case Success(resp: RequestSuccess[SearchResponse]) if resp.result.isEmpty =>
logger.debug("Response from ES came back empty; this means no more items upstream so will complete subscription")
scrollId = resp.result.scrollId.getOrElse(scrollId)
if (mode == Scroll) scrollId = resp.result.scrollId.getOrElse(scrollId)
s.onComplete()
logger.debug("Stopping publisher actor")
context.stop(self)
// more results and we can unleash the beast (stashed requests) and switch back to ready mode
case Success(resp: RequestSuccess[SearchResponse]) =>
scrollId = resp.result.scrollId.getOrError("Response did not include a scroll id")
mode match {
case Scroll => scrollId = resp.result.scrollId.getOrError("Response did not include a scroll id")
case SearchAfter => {
val searchHits = resp.result.hits.hits
if (searchHits.nonEmpty) {
val searchHits = resp.result.hits.hits
searchAfter = searchHits(searchHits.length - 1).sort
.map(_.asInstanceOf[Seq[String]])
.getOrError("Response did not include a scroll id")
}
}
}
queue ++= resp.result.hits.hits
context become ready
context become ready(mode)
unstashAll()
}

override def postStop(): Unit = {
super.postStop()
client.execute(clearScroll(scrollId))
if (mode == Scroll && scrollId != null) {
client.execute(clearScroll(scrollId))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import scala.concurrent.Future
import scala.concurrent.duration._

object ReactiveElastic {
sealed trait Impl
case object Scroll extends Impl
case object SearchAfter extends Impl

implicit class ReactiveElastic(client: ElasticClient[Future]) {

Expand Down Expand Up @@ -47,15 +50,21 @@ object ReactiveElastic {
subscriber(config)
}

def publisher(indexes: Indexes, elements: Long, keepAlive: String)(
def publisher(indexes: Indexes, elements: Long, keepAlive: String, mode: Impl)(
implicit actorRefFactory: ActorRefFactory
): ScrollPublisher =
publisher(search(indexes).query("*:*").scroll(keepAlive), elements)
): PaginatedPublisher =
publisher(search(indexes).query("*:*").scroll(keepAlive), elements, mode)

def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
publisher(q, Long.MaxValue)
def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): PaginatedPublisher =
publisher(q, Long.MaxValue, Scroll)

def publisher(q: SearchRequest, elements: Long)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
new ScrollPublisher(client, q, elements)
def publisher(q: SearchRequest, mode: Impl)(implicit actorRefFactory: ActorRefFactory): PaginatedPublisher =
publisher(q, Long.MaxValue, mode)

def publisher(q: SearchRequest, elements: Long, mode: Impl = Scroll)(implicit
actorRefFactory: ActorRefFactory
): PaginatedPublisher = {
new PaginatedPublisher(client, q, elements, mode)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.scalatest.wordspec.AnyWordSpec

import scala.util.Try

class ScrollPublisherIntegrationTest extends AnyWordSpec with DockerTests with Matchers {
class PaginatedPublisherIntegrationTest extends AnyWordSpec with DockerTests with Matchers {

import ReactiveElastic._
import com.sksamuel.elastic4s.jackson.ElasticJackson.Implicits._
Expand Down Expand Up @@ -60,7 +60,7 @@ class ScrollPublisherIntegrationTest extends AnyWordSpec with DockerTests with M
}.await

"elastic-streams" should {
"publish all data from the index" in {
"publish all data from the index using Scroll Mode" in {

val publisher = client.publisher(search(indexName) query "*:*" scroll "1m")

Expand All @@ -77,5 +77,25 @@ class ScrollPublisherIntegrationTest extends AnyWordSpec with DockerTests with M
completionLatch.await(10, TimeUnit.SECONDS) shouldBe true
documentLatch.await(10, TimeUnit.SECONDS) shouldBe true
}

"publish all data from the index using SearchAfter mode" in {
val publisher = client.publisher(
search(indexName) query "*:*" sortBy fieldSort("_doc"),
SearchAfter
)

val completionLatch = new CountDownLatch(1)
val documentLatch = new CountDownLatch(emperors.length)

publisher.subscribe(new Subscriber[SearchHit] {
override def onComplete(): Unit = completionLatch.countDown()
override def onError(t: Throwable): Unit = fail(t)
override def onSubscribe(s: Subscription): Unit = s.request(1000)
override def onNext(t: SearchHit): Unit = documentLatch.countDown()
})

completionLatch.await(10, TimeUnit.SECONDS) shouldBe true
documentLatch.await(10, TimeUnit.SECONDS) shouldBe true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.sksamuel.elastic4s.akka.reactivestreams

import akka.actor.ActorSystem
import com.sksamuel.elastic4s.jackson.ElasticJackson
import com.sksamuel.elastic4s.requests.searches.SearchHit
import com.sksamuel.elastic4s.testkit.DockerTests
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{PublisherVerification, TestEnvironment}
import org.scalatestplus.testng.TestNGSuiteLike

import scala.util.Try

/** Reactive Streams TCK verification of the publisher when driven in SearchAfter mode.
  *
  * Extends the TCK's `PublisherVerification`, which generates the standard suite of
  * reactive-streams contract tests against the publisher built in [[createPublisher]].
  * Requires a running Elasticsearch instance (provided via `DockerTests`).
  */
class PaginatedPublisherSearchAfterVerificationTest
    extends PublisherVerification[SearchHit](
      new TestEnvironment(DEFAULT_TIMEOUT_MILLIS),
      PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS
    ) with TestNGSuiteLike with DockerTests {

  import ElasticJackson.Implicits._
  import ReactiveElastic._

  // Actor system required by the publisher's internal actor machinery.
  implicit val system: ActorSystem = ActorSystem()

  // Best-effort reset: wrapped in Try so a missing index on first run does not abort the suite.
  Try {
    client.execute {
      deleteIndex("searchafterpubver")
    }.await
  }

  // Best-effort create: wrapped in Try so a pre-existing index does not abort the suite.
  Try {
    client.execute {
      createIndex("searchafterpubver")
    }.await
  }

  // Seed 13 documents; refreshImmediately makes them visible to the searches issued by the TCK.
  client.execute {
    bulk(
      indexInto("searchafterpubver") source Empire("Parthian", "Persia", "Ctesiphon"),
      indexInto("searchafterpubver") source Empire("Ptolemaic", "Egypt", "Alexandria"),
      indexInto("searchafterpubver") source Empire("British", "Worldwide", "London"),
      indexInto("searchafterpubver") source Empire("Achaemenid", "Persia", "Babylon"),
      indexInto("searchafterpubver") source Empire("Sasanian", "Persia", "Ctesiphon"),
      indexInto("searchafterpubver") source Empire("Mongol", "East Asia", "Avarga"),
      indexInto("searchafterpubver") source Empire("Roman", "Mediterranean", "Rome"),
      indexInto("searchafterpubver") source Empire("Sumerian", "Mesopotamia", "Uruk"),
      indexInto("searchafterpubver") source Empire("Klingon", "Space", "Kronos"),
      indexInto("searchafterpubver") source Empire("Romulan", "Space", "Romulus"),
      indexInto("searchafterpubver") source Empire("Cardassian", "Space", "Cardassia Prime"),
      indexInto("searchafterpubver") source Empire("Egyptian", "Egypt", "Memphis"),
      indexInto("searchafterpubver") source Empire("Babylonian", "Levant", "Babylon")
    ).refreshImmediately
  }.await

  // SearchAfter mode needs a sort; limit(2) keeps pages small so multiple
  // search_after round-trips are exercised by the TCK.
  private val query = search("searchafterpubver").matchAllQuery().sortBy(fieldSort("_doc")).limit(2)

  // TCK tuning hook: maximum depth of synchronous onNext -> request recursion.
  override def boundedDepthOfOnNextAndRequestRecursion: Long = 2L

  // Returning null tells the TCK to skip the failed-publisher tests.
  override def createFailedPublisher(): Publisher[SearchHit] = null

  // Publisher under verification: bounded to `elements` hits, paginating via search_after.
  override def createPublisher(elements: Long): Publisher[SearchHit] = {
    new PaginatedPublisher(client, query, elements, SearchAfter)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.sksamuel.elastic4s.akka.reactivestreams

import akka.actor.ActorSystem
import com.sksamuel.elastic4s.testkit.DockerTests
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

/** Checks the precondition validation performed when constructing a paginated publisher:
  * scroll mode requires a keepAlive, SearchAfter mode requires a sort and forbids a scroll.
  * Each invalid combination must be rejected with an IllegalArgumentException.
  */
class PaginatedPublisherUnitTest extends AnyWordSpec with Matchers with DockerTests {

  import ReactiveElastic._

  // Actor system required by the publisher factory methods.
  implicit val system: ActorSystem = ActorSystem()

  "elastic-streams" should {
    "throw exception if search definition has no scroll" in {
      val queryWithoutScroll = search("scrollpubint") query "*:*"
      an[IllegalArgumentException] should be thrownBy client.publisher(queryWithoutScroll)
    }
    "throw exception if search definition has no sort for SearchAfter mode" in {
      val unsortedQuery = search("scrollpubint") query "*:*"
      an[IllegalArgumentException] should be thrownBy client.publisher(unsortedQuery, SearchAfter)
    }
    "throw exception if search definition uses scroll with SearchAfter mode" in {
      val scrollingSortedQuery = search("scrollpubint") query "*:*" scroll "1m" sortBy fieldSort("_doc")
      an[IllegalArgumentException] should be thrownBy client.publisher(scrollingSortedQuery, SearchAfter)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.scalatestplus.testng.TestNGSuiteLike

import scala.util.Try

class ScrollPublisherVerificationTest
class PaginatedPublisherVerificationTest
extends PublisherVerification[SearchHit](
new TestEnvironment(DEFAULT_TIMEOUT_MILLIS),
PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS
Expand Down Expand Up @@ -57,7 +57,7 @@ class ScrollPublisherVerificationTest
override def createFailedPublisher(): Publisher[SearchHit] = null

override def createPublisher(elements: Long): Publisher[SearchHit] = {
new ScrollPublisher(client, query, elements)
new PaginatedPublisher(client, query, elements)
}
}

Expand Down
Loading