11# Copyright (c) 2015 [email protected] 22# Distributed under the terms of the Modified BSD License.
3- try :
4- from asyncio import Future
5- except ImportError :
6-
7- class Future (object ):
8- """A class nothing will use."""
3+ import asyncio
4+ import inspect
5+ import requests
96
107
11- import requests
128from ipykernel .ipkernel import IPythonKernel
139from hdijupyterutils .ipythondisplay import IpythonDisplay
1410
@@ -17,6 +13,55 @@ class Future(object):
1713from sparkmagic .livyclientlib .exceptions import wrap_unexpected_exceptions
1814from sparkmagic .kernels .wrapperkernel .usercodeparser import UserCodeParser
1915
16+ # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
17+ import nest_asyncio
18+
19+
20+ # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
21+ # Taken from: https://github.com/jupyter/notebook/blob/eb3a1c24839205afcef0ba65ace2309d38300a2b/notebook/utils.py#L332
22+ def run_sync (maybe_async ):
23+ """If async, runs maybe_async and blocks until it has executed,
24+ possibly creating an event loop.
25+ If not async, just returns maybe_async as it is the result of something
26+ that has already executed.
27+ Parameters
28+ ----------
29+ maybe_async : async or non-async object
30+ The object to be executed, if it is async.
31+ Returns
32+ -------
33+ result :
34+ Whatever the async object returns, or the object itself.
35+ """
36+ if not inspect .isawaitable (maybe_async ):
37+ # that was not something async, just return it
38+ return maybe_async
39+ # it is async, we need to run it in an event loop
40+
41+ def wrapped ():
42+ create_new_event_loop = False
43+ result = None
44+ loop = None
45+ try :
46+ loop = asyncio .get_event_loop ()
47+ except RuntimeError :
48+ create_new_event_loop = True
49+ else :
50+ if loop .is_closed ():
51+ create_new_event_loop = True
52+ if create_new_event_loop :
53+ loop = asyncio .new_event_loop ()
54+ asyncio .set_event_loop (loop )
55+ try :
56+ result = loop .run_until_complete (maybe_async )
57+ except RuntimeError as e :
58+ if str (e ) == "This event loop is already running" :
59+ # just return a Future, hoping that it will be awaited
60+ result = asyncio .ensure_future (maybe_async )
61+ return result
62+
63+ return wrapped ()
64+
2065
2166class SparkKernelBase (IPythonKernel ):
2267 def __init__ (
@@ -40,7 +85,16 @@ def __init__(
4085 # Override
4186 self .session_language = session_language
4287
43- super (SparkKernelBase , self ).__init__ (** kwargs )
88+ # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
89+ # Patch loop.run_until_complete as early as possible
90+ try :
91+ nest_asyncio .apply ()
92+ except RuntimeError :
93+ # nest_asyncio requires a running loop in order to patch.
94+ # In tests the loop may not have been created yet.
95+ pass
96+
97+ super ().__init__ (** kwargs )
4498
4599 self .logger = SparkLog ("{}_jupyter_kernel" .format (self .session_language ))
46100 self ._fatal_error = None
@@ -54,11 +108,15 @@ def __init__(
54108 # Disable warnings for test env in HDI
55109 requests .packages .urllib3 .disable_warnings ()
56110
57- if not kwargs .get ("testing" , False ):
58- self ._load_magics_extension ()
59- self ._change_language ()
60- if conf .use_auto_viz ():
61- self ._register_auto_viz ()
111+ # Do not load magics in testing
112+ if kwargs .get ("testing" , False ):
113+ return
114+
115+ # Load magics on init
116+ self ._load_magics_extension ()
117+ self ._change_language ()
118+ if conf .use_auto_viz ():
119+ self ._register_auto_viz ()
62120
63121 def do_execute (
64122 self , code , silent , store_history = True , user_expressions = None , allow_stdin = False
@@ -71,7 +129,9 @@ def f(self):
71129 code , silent , store_history , user_expressions , allow_stdin
72130 )
73131
74- return wrap_unexpected_exceptions (f , self ._complete_cell )(self )
132+ # Execute the code and handle exceptions
133+ wrapped = wrap_unexpected_exceptions (f , self ._complete_cell )
134+ return wrapped (self )
75135
76136 def do_shutdown (self , restart ):
77137 # Cleanup
@@ -164,30 +224,50 @@ def _execute_cell(
164224 def _execute_cell_for_user (
165225 self , code , silent , store_history = True , user_expressions = None , allow_stdin = False
166226 ):
167- result = super (SparkKernelBase , self ).do_execute (
227+ result = super ().do_execute (
168228 code , silent , store_history , user_expressions , allow_stdin
169229 )
170- if isinstance (result , Future ):
171- result = result .result ()
230+
231+ # In ipykernel 6, this returns native asyncio coroutine
232+ if asyncio .iscoroutine (result ):
233+ return run_sync (result )
234+
235+ # In ipykernel 5, this returns gen.coroutine
236+ if asyncio .isfuture (result ):
237+ return result .result ()
238+
239+ # In ipykernel 4, this func is synchronous
172240 return result
173241
174242 def _do_shutdown_ipykernel (self , restart ):
175- return super (SparkKernelBase , self ).do_shutdown (restart )
243+ result = super ().do_shutdown (restart )
244+
245+ # In tests, super() calls this SparkKernelBase.do_shutdown, which is async
246+ if asyncio .iscoroutine (result ):
247+ return run_sync (result )
248+
249+ return result
176250
177251 def _complete_cell (self ):
178- """A method that runs a cell with no effect. Call this and return the value it
179- returns when there's some sort of error preventing the user's cell from executing; this
180- will register the cell from the Jupyter UI as being completed."""
252+ """A method that runs a cell with no effect.
253+
254+ Call this and return the value it returns when there's some sort
255+ of error preventing the user's cell from executing; this will
256+ register the cell from the Jupyter UI as being completed.
257+ """
181258 return self ._execute_cell ("None" , False , True , None , False )
182259
183260 def _show_user_error (self , message ):
184261 self .logger .error (message )
185262 self .ipython_display .send_error (message )
186263
187264 def _queue_fatal_error (self , message ):
188- """Queues up a fatal error to be thrown when the next cell is executed; does not
189- raise an error immediately. We use this for errors that happen on kernel startup,
190- since IPython crashes if we throw an exception in the __init__ method."""
265+ """Queues up a fatal error to be thrown when the next cell is executed;
266+ does not raise an error immediately.
267+
268+ We use this for errors that happen on kernel startup, since
269+ IPython crashes if we throw an exception in the __init__ method.
270+ """
191271 self ._fatal_error = message
192272
193273 def _abort_with_fatal_error (self , message ):
0 commit comments