Рассмотрим ситуацию, когда вы пишете свой класс, наследуюетесь от класса библиотеки и вам потребовалось значение локальной переменной функции родительского класса.
Хороший пример: класс SSHOperator, метод execute(). https://airflow.apache.org/docs/apache-airflow/1.10.12/_modules/airflow/contrib/operators/ssh_operator.html#SSHOperator.execute
Данный метод использует в работе переменную exit_status, однако не возвращает её. Если для дальнейшей обработки вам потребуется получить её значение из дочернего класса, то можно воспользоваться следующим способом, через трейсбек.
from airflow.providers.ssh.operators.ssh import SSHOperator
import sys
import traceback
class MySSHOperator(SSHOperator):
def _run_ssh(self, context) -> tuple[Optional[str], dict]:
try:
ssh_result = super(MySSHOperator, self).execute(context)
except Exception as ex:
exc_type, exc_value, etraceback = sys.exc_info()
while etraceback:
if etraceback.tb_frame.f_locals.get('exit_status') is not None:
exit_status = etraceback.tb_frame.f_locals.get('exit_status')
logger.error(f"Грохнулось с кодом: {exit_status}")
break
etraceback = tb.tb_next
logger.error(e.__traceback__)
Через метод exc_info() получаем тип, значение и трейсбэк ошибки. Начиная с текущего кадра стека, осуществляем проход по всем обработанным/необработанным исключениям до тех пор, пока не обнаружим в словаре со значениями локальных переменных искомое имя.
Данный способ применим если надо реагировать надо на эксепшены. Если необходимо обрабатывать и успешное выполнение, то проще оверрайтнуть эту функцию в дочернем классе и сделать ее со своим блэкджеком со своей областью видимости переменных.
from airflow.providers.ssh.operators.ssh import SSHOperator
class MySSHOperator(SSHOperator):
def execute(self, context) -> Union[bytes, str, bool]:
#----
#exit_status = stdout.channel.recv_exit_status()
self.exit_status = stdout.channel.recv_exit_status()
#----
def _run_ssh(self, context) -> Tuple[Optional[str], dict]:
try:
ssh_result = super(MySSHOperator, self).execute(context)
except Exception as ex:
logger.error(f"Грохнулось с кодом: {self.exit_status}")
Или вытащить модуль описывающий родительский класс, импортировать его, и там поменять область видимости переменных.
# copy_ssh_operator.py
# Copy from airflow.providers.ssh.operators.ssh import SSHOperator
class SSHOperator(BaseOperator):
def execute(self, context) -> Union[bytes, str, bool]:
#----
#exit_status = stdout.channel.recv_exit_status()
self.exit_status = exit_status
#----
from copy_ssh_operator import SSHOperator
class MySSHOperator(SSHOperator):
def _run_ssh(self, context) -> Tuple[Optional[str], dict]:
try:
ssh_result = super(MySSHOperator, self).execute(context)
except Exception as ex:
logger.error(f"Грохнулось с кодом: {self.exit_status}")
Есть мнение что копипаста - это признак больших архитектурных проблем у разработчика и не будем разбирать вопрос правильности такого подхода. Поэтому, если требуется не переопределять ни методы, ни модули, ни классы, то решение с трейсбэком так же есть.
import traceback
import sys
from airflow.providers.ssh.operators.ssh import SSHOperator
class MySSHOperator(SSHOperator):
def tracer(self, frame,event,arg):
if frame.f_locals.get('exit_status'):
self.exit_status = frame.f_locals.get('exit_status')
return self.tracer
def run(self):
try:
save_trace = sys.gettrace()
sys.settrace(self.tracer)
sys.call_tracing(self.execute, ())
sys.settrace(save_trace)
except Exception as e:
exc_type, exc_value, etraceback = sys.exc_info()
while etraceback:
if etraceback.tb_frame.f_locals.get('exit_status') is not None:
exit_status = etraceback.tb_frame.f_locals.get('exit_status')
logger.error(f"Грохнулось с кодом: {exit_status}")
break
etraceback = etraceback.tb_next
logger.error(e.__traceback__)
Сохраняем стандартный обработчик(стр.13). Устанавливаем собственный обработчик ссылкой на функцию(стр.14), чтобы при каждом вызове подставлялась наша кастомная функция. После получения искомого значения переменной возвращаем стандартный обработчик(стр.15).
Данное решение содержит очевидные минусы: такая реализация является однопоточной , существенно замедляется скорость выполнения и не удастся использовать стандартный дебагер. Но глобально решение есть.
P.S.: у меня в прод пошло решение с оверрайтом функции. Понимаю что ситуации бывают разные, может кому пригодится.