global_event_loop: return running loop for current thread

Like asyncio.get_event_loop(), return the running loop for the
current thread if there is one, and otherwise construct a new
one if needed. This allows the _safe_loop function to become
synonymous with the global_event_loop function.

For the case of "loop running in non-main thread" of API
consumer, this change makes portage compatible with PEP
492 coroutines with async and await syntax. Portage
internals can safely begin using async / await syntax instead
of compat_coroutine.

Bug: https://bugs.gentoo.org/763339
Signed-off-by: Zac Medico <zmedico@gentoo.org>
diff --git a/lib/portage/util/_eventloop/global_event_loop.py b/lib/portage/util/_eventloop/global_event_loop.py
index 4130111..cb7a130 100644
--- a/lib/portage/util/_eventloop/global_event_loop.py
+++ b/lib/portage/util/_eventloop/global_event_loop.py
@@ -1,28 +1,6 @@
-# Copyright 2012-2020 Gentoo Authors
+# Copyright 2012-2021 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
-import portage
-from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop
+__all__ = ('global_event_loop',)
 
-_instances = {}
-
-
-def global_event_loop():
-	"""
-	Get a global EventLoop (or compatible object) instance which
-	belongs exclusively to the current process.
-	"""
-
-	pid = portage.getpid()
-	instance = _instances.get(pid)
-	if instance is not None:
-		return instance
-
-	constructor = AsyncioEventLoop
-
-	# Use the _asyncio_wrapper attribute, so that unit tests can compare
-	# the reference to one retured from _wrap_loop(), since they should
-	# not close the loop if it refers to a global event loop.
-	instance = constructor()._asyncio_wrapper
-	_instances[pid] = instance
-	return instance
+from portage.util.futures._asyncio import _safe_loop as global_event_loop
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index d39f317..5590963 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2020 Gentoo Authors
+# Copyright 2018-2021 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 __all__ = (
@@ -37,9 +37,6 @@
 	'portage.util.futures:compat_coroutine@_compat_coroutine',
 )
 from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop as _AsyncioEventLoop
-from portage.util._eventloop.global_event_loop import (
-	global_event_loop as _global_event_loop,
-)
 # pylint: disable=redefined-builtin
 from portage.util.futures.futures import (
 	CancelledError,
@@ -238,7 +235,7 @@
 	# The default loop returned by _wrap_loop should be consistent
 	# with global_event_loop, in order to avoid accidental registration
 	# of callbacks with a loop that is not intended to run.
-	loop = loop or _global_event_loop()
+	loop = loop or _safe_loop()
 	return (loop if hasattr(loop, '_asyncio_wrapper')
 		else _AsyncioEventLoop(loop=loop))
 
@@ -267,13 +264,15 @@
 	@rtype: asyncio.AbstractEventLoop (or compatible)
 	@return: event loop instance
 	"""
-	if portage._internal_caller or threading.current_thread() is threading.main_thread():
-		return _global_event_loop()
+	loop = _get_running_loop()
+	if loop is not None:
+		return loop
 
 	thread_key = threading.get_ident()
 	with _thread_weakrefs.lock:
 		if _thread_weakrefs.pid != portage.getpid():
 			_thread_weakrefs.pid = portage.getpid()
+			_thread_weakrefs.mainloop = None
 			_thread_weakrefs.loops = weakref.WeakValueDictionary()
 		try:
 			loop = _thread_weakrefs.loops[thread_key]
@@ -283,9 +282,23 @@
 			except RuntimeError:
 				_real_asyncio.set_event_loop(_real_asyncio.new_event_loop())
 			loop = _thread_weakrefs.loops[thread_key] = _AsyncioEventLoop()
+
+	if _thread_weakrefs.mainloop is None and threading.current_thread() is threading.main_thread():
+		_thread_weakrefs.mainloop = loop
+
 	return loop
 
 
+def _get_running_loop():
+	with _thread_weakrefs.lock:
+		if _thread_weakrefs.pid == portage.getpid():
+			try:
+				loop = _thread_weakrefs.loops[threading.get_ident()]
+			except KeyError:
+				return None
+			return loop if loop.is_running() else None
+
+
 def _thread_weakrefs_atexit():
 	with _thread_weakrefs.lock:
 		if _thread_weakrefs.pid == portage.getpid():
@@ -297,6 +310,5 @@
 				else:
 					loop.close()
 
-
-_thread_weakrefs = types.SimpleNamespace(lock=threading.Lock(), loops=None, pid=None)
+_thread_weakrefs = types.SimpleNamespace(lock=threading.Lock(), loops=None, mainloop=None, pid=None)
 portage.process.atexit_register(_thread_weakrefs_atexit)