Skip to content
15 changes: 15 additions & 0 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,20 @@ class MooncakeHostMemAllocatorPyWrapper {
};

PYBIND11_MODULE(store, m) {
// Object data type classification
py::enum_<ObjectDataType>(m, "ObjectDataType")
.value("UNKNOWN", ObjectDataType::UNKNOWN)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should add a general type?

.value("KVCACHE", ObjectDataType::KVCACHE)
.value("TENSOR", ObjectDataType::TENSOR)
.value("WEIGHT", ObjectDataType::WEIGHT)
.value("SAMPLE", ObjectDataType::SAMPLE)
.value("ACTIVATION", ObjectDataType::ACTIVATION)
.value("GRADIENT", ObjectDataType::GRADIENT)
.value("OPTIMIZER_STATE", ObjectDataType::OPTIMIZER_STATE)
.value("METADATA", ObjectDataType::METADATA)
.value("GENERAL", ObjectDataType::GENERAL)
.export_values();

// Define the ReplicateConfig class
py::class_<ReplicateConfig>(m, "ReplicateConfig")
.def(py::init<>())
Expand All @@ -1402,6 +1416,7 @@ PYBIND11_MODULE(store, m) {
.def_readwrite("preferred_segment", &ReplicateConfig::preferred_segment)
.def_readwrite("prefer_alloc_in_same_node",
&ReplicateConfig::prefer_alloc_in_same_node)
.def_readwrite("data_type", &ReplicateConfig::data_type)
.def("__str__", [](const ReplicateConfig &config) {
std::ostringstream oss;
oss << config;
Expand Down
10 changes: 7 additions & 3 deletions mooncake-store/include/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,12 @@ class MasterService {
const UUID& client_id_,
const std::chrono::system_clock::time_point put_start_time_,
size_t value_length, std::vector<Replica>&& reps,
bool enable_soft_pin, bool enable_hard_pin = false)
bool enable_soft_pin, bool enable_hard_pin = false,
ObjectDataType data_type_ = ObjectDataType::UNKNOWN)
: client_id(client_id_),
put_start_time(put_start_time_),
size(value_length),
data_type(data_type_),
lease_timeout(),
soft_pin_timeout(std::nullopt),
hard_pinned(enable_hard_pin),
Expand All @@ -572,6 +574,7 @@ class MasterService {
// Updated by UpsertStart (Case B) to reset the discard timeout.
std::chrono::system_clock::time_point put_start_time;
const size_t size;
const ObjectDataType data_type{ObjectDataType::UNKNOWN};

mutable SpinLock lock;
// Default constructor, creates a time_point representing
Expand Down Expand Up @@ -975,7 +978,8 @@ class MasterService {

void Create(const UUID& client_id, uint64_t total_length,
std::vector<Replica> replicas, bool enable_soft_pin,
bool enable_hard_pin = false) {
bool enable_hard_pin = false,
ObjectDataType data_type = ObjectDataType::UNKNOWN) {
if (Exists()) {
throw std::logic_error("Already exists");
}
Expand All @@ -984,7 +988,7 @@ class MasterService {
std::piecewise_construct, std::forward_as_tuple(key_),
std::forward_as_tuple(client_id, now, total_length,
std::move(replicas), enable_soft_pin,
enable_hard_pin));
enable_hard_pin, data_type));
it_ = result.first;
}

Expand Down
4 changes: 3 additions & 1 deletion mooncake-store/include/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct ReplicateConfig {
std::string preferred_segment{}; // Deprecated: Single preferred segment
// for backward compatibility
bool prefer_alloc_in_same_node{false};
ObjectDataType data_type{ObjectDataType::UNKNOWN};

friend std::ostream& operator<<(std::ostream& os,
const ReplicateConfig& config) noexcept {
Expand All @@ -107,7 +108,8 @@ struct ReplicateConfig {
<< config.preferred_segment;
}
os << ", prefer_alloc_in_same_node: "
<< config.prefer_alloc_in_same_node << " }";
<< config.prefer_alloc_in_same_node
<< ", data_type: " << config.data_type << " }";
return os;
}
};
Expand Down
41 changes: 41 additions & 0 deletions mooncake-store/include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <limits>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -123,6 +124,46 @@ static constexpr uint64_t DEFAULT_PROCESSING_TASK_TIMEOUT_SEC =
300; // 0 to be no timeout
static constexpr uint32_t DEFAULT_MAX_RETRY_ATTEMPTS = 10;

/**
* @brief Data type classification for objects stored in Mooncake Store.
*
* This allows the store to track what kind of data each object holds,
* enabling future type-aware policies (eviction priority, replication
* strategies, etc.). Defaults to UNKNOWN for backward compatibility.
*/
enum class ObjectDataType : uint8_t {
UNKNOWN = 0,
KVCACHE = 1,
TENSOR = 2,
WEIGHT = 3,
SAMPLE = 4,
ACTIVATION = 5,
GRADIENT = 6,
OPTIMIZER_STATE = 7,
METADATA = 8,
GENERAL = 9,
// 10-255 reserved for future types
};

inline std::ostream& operator<<(std::ostream& os,
const ObjectDataType& type) noexcept {
static const std::unordered_map<ObjectDataType, std::string_view>
type_strings{{ObjectDataType::UNKNOWN, "UNKNOWN"},
{ObjectDataType::KVCACHE, "KVCACHE"},
{ObjectDataType::TENSOR, "TENSOR"},
{ObjectDataType::WEIGHT, "WEIGHT"},
{ObjectDataType::SAMPLE, "SAMPLE"},
{ObjectDataType::ACTIVATION, "ACTIVATION"},
{ObjectDataType::GRADIENT, "GRADIENT"},
{ObjectDataType::OPTIMIZER_STATE, "OPTIMIZER_STATE"},
{ObjectDataType::METADATA, "METADATA"},
{ObjectDataType::GENERAL, "GENERAL"}};

auto it = type_strings.find(type);
os << (it != type_strings.end() ? it->second : "UNKNOWN");
return os;
}
Comment on lines +148 to +165
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types.h now uses std::string_view in the ObjectDataType stream operator, but the header does not include <string_view> (and also relies on indirect includes for std::ostream). This makes the header non-self-contained and can cause build breaks for translation units that include types.h directly. Add the missing standard includes in types.h.

Copilot uses AI. Check for mistakes.

// Forward declarations
class BufferAllocatorBase;
class CachelibBufferAllocator;
Expand Down
43 changes: 29 additions & 14 deletions mooncake-store/src/master_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ auto MasterService::AllocateAndInsertMetadata(
shard->metadata.emplace(
std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(client_id, now, value_length, std::move(replicas),
config.with_soft_pin, config.with_hard_pin));
config.with_soft_pin, config.with_hard_pin,
config.data_type));
shard->processing_keys.insert(key);

return replica_list;
Expand Down Expand Up @@ -3867,7 +3868,7 @@ MasterService::MetadataSerializer::DeserializeShard(const msgpack::object& obj,
metadata_ptr->client_id, metadata_ptr->put_start_time,
metadata_ptr->size, metadata_ptr->PopReplicas(),
metadata_ptr->soft_pin_timeout.has_value(),
metadata_ptr->IsHardPinned()));
metadata_ptr->IsHardPinned(), metadata_ptr->data_type));

it->second.lease_timeout = metadata_ptr->lease_timeout;
it->second.soft_pin_timeout = metadata_ptr->soft_pin_timeout;
Expand All @@ -3882,12 +3883,12 @@ MasterService::MetadataSerializer::SerializeMetadata(
MsgpackPacker& packer) const {
// Pack ObjectMetadata using array structure for efficiency
// Format: [client_id, put_start_time, size, lease_timeout,
// has_soft_pin_timeout, soft_pin_timeout, replicas_count, replicas...,
// hard_pinned]
// has_soft_pin_timeout, soft_pin_timeout, replicas_count, data_type,
// replicas..., hard_pinned]

size_t array_size = 8; // client_id, put_start_time, size, lease_timeout,
size_t array_size = 9; // client_id, put_start_time, size, lease_timeout,
// has_soft_pin_timeout, soft_pin_timeout,
// replicas_count + hard_pinned
// replicas_count, data_type, hard_pinned
array_size += metadata.CountReplicas(); // One element per replica
packer.pack_array(array_size);

Expand Down Expand Up @@ -3927,6 +3928,9 @@ MasterService::MetadataSerializer::SerializeMetadata(
// Serialize replicas count
packer.pack(static_cast<uint32_t>(metadata.CountReplicas()));

// Serialize data_type
packer.pack(static_cast<uint8_t>(metadata.data_type));

Comment on lines 3884 to +3933
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Snapshot serialization now always writes the new array shape (8 + replicas_count) by inserting data_type. While the new deserializer can read both formats, this is only backward-compatible in the "new code reads old snapshots" direction. If any mixed-version deployment expects older binaries to load snapshots produced by newer binaries, they will likely fail on the extra field. Consider adding an explicit snapshot format/version marker or a compatibility mode to write the old format during rolling upgrades, and/or clarify the upgrade guarantees.

Copilot uses AI. Check for mistakes.
// Serialize replicas
for (const auto& replica : metadata.GetAllReplicas()) {
auto result = Serializer<Replica>::serialize(
Expand All @@ -3951,9 +3955,8 @@ MasterService::MetadataSerializer::DeserializeMetadata(
"deserialize ObjectMetadata state is not an array"));
}

// Need at least 7 elements: client_id, put_start_time, size, lease_timeout,
// has_soft_pin_timeout, soft_pin_timeout, replicas_count
// (8th element = hard_pinned is optional for backward compat)
// Need at least 7 elements for old format, 8 for data_type-only or
// hard_pinned-only, 9 for newest format with both
if (obj.via.array.size < 7) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
Expand Down Expand Up @@ -3986,15 +3989,27 @@ MasterService::MetadataSerializer::DeserializeMetadata(
// Deserialize replicas count
uint32_t replicas_count = array[index++].as<uint32_t>();

// Array size: 7 + replicas_count (old format) or 8 + replicas_count (new
// format with hard_pinned)
if (obj.via.array.size != 7 + replicas_count &&
obj.via.array.size != 8 + replicas_count) {
// Format detection:
// v1 (old): 7 + replicas_count — no data_type, no hard_pinned
// v2 (main): 8 + replicas_count — hard_pinned after replicas
// v3 (newest): 9 + replicas_count — adds data_type & hard_pinned
const uint32_t total_elements = obj.via.array.size;
const bool is_v1 = (total_elements == 7 + replicas_count);
const bool is_v2 = (total_elements == 8 + replicas_count);
const bool is_v3 = (total_elements == 9 + replicas_count);

if (!is_v1 && !is_v2 && !is_v3) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize ObjectMetadata array size mismatch"));
}

// v3 has data_type right after replicas_count
ObjectDataType data_type = ObjectDataType::UNKNOWN;
if (is_v3) {
data_type = static_cast<ObjectDataType>(array[index++].as<uint8_t>());
}

// Deserialize replicas
std::vector<Replica> replicas;
replicas.reserve(replicas_count);
Expand All @@ -4020,7 +4035,7 @@ MasterService::MetadataSerializer::DeserializeMetadata(
client_id,
std::chrono::system_clock::time_point(
std::chrono::milliseconds(put_start_time_timestamp)),
size, std::move(replicas), enable_soft_pin, is_hard_pinned);
size, std::move(replicas), enable_soft_pin, is_hard_pinned, data_type);
metadata->lease_timeout = std::chrono::system_clock::time_point(
std::chrono::milliseconds(lease_timestamp));

Expand Down
1 change: 1 addition & 0 deletions mooncake-store/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ add_store_test(task_executor_test task_executor_test.cpp)
add_store_test(task_integration_test task_integration_test.cpp)
add_store_test(dummy_client_get_buffer_test dummy_client_get_buffer_test.cpp)
add_store_test(health_check_test health_check_test.cpp)
add_store_test(object_data_type_test object_data_type_test.cpp)
add_subdirectory(e2e)

add_executable(high_availability_test ha/leadership/high_availability_test.cpp)
Expand Down
Loading
Loading