본문 바로가기

GCP

CGP composer(Airflow) 알림 teams로 받기

gcp airflow dag의 실패시 teams 로 알림을 받으려고 하였으나 . 제공되는 provider list에 메시져는 slack, telegram, discord 밖에 없었다 

https://airflow.apache.org/docs/#providers-packages-docs-apache-airflow-providers-index-html

 

Documentation

Platform created by the community to programmatically author, schedule and monitor workflows.

airflow.apache.org

 

해서 여기 저기 찾아 보니 teams 로 알림 보내는 plugins을 만든분이 있어 github에서 소스를 다운 받았다

https://github.com/astronomer/cs-tutorial-msteams-callbacks

혹시 이소스 사용이 문제라면 글삭제 하겟습니다. 

 

이 소스를  composer 에 적용시키려면 airflow의 dag 폴더가 있는 gsc에 plugins 폴더안에 넣어야 인식이 된다.

제경우 원래 폴더명이 아닌 noti 폴더에 넣는걸로 소스수정했습니다.

 

airflow 를 로컬에 설치한 경우는 web ui, scheduler를 restart해야 인식이 되지만. gcp의 경우 plugins 폴더에 위치 시키면 자동으로 인식이 됩니다. 

 

그다음 팀즈에서 팀을 만들고 

아래 처럼 채널링크 받기를 눌러 링크 정보를 복사해 둡니다. 

 

위에서 복사한 링크값을 가지고

airflow  menu > Admin > Connections 에서 

 

connection Type : HTTP 로 하여 생성해 줍니다. 

저는 teams_conn_id로 생성을 했습니다. 이값은 소스에서도 변경해주세요. http_conn_id 부분의 값입니다.  

 이 connection 정보로 teams로 메시지를 보낼수 있습니다.

 

테스트용 dag를 아래와 같이 만들었습니다.

bigquery의 sp 호출코드인데. 

일부러 오류가 발생하게 sp를 작성했습니다.

on_failure_callback 이부분에 추가한 플러그인의 호출 함수명을 넣습니다.

from airflow import models
from airflow.models.variable import Variable

from airflow.providers.google.cloud.operators.bigquery import  BigQueryInsertJobOperator
from airflow.utils.dates import days_ago
from noti.teams import ms_teams_callback_functions


project_id = "{{var.value.project_id}}"
dataset = "DEV_DA_TM"
sp_name = "sp_run_test"
#{{var.value.service_account_email}}
# Set default arguments for the DAG
default_args = {
    "start_date": days_ago(0),    
    "on_failure_callback": ms_teams_callback_functions.failure_callback,
    'depends_on_past': True,  
    'wait_for_downstream': True, 
    'retries': 0,
}


# Create the DAG
with models.DAG(dag_id="bigquery_sp_call_sample",
                schedule_interval=None, 
                default_args=default_args) as dag:
    
    call_stored_procedure = BigQueryInsertJobOperator(
        task_id="call_stored_procedure",
        project_id=project_id,
        configuration={
            "query": {
                "query": f"CALL {dataset}.{sp_name}(); ",
                "useLegacySql": False,
            }
        },        
    )
    call_stored_procedure

 

 

실행 결과는

ui 상 실패 

teams message

 

오류 메시지가 팀즈로 전송이 됩니다.

 

'GCP' 카테고리의 다른 글

GCP composer(airflow) 에서 jdbc to jdbc 데이터 적재하기  (0) 2024.04.05