Skip to content
Snippets Groups Projects
Commit 1d5c6728 authored by Daniel Bimschas's avatar Daniel Bimschas
Browse files

Merge branch 'main' into NOISSUE_pub-sub-refactorings-pt2

parents e6a7b2c2 ba206b37
No related branches found
No related tags found
No related merge requests found
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="All Tests" type="tests" factoryName="py.test">
<module name="holi-okuna" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="true" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
<ENTRY IS_ENABLED="true" PARSER="env" IS_EXECUTABLE="false" PATH=".envrc.local" />
</ENTRIES>
</EXTENSION>
<option name="_new_keywords" value="&quot;&quot;" />
<option name="_new_parameters" value="&quot;&quot;" />
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;&quot;" />
<option name="_new_targetType" value="&quot;CUSTOM&quot;" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Test GraphQL" type="tests" factoryName="py.test">
<module name="holi-okuna" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="true" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
<ENTRY IS_ENABLED="true" PARSER="env" IS_EXECUTABLE="false" PATH=".envrc.local" />
</ENTRIES>
</EXTENSION>
<option name="_new_keywords" value="&quot;&quot;" />
<option name="_new_parameters" value="&quot;&quot;" />
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;test_graphql.TestUsers&quot;" />
<option name="_new_targetType" value="&quot;PYTHON&quot;" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
......@@ -3,6 +3,8 @@ from google.auth.credentials import AnonymousCredentials
from openbook import settings
import structlog
from google.cloud.pubsub_v1.types import PublisherOptions
from openbook_common.pubsub.event import Event
from openbook_common.pubsub.wrappers import skip_test_events
......@@ -12,6 +14,10 @@ GOOGLE_CLOUD_PROJECT_ID = settings.GOOGLE_CLOUD_PROJECT_ID
ENVIRONMENT_ID = settings.ENVIRONMENT_ID
PUBLISH_TIMEOUT_SECONDS = 15
# subscribers can choose to consume messages in order. using a single ordering key because it gives the highest
# guarantees, and we currently don't have scale issues that would justify relaxing guarantees.
ORDERING_KEY = "okuna"
@skip_test_events
def publish_event(topic: str, event: Event) -> None:
......@@ -20,13 +26,14 @@ def publish_event(topic: str, event: Event) -> None:
Awaits the result of the publish operation and logs any errors.
"""
try:
publisher_options = PublisherOptions(enable_message_ordering=True)
publisher = (
pubsub_v1.PublisherClient(credentials=AnonymousCredentials())
pubsub_v1.PublisherClient(credentials=AnonymousCredentials(), publisher_options=publisher_options)
if ENVIRONMENT_ID == "local"
else pubsub_v1.PublisherClient()
else pubsub_v1.PublisherClient(publisher_options=publisher_options)
)
topic_path = publisher.topic_path(GOOGLE_CLOUD_PROJECT_ID, topic)
future = publisher.publish(topic_path, **event.to_dict())
future = publisher.publish(topic_path, ordering_key=ORDERING_KEY, **event.to_dict())
try:
future.result(PUBLISH_TIMEOUT_SECONDS)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment