
scala - Spark Shell Add Multiple Drivers/Jars to Classpath using spark-defaults.conf

We are using spark-shell (REPL mode) to test various use cases and to connect to multiple sources/sinks.

We need to add custom drivers/jars via the spark-defaults.conf file. I have tried adding multiple jars separated by commas, like:

spark.driver.extraClassPath = /home/sandeep/mysql-connector-java-5.1.36.jar 
spark.executor.extraClassPath = /home/sandeep/mysql-connector-java-5.1.36.jar

But it's not working. Can anyone please provide details on the correct syntax?


1 Answer


Note: verified on Linux Mint with Spark 3.0.1.

If you set properties in spark-defaults.conf, Spark will pick them up only when you submit your job with spark-submit.

Note: behaviour with spark-shell and pyspark still needs to be verified.
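
For reference, since the question is about multiple driver jars: spark.driver.extraClassPath and spark.executor.extraClassPath take a JVM classpath string, so on Linux multiple entries are separated with a colon rather than a comma. A minimal, unverified sketch using the path from the question plus a hypothetical second jar:

spark.driver.extraClassPath       /home/sandeep/mysql-connector-java-5.1.36.jar:/home/sandeep/postgresql-42.2.5.jar
spark.executor.extraClassPath     /home/sandeep/mysql-connector-java-5.1.36.jar:/home/sandeep/postgresql-42.2.5.jar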

file: spark-defaults.conf

spark.driver.extraJavaOptions      -Dlog4j.configuration=file:log4j.properties -Dspark.yarn.app.container.log.dir=app-logs -Dlogfile.name=hello-spark
spark.jars.packages                 org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1

Then run your job in the terminal, e.g. wordcount.py:

spark-submit /path-to-file/wordcount.py
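
The same dependencies can also be passed straight on the command line (the flags below are standard spark-submit/spark-shell options; treat the exact values as a sketch):

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1 \
    --jars /home/sandeep/mysql-connector-java-5.1.36.jar \
    /path-to-file/wordcount.py

spark-shell accepts the same --packages and --jars flags, which is handy for the REPL use case in the question.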

If you want to run your job in development mode from an IDE, use the config() method instead. Here we set the Kafka and Avro packages; if you also want to include log4j.properties, use spark.driver.extraJavaOptions.

The app name and master can be provided in two ways:

  1. use .appName() and .master()
  2. use a .conf file

file: hellospark.py

from logger import Log4j
from util import get_spark_app_config
from pyspark.sql import SparkSession

# first approach.
spark = SparkSession.builder \
    .appName('Hello Spark') \
    .master('local[3]') \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,"
            "org.apache.spark:spark-avro_2.12:3.0.1") \
    .config("spark.driver.extraJavaOptions",
            "-Dlog4j.configuration=file:log4j.properties "
            "-Dspark.yarn.app.container.log.dir=app-logs "
            "-Dlogfile.name=hello-spark") \
    .getOrCreate()

# second approach.
conf = get_spark_app_config()
spark = SparkSession.builder \
    .config(conf=conf) \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
    .getOrCreate()

logger = Log4j(spark)

file: logger.py

from pyspark.sql import SparkSession


class Log4j(object):
    def __init__(self, spark: SparkSession):
        conf = spark.sparkContext.getConf()
        app_name = conf.get("spark.app.name")
        log4j = spark._jvm.org.apache.log4j
        self.logger = log4j.LogManager.getLogger(app_name)

    def warn(self, message):
        self.logger.warn(message)

    def info(self, message):
        self.logger.info(message)

    def error(self, message):
        self.logger.error(message)

    def debug(self, message):
        self.logger.debug(message)
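
A quick usage sketch of the wrapper above (assuming the SparkSession from hellospark.py, mirroring the logger = Log4j(spark) line there):

# hypothetical driver-side logging calls
logger = Log4j(spark)
logger.info("Starting Hello Spark")
logger.warn("This goes to the JVM log4j logger")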

file: util.py

import configparser
from pyspark import SparkConf

def get_spark_app_config(enable_delta_lake=False):
    """
    It will read configuration from spark.conf file to create
    an instance of SparkConf(). Can be used to create
    SparkSession.builder.config(conf=conf).getOrCreate()
    :return: instance of SparkConf()
    """
    spark_conf = SparkConf()
    config = configparser.ConfigParser()
    config.read("spark.conf")

    for (key, value) in config.items("SPARK_APP_CONFIGS"):
        spark_conf.set(key, value)

    if enable_delta_lake:
        for (key, value) in config.items("DELTA_LAKE_CONFIGS"):
            spark_conf.set(key, value)
    return spark_conf

file: spark.conf

[SPARK_APP_CONFIGS]
spark.app.name = Hello Spark
spark.master = local[3]
spark.sql.shuffle.partitions = 3

[DELTA_LAKE_CONFIGS]
spark.jars.packages = io.delta:delta-core_2.12:0.7.0
spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog = org.apache.spark.sql.delta.catalog.DeltaCatalog
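
To pick up the [DELTA_LAKE_CONFIGS] section as well, pass the flag defined in util.py. A minimal sketch (assuming the same spark.conf and util.py as above):

from pyspark.sql import SparkSession
from util import get_spark_app_config

# enable_delta_lake=True also applies the [DELTA_LAKE_CONFIGS] section
conf = get_spark_app_config(enable_delta_lake=True)
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()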
