I was going through the Dagster tutorials and thought it would be a good exercise to connect to my local MongoDB.
from dagster import get_dagster_logger, job, op
from pymongo import MongoClient


@op
def connection():
    client = MongoClient("mongodb://localhost:27017/")
    return client["development"]


@job
def execute():
    client = connection()
    get_dagster_logger().info(f"Connection: {client} ")
Dagster error:
dagster.core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "connection":
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 232, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 348, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 405, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 534, in _store_output
    for elt in iterate_with_context(
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 400, in iterate_with_context
    return
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 73, in solid_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
TypeError: cannot pickle '_thread.lock' object
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 398, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 524, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/storage/fs_io_manager.py", line 124, in handle_output
    pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
I have tested the same connection code locally in IPython and it works, so the issue seems to be related to Dagster.
Answer:
The default IOManager requires that inputs and outputs of ops be pickleable, and the traceback shows that the object your op returns is not: it fails inside pickle.dump with "cannot pickle '_thread.lock' object". You might want to try refactoring this to use Dagster's @resource decorator. This allows you to define resources externally to your @op, and makes mocking those resources later in tests really easy. Your code would look something like this:
from dagster import get_dagster_logger, job, op, resource
from pymongo import MongoClient


@resource
def mongo_client():
    client = MongoClient("mongodb://localhost:27017/")
    return client["development"]


@op(
    required_resource_keys={'mongo_client'}
)
def test_client(context):
    client = context.resources.mongo_client
    get_dagster_logger().info(f"Connection: {client} ")


@job(
    resource_defs={'mongo_client': mongo_client}
)
def execute():
    test_client()
Notice too that I moved the testing code into another @op, and then only called that op from within the execute @job. This is because the code within a job definition is evaluated at load time, and is only used to describe the graph of ops to execute. Calling an op there wires it into the graph instead of running it, so the value you get back is a placeholder for the op's output, not the actual client. All general programming to carry out tasks needs to be contained within @op code.
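As for the original error, here's a minimal sketch of what the default IO manager runs into, using only the standard library. FakeClient is a made-up stand-in (not pymongo's real class); I'm assuming it resembles the real client only in that it holds a thread lock internally, which is what the '_thread.lock' in your traceback suggests:

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a client object that owns a thread lock."""

    def __init__(self):
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError raised."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return exc
    return None


print(try_pickle({"db": "development"}))  # plain data pickles without error
print(try_pickle(FakeClient()))           # the lock makes pickling fail
```

Anything an op returns goes through a pickling step like this when the default IO manager stores it, which is why live connections are better handed out as resources than passed between ops.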
The really neat thing about the @resource pattern is that it makes testing with mock resources, or more generally swapping resources, incredibly easy. Let's say you wanted a mocked client so you could run your job code without actually hitting the database. You could do something like the following:
# Continues the previous snippet (reuses test_client and mongo_client)
from unittest.mock import MagicMock

from dagster import graph, resource


@resource
def mocked_mongo_client():
    return MagicMock()


@graph
def execute_graph():
    test_client()


execute_live = execute_graph.to_job(name='execute_live',
                                    resource_defs={'mongo_client': mongo_client})
execute_mocked = execute_graph.to_job(name='execute_mocked',
                                      resource_defs={'mongo_client': mocked_mongo_client})
This uses Dagster's @graph pattern to describe a DAG of ops, then uses the .to_job() method on the resulting GraphDefinition object to configure the graph in different ways. This way you can have exactly the same underlying op structure, but pass different resources, tags, executors, etc.
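If it helps to see why a MagicMock can stand in for the database, here's a rough sketch that doesn't need Dagster or a running MongoDB at all. count_users and the 'users' collection are hypothetical names I made up for illustration; the only real pymongo call assumed is Collection.count_documents:

```python
from unittest.mock import MagicMock


def count_users(db):
    """Hypothetical helper: count the documents in a 'users' collection."""
    return db["users"].count_documents({})


# MagicMock supports item access, so fake_db["users"] yields a child mock
# whose count_documents method can be given a canned return value.
fake_db = MagicMock()
fake_db["users"].count_documents.return_value = 3

result = count_users(fake_db)

# The mock also records how it was called, which is handy in tests.
fake_db["users"].count_documents.assert_called_once_with({})
print(result)
```

The same idea applies inside an op: whatever the op does with context.resources.mongo_client gets absorbed by the mock in the execute_mocked job, so the op logic runs without touching the database.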