Skip to content

Commit 112f6e3

Browse files
alixhamitswast
authored andcommitted
BigQuery: option for job ID generation with user-supplied prefix (#4198)
adds job_id_prefix to _make_job_id()
1 parent 9f095bf commit 112f6e3

File tree

3 files changed

+80
-20
lines changed

3 files changed

+80
-20
lines changed

bigquery/google/cloud/bigquery/client.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -599,8 +599,8 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
599599
extra_params=extra_params)
600600

601601
def load_table_from_storage(self, source_uris, destination,
602-
job_id=None, job_config=None,
603-
retry=DEFAULT_RETRY):
602+
job_id=None, job_id_prefix=None,
603+
job_config=None, retry=DEFAULT_RETRY):
604604
"""Starts a job for loading data into a table from CloudStorage.
605605
606606
See
@@ -618,6 +618,11 @@ def load_table_from_storage(self, source_uris, destination,
618618
:type job_id: str
619619
:param job_id: (Optional) Name of the job.
620620
621+
:type job_id_prefix: str or ``NoneType``
622+
:param job_id_prefix: (Optional) the user-provided prefix for a
623+
randomly generated job ID. This parameter will be
624+
ignored if a ``job_id`` is also given.
625+
621626
:type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig`
622627
:param job_config: (Optional) Extra configuration options for the job.
623628
@@ -627,7 +632,7 @@ def load_table_from_storage(self, source_uris, destination,
627632
:rtype: :class:`google.cloud.bigquery.job.LoadJob`
628633
:returns: a new ``LoadJob`` instance
629634
"""
630-
job_id = _make_job_id(job_id)
635+
job_id = _make_job_id(job_id, job_id_prefix)
631636
if isinstance(source_uris, six.string_types):
632637
source_uris = [source_uris]
633638
job = LoadJob(job_id, source_uris, destination, self, job_config)
@@ -638,7 +643,7 @@ def load_table_from_file(self, file_obj, destination,
638643
rewind=False,
639644
size=None,
640645
num_retries=_DEFAULT_NUM_RETRIES,
641-
job_id=None, job_config=None):
646+
job_id=None, job_id_prefix=None, job_config=None):
642647
"""Upload the contents of this table from a file-like object.
643648
644649
Like load_table_from_storage, this creates, starts and returns
@@ -665,6 +670,11 @@ def load_table_from_file(self, file_obj, destination,
665670
:type job_id: str
666671
:param job_id: (Optional) Name of the job.
667672
673+
:type job_id_prefix: str or ``NoneType``
674+
:param job_id_prefix: (Optional) the user-provided prefix for a
675+
randomly generated job ID. This parameter will be
676+
ignored if a ``job_id`` is also given.
677+
668678
:type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig`
669679
:param job_config: (Optional) Extra configuration options for the job.
670680
@@ -677,7 +687,7 @@ def load_table_from_file(self, file_obj, destination,
677687
be determined, or if the ``file_obj`` can be detected to be
678688
a file opened in text mode.
679689
"""
680-
job_id = _make_job_id(job_id)
690+
job_id = _make_job_id(job_id, job_id_prefix)
681691
job = LoadJob(job_id, None, destination, self, job_config)
682692
job_resource = job._build_resource()
683693
if rewind:
@@ -801,8 +811,8 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
801811

802812
return response
803813

804-
def copy_table(self, sources, destination, job_id=None, job_config=None,
805-
retry=DEFAULT_RETRY):
814+
def copy_table(self, sources, destination, job_id=None, job_id_prefix=None,
815+
job_config=None, retry=DEFAULT_RETRY):
806816
"""Start a job for copying one or more tables into another table.
807817
808818
See
@@ -821,6 +831,11 @@ def copy_table(self, sources, destination, job_id=None, job_config=None,
821831
:type job_id: str
822832
:param job_id: (Optional) The ID of the job.
823833
834+
:type job_id_prefix: str or ``NoneType``
835+
:param job_id_prefix: (Optional) the user-provided prefix for a
836+
randomly generated job ID. This parameter will be
837+
ignored if a ``job_id`` is also given.
838+
824839
:type job_config: :class:`google.cloud.bigquery.job.CopyJobConfig`
825840
:param job_config: (Optional) Extra configuration options for the job.
826841
@@ -830,7 +845,7 @@ def copy_table(self, sources, destination, job_id=None, job_config=None,
830845
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
831846
:returns: a new ``CopyJob`` instance
832847
"""
833-
job_id = _make_job_id(job_id)
848+
job_id = _make_job_id(job_id, job_id_prefix)
834849

835850
if not isinstance(sources, collections.Sequence):
836851
sources = [sources]
@@ -841,7 +856,7 @@ def copy_table(self, sources, destination, job_id=None, job_config=None,
841856

842857
def extract_table(
843858
self, source, destination_uris, job_config=None, job_id=None,
844-
retry=DEFAULT_RETRY):
859+
job_id_prefix=None, retry=DEFAULT_RETRY):
845860
"""Start a job to extract a table into Cloud Storage files.
846861
847862
See
@@ -863,6 +878,11 @@ def extract_table(
863878
:type job_id: str
864879
:param job_id: (Optional) The ID of the job.
865880
881+
:type job_id_prefix: str or ``NoneType``
882+
:param job_id_prefix: (Optional) the user-provided prefix for a
883+
randomly generated job ID. This parameter will be
884+
ignored if a ``job_id`` is also given.
885+
866886
:type job_config: :class:`google.cloud.bigquery.job.ExtractJobConfig`
867887
:param job_config: (Optional) Extra configuration options for the job.
868888
@@ -872,7 +892,7 @@ def extract_table(
872892
:rtype: :class:`google.cloud.bigquery.job.ExtractJob`
873893
:returns: a new ``ExtractJob`` instance
874894
"""
875-
job_id = _make_job_id(job_id)
895+
job_id = _make_job_id(job_id, job_id_prefix)
876896

877897
if isinstance(destination_uris, six.string_types):
878898
destination_uris = [destination_uris]
@@ -883,7 +903,8 @@ def extract_table(
883903
job.begin(retry=retry)
884904
return job
885905

886-
def query(self, query, job_config=None, job_id=None, retry=DEFAULT_RETRY):
906+
def query(self, query, job_config=None, job_id=None, job_id_prefix=None,
907+
retry=DEFAULT_RETRY):
887908
"""Start a job that runs a SQL query.
888909
889910
See
@@ -900,13 +921,18 @@ def query(self, query, job_config=None, job_id=None, retry=DEFAULT_RETRY):
900921
:type job_id: str
901922
:param job_id: (Optional) ID to use for the query job.
902923
924+
:type job_id_prefix: str or ``NoneType``
925+
:param job_id_prefix: (Optional) the user-provided prefix for a
926+
randomly generated job ID. This parameter will be
927+
ignored if a ``job_id`` is also given.
928+
903929
:type retry: :class:`google.api.core.retry.Retry`
904930
:param retry: (Optional) How to retry the RPC.
905931
906932
:rtype: :class:`google.cloud.bigquery.job.QueryJob`
907933
:returns: a new ``QueryJob`` instance
908934
"""
909-
job_id = _make_job_id(job_id)
935+
job_id = _make_job_id(job_id, job_id_prefix)
910936
job = QueryJob(job_id, query, client=self, job_config=job_config)
911937
job.begin(retry=retry)
912938
return job
@@ -1269,18 +1295,24 @@ def _item_to_table(iterator, resource):
12691295
return Table.from_api_repr(resource)
12701296

12711297

1272-
def _make_job_id(job_id):
1298+
def _make_job_id(job_id, prefix=None):
12731299
"""Construct an ID for a new job.
12741300
12751301
:type job_id: str or ``NoneType``
12761302
:param job_id: the user-provided job ID
12771303
1304+
:type prefix: str or ``NoneType``
1305+
:param prefix: (Optional) the user-provided prefix for a job ID
1306+
12781307
:rtype: str
12791308
:returns: A job ID
12801309
"""
1281-
if job_id is None:
1310+
if job_id is not None:
1311+
return job_id
1312+
elif prefix is not None:
1313+
return str(prefix) + str(uuid.uuid4())
1314+
else:
12821315
return str(uuid.uuid4())
1283-
return job_id
12841316

12851317

12861318
def _check_mode(stream):

bigquery/tests/system.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ def test_copy_table(self):
633633

634634
def test_job_cancel(self):
635635
DATASET_ID = _make_dataset_id('job_cancel')
636-
JOB_ID = 'fetch_' + DATASET_ID + str(uuid.uuid4())
636+
JOB_ID_PREFIX = 'fetch_' + DATASET_ID
637637
TABLE_NAME = 'test_table'
638638
QUERY = 'SELECT * FROM %s.%s' % (DATASET_ID, TABLE_NAME)
639639

@@ -643,7 +643,7 @@ def test_job_cancel(self):
643643
table = retry_403(Config.CLIENT.create_table)(table_arg)
644644
self.to_delete.insert(0, table)
645645

646-
job = Config.CLIENT.query(QUERY, job_id=JOB_ID)
646+
job = Config.CLIENT.query(QUERY, job_id_prefix=JOB_ID_PREFIX)
647647
job.cancel()
648648

649649
retry = RetryInstanceState(_job_done, max_tries=8)
@@ -866,7 +866,7 @@ def test_query_w_dml(self):
866866

867867
query_job = Config.CLIENT.query(
868868
query_template.format(dataset_name, table_name),
869-
job_id='test_query_w_dml_{}'.format(str(uuid.uuid4())))
869+
job_id_prefix='test_query_w_dml_')
870870
query_job.result()
871871

872872
self.assertEqual(query_job.num_dml_affected_rows, 1)
@@ -1053,8 +1053,7 @@ def test_query_w_query_params(self):
10531053
query_job = Config.CLIENT.query(
10541054
example['sql'],
10551055
job_config=jconfig,
1056-
job_id='test_query_w_query_params{}'.format(
1057-
str(uuid.uuid4())))
1056+
job_id_prefix='test_query_w_query_params')
10581057
rows = list(query_job.result())
10591058
self.assertEqual(len(rows), 1)
10601059
self.assertEqual(len(rows[0]), 1)

bigquery/tests/unit/test_client.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2859,6 +2859,35 @@ def test_list_partitions(self):
28592859
[20160804, 20160805])
28602860

28612861

2862+
class Test_make_job_id(unittest.TestCase):
2863+
def _call_fut(self, job_id, prefix=None):
2864+
from google.cloud.bigquery.client import _make_job_id
2865+
2866+
return _make_job_id(job_id, prefix=prefix)
2867+
2868+
def test__make_job_id_wo_suffix(self):
2869+
job_id = self._call_fut('job_id')
2870+
2871+
self.assertEqual(job_id, 'job_id')
2872+
2873+
def test__make_job_id_w_suffix(self):
2874+
with mock.patch('uuid.uuid4', side_effect=['212345']):
2875+
job_id = self._call_fut(None, prefix='job_id')
2876+
2877+
self.assertEqual(job_id, 'job_id212345')
2878+
2879+
def test__make_random_job_id(self):
2880+
with mock.patch('uuid.uuid4', side_effect=['212345']):
2881+
job_id = self._call_fut(None)
2882+
2883+
self.assertEqual(job_id, '212345')
2884+
2885+
def test__make_job_id_w_job_id_overrides_prefix(self):
2886+
job_id = self._call_fut('job_id', prefix='unused_prefix')
2887+
2888+
self.assertEqual(job_id, 'job_id')
2889+
2890+
28622891
class TestClientUpload(object):
28632892
# NOTE: This is a "partner" to `TestClient` meant to test some of the
28642893
# "load_table_from_file" portions of `Client`. It also uses

0 commit comments

Comments
 (0)