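# NOTE: The following lines are GitHub web-page chrome that was captured along
# with the source file. They are not part of the program and are commented out
# so that the module can be parsed as Python.
#
# Skip to content
# Permalink
# main
# Switch branches/tags
#
# Name already in use
#
# A tag already exists with the provided branch name. Many Git commands accept
# both tag and branch names, so creating this branch may cause unexpected
# behavior. Are you sure you want to create this branch?
# Go to file
#
# Cannot retrieve contributors at this time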
import json
import os
from google.cloud import tasks_v2
from google.protobuf import duration_pb2, timestamp_pb2
def kion_webhook(request):
    """Acknowledge a Kion webhook and queue a Cloud Task to finish it later.

    A webhook recipient should return almost immediately, but a serverless
    function stops processing as soon as it returns, so everything it does
    is synchronous. To leave time for the real work, this function only
    creates a Cloud Task that calls a second function, which completes the
    webhook job in Kion in the background.

    Environment variables read:
        GCP_PROJECT: project that owns the task queue and the finalizer.
        FUNCTION_REGION: region of the task queue and the finalizer.
        KION_FINALZER_FUNCTION: name of the finalizer function (the key is
            spelled this way on purpose to match the existing deployment).
        FUNCTION_IDENTITY: service account e-mail used for the OIDC token.
        KION_TASK_QUEUE: optional queue name; defaults to ``testing``.

    Args:
        request (flask.Request): HTTP request object.

    Returns:
        str: The JSON string ``{"status": "complete"}``.

    Raises:
        KeyError: If one of the required environment variables is not set.
    """
    client = tasks_v2.CloudTasksClient()
    project = os.environ['GCP_PROJECT']
    location = os.environ['FUNCTION_REGION']
    # Previously hard-coded; the default keeps existing deployments unchanged.
    queue = os.environ.get('KION_TASK_QUEUE', 'testing')
    # str.format is required here: the builtin format(value, spec) accepts at
    # most two arguments and would raise TypeError.
    url = 'https://{}-{}.cloudfunctions.net/{}'.format(
        location, project, os.environ['KION_FINALZER_FUNCTION']
    )
    service_account_email = os.environ['FUNCTION_IDENTITY']
    payload = request.get_json()

    # Construct the fully qualified queue name.
    parent = client.queue_path(project, location, queue)

    # Construct the request body.
    task = {
        "http_request": {  # Specify the type of request.
            "http_method": tasks_v2.HttpMethod.POST,
            "url": url,  # The full url path that the task will be sent to.
            "oidc_token": {
                "service_account_email": service_account_email,
                "audience": url,
            },
        }
    }

    if payload is not None:
        if not isinstance(payload, str):
            # get_json() may return any JSON value (dict, list, number, ...);
            # serialize everything that is not already a string so that the
            # .encode() call below cannot fail.
            payload = json.dumps(payload)
            # Tell the finalizer that the body is JSON.
            task["http_request"]["headers"] = {"Content-type": "application/json"}
        # The API expects a payload of type bytes.
        task["http_request"]["body"] = payload.encode()

    # Use the client to build and send the task.
    response = client.create_task(request={"parent": parent, "task": task})
    print("Created task {}".format(response.name))
    return json.dumps({"status": "complete"})
def kion_finalize_webhook(request):
    """Complete a Kion webhook by POSTing to the callback URL in the payload.

    Args:
        request (flask.Request): HTTP request object whose JSON body is
            expected to be an object containing a ``callback_url`` key.

    Returns:
        str: The JSON-decoded body of the callback response, serialized back
        to a JSON string.

    Raises:
        ValueError: If the body is not a JSON object, or ``callback_url`` is
            missing or is not an http(s) URL.
        KeyError: If the ``KION_API_KEY`` environment variable is not set.
        json.JSONDecodeError: If the callback response body is not JSON.
        urllib.error.URLError: If the callback host cannot be reached.
    """
    # Standard-library HTTP client, imported locally so the block does not
    # rely on a module the file's import section never brings into scope.
    import urllib.error
    import urllib.request

    request_json = request.get_json()
    if not isinstance(request_json, dict):
        raise ValueError("expected a JSON object in the request body")

    callback_url = request_json.get("callback_url")
    if not isinstance(callback_url, str) or not callback_url.startswith(
        ("http://", "https://")
    ):
        raise ValueError("request body has no http(s) 'callback_url'")
    print(callback_url)

    # Despite the Kion docs claiming that a simple empty POST to the callback
    # url is sufficient, an API key is needed.
    # NOTE(review): callback_url comes from the request body and the API key
    # is sent to it -- confirm this function is only reachable by trusted
    # callers, or restrict the URL to the known Kion host.
    headers = {"Authorization": "Bearer " + os.environ['KION_API_KEY']}
    callback = urllib.request.Request(callback_url, headers=headers, method="POST")
    try:
        # A timeout keeps an unresponsive host from hanging the function.
        with urllib.request.urlopen(callback, timeout=30) as reply:
            raw_body = reply.read()
    except urllib.error.HTTPError as err:
        # Still decode and report the body of an HTTP error response so the
        # reason for the failure ends up in the logs and in the result.
        print("Callback returned HTTP {}".format(err.code))
        raw_body = err.read()

    response = json.loads(raw_body)
    print(response)
    return json.dumps(response)