#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ["SparkConf"]

import sys
from typing import Dict, List, Optional, Tuple, cast, overload

from py4j.java_gateway import JVMView, JavaObject

from pyspark.errors import PySparkRuntimeError


class SparkConf:
    """
    Configuration for a Spark application. Used to set various Spark
    parameters as key-value pairs.

    Most of the time, you would create a SparkConf object with
    ``SparkConf()``, which will load values from `spark.*` Java system
    properties as well. In this case, any parameters you set directly on
    the :class:`SparkConf` object take priority over system properties.

    For unit tests, you can also call ``SparkConf(false)`` to skip
    loading external settings and get the same configuration no matter
    what the system properties are.

    All setter methods in this class support chaining. For example,
    you can write ``conf.setMaster("local").setAppName("My app")``.

    Parameters
    ----------
    loadDefaults : bool
        whether to load values from Java system properties (True by default)
    _jvm : class:`py4j.java_gateway.JVMView`
        internal parameter used to pass a handle to the Java VM; does not need to be set by users
    _jconf : class:`py4j.java_gateway.JavaObject`
        Optionally pass in an existing SparkConf handle to use its parameters

    Notes
    -----
    Once a SparkConf object is passed to Spark, it is cloned
    and can no longer be modified by the user.

    Examples
    --------
    >>> from pyspark.conf import SparkConf
    >>> from pyspark.context import SparkContext
    >>> conf = SparkConf()
    >>> conf.setMaster("local").setAppName("My app")
    <pyspark.conf.SparkConf object at ...>
    >>> conf.get("spark.master")
    'local'
    >>> conf.get("spark.app.name")
    'My app'
    >>> sc = SparkContext(conf=conf)
    >>> sc.master
    'local'
    >>> sc.appName
    'My app'
    >>> sc.sparkHome is None
    True

    >>> conf = SparkConf(loadDefaults=False)
    >>> conf.setSparkHome("/path")
    <pyspark.conf.SparkConf object at ...>
    >>> conf.get("spark.home")
    '/path'
    >>> conf.setExecutorEnv("VAR1", "value1")
    <pyspark.conf.SparkConf object at ...>
    >>> conf.setExecutorEnv(pairs=[("VAR3", "value3"), ("VAR4", "value4")])
    <pyspark.conf.SparkConf object at ...>
    >>> conf.get("spark.executorEnv.VAR1")
    'value1'
    >>> print(conf.toDebugString())
    spark.executorEnv.VAR1=value1
    spark.executorEnv.VAR3=value3
    spark.executorEnv.VAR4=value4
    spark.home=/path
    >>> for p in sorted(conf.getAll(), key=lambda p: p[0]):
    ...     print(p)
    ('spark.executorEnv.VAR1', 'value1')
    ('spark.executorEnv.VAR3', 'value3')
    ('spark.executorEnv.VAR4', 'value4')
    ('spark.home', '/path')
    >>> conf._jconf.setExecutorEnv("VAR5", "value5")
    JavaObject id...
    >>> print(conf.toDebugString())
    spark.executorEnv.VAR1=value1
    spark.executorEnv.VAR3=value3
    spark.executorEnv.VAR4=value4
    spark.executorEnv.VAR5=value5
    spark.home=/path
    """

    _jconf: Optional[JavaObject]
    _conf: Optional[Dict[str, str]]

    def __init__(
        self,
        loadDefaults: bool = True,
        _jvm: Optional[JVMView] = None,
        _jconf: Optional[JavaObject] = None,
    ):
        """
        Create a new Spark configuration.
        """
        if _jconf:
            self._jconf = _jconf
        else:
            from pyspark.context import SparkContext

            _jvm = _jvm or SparkContext._jvm

            if _jvm is not None:
                # JVM is created, so create self._jconf directly through JVM
                self._jconf = _jvm.SparkConf(loadDefaults)
                self._conf = None
            else:
                # JVM is not created, so store data in self._conf first
                self._jconf = None
                self._conf = {}
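
    # A minimal sketch of the deferred path (illustrative, assuming no JVM has
    # been launched in this process): values are buffered in the plain Python
    # ``_conf`` dict until a JVM-backed SparkConf can be created.
    #
    #   conf = SparkConf(loadDefaults=False)   # no JVM yet -> local dict
    #   conf._conf                             # -> {}
    #   conf.set("spark.app.name", "demo")
    #   conf._conf                             # -> {'spark.app.name': 'demo'}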

    def set(self, key: str, value: str) -> "SparkConf":
        """Set a configuration property."""
        # Try to set self._jconf first if JVM is created, set self._conf if JVM is not
        # created yet.
        if self._jconf is not None:
            self._jconf.set(key, str(value))
        else:
            assert self._conf is not None
            self._conf[key] = str(value)
        return self
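
    # Usage sketch for ``set`` (illustrative; assumes no JVM is running, so the
    # value lands in the local dict). Although the annotation says ``str``, the
    # body coerces values with ``str()``, so a non-string comes back as a string:
    #
    #   conf = SparkConf(loadDefaults=False)
    #   conf.set("spark.ui.port", 4050)
    #   conf.get("spark.ui.port")   # -> '4050'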

    def setIfMissing(self, key: str, value: str) -> "SparkConf":
        """Set a configuration property, if not already set."""
        if self.get(key) is None:
            self.set(key, value)
        return self
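
    # Sketch: ``setIfMissing`` writes only when the key is absent
    # (illustrative values):
    #
    #   conf = SparkConf(loadDefaults=False)
    #   conf.setIfMissing("spark.master", "local[2]")   # key absent: sets it
    #   conf.setIfMissing("spark.master", "local[4]")   # key present: no-op
    #   conf.get("spark.master")                        # -> 'local[2]'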

    def setMaster(self, value: str) -> "SparkConf":
        """Set master URL to connect to."""
        self.set("spark.master", value)
        return self

    def setExecutorEnv(
        self,
        key: Optional[str] = None,
        value: Optional[str] = None,
        pairs: Optional[List[Tuple[str, str]]] = None,
    ) -> "SparkConf":
        """Set an environment variable to be passed to executors."""
        if (key is not None and pairs is not None) or (key is None and pairs is None):
            raise PySparkRuntimeError(
                error_class="KEY_VALUE_PAIR_REQUIRED",
                message_parameters={},
            )
        elif key is not None:
            self.set("spark.executorEnv.{}".format(key), cast(str, value))
        elif pairs is not None:
            for (k, v) in pairs:
                self.set("spark.executorEnv.{}".format(k), v)
        return self
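
    # Sketch of the argument contract for ``setExecutorEnv`` (illustrative
    # values): exactly one of ``key`` or ``pairs`` must be supplied, otherwise
    # a PySparkRuntimeError with error class KEY_VALUE_PAIR_REQUIRED is raised.
    #
    #   conf = SparkConf(loadDefaults=False)
    #   conf.setExecutorEnv("VAR1", "v1")              # one variable
    #   conf.setExecutorEnv(pairs=[("VAR2", "v2")])    # several at once
    #   conf.setExecutorEnv()                          # raises PySparkRuntimeError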

    def setAll(self, pairs: List[Tuple[str, str]]) -> "SparkConf":
        """
        Set multiple parameters, passed as a list of key-value pairs.

        Parameters
        ----------
        pairs : iterable of tuples
            list of key-value pairs to set
        """
        for (k, v) in pairs:
            self.set(k, v)
        return self
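
    # Sketch: ``setAll`` is shorthand for repeated ``set`` calls and chains
    # like the other setters (illustrative values):
    #
    #   conf = SparkConf(loadDefaults=False).setAll(
    #       [("spark.master", "local"), ("spark.app.name", "demo")]
    #   )
    #   sorted(conf.getAll())
    #   # -> [('spark.app.name', 'demo'), ('spark.master', 'local')]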

    def get(self, key: str, defaultValue: Optional[str] = None) -> Optional[str]:
        """Get the configured value for some key, or return a default otherwise."""
        if defaultValue is None:  # Py4J doesn't call the right get() if we pass None
            if self._jconf is not None:
                if not self._jconf.contains(key):
                    return None
                return self._jconf.get(key)
            else:
                assert self._conf is not None
                return self._conf.get(key, None)
        else:
            if self._jconf is not None:
                return self._jconf.get(key, defaultValue)
            else:
                assert self._conf is not None
                return self._conf.get(key, defaultValue)
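
    # Sketch of the lookup behavior (illustrative key): a missing key yields
    # None unless ``defaultValue`` is given. The separate ``defaultValue is
    # None`` branch exists because Py4J would not dispatch ``get(key, None)``
    # to the right Java overload.
    #
    #   conf = SparkConf(loadDefaults=False)
    #   conf.get("spark.unknown")              # -> None
    #   conf.get("spark.unknown", "fallback")  # -> 'fallback'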

    def getAll(self) -> List[Tuple[str, str]]:
        """Get all values as a list of key-value pairs."""
        if self._jconf is not None:
            return [(elem._1(), elem._2()) for elem in cast(JavaObject, self._jconf).getAll()]
        else:
            assert self._conf is not None
            return list(self._conf.items())

    def contains(self, key: str) -> bool:
        """Does this configuration contain a given key?"""
        if self._jconf is not None:
            return self._jconf.contains(key)
        else:
            assert self._conf is not None
            return key in self._conf
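
    # Sketch (illustrative key): ``contains`` checks presence without touching
    # the value.
    #
    #   conf = SparkConf(loadDefaults=False)
    #   conf.contains("spark.master")          # -> False
    #   conf.set("spark.master", "local")
    #   conf.contains("spark.master")          # -> True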

    def toDebugString(self) -> str:
        """
        Returns a printable version of the configuration, as a list of key=value pairs,
        one per line.
        """
        if self._jconf is not None:
            return self._jconf.toDebugString()
        else:
            assert self._conf is not None
            return "\n".join("%s=%s" % (k, v) for k, v in self._conf.items())