Spark-movie-lens - An on-line movie recommender using Spark, Python Flask, and the MovieLens dataset

Overview

A scalable on-line movie recommender using Spark and Flask

This Apache Spark tutorial will guide you step by step through using the MovieLens dataset to build a movie recommender based on collaborative filtering with Spark's Alternating Least Squares implementation. It is organised in two parts. The first is about getting and parsing movies and ratings data into Spark RDDs. The second is about building and using the recommender, and persisting it for later use in our on-line recommender system.
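
As a flavour of what Part I builds up to, here is a minimal sketch (not the tutorial's exact code; the file path and ALS parameters are assumptions) of parsing ratings into a Spark RDD and training an ALS model with MLlib:

    from pyspark import SparkContext
    from pyspark.mllib.recommendation import ALS

    sc = SparkContext(appName='movie-recommender-sketch')

    # Load the MovieLens ratings file and drop its CSV header.
    raw = sc.textFile('ml-latest-small/ratings.csv')
    header = raw.take(1)[0]
    ratings_RDD = (raw.filter(lambda line: line != header)
                      .map(lambda line: line.split(','))
                      .map(lambda f: (int(f[0]), int(f[1]), float(f[2]))))

    # Train the collaborative-filtering model with Alternating Least Squares.
    model = ALS.train(ratings_RDD, rank=8, iterations=5, lambda_=0.1, seed=5)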

This tutorial can be used independently to build a movie recommender model based on the MovieLens dataset. Most of the code in the first part, about how to use ALS with the public MovieLens dataset, comes from my solution to one of the exercises proposed in CS100.1x Introduction to Big Data with Apache Spark by Anthony D. Joseph on edX, which has also been publicly available since 2014 at Spark Summit. Starting from there, I've made minor modifications to use a larger dataset, then added code to store and reload the model for later use, and finally a web service using Flask.

In any case, the use of this algorithm with this dataset is not new (you can Google it); here the emphasis is on ending up with a model that is usable in an on-line environment, and on how to use it in different situations. I was truly inspired by solving the exercise proposed in that course, and I highly recommend taking it. There you will learn not just ALS but many other Spark algorithms.

The second part of the tutorial explains how to use Python/Flask to build a web service on top of Spark models. By doing so, you will be able to develop a complete on-line movie recommendation service.
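
To illustrate the idea, here is a minimal, hypothetical sketch of such a web layer (the route shape matches the curl examples in the comments below, but this is not the repo's actual app.py, and recommendation_engine is an assumed name): a Flask blueprint whose handler delegates to the Spark-backed engine:

    import json
    from flask import Blueprint

    main = Blueprint('main', __name__)

    @main.route('/<int:user_id>/ratings/top/<int:count>', methods=['GET'])
    def top_ratings(user_id, count):
        # Delegate to the Spark-backed engine (name assumed for illustration).
        top = recommendation_engine.get_top_ratings(user_id, count)
        return json.dumps(top)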

Part I: Building the recommender

Part II: Building and running the web service

Quick start

The file server/server.py starts a CherryPy server running the Flask app defined in app.py, exposing a RESTful web service that wraps the Spark-based engine in engine.py. Through its API we can perform on-line movie recommendations.
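
Once the server is up, the service can be queried over plain HTTP. A small usage sketch with Python's requests library (the port, 5433, is taken from the curl examples in the comments below; the notebook documents the exact setup):

    import requests

    # Top-5 recommendations for user 0.
    resp = requests.get('http://0.0.0.0:5433/0/ratings/top/5')
    print(resp.json())  # [[movie_title, predicted_rating, ratings_count], ...]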

Please refer to the second notebook for detailed instructions on how to run and use the service.

Contributing

Contributions are welcome! For bug reports or requests please submit an issue.

Contact

Feel free to contact me to discuss any issues, questions, or comments.

License

This repository contains a variety of content; some developed by Jose A. Dianes, and some from third-parties. The third-party content is distributed under the license provided by those parties.

The content developed by Jose A. Dianes is distributed under the following license:

Copyright 2016 Jose A Dianes

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Comments
  • GETing top recommendations shows the same movie

    Hello,

    I've managed to run the project locally, and the output from getting top recommendations shows the same movie repeated. Does anyone else experience the same behavior? Note that I ran it with the exact source files from this repo.

    The mentioned output looks like this. "[["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30], ["The War (2007)", 8.836370207914264, 30]]"

    Thank you.

    opened by marius92mc 6
  • Invalid Syntax in engine.py - line 115, self.seed = 5L

    (C:\Program Files\Anaconda3) F:\Data Science\movielens\spark-movie-lens>server.py
    Traceback (most recent call last):
      File "F:\Data Science\movielens\spark-movie-lens\server.py", line 3, in <module>
        from app import create_app
      File "F:\Data Science\movielens\spark-movie-lens\app.py", line 5, in <module>
        from engine import RecommendationEngine
      File "F:\Data Science\movielens\spark-movie-lens\engine.py", line 115
        self.seed = 5L
                     ^
    SyntaxError: invalid syntax
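
    For context: the trailing "L" long-integer suffix is Python 2 syntax and was removed in Python 3, hence the SyntaxError under Anaconda3. A minimal sketch of the fix (a plain int is valid in both Python versions):

    # engine.py, line 115: drop the Python 2 "L" suffix.
    self.seed = 5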

    opened by ghost 3
  • Logic error in "get_top_ratings" when building "user_unrated_movies_RDD"

    In engine.py, function get_top_ratings, the code reads:

    user_unrated_movies_RDD = self.movies_RDD.filter(lambda rating: not rating[1]==user_id).map(lambda x: (user_id, x[0]))

    Each element of self.movies_RDD is (movie_id, movie_title, movie_category), so rating[1] refers to movie_title. I guess "self.movies_RDD" should be "self.ratings_RDD"; please check this.
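
    For reference, a sketch of the suggested correction (assuming each element of self.ratings_RDD is (user_id, movie_id, rating)): filter on the user column of ratings_RDD and keep the movie column:

    user_unrated_movies_RDD = (self.ratings_RDD
        .filter(lambda rating: not rating[0] == user_id)
        .map(lambda x: (user_id, x[1])))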

    opened by movingheart 2
  • Importing Spark

    Hello. As I was following the guide, I found that the variable sc was not defined; I figured it belongs to Spark. However, I don't know how to configure Spark to run the notebook. I'm on Windows; any help?

    opened by Mohamed3on 1
  • org.apache.hadoop.mapred.InvalidInputException: Input path does not exist

    I am using the same notebook on Cloudera's quickstart VM with Anaconda installed. I have made no other changes.

    On this step:

    small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

    it gives an error:

    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-21-61849ee50ee7> in <module>()
    ----> 1 small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
    
    /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
       1265         """
       1266         items = []
    -> 1267         totalParts = self.getNumPartitions()
       1268         partsScanned = 0
       1269 
    
    /usr/lib/spark/python/pyspark/rdd.py in getNumPartitions(self)
        354         2
        355         """
    --> 356         return self._jrdd.partitions().size()
        357 
        358     def filter(self, f):
    
    /usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
        811         answer = self.gateway_client.send_command(command)
        812         return_value = get_return_value(
    --> 813             answer, self.gateway_client, self.target_id, self.name)
        814 
        815         for temp_arg in temp_args:
    
    /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
         43     def deco(*a, **kw):
         44         try:
    ---> 45             return f(*a, **kw)
         46         except py4j.protocol.Py4JJavaError as e:
         47             s = e.java_exception.toString()
    
    /usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        306                 raise Py4JJavaError(
        307                     "An error occurred while calling {0}{1}{2}.\n".
    --> 308                     format(target_id, ".", name), value)
        309             else:
        310                 raise Py4JError(
    
    Py4JJavaError: An error occurred while calling o108.partitions.
    : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://quickstart.cloudera:8020/home/cloudera/datasets/ml-latest-small/ratings.csv
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64)
        at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
    

    I checked the previous step:

    small_ratings_raw_data

    This gives the result:

    /home/cloudera/datasets/ml-latest-small/ratings.csv MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:-2

    Could you please help me with this?

    opened by anshuljoshi 1
  • Error when executing "error_complete = math.sqrt(rates_and_preds_complete.map(lambda r:(r[1][0]-r[1][1])**2).mean())"

    Hello, I ran the code from building-recommender.ipynb with pyspark; when execution reaches this line

    error_complete = math.sqrt(rates_and_preds_complete.map(lambda  r:(r[1][0]-r[1][1])**2).mean())
    

    I got the error below:

    It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation.
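
    For context, Spark raises this error whenever one RDD is referenced from inside another RDD's closure. A minimal illustration with hypothetical names:

    # Not allowed: predictions_RDD referenced inside ratings_RDD's lambda.
    bad = ratings_RDD.map(lambda r: predictions_RDD.count())
    # Allowed: combine the (key, value) RDDs first, then transform the result.
    rates_and_preds = ratings_RDD.join(predictions_RDD)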
    
    opened by shartoo 1
  • Fix bug "GETing top recommendations shows the same movie #6"

    Problem: Engine returns the same movie for the best recommendations.

    Fix: Revert changes that were made by #5.

    With these changes, the get top recommendation feature works fine.

    Example: before

    $ curl http://0.0.0.0:5433/0/ratings/top/5
    [["Citizen Kane (1941)", 9.007845861110559, 77], ["Citizen Kane (1941)", 9.007845861110559, 77],  
     ["Citizen Kane (1941)", 9.007845861110559, 77], ["Citizen Kane (1941)", 9.007845861110559, 77], 
     ["Citizen Kane (1941)", 9.007845861110559, 77], ]
    

    after

    $ curl http://0.0.0.0:5433/0/ratings/top/5
    [["Citizen Kane (1941)", 9.007845861110559, 77], 
     ["Spirited Away (Sen to Chihiro no kamikakushi) (2001)", 8.846219879097916, 72], 
     ["12 Angry Men (1957)", 8.78432643295096, 63], 
     ["Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)", 8.767201279657312, 29], 
     ["\"Clockwork Orange", 8.699448955180992, 134]]
    
    opened by marius92mc 1
  • Unable to proceed past stage 7.0 (OutOfMemoryError: Java heap space)

    py4j.protocol.Py4JJavaError: An error occurred while calling o96.trainALSModel. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 7.0 failed 1 times, most recent failure: Lost task 5.0 in stage 7.0 (TID 54, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space

    Unable to proceed further. Any help is much appreciated!

    opened by datawrangl3r 0
  • Update engine.py

    Fixing an issue reported several times, but not fixed entirely in issues #5 and #6 (and mentioned in issue #9 as a remaining bug).

    The faulty logic mentioned in issue #5 was valid, but an error remained which led to the same items/movies being recommended multiple times. This was because the operations in line 80 resulted in a non-unique RDD, meaning that the same movies were present multiple times. This is solved by adding the .distinct() operation, which removes duplicate entries.

    Step-by-step:

    1. self.ratings_RDD contains all user ratings
    2. .filter(lambda rating: not rating[0] == user_id) eliminates all movies already rated by the specified user, where rating[0] refers to the user_id column
    3. .map(lambda x: (user_id, x[1])) puts all movie_ids in a (user_id, movie_id) format (this is where a movie can appear multiple times!)
    4. .distinct() removes all duplicate entries, as shown in the sketch below
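
    A sketch of the resulting line after these steps:

    user_unrated_movies_RDD = (self.ratings_RDD
        .filter(lambda rating: not rating[0] == user_id)
        .map(lambda x: (user_id, x[1]))
        .distinct())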
    opened by curato-research 0
  • Fixed wrong comparison between movie_id and user_id

    Problem: in the get_top_ratings() method, the filtering for movies that weren't rated by the user is based on a comparison between movie_id and user_id. Fix: replaced the movie_id field from ratings_RDD with the user_id field.

    opened by marius92mc 0
  • New User Ratings

    Hi Jose, great job! I am new to GitHub, so please pardon me if this should not be reported as an issue, but I wanted to bring to your attention the ratings we are providing to the complete data for a new user.

    The range for new user ratings seems to be [0, 10], and when the recommendation engine makes predictions it returns predicted ratings in a similar range. Shouldn't it be in the [0, 5] range? When I supply ratings in that range, it predicts movie ratings in the [0, 5] range, but the predictions are drastically different from what they were earlier. Am I missing something here?

    opened by chunkybaba 0
  • engine.iteration

    Hi jadianes, thank you for all your work; it really helped me. But the iteration parameter in the engine causes an error on my system when it gets bigger than 5, and I think 5 iterations are not enough for a good recommendation. Can you suggest any way to fix it? The error is this:

    File "F:\bitirme\spark-2.0.1-bin-hadoop2.7\python\pyspark\mllib\common.py", line 123, in callJavaFunc 17/04/24 22:58:09 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-7,5,main] java.lang.StackOverflowError at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:147) at org.apache.spark.util.ByteBufferInputStream.read(ByteBufferInputStream.scala:52) at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)


    ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(5,1493063889491,JobFailed(org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down)

    My system specifications: i7 2nd gen, 6 GB RAM, SSD 550/440, ASUS N53SV laptop.
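
    A common mitigation for a StackOverflowError with many ALS iterations (an assumption on my part, not a fix from this repo) is to enable checkpointing before training, which truncates the long RDD lineage that ALS builds up:

    # Hypothetical sketch: checkpointing truncates deep lineage chains that
    # can otherwise overflow the stack during (de)serialization.
    sc.setCheckpointDir('checkpoint/')
    model = ALS.train(ratings_RDD, rank=8, iterations=10, seed=5)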

    opened by alperenbabagil 2
  • Getting individual ratings

    Currently, the example code looks like:

    my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
    individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
    individual_movie_rating_RDD.take(1)
    

    Should it be this ...?

    my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
    individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
    individual_movie_rating_RDD.collect()
    
    opened by snowch 1
  • Duplicates of new user unrated movies passed to predict

    new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))
    

    The list of unrated movies contains duplicates:

    print(new_user_unrated_movies_RDD.take(10))
    [(0, 1), (0, 1), (0, 1), (0, 1), (0, 1), (0, 1), (0, 1), (0, 1), (0, 1), (0, 1)]
    

    Should there be a distinct added?

    new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0]))).distinct()
    
    print(new_user_unrated_movies_RDD.take(10))
    [(0, 378), (0, 1934), (0, 3282), (0, 5606), (0, 862), (0, 2146), (0, 3766), (0, 1330), (0, 2630), (0, 4970)]
    

    The predict function that receives new_user_unrated_movies_RDD:

    # Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
    new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
    
    opened by snowch 0
  • StackOverflow

    Getting a StackOverflowError while running the application engine:

    An error occurred while calling o90.trainALSModel.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 4 times, most recent failure: Lost task 0.3 in stage 28.0 (TID 49, 192.168.110.130): java.lang.StackOverflowError
        at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2846)
        at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1455)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invok

    opened by ujjwalit 0
Owner
Jose A Dianes
Principal Data Scientist at Mosaic Therapeutics.

Related projects

An open source movie recommendation WebApp built by movie buffs and mathematicians that uses cosine similarity on the backend.

Movie Pundit: Find your next flick by asking the (almost) all-knowing Movie Pundit.

Kapil Pramod Deshmukh 8 May 28, 2022
Deep recommender models using PyTorch.

Spotlight uses PyTorch to build both deep and shallow recommender models. By providing both a slew of building blocks for loss functions (various poin

Maciej Kula 2.8k Dec 29, 2022
A Python scikit for building and analyzing recommender systems

Overview Surprise is a Python scikit for building and analyzing recommender systems that deal with explicit rating data. Surprise was designed with th

Nicolas Hug 5.7k Jan 1, 2023
QRec: A Python Framework for quick implementation of recommender systems (TensorFlow Based)

QRec is a Python framework for recommender systems (Supported by Python 3.7.4 and Tensorflow 1.14+) in which a number of influential and newly state-of-the-art recommendation models are implemented. QRec has a lightweight architecture and provides user-friendly interfaces. It can facilitate model implementation and evaluation.

Yu 1.4k Dec 27, 2022
E-Commerce recommender demo with real-time data and a graph database

E-Commerce recommender demo. This is a simple stream setup that uses Memgraph to ingest real-time data from a simulated online store. Data is str

g-despot 3 Feb 23, 2022
Recommender System Papers

Included Conferences: SIGIR 2020, SIGKDD 2020, RecSys 2020, CIKM 2020, AAAI 2021, WSDM 2021, WWW 2021

RUCAIBox 704 Jan 6, 2023
Graph Neural Networks for Recommender Systems

This repository contains code to train and test GNN models for recommendation, mainly using the Deep Graph Library (DGL).

null 217 Jan 4, 2023
RecSim NG: Toward Principled Uncertainty Modeling for Recommender Ecosystems

RecSim NG, a probabilistic platform for multi-agent recommender systems simulation. RecSimNG is a scalable, modular, differentiable simulator implemented in Edward2 and TensorFlow. It offers: a powerful, general probabilistic programming language for agent-behavior specification;

Google Research 110 Dec 16, 2022
NVIDIA Merlin is an open source library designed to accelerate recommender systems on NVIDIA’s GPUs.

NVIDIA Merlin is an open source library providing end-to-end GPU-accelerated recommender systems, from feature engineering and preprocessing to training deep learning models and running inference in production.

null 420 Jan 4, 2023
Collaborative variational bandwidth auto-encoder (VBAE) for recommender systems.

Collaborative Variational Bandwidth Auto-encoder The codes are associated with the following paper: Collaborative Variational Bandwidth Auto-encoder f

Yaochen Zhu 14 Dec 11, 2022
Mutual Fund Recommender System. Tailor for fund transactions.

Explainable Mutual Fund Recommendation Data Please see 'DATA_DESCRIPTION.md' for mode detail. Recommender System Methods Baseline Collabarative Fiilte

JHJu 2 May 19, 2022
Movies/TV Recommender

recommender Movies/TV Recommender. Recommends Movies, TV Shows, Actors, Directors, Writers. Setup Create file API_KEY and paste your TMDB API key in i

Aviem Zur 3 Apr 22, 2022
6002project-rl - An implemention of offline RL on recommender system

An implemention of offline RL on recommender system @author: misajie @update: 20

Tzay Lee 3 May 24, 2022
Implementation of a hadoop based movie recommendation system

Implementation-of-a-hadoop-based-movie-recommendation-system. By writing the code, you design a Hadoop-based movie recommendation system and, in building it, learn file operations and data-processing skills on the Hadoop platform. Windows 10, Hadoop 2.8.3, p

汝聪(Ricardo) 5 Oct 2, 2022
Recommendation System to recommend top books from the dataset

recommendersystem Recommendation System to recommend top books from the dataset Introduction The recom.py is the main program code. The dataset is als

Vishal karur 1 Nov 15, 2021
Approximate Nearest Neighbors in C++/Python optimized for memory usage and loading/saving to disk

Annoy Annoy (Approximate Nearest Neighbors Oh Yeah) is a C++ library with Python bindings to search for points in space that are close to a given quer

Spotify 10.6k Jan 1, 2023
A TensorFlow recommendation algorithm and framework in Python.

TensorRec A TensorFlow recommendation algorithm and framework in Python. NOTE: TensorRec is not under active development. TensorRec will not be receivi

James Kirk 1.2k Jan 4, 2023
Fast Python Collaborative Filtering for Implicit Feedback Datasets

Implicit Fast Python Collaborative Filtering for Implicit Datasets. This project provides fast Python implementations of several different popular rec

Ben Frederickson 3k Dec 31, 2022
A Python implementation of LightFM, a hybrid recommendation algorithm.

LightFM is a Python implementation of a number of popular recommendation al

Lyst 4.2k Jan 2, 2023