diff --git a/cuda_core/cuda/core/experimental/__init__.py b/cuda_core/cuda/core/experimental/__init__.py index fffb80a5c6..a061193213 100644 --- a/cuda_core/cuda/core/experimental/__init__.py +++ b/cuda_core/cuda/core/experimental/__init__.py @@ -14,7 +14,13 @@ from cuda.core.experimental._launch_config import LaunchConfig from cuda.core.experimental._launcher import launch from cuda.core.experimental._linker import Linker, LinkerOptions -from cuda.core.experimental._memory import Buffer, DeviceMemoryResource, LegacyPinnedMemoryResource, MemoryResource +from cuda.core.experimental._memory import ( + Buffer, + DeviceMemoryResource, + IPCChannel, + LegacyPinnedMemoryResource, + MemoryResource, +) from cuda.core.experimental._module import Kernel, ObjectCode from cuda.core.experimental._program import Program, ProgramOptions from cuda.core.experimental._stream import Stream, StreamOptions diff --git a/cuda_core/cuda/core/experimental/_memory.pyx b/cuda_core/cuda/core/experimental/_memory.pyx index 44e7a77c77..41a506a58e 100644 --- a/cuda_core/cuda/core/experimental/_memory.pyx +++ b/cuda_core/cuda/core/experimental/_memory.pyx @@ -5,18 +5,30 @@ from __future__ import annotations from libc.stdint cimport uintptr_t - from cuda.core.experimental._utils.cuda_utils cimport ( _check_driver_error as raise_if_driver_error, + check_or_create_options, ) +from dataclasses import dataclass +from typing import TypeVar, Union, TYPE_CHECKING import abc -from typing import TypeVar, Union - +import array +import cython +import os +import platform +import weakref from cuda.core.experimental._dlpack import DLDeviceType, make_py_capsule from cuda.core.experimental._stream import Stream, default_stream from cuda.core.experimental._utils.cuda_utils import driver +if platform.system() == "Linux": + import socket + +if TYPE_CHECKING: + import cuda.bindings.driver + from cuda.core.experimental._device import Device + # TODO: define a memory property mixin class and make Buffer and # 
MemoryResource both inherit from it @@ -119,6 +131,25 @@ cdef class Buffer: return self._mr.device_id raise NotImplementedError("WIP: Currently this property only supports buffers with associated MemoryResource") + def export(self) -> IPCBufferDescriptor: + """Export a buffer allocated for sharing between processes.""" + if not self._mr.is_ipc_enabled: + raise RuntimeError("Memory resource is not IPC-enabled") + err, ptr = driver.cuMemPoolExportPointer(self.handle) + raise_if_driver_error(err) + return IPCBufferDescriptor._init(ptr.reserved, self.size) + + @classmethod + def import_(cls, mr: MemoryResource, ipc_buffer: IPCBufferDescriptor) -> Buffer: + """Import a buffer that was exported from another process.""" + if not mr.is_ipc_enabled: + raise RuntimeError("Memory resource is not IPC-enabled") + share_data = driver.CUmemPoolPtrExportData() + share_data.reserved = ipc_buffer._reserved + err, ptr = driver.cuMemPoolImportPointer(mr._mempool_handle, share_data) + raise_if_driver_error(err) + return Buffer.from_handle(ptr, ipc_buffer.size, mr) + def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer: """Copy from this buffer to the dst buffer asynchronously on the given stream. @@ -251,8 +282,6 @@ class MemoryResource(abc.ABC): memory resource's respective property.) """ - __slots__ = ("_handle",) - @abc.abstractmethod def __init__(self, *args, **kwargs): """Initialize the memory resource. @@ -324,40 +353,374 @@ class MemoryResource(abc.ABC): ... +# IPC is currently only supported on Linux. On other platforms, the IPC handle +# type is set equal to the no-IPC handle type. 
+ +_NOIPC_HANDLE_TYPE = driver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE +_IPC_HANDLE_TYPE = driver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR \ + if platform.system() == "Linux" else _NOIPC_HANDLE_TYPE + +cdef class IPCBufferDescriptor: + """Serializable object describing a buffer that can be shared between processes.""" + + cdef: + bytes _reserved + size_t _size + + def __init__(self, *arg, **kwargs): + raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.") + + @classmethod + def _init(cls, reserved: bytes, size: int): + cdef IPCBufferDescriptor self = IPCBufferDescriptor.__new__(cls) + self._reserved = reserved + self._size = size + return self + + def __reduce__(self): + # This is subject to change if the CUmemPoolPtrExportData struct/object changes. + return (self._reconstruct, (self._reserved, self._size)) + + @property + def size(self): + return self._size + + @classmethod + def _reconstruct(cls, reserved, size): + instance = cls._init(reserved, size) + return instance + + +cdef class IPCAllocationHandle: + """Shareable handle to an IPC-enabled device memory pool.""" + + cdef: + int _handle + + def __init__(self, *arg, **kwargs): + raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.") + + @classmethod + def _init(cls, handle: int): + cdef IPCAllocationHandle self = IPCAllocationHandle.__new__(cls) + assert handle >= 0 + self._handle = handle + return self + + cpdef close(self): + """Close the handle.""" + if self._handle >= 0: + try: + os.close(self._handle) + finally: + self._handle = -1 + + def __del__(self): + """Close the handle.""" + self.close() + + def __int__(self) -> int: + if self._handle < 0: + raise ValueError( + f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed." 
+            )
+        return self._handle
+
+    @property
+    def handle(self) -> int:
+        return self._handle
+
+
+cdef class IPCChannel:
+    """Communication channel for sharing IPC-enabled memory pools."""
+
+    cdef:
+        object _proxy
+
+    def __init__(self):
+        if platform.system() == "Linux":
+            self._proxy = IPCChannelUnixSocket._init()
+        else:
+            raise RuntimeError(f"IPC is not available on {platform.system()}")
+
+
+cdef class IPCChannelUnixSocket:
+    """Unix-specific channel for sharing memory pools over sockets."""
+
+    cdef:
+        object _sock_out
+        object _sock_in
+
+    def __init__(self, *arg, **kwargs):
+        raise RuntimeError("IPCChannelUnixSocket objects cannot be instantiated directly. Please use MemoryResource APIs.")
+
+    @classmethod
+    def _init(cls):
+        cdef IPCChannelUnixSocket self = IPCChannelUnixSocket.__new__(cls)
+        self._sock_out, self._sock_in = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        return self
+
+    cpdef _send_allocation_handle(self, alloc_handle: IPCAllocationHandle):
+        """Sends over this channel an allocation handle for exporting a
+        shared memory pool."""
+        self._sock_out.sendmsg(
+            [],
+            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [int(alloc_handle)]))]
+        )
+
+    cpdef IPCAllocationHandle _receive_allocation_handle(self):
+        """Receives over this channel an allocation handle for importing a
+        shared memory pool."""
+        fds = array.array("i")
+        _, ancillary_data, _, _ = self._sock_in.recvmsg(0, socket.CMSG_LEN(fds.itemsize))
+        assert len(ancillary_data) == 1
+        cmsg_level, cmsg_type, cmsg_data = ancillary_data[0]
+        assert cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS
+        fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
+        return IPCAllocationHandle._init(int(fds[0]))
+
+
+@dataclass
+cdef class DeviceMemoryResourceOptions:
+    """Customizable :obj:`~_memory.DeviceMemoryResource` options.
+
+    Attributes
+    ----------
+    ipc_enabled : bool, optional
+        Specifies whether to create an IPC-enabled memory pool. 
When set to + True, the memory pool and its allocations can be shared with other + processes. (Default to False) + + max_size : int, optional + Maximum pool size. When set to 0, defaults to a system-dependent value. + (Default to 0) + """ + ipc_enabled : cython.bint = False + max_size : cython.int = 0 + + +class DeviceMemoryResourceAttributes: + def __init__(self, *args, **kwargs): + raise RuntimeError("DeviceMemoryResourceAttributes cannot be instantiated directly. Please use MemoryResource APIs.") + + @classmethod + def _init(cls, mr : DeviceMemoryReference): + self = DeviceMemoryResourceAttributes.__new__(cls) + self._mr = mr + return self + + def mempool_property(property_type: type): + def decorator(stub): + attr_enum = getattr(driver.CUmemPool_attribute, f"CU_MEMPOOL_ATTR_{stub.__name__.upper()}") + + def fget(self) -> property_type: + mr = self._mr() + if mr is None: + raise RuntimeError("DeviceMemoryResource is expired") + err, value = driver.cuMemPoolGetAttribute(mr._mempool_handle, attr_enum) + raise_if_driver_error(err) + return property_type(value) + return property(fget=fget, doc=stub.__doc__) + return decorator + + @mempool_property(bool) + def reuse_follow_event_dependencies(self): + """Allow memory to be reused when there are event dependencies between streams.""" + + @mempool_property(bool) + def reuse_allow_opportunistic(self): + """Allow reuse of completed frees without dependencies.""" + + @mempool_property(bool) + def reuse_allow_internal_dependencies(self): + """Allow insertion of new stream dependencies for memory reuse.""" + + @mempool_property(int) + def release_threshold(self): + """Amount of reserved memory to hold before OS release.""" + + @mempool_property(int) + def reserved_mem_current(self): + """Current amount of backing memory allocated.""" + + @mempool_property(int) + def reserved_mem_high(self): + """High watermark of backing memory allocated.""" + + @mempool_property(int) + def used_mem_current(self): + """Current amount of 
memory in use.""" + + @mempool_property(int) + def used_mem_high(self): + """High watermark of memory in use.""" + + del mempool_property + + class DeviceMemoryResource(MemoryResource): - """Create a device memory resource that uses the driver's stream-ordered memory pool. + """Create a device memory resource managing a stream-ordered memory pool. Parameters ---------- - device_id : int - Device ordinal for which a memory resource is constructed. The mempool that is - set to *current* on ``device_id`` is used. If no mempool is set to current yet, - the driver would use the *default* mempool on the device. - """ + device_id : int | Device + Device or Device ordinal for which a memory resource is constructed. - __slots__ = ("_dev_id",) + options : DeviceMemoryResourceOptions + Memory resource creation options. - def __init__(self, device_id: int): - err, self._handle = driver.cuDeviceGetMemPool(device_id) - raise_if_driver_error(err) - self._dev_id = device_id + If set to `None`, the memory resource uses the driver's current + stream-ordered memory pool for the specified `device_id`. If no memory + pool is set as current, the driver's default memory pool for the device + is used. - # Set a higher release threshold to improve performance when there are no active allocations. - # By default, the release threshold is 0, which means memory is immediately released back - # to the OS when there are no active suballocations, causing performance issues. - # Check current release threshold - err, current_threshold = driver.cuMemPoolGetAttribute( - self._handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD + If not set to `None`, a new memory pool is created, which is owned by + the memory resource. + + When using an existing (current or default) memory pool, the returned + device memory resource does not own the pool (`is_handle_owned` is + `False`), and closing the resource has no effect. 
+ """ + __slots__ = "_dev_id", "_mempool_handle", "_attributes", "_ipc_handle_type", "_mempool_owned", "_is_imported" + + def __init__(self, device_id: int | Device, options=None): + device_id = getattr(device_id, 'device_id', device_id) + opts = check_or_create_options( + DeviceMemoryResourceOptions, options, "DeviceMemoryResource options", keep_none=True ) - raise_if_driver_error(err) - # If threshold is 0 (default), set it to maximum to retain memory in the pool - if int(current_threshold) == 0: - err, = driver.cuMemPoolSetAttribute( - self._handle, - driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, - driver.cuuint64_t(0xFFFFFFFFFFFFFFFF), + + if opts is None: + # Get the current memory pool. + self._dev_id = device_id + self._mempool_handle = None + self._attributes = None + self._ipc_handle_type = _NOIPC_HANDLE_TYPE + self._mempool_owned = False + self._is_imported = False + + err, self._mempool_handle = driver.cuDeviceGetMemPool(self.device_id) + raise_if_driver_error(err) + + # Set a higher release threshold to improve performance when there are no active allocations. + # By default, the release threshold is 0, which means memory is immediately released back + # to the OS when there are no active suballocations, causing performance issues. + # Check current release threshold + err, current_threshold = driver.cuMemPoolGetAttribute( + self._mempool_handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD ) raise_if_driver_error(err) + # If threshold is 0 (default), set it to maximum to retain memory in the pool + if int(current_threshold) == 0: + err, = driver.cuMemPoolSetAttribute( + self._mempool_handle, + driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, + driver.cuuint64_t(0xFFFFFFFFFFFFFFFF), + ) + raise_if_driver_error(err) + else: + # Create a new memory pool. 
+            if opts.ipc_enabled and _IPC_HANDLE_TYPE == _NOIPC_HANDLE_TYPE:
+                raise RuntimeError(f"IPC is not available on {platform.system()}")
+
+            properties = driver.CUmemPoolProps()
+            properties.allocType = driver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED
+            properties.handleTypes = _IPC_HANDLE_TYPE if opts.ipc_enabled else _NOIPC_HANDLE_TYPE
+            properties.location = driver.CUmemLocation()
+            properties.location.id = device_id
+            properties.location.type = driver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE
+            properties.maxSize = opts.max_size
+            properties.win32SecurityAttributes = 0
+            properties.usage = 0
+
+            self._dev_id = device_id
+            self._mempool_handle = None
+            self._attributes = None
+            self._ipc_handle_type = properties.handleTypes
+            self._mempool_owned = True
+            self._is_imported = False
+
+            err, self._mempool_handle = driver.cuMemPoolCreate(properties)
+            raise_if_driver_error(err)
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close the device memory resource and destroy the associated memory pool if owned."""
+        if self._mempool_handle is not None and self._mempool_owned:
+            err, = driver.cuMemPoolDestroy(self._mempool_handle)
+            raise_if_driver_error(err)
+
+        self._dev_id = None
+        self._mempool_handle = None
+        self._attributes = None
+        self._ipc_handle_type = _NOIPC_HANDLE_TYPE
+        self._mempool_owned = False
+        self._is_imported = False
+
+    @classmethod
+    def from_shared_channel(cls, device_id: int | Device, channel: IPCChannel) -> DeviceMemoryResource:
+        """Create a device memory resource from a memory pool shared over an IPC channel."""
+        device_id = getattr(device_id, 'device_id', device_id)
+        alloc_handle = channel._proxy._receive_allocation_handle()
+        return cls._from_allocation_handle(device_id, alloc_handle)
+
+    @classmethod
+    def _from_allocation_handle(cls, device_id: int | Device, alloc_handle: IPCAllocationHandle) -> DeviceMemoryResource:
+        """Create a device memory resource from an allocation handle. 
+ + Construct a new `DeviceMemoryResource` instance that imports a memory + pool from a shareable handle. The memory pool is marked as owned, and + the resource is associated with the specified `device_id`. + + Parameters + ---------- + device_id : int | Device + The ID of the device or a Device object for which the memory + resource is created. + + alloc_handle : int + The shareable handle of the device memory resource to import. + + Returns + ------- + A new device memory resource instance with the imported handle. + """ + device_id = getattr(device_id, 'device_id', device_id) + + self = cls.__new__(cls) + self._dev_id = device_id + self._mempool_handle = None + self._attributes = None + self._ipc_handle_type = _IPC_HANDLE_TYPE + self._mempool_owned = True + self._is_imported = True + + err, self._mempool_handle = driver.cuMemPoolImportFromShareableHandle(int(alloc_handle), _IPC_HANDLE_TYPE, 0) + raise_if_driver_error(err) + + return self + + def share_to_channel(self, channel : IPCChannel): + if not self.is_ipc_enabled: + raise RuntimeError("Memory resource is not IPC-enabled") + channel._proxy._send_allocation_handle(self._get_allocation_handle()) + + def _get_allocation_handle(self) -> IPCAllocationHandle: + """Export the memory pool handle to be shared (requires IPC). + + The handle can be used to share the memory pool with other processes. + The handle is cached in this `MemoryResource` and owned by it. + + Returns + ------- + The shareable handle for the memory pool. + """ + if not self.is_ipc_enabled: + raise RuntimeError("Memory resource is not IPC-enabled") + err, alloc_handle = driver.cuMemPoolExportToShareableHandle(self._mempool_handle, _IPC_HANDLE_TYPE, 0) + raise_if_driver_error(err) + return IPCAllocationHandle._init(alloc_handle) def allocate(self, size_t size, stream: Stream = None) -> Buffer: """Allocate a buffer of the requested size. 
@@ -376,9 +739,11 @@ class DeviceMemoryResource(MemoryResource): The allocated buffer object, which is accessible on the device that this memory resource was created for. """ + if self._is_imported: + raise TypeError("Cannot allocate from shared memory pool imported via IPC") if stream is None: stream = default_stream() - err, ptr = driver.cuMemAllocFromPoolAsync(size, self._handle, stream.handle) + err, ptr = driver.cuMemAllocFromPoolAsync(size, self._mempool_handle, stream.handle) raise_if_driver_error(err) return Buffer._init(ptr, size, self) @@ -400,20 +765,47 @@ class DeviceMemoryResource(MemoryResource): err, = driver.cuMemFreeAsync(ptr, stream.handle) raise_if_driver_error(err) + @property + def attributes(self) -> DeviceMemoryResourceAttributes: + if self._attributes is None: + ref = weakref.ref(self) + self._attributes = DeviceMemoryResourceAttributes._init(ref) + return self._attributes + + @property + def device_id(self) -> int: + """The associated device ordinal.""" + return self._dev_id + + @property + def handle(self) -> cuda.bindings.driver.CUmemoryPool: + """Handle to the underlying memory pool.""" + return self._mempool_handle + + @property + def is_handle_owned(self) -> bool: + """Whether the memory resource handle is owned. If False, ``close`` has no effect.""" + return self._mempool_owned + + @property + def is_imported(self) -> bool: + """Whether the memory resource was imported from another process. If True, allocation is not permitted.""" + return self._is_imported + @property def is_device_accessible(self) -> bool: - """bool: this memory resource provides device-accessible buffers.""" + """Return True. This memory resource provides device-accessible buffers.""" return True @property def is_host_accessible(self) -> bool: - """bool: this memory resource does not provides host-accessible buffers.""" + """Return False. 
This memory resource does not provide host-accessible buffers.""" return False @property - def device_id(self) -> int: - """int: the associated device ordinal.""" - return self._dev_id + def is_ipc_enabled(self) -> bool: + """Whether this memory resource has IPC enabled.""" + return self._ipc_handle_type != _NOIPC_HANDLE_TYPE class LegacyPinnedMemoryResource(MemoryResource): @@ -481,9 +873,9 @@ class LegacyPinnedMemoryResource(MemoryResource): class _SynchronousMemoryResource(MemoryResource): __slots__ = ("_dev_id",) - def __init__(self, device_id): + def __init__(self, device_id : int | Device): self._handle = None - self._dev_id = device_id + self._dev_id = getattr(device_id, 'device_id', device_id) def allocate(self, size, stream=None) -> Buffer: err, ptr = driver.cuMemAlloc(size) diff --git a/cuda_core/cuda/core/experimental/_stream.pyx b/cuda_core/cuda/core/experimental/_stream.pyx index 64ae09529f..9d9271f65b 100644 --- a/cuda_core/cuda/core/experimental/_stream.pyx +++ b/cuda_core/cuda/core/experimental/_stream.pyx @@ -9,6 +9,7 @@ from cuda.core.experimental._utils.cuda_utils cimport ( check_or_create_options, ) +import cython import os import warnings from dataclasses import dataclass @@ -42,7 +43,7 @@ cdef class StreamOptions: """ - nonblocking: bool = True + nonblocking : cython.bint = True priority: Optional[int] = None diff --git a/cuda_core/docs/source/api_private.rst b/cuda_core/docs/source/api_private.rst index afbbb7ce38..fb36e0a309 100644 --- a/cuda_core/docs/source/api_private.rst +++ b/cuda_core/docs/source/api_private.rst @@ -18,6 +18,7 @@ CUDA runtime _memory.PyCapsule _memory.DevicePointerT + _memory.IPCBufferDescriptor _device.DeviceProperties _module.KernelAttributes _module.KernelOccupancy diff --git a/cuda_core/docs/source/release/0.X.Y-notes.rst b/cuda_core/docs/source/release/0.X.Y-notes.rst index 8024a14f62..8d639c177e 100644 --- a/cuda_core/docs/source/release/0.X.Y-notes.rst +++ b/cuda_core/docs/source/release/0.X.Y-notes.rst @@ -28,6 
+28,7 @@ New features - Added :attr:`Device.arch` property that returns the compute capability as a string (e.g., '75' for CC 7.5), providing a convenient alternative to manually concatenating the compute capability tuple. - CUDA 13.x testing support through new ``test-cu13`` dependency group. +- Stream-ordered memory allocation can now be shared on Linux via :class:`DeviceMemoryResource`. New examples @@ -44,3 +45,4 @@ Fixes and enhancements - Fix :class:`LaunchConfig` grid unit conversion when cluster is set (addresses issue #867). - Fixed a bug in :class:`GraphBuilder.add_child` where dependencies extracted from capturing stream were passed inconsistently with num_dependencies parameter (addresses issue #843). - Make :class:`Buffer` creation more performant. +- Enabled :class:`MemoryResource` subclasses to accept :class:`Device` objects, in addition to previously supported device ordinals. diff --git a/cuda_core/tests/conftest.py b/cuda_core/tests/conftest.py index 57fed98389..c800aae3e0 100644 --- a/cuda_core/tests/conftest.py +++ b/cuda_core/tests/conftest.py @@ -7,6 +7,8 @@ from cuda.bindings import driver except ImportError: from cuda import cuda as driver +import multiprocessing + import pytest from cuda.core.experimental import Device, _device @@ -14,9 +16,13 @@ @pytest.fixture(scope="session", autouse=True) -def always_init_cuda(): +def session_setup(): + # Always init CUDA. handle_return(driver.cuInit(0)) + # Never fork processes. + multiprocessing.set_start_method("spawn", force=True) + @pytest.fixture(scope="function") def init_cuda(): diff --git a/cuda_core/tests/test_ipc_mempool.py b/cuda_core/tests/test_ipc_mempool.py new file mode 100644 index 0000000000..5c4c382753 --- /dev/null +++ b/cuda_core/tests/test_ipc_mempool.py @@ -0,0 +1,179 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +try: + from cuda.bindings import driver +except ImportError: + from cuda import cuda as driver + +import ctypes +import multiprocessing + +import pytest + +from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, IPCChannel, MemoryResource +from cuda.core.experimental._utils.cuda_utils import handle_return + +CHILD_TIMEOUT_SEC = 10 +NBYTES = 64 +POOL_SIZE = 2097152 + + +@pytest.fixture(scope="function") +def ipc_device(): + """Obtains a device suitable for IPC-enabled mempool tests, or skips.""" + # Check if IPC is supported on this platform/device + device = Device() + device.set_current() + + if not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + + # Note: Linux specific. Once Windows support for IPC is implemented, this + # test should be updated. + if not device.properties.handle_type_posix_file_descriptor_supported: + pytest.skip("Device does not support IPC") + + return device + + +def test_ipc_mempool(ipc_device): + """Test IPC with memory pools.""" + # Set up the IPC-enabled memory pool and share it. + stream = ipc_device.create_stream() + mr = DeviceMemoryResource(ipc_device, dict(max_size=POOL_SIZE, ipc_enabled=True)) + assert mr.is_ipc_enabled + channel = IPCChannel() + mr.share_to_channel(channel) + + # Start the child process. + queue = multiprocessing.Queue() + process = multiprocessing.Process(target=child_main1, args=(channel, queue)) + process.start() + + # Allocate and fill memory. + buffer = mr.allocate(NBYTES, stream=stream) + protocol = IPCBufferTestProtocol(ipc_device, buffer, stream=stream) + protocol.fill_buffer(flipped=False) + stream.sync() + + # Export the buffer via IPC. + handle = buffer.export() + queue.put(handle) + + # Wait for the child process. + process.join(timeout=CHILD_TIMEOUT_SEC) + assert process.exitcode == 0 + + # Verify that the buffer was modified. 
+ protocol.verify_buffer(flipped=True) + + +def child_main1(channel, queue): + device = Device() + device.set_current() + stream = device.create_stream() + + mr = DeviceMemoryResource.from_shared_channel(device, channel) + handle = queue.get() # Get exported buffer data + buffer = Buffer.import_(mr, handle) + + protocol = IPCBufferTestProtocol(device, buffer, stream=stream) + protocol.verify_buffer(flipped=False) + protocol.fill_buffer(flipped=True) + stream.sync() + + +def test_shared_pool_errors(ipc_device): + """Test expected errors with allocating from a shared IPC memory pool.""" + # Set up the IPC-enabled memory pool and share it. + mr = DeviceMemoryResource(ipc_device, dict(max_size=POOL_SIZE, ipc_enabled=True)) + channel = IPCChannel() + mr.share_to_channel(channel) + + # Start a child process to generate error info. + queue = multiprocessing.Queue() + process = multiprocessing.Process(target=child_main2, args=(channel, queue)) + process.start() + + # Check the errors. + exc_type, exc_msg = queue.get(timeout=CHILD_TIMEOUT_SEC) + assert exc_type is TypeError + assert exc_msg == "Cannot allocate from shared memory pool imported via IPC" + + # Wait for the child process. + process.join(timeout=CHILD_TIMEOUT_SEC) + assert process.exitcode == 0 + + +def child_main2(channel, queue): + """Child process that pushes IPC errors to a shared queue for testing.""" + device = Device() + device.set_current() + + mr = DeviceMemoryResource.from_shared_channel(device, channel) + + # Allocating from an imported pool. 
+ try: + mr.allocate(NBYTES) + except Exception as e: + exc_info = type(e), str(e) + queue.put(exc_info) + + +class DummyUnifiedMemoryResource(MemoryResource): + def __init__(self, device): + self.device = device + + def allocate(self, size, stream=None) -> Buffer: + ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value)) + return Buffer.from_handle(ptr=ptr, size=size, mr=self) + + def deallocate(self, ptr, size, stream=None): + handle_return(driver.cuMemFree(ptr)) + + @property + def is_device_accessible(self) -> bool: + return True + + @property + def is_host_accessible(self) -> bool: + return True + + @property + def device_id(self) -> int: + return self.device + + +class IPCBufferTestProtocol: + """The protocol for verifying IPC. + + Provides methods to fill a buffer with one of two test patterns and verify + the expected values. + """ + + def __init__(self, device, buffer, nbytes=NBYTES, stream=None): + self.device = device + self.buffer = buffer + self.nbytes = nbytes + self.stream = stream if stream is not None else device.create_stream() + self.scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.nbytes, stream=self.stream) + + def fill_buffer(self, flipped=False): + """Fill a device buffer with test pattern using unified memory.""" + ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte)) + op = (lambda i: 255 - i) if flipped else (lambda i: i) + for i in range(self.nbytes): + ptr[i] = ctypes.c_byte(op(i)) + self.buffer.copy_from(self.scratch_buffer, stream=self.stream) + + def verify_buffer(self, flipped=False): + """Verify the buffer contents.""" + self.scratch_buffer.copy_from(self.buffer, stream=self.stream) + self.stream.sync() + ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte)) + op = (lambda i: 255 - i) if flipped else (lambda i: i) + for i in range(self.nbytes): + assert ctypes.c_byte(ptr[i]).value == 
ctypes.c_byte(op(i)).value, ( + f"Buffer contains incorrect data at index {i}" + ) diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index 491521ff9d..c14de85857 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -7,13 +7,28 @@ from cuda import cuda as driver import ctypes +import platform import pytest from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, MemoryResource -from cuda.core.experimental._memory import DLDeviceType +from cuda.core.experimental._memory import DLDeviceType, IPCBufferDescriptor from cuda.core.experimental._utils.cuda_utils import handle_return +POOL_SIZE = 2097152 # 2MB size + + +@pytest.fixture(scope="function") +def mempool_device(): + """Obtains a device suitable for mempool tests, or skips.""" + device = Device() + device.set_current() + + if not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + + return device + class DummyDeviceMemoryResource(MemoryResource): def __init__(self, device): @@ -259,27 +274,167 @@ def test_buffer_dunder_dlpack_device_failure(): buffer.__dlpack_device__() -def test_device_memory_resource_initialization(): +@pytest.mark.parametrize("use_device_object", [True, False]) +def test_device_memory_resource_initialization(mempool_device, use_device_object): """Test that DeviceMemoryResource can be initialized successfully. This test verifies that the DeviceMemoryResource initializes properly, including the release threshold configuration for performance optimization. """ - device = Device() - if not device.properties.memory_pools_supported: - pytest.skip("memory pools not supported") - device.set_current() + device = mempool_device - # This should succeed and configure the memory pool release threshold - mr = DeviceMemoryResource(device.device_id) + # This should succeed and configure the memory pool release threshold. 
+ # The resource can be constructed from either a device or device ordinal. + device_arg = device if use_device_object else device.device_id + mr = DeviceMemoryResource(device_arg) # Verify basic properties assert mr.device_id == device.device_id - assert mr.is_device_accessible is True - assert mr.is_host_accessible is False + assert mr.is_device_accessible + assert not mr.is_host_accessible + assert not mr.is_ipc_enabled # Test allocation/deallocation works buffer = mr.allocate(1024) assert buffer.size == 1024 assert buffer.device_id == device.device_id buffer.close() + + +def test_mempool(mempool_device): + device = mempool_device + + # Test basic pool creation + mr = DeviceMemoryResource(device, dict(max_size=POOL_SIZE, ipc_enabled=False)) + assert mr.device_id == device.device_id + assert mr.is_device_accessible + assert not mr.is_host_accessible + assert not mr.is_ipc_enabled + + # Test allocation and deallocation + buffer1 = mr.allocate(1024) + assert buffer1.handle != 0 + assert buffer1.size == 1024 + assert buffer1.memory_resource == mr + buffer1.close() + + # Test multiple allocations + buffer1 = mr.allocate(1024) + buffer2 = mr.allocate(2048) + assert buffer1.handle != buffer2.handle + assert buffer1.size == 1024 + assert buffer2.size == 2048 + buffer1.close() + buffer2.close() + + # Test stream-based allocation + stream = device.create_stream() + buffer = mr.allocate(1024, stream=stream) + assert buffer.handle != 0 + buffer.close() + + # Test memory copying between buffers from same pool + src_buffer = mr.allocate(64) + dst_buffer = mr.allocate(64) + stream = device.create_stream() + src_buffer.copy_to(dst_buffer, stream=stream) + device.sync() + dst_buffer.close() + src_buffer.close() + + # Test error cases + # Test IPC operations are disabled + buffer = mr.allocate(64) + ipc_error_msg = "Memory resource is not IPC-enabled" + + with pytest.raises(RuntimeError, match=ipc_error_msg): + mr._get_allocation_handle() + + with pytest.raises(RuntimeError, 
match=ipc_error_msg): + buffer.export() + + with pytest.raises(RuntimeError, match=ipc_error_msg): + handle = IPCBufferDescriptor._init(b"", 0) + Buffer.import_(mr, handle) + + buffer.close() + + +@pytest.mark.parametrize("ipc_enabled", [True, False]) +@pytest.mark.parametrize( + "property_name,expected_type", + [ + ("reuse_follow_event_dependencies", bool), + ("reuse_allow_opportunistic", bool), + ("reuse_allow_internal_dependencies", bool), + ("release_threshold", int), + ("reserved_mem_current", int), + ("reserved_mem_high", int), + ("used_mem_current", int), + ("used_mem_high", int), + ], +) +def test_mempool_attributes(ipc_enabled, mempool_device, property_name, expected_type): + """Test all properties of the DeviceMemoryResource class.""" + device = mempool_device + if platform.system() == "Windows": + return # IPC not implemented for Windows + + mr = DeviceMemoryResource(device, dict(max_size=POOL_SIZE, ipc_enabled=ipc_enabled)) + assert mr.is_ipc_enabled == ipc_enabled + + # Get the property value + value = getattr(mr.attributes, property_name) + + # Test type + assert isinstance(value, expected_type), f"{property_name} should return {expected_type}, got {type(value)}" + + # Test value constraints + if expected_type is int: + assert value >= 0, f"{property_name} should be non-negative" + + # Test memory usage properties with actual allocations + if property_name in ["reserved_mem_current", "used_mem_current"]: + # Allocate some memory and check if values increase + initial_value = value + buffer = None + try: + buffer = mr.allocate(1024) + new_value = getattr(mr.attributes, property_name) + assert new_value >= initial_value, f"{property_name} should increase or stay same after allocation" + finally: + if buffer is not None: + buffer.close() + + # Test high watermark properties + if property_name in ["reserved_mem_high", "used_mem_high"]: + # High watermark should never be less than current + current_prop = property_name.replace("_high", "_current") + 
current_value = getattr(mr.attributes, current_prop) + assert value >= current_value, f"{property_name} should be >= {current_prop}" + + +def test_mempool_attributes_ownership(mempool_device): + """Ensure the attributes bundle handles references correctly.""" + device = mempool_device + mr = DeviceMemoryResource(device, dict(max_size=POOL_SIZE)) + attributes = mr.attributes + old_handle = mr._mempool_handle + mr.close() + del mr + + # After deleting the memory resource, the attributes suite is disconnected. + with pytest.raises(RuntimeError, match="DeviceMemoryResource is expired"): + _ = attributes.used_mem_high + + # Even when a new object is created (we found a case where the same + # mempool handle was really reused). + mr = DeviceMemoryResource(device, dict(max_size=POOL_SIZE)) + with pytest.raises(RuntimeError, match="DeviceMemoryResource is expired"): + _ = attributes.used_mem_high + + # Even if we stuff the original handle into a new class. + mr._mempool_handle, old_handle = old_handle, mr._mempool_handle + with pytest.raises(RuntimeError, match="DeviceMemoryResource is expired"): + _ = attributes.used_mem_high + mr._mempool_handle = old_handle