Yelp-Dataset-Analysis

Yelp Dataset Analysis using Apache Spark and Pig, with insights surfaced through the Zeppelin GUI

This project is maintained by shaivikochar

Big Data Analysis and Visualization

Gathering useful insights from the dataset using Apache Zeppelin, an interactive, web-based notebook that enables data-driven analytics and collaborative documents with SQL, Scala, and more. I am running Zeppelin locally on Docker by following these instructions, and I pulled all the JSON files into HDFS for easy access.
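
A quick sanity check before running the queries is to list the dataset directory from the Zeppelin Spark interpreter. This is a minimal sketch; it assumes the files were copied under /usr/data/yelp_dataset_challenge_round9, matching the HDFS paths used throughout this page.

import org.apache.hadoop.fs.{FileSystem, Path}

// List the dataset directory to confirm the JSON files landed in HDFS
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path("/usr/data/yelp_dataset_challenge_round9"))
  .foreach(status => println(status.getPath))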

About the Yelp Dataset

The Round 9 challenge dataset ships as JSON files; the analysis below uses the business, user, and review files referenced by the HDFS paths in each snippet.

Analysis on Zeppelin

import spark.implicits._
import org.apache.spark.sql.functions._

// Load the business dataset from HDFS
val business = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")

// Flatten the categories array into one row per (business, category),
// keeping businesses whose categories field is null
val b = business.withColumn("category", explode(
    when(col("categories").isNotNull, col("categories"))
    .otherwise(array(lit(null).cast("string")))
    ))

b.createOrReplaceTempView("business")
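
The when/otherwise guard matters because explode silently drops rows whose array is null. A toy illustration on hypothetical data (not from the dataset):

// Business "b" has no categories
val demo = Seq(("a", Seq("Food", "Bars")), ("b", null)).toDF("id", "categories")

// Without the guard, row "b" disappears from the output
demo.withColumn("category", explode(col("categories"))).show()

// With the guard, row "b" is kept with a null category
demo.withColumn("category", explode(
    when(col("categories").isNotNull, col("categories"))
    .otherwise(array(lit(null).cast("string"))))).show()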

%sql SELECT city, category, SUM(review_count) AS total_review FROM business GROUP BY category, city ORDER BY city

screenshot

The next two queries reuse the business view registered above.

%sql SELECT category, city, AVG(stars) AS avg_stars FROM business GROUP BY category, city ORDER BY category ASC, avg_stars DESC

screenshot

%sql SELECT category, AVG(stars) AS avg_star FROM business WHERE latitude < 43.22145313 AND longitude < -89.21487592 AND latitude > 42.93172719 AND longitude > -89.61009908 GROUP BY category ORDER BY category

screenshot
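
The same bounding-box filter (which appears to cover the Madison, WI area) can also be written with the DataFrame API. A sketch reusing the exploded DataFrame b from above; note that between is inclusive, whereas the query uses strict comparisons:

// Bounding box taken from the query above
val (latMin, latMax) = (42.93172719, 43.22145313)
val (lonMin, lonMax) = (-89.61009908, -89.21487592)

b.filter($"latitude".between(latMin, latMax) && $"longitude".between(lonMin, lonMax))
  .groupBy($"category")
  .agg(avg($"stars").as("avg_star"))
  .orderBy($"category")
  .show()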

import spark.implicits._
import org.apache.spark.sql.functions._

// Load the user dataset and keep only the columns we need
val user_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_user.json")

val user_attributes = user_data.select("user_id", "name", "review_count")

user_attributes.createOrReplaceTempView("user")

// Top 10 users by review count
val user_attributes_sorted = spark.sql("SELECT * FROM user ORDER BY review_count DESC")

val top10 = user_attributes_sorted.limit(10)

top10.createOrReplaceTempView("top10")

// Load reviews and join them to the top-10 users
val reviews_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_review.json")

val review_attributes = reviews_data.select("user_id", "business_id", "stars")

review_attributes.createOrReplaceTempView("review")

val user_review = spark.sql("SELECT top10.user_id, top10.name, top10.review_count, review.business_id, review.stars FROM top10 JOIN review ON top10.user_id = review.user_id")

user_review.createOrReplaceTempView("user_review")

val business_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")

// Flatten categories as before, keeping null-category businesses
val b = business_data.withColumn("category", explode(
    when(col("categories").isNotNull, col("categories"))
    .otherwise(array(lit(null).cast("string")))
    ))

val business_attributes = b.select("business_id", "category")
business_attributes.createOrReplaceTempView("business")

// Attach each reviewed business's category to the top reviewers' reviews
val user_review_business = spark.sql("SELECT user_review.business_id, user_review.name, user_review.stars, business.category FROM user_review JOIN business ON user_review.business_id = business.business_id")

user_review_business.createOrReplaceTempView("user_review_business")

// Total stars each top reviewer has given per category
val resTable = spark.sql("SELECT name, category, SUM(stars) FROM user_review_business GROUP BY name, category")

%sql SELECT name, category, SUM(stars) FROM user_review_business GROUP BY name, category

screenshot
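
For reference, the same pipeline collapses into a single DataFrame-API chain. A sketch reusing the DataFrames defined above:

// Top 10 reviewers joined to their reviews and the business categories
val top10df = user_attributes.orderBy($"review_count".desc).limit(10)

top10df
  .join(review_attributes, "user_id")
  .join(business_attributes, "business_id")
  .groupBy($"name", $"category")
  .agg(sum($"stars").as("total_stars"))
  .show()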

import org.apache.spark.sql.functions._
import spark.implicits._

val business_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_business.json")

val b = business_data.withColumn("category", explode(
    when(col("categories").isNotNull, col("categories"))
    .otherwise(array(lit(null).cast("string")))
    ))

b.createOrReplaceTempView("business")

// Food businesses inside the bounding box, with their full category arrays
val catlatlong = spark.sql("SELECT business_id, name, categories, stars FROM business WHERE latitude >= 42.908333 AND latitude <= 43.241667 AND longitude >= -89.583889 AND longitude <= -89.250556 AND category = 'Food'")

// Re-explode so each remaining business contributes one row per category
val catflat = catlatlong.withColumn("category", explode(
    when(col("categories").isNotNull, col("categories"))
    .otherwise(array(lit(null).cast("string")))
    ))

catflat.createOrReplaceTempView("catflat")
	
val topstars = spark.sql("SELECT business_id, name, category, stars FROM catflat ORDER BY stars DESC LIMIT 10")

val bottomstars = spark.sql("SELECT business_id, name, category, stars FROM catflat ORDER BY stars ASC LIMIT 10")

topstars.createOrReplaceTempView("topstars")

bottomstars.createOrReplaceTempView("bottomstars")

val review_data = spark.read.json("hdfs://localhost:8020/usr/data/yelp_dataset_challenge_round9/yelp_academic_dataset_review.json")

review_data.createOrReplaceTempView("review")

// Reviews written in January through May, across all years
val filterdate = spark.sql("SELECT business_id, date, stars FROM review WHERE month(date) >= 1 AND month(date) <= 5")

filterdate.createOrReplaceTempView("filterdate")

val joinedTableTop = spark.sql("SELECT topstars.name, topstars.category, topstars.business_id, filterdate.date, filterdate.stars FROM topstars INNER JOIN filterdate ON topstars.business_id = filterdate.business_id")

val joinedTableBottom = spark.sql("SELECT bottomstars.name, bottomstars.category, bottomstars.business_id, filterdate.date, filterdate.stars FROM bottomstars INNER JOIN filterdate ON bottomstars.business_id = filterdate.business_id")

joinedTableTop.createOrReplaceTempView("joinedTableTop")

joinedTableBottom.createOrReplaceTempView("joinedTableBottom")

// UNION ALL keeps duplicate rows, which matters when averaging below;
// plain UNION would silently deduplicate identical (date, stars) rows
val jointotal = spark.sql("SELECT * FROM joinedTableTop UNION ALL SELECT * FROM joinedTableBottom")

jointotal.createOrReplaceTempView("jointotal")

val avg_stars = spark.sql("SELECT business_id, AVG(stars) AS avg_stars FROM jointotal GROUP BY business_id")

%sql SELECT business_id, AVG(stars) AS avg_stars FROM jointotal GROUP BY business_id

screenshot
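
One limitation of the union above is that the final averages no longer say whether a business came from the top or bottom list. A small variation (a sketch) tags each side before combining them:

// Tag each side, then combine; DataFrame union keeps duplicates (UNION ALL semantics)
val tagged = joinedTableTop.withColumn("bucket", lit("top"))
  .union(joinedTableBottom.withColumn("bucket", lit("bottom")))

tagged.groupBy($"bucket", $"business_id")
  .agg(avg($"stars").as("avg_stars"))
  .orderBy($"bucket", $"avg_stars".desc)
  .show()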

My repo also has Pig scripts for these analyses.