diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryHandlerTest.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryHandlerTest.scala new file mode 100644 index 0000000000..89541fd127 --- /dev/null +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryHandlerTest.scala @@ -0,0 +1,32 @@ +package com.sksamuel.elastic4s.requests.update + +import com.sksamuel.elastic4s.handlers.update.UpdateHandlers +import com.sksamuel.elastic4s.requests.common.Preference.Shards +import com.sksamuel.elastic4s.requests.searches.queries.matches.MatchAllQuery +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class UpdateByQueryHandlerTest extends AnyFlatSpec with Matchers with UpdateHandlers { + + it should "build preference parameter with ShardsPreferenceRequest" in { + val request = UpdateByQueryAsyncRequest( + indexes = "test_index", + query = MatchAllQuery() + ).preference(Shards(List("0", "1", "2", "3"))) + + val elasticRequest = AsyncUpdateByQueryHandler.build(request) + + elasticRequest.params should contain("preference" -> "_shards:0,1,2,3") + } + + it should "not include preference parameter when shards is None" in { + val request = UpdateByQueryAsyncRequest( + indexes = "test_index", + query = MatchAllQuery() + ) + + val elasticRequest = AsyncUpdateByQueryHandler.build(request) + + elasticRequest.params should not contain key("preference") + } +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala index e2b2bd1e72..dc0186d099 100644 --- a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala @@ -3,9 +3,10 @@ package com.sksamuel.elastic4s.requests.update import com.sksamuel.elastic4s.Indexes import com.sksamuel.elastic4s.ext.OptionImplicits._ import com.sksamuel.elastic4s.requests.admin.IndicesOptionsRequest -import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice, Slicing} +import com.sksamuel.elastic4s.requests.common.{Preference, RefreshPolicy, Slice, Slicing} import com.sksamuel.elastic4s.requests.script.Script import com.sksamuel.elastic4s.requests.searches.queries.Query + import scala.concurrent.duration.FiniteDuration case class UpdateByQueryAsyncRequest( @@ -27,7 +28,8 @@ case class UpdateByQueryAsyncRequest( shouldStoreResult: Option[Boolean] = None, size: Option[Int] = None, indicesOptions: Option[IndicesOptionsRequest] = None, - routing: Option[String] = None + routing: Option[String] = None, + preference: Option[String] = None ) extends BaseUpdateByQueryRequest { def proceedOnConflicts(proceedOnConflicts: Boolean): UpdateByQueryAsyncRequest = @@ -73,5 +75,8 @@ case class UpdateByQueryAsyncRequest( def indicesOptions(options: IndicesOptionsRequest): UpdateByQueryAsyncRequest = copy(indicesOptions = options.some) + def preference(pref: Preference): UpdateByQueryAsyncRequest = preference(pref.value) + def preference(pref: String): UpdateByQueryAsyncRequest = copy(preference = pref.some) + override val waitForCompletion: Option[Boolean] = Some(false) } diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala index a74cf1a120..dd2b5678fc 100644 --- a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala @@ -1,7 +1,7 @@ package com.sksamuel.elastic4s.requests.update import com.sksamuel.elastic4s.Indexes -import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice, Slicing} +import com.sksamuel.elastic4s.requests.common.{Preference, RefreshPolicy, Slice, Slicing} import com.sksamuel.elastic4s.requests.script.Script import com.sksamuel.elastic4s.requests.searches.queries.Query import com.sksamuel.elastic4s.ext.OptionImplicits._ @@ -49,6 +49,8 @@ trait BaseUpdateByQueryRequest { val indicesOptions: Option[IndicesOptionsRequest] val routing: Option[String] + + val preference: Option[String] } case class UpdateByQueryRequest( indexes: Indexes, @@ -71,7 +73,8 @@ case class UpdateByQueryRequest( shouldStoreResult: Option[Boolean] = None, size: Option[Int] = None, indicesOptions: Option[IndicesOptionsRequest] = None, - routing: Option[String] = None + routing: Option[String] = None, + preference: Option[String] = None ) extends BaseUpdateByQueryRequest { def proceedOnConflicts(proceedOnConflicts: Boolean): UpdateByQueryRequest = @@ -119,4 +122,7 @@ case class UpdateByQueryRequest( copy(shouldStoreResult = shouldStoreResult.some) def indicesOptions(options: IndicesOptionsRequest): UpdateByQueryRequest = copy(indicesOptions = options.some) + + def preference(pref: Preference): UpdateByQueryRequest = preference(pref.value) + def preference(pref: String): UpdateByQueryRequest = copy(preference = pref.some) } diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala index 49138c7f93..384a61833b 100644 --- a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala @@ -109,6 +109,7 @@ trait UpdateHandlers { request.slices.foreach(s => if (s == Slicing.AutoSlices) params.put("slices", Slicing.AutoSlicesValue) else params.put("slices", s) ) + request.preference.foreach(params.put("preference", _)) request.indicesOptions.foreach { opts => IndicesOptionsParams(opts).foreach { case (key, value) => params.put(key, value) }