Gcloud::Pubsub::Subscription

Subscription

A named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application.

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull
msgs.each { |msg| msg.acknowledge! }

Methods

Public Instance Methods

ack(*messages)

Alias for: acknowledge

acknowledge(*messages)

Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.

Parameters

messages

One or more ReceivedMessage objects or ack_id values. (ReceivedMessage or ack_id)

Example

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.acknowledge messages

Also aliased as: ack
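
Because unacknowledged messages can be sent again, one common pattern is to acknowledge only the messages that were handled successfully. The following is a minimal sketch of that idea, not part of the library: ack_processed is a hypothetical helper, and it assumes only that sub responds to pull and acknowledge as documented on this page.

```ruby
# Hypothetical helper (not part of gcloud): pulls one batch, yields each
# message to the block, and acknowledges only the messages whose block
# completed without raising.
def ack_processed(sub, max: 10)
  done = []
  sub.pull(max: max).each do |msg|
    begin
      yield msg
      done << msg
    rescue StandardError
      # Leave the message unacknowledged so it can be delivered again.
    end
  end
  sub.acknowledge(*done) unless done.empty?
  done
end
```

With a real subscription this might be called as ack_processed(sub) { |msg| handle msg }, where handle stands in for application code.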

deadline()

The maximum number of seconds a subscriber has to acknowledge a message after receiving it.

delay(new_deadline, *messages)

Modifies the acknowledge deadline for messages.

Use this either to signal that more time is needed to process the messages, or to make the messages available for redelivery if processing was interrupted.

Parameters

new_deadline

The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the messages available for another pull request. (Integer)

messages

One or more ReceivedMessage objects or ack_id values. (ReceivedMessage or ack_id)

Example

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.delay 120, messages
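
Both uses of delay described above (asking for more time, and releasing messages for redelivery) can be combined around a slow piece of work. This is a sketch under stated assumptions: with_extended_deadline is a hypothetical helper, not part of the library, and it assumes sub responds to delay and acknowledge as documented on this page.

```ruby
# Hypothetical helper (not part of gcloud): extends the ack deadline before
# slow work starts, acknowledges on success, and on failure sets the deadline
# to 0 so the messages may become available for another pull.
def with_extended_deadline(sub, messages, seconds: 120)
  sub.delay seconds, *messages
  yield messages
  sub.acknowledge(*messages)
  true
rescue StandardError
  sub.delay 0, *messages
  false
end
```

The helper returns true when the block finished and the messages were acknowledged, and false when the messages were released instead.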

delete()

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.

Returns

true if the subscription was deleted.

Example

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.delete

endpoint()

Returns the URL locating the endpoint to which messages should be pushed.

endpoint=(new_endpoint)

Sets the URL locating the endpoint to which messages should be pushed.

exists?()

Determines whether the subscription exists in the Pub/Sub service.

Example

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.exists? #=> true

listen(options = {})

Polls the backend for new messages. This runs a loop that repeatedly requests messages from the API, blocking indefinitely and yielding each retrieved message to the given block as it is received.

Parameters

options

An optional Hash for controlling additional behavior. (Hash)

options[:max]

The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)

options[:autoack]

Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)

options[:delay]

The number of seconds to pause between requests when the Google Cloud service has no messages to return. The default value is 1. (Number)

Examples

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.listen do |msg|
  # process msg
end

The number of messages pulled per batch can be set with the max option:

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.listen max: 20 do |msg|
  # process msg
end

Messages can be automatically acknowledged as they are pulled with the autoack option:

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.listen autoack: true do |msg|
  # process msg
end
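
To make the polling behaviour and the delay option more concrete, the loop can be pictured roughly as follows. This is an illustration only and is not claimed to be the library's implementation: poll_loop and its rounds parameter are hypothetical (rounds exists only so the sketch can stop), and the sketch assumes sub responds to wait_for_messages as documented on this page.

```ruby
# Illustrative sketch (not the library's implementation of listen).
# Repeatedly asks for messages, yields each one, and sleeps for `delay`
# seconds whenever a request comes back empty.
# `rounds` is a hypothetical limit; nil means loop forever, like listen.
def poll_loop(sub, max: 100, autoack: false, delay: 1, rounds: nil)
  count = 0
  while rounds.nil? || count < rounds
    count += 1
    msgs = sub.wait_for_messages(max: max, autoack: autoack)
    if msgs.empty?
      sleep delay
    else
      msgs.each { |msg| yield msg }
    end
  end
end
```

A larger delay reduces the number of requests made while the subscription is idle, at the cost of slower pickup when messages arrive.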

name()

The name of the subscription.

policy(options = {})

Gets the access control policy.

Parameters

options

An optional Hash for controlling additional behavior. (Hash)

options[:force]

Force the latest policy to be retrieved from the Pub/Sub service when true. Otherwise the policy will be memoized to reduce the number of API calls made to the Pub/Sub service. The default is false. (Boolean)

Returns

A hash that conforms to the following structure:

{
  "bindings" => [{
    "role" => "roles/viewer",
    "members" => ["serviceAccount:your-service-account"]
  }],
  "rules" => []
}

Examples

By default, the policy values are memoized to reduce the number of API calls to the Pub/Sub service.

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

subscription = pubsub.subscription "my-subscription"
puts subscription.policy["bindings"]
puts subscription.policy["rules"]

To retrieve the latest policy from the Pub/Sub service, use the force flag.

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

subscription = pubsub.subscription "my-subscription"
policy = subscription.policy force: true
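
Since the policy is returned as a plain hash, it can be inspected with ordinary Ruby. The sketch below shows one way to list the members bound to a role; members_for is a hypothetical helper, not part of the library, and the sample hash simply copies the structure shown above.

```ruby
# Hypothetical helper (not part of gcloud): returns every member bound to
# `role` in a policy hash shaped like the one returned by policy.
def members_for(policy, role)
  Array(policy["bindings"])
    .select { |b| b["role"] == role }
    .flat_map { |b| Array(b["members"]) }
end

# Sample hash copied from the structure documented above.
policy = {
  "bindings" => [{
    "role" => "roles/viewer",
    "members" => ["serviceAccount:your-service-account"]
  }],
  "rules" => []
}

members_for policy, "roles/viewer" #=> ["serviceAccount:your-service-account"]
members_for policy, "roles/editor" #=> []
```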

policy=(new_policy)

Sets the access control policy.

Parameters

new_policy

A hash that conforms to the following structure:

{
  "bindings" => [{
    "role" => "roles/viewer",
    "members" => ["serviceAccount:your-service-account"]
  }],
  "rules" => []
}

Example

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

viewer_policy = {
  "bindings" => [{
    "role" => "roles/viewer",
    "members" => ["serviceAccount:your-service-account"]
  }]
}
subscription = pubsub.subscription "my-subscription"
subscription.policy = viewer_policy
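
Assigning a policy replaces the bindings with whatever hash is given, so a change to one binding usually starts from the current policy. The following sketch builds an updated hash without mutating the original; add_member is a hypothetical helper, not part of the library, and the member string used in the usage note is a placeholder.

```ruby
# Hypothetical helper (not part of gcloud): returns a copy of `policy` with
# `member` added to the binding for `role`, creating the binding if needed.
# The original hash and its arrays are left untouched.
def add_member(policy, role, member)
  bindings = Array(policy["bindings"]).map do |b|
    { "role" => b["role"], "members" => Array(b["members"]).dup }
  end
  target = bindings.find { |b| b["role"] == role }
  if target
    target["members"] << member unless target["members"].include?(member)
  else
    bindings << { "role" => role, "members" => [member] }
  end
  policy.merge("bindings" => bindings)
end
```

With a real subscription this might be used as subscription.policy = add_member(subscription.policy(force: true), "roles/viewer", "serviceAccount:another-service-account").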

pull(options = {})

Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.

Parameters

options

An optional Hash for controlling additional behavior. (Hash)

options[:immediate]

When true, the system will respond immediately even if it is not able to return messages. When false, the system is allowed to wait until it can return at least one message. No messages are returned when a request times out. The default value is true. (Boolean)

options[:max]

The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)

options[:autoack]

Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)

Returns

Array of Gcloud::Pubsub::ReceivedMessage

Examples

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.pull.each { |msg| msg.acknowledge! }

A maximum number of messages returned can also be specified:

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.pull(max: 10).each { |msg| msg.acknowledge! }

The call can block until messages are available by setting the :immediate option to false:

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull immediate: false
msgs.each { |msg| msg.acknowledge! }
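
Because pull returns an empty list when no messages are available, a batch job can keep pulling until a request comes back empty. This is a minimal sketch, not part of the library: drain is a hypothetical helper, it assumes sub responds to pull and acknowledge as documented on this page, and an empty response is treated as "nothing available right now" rather than proof that the backlog is empty.

```ruby
# Hypothetical helper (not part of gcloud): pulls batches of up to `max`
# messages, yields each message, acknowledges each batch, and stops at the
# first empty response. Returns the number of messages handled.
def drain(sub, max: 100)
  total = 0
  loop do
    msgs = sub.pull(max: max)
    break if msgs.empty?
    msgs.each { |msg| yield msg }
    sub.acknowledge(*msgs)
    total += msgs.size
  end
  total
end
```

The sketch does not rescue the ApiError mentioned above; a caller that expects many concurrent pulls would need to handle it.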

topic()

The Topic from which this subscription receives messages.

Returns

Topic

Example

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
sub.topic.name #=> "projects/my-project/topics/my-topic"

wait_for_messages(options = {})

Pulls from the server while waiting for messages to become available. This is the same as:

subscription.pull immediate: false

Parameters

options

An optional Hash for controlling additional behavior. (Hash)

options[:max]

The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)

options[:autoack]

Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)

Returns

Array of Gcloud::Pubsub::ReceivedMessage

Example

require "gcloud"

gcloud = Gcloud.new
pubsub = gcloud.pubsub

sub = pubsub.subscription "my-topic-sub"
msgs = sub.wait_for_messages
msgs.each { |msg| msg.acknowledge! }