Apache Spark เป็นระบบประมวลแบบ cluster ซึ่งมี API ให้เราใช้ได้หลายภาษาอยู่เหมือนกัน ขึ้นอยู่กับความถนัดของเราเลย ไม่ว่าจะเป็น JAVA, Scala, Python, R และมีการปรับปรุงให้รองรับ Graph processing ด้วย
Apache Spark มีส่วนประกอบให้เราได้เลือกใช้คือ
1. Spark SQL สำหรับ SQL และ structured data
2. MLlib สำหรับ Machine Learning
3. GraphX สำหรับ Graph processing
4. Spark Streaming
สำหรับบทความนี้จะใช้ Spark Streaming ในการเตรียมข้อมูลที่จะเก็บใน Elasticsearch และตัวอย่างยอดนิยม นับจำนวนคำ (WordCount) ทั้ง Apache Spark และ Elasticsearch ติดตั้งอยู่ที่เครื่องผมเองนะครับ (localhost)
เตรียมพร้อมด้วยการสร้าง SparkContext
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with two working thread and batch interval of 1 second sc = SparkContext("local[2]", "NetworkWordCount") ssc = StreamingContext(sc, 1)
อ่านข้อมูลจากไฟล์ data.txt
text_file = sc.textFile('data.txt')
จากนั้นก็มาจัดการข้อมูลที่ได้จากไฟล์ data.txt
# map each line to its words words = text_file.flatMap(lambda line: line.split()) # emit value:1 for each key:word word_map = words.map(lambda word: (word, 1)) # add up word counts by key:word word_counts = word_map.reduceByKey(lambda a, b: a+b) # เตรียมข้อมูลสำหรับ Elasticsearch es_input = word_counts.map(lambda word: ('key', {"word": word[0], "count": word[1]}))
เก็บข้อมูลเข้าไปที่ Elasticsearch
es_input.saveAsNewAPIHadoopFile( path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf={ "es.resource" : "data/word_counts" })
รวมแต่ละส่วนของโค้ดไว้ด้วยกันซะหน่อย
#! -*- coding: utf-8 -*- from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with two working thread and batch interval of 1 second sc = SparkContext("local[2]", "NetworkWordCount") ssc = StreamingContext(sc, 1) text_file = sc.textFile('data.txt') words = text_file.flatMap(lambda line: line.split()) word_map = words.map(lambda word: (word, 1)) word_counts = word_map.reduceByKey(lambda a, b: a+b) es_input = word_counts.map(lambda word: ('key', {"word": word[0], "count": word[1]})) es_input.saveAsNewAPIHadoopFile( path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf={ "es.resource" : "data/word_counts" })
พอเตรียมโค้ดเสร็จก็ถึงเวลารันคำสั่งครับ แต่ก่อนอื่นต้องดาวน์โหลด Elasticsearch Hadoop ก่อนนะครับ
./bin/spark-submit --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.jar word_counts.py
ขอให้สนุกกับ Apache Spark นะครับ ^^
แหล่งข้อมูล:
– http://spark.apache.org/docs/latest/streaming-programming-guide.html