Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions py/client/pydeephaven/_arrow_flight_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def import_table(self, data: pa.Table) -> Table:
try:
if not isinstance(data, (pa.Table, pa.RecordBatch)):
raise DHError("source data must be either a pa table or RecordBatch.")
ticket = self.session.get_ticket()
ticket = self.session.next_export_ticket_number()
dh_fields = []
for f in data.schema:
dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type)))
Expand All @@ -36,15 +36,15 @@ def import_table(self, data: pa.Table) -> Table:
# it is possible that by the time the request arrives at the server that it no longer
# knows what it is for and sends a RST_STREAM causing a failure.
_ = reader.read()
flight_ticket = self.session.make_ticket(ticket)
flight_ticket = self.session.make_export_ticket(ticket)
return Table(self.session, ticket=flight_ticket, size=data.num_rows, schema=dh_schema)
except Exception as e:
raise DHError("failed to create a Deephaven table from Arrow data.") from e

def do_get_table(self, table: Table) -> pa.Table:
"""Gets a snapshot of a Table via Flight do_get."""
try:
flight_ticket = paflight.Ticket(table.ticket.ticket)
flight_ticket = paflight.Ticket(table.ticket.bytes)
reader = self._flight_client.do_get(
flight_ticket,
FlightCallOptions(headers=self.session.grpc_metadata))
Expand Down
6 changes: 3 additions & 3 deletions py/client/pydeephaven/_batch_assembler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def batch(self) -> List[Any]:

def build_batch(self) -> List[Any]:
"""Transforms the table ops into valid chained batch compatible ops."""
self._curr_source = table_pb2.TableReference(ticket=self.table_ops[0].table.ticket)
self._curr_source = table_pb2.TableReference(ticket=self.table_ops[0].table.pb_ticket)

for table_op in self.table_ops[1:-1]:
result_id = None
Expand All @@ -27,8 +27,8 @@ def build_batch(self) -> List[Any]:
self._curr_source = table_pb2.TableReference(batch_offset=len(self.grpc_table_ops) - 1)

# the last op in the batch needs a result_id to reference the result
result_id = self.session.make_ticket()
export_ticket = self.session.make_export_ticket()
self.grpc_table_ops.append(
self.table_ops[-1].make_grpc_request_for_batch(result_id=result_id, source_id=self._curr_source))
self.table_ops[-1].make_grpc_request_for_batch(result_id=export_ticket.pb_ticket, source_id=self._curr_source))

return self.grpc_table_ops
4 changes: 2 additions & 2 deletions py/client/pydeephaven/_console_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def start_console(self):
with self.session._r_lock:
if not self.console_id:
try:
result_id = self.session.make_ticket()
result_id = self.session.make_export_ticket().pb_ticket
response = self.session.wrap_rpc(
self._grpc_console_stub.StartConsole,
console_pb2.StartConsoleRequest(
Expand Down Expand Up @@ -55,7 +55,7 @@ def bind_table(self, table: Table, variable_name: str):
self._grpc_console_stub.BindTableToVariable,
console_pb2.BindTableToVariableRequest(
console_id=self.console_id,
table_id=table.ticket,
table_id=table.pb_ticket,
variable_name=variable_name))
except Exception as e:
raise DHError("failed to bind a table to a variable on the server.") from e
8 changes: 4 additions & 4 deletions py/client/pydeephaven/_input_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def add(self, input_table: InputTable, table: Table):
try:
self.session.wrap_rpc(
self._grpc_input_table_stub.AddTableToInputTable,
inputtable_pb2.AddTableRequest(input_table=input_table.ticket,
table_to_add=table.ticket))
inputtable_pb2.AddTableRequest(input_table=input_table.pb_ticket,
table_to_add=table.pb_ticket))
except Exception as e:
raise DHError("failed to add to InputTable") from e

Expand All @@ -28,7 +28,7 @@ def delete(self, input_table: InputTable, table: Table):
self.session.wrap_rpc(
self._grpc_input_table_stub.DeleteTableFromInputTable,
inputtable_pb2.DeleteTableRequest(
input_table=input_table.ticket,
table_to_remove=table.ticket))
input_table=input_table.pb_ticket,
table_to_remove=table.pb_ticket))
except Exception as e:
raise DHError("failed to delete from InputTable") from e
41 changes: 31 additions & 10 deletions py/client/pydeephaven/_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from pydeephaven.dherror import DHError
from pydeephaven.proto import session_pb2_grpc, session_pb2, ticket_pb2
from pydeephaven.ticket import Ticket, ExportTicket


class SessionService:
Expand All @@ -21,7 +22,7 @@ def connect(self) -> grpc.Channel:
root_certificates=self.session._tls_root_certs,
private_key=self.session._client_private_key,
certificate_chain=self.session._client_cert_chain)
grpc_channel = grpc.secure_channel(target, credentials, self.session._client_opts)
grpc_channel = grpc.secure_channel(target, credentials, self.session._client_opts)
else:
grpc_channel = grpc.insecure_channel(target, self.session._client_opts)
self._grpc_session_stub = session_pb2_grpc.SessionServiceStub(grpc_channel)
Expand All @@ -38,28 +39,48 @@ def close(self):
except Exception as e:
raise DHError("failed to close the session.") from e

def release(self, ticket):
"""Releases an exported ticket."""
def release(self, ticket: ExportTicket) -> None:
"""Releases an export ticket."""
try:
self.session.wrap_rpc(
self._grpc_session_stub.Release,
session_pb2.ReleaseRequest(id=ticket))
session_pb2.ReleaseRequest(id=ticket.pb_ticket))
except Exception as e:
raise DHError("failed to release a ticket.") from e


def publish(self, source_ticket: ticket_pb2.Ticket, result_ticket: ticket_pb2.Ticket) -> None:
def publish(self, source_ticket: Ticket, result_ticket: Ticket) -> None:
"""Makes a copy from the source ticket and publishes it to the result ticket.

Args:
source_ticket: The source ticket to publish from.
result_ticket: The result ticket to publish to.
source_ticket (Ticket): The source ticket to publish from.
result_ticket (Ticket): The result ticket to publish to.

Raises:
DHError: If the operation fails.
"""
try:
self.session.wrap_rpc(
self._grpc_session_stub.PublishFromTicket,
session_pb2.PublishRequest(
source_id=source_ticket,
result_id=result_ticket))
source_id=source_ticket.pb_ticket,
result_id=result_ticket.pb_ticket))
except Exception as e:
raise DHError("failed to publish a ticket.") from e

def fetch(self, ticket: Ticket) -> ExportTicket:
"""Fetches a typed ticket from a ticket.

Args:
ticket: The ticket to fetch from.

Returns:
The typed ticket.
"""
try:
result_id = self.session.make_export_ticket()
self.session.wrap_rpc(
self._grpc_session_stub.ExportFromTicket,
session_pb2.ExportRequest(source_id=ticket.pb_ticket, result_id=result_id.pb_ticket))
return result_id
except Exception as e:
raise DHError("failed to fetch a ticket.") from e
18 changes: 9 additions & 9 deletions py/client/pydeephaven/_table_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A
def make_grpc_request(self, result_id, source_id) -> Any:
table_references = []
for tbl in self.tables:
table_references.append(table_pb2.TableReference(ticket=tbl.ticket))
table_references.append(table_pb2.TableReference(ticket=tbl.pb_ticket))

return table_pb2.MergeTablesRequest(result_id=result_id,
source_ids=table_references,
Expand All @@ -368,7 +368,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A

def make_grpc_request(self, result_id, source_id) -> Any:
left_id = source_id
right_id = table_pb2.TableReference(ticket=self.table.ticket)
right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
return table_pb2.NaturalJoinTablesRequest(result_id=result_id,
left_id=left_id,
right_id=right_id,
Expand All @@ -392,7 +392,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A

def make_grpc_request(self, result_id, source_id) -> Any:
left_id = source_id
right_id = table_pb2.TableReference(ticket=self.table.ticket)
right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
return table_pb2.ExactJoinTablesRequest(result_id=result_id,
left_id=left_id,
right_id=right_id,
Expand All @@ -417,7 +417,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A

def make_grpc_request(self, result_id, source_id) -> Any:
left_id = source_id
right_id = table_pb2.TableReference(ticket=self.table.ticket)
right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
return table_pb2.CrossJoinTablesRequest(result_id=result_id,
left_id=left_id,
right_id=right_id,
Expand All @@ -442,7 +442,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A

def make_grpc_request(self, result_id, source_id) -> Any:
left_id = source_id
right_id = table_pb2.TableReference(ticket=self.table.ticket)
right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
return table_pb2.AjRajTablesRequest(result_id=result_id,
left_id=left_id,
right_id=right_id,
Expand All @@ -467,7 +467,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A

def make_grpc_request(self, result_id, source_id) -> Any:
left_id = source_id
right_id = table_pb2.TableReference(ticket=self.table.ticket)
right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
return table_pb2.AjRajTablesRequest(result_id=result_id,
left_id=left_id,
right_id=right_id,
Expand Down Expand Up @@ -554,7 +554,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A

def make_grpc_request(self, result_id, source_id) -> Any:
base_id = source_id
trigger_id = table_pb2.TableReference(ticket=self.trigger_table.ticket)
trigger_id = table_pb2.TableReference(ticket=self.trigger_table.pb_ticket)
return table_pb2.SnapshotWhenTableRequest(result_id=result_id,
base_id=base_id,
trigger_id=trigger_id,
Expand Down Expand Up @@ -642,7 +642,7 @@ def make_grpc_request(self, result_id, source_id) -> Any:
schema=schema,
kind=input_table_kind)
else:
source_table_id = table_pb2.TableReference(ticket=self.init_table.ticket)
source_table_id = table_pb2.TableReference(ticket=self.init_table.pb_ticket)
return table_pb2.CreateInputTableRequest(result_id=result_id,
source_table_id=source_table_id,
kind=input_table_kind)
Expand All @@ -664,7 +664,7 @@ def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> A
return table_service_stub.WhereIn

def make_grpc_request(self, result_id, source_id) -> Any:
right_id = table_pb2.TableReference(ticket=self.filter_table.ticket)
right_id = table_pb2.TableReference(ticket=self.filter_table.pb_ticket)
return table_pb2.WhereInRequest(result_id=result_id,
left_id=source_id,
right_id=right_id,
Expand Down
15 changes: 9 additions & 6 deletions py/client/pydeephaven/_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydeephaven.dherror import DHError
from pydeephaven.proto import table_pb2_grpc, table_pb2
from pydeephaven.table import Table, InputTable
from pydeephaven.ticket import ExportTicket, Ticket, _ticket_from_proto


class TableService:
Expand All @@ -29,7 +30,7 @@ def batch(self, ops: List[TableOp]) -> Table:
if not exported.success:
raise DHError(exported.error_info)
if exported.result_id.WhichOneof("ref") == "ticket":
exported_tables.append(Table(self.session, ticket=exported.result_id.ticket,
exported_tables.append(Table(self.session, ticket=ExportTicket(ticket_bytes=exported.result_id.ticket.ticket),
schema_header=exported.schema_header,
size=exported.size,
is_static=exported.is_static))
Expand All @@ -40,19 +41,19 @@ def batch(self, ops: List[TableOp]) -> Table:
def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table) -> Union[Table, InputTable]:
"""Makes a single gRPC Table operation call and returns a new Table."""
try:
result_id = self.session.make_ticket()
export_ticket = self.session.make_export_ticket()
if table:
table_reference = table_pb2.TableReference(ticket=table.ticket)
table_reference = table_pb2.TableReference(ticket=table.pb_ticket)
else:
table_reference = None
stub_func = op.__class__.get_stub_func(self._grpc_table_stub)
response = self.session.wrap_rpc(
stub_func,
op.make_grpc_request(
result_id=result_id,
result_id=export_ticket.pb_ticket,
source_id=table_reference))
if response.success:
return table_class(self.session, ticket=response.result_id.ticket,
return table_class(self.session, ticket=ExportTicket(response.result_id.ticket.ticket),
schema_header=response.schema_header,
size=response.size,
is_static=response.is_static)
Expand All @@ -66,8 +67,10 @@ def fetch_etcr(self, ticket) -> Table:
response = self.session.wrap_rpc(
self._grpc_table_stub.GetExportedTableCreationResponse,
ticket)

ticket = _ticket_from_proto(response.result_id.ticket)
if response.success:
return Table(self.session, ticket=response.result_id.ticket,
return Table(self.session, ticket=ticket,
schema_header=response.schema_header,
size=response.size,
is_static=response.is_static)
Expand Down
Loading