This post shows how to run a classification algorithm, specifically a logistic regression, on a “Ham or Spam” email subject line classification problem, using as features the tf-idf of uni-grams, bi-grams and tri-grams. We could just as easily apply another classifier, such as Random Forest or Support Vector Machines. Likewise, the categorical dependent variable can be anything: in this example it is “Ham or Spam”, but it could be “Promotional or Non-Promotional”, and it could also be multi-class instead of binary.
Let’s assume that our data are stored in a file called “TrainData.csv”. The first thing we have to do is load the required libraries.
file_path = "/user/folder/TrainData.csv"

from pyspark.sql.functions import *
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
At this step, we are going to convert the subject line to lower case, and it is also convenient to rename our dependent variable to target. Notice that our .csv file consists of two columns: the subject, which is the email subject line, and the label, which takes the values 0 and 1.
data = spark.read.csv(file_path, header=True, inferSchema=True)

# convert the subject to lower case
data = data.withColumn('subject', lower(col('subject')))

# https://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark
data = data.withColumnRenamed("label", "target")
data.show(5)
+--------------------+------+
| subject|target|
+--------------------+------+
|the tickets are n...| 0|
|we can't wait for...| 1|
|we're open: join ...| 1|
|actively looking ...| 0|
|detroit tigers, c...| 0|
+--------------------+------+
only showing top 5 rows
At this step, we are going to build the pipeline, which tokenizes the text, generates uni-grams, bi-grams and tri-grams from the tokens, applies count vectorizing to each set of n-grams, computes the tf-idf from the counts, assembles the tf-idf vectors into a single features column with a VectorAssembler, converts the target column to a numeric label, and finally runs the logistic regression model.
def build_trigrams(inputCol=["subject", "target"], n=3):
    tokenizer = [Tokenizer(inputCol="subject", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]
    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i), outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in range(1, n + 1)
    ]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol="target", outputCol="label")]
    lr = [LogisticRegression(maxIter=100)]  # regParam=0.3, elasticNetParam=0.8
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + lr)
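As mentioned at the start, the last stage of the pipeline is interchangeable. A minimal sketch of the same pipeline with a random forest instead of the logistic regression could look like the following (the function name build_trigrams_rf and numTrees=100 are just illustrative choices, not part of the original code):

from pyspark.ml.classification import RandomForestClassifier

def build_trigrams_rf(n=3):
    # same feature stages as build_trigrams, only the final classifier changes
    tokenizer = [Tokenizer(inputCol="subject", outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i)) for i in range(1, n + 1)]
    cv = [CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i), outputCol="{0}_tf".format(i)) for i in range(1, n + 1)]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]
    assembler = [VectorAssembler(inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)], outputCol="features")]
    label_stringIdx = [StringIndexer(inputCol="target", outputCol="label")]
    rf = [RandomForestClassifier(numTrees=100)]  # numTrees=100 is an illustrative value
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + rf)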
At this step, we apply fit to the pipeline returned by “build_trigrams”, and then apply transform to get the model’s predictions.
trigram_pipelineFit = build_trigrams().fit(data)
predictions = trigram_pipelineFit.transform(data)
predictions.printSchema()
root
|-- subject: string (nullable = true)
|-- target: string (nullable = true)
|-- words: array (nullable = true)
| |-- element: string (containsNull = true)
|-- 1_grams: array (nullable = true)
| |-- element: string (containsNull = false)
|-- 2_grams: array (nullable = true)
| |-- element: string (containsNull = false)
|-- 3_grams: array (nullable = true)
| |-- element: string (containsNull = false)
|-- 1_tf: vector (nullable = true)
|-- 2_tf: vector (nullable = true)
|-- 3_tf: vector (nullable = true)
|-- 1_tfidf: vector (nullable = true)
|-- 2_tfidf: vector (nullable = true)
|-- 3_tfidf: vector (nullable = true)
|-- features: vector (nullable = true)
|-- label: double (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = true)
# show the top five actual and predicted values
predictions.select(col("prediction"), col('label')).show(5)
+----------+-----+
|prediction|label|
+----------+-----+
| 0.0| 0.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 0.0| 0.0|
+----------+-----+
only showing top 5 rows
# evaluate the performance of the model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# accuracy: share of rows where the predicted label matches the actual label
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())

# area under the ROC curve (the evaluator's default metric)
roc_auc = evaluator.evaluate(predictions)
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))
Accuracy Score: 0.9964
ROC-AUC: 0.9865
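Note that in this walkthrough the model is fitted and evaluated on the same data, so the scores above are in-sample. A minimal sketch of an out-of-sample evaluation, assuming an illustrative 70/30 randomSplit and using MulticlassClassificationEvaluator for the accuracy, could look like this:

# split the data into a training and a test set (70/30 is an illustrative choice)
train, test = data.randomSplit([0.7, 0.3], seed=42)

# fit the pipeline on the training set only and predict on the held-out test set
pipelineFit = build_trigrams().fit(train)
test_predictions = pipelineFit.transform(test)

# out-of-sample ROC-AUC
test_roc_auc = evaluator.evaluate(test_predictions)

# out-of-sample accuracy via the built-in evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
test_accuracy = acc_evaluator.evaluate(test_predictions)

print("Test Accuracy Score: {0:.4f}".format(test_accuracy))
print("Test ROC-AUC: {0:.4f}".format(test_roc_auc))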