“Google Cloud Task Queue with App Engine”

By Gokhan Konuk · Published in codeshakeio · Feb 8, 2021 · 9 min read

🗣️ Introduction

This post describes what Cloud Tasks tasks and queues are, and when and how to use them. Cloud Tasks lets you separate out pieces of work that can be performed independently, outside of your main application flow, and send them off to be processed, asynchronously, using handlers that you create. These independent pieces of work are called tasks.

Distributed task queues

Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. You can asynchronously perform work outside of a user request. Your tasks can be executed on App Engine or any arbitrary HTTP endpoint.

Build more responsive applications

Asynchronous execution is a well-established way to reduce request latency and make your application more responsive. Cloud Tasks allows you to organize and control those requests with features like scheduling, deduplication, configurable retry policies, and version redirection.

Decouple and scale microservices

Cloud Tasks helps you better structure and scale your application: implementing task handlers in dedicated services allows microservices to scale independently.

Manage resource consumption

Cloud Tasks helps you better control and smooth the load on your services by rate-limiting your queues. It helps you easily manage the execution, dispatch, and delivery of your distributed tasks.

Handle releases and incidents gracefully

Cloud Tasks allows you to preserve your requests in flight. Pausing, retrying, or redirecting to a new version can give you room to handle a bug or stage a deployment without dropping requests.

👉 Contents

  1. General workflow for Cloud Task Queue, How To?
    — Terms
    — Metrics
    — Creating Cloud Task Queues
    — Client Library
    — Creating App Engine Tasks
    — Creating App Engine Task Handlers
    — Managing queues and tasks
  2. Task Queue Overview
    — Push queues & pull queues
    — Using Pull Queues in Java & Workflow
    — Creating Pull Queues & Tasks
    — Leasing Pull Tasks

🧰 Prerequisites

  1. Google Cloud Platform Console and Cloud Tasks API enabled
  2. GCP knowledge
  3. Java knowledge

📖 How-to

Figures: Cloud Tasks workflow with HTTP targets and with App Engine targets.

The general workflow is as follows:

  1. Create a worker to process the tasks.
  2. Create a queue.
  3. Create tasks programmatically and add them to the queue.
  4. The Cloud Tasks service returns an OK to the originating application. This indicates that the task has been successfully written to Cloud Tasks storage, making the create task request both highly available and durable.
  5. The tasks are passed to the worker.
  6. The worker processes the task.
  7. To complete the sequence, the worker returns a 2xx success status code to the Cloud Tasks service.
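The sequence above can be modelled in a few lines of plain Java. This is only an in-memory sketch, not the Cloud Tasks API: `MiniQueue`, `Worker`, `createTask` and `dispatchAll` are hypothetical names, and the retry rule (re-enqueue on any non-2xx status, up to a fixed number of attempts) is a simplifying assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of the create -> dispatch -> acknowledge sequence (not the Cloud Tasks API). */
final class MiniQueue {
    /** A worker receives a payload and answers with an HTTP-style status code. */
    interface Worker {
        int handle(String payload);
    }

    /** A payload together with the number of the attempt about to be made. */
    private static final class Attempt {
        final String payload;
        final int number;

        Attempt(String payload, int number) {
            this.payload = payload;
            this.number = number;
        }
    }

    private final Deque<Attempt> pending = new ArrayDeque<>();
    private final int maxAttempts;

    MiniQueue(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Steps 3-4: store the task, then confirm to the caller that it was accepted. */
    boolean createTask(String payload) {
        pending.addLast(new Attempt(payload, 1));
        return true;
    }

    /** Steps 5-7: hand each task to the worker; a 2xx status completes it, anything else retries. */
    List<String> dispatchAll(Worker worker) {
        List<String> completed = new ArrayList<>();
        while (!pending.isEmpty()) {
            Attempt attempt = pending.removeFirst();
            int status = worker.handle(attempt.payload);
            if (status >= 200 && status < 300) {
                completed.add(attempt.payload);
            } else if (attempt.number < maxAttempts) {
                pending.addLast(new Attempt(attempt.payload, attempt.number + 1));
            }
            // Tasks that exhaust maxAttempts are simply dropped in this sketch.
        }
        return completed;
    }
}
```

A worker that fails once and then succeeds sees its task re-enqueued behind the other pending tasks, which is why the caller only needs the initial OK and never waits for the processing itself.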

Terms

The following table shows key terms that describe aspects of Cloud Tasks' behavior.

Metrics

The following predefined Cloud Tasks metrics are available via Cloud Monitoring (formerly Stackdriver).

Creating Cloud Task Queues

Use the Google Cloud SDK to create your queue:

gcloud tasks queues create [QUEUE_ID]

where:

  • QUEUE_ID is the identifier you assign to your queue, for example, my-queue.

It can take a few minutes for a newly created queue to become available. Then use describe to verify that the queue was created successfully:

gcloud tasks queues describe [QUEUE_ID]

You can view all queues in your project by calling:

gcloud tasks queues list

Client Library

gradle:

implementation 'com.google.cloud:google-cloud-tasks:1.30.11'

imports:

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.CloudTasksSettings;

Creating App Engine Tasks

// Instantiates a client.
try (CloudTasksClient client = CloudTasksClient.create()) {
  // Variables provided by the CLI.
  // projectId = "my-project-id";
  // queueName = "my-appengine-queue";
  // location = "us-central1";
  // payload = "hello";
  // Construct the fully qualified queue name.
  String queuePath = QueueName.of(projectId, location, queueName).toString();
  // Construct the task body.
  Task.Builder taskBuilder =
      Task.newBuilder()
          .setAppEngineHttpRequest(
              AppEngineHttpRequest.newBuilder()
                  .setBody(ByteString.copyFrom(payload, Charset.defaultCharset()))
                  .setRelativeUri("/tasks/create")
                  .setHttpMethod(HttpMethod.POST)
                  .build());
  if (params.hasOption(IN_SECONDS_OPTION.getOpt())) {
    // Add the scheduled time to the request.
    int seconds = Integer.parseInt(params.getOptionValue(IN_SECONDS_OPTION.getOpt()));
    taskBuilder.setScheduleTime(
        Timestamp.newBuilder()
            .setSeconds(Instant.now(Clock.systemUTC()).plusSeconds(seconds).getEpochSecond()));
  }
  // Send create task request.
  Task task = client.createTask(queuePath, taskBuilder.build());
  System.out.println("Task created: " + task.getName());
}
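Two details of the snippet above are easy to check in isolation: the shape of the fully qualified queue name and the schedule-time arithmetic. The sketch below reproduces both with the standard library only. `TaskNaming`, `queuePath` and `scheduleEpochSecond` are hypothetical helper names; the `projects/.../locations/.../queues/...` layout is the resource-name format that `QueueName.of(...).toString()` produces. Passing the `Clock` in as a parameter is a design choice made here so the calculation can be tested with a fixed time.

```java
import java.time.Clock;
import java.time.Instant;

/** Stand-alone helpers mirroring two steps of task creation (hypothetical names). */
final class TaskNaming {
    private TaskNaming() {}

    /** Builds the resource name that identifies a queue. */
    static String queuePath(String projectId, String location, String queueName) {
        return String.format("projects/%s/locations/%s/queues/%s", projectId, location, queueName);
    }

    /** Epoch second at which a task delayed by delaySeconds becomes eligible for dispatch. */
    static long scheduleEpochSecond(Clock clock, long delaySeconds) {
        if (delaySeconds < 0) {
            throw new IllegalArgumentException("delaySeconds must not be negative");
        }
        return Instant.now(clock).plusSeconds(delaySeconds).getEpochSecond();
    }

    public static void main(String[] args) {
        System.out.println(queuePath("my-project-id", "us-central1", "my-appengine-queue"));
        System.out.println(scheduleEpochSecond(Clock.systemUTC(), 60));
    }
}
```

The value returned by `scheduleEpochSecond` is what the original snippet passes to `Timestamp.newBuilder().setSeconds(...)`.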

Creating App Engine Task Handlers

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static final Logger log = Logger.getLogger(TaskServlet.class.getName());
  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    // Read the whole request body into a single string.
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);
    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      // Set the status before writing the body, while the response is still uncommitted.
      resp.setStatus(HttpServletResponse.SC_OK);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
    } else {
      log.warning("Empty payload received in request to " + req.getServletPath());
    }
  }
}
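One detail worth knowing about the handler above: joining the lines of the request with `reduce` drops the line breaks, so a multi-line payload arrives as one run-on string. If line breaks matter for your payload, a possible alternative is `Collectors.joining`. The sketch below is stand-alone and uses a `StringReader` in place of the servlet request; `BodyReader` and `readBody` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.stream.Collectors;

/** Reads a request body while preserving line breaks (hypothetical helper). */
final class BodyReader {
    private BodyReader() {}

    /** Joins all lines from the reader with "\n"; returns "" when there is no content. */
    static String readBody(BufferedReader reader) {
        return reader.lines().collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new StringReader("hello\nworld"));
        System.out.println(readBody(reader));
    }
}
```

In the servlet, the same method could be called with `req.getReader()`, which already returns a `BufferedReader`.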

Managing queues and tasks

You can perform the following operations from the Cloud Tasks page in the Google Cloud Platform Console:

  • Deleting a task from a queue
  • Purging all tasks from a queue
  • Pausing queues
  • Deleting queues

Task Queue Overview

Task queues let applications perform work, called tasks, asynchronously outside of a user request. If an app needs to execute work in the background, it adds tasks to task queues. The tasks are executed later, by worker services.

Push queues and pull queues

Task queues come in two flavors, push and pull. The two differ in how the Task Queue service dispatches task requests to worker services.

Push queues

One typical push queue use case is a “slow” operation. Consider a social network messaging system. Every time a user sends a message, the network needs to update the followers of the sender. This can be a very time-consuming operation. Using a push queue, the application can enqueue a task for each message as it arrives to be dispatched to a worker service for processing. When the worker receives the task request, it can retrieve the sender’s list of followers and update the DB for each one. The worker can be made even more efficient by enqueuing another push task for each database update.

Another use for push queues is scheduled tasks. Imagine an application that implements an ad campaign. A group of tasks written to send out emails can be added to a push queue with instructions to withhold the tasks until a specified time in the future. When the due date arrives, the Task Queue service will begin to issue requests to execute the tasks.

Pull queues

Pull queues work well when you need to batch tasks together for efficient execution. One solution takes advantage of the ability to attach a tag to a pull task. Workers can lease a group of tasks that have the same tag. A typical example might be an app that maintains leaderboards for numerous different games, with many players and groups constantly in play. Every time there is a new high score, the app can enqueue a pull task with the score and the player, and use the game ID as a task tag. A worker periodically “wakes up”, leases a group of tasks with the same game ID, and updates the leaderboard. You can lease tasks explicitly, using a specified tag value, or let the service decide which group of similarly tagged tasks to send.

Batching with tags can be very powerful. Since tags can be dynamically generated while your app is running, a worker can handle new game IDs with no special effort.
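To make the leaderboard example concrete, here is a small in-memory sketch of batching by tag. It does not use the App Engine Task Queue API: `TagBatching`, `ScoreTask`, `groupByTag` and `bestScores` are hypothetical names, and the game ID simply plays the role of the task tag.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of batching by tag; not the App Engine Task Queue API. */
final class TagBatching {
    private TagBatching() {}

    /** One "new high score" task; gameId plays the role of the task tag. */
    static final class ScoreTask {
        final String gameId;
        final String player;
        final int score;

        ScoreTask(String gameId, String player, int score) {
            this.gameId = gameId;
            this.player = player;
            this.score = score;
        }
    }

    /** Groups tasks by tag, keeping the order in which tags first appear. */
    static Map<String, List<ScoreTask>> groupByTag(List<ScoreTask> tasks) {
        Map<String, List<ScoreTask>> batches = new LinkedHashMap<>();
        for (ScoreTask task : tasks) {
            batches.computeIfAbsent(task.gameId, tag -> new ArrayList<>()).add(task);
        }
        return batches;
    }

    /** Processes one batch: the best score per player for a single game. */
    static Map<String, Integer> bestScores(List<ScoreTask> batch) {
        Map<String, Integer> best = new LinkedHashMap<>();
        for (ScoreTask task : batch) {
            best.merge(task.player, task.score, Integer::max);
        }
        return best;
    }
}
```

Because the grouping key is just a string taken from each task, a game ID that first appears at runtime gets its own batch without any configuration change, which is the point made above about dynamically generated tags.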

Using Pull Queues in Java

Using pull queues, you can also group related tasks using tags and then configure your worker to pull multiple tasks with a certain tag all at once. This process is called batching.

Pull queues in the App Engine standard environment are created by setting a property in a configuration file called queue.xml.

Pull queue workflow

Workers that process tasks from pull queues must be defined within a service that runs in the App Engine standard environment.

The workflow is as follows:

  1. You create a pull queue, using queue.xml.
  2. You create tasks and add them to the queue.
  3. The worker you have created leases the task, using TaskQueue.
  4. App Engine sends task data to the worker in the lease response.
  5. The worker processes the task. If it needs more time than the lease allows, the worker can extend the lease by modifying its duration. If the lease expires, the task becomes available for another worker to lease.
  6. After a task is processed successfully, the worker deletes it.
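The lease semantics in steps 3 to 6 can be illustrated with a toy model. This is an in-memory sketch under stated assumptions, not the App Engine implementation: `LeaseModel` and its methods are hypothetical, and the current time is passed in as plain seconds so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of pull-queue leasing (hypothetical; times are plain seconds). */
final class LeaseModel {
    /** Task name -> time from which the task may be leased (again). */
    private final Map<String, Long> leasableFrom = new LinkedHashMap<>();

    /** Adds a task that is immediately available for leasing. */
    void add(String name) {
        leasableFrom.put(name, 0L);
    }

    /** Leases up to maxTasks available tasks and hides them until now + leaseSeconds. */
    List<String> lease(long now, long leaseSeconds, int maxTasks) {
        List<String> leased = new ArrayList<>();
        for (Map.Entry<String, Long> entry : leasableFrom.entrySet()) {
            if (leased.size() == maxTasks) {
                break;
            }
            if (entry.getValue() <= now) {
                entry.setValue(now + leaseSeconds);
                leased.add(entry.getKey());
            }
        }
        return leased;
    }

    /** Removes a task once the worker has processed it successfully. */
    void delete(String name) {
        leasableFrom.remove(name);
    }
}
```

A task that is leased but never deleted reappears once its lease expires, so a crashed worker does not lose work; the flip side is that a worker which finishes late may process a task that another worker has already leased again.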

Creating Pull Queues

You create pull queues using the queue configuration file, queue.xml, or queue.yaml, for your application. The process is the same as creating named push queues, with a specialized directive, <mode>pull</mode>, added to the file.

queue.yaml:

queue:
- name: my-queue-name
  mode: pull

To upload the file:

gcloud app deploy queue.yaml

Creating Pull Tasks

Once you have created a pull queue, you can create tasks and add them to the queue.

First, you need the name of the queue, which is defined in queue.xml. Then you use the builder and TaskOptions.Method.PULL to add the task.

The following example puts tasks in a pull queue named pull-queue:

First, get the queue using the queue name defined in queue.xml:

Queue q = QueueFactory.getQueue("pull-queue");

Then call the queue's add() method with TaskOptions.Method.PULL to place a task in that queue:

q.add(TaskOptions.Builder.withMethod(TaskOptions.Method.PULL)
.payload(content.toString()));

Leasing Pull Tasks

Once tasks are in a pull queue, a worker can lease them. After the tasks are processed the worker must delete them.

Leasing Tasks:

After the tasks are in the queue, a worker can lease one or more of them using the leaseTasks() method. There may be a short delay before tasks recently added using add() become available via leaseTasks().

When you request a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one week). The lease duration needs to be long enough to ensure that the slowest task will have time to finish before the lease period expires. You can modify a task lease using modifyTaskLease().

Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires.

List<TaskHandle> tasks = q.leaseTasks(3600, TimeUnit.SECONDS, numberOfTasksToLease);

Batching with task tags:

Not all tasks are alike; your code can “tag” tasks and then choose tasks to lease by tag. The tag acts as a filter.

q.add(
TaskOptions.Builder.withMethod(TaskOptions.Method.PULL)
.payload(content.toString())
.tag("process".getBytes()));

Then lease the filtered tasks:

// Lease only tasks tagged with "process"
List<TaskHandle> tasks =
q.leaseTasksByTag(3600, TimeUnit.SECONDS, numberOfTasksToLease, "process");
// You can also specify a tag to lease via LeaseOptions passed to leaseTasks.

Deleting tasks:

q.deleteTask(task);

And remember — always be a good artist whatever you do.
