Jupyter Notebook
The following instructions show how to launch a Jupyter Notebook Server image preconfigured with InAccel acceleration layer.
Resources:
How To Install InAccel Studio
Run the Docker image:
docker run --name "inaccel-jupyter" \
-h jupyter -p 8888:8888 \
--restart=unless-stopped \
--runtime=inaccel \
-v /path/to/data:/inaccel/data \
-it "inaccel/jupyter:lab"
Open the following address in your web browser: http://localhost:8888.
Example Notebook
MLlib Pipelines and Structured Streaming
This notebook shows how to train an MLlib pipeline on historic data and apply it to streaming data.
You can read more about Pipeline API in the MLlib Programming Guide.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
For this example we'll be using the Extended Modified NIST (National Institute of Standards and Technology) Letters dataset which is a set of handwritten letters derived from the NIST Special Database 19 and converted to a 28x28 pixel grayscale image format. The dataset merges a balanced set of the uppercase and lowercase letters into a single 26-class task. Further information on the dataset contents and conversion process can be found in the paper available at https://arxiv.org/abs/1702.05373v1.
Begin by reading the train data and inspecting the schema.
The data is stored in the popular LibSVM format, so the file can be loaded using MLlib's dataset reader utility.
train = spark.read.format("libsvm").option("numFeatures", 784).load("file:///inaccel/data/nist/letters_libsvm_train.dat")
train.printSchema()
Every LibSVM dataset contains exactly 2 columns:
- label: 0 to 25. Each label represents a different letter.
- features: The 784 grayscale pixel values of the handwritten letter image.
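To make the on-disk layout concrete, here is a minimal pure-Python sketch that parses one LibSVM-formatted line; the label and pixel indices in the example line are made up for illustration:

```python
def parse_libsvm_line(line):
    """Parse one LibSVM line of the form '<label> <index>:<value> ...'."""
    parts = line.split()
    label = float(parts[0])
    features = {}
    for pair in parts[1:]:
        index, value = pair.split(":")
        # Sparse representation: indices that are absent are implicitly 0.0.
        features[int(index)] = float(value)
    return label, features

# Made-up example: label 3 with two non-zero pixels.
label, features = parse_libsvm_line("3 153:0.5 154:0.9")
print(label, features)  # 3.0 {153: 0.5, 154: 0.9}
```

This is only a sketch of the format; in the notebook the parsing is done by Spark's libsvm reader shown above.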
We want to build a model that will predict the label using the features data. We'll do this with a pipeline of 4 stages:
- A StringIndexer to tell the algorithm that the labels are categories 0-25, rather than continuous values.
- A StandardScaler to normalize each feature to have zero mean and unit standard deviation.
- A LogisticRegression to fit an elastic-net-regularized logistic regression model and learn how to predict the labels of new examples.
- An IndexToString to convert the prediction indices back to the original string labels.
Let's start by creating the objects that represent these stages.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import IndexToString, StandardScaler, StringIndexer
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(train)
featuresScaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=True, withStd=True).fit(train)
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="scaledFeatures")
indexToLabel = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
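The StandardScaler stage above computes a per-feature z-score. A minimal pure-Python sketch of that transformation on toy values, assuming the corrected (n − 1) sample standard deviation:

```python
def standardize(values):
    """Scale one feature column to zero mean and unit standard deviation."""
    n = len(values)
    mean = sum(values) / n
    # Corrected sample variance (divide by n - 1).
    variance = sum((v - mean) ** 2 for v in values) / (n - 1)
    std = variance ** 0.5
    return [(v - mean) / std for v in values]

scaled = standardize([2.0, 4.0, 6.0])
print(scaled)  # [-1.0, 0.0, 1.0]
```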
Logistic Regression
Logistic Regression (LR) training takes several hyperparameters that can affect the accuracy of the learned model. There is no single "best" setting for all datasets; to get optimal accuracy, we need to tune these hyperparameters for our data.
We will do this tuning using Spark's Cross Validation framework, which automatically tests a grid of hyperparameters and chooses the best.
We wrap the Logistic Regression training stage within a CrossValidator stage. A cross validator knows how to call the LR algorithm with different hyperparameter settings (e.g. elasticNetParam). It will train multiple models and choose the best one based on some metric; in this example, our metric is accuracy.
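For intuition on what elasticNetParam (call it alpha) controls: it blends the L1 and L2 penalties into alpha * lambda * ||w||_1 + (1 - alpha) * (lambda / 2) * ||w||_2^2, where lambda is regParam. A pure-Python sketch of that term, with made-up weights:

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """alpha * lam * ||w||_1 + (1 - alpha) * (lam / 2) * ||w||_2^2."""
    l1 = sum(abs(w) for w in weights)
    l2_sq = sum(w * w for w in weights)
    return (elastic_net_param * reg_param * l1
            + (1 - elastic_net_param) * (reg_param / 2) * l2_sq)

w = [0.5, -0.25]  # toy weight vector
print(elastic_net_penalty(w, reg_param=0.1, elastic_net_param=0.0))  # pure L2 (ridge)
print(elastic_net_penalty(w, reg_param=0.1, elastic_net_param=1.0))  # pure L1 (lasso)
```

elasticNetParam=0 gives pure ridge regularization, 1 gives pure lasso; the grid below searches intermediate blends.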
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="accuracy")
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder().addGrid(lr.elasticNetParam, [0.1, 0.3, 0.6, 0.9]).build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
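Note the training cost this implies: with 5 folds, CrossValidator fits one model per fold for every grid point, then refits the best settings on the full training set. A quick sketch of the count for our grid:

```python
grid_points = len([0.1, 0.3, 0.6, 0.9])  # elasticNetParam values in the grid
num_folds = 5

fits_during_search = grid_points * num_folds
total_fits = fits_during_search + 1  # + final refit of the best model on all data
print(fits_during_search, total_fits)  # 20 21
```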
Now we're ready to build a pipeline and fit it. This runs the data through all of the feature processing, model tuning, and training steps we described, in a single call.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[labelIndexer, featuresScaler, cv, indexToLabel])
%time model = pipeline.fit(train)
The next cell converts the test set from LibSVM to the Parquet format, which will serve as our streaming source.
import os

if not os.path.exists("/inaccel/data/nist/letters_parquet_test.dat"):
    test = spark.read.format("libsvm").option("numFeatures", 784).load("file:///inaccel/data/nist/letters_libsvm_test.dat")
    test.write.format("parquet").save("file:///inaccel/data/nist/letters_parquet_test.dat")
Now we'll use a stream to do some validation of our model. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets.
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml.linalg import VectorUDT
schema = StructType([StructField("label", DoubleType(), True), StructField("features", VectorUDT(), True)])
test = spark.readStream.schema(schema).format("parquet").load("file:///inaccel/data/nist/letters_parquet_test.dat")
To validate the model we'll write the predicted labels to an in-memory table; because we're using Spark Structured Streaming, this table updates in real time as more data is read from the stream.
results = model.transform(test)
query = results.writeStream.queryName("table").format("memory").start()
The following SQL query calculates the accuracy of the predictions by comparing the label to the predictedLabel in the table.
spark.sql("SELECT (COUNT(*) / (SELECT COUNT(*) FROM table)) AS accuracy FROM table WHERE label = predictedLabel").show()
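The SQL above is simply the ratio of matching rows to total rows. A pure-Python sketch of the same computation, on made-up (label, predictedLabel) pairs standing in for rows of the memory table:

```python
# Hypothetical rows: (label, predictedLabel) pairs.
rows = [("A", "A"), ("B", "B"), ("C", "A"), ("D", "D")]

correct = sum(1 for label, predicted in rows if label == predicted)
accuracy = correct / len(rows)
print(accuracy)  # 0.75
```

Because the memory sink fills the table incrementally, re-running the SQL query while the stream is active will show the accuracy computed over the rows ingested so far.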
Finally, stop the streaming query and the Spark session.
query.stop()
spark.stop()