Introduction to Apache Spark
Patrick Wendell - Databricks

What is Spark?
A fast and expressive cluster computing engine, compatible with Apache Hadoop
- Efficient: general execution graphs, in-memory storage
- Usable: rich APIs in Java, Scala, Python; interactive shell
- Up to 10× faster on disk, 100× in memory; 2-5× less code

The Spark Community
+ You!

Today's Talk
- The Spark programming model
- Language and deployment choices
- Example algorithm (PageRank)

Spark Programming Model

Key Concept: RDDs
Resilient Distributed Datasets
- Collections of objects spread across a cluster, stored in RAM or on disk
- Built through parallel transformations
- Automatically rebuilt on failure

Operations
- Transformations (e.g. map, filter, groupBy)
- Actions (e.g. count, collect, save)

Write programs in terms of operations on distributed datasets.
Example: Log Mining
Load error messages from a log into memory, then interactively search for various patterns.
    lines = spark.textFile("hdfs://...")
    errors = lines.filter(lambda s: s.startswith("ERROR"))
    messages = errors.map(lambda s: s.split("\t")[2])
    messages.cache()

    messages.filter(lambda s: "mysql" in s).count()
    messages.filter(lambda s: "php" in s).count()
    . . .

[Diagram: the driver sends tasks to workers holding Block 1-3 and collects results; each worker keeps its partition of messages in Cache 1-3. lines is the base RDD, errors/messages are transformed RDDs, count() is an action.]

Result: full-text search of 60 GB of Wikipedia on 20 EC2 machines, ~0.5 sec from cache vs. ~20 sec on disk.

Scaling Down
[Chart]

Fault Recovery
RDDs track lineage information that can be used to efficiently recompute lost data.

    msgs = textFile.filter(lambda s: s.startswith("ERROR"))
                   .map(lambda s: s.split("\t")[2])

[Lineage: HDFS File -> filter(func = startsWith(...)) -> Filtered RDD -> map(func = split(...)) -> Mapped RDD]
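Because the lineage above is just a chain of transformations, you can ask Spark to describe it. A minimal sketch, assuming a running SparkContext `sc` and a reasonably recent PySpark release (the toDebugString helper is not in every early version):

    # Build the same lineage as above, then ask Spark to describe it.
    textFile = sc.textFile("hdfs://...")          # placeholder path
    msgs = (textFile.filter(lambda s: s.startswith("ERROR"))
                    .map(lambda s: s.split("\t")[2]))

    # Prints the chain of RDDs (text file -> filtered -> mapped); if a partition
    # is lost, Spark replays exactly these steps to rebuild it.
    print(msgs.toDebugString())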
Programming with RDDs

SparkContext
- Main entry point to Spark functionality
- Available in the shell as the variable sc
- In standalone programs, you'd make your own (see later for details)
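In the interactive shell the context is already constructed, so you can start from `sc` right away. A tiny sketch, assuming a PySpark shell session (the shell script's name and location vary slightly by release):

    # Inside the PySpark shell, `sc` is predefined:
    sc.parallelize([1, 2, 3]).count()   # => 3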
Creating RDDs

    # Turn a Python collection into an RDD
    sc.parallelize([1, 2, 3])

    # Load text file from local FS, HDFS, or S3
    sc.textFile("file.txt")
    sc.textFile("directory/*.txt")
    sc.textFile("hdfs://namenode:9000/path/file")

    # Use existing Hadoop InputFormat (Java/Scala only)
    sc.hadoopFile(keyClass, valClass, inputFmt, conf)

Basic Transformations

    nums = sc.parallelize([1, 2, 3])

    # Pass each element through a function
    squares = nums.map(lambda x: x*x)            # => [1, 4, 9]

    # Keep elements passing a predicate
    even = squares.filter(lambda x: x % 2 == 0)  # => [4]

    # Map each element to zero or more others
    nums.flatMap(lambda x: range(x))             # => [0, 0, 1, 0, 1, 2]

range(x) is a sequence of the numbers 0, 1, ..., x-1.
Basic Actions

    nums = sc.parallelize([1, 2, 3])

    # Retrieve RDD contents as a local collection
    nums.collect()    # => [1, 2, 3]

    # Return first K elements
    nums.take(2)      # => [1, 2]

    # Count number of elements
    nums.count()      # => 3

    # Merge elements with an associative function
    nums.reduce(lambda x, y: x + y)   # => 6

    # Write elements to a text file
    nums.saveAsTextFile("hdfs://file.txt")

Working with Key-Value Pairs
Spark's "distributed reduce" transformations operate on RDDs of key-value pairs.

    Python: pair = (a, b)
            pair[0]  # => a
            pair[1]  # => b

    Scala:  val pair = (a, b)
            pair._1  // => a
            pair._2  // => b

    Java:   Tuple2 pair = new Tuple2(a, b);
            pair._1  // => a
            pair._2  // => b
Some Key-Value Operations

    pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])
    pets.reduceByKey(lambda x, y: x + y)   # => [(cat, 3), (dog, 1)]
    pets.groupByKey()                      # => [(cat, [1, 2]), (dog, [1])]
    pets.sortByKey()                       # => [(cat, 1), (cat, 2), (dog, 1)]

reduceByKey also automatically implements combiners on the map side.
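That combiner behavior is why reduceByKey is usually preferred over groupByKey for aggregation: partial results are computed on each node before any data is shuffled. A small sketch of the two equivalent aggregations, assuming a running SparkContext `sc`:

    pairs = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])

    # Combines values per key within each partition first, then shuffles partial sums.
    by_reduce = pairs.reduceByKey(lambda x, y: x + y)

    # Shuffles every (key, value) pair, then sums on the receiving side.
    by_group = pairs.groupByKey().mapValues(lambda vals: sum(vals))

    by_reduce.collect()   # => [('cat', 3), ('dog', 1)] (order may vary)
    by_group.collect()    # same result, but more data moved across the network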
Example: Word Count

    lines = sc.textFile("hamlet.txt")
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda x, y: x + y)

Other Key-Value Operations

    visits = sc.parallelize([("index.html", "1.2.3.4"),
                             ("about.html", "3.4.5.6"),
                             ("index.html", "1.3.3.1")])

    pageNames = sc.parallelize([("index.html", "Home"),
                                ("about.html", "About")])

    visits.join(pageNames)
    # ("index.html", ("1.2.3.4", "Home"))
    # ("index.html", ("1.3.3.1", "Home"))
    # ("about.html", ("3.4.5.6", "About"))

    visits.cogroup(pageNames)
    # ("index.html", (["1.2.3.4", "1.3.3.1"], ["Home"]))
    # ("about.html", (["3.4.5.6"], ["About"]))

Setting the Level of Parallelism
All the pair RDD operations take an optional second parameter for the number of tasks:

    words.reduceByKey(lambda x, y: x + y, 5)
    words.groupByKey(5)
    visits.join(pageViews, 5)
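To see the level of parallelism you actually got, you can ask an RDD how many partitions it has. A quick sketch, assuming a running SparkContext `sc` (getNumPartitions is available in PySpark, though very old releases expose partition info differently):

    words = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    counts = words.reduceByKey(lambda x, y: x + y, 5)
    counts.getNumPartitions()   # => 5, one task per partition in the reduce stage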
Using Local Variables
Any external variables you use in a closure will automatically be shipped to the cluster:

    query = sys.stdin.readline()
    pages.filter(lambda x: query in x).count()

Some caveats:
- Each task gets a new copy (updates aren't sent back)
- The variable must be Serializable / Pickle-able
- Don't use fields of an outer object (it ships all of it!)
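The first caveat is easy to trip over: mutating a driver-side variable inside a closure only changes the copy shipped to each task. A minimal sketch, assuming a running SparkContext `sc`:

    counter = 0

    def tag(x):
        global counter
        counter += 1          # increments the worker's copy of the closure, not the driver's
        return x

    sc.parallelize(range(100)).map(tag).count()
    print(counter)            # still 0 on the driver; the updates were never sent back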
Under The Hood: DAG Scheduler
- General task graphs
- Automatically pipelines functions
- Data locality aware
- Partitioning aware, to avoid shuffles

[Diagram: a stage graph of RDD partitions; legend: cached partition, RDD]
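Pipelining means that narrow transformations such as map and filter run back-to-back inside a single stage, while operations that need data from other partitions introduce a shuffle boundary and a new stage. A small illustrative sketch, assuming a running SparkContext `sc`:

    rdd = sc.parallelize(range(1000), 4)

    result = (rdd.map(lambda x: (x % 10, x))              # pipelined ...
                 .filter(lambda kv: kv[1] > 5)            # ... with map in the same stage
                 .groupByKey()                            # shuffle: scheduler starts a new stage
                 .mapValues(lambda vals: len(list(vals))))

    result.count()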
More RDD Operators
- map, filter, groupBy, sort, union, join, leftOuterJoin, rightOuterJoin
- reduce, count, fold, reduceByKey, groupByKey, cogroup, cross, zip
- sample, take, first, partitionBy, mapWith, pipe, save, ...

How to Run Spark

Language Support
- Standalone programs: Python, Scala, & Java
- Interactive shells: Python & Scala
- Performance: Java & Scala are faster due to static typing, but Python is often fine

Python:

    lines = sc.textFile(...)
    lines.filter(lambda s: "ERROR" in s).count()

Scala:

    val lines = sc.textFile(...)
    lines.filter(x => x.contains("ERROR")).count()

Java:

    JavaRDD<String> lines = sc.textFile(...);
    lines.filter(new Function<String, Boolean>() {
      Boolean call(String s) {
        return s.contains("error");
      }
    }).count();
Interactive Shell
- The fastest way to learn Spark
- Available in Python and Scala
- Runs as an application on an existing Spark cluster, or can run locally

... or a Standalone Application

    import sys
    from pyspark import SparkContext

    if __name__ == "__main__":
        sc = SparkContext("local", "WordCount", sys.argv[0], None)
        lines = sc.textFile(sys.argv[1])
        counts = lines.flatMap(lambda s: s.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(lambda x, y: x + y)
        counts.saveAsTextFile(sys.argv[2])

Create a SparkContext

Scala:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    val sc = new SparkContext("url", "name", "sparkHome", Seq("app.jar"))

Java:

    import org.apache.spark.api.java.JavaSparkContext;

    JavaSparkContext sc = new JavaSparkContext(
      "masterUrl", "name", "sparkHome", new String[] {"app.jar"});

Python:

    from pyspark import SparkContext

    sc = SparkContext("masterUrl", "name", "sparkHome", ["library.py"])

The constructor arguments are: the cluster URL (or local / local[N]), the app name, the Spark install path on the cluster, and the list of JARs (or Python files) with app code to ship.

Add Spark to Your Project
- Scala / Java: add a Maven dependency on
  groupId: org.spark-project
  artifactId: spark-core_2.10
  version: 0.9.0
- Python: run your program with our pyspark script
Administrative GUIs
http://...:8080 (by default)

Software Components
- Spark runs as a library in your program (one instance per app)
- Runs tasks locally or on a cluster: Mesos, YARN, or standalone mode
- Accesses storage systems via the Hadoop InputFormat API; can use HBase, HDFS, S3, ...

[Diagram: your application holds a SparkContext, which uses local threads or a cluster manager to run Spark executors on workers, reading from HDFS or other storage.]

Local Execution
- Just pass local or local[k] as the master URL
- Debug using local debuggers
  - For Java / Scala, just run your program in a debugger
  - For Python, use an attachable debugger (e.g. PyDev)
- Great for development & unit tests
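For example, passing local[4] runs Spark on the local machine with 4 worker threads instead of needing a cluster. A minimal sketch (the app name here is just a placeholder):

    from pyspark import SparkContext

    # Run locally with 4 worker threads; handy for development and unit tests.
    sc = SparkContext("local[4]", "LocalTest")
    assert sc.parallelize(range(10)).sum() == 45
    sc.stop()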
Cluster Execution
The easiest way to launch is EC2:

    ./spark-ec2 -k keypair -i id_rsa.pem -s slaves \
        [launch|stop|start|destroy] clusterName

Several options for private clusters:
- Standalone mode (similar to Hadoop's deploy scripts)
- Mesos
- Hadoop YARN
- Amazon EMR
Application: PageRank

Example: PageRank
- Good example of a more complex algorithm: multiple stages of map & reduce
- Benefits from Spark's in-memory caching: multiple iterations over the same data

Basic Idea
Give pages ranks (scores) based on links to them:
- Links from many pages => high rank
- A link from a high-rank page => high rank

Image: en.wikipedia.org/wiki/File:PageRank-hi-res-2.png

Algorithm
- Start each page at a rank of 1
- On each iteration, have page p contribute rank_p / |neighbors_p| to its neighbors
- Set each page's rank to 0.15 + 0.85 × contribs

[Diagram, repeated over several slides: a four-page example graph whose ranks start at 1.0 each and converge over successive iterations (e.g. 1.0 -> 1.85, 1.0, 0.58, 0.58 -> ... -> 1.72, 1.31, 0.58, 0.39).]
Scala Implementation

    val links = // load RDD of (url, neighbors) pairs
    var ranks = // load RDD of (url, rank) pairs

    for (i <- 1 to ITERATIONS) {
      val contribs = links.join(ranks).flatMap {
        case (url, (links, rank)) =>
          links.map(dest => (dest, rank / links.size))
      }
      ranks = contribs.reduceByKey(_ + _)
                      .mapValues(0.15 + 0.85 * _)
    }
    ranks.saveAsTextFile(...)
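For comparison, here is a minimal PySpark sketch of the same loop, assuming `links` is an RDD of (url, list of neighbor urls) pairs and ITERATIONS is defined by the caller; caching `links` is what lets the repeated joins reuse the in-memory data:

    links = links.cache()                           # reused on every iteration
    ranks = links.mapValues(lambda neighbors: 1.0)  # start every page at rank 1

    for i in range(ITERATIONS):
        # Each page splits its rank evenly among its neighbors...
        contribs = links.join(ranks).flatMap(
            lambda kv: [(dest, kv[1][1] / len(kv[1][0])) for dest in kv[1][0]])
        # ...and each page's new rank is 0.15 + 0.85 * (sum of received contributions).
        ranks = contribs.reduceByKey(lambda x, y: x + y) \
                        .mapValues(lambda rank: 0.15 + 0.85 * rank)

    ranks.saveAsTextFile("output")                  # placeholder output path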
PageRank Performance
[Chart]

Other Iterative Algorithms
[Chart: time per iteration (s)]

Conclusion
- Spark offers a rich API to make data analytics fast: both fast to write and fast to run
- Achieves 100× speedups in real applications
- Growing community, with 25+ companies contributing

Get Started
Up and running in a few steps: download, unzip, shell.
Project resources:
- Examples on the project site
- Examples in the distribution
- Documentation

http://spark.incubator.apache.org