Skip to content

Commit 3e44f33

Browse files
feat(python-sync): add response buffer support to get() to improve performance by reducing copies (valkey-io#5493)
Signed-off-by: Omer Rubinstein <omerrubi@amazon.com>
1 parent d4139c3 commit 3e44f33

7 files changed

Lines changed: 290 additions & 54 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
* Node: Add OpenTelemetry parent span context propagation support ([#4655](https://github.com/valkey-io/valkey-glide/issues/4655))
2424
* JAVA: Add cluster information and topology commands (CLUSTER INFO, CLUSTER NODES, CLUSTER SHARDS, CLUSTER LINKS, CLUSTER MYID, CLUSTER MYSHARDID) with batch support ([#5106](https://github.com/valkey-io/valkey-glide/issues/5106))
2525
* CORE: Add read only flag, enforcing no write commands and allowing for connecting without a primary ([#5411](https://github.com/valkey-io/valkey-glide/issues/5411))
26+
* Python Sync: Accept `bytearray` and `memoryview` as command argument types to improve performance by reducing copies ([#5492](https://github.com/valkey-io/valkey-glide/pull/5492))
27+
* Python Sync: Add response buffer support to get() to improve performance by reducing copies ([#5493](https://github.com/valkey-io/valkey-glide/pull/5493))
2628

2729
#### Fixes
2830
* CORE: Fix empty hostname in CLUSTER SLOTS metadata causing AllConnectionsUnavailable ([#5367](https://github.com/valkey-io/valkey-glide/issues/5367)). AWS ElastiCache (plaintext, cluster mode) returns `hostname: ""` in node metadata, which was used as the connection address instead of falling back to the IP.

ffi/src/lib.rs

Lines changed: 157 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,11 @@ pub enum ResponseType {
212212
Error = 9,
213213
}
214214

215+
/// A Send-safe wrapper around a raw buffer pointer and length.
216+
/// The caller guarantees the buffer remains valid for the duration of the FFI call.
217+
struct ResponseBuffer(*mut u8, usize);
218+
unsafe impl Send for ResponseBuffer {}
219+
215220
/// Success callback that is called when a command succeeds.
216221
///
217222
/// The success callback needs to copy the given string synchronously, since it will be dropped by Rust once the callback returns. The callback should be offloaded to a separate thread in order not to exhaust the client's thread pool.
@@ -434,6 +439,18 @@ impl ClientAdapter {
434439
/// For sync clients, blocks on the future and returns a `CommandResult`.
435440
#[must_use]
436441
fn execute_request<Fut>(&self, request_id: usize, request_future: Fut) -> *mut CommandResult
442+
where
443+
Fut: Future<Output = RedisResult<Value>> + Send + 'static,
444+
{
445+
self.execute_request_with_buffer(request_id, request_future, None)
446+
}
447+
448+
fn execute_request_with_buffer<Fut>(
449+
&self,
450+
request_id: usize,
451+
request_future: Fut,
452+
response_buf: Option<ResponseBuffer>,
453+
) -> *mut CommandResult
437454
where
438455
Fut: Future<Output = RedisResult<Value>> + Send + 'static,
439456
{
@@ -450,14 +467,15 @@ impl ClientAdapter {
450467
Some(success_callback),
451468
Some(failure_callback),
452469
request_id,
470+
response_buf,
453471
);
454472
});
455473
std::ptr::null_mut()
456474
}
457475
ClientType::SyncClient => {
458476
// Block on the request for sync client
459477
let result = self.runtime.block_on(request_future);
460-
Self::handle_result(result, None, None, request_id)
478+
Self::handle_result(result, None, None, request_id, response_buf)
461479
}
462480
}
463481
}
@@ -473,33 +491,39 @@ impl ClientAdapter {
473491
success_callback: Option<SuccessCallback>,
474492
failure_callback: Option<FailureCallback>,
475493
request_id: usize,
494+
response_buf: Option<ResponseBuffer>,
476495
) -> *mut CommandResult {
477496
match result {
478-
Ok(value) => match valkey_value_to_command_response(value) {
479-
Ok(command_response) => {
480-
if let Some(success_callback) = success_callback {
481-
unsafe {
482-
(success_callback)(
483-
request_id,
484-
Box::into_raw(Box::new(command_response)),
485-
);
497+
Ok(value) => {
498+
let buf = response_buf.map(|rb| (rb.0, rb.1));
499+
match valkey_value_to_command_response(value, buf) {
500+
Ok(command_response) => {
501+
if let Some(success_callback) = success_callback {
502+
unsafe {
503+
(success_callback)(
504+
request_id,
505+
Box::into_raw(Box::new(command_response)),
506+
);
507+
}
508+
} else {
509+
return Box::into_raw(Box::new(CommandResult {
510+
response: Box::into_raw(Box::new(command_response)),
511+
command_error: std::ptr::null_mut(),
512+
}));
486513
}
487-
} else {
488-
return Box::into_raw(Box::new(CommandResult {
489-
response: Box::into_raw(Box::new(command_response)),
490-
command_error: std::ptr::null_mut(),
491-
}));
492514
}
493-
}
494-
Err(err) => {
495-
if let Some(failure_callback) = failure_callback {
496-
unsafe { Self::send_async_redis_error(failure_callback, err, request_id) };
497-
} else {
498-
eprintln!("Error converting value to CommandResponse: {err:?}");
499-
return create_error_result_with_redis_error(err);
515+
Err(err) => {
516+
if let Some(failure_callback) = failure_callback {
517+
unsafe {
518+
Self::send_async_redis_error(failure_callback, err, request_id)
519+
};
520+
} else {
521+
eprintln!("Error converting value to CommandResponse: {err:?}");
522+
return create_error_result_with_redis_error(err);
523+
}
500524
}
501525
}
502-
},
526+
}
503527
Err(err) => {
504528
if let Some(failure_callback) = failure_callback {
505529
unsafe { Self::send_async_redis_error(failure_callback, err, request_id) };
@@ -1013,7 +1037,10 @@ fn convert_vec_to_pointer<T>(mut vec: Vec<T>) -> (*mut T, c_long) {
10131037
(vec_ptr, len)
10141038
}
10151039

1016-
fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse> {
1040+
fn valkey_value_to_command_response(
1041+
value: Value,
1042+
response_buf: Option<(*mut u8, usize)>,
1043+
) -> RedisResult<CommandResponse> {
10171044
let mut command_response = CommandResponse::default();
10181045
let result: RedisResult<CommandResponse> = match value {
10191046
Value::Nil => Ok(command_response),
@@ -1025,8 +1052,29 @@ fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse
10251052
command_response.response_type = ResponseType::String;
10261053
Ok(command_response)
10271054
}
1028-
Value::BulkString(text) => {
1029-
let (vec_ptr, len) = convert_vec_to_pointer(text);
1055+
Value::BulkString(data) => {
1056+
let data = if let Some((buf, buf_len)) = response_buf {
1057+
if data.len() > buf_len {
1058+
return Err(RedisError::from((
1059+
ErrorKind::ClientError,
1060+
"Value size exceeds buffer capacity",
1061+
format!(
1062+
"value is {} bytes but buffer is {} bytes",
1063+
data.len(),
1064+
buf_len
1065+
),
1066+
)));
1067+
}
1068+
// Copy data directly into the caller's buffer; the command response
1069+
// will carry the number of bytes written instead of the data itself.
1070+
unsafe {
1071+
std::ptr::copy_nonoverlapping(data.as_ptr(), buf, data.len());
1072+
}
1073+
data.len().to_string().into_bytes()
1074+
} else {
1075+
data
1076+
};
1077+
let (vec_ptr, len) = convert_vec_to_pointer(data);
10301078
command_response.string_value = vec_ptr as *mut c_char;
10311079
command_response.string_value_len = len;
10321080
command_response.response_type = ResponseType::String;
@@ -1062,7 +1110,7 @@ fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse
10621110
Value::Array(array) => {
10631111
let vec: Result<Vec<CommandResponse>, RedisError> = array
10641112
.into_iter()
1065-
.map(valkey_value_to_command_response)
1113+
.map(|v| valkey_value_to_command_response(v, None))
10661114
.collect();
10671115
let (vec_ptr, len) = convert_vec_to_pointer(vec?);
10681116
command_response.array_value = vec_ptr;
@@ -1076,13 +1124,13 @@ fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse
10761124
.map(|(key, val)| {
10771125
let mut map_response = CommandResponse::default();
10781126

1079-
let map_key = match valkey_value_to_command_response(key) {
1127+
let map_key = match valkey_value_to_command_response(key, None) {
10801128
Ok(map_key) => map_key,
10811129
Err(err) => return Err(err),
10821130
};
10831131
map_response.map_key = Box::into_raw(Box::new(map_key));
10841132

1085-
let map_val = match valkey_value_to_command_response(val) {
1133+
let map_val = match valkey_value_to_command_response(val, None) {
10861134
Ok(map_val) => map_val,
10871135
Err(err) => return Err(err),
10881136
};
@@ -1101,7 +1149,7 @@ fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse
11011149
Value::Set(array) => {
11021150
let vec: Result<Vec<CommandResponse>, RedisError> = array
11031151
.into_iter()
1104-
.map(valkey_value_to_command_response)
1152+
.map(|v| valkey_value_to_command_response(v, None))
11051153
.collect();
11061154
let (vec_ptr, len) = convert_vec_to_pointer(vec?);
11071155
command_response.sets_value = vec_ptr;
@@ -1126,18 +1174,18 @@ fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse
11261174
// Create kind entry
11271175
let mut kind_entry = CommandResponse::default();
11281176
let map_key =
1129-
valkey_value_to_command_response(Value::SimpleString("kind".to_string()))?;
1177+
valkey_value_to_command_response(Value::SimpleString("kind".to_string()), None)?;
11301178
kind_entry.map_key = Box::into_raw(Box::new(map_key));
11311179
let map_val =
1132-
valkey_value_to_command_response(Value::SimpleString(format!("{:?}", kind)))?;
1180+
valkey_value_to_command_response(Value::SimpleString(format!("{:?}", kind)), None)?;
11331181
kind_entry.map_value = Box::into_raw(Box::new(map_val));
11341182

11351183
// Create values entry
11361184
let mut values_entry = CommandResponse::default();
11371185
let map_key =
1138-
valkey_value_to_command_response(Value::SimpleString("values".to_string()))?;
1186+
valkey_value_to_command_response(Value::SimpleString("values".to_string()), None)?;
11391187
values_entry.map_key = Box::into_raw(Box::new(map_key));
1140-
let map_val = valkey_value_to_command_response(Value::Array(data))?;
1188+
let map_val = valkey_value_to_command_response(Value::Array(data), None)?;
11411189
values_entry.map_value = Box::into_raw(Box::new(map_val));
11421190

11431191
let (map_ptr, map_len) = convert_vec_to_pointer(vec![kind_entry, values_entry]);
@@ -1181,6 +1229,64 @@ pub unsafe extern "C-unwind" fn command(
11811229
route_bytes: *const u8,
11821230
route_bytes_len: usize,
11831231
span_ptr: u64,
1232+
) -> *mut CommandResult {
1233+
unsafe {
1234+
command_with_buffer(
1235+
client_adapter_ptr,
1236+
request_id,
1237+
command_type,
1238+
arg_count,
1239+
args,
1240+
args_len,
1241+
route_bytes,
1242+
route_bytes_len,
1243+
std::ptr::null_mut(),
1244+
0,
1245+
span_ptr,
1246+
)
1247+
}
1248+
}
1249+
1250+
/// Executes a command, optionally copying a BulkString response directly into a
1251+
/// caller-provided buffer instead of returning it as a heap-allocated value.
1252+
///
1253+
/// When `response_buf` is null (and `response_buf_len` is 0), behaves identically
1254+
/// to [`command`] — the response flows through the normal `execute_request` path.
1255+
///
1256+
/// When `response_buf` is non-null, the response is written directly into the buffer:
1257+
/// - `response.string_value` = number of bytes written as a string, or Nil response for missing keys.
1258+
/// - Errors if the value exceeds `response_buf_len`.
1259+
///
1260+
/// # Safety
1261+
///
1262+
/// * `client_adapter_ptr` must not be `null` and must be obtained from the `ConnectionResponse` returned from [`create_client`].
1263+
/// * `client_adapter_ptr` must be able to be safely casted to a valid [`Arc<ClientAdapter>`] via [`Arc::from_raw`]. See the safety documentation of [`std::sync::Arc::from_raw`].
1264+
/// * `request_id` must be a request ID from the foreign language and must be valid until either `success_callback` or `failure_callback` is finished.
1265+
/// * `args` is an optional bytes pointers array. The array must be allocated by the caller and subsequently freed by the caller after this function returns.
1266+
/// * `args_len` is an optional bytes length array. The array must be allocated by the caller and subsequently freed by the caller after this function returns.
1267+
/// * `arg_count` the number of elements in `args` and `args_len`. It must also not be greater than the max value of a signed pointer-sized integer.
1268+
/// * `arg_count` must be 0 if `args` and `args_len` are null.
1269+
/// * `args` and `args_len` must either be both null or be both not null.
1270+
/// * `route_bytes` is an optional array of bytes that will be parsed into a Protobuf `Routes` object. The array must be allocated by the caller and subsequently freed by the caller after this function returns.
1271+
/// * `route_bytes_len` is the number of bytes in `route_bytes`. It must also not be greater than the max value of a signed pointer-sized integer.
1272+
/// * `route_bytes_len` must be 0 if `route_bytes` is null.
1273+
/// * When non-null, `response_buf` must point to a writable buffer of at least `response_buf_len` bytes.
1274+
/// * `response_buf_len` must be 0 if `response_buf` is null.
1275+
/// * `span_ptr` is a valid pointer to [`Arc<GlideSpan>`], a span created by [`create_otel_span`] or `0`. The span must be valid until the command is finished.
1276+
/// * This function should only be called with a `client_adapter_ptr` created by [`create_client`], before [`close_client`] was called with the pointer.
1277+
#[unsafe(no_mangle)]
1278+
pub unsafe extern "C-unwind" fn command_with_buffer(
1279+
client_adapter_ptr: *const c_void,
1280+
request_id: usize,
1281+
command_type: RequestType,
1282+
arg_count: c_ulong,
1283+
args: *const usize,
1284+
args_len: *const c_ulong,
1285+
route_bytes: *const u8,
1286+
route_bytes_len: usize,
1287+
response_buf: *mut u8,
1288+
response_buf_len: usize,
1289+
span_ptr: u64,
11841290
) -> *mut CommandResult {
11851291
let client_adapter = unsafe {
11861292
// we increment the strong count to ensure that the client is not dropped just because we turned it into an Arc.
@@ -1239,6 +1345,7 @@ pub unsafe extern "C-unwind" fn command(
12391345
cmd.arg(command_arg);
12401346
}
12411347
}
1348+
12421349
if span_ptr != 0 {
12431350
cmd.set_span(unsafe { get_unsafe_span_from_ptr(Some(span_ptr)) });
12441351
}
@@ -1269,12 +1376,23 @@ pub unsafe extern "C-unwind" fn command(
12691376
let child_span = create_child_span(cmd.span().as_ref(), "send_command");
12701377
let mut client = client_adapter.core.client.clone();
12711378
let client_for_release = client_adapter.core.client.clone();
1272-
let result = client_adapter.execute_request(request_id, async move {
1273-
let routing_info = get_route(route, Some(&cmd))?;
1274-
let result = client.send_command(&mut cmd, routing_info).await;
1275-
client_for_release.release_inflight_request();
1276-
result
1277-
});
1379+
1380+
let buf_option = if response_buf.is_null() {
1381+
None
1382+
} else {
1383+
Some(ResponseBuffer(response_buf, response_buf_len))
1384+
};
1385+
1386+
let result = client_adapter.execute_request_with_buffer(
1387+
request_id,
1388+
async move {
1389+
let routing_info = get_route(route, Some(&cmd))?;
1390+
let result = client.send_command(&mut cmd, routing_info).await;
1391+
client_for_release.release_inflight_request();
1392+
result
1393+
},
1394+
buf_option,
1395+
);
12781396
if let Ok(span) = child_span {
12791397
span.end();
12801398
}

python/glide-sync/glide_sync/_glide_ffi.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,20 @@ def _init_ffi(self):
117117
uint64_t span_ptr
118118
);
119119
120+
CommandResult* command_with_buffer(
121+
const void* client_adapter_ptr,
122+
uintptr_t request_id,
123+
int command_type,
124+
unsigned long arg_count,
125+
const size_t *args,
126+
const unsigned long* args_len,
127+
const unsigned char* route_bytes,
128+
size_t route_bytes_len,
129+
uint8_t* target_buf,
130+
size_t target_len,
131+
uint64_t span_ptr
132+
);
133+
120134
CommandResult* invoke_script(
121135
const void* client_adapter_ptr,
122136
uintptr_t request_id,

python/glide-sync/glide_sync/glide_client.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ def _execute_command(
392392
request_type: RequestType.ValueType,
393393
args: List[TEncodable],
394394
route: Optional[Route] = None,
395+
response_buffer: Optional[memoryview] = None,
395396
) -> TResult:
396397
if self._is_closed:
397398
raise ClosingError(
@@ -400,6 +401,11 @@ def _execute_command(
400401
client_adapter_ptr = self._core_client
401402
if client_adapter_ptr == self._ffi.NULL:
402403
raise ValueError("Invalid client pointer.")
404+
if response_buffer:
405+
if response_buffer.readonly:
406+
raise TypeError("response_buffer must be writable")
407+
if not response_buffer.c_contiguous:
408+
raise TypeError("response_buffer must be C-contiguous")
403409

404410
# Create span if OpenTelemetry is configured and sampling indicates we should trace
405411
from .opentelemetry import OpenTelemetry
@@ -420,16 +426,24 @@ def _execute_command(
420426
# Route bytes should be kept alive in the scope of the FFI call
421427
route_ptr, route_len, route_bytes = self._to_c_route_ptr_and_len(route)
422428

423-
result = self._lib.command(
424-
client_adapter_ptr, # Pointer to the ClientAdapter from create_client()
425-
0, # Request ID - placeholder for sync clients (used for async callbacks)
426-
request_type, # Request type (e.g., GET or SET)
427-
len(args), # Number of arguments
428-
c_args, # Array of argument pointers
429-
c_lengths, # Array of argument lengths
430-
route_ptr, # Pointer to protobuf-encoded routing information (NULL if no routing)
431-
route_len, # Length of the routing data in bytes (0 if no routing)
432-
span, # Span pointer for tracing
429+
buf_ptr = (
430+
self._ffi.from_buffer(response_buffer)
431+
if response_buffer
432+
else self._ffi.NULL
433+
)
434+
buf_len = len(response_buffer) if response_buffer else 0
435+
result = self._lib.command_with_buffer(
436+
client_adapter_ptr,
437+
0,
438+
request_type,
439+
len(args),
440+
c_args,
441+
c_lengths,
442+
route_ptr,
443+
route_len,
444+
buf_ptr,
445+
buf_len,
446+
span,
433447
)
434448
finally:
435449
# Drop span if it was created

0 commit comments

Comments
 (0)