Google Cloud | Firestore

Using Firestore And Apache Beam For Data Processing

Large scale data processing workloads can be challenging to operationalize and orchestrate. Google Cloud announced the release of a Firestore in Native Mode connector for Apache Beam that makes data processing easier than ever for Firestore users. Apache Beam is a popular open source project that supports large scale data processing with a unified batch and streaming processing model.  It’s portable, works with many different backend runners, and allows for flexible deployment. The Firestore Beam I/O Connector joins BigQuery, Bigtable, and Datastore as Google databases with Apache Beam connectors and is automatically included with the Google Cloud Platform IO module of the Apache Beam Java SDK.  

The Firestore connector can be used with a variety of Apache Beam backends, including Google Cloud Dataflow. Dataflow, an Apache Beam backend runner, provides a structure for developers to solve “embarrassingly parallel” problems. Mutating every record of your database is an example of such a problem. Using Beam pipelines removes much of the work of orchestrating the parallelization and allows developers to instead focus on the transforms on the data.
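To make "embarrassingly parallel" concrete, here is a toy stand-in in plain Java (not Beam itself): each element is mutated independently of every other, so the work splits cleanly across threads or workers.

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy illustration of an "embarrassingly parallel" mutation: every element
// can be transformed without looking at any other element.
public class ParallelMutation {
    static List<String> mutateAll(List<String> docs) {
        return docs.parallelStream()
                .map(String::toUpperCase)      // independent per-element mutation
                .collect(Collectors.toList()); // collect preserves encounter order
    }

    public static void main(String[] args) {
        System.out.println(mutateAll(List.of("a", "b", "c"))); // [A, B, C]
    }
}
```

A Beam pipeline generalizes this pattern: the runner, not the developer, decides how to split and schedule the per-element work.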

A practical application of a Firestore Connector for Beam

To better understand the use case for a Beam + Firestore pipeline, let's look at an example that illustrates the value of using Google Cloud Dataflow for bulk operations on a Firestore database. Imagine you have a Firestore database with a collection group on which you want to perform a large number of operations; for instance, deleting every document within the collection group. Doing this on one worker could take a while. What if instead we could use the power of Beam to do it in parallel?

// options: PipelineOptions created elsewhere (e.g., from command-line args).
Pipeline pipeline = Pipeline.create(options);

String collectionGroupId = "collection-group-name";
RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder()
       .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
       .build();

pipeline.apply(Create.of(collectionGroupId))
       .apply(new CreatePartitionQueryRequest(rpcQosOptions.getHintMaxNumWorkers()))
       .apply(FirestoreIO.v1().read().partitionQuery().withNameOnlyQuery()
               .withRpcQosOptions(rpcQosOptions).build())
       .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
       .apply(MapElements.into(TypeDescriptors.strings()).via(
               (RunQueryResponse runQueryResponse) -> runQueryResponse.getDocument().getName()))
       .apply(ParDo.of(new CreateDeleteOperation()))
       .apply("shuffle writes", Reshuffle.viaRandomKey())
       .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());

pipeline.run();

This pipeline starts by creating a request for a partition query on a given collectionGroupId. We specify withNameOnlyQuery since it saves network bandwidth; we only need a document's name to delete it. From there, we use a few small custom functions: we map each query response to the name of the document it contains, then create a delete operation for that name.
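For context on why a name-only query is enough, a Firestore document name is a fully qualified resource path that uniquely identifies the document. A minimal sketch of the format (plain Java; "my-project", "users", and "alice" are made-up values):

```java
// Sketch of the Firestore document resource-name format. The project,
// collection, and document ids here are hypothetical.
public class DocumentNames {
    static String documentName(String project, String collectionPath, String docId) {
        return "projects/" + project + "/databases/(default)/documents/"
                + collectionPath + "/" + docId;
    }

    public static void main(String[] args) {
        System.out.println(documentName("my-project", "users", "alice"));
        // projects/my-project/databases/(default)/documents/users/alice
    }
}
```

Because the name alone identifies the document, a delete mutation needs nothing else from the query response.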

private static final class CreateDeleteOperation extends DoFn<String, Write> {
   @ProcessElement
   public void processElement(ProcessContext c) {
       c.output(Write.newBuilder().setDelete(c.element()).build());
   }
}
Beam utilizes a watermark to ensure exactly-once processing. As a result, the shuffle operation keeps Beam from backtracking over work that is already complete, providing both speed and correctness.

While the code to create a partition query is a bit long, it consists of constructing the protobuf request to be sent to Firestore using the generated protobuf builder.

Creating a Partition Query:

static final class CreatePartitionQueryRequest extends
       PTransform<PCollection<String>, PCollection<PartitionQueryRequest>> {

   private final long partitionCount;

   public CreatePartitionQueryRequest(long partitionCount) {
       this.partitionCount = partitionCount;
   }

   @Override
   public PCollection<PartitionQueryRequest> expand(PCollection<String> input) {
       return input.apply("create queries", ParDo.of(new DoFn<String, PartitionQueryRequest>() {
           @ProcessElement
           public void processElement(ProcessContext ctx) {
               String collectionId = ctx.element();
               String project = ctx.getPipelineOptions().as(GcpOptions.class).getProject();
               DatabaseRootName db = DatabaseRootName.of(project, "(default)");
               // Select every document in the collection group.
               StructuredQuery.Builder query = StructuredQuery.newBuilder()
                       .addFrom(StructuredQuery.CollectionSelector.newBuilder()
                               .setAllDescendants(true)
                               .setCollectionId(collectionId));
               PartitionQueryRequest request = PartitionQueryRequest.newBuilder()
                       .setParent(db.toString() + "/documents")
                       .setPartitionCount(partitionCount - 1)
                       .setStructuredQuery(query)
                       .build();
               ctx.output(request);
           }
       }));
   }
}
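One detail worth calling out: PartitionQuery returns cursor values, and k cursors split an ordered keyspace into k + 1 contiguous ranges, which is why the request asks for partitionCount - 1 rather than partitionCount. The arithmetic, as a plain-Java sketch:

```java
import java.util.List;

// k sorted cursors delimit k + 1 contiguous ranges, so a request that wants
// N partitions asks the PartitionQuery API for N - 1 cursors.
public class PartitionMath {
    static long cursorsNeeded(long desiredPartitions) {
        return desiredPartitions - 1;
    }

    static long partitionsFromCursors(List<String> cursors) {
        return cursors.size() + 1;
    }

    public static void main(String[] args) {
        System.out.println(cursorsNeeded(10));                              // 9
        System.out.println(partitionsFromCursors(List.of("docA", "docM"))); // 3
    }
}
```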

There are many possible applications for this connector for Google Cloud users: joining disparate data in a Firestore in Native Mode database, relating data across multiple databases, deleting a large number of entities, writing Firestore data to BigQuery, and more. We're excited to have contributed this connector to the Apache Beam ecosystem and can't wait to see how you use the Firestore connector to build the next great thing.

By: Chris Wilcox (Staff Engineer, Google Cloud) and Ben Whitehead (Developer Relations Engineer, Google Cloud)
Source: Google Cloud Blog
