????JFIF??x?x????'
| Server IP : 172.67.174.47  /  Your IP : 216.73.216.113 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/thread-self/root/./opt/hc_python/lib/python3.12/site-packages/greenlet/tests/ | 
| Upload File : | 
from __future__ import print_function
import sys
import greenlet
import unittest
from . import TestCase
from . import PY312
# https://discuss.python.org/t/cpython-3-12-greenlet-and-tracing-profiling-how-to-not-crash-and-get-correct-results/33144/2
DEBUG_BUILD_PY312 = (
    PY312 and hasattr(sys, 'gettotalrefcount'),
    "Broken on debug builds of Python 3.12"
)
class SomeError(Exception):
    pass
class GreenletTracer(object):
    oldtrace = None
    def __init__(self, error_on_trace=False):
        self.actions = []
        self.error_on_trace = error_on_trace
    def __call__(self, *args):
        self.actions.append(args)
        if self.error_on_trace:
            raise SomeError
    def __enter__(self):
        self.oldtrace = greenlet.settrace(self)
        return self.actions
    def __exit__(self, *args):
        greenlet.settrace(self.oldtrace)
class TestGreenletTracing(TestCase):
    """
    Tests of ``greenlet.settrace()``
    """
    def test_a_greenlet_tracing(self):
        main = greenlet.getcurrent()
        def dummy():
            pass
        def dummyexc():
            raise SomeError()
        with GreenletTracer() as actions:
            g1 = greenlet.greenlet(dummy)
            g1.switch()
            g2 = greenlet.greenlet(dummyexc)
            self.assertRaises(SomeError, g2.switch)
        self.assertEqual(actions, [
            ('switch', (main, g1)),
            ('switch', (g1, main)),
            ('switch', (main, g2)),
            ('throw', (g2, main)),
        ])
    def test_b_exception_disables_tracing(self):
        main = greenlet.getcurrent()
        def dummy():
            main.switch()
        g = greenlet.greenlet(dummy)
        g.switch()
        with GreenletTracer(error_on_trace=True) as actions:
            self.assertRaises(SomeError, g.switch)
            self.assertEqual(greenlet.gettrace(), None)
        self.assertEqual(actions, [
            ('switch', (main, g)),
        ])
    def test_set_same_tracer_twice(self):
        # https://github.com/python-greenlet/greenlet/issues/332
        # Our logic in asserting that the tracefunction should
        # gain a reference was incorrect if the same tracefunction was set
        # twice.
        tracer = GreenletTracer()
        with tracer:
            greenlet.settrace(tracer)
class PythonTracer(object):
    oldtrace = None
    def __init__(self):
        self.actions = []
    def __call__(self, frame, event, arg):
        # Record the co_name so we have an idea what function we're in.
        self.actions.append((event, frame.f_code.co_name))
    def __enter__(self):
        self.oldtrace = sys.setprofile(self)
        return self.actions
    def __exit__(self, *args):
        sys.setprofile(self.oldtrace)
def tpt_callback():
    return 42
class TestPythonTracing(TestCase):
    """
    Tests of the interaction of ``sys.settrace()``
    with greenlet facilities.
    NOTE: Most of this is probably CPython specific.
    """
    maxDiff = None
    def test_trace_events_trivial(self):
        with PythonTracer() as actions:
            tpt_callback()
        # If we use the sys.settrace instead of setprofile, we get
        # this:
        # self.assertEqual(actions, [
        #     ('call', 'tpt_callback'),
        #     ('call', '__exit__'),
        # ])
        self.assertEqual(actions, [
            ('return', '__enter__'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('call', '__exit__'),
            ('c_call', '__exit__'),
        ])
    def _trace_switch(self, glet):
        with PythonTracer() as actions:
            glet.switch()
        return actions
    def _check_trace_events_func_already_set(self, glet):
        actions = self._trace_switch(glet)
        self.assertEqual(actions, [
            ('return', '__enter__'),
            ('c_call', '_trace_switch'),
            ('call', 'run'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('return', 'run'),
            ('c_return', '_trace_switch'),
            ('call', '__exit__'),
            ('c_call', '__exit__'),
        ])
    def test_trace_events_into_greenlet_func_already_set(self):
        def run():
            return tpt_callback()
        self._check_trace_events_func_already_set(greenlet.greenlet(run))
    def test_trace_events_into_greenlet_subclass_already_set(self):
        class X(greenlet.greenlet):
            def run(self):
                return tpt_callback()
        self._check_trace_events_func_already_set(X())
    def _check_trace_events_from_greenlet_sets_profiler(self, g, tracer):
        g.switch()
        tpt_callback()
        tracer.__exit__()
        self.assertEqual(tracer.actions, [
            ('return', '__enter__'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('return', 'run'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('call', '__exit__'),
            ('c_call', '__exit__'),
        ])
    def test_trace_events_from_greenlet_func_sets_profiler(self):
        tracer = PythonTracer()
        def run():
            tracer.__enter__()
            return tpt_callback()
        self._check_trace_events_from_greenlet_sets_profiler(greenlet.greenlet(run),
                                                             tracer)
    def test_trace_events_from_greenlet_subclass_sets_profiler(self):
        tracer = PythonTracer()
        class X(greenlet.greenlet):
            def run(self):
                tracer.__enter__()
                return tpt_callback()
        self._check_trace_events_from_greenlet_sets_profiler(X(), tracer)
    @unittest.skipIf(*DEBUG_BUILD_PY312)
    def test_trace_events_multiple_greenlets_switching(self):
        tracer = PythonTracer()
        g1 = None
        g2 = None
        def g1_run():
            tracer.__enter__()
            tpt_callback()
            g2.switch()
            tpt_callback()
            return 42
        def g2_run():
            tpt_callback()
            tracer.__exit__()
            tpt_callback()
            g1.switch()
        g1 = greenlet.greenlet(g1_run)
        g2 = greenlet.greenlet(g2_run)
        x = g1.switch()
        self.assertEqual(x, 42)
        tpt_callback() # ensure not in the trace
        self.assertEqual(tracer.actions, [
            ('return', '__enter__'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('c_call', 'g1_run'),
            ('call', 'g2_run'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('call', '__exit__'),
            ('c_call', '__exit__'),
        ])
    @unittest.skipIf(*DEBUG_BUILD_PY312)
    def test_trace_events_multiple_greenlets_switching_siblings(self):
        # Like the first version, but get both greenlets running first
        # as "siblings" and then establish the tracing.
        tracer = PythonTracer()
        g1 = None
        g2 = None
        def g1_run():
            greenlet.getcurrent().parent.switch()
            tracer.__enter__()
            tpt_callback()
            g2.switch()
            tpt_callback()
            return 42
        def g2_run():
            greenlet.getcurrent().parent.switch()
            tpt_callback()
            tracer.__exit__()
            tpt_callback()
            g1.switch()
        g1 = greenlet.greenlet(g1_run)
        g2 = greenlet.greenlet(g2_run)
        # Start g1
        g1.switch()
        # And it immediately returns control to us.
        # Start g2
        g2.switch()
        # Which also returns. Now kick of the real part of the
        # test.
        x = g1.switch()
        self.assertEqual(x, 42)
        tpt_callback() # ensure not in the trace
        self.assertEqual(tracer.actions, [
            ('return', '__enter__'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('c_call', 'g1_run'),
            ('call', 'tpt_callback'),
            ('return', 'tpt_callback'),
            ('call', '__exit__'),
            ('c_call', '__exit__'),
        ])
if __name__ == '__main__':
    unittest.main()