Source code for gcloud.bigquery.job

# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Define API Jobs."""

import six

from gcloud.exceptions import NotFound
from gcloud._helpers import _datetime_from_microseconds
from gcloud.bigquery.dataset import Dataset
from gcloud.bigquery.table import SchemaField
from gcloud.bigquery.table import Table
from gcloud.bigquery.table import _build_schema_resource
from gcloud.bigquery.table import _parse_schema_resource


class _ConfigurationProperty(object):
    """Base property implementation.

    Values will be stored on a ``_configuration`` helper attribute of the
    property's job instance.

    :type name: string
    :param name:  name of the property
    """

    def __init__(self, name):
        self.name = name
        self._backing_name = '_%s' % (self.name,)

    def __get__(self, instance, owner):
        """Descriptor protocal:  accesstor"""
        if instance is None:
            return self
        return getattr(instance._configuration, self._backing_name)

    def _validate(self, value):
        """Subclasses override to impose validation policy."""
        pass

    def __set__(self, instance, value):
        """Descriptor protocal:  mutator"""
        self._validate(value)
        setattr(instance._configuration, self._backing_name, value)

    def __delete__(self, instance):
        """Descriptor protocal:  deleter"""
        delattr(instance._configuration, self._backing_name)


class _TypedProperty(_ConfigurationProperty):
    """Property implementation:  validates based on value type.

    :type name: string
    :param name:  name of the property

    :type property_type: type or sequence of types
    :param property_type: type to be validated
    """
    def __init__(self, name, property_type):
        super(_TypedProperty, self).__init__(name)
        self.property_type = property_type

    def _validate(self, value):
        if not isinstance(value, self.property_type):
            raise ValueError('Required type: %s' % (self.property_type,))


class _EnumProperty(_ConfigurationProperty):
    """Psedo-enumeration class.

    Subclasses must define ``ALLOWED`` as a class-level constant:  it must
    be a sequence of strings.

    :type name: string
    :param name:  name of the property
    """
    def _validate(self, value):
        """Check that ``value`` is one of the allowed values.

        :raises: ValueError if value is not allowed.
        """
        if value not in self.ALLOWED:
            raise ValueError('Pass one of: %s' % ', '.join(self.ALLOWED))


class Compression(_EnumProperty):
    """Pseudo-enum for ``compression`` properties."""
    GZIP = 'GZIP'
    NONE = 'NONE'
    ALLOWED = (GZIP, NONE)


class CreateDisposition(_EnumProperty):
    """Pseudo-enum for ``create_disposition`` properties."""
    CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
    CREATE_NEVER = 'CREATE_NEVER'
    ALLOWED = (CREATE_IF_NEEDED, CREATE_NEVER)


class DestinationFormat(_EnumProperty):
    """Pseudo-enum for ``destination_format`` properties."""
    CSV = 'CSV'
    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
    AVRO = 'AVRO'
    ALLOWED = (CSV, NEWLINE_DELIMITED_JSON, AVRO)


class Encoding(_EnumProperty):
    """Pseudo-enum for ``encoding`` properties."""
    UTF_8 = 'UTF-8'
    ISO_8859_1 = 'ISO-8859-1'
    ALLOWED = (UTF_8, ISO_8859_1)


class QueryPriority(_EnumProperty):
    """Pseudo-enum for ``RunQueryJob.priority`` property."""
    INTERACTIVE = 'INTERACTIVE'
    BATCH = 'BATCH'
    ALLOWED = (INTERACTIVE, BATCH)


class SourceFormat(_EnumProperty):
    """Pseudo-enum for ``source_format`` properties."""
    CSV = 'CSV'
    DATASTORE_BACKUP = 'DATASTORE_BACKUP'
    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
    ALLOWED = (CSV, DATASTORE_BACKUP, NEWLINE_DELIMITED_JSON)


class WriteDisposition(_EnumProperty):
    """Pseudo-enum for ``write_disposition`` properties."""
    WRITE_APPEND = 'WRITE_APPEND'
    WRITE_TRUNCATE = 'WRITE_TRUNCATE'
    WRITE_EMPTY = 'WRITE_EMPTY'
    ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)
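

# Illustrative sketch, not part of the original module: both property flavors
# validate at assignment time.  The ``_Demo*`` names are hypothetical, used
# only for this example.


def _demo_property_validation():
    """Show the ``ValueError`` raised for disallowed values."""

    class _DemoLoadConfig(object):
        _source_format = None
        _max_bad_records = None

    class _DemoLoadJob(object):
        source_format = SourceFormat('source_format')
        max_bad_records = _TypedProperty('max_bad_records',
                                         six.integer_types)

        def __init__(self):
            self._configuration = _DemoLoadConfig()

    job = _DemoLoadJob()
    job.source_format = SourceFormat.CSV       # allowed value
    job.max_bad_records = 3                    # correct type

    try:
        job.source_format = 'PARQUET'          # not in SourceFormat.ALLOWED
    except ValueError:
        pass

    try:
        job.max_bad_records = 'three'          # wrong type
    except ValueError:
        pass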


class _BaseJob(object):
    """Base class for asynchronous jobs.

    :type name: string
    :param name: the name of the job

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: A client which holds credentials and project configuration
                   for the dataset (which requires a project).
    """
    def __init__(self, name, client):
        self.name = name
        self._client = client
        self._properties = {}

    @property
    def project(self):
        """Project bound to the job.

        :rtype: string
        :returns: the project (derived from the client).
        """
        return self._client.project

    @property
    def path(self):
        """URL path for the job's APIs.

        :rtype: string
        :returns: the path based on project and job name.
        """
        return '/projects/%s/jobs/%s' % (self.project, self.name)

    @property
    def etag(self):
        """ETag for the job resource.

        :rtype: string, or ``NoneType``
        :returns: the ETag (None until set from the server).
        """
        return self._properties.get('etag')

    @property
    def job_id(self):
        """ID for the job resource.

        :rtype: string, or ``NoneType``
        :returns: the ID (None until set from the server).
        """
        return self._properties.get('id')

    @property
    def self_link(self):
        """URL for the job resource.

        :rtype: string, or ``NoneType``
        :returns: the URL (None until set from the server).
        """
        return self._properties.get('selfLink')

    @property
    def user_email(self):
        """E-mail address of user who submitted the job.

        :rtype: string, or ``NoneType``
        :returns: the e-mail address (None until set from the server).
        """
        return self._properties.get('user_email')

    @property
    def created(self):
        """Datetime at which the job was created.

        :rtype: ``datetime.datetime``, or ``NoneType``
        :returns: the creation time (None until set from the server).
        """
        statistics = self._properties.get('statistics')
        if statistics is not None:
            millis = statistics.get('creationTime')
            if millis is not None:
                return _datetime_from_microseconds(millis * 1000.0)

    @property
    def started(self):
        """Datetime at which the job was started.

        :rtype: ``datetime.datetime``, or ``NoneType``
        :returns: the start time (None until set from the server).
        """
        statistics = self._properties.get('statistics')
        if statistics is not None:
            millis = statistics.get('startTime')
            if millis is not None:
                return _datetime_from_microseconds(millis * 1000.0)

    @property
    def ended(self):
        """Datetime at which the job finished.

        :rtype: ``datetime.datetime``, or ``NoneType``
        :returns: the end time (None until set from the server).
        """
        statistics = self._properties.get('statistics')
        if statistics is not None:
            millis = statistics.get('endTime')
            if millis is not None:
                return _datetime_from_microseconds(millis * 1000.0)

    @property
    def error_result(self):
        """Error information about the job as a whole.

        :rtype: mapping, or ``NoneType``
        :returns: the error information (None until set from the server).
        """
        status = self._properties.get('status')
        if status is not None:
            return status.get('errorResult')

    @property
    def errors(self):
        """Information about individual errors generated by the job.

        :rtype: list of mappings, or ``NoneType``
        :returns: the error information (None until set from the server).
        """
        status = self._properties.get('status')
        if status is not None:
            return status.get('errors')

    @property
    def state(self):
        """Status of the job.

        :rtype: string, or ``NoneType``
        :returns: the state (None until set from the server).
        """
        status = self._properties.get('status')
        if status is not None:
            return status.get('state')

    def _require_client(self, client):
        """Check client or verify over-ride.

        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
        :param client: the client to use.  If not passed, falls back to the
                       ``client`` stored on the current dataset.

        :rtype: :class:`gcloud.bigquery.client.Client`
        :returns: The client passed in or the currently bound client.
        """
        if client is None:
            client = self._client
        return client

    def _scrub_local_properties(self, cleaned):
        """Helper: handle subclass properties in cleaned."""
        pass

    def _set_properties(self, api_response):
        """Update properties from resource in body of ``api_response``

        :type api_response: httplib2.Response
        :param api_response: response returned from an API call
        """
        cleaned = api_response.copy()
        self._scrub_local_properties(cleaned)

        statistics = cleaned.get('statistics', {})
        if 'creationTime' in statistics:
            statistics['creationTime'] = float(statistics['creationTime'])
        if 'startTime' in statistics:
            statistics['startTime'] = float(statistics['startTime'])
        if 'endTime' in statistics:
            statistics['endTime'] = float(statistics['endTime'])

        self._properties.clear()
        self._properties.update(cleaned)

    def begin(self, client=None):
        """API call: begin the job via a POST request

        See:
        https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert

        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
        :param client: the client to use.  If not passed, falls back to the
                       ``client`` stored on the current dataset.
        """
        client = self._require_client(client)
        path = '/projects/%s/jobs' % (self.project,)
        api_response = client.connection.api_request(
            method='POST', path=path, data=self._build_resource())
        self._set_properties(api_response)

    def exists(self, client=None):
        """API call: test for the existence of the job via a GET request

        See
        https://cloud.google.com/bigquery/docs/reference/v2/jobs/get

        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
        :param client: the client to use.  If not passed, falls back to the
                       ``client`` stored on the current dataset.
        """
        client = self._require_client(client)

        try:
            client.connection.api_request(method='GET', path=self.path,
                                          query_params={'fields': 'id'})
        except NotFound:
            return False
        else:
            return True

    def reload(self, client=None):
        """API call: refresh job properties via a GET request

        See
        https://cloud.google.com/bigquery/docs/reference/v2/jobs/get

        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
        :param client: the client to use.  If not passed, falls back to the
                       ``client`` stored on the current dataset.
        """
        client = self._require_client(client)

        api_response = client.connection.api_request(
            method='GET', path=self.path)
        self._set_properties(api_response)

    def cancel(self, client=None):
        """API call: cancel job via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/v2/jobs/cancel

        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
        :param client: the client to use.  If not passed, falls back to the
                       ``client`` stored on the current dataset.
        """
        client = self._require_client(client)

        api_response = client.connection.api_request(
            method='POST', path='%s/cancel' % (self.path,))
        self._set_properties(api_response)


class _LoadConfiguration(object):
    """User-settable configuration options for load jobs."""
    # None -> use server default.
    _allow_jagged_rows = None
    _allow_quoted_newlines = None
    _create_disposition = None
    _encoding = None
    _field_delimiter = None
    _ignore_unknown_values = None
    _max_bad_records = None
    _quote_character = None
    _skip_leading_rows = None
    _source_format = None
    _write_disposition = None


class LoadTableFromStorageJob(_BaseJob):
    """Asynchronous job for loading data into a table from Cloud Storage.

    :type name: string
    :param name: the name of the job

    :type destination: :class:`gcloud.bigquery.table.Table`
    :param destination: Table into which data is to be loaded.

    :type source_uris: sequence of string
    :param source_uris: URIs of data files to be loaded.

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: A client which holds credentials and project configuration
                   for the dataset (which requires a project).

    :type schema: list of :class:`gcloud.bigquery.table.SchemaField`
    :param schema: The job's schema
    """
    def __init__(self, name, destination, source_uris, client, schema=()):
        super(LoadTableFromStorageJob, self).__init__(name, client)
        self.destination = destination
        self.source_uris = source_uris
        self.schema = schema
        self._configuration = _LoadConfiguration()

    @property
    def schema(self):
        """Table's schema.

        :rtype: list of :class:`SchemaField`
        :returns: fields describing the schema
        """
        return list(self._schema)

    @schema.setter
    def schema(self, value):
        """Update table's schema

        :type value: list of :class:`SchemaField`
        :param value: fields describing the schema

        :raises: TypeError if 'value' is not a sequence, or ValueError if
                 any item in the sequence is not a SchemaField
        """
        if not all(isinstance(field, SchemaField) for field in value):
            raise ValueError('Schema items must be fields')
        self._schema = tuple(value)

    @property
    def input_file_bytes(self):
        """Count of bytes loaded from source files.

        :rtype: integer, or ``NoneType``
        :returns: the count (None until set from the server).
        """
        statistics = self._properties.get('statistics')
        if statistics is not None:
            return int(statistics['load']['inputFileBytes'])

    @property
    def input_files(self):
        """Count of source files.

        :rtype: integer, or ``NoneType``
        :returns: the count (None until set from the server).
        """
        statistics = self._properties.get('statistics')
        if statistics is not None:
            return int(statistics['load']['inputFiles'])

    @property
    def output_bytes(self):
        """Count of bytes saved to destination table.

        :rtype: integer, or ``NoneType``
        :returns: the count (None until set from the server).
        """
        statistics = self._properties.get('statistics')
        if statistics is not None:
            return int(statistics['load']['outputBytes'])

    @property
    def output_rows(self):
        """Count of rows saved to destination table.

        :rtype: integer, or ``NoneType``
        :returns: the count (None until set from the server).
        """
        statistics = self._properties.get('statistics')
        if statistics is not None:
            return int(statistics['load']['outputRows'])

    allow_jagged_rows = _TypedProperty('allow_jagged_rows', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.allowJaggedRows
    """

    allow_quoted_newlines = _TypedProperty('allow_quoted_newlines', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.allowQuotedNewlines
    """

    create_disposition = CreateDisposition('create_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.createDisposition
    """

    encoding = Encoding('encoding')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.encoding
    """

    field_delimiter = _TypedProperty('field_delimiter', six.string_types)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.fieldDelimiter
    """

    ignore_unknown_values = _TypedProperty('ignore_unknown_values', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.ignoreUnknownValues
    """

    max_bad_records = _TypedProperty('max_bad_records', six.integer_types)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.maxBadRecords
    """

    quote_character = _TypedProperty('quote_character', six.string_types)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.quote
    """

    skip_leading_rows = _TypedProperty('skip_leading_rows', six.integer_types)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.skipLeadingRows
    """

    source_format = SourceFormat('source_format')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.sourceFormat
    """

    write_disposition = WriteDisposition('write_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.writeDisposition
    """

    def _populate_config_resource(self, configuration):
        """Helper for _build_resource: copy config properties to resource"""
        if self.allow_jagged_rows is not None:
            configuration['allowJaggedRows'] = self.allow_jagged_rows
        if self.allow_quoted_newlines is not None:
            configuration['allowQuotedNewlines'] = self.allow_quoted_newlines
        if self.create_disposition is not None:
            configuration['createDisposition'] = self.create_disposition
        if self.encoding is not None:
            configuration['encoding'] = self.encoding
        if self.field_delimiter is not None:
            configuration['fieldDelimiter'] = self.field_delimiter
        if self.ignore_unknown_values is not None:
            configuration['ignoreUnknownValues'] = self.ignore_unknown_values
        if self.max_bad_records is not None:
            configuration['maxBadRecords'] = self.max_bad_records
        if self.quote_character is not None:
            configuration['quote'] = self.quote_character
        if self.skip_leading_rows is not None:
            configuration['skipLeadingRows'] = self.skip_leading_rows
        if self.source_format is not None:
            configuration['sourceFormat'] = self.source_format
        if self.write_disposition is not None:
            configuration['writeDisposition'] = self.write_disposition

    def _build_resource(self):
        """Generate a resource for :meth:`begin`."""
        resource = {
            'jobReference': {
                'projectId': self.project,
                'jobId': self.name,
            },
            'configuration': {
                'load': {
                    'sourceUris': self.source_uris,
                    'destinationTable': {
                        'projectId': self.destination.project,
                        'datasetId': self.destination.dataset_name,
                        'tableId': self.destination.name,
                    },
                },
            },
        }
        configuration = resource['configuration']['load']
        self._populate_config_resource(configuration)

        if len(self.schema) > 0:
            configuration['schema'] = {
                'fields': _build_schema_resource(self.schema)}

        return resource

    def _scrub_local_properties(self, cleaned):
        """Helper: handle subclass properties in cleaned."""
        schema = cleaned.pop('schema', {'fields': ()})
        self.schema = _parse_schema_resource(schema)
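

# Usage sketch, not part of the original module: drive a CSV load from
# Cloud Storage end-to-end.  The job, dataset, table, and bucket names are
# hypothetical; ``client`` is assumed to be a configured
# :class:`gcloud.bigquery.client.Client`, whose ``dataset`` factory this
# sketch relies on.


def _demo_load_job(client):
    """Create, begin, and poll a hypothetical load job."""
    dataset = client.dataset('demo_dataset')
    table = dataset.table('demo_table')
    job = LoadTableFromStorageJob(
        'demo-load-job', table, ['gs://demo-bucket/data.csv'], client,
        schema=[SchemaField('full_name', 'STRING', mode='REQUIRED')])
    job.source_format = SourceFormat.CSV
    job.skip_leading_rows = 1                        # skip the header row
    job.write_disposition = WriteDisposition.WRITE_TRUNCATE
    job.begin()                                      # POST .../jobs
    job.reload()                                     # GET: refresh state
    return job.state                                 # e.g. 'RUNNING' / 'DONE'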


class _CopyConfiguration(object):
    """User-settable configuration options for copy jobs."""
    # None -> use server default.
    _create_disposition = None
    _write_disposition = None


class CopyJob(_BaseJob):
    """Asynchronous job: copy data into a table from other tables.

    :type name: string
    :param name: the name of the job

    :type destination: :class:`gcloud.bigquery.table.Table`
    :param destination: Table into which data is to be copied.

    :type sources: list of :class:`gcloud.bigquery.table.Table`
    :param sources: Tables from which data is to be copied.

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: A client which holds credentials and project configuration
                   for the dataset (which requires a project).
    """
    def __init__(self, name, destination, sources, client):
        super(CopyJob, self).__init__(name, client)
        self.destination = destination
        self.sources = sources
        self._configuration = _CopyConfiguration()

    create_disposition = CreateDisposition('create_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.createDisposition
    """

    write_disposition = WriteDisposition('write_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.writeDisposition
    """

    def _populate_config_resource(self, configuration):
        """Helper for _build_resource: copy config properties to resource"""
        if self.create_disposition is not None:
            configuration['createDisposition'] = self.create_disposition
        if self.write_disposition is not None:
            configuration['writeDisposition'] = self.write_disposition

    def _build_resource(self):
        """Generate a resource for :meth:`begin`."""
        source_refs = [{
            'projectId': table.project,
            'datasetId': table.dataset_name,
            'tableId': table.name,
        } for table in self.sources]

        resource = {
            'jobReference': {
                'projectId': self.project,
                'jobId': self.name,
            },
            'configuration': {
                'copy': {
                    'sourceTables': source_refs,
                    'destinationTable': {
                        'projectId': self.destination.project,
                        'datasetId': self.destination.dataset_name,
                        'tableId': self.destination.name,
                    },
                },
            },
        }
        configuration = resource['configuration']['copy']
        self._populate_config_resource(configuration)
        return resource
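

# Usage sketch, not part of the original module: ``source_table`` and
# ``destination_table`` are assumed to be
# :class:`gcloud.bigquery.table.Table` instances; the job name is
# hypothetical.


def _demo_copy_job(client, source_table, destination_table):
    """Copy one table into another and return the job's state."""
    job = CopyJob('demo-copy-job', destination_table, [source_table], client)
    job.create_disposition = CreateDisposition.CREATE_IF_NEEDED
    job.write_disposition = WriteDisposition.WRITE_EMPTY  # only fill if empty
    job.begin()
    job.reload()
    return job.state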


class _ExtractConfiguration(object):
    """User-settable configuration options for extract jobs."""
    # None -> use server default.
    _compression = None
    _destination_format = None
    _field_delimiter = None
    _print_header = None


class ExtractTableToStorageJob(_BaseJob):
    """Asynchronous job: extract data from a table into Cloud Storage.

    :type name: string
    :param name: the name of the job

    :type source: :class:`gcloud.bigquery.table.Table`
    :param source: Table from which data is to be extracted.

    :type destination_uris: list of string
    :param destination_uris: URIs describing Cloud Storage blobs into which
                             extracted data will be written.

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: A client which holds credentials and project configuration
                   for the dataset (which requires a project).
    """
    def __init__(self, name, source, destination_uris, client):
        super(ExtractTableToStorageJob, self).__init__(name, client)
        self.source = source
        self.destination_uris = destination_uris
        self._configuration = _ExtractConfiguration()

    compression = Compression('compression')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.compression
    """

    destination_format = DestinationFormat('destination_format')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.destinationFormat
    """

    field_delimiter = _TypedProperty('field_delimiter', six.string_types)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.fieldDelimiter
    """

    print_header = _TypedProperty('print_header', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.printHeader
    """

    def _populate_config_resource(self, configuration):
        """Helper for _build_resource: copy config properties to resource"""
        if self.compression is not None:
            configuration['compression'] = self.compression
        if self.destination_format is not None:
            configuration['destinationFormat'] = self.destination_format
        if self.field_delimiter is not None:
            configuration['fieldDelimiter'] = self.field_delimiter
        if self.print_header is not None:
            configuration['printHeader'] = self.print_header

    def _build_resource(self):
        """Generate a resource for :meth:`begin`."""
        source_ref = {
            'projectId': self.source.project,
            'datasetId': self.source.dataset_name,
            'tableId': self.source.name,
        }

        resource = {
            'jobReference': {
                'projectId': self.project,
                'jobId': self.name,
            },
            'configuration': {
                'extract': {
                    'sourceTable': source_ref,
                    'destinationUris': self.destination_uris,
                },
            },
        }
        configuration = resource['configuration']['extract']
        self._populate_config_resource(configuration)
        return resource
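

# Usage sketch, not part of the original module: export a table to a
# hypothetical Cloud Storage bucket as gzipped CSV.  The wildcard URI lets
# the service shard large exports across multiple files.


def _demo_extract_job(client, source_table):
    """Begin a hypothetical extract job and return it for polling."""
    job = ExtractTableToStorageJob(
        'demo-extract-job', source_table,
        ['gs://demo-bucket/export-*.csv.gz'], client)
    job.destination_format = DestinationFormat.CSV
    job.compression = Compression.GZIP
    job.print_header = True
    job.begin()
    return job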


class _QueryConfiguration(object):
    """User-settable configuration options for query jobs."""
    # None -> use server default.
    _allow_large_results = None
    _create_disposition = None
    _default_dataset = None
    _destination_table = None
    _flatten_results = None
    _priority = None
    _use_query_cache = None
    _write_disposition = None


class RunQueryJob(_BaseJob):
    """Asynchronous job: query tables.

    :type name: string
    :param name: the name of the job

    :type query: string
    :param query: SQL query string

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: A client which holds credentials and project configuration
                   for the dataset (which requires a project).
    """
    def __init__(self, name, query, client):
        super(RunQueryJob, self).__init__(name, client)
        self.query = query
        self._configuration = _QueryConfiguration()

    allow_large_results = _TypedProperty('allow_large_results', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.allowLargeResults
    """

    create_disposition = CreateDisposition('create_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition
    """

    default_dataset = _TypedProperty('default_dataset', Dataset)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.defaultDataset
    """

    destination_table = _TypedProperty('destination_table', Table)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.destinationTable
    """

    flatten_results = _TypedProperty('flatten_results', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.flattenResults
    """

    priority = QueryPriority('priority')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.priority
    """

    use_query_cache = _TypedProperty('use_query_cache', bool)
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.useQueryCache
    """

    write_disposition = WriteDisposition('write_disposition')
    """See:
    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition
    """

    def _destination_table_resource(self):
        if self.destination_table is not None:
            return {
                'projectId': self.destination_table.project,
                'datasetId': self.destination_table.dataset_name,
                'tableId': self.destination_table.name,
            }

    def _populate_config_resource(self, configuration):
        """Helper for _build_resource: copy config properties to resource"""
        if self.allow_large_results is not None:
            configuration['allowLargeResults'] = self.allow_large_results
        if self.create_disposition is not None:
            configuration['createDisposition'] = self.create_disposition
        if self.default_dataset is not None:
            configuration['defaultDataset'] = {
                'projectId': self.default_dataset.project,
                'datasetId': self.default_dataset.name,
            }
        if self.destination_table is not None:
            table_res = self._destination_table_resource()
            configuration['destinationTable'] = table_res
        if self.flatten_results is not None:
            configuration['flattenResults'] = self.flatten_results
        if self.priority is not None:
            configuration['priority'] = self.priority
        if self.use_query_cache is not None:
            configuration['useQueryCache'] = self.use_query_cache
        if self.write_disposition is not None:
            configuration['writeDisposition'] = self.write_disposition

    def _build_resource(self):
        """Generate a resource for :meth:`begin`."""
        resource = {
            'jobReference': {
                'projectId': self.project,
                'jobId': self.name,
            },
            'configuration': {
                'query': {
                    'query': self.query,
                },
            },
        }
        configuration = resource['configuration']['query']
        self._populate_config_resource(configuration)
        return resource

    def _scrub_local_properties(self, cleaned):
        """Helper: handle subclass properties in cleaned."""
        configuration = cleaned['configuration']['query']
        dest_remote = configuration.get('destinationTable')

        if dest_remote is None:
            if self.destination_table is not None:
                del self.destination_table
        else:
            dest_local = self._destination_table_resource()
            if dest_remote != dest_local:
                assert dest_remote['projectId'] == self.project
                dataset = self._client.dataset(dest_remote['datasetId'])
                self.destination_table = dataset.table(dest_remote['tableId'])
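

# Usage sketch, not part of the original module: run a batch-priority query
# into a destination table.  Dataset, table, and job names are hypothetical;
# ``client.dataset`` is assumed to behave as in this library's ``Client``.


def _demo_query_job(client):
    """Begin a hypothetical query job writing to a destination table."""
    dataset = client.dataset('demo_dataset')
    job = RunQueryJob(
        'demo-query-job',
        'SELECT full_name FROM [demo_dataset.demo_table]', client)
    job.destination_table = dataset.table('query_results')
    job.create_disposition = CreateDisposition.CREATE_IF_NEEDED
    job.write_disposition = WriteDisposition.WRITE_TRUNCATE
    job.priority = QueryPriority.BATCH
    job.use_query_cache = False
    job.begin()
    return job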