diff --git a/events.py b/events.py new file mode 100644 index 0000000000000000000000000000000000000000..1819e3f7ac18b2a99571465dbefd8c7cd3aa2260 --- /dev/null +++ b/events.py @@ -0,0 +1,52 @@ +import base64 +import json + +from cloudevents.http.event import CloudEvent +from dataclasses import dataclass + +def get_event_type(cloud_event: CloudEvent) -> str: + return cloud_event.data['message']['attributes']['eventType'] + +def get_event_version(cloud_event: CloudEvent) -> str: + return cloud_event.data['message']['attributes']['eventVersion'] + +@dataclass +class UserPayload: + id: str + name: str + email: str + identity: str + avatar: str + + def __init__(self, data: dict): + self.id = data['user']['id'] + self.name = data['user']['name'] + self.email = data['user']['email'] + self.identity = data['user']['identity'] + self.avatar = data['user']['avatar'] + + +@dataclass +class UserEvent(UserPayload): + event_type: str + event_version: str + + def __init__(self, cloud_event: CloudEvent): + self.event_type = get_event_type(cloud_event) + self.event_version = get_event_version(cloud_event) + data = cloud_event.data['message']['data'] + super().__init__(json.loads(base64.b64decode(data))) + +@dataclass +class UserNameUpdatedEvent(UserEvent): + def __init__(self, cloud_event: CloudEvent): + if get_event_type(cloud_event) != 'UserNameUpdated': + raise ValueError(f'Expected event of type UserNameUpdated, got {data['eventType']}') + super().__init__(cloud_event) + +@dataclass +class UserDeletedEvent(UserEvent): + def __init__(self, cloud_event: CloudEvent): + if get_event_type(cloud_event) != 'UserDeleted': + raise ValueError(f'Expected event of type UserDeleted, got {data['eventType']}') + super().__init__(cloud_event) diff --git a/main.py b/main.py index 91d310c3711a1a8364cad554524a26a932e9e3fe..1ee6075d783f8cbe88326ff5cb5a1892570aa3d4 100644 --- a/main.py +++ b/main.py @@ -1,53 +1,23 @@ -import base64 -import json - import functions_framework from cloudevents.http.event import CloudEvent -from dataclasses import dataclass - -@dataclass -class UserPayload: - id: str - name: str - email: str - identity: str - avatar: str - - def __init__(self, data: dict): - print(f"UserPayload: {data}") - self.id = data['user']['id'] - self.name = data['user']['name'] - self.email = data['user']['email'] - self.identity = data['user']['identity'] - self.avatar = data['user']['avatar'] - - -@dataclass -class UserNameUpdatedEvent(UserPayload): - event_type: str - event_version: str - - def __init__(self, event: CloudEvent): - message = event.data['message'] - if message['attributes']['eventType'] != 'UserNameUpdated': - raise ValueError(f'Expected event of type UserNameUpdated, got {data['eventType']}') - self.event_type = message['attributes']['eventType'] - self.event_version = message['attributes']['eventVersion'] - super().__init__(json.loads(decode_message_data(message['data']))) - +from events import UserNameUpdatedEvent, get_event_type, get_event_version, UserDeletedEvent +from dataclasses import asdict def process_user_name_updated_event(event: UserNameUpdatedEvent): - print(f'process_user_name_updated_event: {event}') + print(f'process_user_name_updated_event: {asdict(event)}') -def decode_message_data(data: str): - return base64.b64decode(data) +def process_user_deleted_event(event: UserDeletedEvent): + print(f'process_user_deleted_event: {asdict(event)}') -def process_event(cloud_event: CloudEvent): - attributes = cloud_event.data['message']['attributes'] - if attributes['eventType'] == 'UserNameUpdated' and attributes['eventVersion'] == '1.0.0': # TODO backwards semver - process_user_name_updated_event(UserNameUpdatedEvent(cloud_event)) +def process_event(event: CloudEvent): + type_version = (get_event_type(event), get_event_version(event)) + match type_version: + case ('UserNameUpdated', '1.0.0'): + return process_user_name_updated_event(UserNameUpdatedEvent(event)) + case ('UserDeleted', '1.0.0'): + return process_user_deleted_event(UserDeletedEvent(event)) @functions_framework.cloud_event -def process_message(cloud_event: CloudEvent): - print(f"Received message: {cloud_event}") - process_event(cloud_event) +def process_message(event: CloudEvent): + #print(f"Received message: {event}") + process_event(event) diff --git a/test_main.py b/test_main.py index f5cd62ed0185b1e206c6e0cc93da42a2a6d7e662..c136ddb352a0423a162a4928131cc18a35248630 100644 --- a/test_main.py +++ b/test_main.py @@ -1,5 +1,8 @@ -from cloudevents.http.event import CloudEvent +import base64 +import json +from cloudevents.http import CloudEvent +from cloudevents.http.event import CloudEvent from main import process_message attributes = { @@ -10,22 +13,32 @@ attributes = { 'type': 'google.cloud.pubsub.topic.v1.messagePublished', 'source': '//pubsub.googleapis.com/' } -data = { - 'message': { - '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', - 'data': 'eyJ1c2VyIjogeyJpZCI6ICIxNDlhM2QyOS1hMGRhLTRmNTItOGUxYy02NjAzNzZiNzA4NGIiLCAibmFtZSI6ICJEYW5pZWwgaG9saSB0ZWFtIiwgImVtYWlsIjogImRhbmllbEBob2xpLnRlYW0iLCAiaWRlbnRpdHkiOiAiZGFuaWVsaG9saS50ZWFtIiwgImF2YXRhciI6ICJodHRwczovL2lrLmltYWdla2l0LmlvL2hvbGkvX0RFVl8vYXZhdGFyLzE0OWEzZDI5LWEwZGEtNGY1Mi04ZTFjLTY2MDM3NmI3MDg0Yl95djMxVVc3bEQwLmpwZyJ9fQ==', - 'attributes': { - 'eventType': 'UserNameUpdated', - 'eventVersion': '1.0.0' - }, - 'messageId': '12833783708309476', - 'publishTime': '2024-12-07T16:21:48.022Z' - } + +user_payload = { + 'id': '149a3d29-a0da-4f52-8e1c-660376b7084b', + 'name': 'Daniel holi team', + 'email': 'daniel@holi.team', + 'identity': 'danielholi.team', + 'avatar': 'https://ik.imagekit.io/holi/_DEV_/avatar/149a3d29-a0da-4f52-8e1c-660376b7084b_yv31UW7lD0.jpg', +} + +def message_data(event_type, event_version, data): + return { + 'message': { + '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', + 'data': base64.b64encode(json.dumps(data).encode('utf-8')), + 'attributes': { + 'eventType': event_type, + 'eventVersion': event_version, + }, + 'messageId': '12833783708309476', + 'publishTime': '2024-12-07T16:21:48.022Z' + } } def test_process_message(): - process_message(CloudEvent(attributes, data)) - + process_message(CloudEvent(attributes, message_data('UserNameUpdated', '1.0.0', {'user': user_payload}))) + process_message(CloudEvent(attributes, message_data('UserDeleted', '1.0.0', {'user': user_payload}))) if __name__ == '__main__': test_process_message() \ No newline at end of file