Skip to content

Commit

Permalink
Initial Check-in
Browse files Browse the repository at this point in the history
  • Loading branch information
gbg3 committed Jun 10, 2022
0 parents commit bd31797
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
69 changes: 69 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import json
import os
from google.cloud import tasks_v2
from google.protobuf import duration_pb2, timestamp_pb2

def kion_webhook(request):
"""Responds to any HTTP request.
Args:
request (flask.Request): HTTP request object.
Creates a Cloud Task to call a function in the background to complete the webhook.
The webhook recipient should return near immediatly, but in serverless functions this leaves no time to do real work.
As soon as a return is issued no further processing will occur, meaning everything is syncronous.
Here we create task that call out to the second function that will operate in the background to complete the webhook job in Kion.
"""

client = tasks_v2.CloudTasksClient()

project = os.environ['GCP_PROJECT']
queue = 'testing'
location = os.environ['FUNCTION_REGION']
url = format('https://{}-{}.cloudfunctions.net/{}', os.environ['FUNCTION_REGION'], os.environ['GCP_PROJECT'], os.environ['KION_FINALZER_FUNCTION'])
service_account_email = os.environ['FUNCTION_IDENTITY']
payload = request.get_json()
# Construct the fully qualified queue name.
parent = client.queue_path(project, location, queue)

# Construct the request body.
task = {
"http_request": { # Specify the type of request.
"http_method": tasks_v2.HttpMethod.POST,
"url": url, # The full url path that the task will be sent to.
"oidc_token": {
"service_account_email": service_account_email,
"audience": url,
},
}
}
if payload is not None:
if isinstance(payload, dict):
# Convert dict to JSON string
payload = json.dumps(payload)
# specify http content-type to application/json
task["http_request"]["headers"] = {"Content-type": "application/json"}

# The API expects a payload of type bytes.
converted_payload = payload.encode()

# Add the payload to the request.
task["http_request"]["body"] = converted_payload


# Use the client to build and send the task.
response = client.create_task(request={"parent": parent, "task": task})

print("Created task {}".format(response.name))

return json.dumps({"status":"complete"})



def kion_finalize_webhook(request):
request_json = request.get_json()
print(request_json.get("callback_url"))
#Despite the Kion Docs claiming that a simple empty POST to the callback url is sufficient, an API key is needed.
headers = {"Authorization": "Bearer "+os.environ['KION_API_KEY']}
response = requests.post(request_json.get("callback_url"), headers=headers).json()
print(response)
return json.dumps(body)

2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
requests
google-cloud-tasks==2.9.0

0 comments on commit bd31797

Please sign in to comment.