diff --git a/.gitignore b/.gitignore index 05c3650ca9cbe7312c943af8457d7eeae6c33c28..f619a0ffc8a5cea9724e4c302176b57abbb97da5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ build/ **/.terraform.lock.hcl **/.terraform/ +**/__pycache__/ diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index ff3186ac37cf8fb6377d205bf425f486f7304ac2..02bae8a111f81e4f2c5c6c5bbb121019f89ce045 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -16,13 +16,10 @@ install_test: stage: install_test image: "europe-north1-docker.pkg.dev/holi-shared/docker-hub-remote/python:3.12-slim" script: - - pip install -r src/requirements.txt - - pip install -r src/dev-requirements.txt + - pip install -r requirements.txt + - pip install -r dev-requirements.txt - pytest interruptible: true - artifacts: - paths: - - build/search_integration.zip .deploy: stage: "deploy" @@ -41,7 +38,7 @@ install_test: ENVIRONMENT_ID: $CI_ENVIRONMENT_SLUG script: - apk add zip - - pushd src/ && zip -r ../build/search_integration.zip ./*; popd + - zip -r search_integration.zip requirements.txt *.py - terraform/environments/scripts/create-or-update-env.sh "$ENVIRONMENT_ID" "$CI_COMMIT_SHA" interruptible: false diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..861adde5736f2cb21011fdc3927bb1b8a194ee86 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,15 @@ +repos: + - repo: local + hooks: + - id: ruff-lint + name: ruff-lint + language: system + entry: ruff check + pass_filenames: false + always_run: true + - id: ruff-format + name: ruff-format + language: system + entry: ruff format --check + pass_filenames: false + always_run: true \ No newline at end of file diff --git a/TODOs.md b/TODOs.md deleted file mode 100644 index 2d2d48783101cecae1690e60afedacacd61af3da..0000000000000000000000000000000000000000 --- a/TODOs.md +++ /dev/null @@ -1 +0,0 @@ -- Add .envrc stuff diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..a4a71c5808094cbc10413dc186e00f0a4ac50ace --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,3 @@ +pytest==8.3.4 +ruff==0.8.3 +pre-commit==4.0.1 diff --git a/events.py b/events.py new file mode 100644 index 0000000000000000000000000000000000000000..85258ee021f0101875c83b0eb16300d22d17a0eb --- /dev/null +++ b/events.py @@ -0,0 +1,164 @@ +import base64 +import json +from dataclasses import dataclass + +# noinspection PyPackageRequirements +from cloudevents.http.event import CloudEvent + + +def get_event_type(cloud_event: CloudEvent) -> str: + return cloud_event.data["message"]["attributes"]["eventType"] + + +def get_event_version(cloud_event: CloudEvent) -> str: + return cloud_event.data["message"]["attributes"]["eventVersion"] + + +def _validate_event_type(cloud_event: CloudEvent, expected_event_type: str) -> None: + event_type = get_event_type(cloud_event) + if event_type != expected_event_type: + raise ValueError(f"Expected event of type {expected_event_type}, got {event_type}") + + +@dataclass +class UserPayload: + id: str + name: str + email: str + identity: str + avatar: str + about_me: str + location: str + + def __init__(self, data: dict): + self.id = data["user"].get("id") + self.name = data["user"].get("name") + self.email = data["user"].get("email") + self.identity = data["user"].get("identity") + self.avatar = data["user"].get("avatar") + self.about_me = data["user"].get("aboutMe") + self.location = data["user"].get("location") + + +@dataclass +class UserEvent: + event_type: str + event_version: str + user: UserPayload + + def __init__(self, cloud_event: CloudEvent): + self.event_type = get_event_type(cloud_event) + self.event_version = get_event_version(cloud_event) + self.user = UserPayload(json.loads(base64.b64decode(cloud_event.data["message"]["data"]))) + + def __str__(self): + return f'{type(self).__name__}(id="{self.user.id}",name="{self.user.name}",email="***",identity="{self.user.identity}",avatar="{self.user.avatar}")' + + +@dataclass +class UserUpdatedEvent(UserEvent): + def __init__(self, cloud_event: CloudEvent): + _validate_event_type(cloud_event, "UserUpdated") + super().__init__(cloud_event) + + def as_typesense_document(self) -> object: + return { + "id": f"profile_{self.user.id}", + "type": "profile", + "user_full_name": self.user.name, + "location": self.user.location, + "location_lat_lng": None, # TODO maybe enrich? + "image_url": self.user.avatar, + "link_locators": {"profile": self.user.id}, + } + + +@dataclass +class UserDeletedEvent(UserEvent): + def __init__(self, cloud_event: CloudEvent): + _validate_event_type(cloud_event, "UserDeleted") + super().__init__(cloud_event) + + +@dataclass +class LatLng: + latitude: float + longitude: float + + +@dataclass +class SpacePayload: + id: str + name: str + slug: str + avatar: str + description: str + location: str + location_lat_lng: LatLng + + def __init__(self, data: dict): + self.id = data["space"].get("id") + self.name = data["space"].get("name") + self.slug = data["space"].get("slug") + self.avatar = data["space"].get("avatar") + self.description = data["space"].get("description") + self.location = data["space"].get("location") + contains_lat_lng = ( + data["space"].get("locationLatLng") is not None + and data["space"].get("locationLatLng")["latitude"] is not None + and data["space"].get("locationLatLng")["longitude"] is not None + ) + self.location_lat_lng = ( + LatLng(data["space"].get("locationLatLng").get("latitude"), data["space"].get("locationLatLng").get("longitude")) + if contains_lat_lng + else None + ) + + +@dataclass +class SpaceEvent: + event_type: str + event_version: str + space: SpacePayload + + def __init__(self, cloud_event: CloudEvent): + self.event_type = get_event_type(cloud_event) + self.event_version = get_event_version(cloud_event) + self.space = SpacePayload(json.loads(base64.b64decode(cloud_event.data["message"]["data"]))) + + def as_typesense_document(self) -> object: + return { + "id": f"space_{self.space.id}", + "type": "space", + "title_de": self.space.name, + "title_en": self.space.name, + "description_de": self.space.description, + "description_en": self.space.description, + "location": self.space.location, + "location_lat_lng": [self.space.location_lat_lng.latitude, self.space.location_lat_lng.longitude] + if self.space.location_lat_lng is not None + else None, + "image_url": self.space.avatar, + "link_locators": {"space": self.space.slug}, + } + + +@dataclass +class SpaceCreatedEvent(SpaceEvent): + def __init__(self, cloud_event: CloudEvent): + _validate_event_type(cloud_event, "SpaceCreated") + super().__init__(cloud_event) + + +@dataclass +class SpaceUpdatedEvent(SpaceEvent): + def __init__(self, cloud_event: CloudEvent): + _validate_event_type(cloud_event, "SpaceUpdated") + super().__init__(cloud_event) + + +@dataclass +class SpaceDeletedEvent(SpaceEvent): + def __init__(self, cloud_event: CloudEvent): + _validate_event_type(cloud_event, "SpaceDeleted") + super().__init__(cloud_event) diff --git a/main.py b/main.py new file mode 100644 index 0000000000000000000000000000000000000000..fde4eb562c41bc8a6ac7c8978d2f92f0d8a922ca --- /dev/null +++ b/main.py @@ -0,0 +1,89 @@ +import re +from logging import getLogger +from typing import Union + +import functions_framework +from cloudevents.http.event import CloudEvent +from typesense.exceptions import ObjectNotFound + +from events import ( + get_event_type, + get_event_version, + UserUpdatedEvent, + UserDeletedEvent, + SpaceCreatedEvent, + SpaceUpdatedEvent, + SpaceDeletedEvent, +) +from typesense_client import TypesenseClient + +logger = getLogger(__name__) + +SEMVER_1XX_SERIES = r"^1\.\d+\.\d+$" + + +def process_user_updated_event(client: TypesenseClient, event: UserUpdatedEvent) -> None: + logger.debug(f"Processing {event}") + client.upsert(event.as_typesense_document()) + logger.info(f"Upserted user {event.user.id} in Typesense") + + +def process_user_deleted_event(client: TypesenseClient, event: UserDeletedEvent) -> None: + logger.debug(f"Processing {event}") + try: + client.delete(f"profile_{event.user.id}") + except ObjectNotFound: + pass + logger.info(f"Deleted user {event.user.id} from Typesense") + + +def process_space_upserting_event(client: TypesenseClient, event: Union[SpaceCreatedEvent, SpaceUpdatedEvent]) -> None: + logger.debug(f"Processing {event}") + client.upsert(event.as_typesense_document()) + logger.info(f"Upserted space {event.space.id} in Typesense") + + +def process_space_deleted_event(client: TypesenseClient, event: SpaceDeletedEvent) -> None: + logger.debug(f"Processing {event}") + try: + client.delete(f"space_{event.space.id}") + except ObjectNotFound: + pass + logger.info(f"Deleted space {event.space.id} in Typesense") + + +def process_event(client: TypesenseClient, event: CloudEvent) -> None: + type_version = (get_event_type(event), re.match(SEMVER_1XX_SERIES, get_event_version(event)) is not None) + match type_version: + case ("UserUpdated", True): + return process_user_updated_event(client, UserUpdatedEvent(event)) + case ("UserDeleted", True): + return process_user_deleted_event(client, UserDeletedEvent(event)) + case("UserNameUpdated", True): + # no need to process, it is sufficient to handle UserUpdated + logger.debug("Ignoring UserNameUpdated event") + return None + case ("UserEmailUpdated", True): + # no need to process, it is sufficient to handle UserUpdated + # also, no PII must be in Typesense as there's no authentication + logger.debug("Ignoring UserEmailUpdated event") + return None + case ("SpaceCreated", True): + return process_space_upserting_event(client, SpaceCreatedEvent(event)) + case ("SpaceUpdated", True): + return process_space_upserting_event(client, SpaceUpdatedEvent(event)) + case ("SpaceDeleted", True): + return process_space_deleted_event(client, SpaceDeletedEvent(event)) + case (eventType, eventVersion): + # other events than handled above are assumed to (currently) not be of interest in search + logger.debug(f"Ignoring {eventType} ({eventVersion}) event.") + return + + +@functions_framework.cloud_event +def process_message(event: CloudEvent) -> None: + client = TypesenseClient() + process_event(client, event) + + +# TODO ensure that UserUpdated or similar is emitted on updates of fields we're interested in diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..39705f784cb27e58d676cb8cd24a9ab3a1243cb4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,45 @@ +[tool.ruff] +line-length = 119 +extend-exclude = [ + ".docker", + ".docker-cache", + "media", + "migrations", + "reports", + "static", + "terraform", +] + +[tool.ruff.lint] +select = [ + "F", # pyflakes https://docs.astral.sh/ruff/rules/#pyflakes-f + "E", # pycodestyle errors https://docs.astral.sh/ruff/rules/#error-e + "B", # bugbear https://docs.astral.sh/ruff/rules/#flake8-bugbear-b + "S", # bandit https://docs.astral.sh/ruff/rules/#flake8-bandit-s + "N", # pep8-naming https://docs.astral.sh/ruff/rules/#pep8-naming-n + "T20", # forbid print statements https://docs.astral.sh/ruff/rules/#flake8-print-t20 + "ERA", # forbid commented out code https://docs.astral.sh/ruff/rules/#eradicate-era + # "ANN", # enforce type annotations https://docs.astral.sh/ruff/rules/#flake8-annotations-ann +] +extend-ignore = [ + "E501", # "line too long" - only triggers in edge cases of the formatter + "N806", # "non-lowercase-variable-in-function" - we use that to reference model classes in functions +] +[tool.ruff.lint.extend-per-file-ignores] +# S101 use of assert - allowed in tests +"**/test_*.py" = ["S101"] +"**/tests/*.py" = ["S101"] + +[tool.ruff.format] +exclude = [ + ".docker", + ".docker-cache", + ".git", + ".venv", + "media", + "migrations", + "reports", + "static", + "terraform", + "venv", +] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..a153ebf1c999745f7ab5512185456129db14c472 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +functions_framework==3.8.2 +typesense==0.21.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/src/dev-requirements.txt b/src/dev-requirements.txt deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/src/main.py b/src/main.py deleted file mode 100644 index 8a6abe7519358670777092cbab2c4753826fa2b9..0000000000000000000000000000000000000000 --- a/src/main.py +++ /dev/null @@ -1,7 +0,0 @@ -def process_message(event, context): - """ - Entry point for the Pub/Sub-triggered Cloud Function. - :param event: Pub/Sub event payload - :param context: Metadata about the event - """ - print(f"Received event: {event['data']} with context: {context}") diff --git a/src/requirements.txt b/src/requirements.txt deleted file mode 100644 index d197ada2ff50cd90ce252664a35c1eb28165a1da..0000000000000000000000000000000000000000 --- a/src/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -pytest==8.3.4 diff --git a/terraform/environments/function.tf b/terraform/environments/function.tf index 4881eaf2a3718d648f94173156aa405c1dcb0999..dd31d95161ad2180e236d367cf78eb2401ccf57d 100644 --- a/terraform/environments/function.tf +++ b/terraform/environments/function.tf @@ -8,7 +8,7 @@ resource "random_id" "main" { resource "google_storage_bucket_object" "function_source" { name = "${random_id.main.hex}/search_integration.zip" bucket = data.terraform_remote_state.holi_infra_state.outputs.gcf_sources_upload_bucket_name - source = "../../build/search_integration.zip" + source = "../../search_integration.zip" } resource "google_cloudfunctions2_function" "holi-search-integration" { @@ -39,10 +39,11 @@ resource "google_cloudfunctions2_function" "holi-search-integration" { max_instance_request_concurrency = 5 environment_variables = { - ENVIRONMENT = local.environment_name - TYPESENSE_HOST = "t9vemwkibxajcorqp-1.a1.typesense.net" - TYPESENSE_PORT = "443" - TYPESENSE_PROTOCOL = "https" + ENVIRONMENT = local.environment_name + TYPESENSE_HOST = "t9vemwkibxajcorqp-1.a1.typesense.net" + TYPESENSE_PORT = "443" + TYPESENSE_PROTOCOL = "https" + TYPESENSE_COLLECTION_NAME = local.environment_name == "production" ? "holi_search_production" : "holi_search_staging" } vpc_connector = data.terraform_remote_state.holi_infra_state.outputs.vpc_access_connector_name vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" diff --git a/terraform/environments/init.tf b/terraform/environments/init.tf index e3681012a9ef76f7c992ac63a21c049b8f9f581c..e2c1545b3f7918ba3a67709132cadeeb3707675b 100644 --- a/terraform/environments/init.tf +++ b/terraform/environments/init.tf @@ -21,6 +21,14 @@ data "terraform_remote_state" "holi_infra_state" { } } +data "terraform_remote_state" "holi_infra_monitoring_state" { + backend = "gcs" + config = { + bucket = "holi-shared-terraform-state" + prefix = "infra-monitoring" + } +} + data "terraform_remote_state" "okuna_common_state" { backend = "gcs" config = { diff --git a/terraform/environments/pubsub.tf b/terraform/environments/pubsub.tf index 85b19743ae47365f3b19d1741e183a9820e45499..e4fc2ded4f9a9cb5cbcf650cc3d24fa31fb3591b 100644 --- a/terraform/environments/pubsub.tf +++ b/terraform/environments/pubsub.tf @@ -18,3 +18,27 @@ resource "google_pubsub_subscription" "holi-search-integration-okuna-sub" { } } } + +resource "google_monitoring_alert_policy" "pubsub_unacked_messages" { + project = data.terraform_remote_state.holi_infra_state.outputs.shared_project_id + display_name = "MATCHING: PubSub Unacked Messages Alert (${local.environment_name}) / subscription_id='${google_pubsub_subscription.holi-search-integration-okuna-sub.name}'" + combiner = "OR" + conditions { + display_name = "Unacked messages condition" + condition_prometheus_query_language { + query = "rate(pubsub_googleapis_com:subscription_oldest_unacked_message_age{subscription_id='${google_pubsub_subscription.holi-search-integration-okuna-sub.name}'}[5m]) >= 1" + duration = "300s" + } + } + + alert_strategy { + auto_close = "300s" + } + + notification_channels = [local.environment_name == "production" ? data.terraform_remote_state.holi_infra_monitoring_state.outputs.monitoring_notification_channel_rocket_chat_matching_data_production_id : data.terraform_remote_state.holi_infra_monitoring_state.outputs.monitoring_notification_channel_rocket_chat_matching_data_staging_id] + + documentation { + content = "Alert triggered when Pub/Sub subscription(s) (subscription_id='${google_pubsub_subscription.holi-search-integration-okuna-sub.name}') have unacknowledged messages for more than 5 minutes. Please check the cloud function deployment and logs to see if the subscriber works correctly." + mime_type = "text/markdown" + } +} diff --git a/test_main.py b/test_main.py new file mode 100644 index 0000000000000000000000000000000000000000..5df84bc5e8c7875f0e9bf87cf2957d5e623f7cd0 --- /dev/null +++ b/test_main.py @@ -0,0 +1,146 @@ +import base64 +import json +from unittest.mock import patch + +# noinspection PyPackageRequirements +from cloudevents.http.event import CloudEvent + +from main import process_event + +attributes = { + "id": "12833783708309476", + "time": "2024-12-07T16:21:48.022Z", + "specversion": "1.0", + "datacontenttype": "application/json", + "type": "google.cloud.pubsub.topic.v1.messagePublished", + "source": "//pubsub.googleapis.com/", +} + +user_payload = { + "id": "149a3d29-a0da-4f52-8e1c-660376b7084b", + "name": "Daniel holi team", + "email": "daniel@holi.team", + "identity": "danielholi.team", + "avatar": "https://ik.imagekit.io/holi/_DEV_/avatar/149a3d29-a0da-4f52-8e1c-660376b7084b_yv31UW7lD0.jpg", + "aboutMe": "Daniel about himself", + "location": "Klitmøller, Denmark", +} + +space_payload = { + "id": "149a3d29-a0da-4f52-8e1c-660376b7084b", + "name": "Daniels Space", + "slug": "daniels-space", + "avatar": "https://ik.imagekit.io/holi/_DEV_/avatar/149a3d29-a0da-4f52-8e1c-660376b7084b_yv31UW7lD0.jpg", + "description": "This is Daniels Space", + "location": "Klitmøller, Denmark", + "locationLatLng": { + "latitude": 57.0393497, + "longitude": 8.474686, + }, +} + + +def message_data(event_type, event_version, data): + return { + "message": { + "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "data": base64.b64encode(json.dumps(data).encode("utf-8")), + "attributes": { + "eventType": event_type, + "eventVersion": event_version, + }, + "messageId": "12833783708309476", + "publishTime": "2024-12-07T16:21:48.022Z", + } + } + + +@patch("typesense.client.Client") +def test_user_updated(mock_client): + process_event( + mock_client, CloudEvent(attributes, message_data("UserUpdated", "1.1.0", {"user": user_payload})) + ) + mock_client.upsert.assert_called_with( + { + "id": f'profile_{user_payload["id"]}', + "type": "profile", + "user_full_name": user_payload["name"], + "location": user_payload["location"], + "location_lat_lng": None, + "image_url": user_payload["avatar"], + "link_locators": { + "profile": user_payload["id"], + }, + } + ) + +@patch("typesense.client.Client") +def test_user_name_updated_ignored(mock_client): + process_event( + mock_client, CloudEvent(attributes, message_data("UserNameUpdated", "1.1.0", {"user": user_payload})) + ) + mock_client.upsert.assert_not_called() + + +@patch("typesense.client.Client") +def test_user_email_updated_ignored(mock_client): + process_event( + mock_client, CloudEvent(attributes, message_data("UserEmailUpdated", "1.1.0", {"user": user_payload})) + ) + mock_client.upsert.assert_not_called() + + +@patch("typesense.client.Client") +def test_user_deleted(mock_client): + process_event(mock_client, CloudEvent(attributes, message_data("UserDeleted", "1.1.0", {"user": user_payload}))) + mock_client.delete.assert_called_with(f"profile_{user_payload['id']}") + + +# noinspection DuplicatedCode +@patch("typesense.client.Client") +def test_space_created(mock_client): + process_event(mock_client, CloudEvent(attributes, message_data("SpaceCreated", "1.1.0", {"space": space_payload}))) + mock_client.upsert.assert_called_with( + { + "id": f'space_{space_payload['id']}', + "type": "space", + "title_de": space_payload["name"], + "title_en": space_payload["name"], + "description_de": space_payload["description"], + "description_en": space_payload["description"], + "location": space_payload["location"], + "location_lat_lng": [ + space_payload["locationLatLng"]["latitude"], + space_payload["locationLatLng"]["longitude"], + ], + "image_url": space_payload["avatar"], + "link_locators": { + "space": space_payload["slug"], + }, + } + ) + + +# noinspection DuplicatedCode +@patch("typesense.client.Client") +def test_space_updated(mock_client): + process_event(mock_client, CloudEvent(attributes, message_data("SpaceUpdated", "1.1.0", {"space": space_payload}))) + mock_client.upsert.assert_called_with( + { + "id": f'space_{space_payload['id']}', + "type": "space", + "title_de": space_payload["name"], + "title_en": space_payload["name"], + "description_de": space_payload["description"], + "description_en": space_payload["description"], + "location": space_payload["location"], + "location_lat_lng": [ + space_payload["locationLatLng"]["latitude"], + space_payload["locationLatLng"]["longitude"], + ], + "image_url": space_payload["avatar"], + "link_locators": { + "space": space_payload["slug"], + }, + } + ) diff --git a/typesense_client.py b/typesense_client.py new file mode 100644 index 0000000000000000000000000000000000000000..aec51d063ac031bde489f9722198142186b72044 --- /dev/null +++ b/typesense_client.py @@ -0,0 +1,53 @@ +import os +from logging import getLogger + +from typesense.client import Client + +logger = getLogger(__name__) + + +class TypesenseClient(object): + """ + Thin wrapper around the Typesense client to simplify call site and testing/mocking code. + """ + + client: Client + + def __init__(self): + self.client = Client( + { + "api_key": (os.getenv("TYPESENSE_ADMIN_API_KEY")), + "nodes": [ + { + "host": (os.getenv("TYPESENSE_HOST")), + "port": (os.getenv("TYPESENSE_PORT")), + "protocol": (os.getenv("TYPESENSE_PROTOCOL")), + } + ], + "connection_timeout_seconds": 60 * 60, + } + ) + + def upsert(self, document) -> object: + """ + Upserts the document in Typesense + + :param document: the document to upsert + :return: deserialized JSON document + """ + try: + return self.client.collections[os.getenv("TYPESENSE_COLLECTION_NAME")].documents.upsert(document) + except Exception as e: + # typesense at this point should only contain public data so it is fine to log documents (no PII) + # if PII should be added later, this must be handled in the documents __str__ function + logger.error(f'Error upserting document: {document}') + raise e + + def delete(self, id: str) -> object: + """ + Deletes the document from Typesense + + :param id: the id of the document to delete + :return: deserialized JSON document + """ + return self.client.collections[os.getenv("TYPESENSE_COLLECTION_NAME")].documents[id].delete()