diff --git a/events.py b/events.py index 9125e76ca64f32840f260a99149b23754a4c5c3c..9194a7ebd98b77f656c447e87385d8a235c203e7 100644 --- a/events.py +++ b/events.py @@ -56,9 +56,9 @@ class UserEvent: @dataclass -class UserNameUpdatedEvent(UserEvent): +class UserUpdatedEvent(UserEvent): def __init__(self, cloud_event: CloudEvent): - _validate_event_type(cloud_event, "UserNameUpdated") + _validate_event_type(cloud_event, "UserUpdated") super().__init__(cloud_event) def as_typesense_document(self) -> object: diff --git a/main.py b/main.py index 8e4e34b5c9e103192cc03e8c510afad8553386b6..fde4eb562c41bc8a6ac7c8978d2f92f0d8a922ca 100644 --- a/main.py +++ b/main.py @@ -1,14 +1,15 @@ import re -import functions_framework +from logging import getLogger from typing import Union +import functions_framework from cloudevents.http.event import CloudEvent from typesense.exceptions import ObjectNotFound from events import ( - UserNameUpdatedEvent, get_event_type, get_event_version, + UserUpdatedEvent, UserDeletedEvent, SpaceCreatedEvent, SpaceUpdatedEvent, @@ -16,20 +17,18 @@ from events import ( ) from typesense_client import TypesenseClient -from logging import getLogger - logger = getLogger(__name__) SEMVER_1XX_SERIES = r"^1\.\d+\.\d+$" -def process_user_name_updated_event(client: TypesenseClient, event: UserNameUpdatedEvent): +def process_user_updated_event(client: TypesenseClient, event: UserUpdatedEvent) -> None: logger.debug(f"Processing {event}") client.upsert(event.as_typesense_document()) logger.info(f"Upserted user {event.user.id} in Typesense") -def process_user_deleted_event(client: TypesenseClient, event: UserDeletedEvent): +def process_user_deleted_event(client: TypesenseClient, event: UserDeletedEvent) -> None: logger.debug(f"Processing {event}") try: client.delete(f"profile_{event.user.id}") @@ -38,13 +37,13 @@ def process_user_deleted_event(client: TypesenseClient, event: UserDeletedEvent) logger.info(f"Deleted user {event.user.id} from Typesense") -def process_space_upserting_event(client: TypesenseClient, event: Union[SpaceCreatedEvent, SpaceUpdatedEvent]): +def process_space_upserting_event(client: TypesenseClient, event: Union[SpaceCreatedEvent, SpaceUpdatedEvent]) -> None: logger.debug(f"Processing {event}") client.upsert(event.as_typesense_document()) logger.info(f"Upserted space {event.space.id} in Typesense") -def process_space_deleted_event(client: TypesenseClient, event: SpaceDeletedEvent): +def process_space_deleted_event(client: TypesenseClient, event: SpaceDeletedEvent) -> None: logger.debug(f"Processing {event}") try: client.delete(f"space_{event.space.id}") @@ -53,36 +52,36 @@ def process_space_deleted_event(client: TypesenseClient, event: SpaceDeletedEven logger.info(f"Deleted space {event.space.id} in Typesense") -def process_event(client: TypesenseClient, event: CloudEvent): +def process_event(client: TypesenseClient, event: CloudEvent) -> None: type_version = (get_event_type(event), re.match(SEMVER_1XX_SERIES, get_event_version(event)) is not None) match type_version: - case ("UserNameUpdated", True): - process_user_name_updated_event(client, UserNameUpdatedEvent(event)) + case ("UserUpdated", True): + return process_user_updated_event(client, UserUpdatedEvent(event)) case ("UserDeleted", True): - process_user_deleted_event(client, UserDeletedEvent(event)) + return process_user_deleted_event(client, UserDeletedEvent(event)) + case("UserNameUpdated", True): + # no need to process, it is sufficient to handle UserUpdated + logger.debug("Ignoring UserNameUpdated event") + return None case ("UserEmailUpdated", True): - # no need to process, it is sufficient to handle UserNameUpdated + # no need to process, it is sufficient to handle UserUpdated + # also, no PII must be in Typesense as there's no authentication logger.debug("Ignoring UserEmailUpdated event") - pass + return None case ("SpaceCreated", True): - process_space_upserting_event(client, SpaceCreatedEvent(event)) - pass + return process_space_upserting_event(client, SpaceCreatedEvent(event)) case ("SpaceUpdated", True): - process_space_upserting_event(client, SpaceUpdatedEvent(event)) - pass + return process_space_upserting_event(client, SpaceUpdatedEvent(event)) case ("SpaceDeleted", True): - process_space_deleted_event(client, SpaceDeletedEvent(event)) - pass + return process_space_deleted_event(client, SpaceDeletedEvent(event)) case (eventType, eventVersion): - # UserEmailUpdated: no need to process, it is sufficient to handle UserNameUpdated because this is emitted - # during signup and search should not display email addresses (PII) - # All other events are assumed to (currently) not be of interest in search + # other events than handled above are assumed to (currently) not be of interest in search logger.debug(f"Ignoring {eventType} ({eventVersion}) event.") - pass + return @functions_framework.cloud_event -def process_message(event: CloudEvent): +def process_message(event: CloudEvent) -> None: client = TypesenseClient() process_event(client, event) diff --git a/terraform/environments/pubsub.tf b/terraform/environments/pubsub.tf index d5cfe620b906051a5509fbba02973ff9ed39e305..e4fc2ded4f9a9cb5cbcf650cc3d24fa31fb3591b 100644 --- a/terraform/environments/pubsub.tf +++ b/terraform/environments/pubsub.tf @@ -26,7 +26,7 @@ resource "google_monitoring_alert_policy" "pubsub_unacked_messages" { conditions { display_name = "Unacked messages condition" condition_prometheus_query_language { - query = "rate(pubsub_googleapis_com:subscription_oldest_unacked_message_age{subscription_id='${google_pubsub_subscription.holi-search-integration-okuna-sub.name}'}[5m]) > 0" + query = "rate(pubsub_googleapis_com:subscription_oldest_unacked_message_age{subscription_id='${google_pubsub_subscription.holi-search-integration-okuna-sub.name}'}[5m]) >= 1" duration = "300s" } } @@ -38,7 +38,7 @@ resource "google_monitoring_alert_policy" "pubsub_unacked_messages" { notification_channels = [local.environment_name == "production" ? data.terraform_remote_state.holi_infra_monitoring_state.outputs.monitoring_notification_channel_rocket_chat_matching_data_production_id : data.terraform_remote_state.holi_infra_monitoring_state.outputs.monitoring_notification_channel_rocket_chat_matching_data_staging_id] documentation { - content = "Alert triggered when Pub/Sub subscription(s) (subscription_id='${google_pubsub_subscription.holi-search-integration-okuna-sub.name}') have unacknowledged messages for more than 5 minutes" + content = "Alert triggered when Pub/Sub subscription(s) (subscription_id='${google_pubsub_subscription.holi-search-integration-okuna-sub.name}') have unacknowledged messages for more than 5 minutes. Please check the cloud function deployment and logs to see if the subscriber works correctly." mime_type = "text/markdown" } } diff --git a/test_main.py b/test_main.py index 54214b1de533f6528e25280c0ada07fdf6b78892..cdb26041b8eff8cfbb59d01f656809bf37e5fdf0 100644 --- a/test_main.py +++ b/test_main.py @@ -56,9 +56,9 @@ def message_data(event_type, event_version, data): @patch("typesense.client.Client") -def test_user_name_updated(mock_client): +def test_user_updated(mock_client): process_event( - mock_client, CloudEvent(attributes, message_data("UserNameUpdated", "1.0.0", {"user": user_payload})) + mock_client, CloudEvent(attributes, message_data("UserUpdated", "1.1.0", {"user": user_payload})) ) mock_client.upsert.assert_called_with( { @@ -77,17 +77,32 @@ def test_user_name_updated(mock_client): } ) +@patch("typesense.client.Client") +def test_user_name_updated_ignored(mock_client): + process_event( + mock_client, CloudEvent(attributes, message_data("UserNameUpdated", "1.1.0", {"user": user_payload})) + ) + mock_client.upsert.assert_not_called() + + +@patch("typesense.client.Client") +def test_user_email_updated_ignored(mock_client): + process_event( + mock_client, CloudEvent(attributes, message_data("UserEmailUpdated", "1.1.0", {"user": user_payload})) + ) + mock_client.upsert.assert_not_called() + @patch("typesense.client.Client") def test_user_deleted(mock_client): - process_event(mock_client, CloudEvent(attributes, message_data("UserDeleted", "1.0.0", {"user": user_payload}))) + process_event(mock_client, CloudEvent(attributes, message_data("UserDeleted", "1.1.0", {"user": user_payload}))) mock_client.delete.assert_called_with(f"profile_{user_payload['id']}") # noinspection DuplicatedCode @patch("typesense.client.Client") def test_space_created(mock_client): - process_event(mock_client, CloudEvent(attributes, message_data("SpaceCreated", "1.0.0", {"space": space_payload}))) + process_event(mock_client, CloudEvent(attributes, message_data("SpaceCreated", "1.1.0", {"space": space_payload}))) mock_client.upsert.assert_called_with( { "id": f'space_{space_payload['id']}', @@ -112,7 +127,7 @@ def test_space_created(mock_client): # noinspection DuplicatedCode @patch("typesense.client.Client") def test_space_updated(mock_client): - process_event(mock_client, CloudEvent(attributes, message_data("SpaceUpdated", "1.0.0", {"space": space_payload}))) + process_event(mock_client, CloudEvent(attributes, message_data("SpaceUpdated", "1.1.0", {"space": space_payload}))) mock_client.upsert.assert_called_with( { "id": f'space_{space_payload['id']}',