Predictive Hacks

PySpark: Logistic Regression with TF-IDF on N-Grams

This post is about how to run a classification algorithm and more specifically a logistic regression of a “Ham or Spam” Subject Line Email classification problem using as features the tf-idf of uni-grams, bi-grams and tri-grams. We can easily apply any classification, like Random Forest, Support Vector Machines etc. Finally, the categorical dependent variable can be anything, in this example, it was “Ham or Spam”, it could be “Promotion or Non-Promotional” and it could also multi-class instead of binary

Let’s assume that our data are called “TrainData.csv”. The first thing that we have to do is to load the required libraries.

file_path = "/user/folder/TrainData.csv"

from pyspark.sql.functions import *
from import NGram, VectorAssembler
from import CountVectorizer

from import HashingTF, IDF, Tokenizer
from import StringIndexer
from import Pipeline

from import LogisticRegression
from import BinaryClassificationEvaluator

At this step, we are going to convert the subject line to the lower case and also it is convenient to rename our dependent variable to target. Notice that our .csv file consists of two columns, the subject , which is the email subject line, and the label which takes values 0,1.

data =, header=True, inferSchema =True )
# convert the subject to lower case
data = data.withColumn('subject', lower(col('subject')))
data = data.withColumnRenamed("label", "target")
|             subject|target|
|the tickets are n...|     0|
|we can't wait for...|     1|
|we're open: join ...|     1|
|actively looking ...|     0|
|detroit tigers, c...|     0|
only showing top 5 rows

At this step, we are going to build the pipeline, which tokenizes the text, then it does the count vectorizing taking as input the tokens, then it does the tf-idf taking as input the count vectorizing, then it takes the tf-idf and and converts it to a VectorAssembler, then it converts the target column to categorical and finally it runs the logistic regression model.

def build_trigrams(inputCol=["subject","target"], n=3):
    tokenizer = [Tokenizer(inputCol="subject", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)

    cv = [
        for i in range(1, n + 1)
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100 )]  # regParam=0.3, elasticNetParam=0.8
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)

At this step, we can apply fit to our pipeline function called “build_trigrams” and we can also apply transform for predicting the model.

trigram_pipelineFit = build_trigrams().fit(data)
predictions = trigram_pipelineFit.transform(data)
 |-- subject: string (nullable = true)
 |-- target: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- 1_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 2_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 3_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 1_tf: vector (nullable = true)
 |-- 2_tf: vector (nullable = true)
 |-- 3_tf: vector (nullable = true)
 |-- 1_tfidf: vector (nullable = true)
 |-- 2_tfidf: vector (nullable = true)
 |-- 3_tfidf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
# show the top five actual and predicted values"prediction"), col('label')).show(5)
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
only showing top 5 rows
# evaluate the performance of the model 

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
roc_auc = evaluator.evaluate(predictions)
# print accuracy, roc_auc
print "Accuracy Score: {0:.4f}".format(accuracy)
print "ROC-AUC: {0:.4f}".format(roc_auc)
Accuracy Score: 0.9964
ROC-AUC: 0.9865

Share This Post

Share on facebook
Share on linkedin
Share on twitter
Share on email

Leave a Comment

Subscribe To Our Newsletter

Get updates and learn from the best

More To Explore

data science journey

My Journey as a Data Science Blogger

Μy Background My Studies Back in 2001, I entered university to study Statistics. During my first year, I ran my