GCP: publishing to and reading from a Google Pub/Sub Topic using Python client libraries

Google Pub/Sub is a managed messaging platform providing a scalable, asynchronous, loosely-coupled solution for communication between application entities.

It centers on the concept of a Topic, which behaves like a named queue.  A Publisher puts messages on the Topic, and a Subscriber reads them from a Subscription attached to that Topic.

In this article, I will first use the Python client libraries to interface with the managed Google Pub/Sub service from the CLI.  Then I will deploy this same code to a GKE Kubernetes cluster and perform the same actions by granting the proper permissions to the Kubernetes Service Account the pod runs as.

Enable managed Pub/Sub service

Make sure that Pub/Sub is enabled at the project level with the command below.

# enable pub/sub managed service
gcloud services enable pubsub.googleapis.com

Then create the Topic and Subscription.

export PROJECT_ID=$(gcloud config get project)
export TOPIC_ID=my-topic
export SUBSCRIBE_ID=my-sub
gcloud pubsub topics create $TOPIC_ID --project $PROJECT_ID
gcloud pubsub subscriptions create $SUBSCRIBE_ID --topic $TOPIC_ID --project $PROJECT_ID

# show objects created
gcloud pubsub topics list
gcloud pubsub subscriptions list
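
The same Topic and Subscription can also be created from Python instead of gcloud. The following is a minimal sketch, assuming the google-cloud-pubsub 2.x client library; the function names resource_paths and create_topic_and_subscription are hypothetical helpers for illustration and are not part of the sample project.

```python
from typing import Tuple


def resource_paths(project_id: str, topic_id: str, subscription_id: str) -> Tuple[str, str]:
    """Return the fully qualified topic and subscription names used by Pub/Sub."""
    topic_path = f"projects/{project_id}/topics/{topic_id}"
    subscription_path = f"projects/{project_id}/subscriptions/{subscription_id}"
    return topic_path, subscription_path


def create_topic_and_subscription(project_id: str, topic_id: str, subscription_id: str) -> None:
    """Create the topic and subscription, tolerating ones that already exist."""
    # Imported lazily so resource_paths() works without the client library installed.
    from google.api_core.exceptions import AlreadyExists
    from google.cloud import pubsub_v1

    topic_path, subscription_path = resource_paths(project_id, topic_id, subscription_id)
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    try:
        publisher.create_topic(request={"name": topic_path})
    except AlreadyExists:
        pass  # topic was created earlier, for example by the gcloud command

    with subscriber:  # closes the underlying channel on exit
        try:
            subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )
        except AlreadyExists:
            pass
```

Calling create_topic_and_subscription(os.environ["PROJECT_ID"], os.environ["TOPIC_ID"], os.environ["SUBSCRIBE_ID"]) would mirror the two gcloud create commands above.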

Download sample project

I have provided sample code for this article.  Go ahead and grab it.

git clone https://gitlab.com/fabianlee/docker-python-pyenv-gcp-pub-sub.git
cd docker-python-pyenv-gcp-pub-sub

Install pip module client library

As a best practice, you should use something like pyenv or venv to isolate these pip client libraries from your system Python and from other projects.  Here are the venv commands:

# create venv (the venv module is part of the Python 3 standard library)
python3 -m venv .
source bin/activate

# install pip modules
pip install google-cloud-pubsub google-api-core

Run sanity test from CLI

Let’s test the applications by sending messages to the Topic and reading messages from the Subscription.

# send 10 messages to $TOPIC_ID
src/publish_test.py

# grab all messages from $SUBSCRIBE_ID
src/subscribe_test.py

This proves out the functionality, and both the Topic and the Subscription are visible in the Google console web UI (Pub/Sub > Subscriptions).

Code details

The code used in this article is taken straight from the Google Quickstart for the Python Pub/Sub client libraries, with slight changes to take the $PROJECT_ID, $TOPIC_ID, and $SUBSCRIBE_ID from the environment variables.
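
For readers who have not seen the Quickstart, here is a minimal sketch of what a publish and a subscribe script along those lines might look like. It follows the pattern in the Google Quickstart, but the function names (build_payloads, publish_messages, receive_messages), the message text, and the 5 second listen window are assumptions for illustration, not the exact contents of publish_test.py or subscribe_test.py.

```python
from concurrent.futures import TimeoutError
from typing import List


def build_payloads(count: int) -> List[bytes]:
    """Pub/Sub message data must be bytes, so encode each text message."""
    return [f"Message number {n}".encode("utf-8") for n in range(1, count + 1)]


def publish_messages(project_id: str, topic_id: str, count: int = 10) -> None:
    """Publish `count` messages and wait for each server-assigned message id."""
    from google.cloud import pubsub_v1  # lazy import, needs google-cloud-pubsub

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    for data in build_payloads(count):
        future = publisher.publish(topic_path, data)
        print(f"published message id {future.result()}")


def receive_messages(project_id: str, subscription_id: str, timeout: float = 5.0) -> None:
    """Listen on the subscription for `timeout` seconds, acknowledging each message."""
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message) -> None:
        print(f"received {message.data!r}")
        message.ack()  # unacknowledged messages are redelivered

    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    with subscriber:
        try:
            streaming_pull_future.result(timeout=timeout)
        except TimeoutError:
            streaming_pull_future.cancel()  # stop pulling
            streaming_pull_future.result()  # block until shutdown completes
```

Both functions take the project, topic, and subscription identifiers as plain arguments, so they can be fed from the $PROJECT_ID, $TOPIC_ID, and $SUBSCRIBE_ID environment variables.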

I also enhanced the publish code by using a retry_strategy as recommended in the risingwave.com blog.
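
As a rough illustration of what a publish retry strategy can look like, the sketch below passes a google.api_core.retry.Retry object to publish(). The backoff values, the chosen exception types, and the names RETRY_SETTINGS, nominal_backoff, and publish_with_retry are assumptions for this example; the sample project's actual settings may differ.

```python
from typing import List

# Illustrative values only (seconds); tune for your own workload.
RETRY_SETTINGS = {"initial": 0.25, "maximum": 60.0, "multiplier": 2.0, "deadline": 300.0}


def nominal_backoff(initial: float, maximum: float, multiplier: float, attempts: int) -> List[float]:
    """Nominal wait before each retry; the client library adds random jitter on top."""
    delays, delay = [], initial
    for _ in range(attempts):
        delays.append(min(delay, maximum))
        delay *= multiplier
    return delays


def publish_with_retry(project_id: str, topic_id: str, data: bytes) -> str:
    """Publish one message, retrying transient server errors with exponential backoff."""
    from google.api_core import exceptions, retry  # lazy import, needs google-api-core
    from google.cloud import pubsub_v1

    custom_retry = retry.Retry(
        predicate=retry.if_exception_type(
            exceptions.ServiceUnavailable,
            exceptions.DeadlineExceeded,
            exceptions.InternalServerError,
        ),
        **RETRY_SETTINGS,
    )
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, data, retry=custom_retry)
    return future.result()  # server-assigned message id
```

The nominal_backoff helper is only there to make the settings tangible: with these values the waits grow 0.25, 0.5, 1.0, 2.0 seconds and so on, capped at the maximum, until the overall deadline is reached.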

I am using environment variables as parameters because in the following sections we are going to deploy this as a container in a GKE Kubernetes cluster, and we can define these same environment variables in the yaml manifest.
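
Reading the parameters from the environment can be sketched as below. The load_settings helper and its error message are hypothetical; only the three variable names come from this article.

```python
import os
from typing import Dict, Mapping

REQUIRED_VARS = ("PROJECT_ID", "TOPIC_ID", "SUBSCRIBE_ID")


def load_settings(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the required settings, failing fast if any variable is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}
```

Failing fast with the list of missing names makes a misconfigured shell or Kubernetes manifest easier to diagnose than a KeyError deep inside the client code.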

Prepare for GKE Kubernetes deployment

A deployment within GKE would not typically have permission to publish and subscribe, because it is running as the default service account.  What we need to do is create a Google Service Account (GSA) that has the correct roles, and then link that to a Kubernetes Service Account (KSA) that the pod can adopt.

If your GKE cluster and nodepool are enabled with Workload Identity, the commands are as follows.

GSA_NAME=pubsub-sa
# create GSA
gcloud iam service-accounts create $GSA_NAME --project=$PROJECT_ID

# add roles to GSA
ROLE_NAMES="roles/pubsub.publisher roles/pubsub.subscriber roles/pubsub.viewer"
for ROLE_NAME in $ROLE_NAMES; do
  gcloud projects add-iam-policy-binding $PROJECT_ID --member "serviceAccount:${GSA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" --role "$ROLE_NAME"
done

NAMESPACE=default
KSA_NAME=pubsub-sa
# bind GSA to Kubernetes Service account using workload identity
gcloud iam service-accounts add-iam-policy-binding ${GSA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
    --role roles/iam.workloadIdentityUser \
    --member "serviceAccount:${PROJECT_ID}.svc.id.goog[$NAMESPACE/$KSA_NAME]"

# create Kubernetes Service Account (KSA)
kubectl create namespace $NAMESPACE --dry-run=client -o yaml | kubectl apply -f -
kubectl create serviceaccount $KSA_NAME -n $NAMESPACE

# use annotation to bind KSA to GSA
kubectl annotate serviceaccount $KSA_NAME \
  --namespace $NAMESPACE \
  iam.gke.io/gcp-service-account=${GSA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com

With this in place, you are now ready to have a pod that runs as this Kubernetes Service Account (KSA).
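
To confirm the binding from inside a running pod, one option is to ask the metadata server which service account the workload is acting as. The sketch below uses only the standard library; the helper names are hypothetical, and it assumes the GKE metadata server reports the bound GSA email for the default service account when Workload Identity is configured correctly.

```python
import urllib.request

METADATA_EMAIL_URL = (
    "http://metadata.google.internal/computeMetadata/v1/"
    "instance/service-accounts/default/email"
)


def gsa_email(gsa_name: str, project_id: str) -> str:
    """Email address of the Google Service Account, as used in the commands above."""
    return f"{gsa_name}@{project_id}.iam.gserviceaccount.com"


def workload_identity_member(project_id: str, namespace: str, ksa_name: str) -> str:
    """IAM member string that identifies the Kubernetes Service Account."""
    return f"serviceAccount:{project_id}.svc.id.goog[{namespace}/{ksa_name}]"


def active_service_account(timeout: float = 5.0) -> str:
    """Ask the metadata server which identity this pod is running as (in-cluster only)."""
    request = urllib.request.Request(
        METADATA_EMAIL_URL, headers={"Metadata-Flavor": "Google"}
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8").strip()
```

Inside the pod, active_service_account() should equal gsa_email("pubsub-sa", PROJECT_ID); a different value suggests the annotation or the IAM binding is missing.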

Deploy example into GKE cluster

This project has a GitLab pipeline that builds/publishes its image to the public GitLab Container Registry.  You can deploy this image using the provided yaml manifest.

# apply to GKE cluster, runs as service account 'pubsub-sa'
cat gcp-pub-sub.yaml | envsubst | kubectl apply -f -
# wait for readiness
kubectl rollout status deployment gcp-pub-sub -n default --timeout=90s

Once ready, you can run the same Pub/Sub publish and subscribe scripts (but now from inside the GKE cluster).

# run publish command from inside cluster
kubectl exec -it -n default deployment/gcp-pub-sub -- /bin/bash -c "./publish_test.py"

# run subscribe command from inside cluster
kubectl exec -it -n default deployment/gcp-pub-sub -- /bin/bash -c "./subscribe_test.py"

If the ‘pubsub-sa’ KSA the pod runs as is not bound correctly to the GSA, or the GSA lacks the required roles, you will see an error similar to “google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.”
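
One way to make this failure easier to act on is to catch PermissionDenied around the publish call and print the commands worth checking. This is a sketch only; permission_checklist and publish_or_explain are hypothetical helpers, not part of the sample project.

```python
from typing import List


def permission_checklist(namespace: str, ksa_name: str, gsa_email: str) -> List[str]:
    """Commands that show the KSA annotation and the IAM policy on the GSA."""
    return [
        f"kubectl get serviceaccount {ksa_name} -n {namespace} -o yaml",
        f"gcloud iam service-accounts get-iam-policy {gsa_email}",
    ]


def publish_or_explain(project_id: str, topic_id: str, data: bytes,
                       namespace: str, ksa_name: str, gsa_email: str) -> str:
    """Publish one message; on a 403, print what to verify and re-raise."""
    from google.api_core.exceptions import PermissionDenied  # lazy import
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    try:
        return publisher.publish(topic_path, data).result(timeout=30)
    except PermissionDenied as exc:
        print(f"publish denied: {exc}")
        for command in permission_checklist(namespace, ksa_name, gsa_email):
            print(f"  check: {command}")
        raise
```

The first command should show the iam.gke.io/gcp-service-account annotation on the KSA, and the second should list the roles/iam.workloadIdentityUser binding on the GSA.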

Destroy Topic and Subscription

To avoid charges for unused infrastructure, delete the Topic and Subscription created in this article.

gcloud pubsub subscriptions delete $SUBSCRIBE_ID --project $PROJECT_ID
gcloud pubsub topics delete $TOPIC_ID --project $PROJECT_ID

 

REFERENCES

Google doc, pubsub Python client libraries

mouaazfarrukh99, getting started with pubsub using Python

risingwave.com, google pubsub integration with Python

pip google-api-core module

 

NOTES