Yelp Dataset Analysis using Apache Spark and Pig, with insights visualized in the Zeppelin GUI
This project is maintained by shaivikochar
This project gathers useful insights from the Yelp dataset using the interactive tool Apache Zeppelin. Zeppelin provides an integrated, web-based notebook that enables data-driven, interactive analytics and collaborative documents with SQL, Scala, and more. I run Zeppelin locally on Docker by following these instructions, and I pulled all the JSON files into HDFS for easy access.
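As a quick sanity check that the files landed in HDFS and that Spark can parse them, a minimal paragraph along these lines can be run first (this is a sketch, assuming Zeppelin's predefined spark session; printing the schema and row count is just one way to verify the load):

// Minimal sketch: read one JSON file from HDFS and confirm Spark inferred a usable schema.
val business = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")
business.printSchema()                              // expect business_id, categories (array<string>), stars, ...
println(s"businesses loaded: ${business.count()}")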
I use the %sql command to generate the visualizations.

import scala.collection.mutable.WrappedArray
import spark.implicits._
import org.apache.spark.sql.functions._
val business = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")
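// Yelp stores a business's categories as an array column; explode() flattens it to
// one row per (business, category) pair. The when/otherwise branch keeps businesses
// with a null categories array as a single row with a null category instead of dropping them.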
val b = business.withColumn("category", explode(
when(col("categories").isNotNull, col("categories"))
.otherwise(array(lit(null).cast("string")))
))
b.registerTempTable("business")
%sql SELECT city, category, SUM(review_count) AS total_review FROM business GROUP BY category, city ORDER BY city
import scala.collection.mutable.WrappedArray
import spark.implicits._
import org.apache.spark.sql.functions._
val business = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")
val b = business.withColumn("category", explode(
when(col("categories").isNotNull, col("categories"))
.otherwise(array(lit(null).cast("string")))
))
b.registerTempTable("business")
%sql SELECT category,city,avg(stars) as avg_stars from business group by category,city order by category asc, avg_stars desc;
Center: University of Wisconsin-Madison. Latitude: 43° 04′ 30″ N, Longitude: 89° 25′ 2″ W. Decimal degrees: latitude 43.0766, longitude -89.4125. The bounding box for this problem is ~10 miles, which we will loosely approximate as 10 minutes of arc, so the bounding box is a square, 20 minutes of longitude and latitude on each side, with UW-Madison at the center.
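The coordinate bounds in the first query below look like a direct miles-to-degrees conversion (roughly 69 miles per degree of latitude, scaled by the cosine of the latitude for longitude) rather than the 10-minute shortcut. A sketch of that arithmetic, under that assumption (the variable names and the 69 miles/degree constant are mine, not the notebook's):

// Sketch of the bounding-box arithmetic (assumes ~69 miles per degree of latitude).
val (centerLat, centerLon) = (43.0766, -89.4125)  // UW-Madison, decimal degrees
val radiusMiles = 10.0
val dLat = radiusMiles / 69.0                                          // ~0.1449 degrees
val dLon = radiusMiles / (69.0 * math.cos(math.toRadians(centerLat))) // ~0.198 degrees
// Latitude bounds ~42.9317 to ~43.2215; longitude bounds ~-89.611 to ~-89.214,
// matching the WHERE clause below. The 10-minute shortcut (10/60 ~ 0.1667 degrees)
// is what the later campus-area Food query uses instead.
println(f"lat ${centerLat - dLat}%.6f .. ${centerLat + dLat}%.6f, lon ${centerLon - dLon}%.6f .. ${centerLon + dLon}%.6f")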
The chart is again generated with the %sql command.

import scala.collection.mutable.WrappedArray
import spark.implicits._
import org.apache.spark.sql.functions._
val business = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")
val b = business.withColumn("category", explode(
when(col("categories").isNotNull, col("categories"))
.otherwise(array(lit(null).cast("string")))
))
b.registerTempTable("business")
%sql Select category, avg(stars) as avg_star from business where latitude < 43.22145313 AND longitude < -89.21487592 AND latitude > 42.93172719 AND longitude > -89.61009908 group by category order by category;
import spark.implicits._
import org.apache.spark.sql.functions._
val user_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_user.json")
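// keep only the fields needed to rank users by review volume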
val user_attributes = user_data.select("user_id","name","review_count")
user_attributes.registerTempTable("user");
val user_attributes_sorted = spark.sql("select * from user order by review_count desc");
val top10 = user_attributes_sorted.limit(10);
top10.registerTempTable("top10");
val reviews_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_review.json")
val review_attributes = reviews_data.select("user_id","business_id","stars")
review_attributes.registerTempTable("review");
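// attach every review written by one of the top-10 reviewers to that reviewer's profile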
val user_review = spark.sql("select top10.user_id, top10.name, top10.review_count, review.business_id, review.stars from top10 JOIN review on top10.user_id=review.user_id")
user_review.registerTempTable("user_review")
val business_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")
val b = business_data.withColumn("category", explode(
when(col("categories").isNotNull, col("categories"))
.otherwise(array(lit(null).cast("string")))
))
val business_attributes = b.select("business_id","category");
business_attributes.registerTempTable("business");
val user_review_business = spark.sql("select user_review.business_id,user_review.name,user_review.stars,business.category from user_review JOIN business on user_review.business_id = business.business_id");
user_review_business.registerTempTable("user_review_business")
val resTable = spark.sql("select name,category, sum(stars) from user_review_business group by name,category")
%sql select name, category, sum(stars) from user_review_business group by name,category
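For reference, the same pipeline can be expressed without the intermediate temp tables by chaining the DataFrame API directly. A minimal sketch reusing the names defined above (my phrasing, not the notebook's):

// Equivalent DataFrame-API sketch, reusing user_data, review_attributes and b from above.
val top10Users = user_data
  .select("user_id", "name", "review_count")
  .orderBy(desc("review_count"))
  .limit(10)

val starsByNameAndCategory = top10Users
  .join(review_attributes, "user_id")                     // reviews written by the top reviewers
  .join(b.select("business_id", "category"), "business_id")
  .groupBy("name", "category")
  .agg(sum("stars").as("total_stars"))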
import org.apache.spark.sql.functions._
import spark.implicits._
val business_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")
val b = business_data.withColumn("category", explode(
when(col("categories").isNotNull, col("categories"))
.otherwise(array(lit(null).cast("string")))
))
b.registerTempTable("business")
val catlatlong = spark.sql("select business_id,name,categories, stars from business where latitude >= 42.908333 AND latitude <= 43.241667 AND longitude >= -89.583889 AND longitude <= -89.250556 AND category == 'Food'");
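// rows in catlatlong already matched category 'Food'; re-exploding the full categories
// array gives one row per category that each of those businesses belongs to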
val catflat = catlatlong.withColumn("category", explode(
when(col("categories").isNotNull, col("categories"))
.otherwise(array(lit(null).cast("string")))
))
catflat.registerTempTable("catflat")
val topstars = spark.sql("select business_id,name,category, stars from catflat order by stars desc limit 10")
val bottomstars = spark.sql("select business_id,name,category, stars from catflat order by stars asc limit 10")
topstars.registerTempTable("topstars")
bottomstars.registerTempTable("bottomstars")
val review_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_review.json")
review_data.registerTempTable("review")
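// keep only reviews written in January through May (of any year)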
val filterdate = spark.sql("select business_id,date,stars from review where month(date)>=1 and month(date)<=5")
filterdate.registerTempTable("filterdate")
val joinedTableTop = spark.sql("select topstars.name,topstars.category, topstars.business_id, filterdate.date, filterdate.stars from topstars INNER JOIN filterdate on topstars.business_id = filterdate.business_id")
val joinedTableBottom = spark.sql("select bottomstars.name, bottomstars.category, bottomstars.business_id, filterdate.date, filterdate.stars from bottomstars INNER JOIN filterdate on bottomstars.business_id = filterdate.business_id")
joinedTableTop.registerTempTable("joinedTableTop")
joinedTableBottom.registerTempTable("joinedTableBottom")
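// stack the top-10 and bottom-10 samples so their averages can be compared in one chart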
val jointotal = spark.sql("select * from joinedTableTop union all select * from joinedTableBottom")
jointotal.registerTempTable("jointotal")
val avg_stars = spark.sql("select business_id, avg(stars) as avg_stars from jointotal group by business_id")
%sql select business_id, avg(stars) as avg_stars from jointotal group by business_id
My repo also has Pig scripts for these analyses.