요약

  1. airflow는 자동으로 dags 폴더와 plugins 폴더를 경로에 추가함
  2. local과 airflow 경로 불일치 오류를 해결하기 위해 Local.env 파일 생성
  3. PYTHON_PATH 지정

1. Python 모듈 경로 이해

모듈 경로 예시

  • from airflow.operators.python import PythonOperator의 경로는??
  • airflow 폴더 아래 operators 폴더 아래 python 파일 아래에 있는 PythonOperator 클래스를 가져옴

현재 python 경로 확인

  • Python은 sys.path 변수를 통해 모듈의 경로 확인 가능
    • 이 때, '' 는 현재 실행하는 파이썬 파일과 동일한 경로를 의미함
    • 그 외는 Pip로 설치한 라이브러리가 설치된 경로
import sys
from pprint import pprint
 
pprint(sys.path)

경로 추가 방법

  • Dag 에서 외부 함수를 가져오기 위해서는 sys.path 변수에 경로를 추가해야 함
    • sys.path 변수 값은 list인데, 사용자 경로를 명시적으로 append 가능
    • 환경변수 설정 가능
    • 단, 위 2가지 경우는 매우 귀찮음!!
  • Airflow는 자동으로 dags와 plugins 폴더를 sys.path에 추가함

2. plugins 폴더 활용

외부함수 작성의 장점

  • 공통함수 작성
  • 재활용성 증가
  • DAG 깔끔

사용 방법

  • from common.common_func import get_sftp()
  • 이는 plugins 폴더 아래 common 폴더 아래 common_func 파일 내 get_sftp() 작성된 함수를 가져옴

오류 원인

  1. vscode 내 오류
    1. Why? python 인터프리터는 기본적으로 최상위 폴더인 airflow 폴더를 path로 지정
    2. 이로 인해 from plugins.common.common_func 까지 작성해줘야함
  2. airflow 오류
    1. 그러나!! 위와 같이 작성하면 local은 정상작동 하지만, airflow 가상환경에서는 오류가 발생함
    • Why? airflow는 이미 plugins폴더가 path로 잡힘!
    • 따라서, from common.common_func import get_sftp() 로 작성해야함

해결 방안

vscode 내 .env 파일 생성하여 경로 변수 추가

  • WORKSPACE_FOLDER : 프로젝트 최상위 경로
  • PYTHONPATH : plugins 폴더 경로
WORKSPACE_FOLDER = /Users/haejun/airflow
PYTHONPATH = ${WORKSPACE_FOLDER}/plugins

번외) PYTHON PATH

  1. VSCODE의 PYTHONPATH
    1. Open folder로 연 디렉토리를 최상위로써 인식하고 그 디렉토리는 기본적으로 PYTHONPATH 로 인식함
    2. 바로 아래의 plugins 폴더는 PYTHONPATH에 잡히지 않기 때문에 새롭게 추가함
  2. AIRFLOW의 PYTHONPATH
    1. Airflow는 기본적으로 $AIRFLOW_HOME 경로의 dags 폴더와 plugins 폴더, config 폴더를 PYTHONPATH에 등록합니다.

3. 파이썬 외부 함수 실행

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from common.common_func import get_sftp
 
with DAG(
	dag_id="dags_python_import_func",
	schedule="23 8 * * *",
	start_date=pendulum.datetime(2024, 8, 22, tz = "Asia/Seoul"),
	catchup=False
	) as dag:
 
	task_get_sftp = PythonOperator(
		task_id = 'task_get_sftp',
		python_callable=get_sftp
		)
 
	task_get_sftp