Loading up some Big Queries
December 5, 2013

Filed under: Performance Monitoring


Google’s BigQuery service has a lot to offer organizations with massive datasets to query. Renting massive parallelism is far more cost-effective than building that infrastructure yourself. BigQuery has its limits, though — flexible boundaries that your design needs to accommodate. At AppNeta, our naive approach to using BigQuery to power TraceView’s extended retention windows ran straight into those boundaries.

Let’s Not Be Naive, Shall We?

We have a ton of analyzed trace data — URLs, SQL queries, host names, layer names, NSA watchwords (just kidding) — all extracted and indexed against the traces for cross-referencing, searching, and summation. It’s a pretty big stewpot full of stuff! The data are sharded by the configured application and put into hourly or six-hourly buckets. So our naive solution is to just gather the buckets across the shards and ship them off to BigQuery. Here we run into the first limitation — BigQuery doesn’t really want you to do more than 10,000 imports a day. That’s going to be a problem.

[Image: BigQuery Best Practices. Caption: Converting TSV to JSON is a great way to heat your apartment this winter.]

A Clever Plan!

We considered several approaches to working around the limitation. One was to limit the number of imports to fewer than 24 per table, per customer. Another was to limit the number of tables, possibly putting all customers into the same table. A third option was to put all customers into the same table initially, then run queries to extract each customer’s data into its own table. This option had tremendous appeal: it scaled against the 10,000 limit as a function of import frequency and number of base table types, rather than number of customers. So this is what we set out to do.
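To make that scaling argument concrete, here is a quick back-of-envelope sketch. The customer count, number of base table types, and import frequency below are illustrative assumptions, not our real numbers.

# All numbers here are made up for illustration.
customers = 500        # hypothetical number of customers
table_types = 4        # hypothetical number of base table types
imports_per_day = 24   # hourly buckets

# Naive plan: one import per customer, per table type, per hourly bucket.
naive_imports = customers * table_types * imports_per_day
print(naive_imports)   # 48000 -- well past the 10,000/day limit

# Shared-table plan: imports scale only with table types and frequency.
shared_imports = table_types * imports_per_day
print(shared_imports)  # 96 -- the customer count no longer matters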

The system we envisioned had two components and three distinct phases. In the first component — and the first phase — we translate MySQL backup files into JSON files suitable for import into BigQuery. The resulting files are gzipped and uploaded to Google Cloud Storage.
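As a rough sketch of what that first phase does, here is a minimal converter, assuming the backups are tab-separated dump files; the file name and column list are hypothetical, and the real exporter handles our actual MySQL backup format and schemas.

import csv
import gzip
import json

# Hypothetical column names for one table type.
COLUMNS = ['customer_id', 'trace_id', 'url', 'host', 'layer']

def tsv_to_ndjson(tsv_path, json_path):
    """Convert a tab-separated dump into gzipped newline-delimited JSON."""
    with open(tsv_path) as tsv_file, gzip.open(json_path, 'wb') as out:
        reader = csv.reader(tsv_file, delimiter='\t')
        for row in reader:
            record = dict(zip(COLUMNS, row))
            out.write((json.dumps(record) + '\n').encode('utf-8'))

tsv_to_ndjson('traces_2013120500.tsv', 'traces_2013120500.json.gz')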

In the second phase, we group the files together by day and by base table type, and do a data import. All customer data goes into the same big table.

When phase two completes, phase three fires off a query job for each customer, pulling their data out of the main table and into a customer-specific daily table.

Phase One: Upload to GCS

At one point we considered aggregating all the JSON files on the local filesystem, then uploading a single file directly into BigQuery. That put a lot of memory and disk demand on the local node. BigQuery will happily read files out of Google Cloud Storage, though, so uploading the JSON files there and combining them at import time saves some local processing time as well as some disk.

From Python, the right way to use Google Cloud Storage is through the boto package. Since we were already using boto to talk to S3, this was convenient! We did need to upgrade our version of boto, however. Following the instructions on Google’s site, we installed gsutil with pip, ran gsutil config, and were off to the races.

The tutorials on Google’s site provide a pretty good introduction to using boto with Google Cloud Storage, though they work a little differently from the standard boto tutorials. Where the regular boto documentation has you create Connections, open Buckets, and manipulate Keys, Google’s documentation has you create storage_uri objects and perform operations on those. A little poking at the underlying objects reveals more commonality than the two sets of documentation let on.
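For flavor, here is a minimal upload sketch in the storage_uri style from Google’s tutorial. The bucket and object names are made up, and it assumes boto has already been configured for GCS via gsutil config.

import boto

# Hypothetical bucket/object names; credentials come from the gsutil/boto config.
local_path = 'traces_2013120500.json.gz'
dst_uri = boto.storage_uri('my-import-bucket/traces_2013120500.json.gz', 'gs')

# new_key() hands back a regular boto Key, which is where the overlap
# with the S3-style Connection/Bucket/Key API shows up.
dst_uri.new_key().set_contents_from_filename(local_path)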

Phase Two: Import to BigQuery

With the files uploaded to Google’s Cloud Storage, it’s time to import them into BigQuery. We do this using Google’s Python API client, which means we form up a JSON document that configures the job, use the client to submit it, then wait for it to complete. With the number of jobs we have to submit, we usually won’t spend too much time waiting.

The configuration document is fairly straightforward. Here’s the Python data structure we use to cook it up:

job_data = {
    'projectId': self.project_id,
    'configuration': {
        'load': {
            'sourceFormat': 'NEWLINE_DELIMITED_JSON',
            'sourceUris': self.filenames,
            'schema': {
                'fields': self.schema
            },
            'destinationTable': {
                'projectId': self.project_id,
                'datasetId': self.dataset,
                'tableId': self.bq_import_tablename
            },
            'writeDisposition': 'WRITE_APPEND'
        }
    }
}

We feed this to BigQuery using the Python API:

import httplib2
from apiclient.discovery import build  # Google API client library

# BQConfiguration is our own wrapper around the OAuth2 credentials.
bq = BQConfiguration()
http = bq.credentials.authorize(httplib2.Http())
bq = build('bigquery', 'v2', http=http)
jc = bq.jobs()

# Submit the load job and remember its ID so we can poll for completion.
insert_response = jc.insert(projectId=self.project_id, body=job_data).execute()
log.debug("Insert response: %r", insert_response)
self.job_id = insert_response['jobReference']['jobId']

The insert response provides all sorts of information. We log it, but the only information we actually use is the job ID. We use that later, when waiting on job status.

import time

while self.jobs_remaining:
    job = jc.get(projectId=self.project_id, jobId=self.job_id).execute()
    if 'errorResult' in job['status']:
        err = job['status']['errorResult']
        log.error('Error uploading %s: %s; %s', self.bq_import_tablename, err['reason'], err['message'])
        raise BQError(repr(err))
    if job['status']['state'] == 'DONE':
        self.complete_job(job)
    time.sleep(10)  # brief pause so we don't poll the API in a tight loop

And like that, our data now live in BigQuery.

Phase Three: Split Into Customer-Specific Tables

In phase three of our upload process, we split the data back out by customer. For some of our tables this proved to be a bridge too far, as we’ll see shortly. For the rest, we run a query job that creates a new table per customer.

We use queries to explode the big shared table into customer-specific tables:

<br />query = "select * from {0} where customer_id = {1}"<br />query.format(self.bq_import_tablename, customer_id)<br />job_data = {<br /><%%KEEPWHITESPACE%%>    'projectId': self.project_id,<br /><%%KEEPWHITESPACE%%>    'configuration': {<br /><%%KEEPWHITESPACE%%>        'query': {<br /><%%KEEPWHITESPACE%%>            'query': query,<br /><%%KEEPWHITESPACE%%>            'defaultDataset': {<br /><%%KEEPWHITESPACE%%>                'projectId': self.project_id,<br /><%%KEEPWHITESPACE%%>                'datasetId': self.dataset,<br /><%%KEEPWHITESPACE%%>            },<br /><%%KEEPWHITESPACE%%>            'destinationTable': {<br /><%%KEEPWHITESPACE%%>                'projectId': self.project_id,<br /><%%KEEPWHITESPACE%%>                'datasetId': self.dataset,<br /><%%KEEPWHITESPACE%%>                'tableId': self.bq_dest_table<br /><%%KEEPWHITESPACE%%>            },<br /><%%KEEPWHITESPACE%%>            'priority': 'BATCH',<br /><%%KEEPWHITESPACE%%>            'writeDisposition': 'WRITE_APPEND',<br /><%%KEEPWHITESPACE%%>            'createDisposition': 'CREATE_IF_NEEDED',<br /><%%KEEPWHITESPACE%%>            'allowLargeResults': True,<br /><%%KEEPWHITESPACE%%>            'preserveNulls': True,<br /><%%KEEPWHITESPACE%%>        }<br /><%%KEEPWHITESPACE%%>    }<br />}<br />

This query extracts the relevant data into a customer-specific table, which is much friendlier for querying.

But here we ran into a problem with much of our data. Many of our tables rely on nested data structures — “record” field types with “repeated” mode. When you query a table with nested fields, BigQuery automatically flattens the repeated records into the query results. There’s no way around it: either you omit the nested fields, or you get one output row for every repeated record. That broke our copy-into-a-new-table approach, so we had to fall back to once-daily imports for those tables.
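For context, a nested field in the load-job schema looks something like the sketch below; the field names are hypothetical, not our real schema.

# Hypothetical schema fragment with a nested, repeated record.
schema = [
    {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
    {'name': 'trace_id', 'type': 'STRING', 'mode': 'REQUIRED'},
    {'name': 'sql_queries', 'type': 'RECORD', 'mode': 'REPEATED',
     'fields': [
         {'name': 'query_text', 'type': 'STRING'},
         {'name': 'duration_ms', 'type': 'FLOAT'},
     ]},
]

A trace with three sql_queries entries comes back from a select * as three flattened rows with the top-level fields duplicated, so appending those results into a per-customer table would multiply the data rather than copy it.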

Postscript: Cleanup’s Always the Worst Part

While we lost some ground to the data flattening, the project was still a go. Implementation held a few surprises, and the biggest was around deleting and moving files in Google Cloud Storage. Our earliest implementations tracked state by shuffling files between buckets in GCS. That turned out to be so slow that we moved to keeping track of job status locally. Things again seemed good until we hit the job with full production-scale data, at which point we saw it spending most of its run time waiting for files to be deleted from Google Cloud Storage. Our solution was bulk deletion. Unfortunately, the boto interface we were using to talk to GCS does not support bulk deletes, so we had to turn to the (experimental) JSON API. Here’s how that works:

import httplib2
from apiclient.discovery import build
from apiclient.http import BatchHttpRequest

# BQCredentials is our own wrapper around the OAuth2 credentials;
# cb is a per-request callback (defined elsewhere) that checks each delete response.
bq = BQCredentials()
http = bq.credentials.authorize(httplib2.Http())
service = build(serviceName='storage', version='v1beta2', http=http)

while filenames:
    batch = BatchHttpRequest()
    count = 0
    for (bucket, obj) in filenames:
        count += 1
        batch.add(service.objects().delete(bucket=bucket, object=obj), callback=cb)
        if count >= 100:  # keep each batch to a modest size (the cap here is illustrative)
            break

    log.info("added %d deletes to batch request, executing", count)
    batch.execute(http=http)
    filenames = filenames[count:]

Batch deletes were many, many times faster than going through the boto interface: not only could we issue the deletes in parallel, each one also completed dramatically faster. Offloading those deletes meant a tremendous reduction in runtime for the main loader job, which now spends most of its time waiting for BigQuery to process the import jobs.