-
Notifications
You must be signed in to change notification settings - Fork 657
[Store] Add ObjectDataType enum for type-aware metadata #1719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
55c7893
bfbdef2
e3eb2f1
96bad43
e68abff
515b36e
c632b5c
011f3ce
0a85afb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| #include <memory> | ||
| #include <optional> | ||
| #include <string> | ||
| #include <string_view> | ||
| #include <limits> | ||
| #include <unordered_map> | ||
| #include <utility> | ||
|
|
@@ -123,6 +124,46 @@ static constexpr uint64_t DEFAULT_PROCESSING_TASK_TIMEOUT_SEC = | |
| 300; // 0 to be no timeout | ||
| static constexpr uint32_t DEFAULT_MAX_RETRY_ATTEMPTS = 10; | ||
|
|
||
| /** | ||
| * @brief Data type classification for objects stored in Mooncake Store. | ||
| * | ||
| * This allows the store to track what kind of data each object holds, | ||
| * enabling future type-aware policies (eviction priority, replication | ||
| * strategies, etc.). Defaults to UNKNOWN for backward compatibility. | ||
| */ | ||
| enum class ObjectDataType : uint8_t { | ||
| UNKNOWN = 0, | ||
| KVCACHE = 1, | ||
| TENSOR = 2, | ||
| WEIGHT = 3, | ||
| SAMPLE = 4, | ||
| ACTIVATION = 5, | ||
| GRADIENT = 6, | ||
| OPTIMIZER_STATE = 7, | ||
| METADATA = 8, | ||
| GENERAL = 9, | ||
| // 10-255 reserved for future types | ||
| }; | ||
|
|
||
| inline std::ostream& operator<<(std::ostream& os, | ||
| const ObjectDataType& type) noexcept { | ||
| static const std::unordered_map<ObjectDataType, std::string_view> | ||
| type_strings{{ObjectDataType::UNKNOWN, "UNKNOWN"}, | ||
| {ObjectDataType::KVCACHE, "KVCACHE"}, | ||
| {ObjectDataType::TENSOR, "TENSOR"}, | ||
| {ObjectDataType::WEIGHT, "WEIGHT"}, | ||
| {ObjectDataType::SAMPLE, "SAMPLE"}, | ||
| {ObjectDataType::ACTIVATION, "ACTIVATION"}, | ||
| {ObjectDataType::GRADIENT, "GRADIENT"}, | ||
| {ObjectDataType::OPTIMIZER_STATE, "OPTIMIZER_STATE"}, | ||
| {ObjectDataType::METADATA, "METADATA"}, | ||
| {ObjectDataType::GENERAL, "GENERAL"}}; | ||
|
|
||
| auto it = type_strings.find(type); | ||
| os << (it != type_strings.end() ? it->second : "UNKNOWN"); | ||
| return os; | ||
| } | ||
|
Comment on lines
+148
to
+165
|
||
|
|
||
| // Forward declarations | ||
| class BufferAllocatorBase; | ||
| class CachelibBufferAllocator; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -781,7 +781,8 @@ auto MasterService::AllocateAndInsertMetadata( | |
| shard->metadata.emplace( | ||
| std::piecewise_construct, std::forward_as_tuple(key), | ||
| std::forward_as_tuple(client_id, now, value_length, std::move(replicas), | ||
| config.with_soft_pin, config.with_hard_pin)); | ||
| config.with_soft_pin, config.with_hard_pin, | ||
| config.data_type)); | ||
| shard->processing_keys.insert(key); | ||
|
|
||
| return replica_list; | ||
|
|
@@ -3867,7 +3868,7 @@ MasterService::MetadataSerializer::DeserializeShard(const msgpack::object& obj, | |
| metadata_ptr->client_id, metadata_ptr->put_start_time, | ||
| metadata_ptr->size, metadata_ptr->PopReplicas(), | ||
| metadata_ptr->soft_pin_timeout.has_value(), | ||
| metadata_ptr->IsHardPinned())); | ||
| metadata_ptr->IsHardPinned(), metadata_ptr->data_type)); | ||
|
|
||
| it->second.lease_timeout = metadata_ptr->lease_timeout; | ||
| it->second.soft_pin_timeout = metadata_ptr->soft_pin_timeout; | ||
|
|
@@ -3882,12 +3883,12 @@ MasterService::MetadataSerializer::SerializeMetadata( | |
| MsgpackPacker& packer) const { | ||
| // Pack ObjectMetadata using array structure for efficiency | ||
| // Format: [client_id, put_start_time, size, lease_timeout, | ||
| // has_soft_pin_timeout, soft_pin_timeout, replicas_count, replicas..., | ||
| // hard_pinned] | ||
| // has_soft_pin_timeout, soft_pin_timeout, replicas_count, data_type, | ||
| // replicas..., hard_pinned] | ||
|
|
||
| size_t array_size = 8; // client_id, put_start_time, size, lease_timeout, | ||
| size_t array_size = 9; // client_id, put_start_time, size, lease_timeout, | ||
| // has_soft_pin_timeout, soft_pin_timeout, | ||
| // replicas_count + hard_pinned | ||
| // replicas_count, data_type, hard_pinned | ||
| array_size += metadata.CountReplicas(); // One element per replica | ||
| packer.pack_array(array_size); | ||
|
|
||
|
|
@@ -3927,6 +3928,9 @@ MasterService::MetadataSerializer::SerializeMetadata( | |
| // Serialize replicas count | ||
| packer.pack(static_cast<uint32_t>(metadata.CountReplicas())); | ||
|
|
||
| // Serialize data_type | ||
| packer.pack(static_cast<uint8_t>(metadata.data_type)); | ||
|
|
||
|
Comment on lines
3884
to
+3933
|
||
| // Serialize replicas | ||
| for (const auto& replica : metadata.GetAllReplicas()) { | ||
| auto result = Serializer<Replica>::serialize( | ||
|
|
@@ -3951,9 +3955,8 @@ MasterService::MetadataSerializer::DeserializeMetadata( | |
| "deserialize ObjectMetadata state is not an array")); | ||
| } | ||
|
|
||
| // Need at least 7 elements: client_id, put_start_time, size, lease_timeout, | ||
| // has_soft_pin_timeout, soft_pin_timeout, replicas_count | ||
| // (8th element = hard_pinned is optional for backward compat) | ||
| // Need at least 7 elements for old format, 8 for data_type-only or | ||
| // hard_pinned-only, 9 for newest format with both | ||
| if (obj.via.array.size < 7) { | ||
| return tl::unexpected(SerializationError( | ||
| ErrorCode::DESERIALIZE_FAIL, | ||
|
|
@@ -3986,15 +3989,27 @@ MasterService::MetadataSerializer::DeserializeMetadata( | |
| // Deserialize replicas count | ||
| uint32_t replicas_count = array[index++].as<uint32_t>(); | ||
|
|
||
| // Array size: 7 + replicas_count (old format) or 8 + replicas_count (new | ||
| // format with hard_pinned) | ||
| if (obj.via.array.size != 7 + replicas_count && | ||
| obj.via.array.size != 8 + replicas_count) { | ||
| // Format detection: | ||
| // v1 (old): 7 + replicas_count — no data_type, no hard_pinned | ||
| // v2 (main): 8 + replicas_count — hard_pinned after replicas | ||
| // v3 (newest): 9 + replicas_count — adds data_type & hard_pinned | ||
| const uint32_t total_elements = obj.via.array.size; | ||
| const bool is_v1 = (total_elements == 7 + replicas_count); | ||
| const bool is_v2 = (total_elements == 8 + replicas_count); | ||
| const bool is_v3 = (total_elements == 9 + replicas_count); | ||
|
|
||
| if (!is_v1 && !is_v2 && !is_v3) { | ||
| return tl::unexpected(SerializationError( | ||
| ErrorCode::DESERIALIZE_FAIL, | ||
| "deserialize ObjectMetadata array size mismatch")); | ||
| } | ||
|
|
||
| // v3 has data_type right after replicas_count | ||
| ObjectDataType data_type = ObjectDataType::UNKNOWN; | ||
| if (is_v3) { | ||
| data_type = static_cast<ObjectDataType>(array[index++].as<uint8_t>()); | ||
| } | ||
|
|
||
| // Deserialize replicas | ||
| std::vector<Replica> replicas; | ||
| replicas.reserve(replicas_count); | ||
|
|
@@ -4020,7 +4035,7 @@ MasterService::MetadataSerializer::DeserializeMetadata( | |
| client_id, | ||
| std::chrono::system_clock::time_point( | ||
| std::chrono::milliseconds(put_start_time_timestamp)), | ||
| size, std::move(replicas), enable_soft_pin, is_hard_pinned); | ||
| size, std::move(replicas), enable_soft_pin, is_hard_pinned, data_type); | ||
| metadata->lease_timeout = std::chrono::system_clock::time_point( | ||
| std::chrono::milliseconds(lease_timestamp)); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should add a
generaltype?