Source code for recommenders.datasets.movielens

# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

import os
import re
import random
import shutil
import warnings
import pandas as pd
from typing import Optional
from zipfile import ZipFile
from recommenders.datasets.download_utils import maybe_download, download_path
from recommenders.utils.notebook_utils import is_databricks
from recommenders.utils.constants import (

    from pyspark.sql.types import (
except ImportError:
    pass  # so the environment without spark doesn't break

import pandera as pa
import pandera.extensions as extensions
from pandera import Field
from pandera.typing import Series

class _DataFormat:
    def __init__(
        """MovieLens data format container as a different size of MovieLens data file
        has a different format

            sep (str): Rating data delimiter
            path (str): Rating data path within the original zip file
            has_header (bool): Whether the rating data contains a header line or not
            item_sep (str): Item data delimiter
            item_path (str): Item data path within the original zip file
            item_has_header (bool): Whether the item data contains a header line or not

        # Rating file
        self._sep = sep
        self._path = path
        self._has_header = has_header

        # Item file
        self._item_sep = item_sep
        self._item_path = item_path
        self._item_has_header = item_has_header

    def separator(self):
        return self._sep

    def path(self):
        return self._path

    def has_header(self):
        return self._has_header

    def item_separator(self):
        return self._item_sep

    def item_path(self):
        return self._item_path

    def item_has_header(self):
        return self._item_has_header

# 10m and 20m data do not have user data
    "100k": _DataFormat("\t", "ml-100k/", False, "|", "ml-100k/u.item", False),
    "1m": _DataFormat(
        "::", "ml-1m/ratings.dat", False, "::", "ml-1m/movies.dat", False
    "10m": _DataFormat(
        "::", "ml-10M100K/ratings.dat", False, "::", "ml-10M100K/movies.dat", False
    "20m": _DataFormat(",", "ml-20m/ratings.csv", True, ",", "ml-20m/movies.csv", True),

# Fake data for testing only
    "mock100": {"size": 100, "seed": 6},

# 100K data genres index to string mapper. For 1m, 10m, and 20m, the genres labels are already in the dataset.

# Warning and error messages
WARNING_MOVIE_LENS_HEADER = """MovieLens rating dataset has four columns
    (user id, movie id, rating, and timestamp), but more than four column names are provided.
    Will only use the first four column names."""
WARNING_HAVE_SCHEMA_AND_HEADER = """Both schema and header are provided.
    The header argument will be ignored."""
    "Invalid data size. Should be one of {100k, 1m, 10m, or 20m, or mock100}"
ERROR_HEADER = "Header error. At least user and movie column names should be provided"

[docs] def load_pandas_df( size="100k", header=None, local_cache_path=None, title_col=None, genres_col=None, year_col=None, ): """Loads the MovieLens dataset as pd.DataFrame. Download the dataset from, unzip, and load. To load movie information only, you can use load_item_df function. Args: size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m", "mock100"). header (list or tuple or None): Rating dataset header. If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored and data is rendered using the 'DEFAULT_HEADER' instead. local_cache_path (str): Path (directory or a zip file) to cache the downloaded zip file. If None, all the intermediate files will be stored in a temporary directory and removed after use. If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored. title_col (str): Movie title column name. If None, the column will not be loaded. genres_col (str): Genres column name. Genres are '|' separated string. If None, the column will not be loaded. year_col (str): Movie release year column name. If None, the column will not be loaded. If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored. Returns: pandas.DataFrame: Movie rating dataset. **Examples** .. code-block:: python # To load just user-id, item-id, and ratings from MovieLens-1M dataset, df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating')) # To load rating's timestamp together, df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating', 'Timestamp')) # To load movie's title, genres, and released year info along with the ratings data, df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating', 'Timestamp'), title_col='Title', genres_col='Genres', year_col='Year' ) """ size = size.lower() if size not in DATA_FORMAT and size not in MOCK_DATA_FORMAT: raise ValueError(f"Size: {size}. " + ERROR_MOVIE_LENS_SIZE) if header is None: header = DEFAULT_HEADER elif len(header) < 2: raise ValueError(ERROR_HEADER) elif len(header) > 4: warnings.warn(WARNING_MOVIE_LENS_HEADER) header = header[:4] if size in MOCK_DATA_FORMAT: # generate fake data return MockMovielensSchema.get_df( keep_first_n_cols=len(header), keep_title_col=(title_col is not None), keep_genre_col=(genres_col is not None), **MOCK_DATA_FORMAT[ size ], # supply the rest of the kwarg with the dictionary ) movie_col = header[1] with download_path(local_cache_path) as path: filepath = os.path.join(path, "ml-{}.zip".format(size)) datapath, item_datapath = _maybe_download_and_extract(size, filepath) # Load movie features such as title, genres, and release year item_df = _load_item_df( size, item_datapath, movie_col, title_col, genres_col, year_col ) # Load rating data df = pd.read_csv( datapath, sep=DATA_FORMAT[size].separator, engine="python", names=header, usecols=[*range(len(header))], header=0 if DATA_FORMAT[size].has_header else None, ) # Convert 'rating' type to float if len(header) > 2: df[header[2]] = df[header[2]].astype(float) # Merge rating df w/ item_df if item_df is not None: df = df.merge(item_df, on=header[1]) return df
[docs] def load_item_df( size="100k", local_cache_path=None, movie_col=DEFAULT_ITEM_COL, title_col=None, genres_col=None, year_col=None, ): """Loads Movie info. Args: size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m"). local_cache_path (str): Path (directory or a zip file) to cache the downloaded zip file. If None, all the intermediate files will be stored in a temporary directory and removed after use. movie_col (str): Movie id column name. title_col (str): Movie title column name. If None, the column will not be loaded. genres_col (str): Genres column name. Genres are '|' separated string. If None, the column will not be loaded. year_col (str): Movie release year column name. If None, the column will not be loaded. Returns: pandas.DataFrame: Movie information data, such as title, genres, and release year. """ size = size.lower() if size not in DATA_FORMAT: raise ValueError(f"Size: {size}. " + ERROR_MOVIE_LENS_SIZE) with download_path(local_cache_path) as path: filepath = os.path.join(path, "ml-{}.zip".format(size)) _, item_datapath = _maybe_download_and_extract(size, filepath) item_df = _load_item_df( size, item_datapath, movie_col, title_col, genres_col, year_col ) return item_df
def _load_item_df(size, item_datapath, movie_col, title_col, genres_col, year_col): """Loads Movie info""" if title_col is None and genres_col is None and year_col is None: return None item_header = [movie_col] usecols = [0] # Year is parsed from title if title_col is not None or year_col is not None: item_header.append("title_year") usecols.append(1) genres_header_100k = None if genres_col is not None: # 100k data's movie genres are encoded as a binary array (the last 19 fields) # For details, see if size == "100k": genres_header_100k = [*(str(i) for i in range(19))] item_header.extend(genres_header_100k) usecols.extend([*range(5, 24)]) # genres columns else: item_header.append(genres_col) usecols.append(2) # genres column item_df = pd.read_csv( item_datapath, sep=DATA_FORMAT[size].item_separator, engine="python", names=item_header, usecols=usecols, header=0 if DATA_FORMAT[size].item_has_header else None, encoding="ISO-8859-1", ) # Convert 100k data's format: '0|0|1|...' to 'Action|Romance|..." if genres_header_100k is not None: item_df[genres_col] = item_df[genres_header_100k].values.tolist() item_df[genres_col] = item_df[genres_col].map( lambda l: "|".join([GENRES[i] for i, v in enumerate(l) if v == 1]) ) item_df.drop(genres_header_100k, axis=1, inplace=True) # Parse year from movie title. Note, MovieLens title format is "title (year)" # Note, there are very few records that are missing the year info. if year_col is not None: def parse_year(t): parsed = re.split("[()]", t) if len(parsed) > 2 and parsed[-2].isdecimal(): return parsed[-2] else: return None item_df[year_col] = item_df["title_year"].map(parse_year) if title_col is None: item_df.drop("title_year", axis=1, inplace=True) if title_col is not None: item_df.rename(columns={"title_year": title_col}, inplace=True) return item_df
[docs] def load_spark_df( spark, size="100k", header=None, schema=None, local_cache_path=None, dbutils=None, title_col=None, genres_col=None, year_col=None, ): """Loads the MovieLens dataset as `pyspark.sql.DataFrame`. Download the dataset from, unzip, and load as `pyspark.sql.DataFrame`. To load movie information only, you can use `load_item_df` function. Args: spark (pyspark.SparkSession): Spark session. size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m", "mock100"). header (list or tuple): Rating dataset header. If `schema` is provided or `size` is set to any of 'MOCK_DATA_FORMAT', this argument is ignored. schema (pyspark.StructType): Dataset schema. If `size` is set to any of 'MOCK_DATA_FORMAT', data is rendered in the 'MockMovielensSchema' instead. local_cache_path (str): Path (directory or a zip file) to cache the downloaded zip file. If None, all the intermediate files will be stored in a temporary directory and removed after use. dbutils (Databricks.dbutils): Databricks utility object If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored. title_col (str): Title column name. If None, the column will not be loaded. genres_col (str): Genres column name. Genres are '|' separated string. If None, the column will not be loaded. year_col (str): Movie release year column name. If None, the column will not be loaded. If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored. Returns: pyspark.sql.DataFrame: Movie rating dataset. **Examples** .. code-block:: python # To load just user-id, item-id, and ratings from MovieLens-1M dataset: spark_df = load_spark_df(spark, '1m', ('UserId', 'ItemId', 'Rating')) # The schema can be defined as well: schema = StructType([ StructField(DEFAULT_USER_COL, IntegerType()), StructField(DEFAULT_ITEM_COL, IntegerType()), StructField(DEFAULT_RATING_COL, FloatType()), StructField(DEFAULT_TIMESTAMP_COL, LongType()), ]) spark_df = load_spark_df(spark, '1m', ('UserId', 'ItemId', 'Rating'), schema=schema) # To load rating's timestamp together: spark_df = load_spark_df(spark, '1m', ('UserId', 'ItemId', 'Rating', 'Timestamp')) # To load movie's title, genres, and released year info along with the ratings data: spark_df = load_spark_df(spark, '1m', ('UserId', 'ItemId', 'Rating', 'Timestamp'), title_col='Title', genres_col='Genres', year_col='Year' ) # On DataBricks, pass the dbutils argument as follows: spark_df = load_spark_df(spark, dbutils=dbutils) """ size = size.lower() if size not in DATA_FORMAT and size not in MOCK_DATA_FORMAT: raise ValueError(f"Size: {size}. " + ERROR_MOVIE_LENS_SIZE) if size in MOCK_DATA_FORMAT: # generate fake data return MockMovielensSchema.get_spark_df( spark, keep_title_col=(title_col is not None), keep_genre_col=(genres_col is not None), **MOCK_DATA_FORMAT[ size ], # supply the rest of the kwarg with the dictionary ) schema = _get_schema(header, schema) if len(schema) < 2: raise ValueError(ERROR_HEADER) movie_col = schema[1].name with download_path(local_cache_path) as path: filepath = os.path.join(path, "ml-{}.zip".format(size)) datapath, item_datapath = _maybe_download_and_extract(size, filepath) spark_datapath = "file:///" + datapath # shorten form of file://localhost/ # Load movie features such as title, genres, and release year. # Since the file size is small, we directly load as pd.DataFrame from the driver node # and then convert into pyspark.sql.DataFrame item_pd_df = _load_item_df( size, item_datapath, movie_col, title_col, genres_col, year_col ) item_df = spark.createDataFrame(item_pd_df) if item_pd_df is not None else None if is_databricks(): if dbutils is None: raise ValueError( """ To use on a Databricks, dbutils object should be passed as an argument. E.g. load_spark_df(spark, dbutils=dbutils) """ ) # Move rating file to DBFS in order to load into pyspark.sql.DataFrame dbfs_datapath = "dbfs:/tmp/" + datapath, dbfs_datapath) spark_datapath = dbfs_datapath # pyspark's read csv currently doesn't support multi-character delimiter, thus we manually handle that separator = DATA_FORMAT[size].separator if len(separator) > 1: raw_data = spark.sparkContext.textFile(spark_datapath) data_rdd = l: l.split(separator)).map( lambda c: [int(c[0]), int(c[1]), float(c[2]), int(c[3])][: len(schema)] ) df = spark.createDataFrame(data_rdd, schema) else: df = spark_datapath, schema=schema, sep=separator, header=DATA_FORMAT[size].has_header, ) # Merge rating df w/ item_df if item_df is not None: df = df.join(item_df, movie_col, "left") # Cache and force trigger action since data-file might be removed. df.cache() df.count() return df
def _get_schema(header, schema): if schema is None or len(schema) == 0: # Use header to generate schema if header is None or len(header) == 0: header = DEFAULT_HEADER elif len(header) > 4: warnings.warn(WARNING_MOVIE_LENS_HEADER) header = header[:4] schema = StructType() try: ( schema.add(StructField(header[0], IntegerType())) .add(StructField(header[1], IntegerType())) .add(StructField(header[2], FloatType())) .add(StructField(header[3], LongType())) ) except IndexError: pass else: if header is not None: warnings.warn(WARNING_HAVE_SCHEMA_AND_HEADER) if len(schema) > 4: warnings.warn(WARNING_MOVIE_LENS_HEADER) schema = schema[:4] return schema def _maybe_download_and_extract(size, dest_path): """Downloads and extracts MovieLens rating and item datafiles if they don’t already exist""" dirs, _ = os.path.split(dest_path) if not os.path.exists(dirs): os.makedirs(dirs) _, rating_filename = os.path.split(DATA_FORMAT[size].path) rating_path = os.path.join(dirs, rating_filename) _, item_filename = os.path.split(DATA_FORMAT[size].item_path) item_path = os.path.join(dirs, item_filename) if not os.path.exists(rating_path) or not os.path.exists(item_path): download_movielens(size, dest_path) extract_movielens(size, rating_path, item_path, dest_path) return rating_path, item_path
[docs] def download_movielens(size, dest_path): """Downloads MovieLens datafile. Args: size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m"). dest_path (str): File path for the downloaded file """ if size not in DATA_FORMAT: raise ValueError(f"Size: {size}. " + ERROR_MOVIE_LENS_SIZE) url = "" + size + ".zip" dirs, file = os.path.split(dest_path) maybe_download(url, file, work_directory=dirs)
[docs] def extract_movielens(size, rating_path, item_path, zip_path): """Extract MovieLens rating and item datafiles from the MovieLens raw zip file. To extract all files instead of just rating and item datafiles, use ZipFile's extractall(path) instead. Args: size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m"). rating_path (str): Destination path for rating datafile item_path (str): Destination path for item datafile zip_path (str): zipfile path """ with ZipFile(zip_path, "r") as z: with[size].path) as zf, open(rating_path, "wb") as f: shutil.copyfileobj(zf, f) with[size].item_path) as zf, open(item_path, "wb") as f: shutil.copyfileobj(zf, f)
# For more information on data synthesis, see @extensions.register_check_method(statistics=["columns"], supported_types=pd.DataFrame) def unique_columns(df, *, columns): return not df[columns].duplicated().any()
[docs] class MockMovielensSchema(pa.SchemaModel): """ Mock dataset schema to generate fake data for testing purpose. This schema is configured to mimic the Movielens dataset Dataset schema and generation is configured using pandera. Please see for more information. """ # Some notebooks will do a cross join with userID and itemID, # a sparse range for these IDs can slow down the notebook tests userID: Series[int] = Field( in_range={"min_value": 1, "max_value": 50}, alias=DEFAULT_USER_COL ) itemID: Series[int] = Field( in_range={"min_value": 1, "max_value": 50}, alias=DEFAULT_ITEM_COL ) rating: Series[float] = Field( in_range={"min_value": 1, "max_value": 5}, alias=DEFAULT_RATING_COL ) timestamp: Series[int] = Field( in_range={"min_value": 0, "max_value": 1e9}, alias=DEFAULT_TIMESTAMP_COL ) title: Series[str] = Field(eq="foo", alias=DEFAULT_TITLE_COL) genre: Series[str] = Field(eq="genreA|0", alias=DEFAULT_GENRE_COL)
[docs] @classmethod def get_df( cls, size: int = 3, seed: int = 100, keep_first_n_cols: Optional[int] = None, keep_title_col: bool = False, keep_genre_col: bool = False, ) -> pd.DataFrame: """Return fake movielens dataset as a Pandas Dataframe with specified rows. Args: size (int): number of rows to generate seed (int, optional): seeding the pseudo-number generation. Defaults to 100. keep_first_n_cols (int, optional): keep the first n default movielens columns. keep_title_col (bool): remove the title column if False. Defaults to True. keep_genre_col (bool): remove the genre column if False. Defaults to True. Returns: pandas.DataFrame: a mock dataset """ schema = cls.to_schema() if keep_first_n_cols is not None: if keep_first_n_cols < 1 or keep_first_n_cols > len(DEFAULT_HEADER): raise ValueError( f"Invalid value for 'keep_first_n_cols': {keep_first_n_cols}. Valid range: [1-{len(DEFAULT_HEADER)}]" ) schema = schema.remove_columns(DEFAULT_HEADER[keep_first_n_cols:]) if not keep_title_col: schema = schema.remove_columns([DEFAULT_TITLE_COL]) if not keep_genre_col: schema = schema.remove_columns([DEFAULT_GENRE_COL]) random.seed(seed) schema.checks = [pa.Check.unique_columns([DEFAULT_USER_COL, DEFAULT_ITEM_COL])] return schema.example(size=size)
[docs] @classmethod def get_spark_df( cls, spark, size: int = 3, seed: int = 100, keep_title_col: bool = False, keep_genre_col: bool = False, tmp_path: Optional[str] = None, ): """Return fake movielens dataset as a Spark Dataframe with specified rows Args: spark (SparkSession): spark session to load the dataframe into size (int): number of rows to generate seed (int): seeding the pseudo-number generation. Defaults to 100. keep_title_col (bool): remove the title column if False. Defaults to False. keep_genre_col (bool): remove the genre column if False. Defaults to False. tmp_path (str, optional): path to store files for serialization purpose when transferring data from python to java. If None, a temporal path is used instead Returns: pyspark.sql.DataFrame: a mock dataset """ pandas_df = cls.get_df( size=size, seed=seed, keep_title_col=True, keep_genre_col=True ) # generate temp folder with download_path(tmp_path) as tmp_folder: filepath = os.path.join(tmp_folder, f"mock_movielens_{size}.csv") # serialize the pandas.df as a csv to avoid the expensive java <-> python communication pandas_df.to_csv(filepath, header=False, index=False) spark_df = filepath, schema=cls._get_spark_deserialization_schema() ) # Cache and force trigger action since data-file might be removed. spark_df.cache() spark_df.count() if not keep_title_col: spark_df = spark_df.drop(DEFAULT_TITLE_COL) if not keep_genre_col: spark_df = spark_df.drop(DEFAULT_GENRE_COL) return spark_df
@classmethod def _get_spark_deserialization_schema(cls): return StructType( [ StructField(DEFAULT_USER_COL, IntegerType()), StructField(DEFAULT_ITEM_COL, IntegerType()), StructField(DEFAULT_RATING_COL, FloatType()), StructField(DEFAULT_TIMESTAMP_COL, StringType()), StructField(DEFAULT_TITLE_COL, StringType()), StructField(DEFAULT_GENRE_COL, StringType()), ] )