Tengo que activar una pyspark módulo de flujo de aire mediante un sparksubmit operador. Pero, la pyspark módulo necesita tomar la chispa de la variable de sesión como un argumento. He utilizado application_args para pasar el parámetro para la pyspark módulo. Pero, cuando me encontré con el dag la chispa presentar operador es llegar fallado y el parámetro de los que pasaron por considerarse como Ninguno tipo de variable. Necesita saber cómo pasar de un argumento a una pyspark módulo activa a través de spark_submit_operator.
El DAG código es el siguiente:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PRJT").enableHiveSupport().getOrCreate()
spark_config = {
'conn_id': 'spark_default',
'driver_memory': '1g',
'executor_cores': 1,
'num_executors': 1,
'executor_memory': '1g'
}
dag = DAG(
dag_id="spark_session_prgm",
default_args=default_args,
schedule_interval='@daily',
catchup=False)
spark_submit_task1 = SparkSubmitOperator(
task_id='spark_submit_task1',
application='/home/airflow_home/dags/tmp_spark_1.py',
application_args=['spark'],
**spark_config, dag=dag)
El código de ejemplo en tmp_spark_1.py programa: