Source code for recommenders.models.vowpal_wabbit.vw

# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

"""
This file provides a wrapper to run Vowpal Wabbit from the command line through python.
It is not recommended to use this approach in production, there are python bindings that can be installed from the
repository or pip or the command line can be used. This is merely to demonstrate vw usage in the example notebooks.
"""

import os
from subprocess import run
from tempfile import TemporaryDirectory
import pandas as pd

from recommenders.utils.constants import (
    DEFAULT_USER_COL,
    DEFAULT_ITEM_COL,
    DEFAULT_RATING_COL,
    DEFAULT_TIMESTAMP_COL,
    DEFAULT_PREDICTION_COL,
)


[docs] class VW: """Vowpal Wabbit Class""" def __init__( self, col_user=DEFAULT_USER_COL, col_item=DEFAULT_ITEM_COL, col_rating=DEFAULT_RATING_COL, col_timestamp=DEFAULT_TIMESTAMP_COL, col_prediction=DEFAULT_PREDICTION_COL, **kwargs, ): """Initialize model parameters Args: col_user (str): user column name col_item (str): item column name col_rating (str): rating column name col_timestamp (str): timestamp column name col_prediction (str): prediction column name """ # create temporary files self.tempdir = TemporaryDirectory() self.train_file = os.path.join(self.tempdir.name, "train.dat") self.test_file = os.path.join(self.tempdir.name, "test.dat") self.model_file = os.path.join(self.tempdir.name, "vw.model") self.prediction_file = os.path.join(self.tempdir.name, "prediction.dat") # set DataFrame columns self.col_user = col_user self.col_item = col_item self.col_rating = col_rating self.col_timestamp = col_timestamp self.col_prediction = col_prediction self.logistic = "logistic" in kwargs.values() self.train_cmd = self.parse_train_params(params=kwargs) self.test_cmd = self.parse_test_params(params=kwargs)
[docs] @staticmethod def to_vw_cmd(params): """Convert dictionary of parameters to vw command line. Args: params (dict): key = parameter, value = value (use True if parameter is just a flag) Returns: list[str]: vw command line parameters as list of strings """ cmd = ["vw"] for k, v in params.items(): if v is False: # don't add parameters with a value == False continue # add the correct hyphen to the parameter cmd.append(f"-{k}" if len(k) == 1 else f"--{k}") if v is not True: # don't add an argument for parameters with value == True cmd.append("{}".format(v)) return cmd
[docs] def parse_train_params(self, params): """Parse input hyper-parameters to build vw train commands Args: params (dict): key = parameter, value = value (use True if parameter is just a flag) Returns: list[str]: vw command line parameters as list of strings """ # make a copy of the original hyper parameters train_params = params.copy() # remove options that are handled internally, not supported, or test only parameters invalid = [ "data", "final_regressor", "invert_hash", "readable_model", "t", "testonly", "i", "initial_regressor", "link", ] for option in invalid: if option in train_params: del train_params[option] train_params.update( { "d": self.train_file, "f": self.model_file, "quiet": params.get("quiet", True), } ) return self.to_vw_cmd(params=train_params)
[docs] def parse_test_params(self, params): """Parse input hyper-parameters to build vw test commands Args: params (dict): key = parameter, value = value (use True if parameter is just a flag) Returns: list[str]: vw command line parameters as list of strings """ # make a copy of the original hyper parameters test_params = params.copy() # remove options that are handled internally, ot supported or train only parameters invalid = [ "data", "f", "final_regressor", "initial_regressor", "test_only", "invert_hash", "readable_model", "b", "bit_precision", "holdout_off", "c", "cache", "k", "kill_cache", "l", "learning_rate", "l1", "l2", "initial_t", "power_t", "decay_learning_rate", "q", "quadratic", "cubic", "i", "interactions", "rank", "lrq", "lrqdropout", "oaa", ] for option in invalid: if option in test_params: del test_params[option] test_params.update( { "d": self.test_file, "i": self.model_file, "quiet": params.get("quiet", True), "p": self.prediction_file, "t": True, } ) return self.to_vw_cmd(params=test_params)
[docs] def to_vw_file(self, df, train=True): """Convert Pandas DataFrame to vw input format file Args: df (pandas.DataFrame): input DataFrame train (bool): flag for train mode (or test mode if False) """ output = self.train_file if train else self.test_file with open(output, "w") as f: # extract columns and create a new dataframe tmp = df[[self.col_rating, self.col_user, self.col_item]].reset_index() if train: # we need to reset the rating type to an integer to simplify the vw formatting tmp[self.col_rating] = tmp[self.col_rating].astype("int64") # convert rating to binary value if self.logistic: max_value = tmp[self.col_rating].max() tmp[self.col_rating] = tmp[self.col_rating].apply( lambda x: 2 * round(x / max_value) - 1 ) else: tmp[self.col_rating] = "" # convert each row to VW input format (https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format) # [label] [tag]|[user namespace] [user id feature] |[item namespace] [movie id feature] # label is the true rating, tag is a unique id for the example just used to link predictions to truth # user and item namespaces separate features to support interaction features through command line options for _, row in tmp.iterrows(): f.write( "{rating} {index}|user {userID} |item {itemID}\n".format( rating=row[self.col_rating], index=row["index"], userID=row[self.col_user], itemID=row[self.col_item], ) )
[docs] def fit(self, df): """Train model Args: df (pandas.DataFrame): input training data """ # write dataframe to disk in vw format self.to_vw_file(df=df) # train model run(self.train_cmd, check=True)
[docs] def predict(self, df): """Predict results Args: df (pandas.DataFrame): input test data """ # write dataframe to disk in vw format self.to_vw_file(df=df, train=False) # generate predictions run(self.test_cmd, check=True) # read predictions return df.join( pd.read_csv( self.prediction_file, delim_whitespace=True, names=[self.col_prediction], index_col=1, ) )
def __del__(self): self.tempdir.cleanup()