diff --git a/docs/source/design/mooncake-store.md b/docs/source/design/mooncake-store.md index 4ec84995d..5b1d20dc5 100644 --- a/docs/source/design/mooncake-store.md +++ b/docs/source/design/mooncake-store.md @@ -105,6 +105,25 @@ struct ReplicateConfig { }; ``` +### Upsert + +```C++ +tl::expected<void, ErrorCode> Upsert(const ObjectKey& key, + std::vector<Slice>& slices, + const ReplicateConfig& config); + +std::vector<tl::expected<void, ErrorCode>> BatchUpsert( + const std::vector<ObjectKey>& keys, + std::vector<std::vector<Slice>>& batched_slices, + const ReplicateConfig& config); +``` + +`Upsert` inserts `key` if it does not exist and updates the existing object if +it does. It uses the same replication configuration model as `Put`, while +allowing the store to reuse existing placement for in-place updates when the +current layout permits it. `BatchUpsert` performs the same operation for +multiple keys using a shared replication configuration. + ### Remove ```C++ @@ -516,6 +535,40 @@ The Master Service handles object-related interfaces as follows: Before writing an object, the Client calls PutStart to request storage space allocation from the Master Service. After completing data writing, the Client calls PutEnd to notify the Master Service to mark the object write as completed. +- Upsert + +```C++ +tl::expected<std::vector<Replica::Descriptor>, ErrorCode> UpsertStart( + const std::string& key, + const std::vector<uint64_t>& slice_lengths, + const ReplicateConfig& config); + +std::vector<tl::expected<std::vector<Replica::Descriptor>, ErrorCode>> +BatchUpsertStart(const std::vector<std::string>& keys, + const std::vector<std::vector<uint64_t>>& slice_lengths, + const ReplicateConfig& config); + +tl::expected<void, ErrorCode> UpsertEnd( + const std::string& key, ReplicaType replica_type); + +std::vector<tl::expected<void, ErrorCode>> BatchUpsertEnd( + const std::vector<std::string>& keys); + +tl::expected<void, ErrorCode> UpsertRevoke( + const std::string& key, ReplicaType replica_type); + +std::vector<tl::expected<void, ErrorCode>> BatchUpsertRevoke( + const std::vector<std::string>& keys); +``` + +`UpsertStart` / `UpsertEnd` / `UpsertRevoke` mirror the existing put lifecycle +but operate on insert-or-update semantics. If the key does not exist, the flow +behaves like `PutStart`. 
If the key already exists, the Master may reuse the +current allocation for an in-place update or allocate new space when the object +layout changes. The batch variants provide the same control flow for multiple +keys and are the lower-level primitives used by the high-level `BatchUpsert` +path. + - GetReplicaList ```C++ diff --git a/docs/source/python-api-reference/mooncake-store.md index 1c511c996..28ac9b63d 100644 --- a/docs/source/python-api-reference/mooncake-store.md +++ b/docs/source/python-api-reference/mooncake-store.md @@ -629,6 +629,120 @@ result = store.put_batch(keys, values) --- +#### upsert() + +Insert a new object if the key does not exist, or update the existing object in place when possible. It uses the same replication configuration model as `put()`. + +Upsert binary data in the distributed storage. + +```python +def upsert(self, key: str, value: bytes, config: ReplicateConfig = None) -> int +``` + +**Parameters:** +- `key` (str): Unique object identifier +- `value` (bytes): Binary data to insert or update +- `config` (ReplicateConfig, optional): Replication configuration + +**Returns:** +- `int`: Status code (0 = success, non-zero = error code) + +**Example:** +```python +config = ReplicateConfig() +config.replica_num = 2 + +rc = store.upsert("weights", b"new-bytes", config) +if rc == 0: + print("Upsert succeeded") +``` + +#### upsert_from() + +Upsert object data directly from a pre-allocated buffer (zero-copy). + +```python +def upsert_from(self, key: str, buffer_ptr: int, size: int, config: ReplicateConfig = None) -> int +``` + +**Parameters:** +- `key` (str): Object identifier +- `buffer_ptr` (int): Memory address of the source buffer +- `size` (int): Number of bytes to insert or update +- `config` (ReplicateConfig, optional): Replication configuration + +**Returns:** +- `int`: Status code (0 = success, non-zero = error code) + +**Note:** This is the zero-copy counterpart of `upsert()`. 
As with +`put_from()`, register the buffer before issuing the request. + +#### batch_upsert_from() + +Upsert multiple objects directly from pre-allocated buffers. + +```python +def batch_upsert_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int], + config: ReplicateConfig = None) -> List[int] +``` + +**Parameters:** +- `keys` (List[str]): List of object identifiers +- `buffer_ptrs` (List[int]): List of source buffer addresses +- `sizes` (List[int]): List of byte lengths for each buffer +- `config` (ReplicateConfig, optional): Replication configuration shared by all objects + +**Returns:** +- `List[int]`: List of status codes for each upsert + +#### upsert_parts() + +Upsert data from multiple buffer parts as a single object (insert or update). + +```python +def upsert_parts(self, key: str, *parts, config: ReplicateConfig = None) -> int +``` + +**Parameters:** +- `key` (str): Object identifier +- `*parts`: Variable number of bytes-like objects to concatenate +- `config` (ReplicateConfig, optional): Replication configuration + +**Returns:** +- `int`: Status code (0 = success, non-zero = error code) + +**Example:** +```python +part1 = b"Hello, " +part2 = b"World!" +result = store.upsert_parts("greeting", part1, part2) +``` + +#### upsert_batch() + +Upsert multiple objects in a single batch operation. + +```python +def upsert_batch(self, keys: List[str], values: List[bytes], config: ReplicateConfig = None) -> int +``` + +**Parameters:** +- `keys` (List[str]): List of object identifiers +- `values` (List[bytes]): List of binary data to insert or update +- `config` (ReplicateConfig, optional): Replication configuration for all objects + +**Returns:** +- `int`: Status code (0 = success, non-zero = error code) + +**Example:** +```python +keys = ["key1", "key2", "key3"] +values = [b"value1", b"value2", b"value3"] +result = store.upsert_batch(keys, values) +``` + +--- + #### get_batch() Retrieve multiple objects in a single batch operation. 
@@ -1533,6 +1647,133 @@ def batch_pub_tensor(self, keys: List[str], tensors_list: List[torch.Tensor], co --- +#### upsert_tensor() + +Insert a tensor if its key is missing, or update the existing tensor if the key already exists. The current tensor upsert helpers use the default `ReplicateConfig` and therefore do not take a `config` parameter. + +Upsert a PyTorch tensor into the store. + +```python +def upsert_tensor(self, key: str, tensor: torch.Tensor) -> int +``` + +**Parameters:** +- `key` (str): Object identifier +- `tensor` (torch.Tensor): The PyTorch tensor to insert or update + +**Returns:** +- `int`: Status code (0 = success, non-zero = error code) + +**Note:** This function requires `torch` to be installed and available in the environment. + +#### upsert_tensor_from() + +Upsert a tensor directly from a pre-allocated buffer. The buffer layout must be +`[TensorMetadata][tensor data]`, matching the layout used by +`get_tensor_into()`. + +```python +def upsert_tensor_from(self, key: str, buffer_ptr: int, size: int) -> int +``` + +**Parameters:** +- `key` (str): Object identifier +- `buffer_ptr` (int): Buffer pointer containing serialized tensor metadata and payload +- `size` (int): Actual serialized byte length of the tensor buffer + +**Returns:** +- `int`: Status code (0 = success, non-zero = error code) + +**Note:** This function is not supported for dummy client. + +#### batch_upsert_tensor_from() + +Upsert multiple tensors directly from pre-allocated buffers. Each buffer must +use layout `[TensorMetadata][tensor data]`. 
+ +```python +def batch_upsert_tensor_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[int] +``` + +**Parameters:** +- `keys` (List[str]): List of object identifiers +- `buffer_ptrs` (List[int]): List of serialized tensor buffer pointers +- `sizes` (List[int]): List of actual serialized byte lengths + +**Returns:** +- `List[int]`: List of status codes for each tensor upsert + +#### batch_upsert_tensor() + +Upsert a batch of PyTorch tensors into the store (insert or update). + +```python +def batch_upsert_tensor(self, keys: List[str], tensors_list: List[torch.Tensor]) -> List[int] +``` + +**Parameters:** +- `keys` (List[str]): List of object identifiers +- `tensors_list` (List[torch.Tensor]): List of tensors to insert or update + +**Returns:** +- `List[int]`: List of status codes for each tensor operation. + +**Note:** This function requires `torch` to be installed and available in the environment. Not supported for dummy client. + +#### upsert_pub_tensor() + +Upsert a PyTorch tensor with configurable replication settings (insert or update). + +```python +def upsert_pub_tensor(self, key: str, tensor: torch.Tensor, config: ReplicateConfig = None) -> int +``` + +**Parameters:** +- `key` (str): Unique object identifier +- `tensor` (torch.Tensor): PyTorch tensor to insert or update +- `config` (ReplicateConfig, optional): Replication configuration + +**Returns:** +- `int`: Status code (0 = success, non-zero = error code) + +**Note:** This function requires `torch` to be installed and available in the environment. Not supported for dummy client. 
+ +**Example:** +```python +import torch +from mooncake.store import ReplicateConfig + +tensor = torch.randn(100, 100) + +config = ReplicateConfig() +config.replica_num = 2 +config.with_soft_pin = True + +result = store.upsert_pub_tensor("my_tensor", tensor, config) +if result == 0: + print("Tensor upserted successfully") +``` + +#### batch_upsert_pub_tensor() + +Batch upsert PyTorch tensors with configurable replication settings (insert or update). + +```python +def batch_upsert_pub_tensor(self, keys: List[str], tensors_list: List[torch.Tensor], config: ReplicateConfig = None) -> List[int] +``` + +**Parameters:** +- `keys` (List[str]): List of object identifiers +- `tensors_list` (List[torch.Tensor]): List of tensors to insert or update +- `config` (ReplicateConfig, optional): Replication configuration + +**Returns:** +- `List[int]`: List of status codes for each tensor operation. + +**Note:** This function requires `torch` to be installed and available in the environment. Not supported for dummy client. + +--- + ### PyTorch Tensor Operations (Zero Copy) These methods provide direct support for storing and retrieving PyTorch tensors. They automatically handle serialization and metadata, and include built-in support for **Tensor Parallelism (TP)** by automatically splitting and reconstructing tensor shards. 
diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index e74c61cd3..bc2328d88 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -967,6 +967,231 @@ class MooncakeStorePyWrapper { return final_results; } + // --- Upsert tensor methods --- + + int upsert_tensor_impl(const std::string &key, pybind11::object tensor, + const ReplicateConfig &config) { + auto info = extract_tensor_info(tensor, key); + if (!info.valid()) return to_py_ret(ErrorCode::INVALID_PARAMS); + + std::vector> values; + values.emplace_back(reinterpret_cast(&info.metadata), + sizeof(TensorMetadata)); + values.emplace_back(reinterpret_cast(info.data_ptr), + info.tensor_size); + + py::gil_scoped_release release_gil; + int ret = store_->upsert_parts(key, values, config); + if (ret != 0) + LOG(ERROR) << "upsert_parts failed for key " << key << " with code " + << ret; + return ret; + } + + int upsert_tensor(const std::string &key, pybind11::object tensor) { + if (!is_client_initialized() || use_dummy_client_) { + LOG(ERROR) << "Client not initialized or Dummy client not " + "supported for tensors"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + return upsert_tensor_impl(key, tensor, ReplicateConfig{}); + } + + int upsert_tensor_from(const std::string &key, uintptr_t buffer_ptr, + size_t size) { + if (buffer_ptr == 0) { + LOG(ERROR) << "Buffer pointer cannot be null"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + void *buffer = reinterpret_cast(buffer_ptr); + if (!is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + if (use_dummy_client_) { + LOG(ERROR) + << "upsert_tensor_from is not supported for dummy client"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + if (size <= sizeof(TensorMetadata)) { + LOG(ERROR) << "Buffer size too small for tensor metadata"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + 
py::gil_scoped_release release_gil; + return store_->upsert_from(key, buffer, size, ReplicateConfig{}); + } + + std::vector batch_upsert_tensor_from( + const std::vector &keys, + const std::vector &buffer_ptrs, + const std::vector &sizes) { + if (!is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + } + if (use_dummy_client_) { + LOG(ERROR) + << "batch_upsert_tensor_from is not supported for dummy client"; + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + } + if (keys.empty()) { + return std::vector(); + } + if (keys.size() != buffer_ptrs.size() || keys.size() != sizes.size()) { + LOG(ERROR) << "Size mismatch: keys, buffer_ptrs, and sizes must " + "have the same length"; + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + } + for (size_t i = 0; i < sizes.size(); ++i) { + if (buffer_ptrs[i] == 0) { + LOG(ERROR) << "Buffer pointer at index " << i + << " cannot be null"; + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + } + if (sizes[i] <= sizeof(TensorMetadata)) { + LOG(ERROR) << "Buffer size at index " << i + << " too small for tensor metadata"; + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + } + } + std::vector buffers; + buffers.reserve(buffer_ptrs.size()); + for (uintptr_t ptr : buffer_ptrs) { + buffers.push_back(reinterpret_cast(ptr)); + } + py::gil_scoped_release release_gil; + return store_->batch_upsert_from(keys, buffers, sizes, + ReplicateConfig{}); + } + + std::vector batch_upsert_tensor_impl( + const std::vector &keys, + const pybind11::list &tensors_list, + const ReplicateConfig &config = ReplicateConfig{}) { + std::vector infos(keys.size()); + std::vector results(keys.size(), 0); + + // 1. 
Extract Metadata (GIL Held) + for (size_t i = 0; i < keys.size(); ++i) { + infos[i] = extract_tensor_info(tensors_list[i], keys[i]); + if (!infos[i].valid()) + results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); + } + + // 2. Prepare Buffers and Execute (GIL Released) + { + py::gil_scoped_release release_gil; + + std::vector valid_keys; + std::vector buffer_ptrs; + std::vector buffer_sizes; + std::vector original_indices; + + std::vector> temp_allocations; + + for (size_t i = 0; i < infos.size(); ++i) { + if (!infos[i].valid()) continue; + + size_t total_size = + sizeof(TensorMetadata) + infos[i].tensor_size; + auto alloc_result = + store_->client_buffer_allocator_->allocate(total_size); + + if (!alloc_result) { + LOG(ERROR) + << "Failed to allocate buffer for key: " << keys[i]; + results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); + continue; + } + + // Copy Metadata & Data + char *dst = static_cast(alloc_result->ptr()); + memcpy(dst, &infos[i].metadata, sizeof(TensorMetadata)); + memcpy(dst + sizeof(TensorMetadata), + reinterpret_cast(infos[i].data_ptr), + infos[i].tensor_size); + + valid_keys.push_back(keys[i]); + buffer_ptrs.push_back(alloc_result->ptr()); + buffer_sizes.push_back(total_size); + original_indices.push_back(i); + + temp_allocations.push_back( + std::make_unique(std::move(*alloc_result))); + } + + if (!valid_keys.empty()) { + std::vector op_results = store_->batch_upsert_from( + valid_keys, buffer_ptrs, buffer_sizes, config); + for (size_t i = 0; i < op_results.size(); ++i) { + results[original_indices[i]] = op_results[i]; + } + } + } + + return results; + } + + std::vector batch_upsert_tensor( + const std::vector &keys, + const pybind11::list &tensors_list) { + if (!is_client_initialized() || use_dummy_client_) + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + + if (keys.size() != tensors_list.size() || keys.empty()) { + if (!keys.empty()) LOG(ERROR) << "Size mismatch in batch_upsert"; + return std::vector(keys.size(), + 
to_py_ret(ErrorCode::INVALID_PARAMS)); + } + + return batch_upsert_tensor_impl(keys, tensors_list, ReplicateConfig{}); + } + + int upsert_pub_tensor(const std::string &key, pybind11::object tensor, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!is_client_initialized() || use_dummy_client_) { + LOG(ERROR) << "Client not initialized or Dummy client not " + "supported for tensors"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + + int validate_result = validate_replicate_config(config); + if (validate_result) return validate_result; + + return upsert_tensor_impl(key, tensor, config); + } + + std::vector batch_upsert_pub_tensor( + const std::vector &keys, + const pybind11::list &tensors_list, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!is_client_initialized() || use_dummy_client_) + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + + if (keys.size() != tensors_list.size() || keys.empty()) { + if (!keys.empty()) + LOG(ERROR) << "Size mismatch in batch_upsert_pub"; + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + } + + int validate_result = validate_replicate_config(config); + if (validate_result) + return std::vector(keys.size(), + to_py_ret(ErrorCode::INVALID_PARAMS)); + + return batch_upsert_tensor_impl(keys, tensors_list, config); + } + + // --- End Upsert tensor methods --- + int validate_replicate_config( const ReplicateConfig &config = ReplicateConfig{}) { if (!config.preferred_segments.empty() && @@ -1633,6 +1858,153 @@ PYBIND11_MODULE(store, m) { "Put a batch of full tensors directly from pre-allocated " "buffers for Tensor Parallelism. 
Each buffer is internally split " "and stored under key_tp_ for all ranks.") + .def("upsert_tensor", &MooncakeStorePyWrapper::upsert_tensor, + py::arg("key"), py::arg("tensor"), + "Upsert a PyTorch tensor into the store (insert or update)") + .def("upsert_tensor_from", &MooncakeStorePyWrapper::upsert_tensor_from, + py::arg("key"), py::arg("buffer_ptr"), py::arg("size"), + "Upsert a tensor directly from a pre-allocated buffer. Buffer " + "layout must be [TensorMetadata][tensor data].") + .def("batch_upsert_tensor_from", + &MooncakeStorePyWrapper::batch_upsert_tensor_from, py::arg("keys"), + py::arg("buffer_ptrs"), py::arg("sizes"), + "Upsert tensors directly from pre-allocated buffers for " + "multiple keys. Each buffer layout: [TensorMetadata][tensor " + "data].") + .def("batch_upsert_tensor", + &MooncakeStorePyWrapper::batch_upsert_tensor, py::arg("keys"), + py::arg("tensors_list"), + "Upsert a batch of PyTorch tensors into the store (insert or " + "update)") + .def("upsert_pub_tensor", + &MooncakeStorePyWrapper::upsert_pub_tensor, py::arg("key"), + py::arg("tensor"), py::arg("config") = ReplicateConfig{}, + "Upsert a PyTorch tensor with configurable replication settings") + .def("batch_upsert_pub_tensor", + &MooncakeStorePyWrapper::batch_upsert_pub_tensor, py::arg("keys"), + py::arg("tensors_list"), py::arg("config") = ReplicateConfig{}, + "Batch upsert PyTorch tensors with configurable replication " + "settings") + .def( + "upsert_from", + [](MooncakeStorePyWrapper &self, const std::string &key, + uintptr_t buffer_ptr, size_t size, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + void *buffer = reinterpret_cast(buffer_ptr); + py::gil_scoped_release release; + return self.store_->upsert_from(key, buffer, size, config); + }, + py::arg("key"), py::arg("buffer_ptr"), py::arg("size"), + py::arg("config") = ReplicateConfig{}, + 
"Upsert object data directly from a pre-allocated buffer") + .def( + "batch_upsert_from", + [](MooncakeStorePyWrapper &self, + const std::vector &keys, + const std::vector &buffer_ptrs, + const std::vector &sizes, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return std::vector( + keys.size(), to_py_ret(ErrorCode::INVALID_PARAMS)); + } + std::vector buffers; + buffers.reserve(buffer_ptrs.size()); + for (uintptr_t ptr : buffer_ptrs) { + buffers.push_back(reinterpret_cast(ptr)); + } + py::gil_scoped_release release; + return self.store_->batch_upsert_from(keys, buffers, sizes, + config); + }, + py::arg("keys"), py::arg("buffer_ptrs"), py::arg("sizes"), + py::arg("config") = ReplicateConfig{}, + "Upsert object data directly from pre-allocated buffers for " + "multiple keys") + .def( + "upsert", + [](MooncakeStorePyWrapper &self, const std::string &key, + py::buffer buf, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + py::buffer_info info = buf.request(/*writable=*/false); + py::gil_scoped_release release; + return self.store_->upsert( + key, + std::span(static_cast(info.ptr), + static_cast(info.size)), + config); + }, + py::arg("key"), py::arg("value"), + py::arg("config") = ReplicateConfig{}, + "Upsert raw bytes into the store (insert or update)") + .def( + "upsert_parts", + [](MooncakeStorePyWrapper &self, const std::string &key, + py::args parts, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + std::vector infos; + std::vector> spans; + infos.reserve(parts.size()); + spans.reserve(parts.size()); + + for (auto &obj : parts) { + py::buffer buf = py::reinterpret_borrow(obj); + 
infos.emplace_back(buf.request(false)); + const auto &info = infos.back(); + if (info.ndim != 1 || info.itemsize != 1) + throw std::runtime_error( + "parts must be 1-D bytes-like"); + + spans.emplace_back(static_cast(info.ptr), + static_cast(info.size)); + } + + py::gil_scoped_release unlock; + return self.store_->upsert_parts(key, spans, config); + }, + py::arg("key"), py::arg("config") = ReplicateConfig{}, + "Upsert multiple byte parts as a single object (insert or update)") + .def( + "upsert_batch", + [](MooncakeStorePyWrapper &self, + const std::vector &keys, + const std::vector &buffers, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } + std::vector infos; + std::vector> spans; + infos.reserve(buffers.size()); + spans.reserve(buffers.size()); + + for (const auto &buf : buffers) { + infos.emplace_back(buf.request(/*writable=*/false)); + const auto &info = infos.back(); + spans.emplace_back(static_cast(info.ptr), + static_cast(info.size)); + } + + py::gil_scoped_release release; + return self.store_->upsert_batch(keys, spans, config); + }, + py::arg("keys"), py::arg("values"), + py::arg("config") = ReplicateConfig{}, + "Batch upsert raw bytes for multiple keys (insert or update)") .def( "register_buffer", [](MooncakeStorePyWrapper &self, uintptr_t buffer_ptr, @@ -1694,14 +2066,12 @@ PYBIND11_MODULE(store, m) { [](MooncakeStorePyWrapper &self, const std::string &key, uintptr_t buffer_ptr, size_t size, const ReplicateConfig &config = ReplicateConfig{}) { - // Put data directly from user-provided buffer + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } void *buffer = reinterpret_cast(buffer_ptr); py::gil_scoped_release release; - if (self.use_dummy_client_) { - LOG(ERROR) << "put_from is not supported for dummy client " - "now"; - return -1; - 
} return self.store_->put_from(key, buffer, size, config); }, py::arg("key"), py::arg("buffer_ptr"), py::arg("size"), @@ -1713,18 +2083,14 @@ PYBIND11_MODULE(store, m) { uintptr_t buffer_ptr, uintptr_t metadata_buffer_ptr, size_t size, size_t metadata_size, const ReplicateConfig &config = ReplicateConfig{}) { - // Put data directly from user-provided buffer with - // metadata + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return to_py_ret(ErrorCode::INVALID_PARAMS); + } void *buffer = reinterpret_cast(buffer_ptr); void *metadata_buffer = reinterpret_cast(metadata_buffer_ptr); py::gil_scoped_release release; - if (self.use_dummy_client_) { - LOG(ERROR) - << "put_from_with_metadata is not supported for dummy " - "client now"; - return -1; - } return self.store_->put_from_with_metadata( key, buffer, metadata_buffer, size, metadata_size, config); }, @@ -1832,6 +2198,10 @@ PYBIND11_MODULE(store, m) { const std::vector> &all_buffer_ptrs, const std::vector> &all_sizes, const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return std::vector{}; + } py::gil_scoped_release release; return self.store_->batch_put_from_multi_buffers( keys, CastAddrs2Ptrs(all_buffer_ptrs), all_sizes, config); diff --git a/mooncake-store/include/client_service.h b/mooncake-store/include/client_service.h index 583a784e0..6a691a716 100644 --- a/mooncake-store/include/client_service.h +++ b/mooncake-store/include/client_service.h @@ -201,6 +201,28 @@ class Client { std::vector>& batched_slices, const ReplicateConfig& config); + /** + * @brief Upserts data: inserts if key doesn't exist, updates if it does + * @param key Object key + * @param slices Vector of data slices to store + * @param config Replication configuration + * @return ErrorCode indicating success/failure + */ + tl::expected Upsert(const ObjectKey& key, + std::vector& slices, + const ReplicateConfig& config); + + 
/** + * @brief Batch upsert data with replication + * @param keys Object keys + * @param batched_slices Vector of vectors of data slices + * @param config Replication configuration + */ + std::vector> BatchUpsert( + const std::vector& keys, + std::vector>& batched_slices, + const ReplicateConfig& config); + /** * @brief Removes an object and all its replicas * @param key Key to remove @@ -590,6 +612,9 @@ class Client { void SubmitTransfers(std::vector& ops); void WaitForTransfers(std::vector& ops); void FinalizeBatchPut(std::vector& ops); + void StartBatchUpsert(std::vector& ops, + const ReplicateConfig& config); + void FinalizeBatchUpsert(std::vector& ops); std::vector> CollectResults( const std::vector& ops); diff --git a/mooncake-store/include/dummy_client.h b/mooncake-store/include/dummy_client.h index 1387a4e5f..f65cf7ddc 100644 --- a/mooncake-store/include/dummy_client.h +++ b/mooncake-store/include/dummy_client.h @@ -92,6 +92,25 @@ class DummyClient : public PyClient { const std::vector> &values, const ReplicateConfig &config = ReplicateConfig{}); + int upsert(const std::string &key, std::span value, + const ReplicateConfig &config = ReplicateConfig{}); + + int upsert_from(const std::string &key, void *buffer, size_t size, + const ReplicateConfig &config = ReplicateConfig{}); + + std::vector batch_upsert_from( + const std::vector &keys, + const std::vector &buffers, const std::vector &sizes, + const ReplicateConfig &config = ReplicateConfig{}); + + int upsert_parts(const std::string &key, + std::vector> values, + const ReplicateConfig &config = ReplicateConfig{}); + + int upsert_batch(const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config = ReplicateConfig{}); + [[nodiscard]] std::string get_hostname() const; // Check if a pointer falls within the hot cache shm region diff --git a/mooncake-store/include/master_client.h b/mooncake-store/include/master_client.h index c163a59f1..97719dbe6 100644 --- 
a/mooncake-store/include/master_client.h +++ b/mooncake-store/include/master_client.h @@ -197,6 +197,36 @@ class MasterClient { [[nodiscard]] std::vector> BatchPutRevoke( const std::vector& keys); + /** + * @brief Starts an upsert operation (insert or update) + * @param key Object key + * @param slice_lengths Vector of slice lengths + * @param config Replication configuration + * @return Replica descriptors on success, ErrorCode on failure + */ + [[nodiscard]] tl::expected, ErrorCode> + UpsertStart(const std::string& key, + const std::vector& slice_lengths, + const ReplicateConfig& config); + + [[nodiscard]] std::vector< + tl::expected, ErrorCode>> + BatchUpsertStart(const std::vector& keys, + const std::vector>& slice_lengths, + const ReplicateConfig& config); + + [[nodiscard]] tl::expected UpsertEnd( + const std::string& key, ReplicaType replica_type); + + [[nodiscard]] std::vector> BatchUpsertEnd( + const std::vector& keys); + + [[nodiscard]] tl::expected UpsertRevoke( + const std::string& key, ReplicaType replica_type); + + [[nodiscard]] std::vector> BatchUpsertRevoke( + const std::vector& keys); + /** * @brief Removes an object and all its replicas * @param key Key to remove diff --git a/mooncake-store/include/master_service.h b/mooncake-store/include/master_service.h index a8d21d236..9acc0fe1c 100644 --- a/mooncake-store/include/master_service.h +++ b/mooncake-store/include/master_service.h @@ -238,6 +238,53 @@ class MasterService { std::vector> BatchPutRevoke( const UUID& client_id, const std::vector& keys); + /** + * @brief Start an upsert operation. If the key does not exist, behaves + * like PutStart. If the key exists with the same size, performs in-place + * update (reuses existing buffers). If the key exists with a different + * size, deletes old replicas and allocates new ones. + * @return Replica descriptors on success, or error code on failure. 
+ * Possible errors: OBJECT_HAS_REPLICATION_TASK (Copy/Move/Offload in + * progress), OBJECT_REPLICA_BUSY (replicas have non-zero refcnt). + */ + auto UpsertStart(const UUID& client_id, const std::string& key, + const uint64_t slice_length, const ReplicateConfig& config) + -> tl::expected, ErrorCode>; + + /** + * @brief Complete an upsert operation. Delegates to PutEnd. + */ + auto UpsertEnd(const UUID& client_id, const std::string& key, + ReplicaType replica_type) -> tl::expected; + + /** + * @brief Revoke an upsert operation. Delegates to PutRevoke. + */ + auto UpsertRevoke(const UUID& client_id, const std::string& key, + ReplicaType replica_type) + -> tl::expected; + + /** + * @brief Start a batch of upsert operations. + */ + std::vector, ErrorCode>> + BatchUpsertStart(const UUID& client_id, + const std::vector& keys, + const std::vector& slice_lengths, + const ReplicateConfig& config); + + /** + * @brief Complete a batch of upsert operations. Delegates to BatchPutEnd. + */ + std::vector> BatchUpsertEnd( + const UUID& client_id, const std::vector& keys); + + /** + * @brief Revoke a batch of upsert operations. Delegates to BatchPutRevoke. + */ + std::vector> BatchUpsertRevoke( + const UUID& client_id, const std::vector& keys); + /** * @brief Evict a disk replica for a key (triggered by client-side disk * eviction). @@ -520,8 +567,10 @@ class MasterService { ObjectMetadata(ObjectMetadata&&) = delete; ObjectMetadata& operator=(ObjectMetadata&&) = delete; - const UUID client_id; - const std::chrono::system_clock::time_point put_start_time; + // Updated by UpsertStart (Case B) to reflect the new writer. + UUID client_id; + // Updated by UpsertStart (Case B) to reset the discard timeout. 
+ std::chrono::system_clock::time_point put_start_time; const size_t size; mutable SpinLock lock; @@ -802,6 +851,15 @@ class MasterService { // Helper to clean up stale handles pointing to unmounted segments bool CleanupStaleHandles(ObjectMetadata& metadata); + // Helper: allocate replicas, create ObjectMetadata, insert into shard, + // and return descriptor list. Shared by PutStart and UpsertStart. + auto AllocateAndInsertMetadata( + MetadataShardAccessorRW& shard, const UUID& client_id, + const std::string& key, uint64_t value_length, + const ReplicateConfig& config, + const std::chrono::system_clock::time_point& now) + -> tl::expected, ErrorCode>; + /** * @brief Helper to discard expired processing keys. */ diff --git a/mooncake-store/include/pyclient.h b/mooncake-store/include/pyclient.h index 9866d99d7..960e488b0 100644 --- a/mooncake-store/include/pyclient.h +++ b/mooncake-store/include/pyclient.h @@ -204,6 +204,27 @@ class PyClient { const std::vector> &values, const ReplicateConfig &config = ReplicateConfig{}) = 0; + virtual int upsert(const std::string &key, std::span value, + const ReplicateConfig &config = ReplicateConfig{}) = 0; + + virtual int upsert_from( + const std::string &key, void *buffer, size_t size, + const ReplicateConfig &config = ReplicateConfig{}) = 0; + + virtual std::vector batch_upsert_from( + const std::vector &keys, + const std::vector &buffers, const std::vector &sizes, + const ReplicateConfig &config = ReplicateConfig{}) = 0; + + virtual int upsert_parts( + const std::string &key, std::vector> values, + const ReplicateConfig &config = ReplicateConfig{}) = 0; + + virtual int upsert_batch( + const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config = ReplicateConfig{}) = 0; + [[nodiscard]] virtual std::string get_hostname() const = 0; virtual int remove(const std::string &key, bool force = false) = 0; diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index 
3cc255db1..e779c1186 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -219,6 +219,25 @@ class RealClient : public PyClient { const std::vector> &values, const ReplicateConfig &config = ReplicateConfig{}); + int upsert(const std::string &key, std::span value, + const ReplicateConfig &config = ReplicateConfig{}); + + int upsert_from(const std::string &key, void *buffer, size_t size, + const ReplicateConfig &config = ReplicateConfig{}); + + std::vector batch_upsert_from( + const std::vector &keys, + const std::vector &buffers, const std::vector &sizes, + const ReplicateConfig &config = ReplicateConfig{}); + + int upsert_parts(const std::string &key, + std::vector> values, + const ReplicateConfig &config = ReplicateConfig{}); + + int upsert_batch(const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config = ReplicateConfig{}); + [[nodiscard]] std::string get_hostname() const; /** @@ -342,6 +361,29 @@ class RealClient : public PyClient { const std::string &key, std::span value, const ReplicateConfig &config, const UUID &client_id); + tl::expected upsert_dummy_helper( + const std::string &key, std::span value, + const ReplicateConfig &config, const UUID &client_id); + + tl::expected upsert_parts_dummy_helper( + const std::string &key, std::vector> values, + const ReplicateConfig &config, const UUID &client_id); + + tl::expected upsert_from_dummy_helper( + const std::string &key, uint64_t dummy_buffer, size_t size, + const ReplicateConfig &config, const UUID &client_id); + + std::vector> batch_upsert_from_dummy_helper( + const std::vector &keys, + const std::vector &dummy_buffers, + const std::vector &sizes, const ReplicateConfig &config, + const UUID &client_id); + + tl::expected upsert_batch_dummy_helper( + const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config, const UUID &client_id); + tl::expected put_batch_dummy_helper( const std::vector &keys, const 
std::vector> &values, @@ -455,6 +497,34 @@ class RealClient : public PyClient { const std::string &key, void *buffer, size_t size, const ReplicateConfig &config = ReplicateConfig{}); + tl::expected upsert_internal( + const std::string &key, std::span value, + const ReplicateConfig &config = ReplicateConfig{}, + std::shared_ptr client_buffer_allocator = + nullptr); + + tl::expected upsert_from_internal( + const std::string &key, void *buffer, size_t size, + const ReplicateConfig &config = ReplicateConfig{}); + + std::vector> batch_upsert_from_internal( + const std::vector &keys, + const std::vector &buffers, const std::vector &sizes, + const ReplicateConfig &config = ReplicateConfig{}); + + tl::expected upsert_parts_internal( + const std::string &key, std::vector> values, + const ReplicateConfig &config = ReplicateConfig{}, + std::shared_ptr client_buffer_allocator = + nullptr); + + tl::expected upsert_batch_internal( + const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config = ReplicateConfig{}, + std::shared_ptr client_buffer_allocator = + nullptr); + std::vector> batch_put_from_internal( const std::vector &keys, const std::vector &buffers, const std::vector &sizes, diff --git a/mooncake-store/include/replica.h b/mooncake-store/include/replica.h index 2790268f7..dba29c3bb 100644 --- a/mooncake-store/include/replica.h +++ b/mooncake-store/include/replica.h @@ -240,6 +240,12 @@ class Replica { return replica.is_processing(); } + [[nodiscard]] bool is_busy() const { return refcnt_.load() > 0; } + + [[nodiscard]] static bool fn_is_busy(const Replica& replica) { + return replica.is_busy(); + } + [[nodiscard]] ReplicaType type() const { return std::visit(ReplicaTypeVisitor{}, data_); } @@ -299,6 +305,14 @@ class Replica { } } + void mark_processing() { + if (status_ == ReplicaStatus::COMPLETE) { + status_ = ReplicaStatus::PROCESSING; + } else { + LOG(ERROR) << "Cannot mark_processing from status: " << status_; + } + } + void inc_refcnt() { 
refcnt_.fetch_add(1); } void dec_refcnt() { refcnt_.fetch_sub(1); } diff --git a/mooncake-store/include/rpc_service.h b/mooncake-store/include/rpc_service.h index ba668e8ec..991c00a96 100644 --- a/mooncake-store/include/rpc_service.h +++ b/mooncake-store/include/rpc_service.h @@ -77,6 +77,30 @@ class WrappedMasterService { std::vector> BatchPutRevoke( const UUID& client_id, const std::vector& keys); + tl::expected, ErrorCode> UpsertStart( + const UUID& client_id, const std::string& key, + const uint64_t slice_length, const ReplicateConfig& config); + + tl::expected UpsertEnd(const UUID& client_id, + const std::string& key, + ReplicaType replica_type); + + tl::expected UpsertRevoke(const UUID& client_id, + const std::string& key, + ReplicaType replica_type); + + std::vector, ErrorCode>> + BatchUpsertStart(const UUID& client_id, + const std::vector& keys, + const std::vector& slice_lengths, + const ReplicateConfig& config); + + std::vector> BatchUpsertEnd( + const UUID& client_id, const std::vector& keys); + + std::vector> BatchUpsertRevoke( + const UUID& client_id, const std::vector& keys); + tl::expected Remove(const std::string& key, bool force = false); diff --git a/mooncake-store/include/types.h b/mooncake-store/include/types.h index d408a4848..cd6bd53dd 100644 --- a/mooncake-store/include/types.h +++ b/mooncake-store/include/types.h @@ -255,6 +255,7 @@ enum class ErrorCode : int32_t { REPLICA_IS_GONE = -712, ///< Replica existed once, but is gone now. REPLICA_NOT_IN_LOCAL_MEMORY = -713, ///< Replica does not reside in current node memory. + OBJECT_REPLICA_BUSY = -714, ///< Object replicas have non-zero refcnt. // Transfer errors (Range: -800 to -899) TRANSFER_FAIL = -800, ///< Transfer operation failed. 
diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index 018d89e13..6ef3867bb 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -1210,6 +1210,116 @@ tl::expected Client::Put(const ObjectKey& key, return {}; } +tl::expected Client::Upsert(const ObjectKey& key, + std::vector& slices, + const ReplicateConfig& config) { + // Prepare slice lengths + std::vector slice_lengths; + for (size_t i = 0; i < slices.size(); ++i) { + slice_lengths.emplace_back(slices[i].size); + } + + ReplicateConfig client_cfg = config; + if (protocol_ == "cxl") { + client_cfg.preferred_segment = local_hostname_; + } + + // Start upsert operation + auto start_result = + master_client_.UpsertStart(key, slice_lengths, client_cfg); + if (!start_result) { + ErrorCode err = start_result.error(); + if (err == ErrorCode::NO_AVAILABLE_HANDLE) { + LOG(WARNING) << "Failed to start upsert operation for key=" << key + << PUT_NO_SPACE_HELPER_STR; + } else { + LOG(ERROR) << "Failed to start upsert operation for key=" << key + << ": " << toString(err); + } + return tl::unexpected(err); + } + + // Record transfer latency + auto t0 = std::chrono::steady_clock::now(); + + // Handle disk replicas first + if (storage_backend_) { + for (auto it = start_result.value().rbegin(); + it != start_result.value().rend(); ++it) { + const auto& replica = *it; + if (replica.is_disk_replica()) { + auto disk_descriptor = replica.get_disk_descriptor(); + PutToLocalFile(key, slices, disk_descriptor); + break; + } + } + } + + // Transfer to memory replicas + for (const auto& replica : start_result.value()) { + if (replica.is_memory_replica()) { + ErrorCode transfer_err = TransferWrite(replica, slices); + if (transfer_err != ErrorCode::OK) { + auto revoke_result = + master_client_.UpsertRevoke(key, ReplicaType::MEMORY); + if (!revoke_result) { + LOG(ERROR) << "Failed to revoke upsert operation"; + return tl::unexpected(revoke_result.error()); + } 
+ return tl::unexpected(transfer_err); + } + } + } + + auto us = std::chrono::duration_cast( + std::chrono::steady_clock::now() - t0) + .count(); + if (metrics_) { + metrics_->transfer_metric.put_latency_us.observe(us); + } + + // End upsert operation + auto end_result = master_client_.UpsertEnd(key, ReplicaType::MEMORY); + if (!end_result) { + ErrorCode err = end_result.error(); + LOG(ERROR) << "Failed to end upsert operation: " << err; + return tl::unexpected(err); + } + + return {}; +} + +std::vector> Client::BatchUpsert( + const std::vector& keys, + std::vector>& batched_slices, + const ReplicateConfig& config) { + ReplicateConfig client_cfg = config; + if (protocol_ == "cxl") { + client_cfg.preferred_segment = local_hostname_; + } + if (client_cfg.prefer_alloc_in_same_node) { + LOG(ERROR) << "prefer_alloc_in_same_node is not supported for upsert"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + + std::vector ops = CreatePutOperations(keys, batched_slices); + StartBatchUpsert(ops, client_cfg); + + auto t0 = std::chrono::steady_clock::now(); + SubmitTransfers(ops); + WaitForTransfers(ops); + auto us = std::chrono::duration_cast( + std::chrono::steady_clock::now() - t0) + .count(); + if (metrics_) { + metrics_->transfer_metric.batch_put_latency_us.observe(us); + } + + FinalizeBatchUpsert(ops); + return CollectResults(ops); +} + // TODO: `client.cpp` is too long, consider split it into multiple files enum class PutOperationState { PENDING, @@ -1332,6 +1442,52 @@ void Client::StartBatchPut(std::vector& ops, } } +void Client::StartBatchUpsert(std::vector& ops, + const ReplicateConfig& config) { + std::vector keys; + std::vector> slice_lengths; + + keys.reserve(ops.size()); + slice_lengths.reserve(ops.size()); + + for (const auto& op : ops) { + keys.emplace_back(op.key); + + std::vector slice_sizes; + slice_sizes.reserve(op.slices.size()); + for (const auto& slice : op.slices) { + slice_sizes.emplace_back(slice.size); + } + 
slice_lengths.emplace_back(std::move(slice_sizes)); + } + + auto start_responses = + master_client_.BatchUpsertStart(keys, slice_lengths, config); + + // Ensure response size matches request size + if (start_responses.size() != ops.size()) { + LOG(ERROR) << "BatchUpsertStart response size mismatch: expected " + << ops.size() << ", got " << start_responses.size(); + for (auto& op : ops) { + op.SetError(ErrorCode::RPC_FAIL, + "BatchUpsertStart response size mismatch"); + } + return; + } + + // Process individual responses with robust error handling + for (size_t i = 0; i < ops.size(); ++i) { + if (!start_responses[i]) { + ops[i].SetError(start_responses[i].error(), + "Master failed to start upsert operation"); + } else { + ops[i].replicas = start_responses[i].value(); + VLOG(1) << "Successfully started upsert for key " << ops[i].key + << " with " << ops[i].replicas.size() << " replicas"; + } + } +} + void Client::SubmitTransfers(std::vector& ops) { if (!transfer_submitter_) { LOG(ERROR) << "TransferSubmitter not initialized"; @@ -1561,6 +1717,103 @@ void Client::FinalizeBatchPut(std::vector& ops) { } } +void Client::FinalizeBatchUpsert(std::vector& ops) { + std::vector successful_keys; + std::vector successful_indices; + std::vector failed_keys; + std::vector failed_indices; + + successful_keys.reserve(ops.size()); + successful_indices.reserve(ops.size()); + failed_keys.reserve(ops.size()); + failed_indices.reserve(ops.size()); + + for (size_t i = 0; i < ops.size(); ++i) { + auto& op = ops[i]; + + if (!op.IsResolved() && !op.replicas.empty() && + !op.pending_transfers.empty()) { + successful_keys.emplace_back(op.key); + successful_indices.emplace_back(i); + } else if (op.state != PutOperationState::PENDING && + !op.replicas.empty()) { + failed_keys.emplace_back(op.key); + failed_indices.emplace_back(i); + } + } + + // Process successful operations + if (!successful_keys.empty()) { + auto end_responses = + master_client_.BatchUpsertEnd(successful_keys); + if 
(end_responses.size() != successful_keys.size()) { + LOG(ERROR) << "BatchUpsertEnd response size mismatch: expected " + << successful_keys.size() << ", got " + << end_responses.size(); + for (size_t idx : successful_indices) { + ops[idx].SetError(ErrorCode::RPC_FAIL, + "BatchUpsertEnd response size mismatch"); + } + } else { + for (size_t i = 0; i < end_responses.size(); ++i) { + const size_t op_idx = successful_indices[i]; + if (!end_responses[i]) { + LOG(ERROR) << "Failed to finalize upsert for key " + << successful_keys[i] << ": " + << toString(end_responses[i].error()); + ops[op_idx].SetError(end_responses[i].error(), + "BatchUpsertEnd failed"); + } else { + ops[op_idx].SetSuccess(); + VLOG(1) << "Successfully completed upsert for key " + << successful_keys[i]; + } + } + } + } + + // Process failed operations that need cleanup + if (!failed_keys.empty()) { + auto revoke_responses = + master_client_.BatchUpsertRevoke(failed_keys); + if (revoke_responses.size() != failed_keys.size()) { + LOG(ERROR) << "BatchUpsertRevoke response size mismatch: expected " + << failed_keys.size() << ", got " + << revoke_responses.size(); + for (size_t idx : failed_indices) { + ops[idx].SetError(ErrorCode::RPC_FAIL, + "BatchUpsertRevoke response size mismatch"); + } + } else { + for (size_t i = 0; i < revoke_responses.size(); ++i) { + const size_t op_idx = failed_indices[i]; + if (!revoke_responses[i]) { + LOG(ERROR) + << "Failed to revoke upsert for key " << failed_keys[i] + << ": " << toString(revoke_responses[i].error()); + std::string original_context = + ops[op_idx].failure_context.value_or("unknown error"); + ops[op_idx].failure_context = + original_context + "; revoke also failed"; + } else { + LOG(INFO) << "Successfully revoked failed upsert for key " + << failed_keys[i]; + } + } + } + } + + // Ensure all operations have definitive results + for (auto& op : ops) { + if (!op.IsResolved()) { + op.SetError(ErrorCode::INTERNAL_ERROR, + "Operation not resolved after 
finalization"); + LOG(ERROR) << "Operation for key " << op.key + << " was not properly resolved"; + } + } +} + std::vector> Client::CollectResults( const std::vector& ops) { std::vector> results; @@ -1573,7 +1826,9 @@ std::vector> Client::CollectResults( // Additional validation and logging for debugging if (!op.result.has_value()) { - // if error == object already exist, consider as ok + // If error == object already exist, consider as ok (Put semantics). + // UpsertStart never returns this error, so this branch is + // unreachable for BatchUpsert — kept for BatchPut compatibility. if (op.result.error() == ErrorCode::OBJECT_ALREADY_EXISTS) { results.back() = {}; continue; diff --git a/mooncake-store/src/dummy_client.cpp b/mooncake-store/src/dummy_client.cpp index 9a1083698..30005d1e8 100644 --- a/mooncake-store/src/dummy_client.cpp +++ b/mooncake-store/src/dummy_client.cpp @@ -542,6 +542,51 @@ int DummyClient::put_parts(const std::string& key, key, values, config, client_id_)); } +int DummyClient::upsert(const std::string& key, std::span value, + const ReplicateConfig& config) { + return to_py_ret(invoke_rpc<&RealClient::upsert_dummy_helper, void>( + key, value, config, client_id_)); +} + +int DummyClient::upsert_from(const std::string& key, void* buffer, size_t size, + const ReplicateConfig& config) { + uint64_t dummy_addr = reinterpret_cast(buffer); + return to_py_ret(invoke_rpc<&RealClient::upsert_from_dummy_helper, void>( + key, dummy_addr, size, config, client_id_)); +} + +std::vector DummyClient::batch_upsert_from( + const std::vector& keys, const std::vector& buffer_ptrs, + const std::vector& sizes, const ReplicateConfig& config) { + std::vector buffers; + for (auto ptr : buffer_ptrs) { + buffers.push_back(reinterpret_cast(ptr)); + } + auto internal_results = + invoke_batch_rpc<&RealClient::batch_upsert_from_dummy_helper, void>( + keys.size(), keys, buffers, sizes, config, client_id_); + std::vector results; + results.reserve(internal_results.size()); + for 
(const auto& result : internal_results) { + results.push_back(to_py_ret(result)); + } + return results; +} + +int DummyClient::upsert_parts(const std::string& key, + std::vector> values, + const ReplicateConfig& config) { + return to_py_ret(invoke_rpc<&RealClient::upsert_parts_dummy_helper, void>( + key, values, config, client_id_)); +} + +int DummyClient::upsert_batch(const std::vector& keys, + const std::vector>& values, + const ReplicateConfig& config) { + return to_py_ret(invoke_rpc<&RealClient::upsert_batch_dummy_helper, void>( + keys, values, config, client_id_)); +} + int DummyClient::remove(const std::string& key, bool force) { return to_py_ret( invoke_rpc<&RealClient::remove_internal, void>(key, force)); diff --git a/mooncake-store/src/master_client.cpp b/mooncake-store/src/master_client.cpp index 9fe6d5a3c..ce2417050 100644 --- a/mooncake-store/src/master_client.cpp +++ b/mooncake-store/src/master_client.cpp @@ -92,6 +92,36 @@ struct RpcNameTraits<&WrappedMasterService::BatchPutRevoke> { static constexpr const char* value = "BatchPutRevoke"; }; +template <> +struct RpcNameTraits<&WrappedMasterService::UpsertStart> { + static constexpr const char* value = "UpsertStart"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::BatchUpsertStart> { + static constexpr const char* value = "BatchUpsertStart"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::UpsertEnd> { + static constexpr const char* value = "UpsertEnd"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::BatchUpsertEnd> { + static constexpr const char* value = "BatchUpsertEnd"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::UpsertRevoke> { + static constexpr const char* value = "UpsertRevoke"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::BatchUpsertRevoke> { + static constexpr const char* value = "BatchUpsertRevoke"; +}; + template <> struct RpcNameTraits<&WrappedMasterService::Remove> { static constexpr const char* value = 
"Remove"; @@ -533,6 +563,95 @@ std::vector> MasterClient::BatchPutRevoke( return result; } +tl::expected, ErrorCode> +MasterClient::UpsertStart(const std::string& key, + const std::vector& slice_lengths, + const ReplicateConfig& config) { + ScopedVLogTimer timer(1, "MasterClient::UpsertStart"); + timer.LogRequest("key=", key, ", slice_count=", slice_lengths.size()); + + uint64_t total_slice_length = 0; + for (const auto& slice_length : slice_lengths) { + total_slice_length += slice_length; + } + + auto result = invoke_rpc<&WrappedMasterService::UpsertStart, + std::vector>( + client_id_, key, total_slice_length, config); + timer.LogResponseExpected(result); + return result; +} + +std::vector, ErrorCode>> +MasterClient::BatchUpsertStart( + const std::vector& keys, + const std::vector>& slice_lengths, + const ReplicateConfig& config) { + ScopedVLogTimer timer(1, "MasterClient::BatchUpsertStart"); + timer.LogRequest("keys_count=", keys.size()); + + std::vector total_slice_lengths; + total_slice_lengths.reserve(slice_lengths.size()); + for (const auto& slice_lengths : slice_lengths) { + uint64_t total_slice_length = 0; + for (const auto& slice_length : slice_lengths) { + total_slice_length += slice_length; + } + total_slice_lengths.emplace_back(total_slice_length); + } + + auto result = invoke_batch_rpc<&WrappedMasterService::BatchUpsertStart, + std::vector>( + keys.size(), client_id_, keys, total_slice_lengths, config); + timer.LogResponse("result=", result.size(), " operations"); + return result; +} + +tl::expected MasterClient::UpsertEnd( + const std::string& key, ReplicaType replica_type) { + ScopedVLogTimer timer(1, "MasterClient::UpsertEnd"); + timer.LogRequest("key=", key); + + auto result = invoke_rpc<&WrappedMasterService::UpsertEnd, void>( + client_id_, key, replica_type); + timer.LogResponseExpected(result); + return result; +} + +std::vector> MasterClient::BatchUpsertEnd( + const std::vector& keys) { + ScopedVLogTimer timer(1, 
"MasterClient::BatchUpsertEnd"); + timer.LogRequest("keys_count=", keys.size()); + + auto result = invoke_batch_rpc<&WrappedMasterService::BatchUpsertEnd, void>( + keys.size(), client_id_, keys); + timer.LogResponse("result=", result.size(), " operations"); + return result; +} + +tl::expected MasterClient::UpsertRevoke( + const std::string& key, ReplicaType replica_type) { + ScopedVLogTimer timer(1, "MasterClient::UpsertRevoke"); + timer.LogRequest("key=", key); + + auto result = invoke_rpc<&WrappedMasterService::UpsertRevoke, void>( + client_id_, key, replica_type); + timer.LogResponseExpected(result); + return result; +} + +std::vector> MasterClient::BatchUpsertRevoke( + const std::vector& keys) { + ScopedVLogTimer timer(1, "MasterClient::BatchUpsertRevoke"); + timer.LogRequest("keys_count=", keys.size()); + + auto result = + invoke_batch_rpc<&WrappedMasterService::BatchUpsertRevoke, void>( + keys.size(), client_id_, keys); + timer.LogResponse("result=", result.size(), " operations"); + return result; +} + tl::expected MasterClient::Remove(const std::string& key, bool force) { ScopedVLogTimer timer(1, "MasterClient::Remove"); diff --git a/mooncake-store/src/master_service.cpp b/mooncake-store/src/master_service.cpp index 4ab4cdc3e..e0e12b739 100644 --- a/mooncake-store/src/master_service.cpp +++ b/mooncake-store/src/master_service.cpp @@ -729,6 +729,64 @@ auto MasterService::GetReplicaList(const std::string& key) default_kv_lease_ttl_); } +auto MasterService::AllocateAndInsertMetadata( + MetadataShardAccessorRW& shard, const UUID& client_id, + const std::string& key, uint64_t value_length, + const ReplicateConfig& config, + const std::chrono::system_clock::time_point& now) + -> tl::expected, ErrorCode> { + std::vector replicas; + { + ScopedAllocatorAccess allocator_access = + segment_manager_.getAllocatorAccess(); + const auto& allocator_manager = allocator_access.getAllocatorManager(); + + std::vector preferred_segments; + if (!config.preferred_segment.empty()) 
{ + preferred_segments.push_back(config.preferred_segment); + } else if (!config.preferred_segments.empty()) { + preferred_segments = config.preferred_segments; + } + + auto allocation_result = allocation_strategy_->Allocate( + allocator_manager, value_length, config.replica_num, + preferred_segments); + + if (!allocation_result.has_value()) { + VLOG(1) << "Failed to allocate replicas for key=" << key + << ", error: " << allocation_result.error(); + if (allocation_result.error() == ErrorCode::INVALID_PARAMS) { + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + need_eviction_ = true; + return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); + } + + replicas = std::move(allocation_result.value()); + } + + if (use_disk_replica_) { + std::string file_path = + ResolvePathFromKey(key, root_fs_dir_, cluster_id_); + replicas.emplace_back(file_path, value_length, + ReplicaStatus::PROCESSING); + } + + std::vector replica_list; + replica_list.reserve(replicas.size()); + for (const auto& replica : replicas) { + replica_list.emplace_back(replica.get_descriptor()); + } + + shard->metadata.emplace( + std::piecewise_construct, std::forward_as_tuple(key), + std::forward_as_tuple(client_id, now, value_length, std::move(replicas), + config.with_soft_pin, config.with_hard_pin)); + shard->processing_keys.insert(key); + + return replica_list; +} + auto MasterService::PutStart(const UUID& client_id, const std::string& key, const uint64_t slice_length, const ReplicateConfig& config) @@ -740,8 +798,6 @@ auto MasterService::PutStart(const UUID& client_id, const std::string& key, return tl::make_unexpected(ErrorCode::INVALID_PARAMS); } - // Validate slice lengths - uint64_t total_length = 0; if ((memory_allocator_type_ == BufferAllocatorType::CACHELIB) && (slice_length > kMaxSliceSize)) { LOG(ERROR) << "key=" << key << ", slice_length=" << slice_length @@ -749,11 +805,9 @@ auto MasterService::PutStart(const UUID& client_id, const std::string& key, << ", 
error=invalid_slice_size"; return tl::make_unexpected(ErrorCode::INVALID_PARAMS); } - total_length += slice_length; - VLOG(1) << "key=" << key << ", value_length=" << total_length - << ", slice_length=" << slice_length << ", config=" << config - << ", action=put_start_begin"; + VLOG(1) << "key=" << key << ", value_length=" << slice_length + << ", config=" << config << ", action=put_start_begin"; std::shared_lock shared_lock(snapshot_mutex_); // Lock the shard and check if object already exists @@ -783,62 +837,8 @@ auto MasterService::PutStart(const UUID& client_id, const std::string& key, } } - // Allocate replicas - std::vector replicas; - { - ScopedAllocatorAccess allocator_access = - segment_manager_.getAllocatorAccess(); - const auto& allocator_manager = allocator_access.getAllocatorManager(); - - std::vector preferred_segments; - if (!config.preferred_segment.empty()) { - preferred_segments.push_back(config.preferred_segment); - } else if (!config.preferred_segments.empty()) { - preferred_segments = config.preferred_segments; - } - - auto allocation_result = allocation_strategy_->Allocate( - allocator_manager, slice_length, config.replica_num, - preferred_segments); - - if (!allocation_result.has_value()) { - VLOG(1) << "Failed to allocate all replicas for key=" << key - << ", error: " << allocation_result.error(); - if (allocation_result.error() == ErrorCode::INVALID_PARAMS) { - return tl::make_unexpected(ErrorCode::INVALID_PARAMS); - } - need_eviction_ = true; - return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); - } - - replicas = std::move(allocation_result.value()); - } - - // If disk replica is enabled, allocate a disk replica - if (use_disk_replica_) { - // Allocate a file path for the disk replica - std::string file_path = - ResolvePathFromKey(key, root_fs_dir_, cluster_id_); - replicas.emplace_back(file_path, total_length, - ReplicaStatus::PROCESSING); - } - - std::vector replica_list; - replica_list.reserve(replicas.size()); - for (const 
auto& replica : replicas) { - replica_list.emplace_back(replica.get_descriptor()); - } - - // No need to set lease here. The object will not be evicted until - // PutEnd is called. - shard->metadata.emplace( - std::piecewise_construct, std::forward_as_tuple(key), - std::forward_as_tuple(client_id, now, total_length, std::move(replicas), - config.with_soft_pin, config.with_hard_pin)); - // Also insert the metadata into processing set for monitoring. - shard->processing_keys.insert(key); - - return replica_list; + return AllocateAndInsertMetadata(shard, client_id, key, slice_length, + config, now); } auto MasterService::PutEnd(const UUID& client_id, const std::string& key, @@ -1010,6 +1010,227 @@ std::vector> MasterService::BatchPutRevoke( return results; } +// UpsertStart — insert-or-update entry point. +// +// Three-way dispatch depending on key state: +// Case A: key does not exist → allocate new buffers (same as PutStart) +// Case B: key exists, same size → in-place update (reuse existing buffers) +// Case C: key exists, different size → discard old + allocate new +// +// Before reaching Case B/C the function runs safety checks and may preempt +// an in-progress Put/Upsert on the same key. Preempted PROCESSING replicas +// are moved to discarded_replicas_ for delayed release (the previous writer +// may still be performing RDMA writes to those buffers). +// +// Note: during Case B the key is temporarily unreadable (all replicas are +// PROCESSING). Readers will get REPLICA_IS_NOT_READY until UpsertEnd. 
+auto MasterService::UpsertStart(const UUID& client_id, const std::string& key, + const uint64_t slice_length, + const ReplicateConfig& config) + -> tl::expected, ErrorCode> { + // --- Parameter validation (same as PutStart) --- + if (config.replica_num == 0 || key.empty() || slice_length == 0) { + LOG(ERROR) << "key=" << key << ", replica_num=" << config.replica_num + << ", slice_length=" << slice_length + << ", key_size=" << key.size() << ", error=invalid_params"; + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + + if ((memory_allocator_type_ == BufferAllocatorType::CACHELIB) && + (slice_length > kMaxSliceSize)) { + LOG(ERROR) << "key=" << key << ", slice_length=" << slice_length + << ", max_size=" << kMaxSliceSize + << ", error=invalid_slice_size"; + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + + VLOG(1) << "key=" << key << ", value_length=" << slice_length + << ", config=" << config << ", action=upsert_start_begin"; + + // --- Lock acquisition --- + // snapshot_mutex_ (shared): allows concurrent reads/writes, blocks only + // during full metadata snapshots. + // shard lock (exclusive via MetadataShardAccessorRW): serializes all + // operations on keys that hash to the same shard. + std::shared_lock shared_lock(snapshot_mutex_); + MetadataShardAccessorRW shard(this, getShardIndex(key)); + + const auto now = std::chrono::system_clock::now(); + auto it = shard->metadata.find(key); + + // --- Step 0: stale handle cleanup --- + // If all memory replicas point to unmounted segments (node crashed and + // restarted), the metadata is useless — erase it and treat as new key. 
+ if (it != shard->metadata.end() && CleanupStaleHandles(it->second)) { + shard->processing_keys.erase(key); + shard->metadata.erase(it); + it = shard->metadata.end(); + } + + // --- Step 1: safety checks and preemption (only if key exists) --- + if (it != shard->metadata.end()) { + auto& metadata = it->second; + + // Reject if a Copy/Move task is actively reading this key's replicas. + // Writing during replication would corrupt the copy. + if (shard->replication_tasks.count(key) > 0) { + LOG(INFO) << "key=" << key << ", error=object_has_replication_task"; + return tl::make_unexpected(ErrorCode::OBJECT_HAS_REPLICATION_TASK); + } + + // Reject if an offload-to-disk task is in progress (same reason). + if (shard->offloading_tasks.count(key) > 0) { + LOG(INFO) << "key=" << key << ", error=object_has_offloading_task"; + return tl::make_unexpected(ErrorCode::OBJECT_HAS_REPLICATION_TASK); + } + + // Preempt an in-progress Put/Upsert on the same key. The previous + // writer's PROCESSING replicas are moved to discarded_replicas_ with a + // TTL so they are not freed while the old writer may still be doing + // RDMA writes. Unlike PutStart (which only preempts after a timeout), + // UpsertStart preempts immediately. + if (shard->processing_keys.count(key) > 0) { + auto processing_replicas = + metadata.PopReplicas(&Replica::fn_is_processing); + if (!processing_replicas.empty()) { + std::lock_guard lock(discarded_replicas_mutex_); + discarded_replicas_.emplace_back( + std::move(processing_replicas), + now + put_start_release_timeout_sec_); + } + shard->processing_keys.erase(key); + + // If no COMPLETE replicas survive the preemption, this key + // effectively does not exist — fall through to Case A. + if (!metadata.HasReplica(&Replica::fn_is_completed)) { + shard->metadata.erase(it); + it = shard->metadata.end(); + } + } + } + + // --- Case A: key does not exist (or was erased above) --- + // Allocate fresh buffers, identical to PutStart. 
+ if (it == shard->metadata.end()) { + VLOG(1) << "key=" << key << ", action=upsert_start_case_a"; + return AllocateAndInsertMetadata(shard, client_id, key, slice_length, + config, now); + } + + // --- Step 2: key exists with COMPLETE replicas → Case B or C --- + auto& metadata = it->second; + + // Reject if any reader holds a reference (refcnt > 0). Overwriting a + // buffer that an RDMA read is streaming from would cause data corruption. + // The client should retry after readers finish. + if (metadata.HasReplica(&Replica::fn_is_busy)) { + LOG(INFO) << "key=" << key << ", error=object_replica_busy"; + return tl::make_unexpected(ErrorCode::OBJECT_REPLICA_BUSY); + } + + if (metadata.size == slice_length) { + // --- Case B: same size — in-place update --- + // Reuse existing buffer addresses. No allocation or deallocation. + // The client will RDMA-write new data to the same addresses. + // + // hard_pinned is const and preserved automatically — upsert does not + // change the eviction protection level of an existing object. + metadata.client_id = client_id; + metadata.put_start_time = now; + + // Reconcile soft_pin state with the incoming config. + { + SpinLocker locker(&metadata.lock); + if (config.with_soft_pin && !metadata.soft_pin_timeout) { + metadata.soft_pin_timeout.emplace(); + MasterMetricManager::instance().inc_soft_pin_key_count(1); + } else if (!config.with_soft_pin && metadata.soft_pin_timeout) { + metadata.soft_pin_timeout.reset(); + MasterMetricManager::instance().dec_soft_pin_key_count(1); + } + } + + // Mark COMPLETE → PROCESSING so readers won't see stale data + // mid-transfer. The key becomes unreadable until UpsertEnd. + metadata.VisitReplicas(&Replica::fn_is_completed, [](Replica& replica) { + replica.mark_processing(); + }); + + shard->processing_keys.insert(key); + + // Return the existing descriptors — same buffer addresses as before. 
+ std::vector replica_list; + const auto& all_replicas = metadata.GetAllReplicas(); + replica_list.reserve(all_replicas.size()); + for (const auto& replica : all_replicas) { + replica_list.emplace_back(replica.get_descriptor()); + } + + VLOG(1) << "key=" << key << ", action=upsert_start_case_b_inplace"; + return replica_list; + } + + // --- Case C: different size — discard old replicas and reallocate --- + // Old buffers cannot be reused. Move them to discarded_replicas_ for + // delayed release (readers may still hold descriptors without refcnt), + // then allocate fresh buffers at the new size. + auto old_replicas = metadata.PopReplicas(); + if (!old_replicas.empty()) { + std::lock_guard lock(discarded_replicas_mutex_); + discarded_replicas_.emplace_back(std::move(old_replicas), + now + put_start_release_timeout_sec_); + } + shard->metadata.erase(it); + + VLOG(1) << "key=" << key << ", action=upsert_start_case_c_reallocate"; + return AllocateAndInsertMetadata(shard, client_id, key, slice_length, + config, now); +} + +auto MasterService::UpsertEnd(const UUID& client_id, const std::string& key, + ReplicaType replica_type) + -> tl::expected { + return PutEnd(client_id, key, replica_type); +} + +auto MasterService::UpsertRevoke(const UUID& client_id, const std::string& key, + ReplicaType replica_type) + -> tl::expected { + return PutRevoke(client_id, key, replica_type); +} + +std::vector, ErrorCode>> +MasterService::BatchUpsertStart(const UUID& client_id, + const std::vector& keys, + const std::vector& slice_lengths, + const ReplicateConfig& config) { + if (keys.size() != slice_lengths.size()) { + LOG(ERROR) << "BatchUpsertStart: keys.size()=" << keys.size() + << " != slice_lengths.size()=" << slice_lengths.size(); + return std::vector< + tl::expected, ErrorCode>>( + keys.size(), tl::make_unexpected(ErrorCode::INVALID_PARAMS)); + } + std::vector, ErrorCode>> + results; + results.reserve(keys.size()); + for (size_t i = 0; i < keys.size(); ++i) { + 
results.emplace_back( + UpsertStart(client_id, keys[i], slice_lengths[i], config)); + } + return results; +} + +std::vector> MasterService::BatchUpsertEnd( + const UUID& client_id, const std::vector& keys) { + return BatchPutEnd(client_id, keys); +} + +std::vector> MasterService::BatchUpsertRevoke( + const UUID& client_id, const std::vector& keys) { + return BatchPutRevoke(client_id, keys); +} + auto MasterService::EvictDiskReplica(const UUID& client_id, const std::string& key, ReplicaType replica_type) diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 489ceca17..162b2a3a5 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -2220,6 +2220,396 @@ int RealClient::put_from(const std::string &key, void *buffer, size_t size, return to_py_ret(put_from_internal(key, buffer, size, config)); } +// --- Upsert implementations --- + +tl::expected RealClient::upsert_internal( + const std::string &key, std::span value, + const ReplicateConfig &config, + std::shared_ptr client_buffer_allocator) { + if (config.prefer_alloc_in_same_node) { + LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (!client_) { + LOG(ERROR) << "Client is not initialized"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (!client_buffer_allocator) { + LOG(ERROR) << "Client buffer allocator is not provided"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + auto alloc_result = client_buffer_allocator->allocate(value.size_bytes()); + if (!alloc_result) { + LOG(ERROR) << "Failed to allocate buffer for upsert operation, key: " + << key << ", value size: " << value.size(); + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + auto &buffer_handle = *alloc_result; + memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); + + std::vector slices = split_into_slices(buffer_handle); + + auto result = client_->Upsert(key, slices, config); 
+ if (!result) { + return tl::unexpected(result.error()); + } + return {}; +} + +int RealClient::upsert(const std::string &key, std::span value, + const ReplicateConfig &config) { + return to_py_ret( + upsert_internal(key, value, config, client_buffer_allocator_)); +} + +tl::expected RealClient::upsert_dummy_helper( + const std::string &key, std::span value, + const ReplicateConfig &config, const UUID &client_id) { + std::shared_lock lock(dummy_client_mutex_); + auto it = shm_contexts_.find(client_id); + if (it == shm_contexts_.end()) { + LOG(ERROR) << "client_id=" << client_id << ", error=shm_not_mapped"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + auto &context = it->second; + return upsert_internal(key, value, config, context.client_buffer_allocator); +} + +tl::expected RealClient::upsert_from_internal( + const std::string &key, void *buffer, size_t size, + const ReplicateConfig &config) { + if (config.prefer_alloc_in_same_node) { + LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (!client_) { + LOG(ERROR) << "Client is not initialized"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (size == 0) { + LOG(WARNING) << "Attempting to upsert empty data for key: " << key; + return {}; + } + + std::vector slices; + uint64_t offset = 0; + while (offset < size) { + auto chunk_size = std::min(size - offset, kMaxSliceSize); + void *chunk_ptr = static_cast(buffer) + offset; + slices.emplace_back(Slice{chunk_ptr, chunk_size}); + offset += chunk_size; + } + + auto result = client_->Upsert(key, slices, config); + if (!result) { + return tl::unexpected(result.error()); + } + return {}; +} + +int RealClient::upsert_from(const std::string &key, void *buffer, size_t size, + const ReplicateConfig &config) { + return to_py_ret(upsert_from_internal(key, buffer, size, config)); +} + +std::vector> +RealClient::batch_upsert_from_internal(const std::vector &keys, + const std::vector 
&buffers, + const std::vector &sizes, + const ReplicateConfig &config) { + if (config.prefer_alloc_in_same_node) { + LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + if (!client_) { + LOG(ERROR) << "Client is not initialized"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + if (keys.size() != buffers.size() || keys.size() != sizes.size()) { + LOG(ERROR) << "Mismatched sizes for keys, buffers, and sizes"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + + std::vector> ordered_batched_slices; + ordered_batched_slices.reserve(keys.size()); + + for (size_t i = 0; i < keys.size(); ++i) { + std::vector slices; + uint64_t offset = 0; + while (offset < sizes[i]) { + auto chunk_size = std::min(sizes[i] - offset, kMaxSliceSize); + void *chunk_ptr = static_cast(buffers[i]) + offset; + slices.emplace_back(Slice{chunk_ptr, chunk_size}); + offset += chunk_size; + } + ordered_batched_slices.emplace_back(std::move(slices)); + } + + return client_->BatchUpsert(keys, ordered_batched_slices, config); +} + +std::vector RealClient::batch_upsert_from( + const std::vector &keys, const std::vector &buffers, + const std::vector &sizes, const ReplicateConfig &config) { + auto internal_results = + batch_upsert_from_internal(keys, buffers, sizes, config); + std::vector results; + results.reserve(internal_results.size()); + for (const auto &result : internal_results) { + results.push_back(to_py_ret(result)); + } + return results; +} + +tl::expected RealClient::upsert_from_dummy_helper( + const std::string &key, uint64_t dummy_buffer, size_t size, + const ReplicateConfig &config, const UUID &client_id) { + std::shared_lock lock(dummy_client_mutex_); + auto it = shm_contexts_.find(client_id); + if (it == shm_contexts_.end()) { + LOG(ERROR) << "client_id=" << client_id << ", error=shm_not_mapped"; + return 
tl::unexpected(ErrorCode::INVALID_PARAMS); + } + auto &context = it->second; + + for (const auto &shm : context.mapped_shms) { + if (dummy_buffer >= shm.dummy_base_addr && + dummy_buffer + size <= shm.dummy_base_addr + shm.shm_size) { + void *real_buffer = reinterpret_cast( + dummy_buffer + shm.shm_addr_offset); + return upsert_from_internal(key, real_buffer, size, config); + } + } + + LOG(ERROR) << "Dummy buffer at " << dummy_buffer << " (size " << size + << ") not found in any mapped shared memory for client " + << client_id; + return tl::unexpected(ErrorCode::INVALID_PARAMS); +} + +std::vector> +RealClient::batch_upsert_from_dummy_helper( + const std::vector &keys, + const std::vector &dummy_buffers, + const std::vector &sizes, const ReplicateConfig &config, + const UUID &client_id) { + std::shared_lock lock(dummy_client_mutex_); + auto it = shm_contexts_.find(client_id); + if (it == shm_contexts_.end()) { + LOG(ERROR) << "client_id=" << client_id << ", error=shm_not_mapped"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + auto &context = it->second; + + std::vector buffers; + buffers.reserve(dummy_buffers.size()); + const MappedShm *last_hit_shm = nullptr; + + for (size_t i = 0; i < dummy_buffers.size(); ++i) { + uint64_t dummy_addr = dummy_buffers[i]; + size_t size = sizes[i]; + bool found = false; + + if (last_hit_shm && dummy_addr >= last_hit_shm->dummy_base_addr && + dummy_addr + size <= + last_hit_shm->dummy_base_addr + last_hit_shm->shm_size) { + buffers.push_back(reinterpret_cast( + dummy_addr + last_hit_shm->shm_addr_offset)); + found = true; + } else { + for (const auto &shm : context.mapped_shms) { + if (dummy_addr >= shm.dummy_base_addr && + dummy_addr + size <= shm.dummy_base_addr + shm.shm_size) { + buffers.push_back(reinterpret_cast( + dummy_addr + shm.shm_addr_offset)); + found = true; + last_hit_shm = &shm; + break; + } + } + } + + if (!found) { + LOG(ERROR) << "Dummy buffer at " << dummy_addr << " (size " 
<< size + << ") not found in any mapped shared memory for client " + << client_id; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + } + return batch_upsert_from_internal(keys, buffers, sizes, config); +} + +tl::expected RealClient::upsert_parts_internal( + const std::string &key, std::vector> values, + const ReplicateConfig &config, + std::shared_ptr client_buffer_allocator) { + if (config.prefer_alloc_in_same_node) { + LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (!client_) { + LOG(ERROR) << "Client is not initialized"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (!client_buffer_allocator) { + LOG(ERROR) << "Client buffer allocator is not provided"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + + size_t total_size = 0; + for (const auto &value : values) { + total_size += value.size_bytes(); + } + if (total_size == 0) { + LOG(WARNING) << "Attempting to upsert empty data for key: " << key; + return {}; + } + + auto alloc_result = client_buffer_allocator->allocate(total_size); + if (!alloc_result) { + LOG(ERROR) + << "Failed to allocate buffer for upsert_parts operation, key: " + << key << ", total size: " << total_size; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + + auto &buffer_handle = *alloc_result; + size_t offset = 0; + for (const auto &value : values) { + memcpy(static_cast(buffer_handle.ptr()) + offset, value.data(), + value.size_bytes()); + offset += value.size_bytes(); + } + + std::vector slices = split_into_slices(buffer_handle); + + auto result = client_->Upsert(key, slices, config); + if (!result) { + LOG(ERROR) << "Upsert operation failed with error: " + << toString(result.error()); + return tl::unexpected(result.error()); + } + return {}; +} + +int RealClient::upsert_parts(const std::string &key, + std::vector> values, + const ReplicateConfig &config) { + return to_py_ret( + 
upsert_parts_internal(key, values, config, client_buffer_allocator_)); +} + +tl::expected RealClient::upsert_parts_dummy_helper( + const std::string &key, std::vector> values, + const ReplicateConfig &config, const UUID &client_id) { + std::shared_lock lock(dummy_client_mutex_); + auto it = shm_contexts_.find(client_id); + if (it == shm_contexts_.end()) { + LOG(ERROR) << "client_id=" << client_id << ", error=shm_not_mapped"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + auto &context = it->second; + return upsert_parts_internal(key, values, config, + context.client_buffer_allocator); +} + +tl::expected RealClient::upsert_batch_internal( + const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config, + std::shared_ptr client_buffer_allocator) { + if (config.prefer_alloc_in_same_node) { + LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (!client_) { + LOG(ERROR) << "Client is not initialized"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (keys.size() != values.size()) { + LOG(ERROR) << "Key and value size mismatch"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + if (!client_buffer_allocator) { + LOG(ERROR) << "Client buffer allocator is not provided"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + std::vector buffer_handles; + std::unordered_map> batched_slices; + batched_slices.reserve(keys.size()); + + for (size_t i = 0; i < keys.size(); ++i) { + auto &key = keys[i]; + auto &value = values[i]; + auto alloc_result = + client_buffer_allocator->allocate(value.size_bytes()); + if (!alloc_result) { + LOG(ERROR) + << "Failed to allocate buffer for upsert_batch operation, key: " + << key << ", value size: " << value.size(); + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + auto &buffer_handle = *alloc_result; + memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); + auto slices = 
split_into_slices(buffer_handle); + buffer_handles.emplace_back(std::move(*alloc_result)); + batched_slices.emplace(key, std::move(slices)); + } + + // Convert unordered_map to vector format expected by BatchUpsert + std::vector> ordered_batched_slices; + ordered_batched_slices.reserve(keys.size()); + for (const auto &key : keys) { + auto it = batched_slices.find(key); + if (it != batched_slices.end()) { + ordered_batched_slices.emplace_back(it->second); + } else { + LOG(ERROR) << "Missing slices for key: " << key; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + } + + auto results = client_->BatchUpsert(keys, ordered_batched_slices, config); + + // Check if any operations failed + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i]) { + return tl::unexpected(results[i].error()); + } + } + return {}; +} + +tl::expected RealClient::upsert_batch_dummy_helper( + const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config, const UUID &client_id) { + std::shared_lock lock(dummy_client_mutex_); + auto it = shm_contexts_.find(client_id); + if (it == shm_contexts_.end()) { + LOG(ERROR) << "client_id=" << client_id << ", error=shm_not_mapped"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } + auto &context = it->second; + + return upsert_batch_internal(keys, values, config, + context.client_buffer_allocator); +} + +int RealClient::upsert_batch(const std::vector &keys, + const std::vector> &values, + const ReplicateConfig &config) { + return to_py_ret( + upsert_batch_internal(keys, values, config, client_buffer_allocator_)); +} + +// --- End Upsert implementations --- + std::vector RealClient::batch_get_into( const std::vector &keys, const std::vector &buffers, const std::vector &sizes) { diff --git a/mooncake-store/src/real_client_main.cpp b/mooncake-store/src/real_client_main.cpp index c5a9c14ab..ca3150f5e 100644 --- a/mooncake-store/src/real_client_main.cpp +++ b/mooncake-store/src/real_client_main.cpp @@ -37,10 
+37,19 @@ void RegisterClientRpcService(coro_rpc::coro_rpc_server &server, server.register_handler<&RealClient::getSize_internal>(&real_client); server.register_handler<&RealClient::batch_put_from_dummy_helper>( &real_client); - server.register_handler<&RealClient::batch_get_into_dummy_helper>( - &real_client); server.register_handler< &RealClient::batch_put_from_multi_buffers_dummy_helper>(&real_client); + server.register_handler<&RealClient::upsert_dummy_helper>(&real_client); + server.register_handler<&RealClient::upsert_from_dummy_helper>( + &real_client); + server.register_handler<&RealClient::upsert_parts_dummy_helper>( + &real_client); + server.register_handler<&RealClient::batch_upsert_from_dummy_helper>( + &real_client); + server.register_handler<&RealClient::upsert_batch_dummy_helper>( + &real_client); + server.register_handler<&RealClient::batch_get_into_dummy_helper>( + &real_client); server.register_handler< &RealClient::batch_get_into_multi_buffers_dummy_helper>(&real_client); server.register_handler<&RealClient::map_shm_internal>(&real_client); diff --git a/mooncake-store/src/rpc_service.cpp b/mooncake-store/src/rpc_service.cpp index d80498f86..49583457c 100644 --- a/mooncake-store/src/rpc_service.cpp +++ b/mooncake-store/src/rpc_service.cpp @@ -653,6 +653,155 @@ std::vector> WrappedMasterService::BatchPutRevoke( return results; } +tl::expected, ErrorCode> +WrappedMasterService::UpsertStart(const UUID& client_id, const std::string& key, + const uint64_t slice_length, + const ReplicateConfig& config) { + return execute_rpc( + "UpsertStart", + [&] { + return master_service_.UpsertStart(client_id, key, slice_length, + config); + }, + [&](auto& timer) { + timer.LogRequest("client_id=", client_id, ", key=", key, + ", slice_length=", slice_length); + }, + [&] { MasterMetricManager::instance().inc_put_start_requests(); }, + [] { MasterMetricManager::instance().inc_put_start_failures(); }); +} + +tl::expected WrappedMasterService::UpsertEnd( + const UUID& 
client_id, const std::string& key, ReplicaType replica_type) { + return execute_rpc( + "UpsertEnd", + [&] { return master_service_.UpsertEnd(client_id, key, replica_type); }, + [&](auto& timer) { + timer.LogRequest("client_id=", client_id, ", key=", key, + ", replica_type=", replica_type); + }, + [] { MasterMetricManager::instance().inc_put_end_requests(); }, + [] { MasterMetricManager::instance().inc_put_end_failures(); }); +} + +tl::expected WrappedMasterService::UpsertRevoke( + const UUID& client_id, const std::string& key, ReplicaType replica_type) { + return execute_rpc( + "UpsertRevoke", + [&] { + return master_service_.UpsertRevoke(client_id, key, replica_type); + }, + [&](auto& timer) { + timer.LogRequest("client_id=", client_id, ", key=", key, + ", replica_type=", replica_type); + }, + [] { MasterMetricManager::instance().inc_put_revoke_requests(); }, + [] { MasterMetricManager::instance().inc_put_revoke_failures(); }); +} + +std::vector, ErrorCode>> +WrappedMasterService::BatchUpsertStart( + const UUID& client_id, const std::vector& keys, + const std::vector& slice_lengths, const ReplicateConfig& config) { + ScopedVLogTimer timer(1, "BatchUpsertStart"); + const size_t total_keys = keys.size(); + timer.LogRequest("client_id=", client_id, ", keys_count=", total_keys); + MasterMetricManager::instance().inc_batch_put_start_requests(total_keys); + + auto results = master_service_.BatchUpsertStart(client_id, keys, + slice_lengths, config); + + size_t failure_count = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + failure_count++; + auto error = results[i].error(); + LOG(ERROR) << "BatchUpsertStart failed for key[" << i << "] '" + << keys[i] << "': " << toString(error); + } + } + + if (failure_count == total_keys) { + MasterMetricManager::instance().inc_batch_put_start_failures( + failure_count); + } else if (failure_count != 0) { + MasterMetricManager::instance().inc_batch_put_start_partial_success( + failure_count); + } + 
+ timer.LogResponse("total=", results.size(), + ", success=", results.size() - failure_count, + ", failures=", failure_count); + return results; +} + +std::vector> WrappedMasterService::BatchUpsertEnd( + const UUID& client_id, const std::vector& keys) { + ScopedVLogTimer timer(1, "BatchUpsertEnd"); + const size_t total_keys = keys.size(); + timer.LogRequest("client_id=", client_id, ", keys_count=", total_keys); + MasterMetricManager::instance().inc_batch_put_end_requests(total_keys); + + auto results = master_service_.BatchUpsertEnd(client_id, keys); + + size_t failure_count = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + failure_count++; + auto error = results[i].error(); + LOG(ERROR) << "BatchUpsertEnd failed for key[" << i << "] '" + << keys[i] << "': " << toString(error); + } + } + + if (failure_count == total_keys) { + MasterMetricManager::instance().inc_batch_put_end_failures( + failure_count); + } else if (failure_count != 0) { + MasterMetricManager::instance().inc_batch_put_end_partial_success( + failure_count); + } + + timer.LogResponse("total=", results.size(), + ", success=", results.size() - failure_count, + ", failures=", failure_count); + return results; +} + +std::vector> +WrappedMasterService::BatchUpsertRevoke(const UUID& client_id, + const std::vector& keys) { + ScopedVLogTimer timer(1, "BatchUpsertRevoke"); + const size_t total_keys = keys.size(); + timer.LogRequest("client_id=", client_id, ", keys_count=", total_keys); + MasterMetricManager::instance().inc_batch_put_revoke_requests(total_keys); + + auto results = master_service_.BatchUpsertRevoke(client_id, keys); + + size_t failure_count = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + failure_count++; + auto error = results[i].error(); + LOG(ERROR) << "BatchUpsertRevoke failed for key[" << i << "] '" + << keys[i] << "': " << toString(error); + } + } + + if (failure_count == total_keys) { + 
MasterMetricManager::instance().inc_batch_put_revoke_failures( + failure_count); + } else if (failure_count != 0) { + MasterMetricManager::instance().inc_batch_put_revoke_partial_success( + failure_count); + } + + timer.LogResponse("total=", results.size(), + ", success=", results.size() - failure_count, + ", failures=", failure_count); + return results; +} + tl::expected WrappedMasterService::Remove( const std::string& key, bool force) { return execute_rpc( @@ -1048,6 +1197,18 @@ void RegisterRpcService( &wrapped_master_service); server.register_handler<&mooncake::WrappedMasterService::BatchPutRevoke>( &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::UpsertStart>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::UpsertEnd>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::UpsertRevoke>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::BatchUpsertStart>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::BatchUpsertEnd>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::BatchUpsertRevoke>( + &wrapped_master_service); server.register_handler<&mooncake::WrappedMasterService::Remove>( &wrapped_master_service); server.register_handler<&mooncake::WrappedMasterService::RemoveByRegex>( diff --git a/mooncake-store/src/types.cpp b/mooncake-store/src/types.cpp index 733f6b5b5..f64da3c0e 100644 --- a/mooncake-store/src/types.cpp +++ b/mooncake-store/src/types.cpp @@ -34,6 +34,7 @@ const std::string& toString(ErrorCode errorCode) noexcept { {ErrorCode::REPLICA_NOT_FOUND, "REPLICA_NOT_FOUND"}, {ErrorCode::REPLICA_ALREADY_EXISTS, "REPLICA_ALREADY_EXISTS"}, {ErrorCode::REPLICA_IS_GONE, "REPLICA_IS_GONE"}, + {ErrorCode::OBJECT_REPLICA_BUSY, "OBJECT_REPLICA_BUSY"}, {ErrorCode::TRANSFER_FAIL, "TRANSFER_FAIL"}, {ErrorCode::RPC_FAIL, "RPC_FAIL"}, 
{ErrorCode::ETCD_OPERATION_ERROR, "ETCD_OPERATION_ERROR"}, diff --git a/mooncake-store/tests/client_integration_test.cpp b/mooncake-store/tests/client_integration_test.cpp index ddf1f183f..084fc0a11 100644 --- a/mooncake-store/tests/client_integration_test.cpp +++ b/mooncake-store/tests/client_integration_test.cpp @@ -1460,6 +1460,278 @@ TEST_F(EvictionNotificationTest, DiskReplicaRemovedAfterEviction) { seg_ptr_ = nullptr; } +// Test Upsert Case A: key does not exist — equivalent to Put +TEST_F(ClientIntegrationTest, UpsertNewKey) { + const std::string test_data = "upsert_new_key_data"; + const std::string key = "upsert_case_a_key"; + void* buffer = client_buffer_allocator_->allocate(test_data.size()); + ASSERT_NE(buffer, nullptr); + + memcpy(buffer, test_data.data(), test_data.size()); + std::vector slices; + slices.emplace_back(Slice{buffer, test_data.size()}); + + ReplicateConfig config; + config.replica_num = 1; + + // Upsert on a non-existent key should succeed (Case A) + auto upsert_result = test_client_->Upsert(key, slices, config); + ASSERT_TRUE(upsert_result.has_value()) + << "Upsert (Case A) failed: " << toString(upsert_result.error()); + client_buffer_allocator_->deallocate(buffer, test_data.size()); + + // Verify data through Get + buffer = client_buffer_allocator_->allocate(test_data.size()); + slices.clear(); + slices.emplace_back(Slice{buffer, test_data.size()}); + auto get_result = test_client_->Get(key, slices); + ASSERT_TRUE(get_result.has_value()) + << "Get after Upsert failed: " << toString(get_result.error()); + ASSERT_EQ(memcmp(slices[0].ptr, test_data.data(), test_data.size()), 0); + client_buffer_allocator_->deallocate(buffer, test_data.size()); + + // Clean up + std::this_thread::sleep_for( + std::chrono::milliseconds(default_kv_lease_ttl_)); + auto remove_result = test_client_->Remove(key); + ASSERT_TRUE(remove_result.has_value()) + << "Remove failed: " << toString(remove_result.error()); +} + +// Test Upsert Case B: key exists, same size 
— in-place update +TEST_F(ClientIntegrationTest, UpsertSameSize) { + const std::string key = "upsert_case_b_key"; + const size_t data_size = 64; + + // Initial data: all 'A' + std::string initial_data(data_size, 'A'); + void* buffer = client_buffer_allocator_->allocate(data_size); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, initial_data.data(), data_size); + std::vector slices; + slices.emplace_back(Slice{buffer, data_size}); + + ReplicateConfig config; + config.replica_num = 1; + + // First Put to create the key + auto put_result = test_client_->Put(key, slices, config); + ASSERT_TRUE(put_result.has_value()) + << "Initial Put failed: " << toString(put_result.error()); + client_buffer_allocator_->deallocate(buffer, data_size); + + // Wait for lease to expire so refcnt drops to 0 + std::this_thread::sleep_for( + std::chrono::milliseconds(default_kv_lease_ttl_)); + + // Upsert with same size but different content: all 'B' + std::string updated_data(data_size, 'B'); + buffer = client_buffer_allocator_->allocate(data_size); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, updated_data.data(), data_size); + slices.clear(); + slices.emplace_back(Slice{buffer, data_size}); + + auto upsert_result = test_client_->Upsert(key, slices, config); + ASSERT_TRUE(upsert_result.has_value()) + << "Upsert (Case B) failed: " << toString(upsert_result.error()); + client_buffer_allocator_->deallocate(buffer, data_size); + + // Verify the data was updated + buffer = client_buffer_allocator_->allocate(data_size); + slices.clear(); + slices.emplace_back(Slice{buffer, data_size}); + auto get_result = test_client_->Get(key, slices); + ASSERT_TRUE(get_result.has_value()) + << "Get after Upsert failed: " << toString(get_result.error()); + ASSERT_EQ(memcmp(slices[0].ptr, updated_data.data(), data_size), 0) + << "Data should be updated to 'B's after in-place upsert"; + client_buffer_allocator_->deallocate(buffer, data_size); + + // Clean up + std::this_thread::sleep_for( + 
std::chrono::milliseconds(default_kv_lease_ttl_)); + auto remove_result = test_client_->Remove(key); + ASSERT_TRUE(remove_result.has_value()) + << "Remove failed: " << toString(remove_result.error()); +} + +// Test Upsert Case C: key exists, different size — delete and reallocate +TEST_F(ClientIntegrationTest, UpsertDifferentSize) { + const std::string key = "upsert_case_c_key"; + const size_t initial_size = 64; + const size_t updated_size = 128; + + // Initial data: 64 bytes of 'X' + std::string initial_data(initial_size, 'X'); + void* buffer = client_buffer_allocator_->allocate(initial_size); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, initial_data.data(), initial_size); + std::vector slices; + slices.emplace_back(Slice{buffer, initial_size}); + + ReplicateConfig config; + config.replica_num = 1; + + auto put_result = test_client_->Put(key, slices, config); + ASSERT_TRUE(put_result.has_value()) + << "Initial Put failed: " << toString(put_result.error()); + client_buffer_allocator_->deallocate(buffer, initial_size); + + // Wait for lease to expire so refcnt drops to 0 + std::this_thread::sleep_for( + std::chrono::milliseconds(default_kv_lease_ttl_)); + + // Upsert with different (larger) size: 128 bytes of 'Y' + std::string updated_data(updated_size, 'Y'); + buffer = client_buffer_allocator_->allocate(updated_size); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, updated_data.data(), updated_size); + slices.clear(); + slices.emplace_back(Slice{buffer, updated_size}); + + auto upsert_result = test_client_->Upsert(key, slices, config); + ASSERT_TRUE(upsert_result.has_value()) + << "Upsert (Case C) failed: " << toString(upsert_result.error()); + client_buffer_allocator_->deallocate(buffer, updated_size); + + // Verify the data was updated with new size + buffer = client_buffer_allocator_->allocate(updated_size); + slices.clear(); + slices.emplace_back(Slice{buffer, updated_size}); + auto get_result = test_client_->Get(key, slices); + 
+    ASSERT_TRUE(get_result.has_value())
+        << "Get after Upsert failed: " << toString(get_result.error());
+    ASSERT_EQ(slices[0].size, updated_size);
+    ASSERT_EQ(memcmp(slices[0].ptr, updated_data.data(), updated_size), 0)
+        << "Data should be updated to 'Y's with new size";
+    client_buffer_allocator_->deallocate(buffer, updated_size);
+
+    // Clean up
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(default_kv_lease_ttl_));
+    auto remove_result = test_client_->Remove(key);
+    ASSERT_TRUE(remove_result.has_value())
+        << "Remove failed: " << toString(remove_result.error());
+}
+
+// Test BatchUpsert with mixed cases (A + B + C)
+TEST_F(ClientIntegrationTest, BatchUpsertMixed) {
+    const size_t data_size = 64;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Pre-create key_b (for Case B: same size) and key_c (for Case C: diff
+    // size)
+    std::string key_b = "batch_upsert_case_b";
+    std::string key_c = "batch_upsert_case_c";
+
+    // Put key_b: 64 bytes of 'M'
+    {
+        std::string data(data_size, 'M');
+        void* buf = client_buffer_allocator_->allocate(data_size);
+        memcpy(buf, data.data(), data_size);
+        std::vector<Slice> sl;
+        sl.emplace_back(Slice{buf, data_size});
+        auto r = test_client_->Put(key_b, sl, config);
+        ASSERT_TRUE(r.has_value())
+            << "Put key_b failed: " << toString(r.error());
+        client_buffer_allocator_->deallocate(buf, data_size);
+    }
+
+    // Put key_c: 64 bytes of 'N'
+    {
+        std::string data(data_size, 'N');
+        void* buf = client_buffer_allocator_->allocate(data_size);
+        memcpy(buf, data.data(), data_size);
+        std::vector<Slice> sl;
+        sl.emplace_back(Slice{buf, data_size});
+        auto r = test_client_->Put(key_c, sl, config);
+        ASSERT_TRUE(r.has_value())
+            << "Put key_c failed: " << toString(r.error());
+        client_buffer_allocator_->deallocate(buf, data_size);
+    }
+
+    // Wait for lease to expire
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(default_kv_lease_ttl_));
+
+    // Now BatchUpsert:
+    //   key_a (new)            → Case A, 64 bytes of 'P'
+    //   key_b (exists, same)   → Case B, 64 bytes of 'Q'
+    //   key_c (exists, larger) → Case C, 128 bytes of 'R'
+    std::string key_a = "batch_upsert_case_a";
+    std::vector<std::string> keys = {key_a, key_b, key_c};
+
+    const size_t size_a = data_size;
+    const size_t size_b = data_size;
+    const size_t size_c = data_size * 2;
+    std::string data_a(size_a, 'P');
+    std::string data_b(size_b, 'Q');
+    std::string data_c(size_c, 'R');
+
+    std::vector<std::vector<Slice>> batched_slices;
+    std::vector<void*> alloc_ptrs;  // track for cleanup
+
+    auto alloc_and_fill = [&](const std::string& data, size_t sz) {
+        void* buf = client_buffer_allocator_->allocate(sz);
+        EXPECT_NE(buf, nullptr);
+        memcpy(buf, data.data(), sz);
+        alloc_ptrs.push_back(buf);
+        std::vector<Slice> sl;
+        sl.emplace_back(Slice{buf, sz});
+        batched_slices.push_back(std::move(sl));
+    };
+
+    alloc_and_fill(data_a, size_a);
+    alloc_and_fill(data_b, size_b);
+    alloc_and_fill(data_c, size_c);
+
+    auto batch_results =
+        test_client_->BatchUpsert(keys, batched_slices, config);
+    ASSERT_EQ(batch_results.size(), 3);
+    for (size_t i = 0; i < batch_results.size(); ++i) {
+        ASSERT_TRUE(batch_results[i].has_value())
+            << "BatchUpsert failed for key " << keys[i] << ": "
+            << toString(batch_results[i].error());
+    }
+
+    // Free write buffers
+    client_buffer_allocator_->deallocate(alloc_ptrs[0], size_a);
+    client_buffer_allocator_->deallocate(alloc_ptrs[1], size_b);
+    client_buffer_allocator_->deallocate(alloc_ptrs[2], size_c);
+
+    // Verify each key's data
+    auto verify = [&](const std::string& key, const std::string& expected,
+                      size_t sz) {
+        void* buf = client_buffer_allocator_->allocate(sz);
+        std::vector<Slice> sl;
+        sl.emplace_back(Slice{buf, sz});
+        auto get_result = test_client_->Get(key, sl);
+        EXPECT_TRUE(get_result.has_value())
+            << "Get failed for " << key << ": " << toString(get_result.error());
+        if (get_result.has_value()) {
+            EXPECT_EQ(sl[0].size, sz);
+            EXPECT_EQ(memcmp(sl[0].ptr, expected.data(), sz), 0)
+                << "Data mismatch for " << key;
+        }
+        client_buffer_allocator_->deallocate(buf, sz);
+    };
+
+    verify(key_a, data_a, size_a);
+    verify(key_b, data_b, size_b);
+    verify(key_c, data_c, size_c);
+
+    // Clean up
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(default_kv_lease_ttl_));
+    for (const auto& key : keys) {
+        auto r = test_client_->Remove(key);
+        EXPECT_TRUE(r.has_value())
+            << "Remove failed for " << key << ": " << toString(r.error());
+    }
+}
+
 }  // namespace testing
 }  // namespace mooncake
diff --git a/mooncake-store/tests/master_service_test.cpp b/mooncake-store/tests/master_service_test.cpp
index c55200243..068214512 100644
--- a/mooncake-store/tests/master_service_test.cpp
+++ b/mooncake-store/tests/master_service_test.cpp
@@ -4272,6 +4272,431 @@ TEST_F(MasterServiceTest, ForceRemoveAllLeasedObjects) {
         ASSERT_FALSE(exist_result.value());
     }
 }
+// ===================== Upsert Tests =====================
+
+TEST_F(MasterServiceTest, UpsertNewKey) {
+    // Case A: key does not exist — behaves like PutStart
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id = generate_uuid();
+
+    std::string key = "upsert_new_key";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    auto upsert_result =
+        service_->UpsertStart(client_id, key, slice_length, config);
+    ASSERT_TRUE(upsert_result.has_value());
+    auto replicas = upsert_result.value();
+    EXPECT_EQ(1, replicas.size());
+    EXPECT_EQ(ReplicaStatus::PROCESSING, replicas[0].status);
+
+    // During upsert, GetReplicaList should return not ready
+    auto get_result = service_->GetReplicaList(key);
+    EXPECT_FALSE(get_result.has_value());
+    EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, get_result.error());
+
+    // UpsertEnd completes the operation
+    auto end_result = service_->UpsertEnd(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(end_result.has_value());
+
+    // Verify replica is COMPLETE
+    auto final_result = service_->GetReplicaList(key);
+    ASSERT_TRUE(final_result.has_value());
+    EXPECT_EQ(1, final_result.value().replicas.size());
+    EXPECT_EQ(ReplicaStatus::COMPLETE, final_result.value().replicas[0].status);
+}
+
+TEST_F(MasterServiceTest, UpsertSameSize) {
+    // Case B: key exists with same size — in-place update
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id = generate_uuid();
+
+    std::string key = "upsert_same_size";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // First: PutStart + PutEnd to create the object
+    auto put_result = service_->PutStart(client_id, key, slice_length, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto original_replicas = put_result.value();
+    auto put_end = service_->PutEnd(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // UpsertStart with same size — should reuse buffers
+    const UUID new_client_id = generate_uuid();
+    auto upsert_result =
+        service_->UpsertStart(new_client_id, key, slice_length, config);
+    ASSERT_TRUE(upsert_result.has_value());
+    auto upsert_replicas = upsert_result.value();
+    EXPECT_EQ(1, upsert_replicas.size());
+    EXPECT_EQ(ReplicaStatus::PROCESSING, upsert_replicas[0].status);
+
+    // Verify same buffer address (in-place reuse)
+    EXPECT_EQ(original_replicas[0]
+                  .get_memory_descriptor()
+                  .buffer_descriptor.buffer_address_,
+              upsert_replicas[0]
+                  .get_memory_descriptor()
+                  .buffer_descriptor.buffer_address_);
+
+    // UpsertEnd with the new client_id
+    auto end_result =
+        service_->UpsertEnd(new_client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(end_result.has_value());
+
+    // Verify replica is COMPLETE again
+    auto final_result = service_->GetReplicaList(key);
+    ASSERT_TRUE(final_result.has_value());
+    EXPECT_EQ(ReplicaStatus::COMPLETE, final_result.value().replicas[0].status);
+}
+
+TEST_F(MasterServiceTest, UpsertSameSizeRefreshesMetadata) {
+    // Case B: verify client_id and put_start_time are refreshed
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id_a = generate_uuid();
+    const UUID client_id_b = generate_uuid();
+
+    std::string key = "upsert_refresh_metadata";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Create object with client_a
+    auto put_result =
+        service_->PutStart(client_id_a, key, slice_length, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto put_end = service_->PutEnd(client_id_a, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // UpsertStart with client_b
+    auto upsert_result =
+        service_->UpsertStart(client_id_b, key, slice_length, config);
+    ASSERT_TRUE(upsert_result.has_value());
+
+    // UpsertEnd with client_a should fail (client_id was refreshed to client_b)
+    auto end_fail = service_->UpsertEnd(client_id_a, key, ReplicaType::MEMORY);
+    EXPECT_FALSE(end_fail.has_value());
+    EXPECT_EQ(ErrorCode::ILLEGAL_CLIENT, end_fail.error());
+
+    // UpsertEnd with client_b should succeed
+    auto end_ok = service_->UpsertEnd(client_id_b, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(end_ok.has_value());
+}
+
+TEST_F(MasterServiceTest, UpsertDifferentSize) {
+    // Case C: key exists with different size — delete and reallocate
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id = generate_uuid();
+
+    std::string key = "upsert_diff_size";
+    uint64_t original_size = 1024;
+    uint64_t new_size = 2048;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Create object with original_size
+    auto put_result = service_->PutStart(client_id, key, original_size, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto original_replicas = put_result.value();
+    auto put_end = service_->PutEnd(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // UpsertStart with different size
+    auto upsert_result =
+        service_->UpsertStart(client_id, key, new_size, config);
+    ASSERT_TRUE(upsert_result.has_value());
+    auto new_replicas = upsert_result.value();
+    EXPECT_EQ(1, new_replicas.size());
+    EXPECT_EQ(ReplicaStatus::PROCESSING, new_replicas[0].status);
+
+    // Buffer address should be different (reallocated)
+    EXPECT_NE(original_replicas[0]
+                  .get_memory_descriptor()
+                  .buffer_descriptor.buffer_address_,
+              new_replicas[0]
+                  .get_memory_descriptor()
+                  .buffer_descriptor.buffer_address_);
+
+    // UpsertEnd
+    auto end_result = service_->UpsertEnd(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(end_result.has_value());
+
+    // Verify the object is complete
+    auto final_result = service_->GetReplicaList(key);
+    ASSERT_TRUE(final_result.has_value());
+    EXPECT_EQ(ReplicaStatus::COMPLETE, final_result.value().replicas[0].status);
+}
+
+TEST_F(MasterServiceTest, UpsertConflictReplicationTask) {
+    // Upsert should fail if Copy is in progress
+    const uint64_t kv_lease_ttl = 50;
+    auto service_config = MasterServiceConfig::builder()
+                              .set_default_kv_lease_ttl(kv_lease_ttl)
+                              .build();
+    std::unique_ptr<MasterService> service_(new MasterService(service_config));
+
+    [[maybe_unused]] const auto ctx1 =
+        PrepareSimpleSegment(*service_, "segment_1");
+    [[maybe_unused]] const auto ctx2 =
+        PrepareSimpleSegment(*service_, "segment_2");
+    UUID client_id = generate_uuid();
+
+    std::string key = "upsert_conflict_copy";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+    config.preferred_segment = "segment_1";
+
+    // Create object
+    auto put_result = service_->PutStart(client_id, key, slice_length, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto put_end = service_->PutEnd(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // Start a Copy
+    auto copy_result =
+        service_->CopyStart(client_id, key, "segment_1", {"segment_2"});
+    ASSERT_TRUE(copy_result.has_value());
+
+    // UpsertStart should fail with OBJECT_HAS_REPLICATION_TASK
+    auto upsert_result =
+        service_->UpsertStart(client_id, key, slice_length, config);
+    EXPECT_FALSE(upsert_result.has_value());
+    EXPECT_EQ(ErrorCode::OBJECT_HAS_REPLICATION_TASK, upsert_result.error());
+}
+
+TEST_F(MasterServiceTest, UpsertPreemptsInProgressPut) {
+    // Upsert should preempt an in-progress Put (no discard timeout needed
+    // for preemption via Upsert — Upsert always preempts immediately)
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_a = generate_uuid();
+    const UUID client_b = generate_uuid();
+
+    std::string key = "upsert_preempt";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Client A starts a Put but doesn't finish
+    auto put_result = service_->PutStart(client_a, key, slice_length, config);
+    ASSERT_TRUE(put_result.has_value());
+
+    // Client B upserts the same key — should preempt client A
+    auto upsert_result =
+        service_->UpsertStart(client_b, key, slice_length, config);
+    ASSERT_TRUE(upsert_result.has_value());
+    auto upsert_replicas = upsert_result.value();
+    EXPECT_EQ(1, upsert_replicas.size());
+    EXPECT_EQ(ReplicaStatus::PROCESSING, upsert_replicas[0].status);
+
+    // Client A's PutEnd should fail
+    auto put_end_a = service_->PutEnd(client_a, key, ReplicaType::MEMORY);
+    EXPECT_FALSE(put_end_a.has_value());
+
+    // Client B's UpsertEnd should succeed
+    auto upsert_end = service_->UpsertEnd(client_b, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(upsert_end.has_value());
+
+    // Verify final state
+    auto final_result = service_->GetReplicaList(key);
+    ASSERT_TRUE(final_result.has_value());
+    EXPECT_EQ(ReplicaStatus::COMPLETE, final_result.value().replicas[0].status);
+}
+
+TEST_F(MasterServiceTest, UpsertRevoke) {
+    // UpsertRevoke should clean up like PutRevoke
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id = generate_uuid();
+
+    std::string key = "upsert_revoke";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // UpsertStart (Case A — new key)
+    auto upsert_result =
+        service_->UpsertStart(client_id, key, slice_length, config);
+    ASSERT_TRUE(upsert_result.has_value());
+
+    // UpsertRevoke
+    auto revoke_result =
+        service_->UpsertRevoke(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(revoke_result.has_value());
+
+    // Key should be gone
+    auto exist_result = service_->ExistKey(key);
+    ASSERT_TRUE(exist_result.has_value());
+    EXPECT_FALSE(exist_result.value());
+}
+
+TEST_F(MasterServiceTest, UpsertInPlaceThenRevoke) {
+    // UpsertRevoke after in-place UpsertStart should clean up
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id = generate_uuid();
+
+    std::string key = "upsert_inplace_revoke";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Create object first
+    auto put_result = service_->PutStart(client_id, key, slice_length, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto put_end = service_->PutEnd(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // UpsertStart in-place (same size)
+    const UUID new_client = generate_uuid();
+    auto upsert_result =
+        service_->UpsertStart(new_client, key, slice_length, config);
+    ASSERT_TRUE(upsert_result.has_value());
+
+    // UpsertRevoke — replicas are PROCESSING, should be erased
+    auto revoke_result =
+        service_->UpsertRevoke(new_client, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(revoke_result.has_value());
+
+    // Key should be gone (no valid replicas left)
+    auto exist_result = service_->ExistKey(key);
+    ASSERT_TRUE(exist_result.has_value());
+    EXPECT_FALSE(exist_result.value());
+}
+
+TEST_F(MasterServiceTest, BatchUpsertStart) {
+    // Test batch upsert with a mix of new and existing keys
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id = generate_uuid();
+
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Create key_1 with size 1024
+    auto put_result = service_->PutStart(client_id, "key_1", 1024, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto put_end = service_->PutEnd(client_id, "key_1", ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // BatchUpsertStart: key_1 (same size), key_2 (new)
+    std::vector<std::string> keys = {"key_1", "key_2"};
+    std::vector<uint64_t> slice_lengths = {1024, 2048};
+
+    auto results =
+        service_->BatchUpsertStart(client_id, keys, slice_lengths, config);
+    ASSERT_EQ(2, results.size());
+    EXPECT_TRUE(results[0].has_value());  // key_1: Case B (in-place)
+    EXPECT_TRUE(results[1].has_value());  // key_2: Case A (new)
+
+    // Complete both
+    auto end_results = service_->BatchUpsertEnd(client_id, keys);
+    ASSERT_EQ(2, end_results.size());
+    EXPECT_TRUE(end_results[0].has_value());
+    EXPECT_TRUE(end_results[1].has_value());
+}
+
+TEST_F(MasterServiceTest, UpsertPreemptsInProgressUpsert) {
+    // Upsert should preempt an in-progress Upsert (Case B in-place).
+    // After preemption, all replicas were PROCESSING (no COMPLETE survives),
+    // so metadata is erased and the new upsert falls through to Case A.
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_a = generate_uuid();
+    const UUID client_b = generate_uuid();
+    const UUID client_c = generate_uuid();
+
+    std::string key = "upsert_preempt_upsert";
+    uint64_t slice_length = 1024;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Step 1: Create the object via Put
+    auto put_result = service_->PutStart(client_a, key, slice_length, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto put_end = service_->PutEnd(client_a, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // Step 2: Client B starts in-place upsert (Case B) — marks COMPLETE →
+    // PROCESSING
+    auto upsert_b = service_->UpsertStart(client_b, key, slice_length, config);
+    ASSERT_TRUE(upsert_b.has_value());
+
+    // Key should be unreadable now (all replicas are PROCESSING)
+    auto get_mid = service_->GetReplicaList(key);
+    EXPECT_FALSE(get_mid.has_value());
+    EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, get_mid.error());
+
+    // Step 3: Client C upserts the same key — preempts Client B
+    auto upsert_c = service_->UpsertStart(client_c, key, slice_length, config);
+    ASSERT_TRUE(upsert_c.has_value());
+    EXPECT_EQ(1, upsert_c.value().size());
+
+    // Step 4: Client B's UpsertEnd should fail (preempted)
+    auto end_b = service_->UpsertEnd(client_b, key, ReplicaType::MEMORY);
+    EXPECT_FALSE(end_b.has_value());
+
+    // Step 5: Client C's UpsertEnd should succeed
+    auto end_c = service_->UpsertEnd(client_c, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(end_c.has_value());
+
+    // Final verification
+    auto final_result = service_->GetReplicaList(key);
+    ASSERT_TRUE(final_result.has_value());
+    EXPECT_EQ(1, final_result.value().replicas.size());
+    EXPECT_EQ(ReplicaStatus::COMPLETE, final_result.value().replicas[0].status);
+}
+
+TEST_F(MasterServiceTest, UpsertDifferentSizeThenRevoke) {
+    // Case C (different size) followed by UpsertRevoke.
+    // Old replicas go to discarded_replicas_, new replicas are erased by
+    // revoke. The key should disappear entirely.
+    std::unique_ptr<MasterService> service_(new MasterService());
+    [[maybe_unused]] const auto context = PrepareSimpleSegment(*service_);
+    const UUID client_id = generate_uuid();
+
+    std::string key = "upsert_diff_revoke";
+    uint64_t original_size = 1024;
+    uint64_t new_size = 2048;
+    ReplicateConfig config;
+    config.replica_num = 1;
+
+    // Create object with original size
+    auto put_result = service_->PutStart(client_id, key, original_size, config);
+    ASSERT_TRUE(put_result.has_value());
+    auto put_end = service_->PutEnd(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(put_end.has_value());
+
+    // Verify the key exists
+    auto exist_before = service_->ExistKey(key);
+    ASSERT_TRUE(exist_before.has_value());
+    EXPECT_TRUE(exist_before.value());
+
+    // UpsertStart with different size (Case C) — old replicas discarded,
+    // new replicas allocated
+    auto upsert_result =
+        service_->UpsertStart(client_id, key, new_size, config);
+    ASSERT_TRUE(upsert_result.has_value());
+
+    // Revoke — erase the newly allocated PROCESSING replicas
+    auto revoke_result =
+        service_->UpsertRevoke(client_id, key, ReplicaType::MEMORY);
+    ASSERT_TRUE(revoke_result.has_value());
+
+    // Key should be gone (old replicas in discarded, new replicas erased)
+    auto exist_after = service_->ExistKey(key);
+    ASSERT_TRUE(exist_after.has_value());
+    EXPECT_FALSE(exist_after.value());
+}
+
+// ===================== Hard Pin Tests =====================
+
 TEST_F(MasterServiceTest, HardPinObjectNotEvicted) {
     // Hard-pinned objects must survive eviction under memory pressure,
     // even after lease expires and all non-pinned objects are gone.