11 KiB
PySpark auf dem HPC Cluster
In diesem Beispiel verwenden wir Magpie um einen Sparkcluster innerhalb des Lithium HPC Clusters aufzubauen.
Zielpublikum
Dieses Dokument ist für fortgeschrittene Anwender gedacht. Du solltest daher mit den Grundsätzen unseres HPC Clusters vertraut sein (siehe Cluster Howto) und Übung im Umgang mit der Linux Konsole und SSH haben.
Diese Anleitung ist eine ausgedeutsche und auf unsere Umgebung zugeschnittene Version der offiziellen Magpie Dokumentation
Grundsätzliches Vorgehen
- Installation der Softwarekomponenten für Spark
- Konfiguration dieser Softwarekomponenten
- Mit Slurm ein Magpie Batchskript aufgeben
- Zugriff auf den Cluster mit SSH
Installation und Konfiguration von Spark
Klonen der neusten Magpie Skripte ab Github:
mkdir spark && cd spark
git clone https://github.com/LLNL/magpie.git
cd magpie/misc/
vim magpie-download-and-setup.sh
Passe im File magpie-download-and-setup.sh die folgenden Parameter an und lösche "#" bei Bedarf vor diesen Einträgen:
SPARK_DOWNLOAD="Y"
INSTALL_PATH="$HOME/spark"
PRESET_LAUNCH_SCRIPT_CONFIGS="Y"
LOCAL_DIR_PATH="/tmp/$USER"
NETWORKFS_DIR_PATH="/scratch/$USER"
Danach das Skript mit dem Befehl ./magpie-download-and-setup.sh ausführen. Während des eher gemächlichen Downloads von Spark kann die Zeit genutzt werden, um herauszufinden welche Version von Python und Java die verwendete Spark Version (in unserem Fall 3.5.0) voraussetzt. Während des Downloads wird die Sparkversion angezeigt.
Diese Informationen können wir am Zuverlässigsten aus der offiziellen Spark Webseite entnehmen. Im Falle von Spark 3.5.0 ist diese Information unter https://spark.apache.org/docs/3.5.0/ zu finden. Daher können wir folgende Prerequisits notieren:
- Java 17
- Python 3.8 und neuer
Diese Softwarepakete installieren wir mit Miniconda - Lizenzvereinbarungen akzeptieren und Installationsorte übernehmen:
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
rm Miniconda3-latest-Linux-x86_64.sh
Nach Abschluss der Installation die Datei .bashrc neu laden
. ~/.bashrc
Jetzt können wir eine Conda Environment anlegen und die entsprechenden Versionen von Java und Python installieren. Ìn unserem Beispiel mit Spark 3.3.2 sieht das so aus:
conda create -y -n spark
conda activate spark
conda install -y python=3.11.4
conda install -y -c conda-forge openjdk=17.0.8
Danach mit den Befehlen (innerhalb der conda env)
type -p python
dirname $(dirname $(readlink -f $(type -p java)))
die jeweiligen Speicherorte des Python und Java executable anzeigen lassen und notieren. Die Pfade sollten die Verzeichniselement miniconda3/envs enthalten. Nachdem das Skript ./magpie-download-and-setup.sh seine Arbeit abgeschlossen hat, wechseln wir mit folgenden Befehlen zurück ins Verzeichnis ~/spark und führen "magpie.sbatch-srun-spark" aus:
cd $HOME/spark
cp magpie/submission-scripts/script-sbatch-srun/magpie.sbatch-srun-spark .
Im Submission-Skript magpie.sbatch-srun-spark passen wir nun die folgenden Zeilen an; je nach benötigten Resourcen, können die Parameter zu den SBATCH Befehlen individuell angepasst werden.
Achtung: die Rautenzeichen müssen bestehen bleiben.
#SBATCH --nodes=5
#SBATCH --time=180:00
#SBATCH --job-name="spark-test"
#SBATCH --partition=staff ODER #SBATCH --partition=students
export MAGPIE_JOB_TYPE="interactive"
export JAVA_HOME="<gemäss des Outputs mit type -a java, siehe oben>"
export MAGPIE_PYTHON="<gemäss des Outputs mit type -a python, siehe oben>"
export SPARK_LOCAL_SCRATCH_DIR="/tmp/${USER}/sparkscratch/"
Als nächstes können wir nun das Magpie Skript als Slurmjob an den HPC Cluster übergeben. Zuerst überprüfen wir nochmals, ob das Cluster Lithium online ist und Ressourcen frei sind:
sinfo --all
Falls genügend Nodes (in unserem Beispiel fünf) im Status 'Idle' erscheinen, können wir sofern wir uns noch im Verzeichnis "/home/-user-@edu.local/spark" befinden, den Slurmjob mit dem Befehl
sbatch -k magpie.sbatch-srun-spark
dem Slurm Scheduler übergeben. Mit dem Befehl squeue können wir den Status unseres Batchjobs in Erfahrung bringen. Idealerweise ist dieser auf 'R' (running) oder 'PD' (pending).
Ob unser Sparkcluster bereits gestartet ist, erfahren wir im Slurmjob Logfile im Verzeichnis spark. Zum Beispiel mit folgendem Befehl mit der Nummer des Bashjobs für *:
cat $HOME/spark/slurm-*.out
Im Erfolgsfalle sollten unter anderem die untenstehenden Angaben im Logfile angezeigt werden:
.
.
.
* ssh computenode1
* export JAVA_HOME="$HOME/miniconda3/envs/spark/bin"
* export SPARK_HOME="$HOME/spark/spark-3.4.1-bin-hadoop3"
* export SPARK_CONF_DIR="/tmp/$USER/spark/spark-test/*/spark/conf"
*
* Then you can do as you please. For example to run a job:
*
* $SPARK_HOME/bin/spark-submit --class <class> <jar>
*
*******************************************************
*******************************************************
* Executing Pre Run Scripts
*******************************************************
*******************************************************
* Entering Magpie interactive mode
*******************************************************
*******************************************************
* Run
*
* ssh computenode1 kill -s 10 6636
*
* to exit 'interactive' mode early.
*******************************************************
In diesem Logfile interessiert uns hauptsächlich wie wir nun auf die PySpark Shell zugreifen können. Wie in der slurm-*. out Datei erwähnt, können wir uns nun mit den folgenden Befehlen aus dem Logfile einloggen:
ssh computenode1
export JAVA_HOME="$HOME/miniconda3/envs/spark/bin"
export SPARK_HOME="$HOME/spark/spark-3.4.1-bin-hadoop3"
export SPARK_CONF_DIR="/tmp/$USER/spark/spark-test/*/spark/conf"
conda activate spark
Danach kann ein beliebiger PySpark Befehl ausgeführt werden. Zum Beispiel eine Sparkshell:
$SPARK_HOME/bin/pyspark
Installation von SparkNLP auf PySpark
Wie auch andere Python-Bibliotheken kann beispielsweise auch die auf Spark-basierende NLP-Library SparkNLP über die aufgesetzte virtuelle Umgebung "spark" via pip oder conda installiert werden. Für SparkNLP werden als Ergänzung der Miniconda-Basisumgebung noch folgende Bibliotheken benötigt:
- NumPy
Nach der Installation von Numpy kann nun auch SparkNLP in der virtuellen Umgebung "spark" mit folgendem Befehl installiert werden:
pip install spark-nlp
SparkNLP mit einem vortrainierten Modell
Starten des Batchjobs:
sbatch -k magpie.sbatch-srun-spark
Danach folgt der übliche Ablauf um Pyspark zu starten, jedoch müssen wir gleichzeitig die jar-packages von SparkNLP beim Aufstarten von Pyspark laden: Achtung: Die Version des Package muss mit der installierten SparkNLP Version übereinstimmen. (hier SparkNLP 5.1.1)
ssh computenode1
export JAVA_HOME="$HOME/miniconda3/envs/spark/bin"
export SPARK_HOME="$HOME/spark/spark-3.4.1-bin-hadoop3"
export SPARK_CONF_DIR="/tmp/$USER/spark/spark-test/*/spark/conf"
conda activate spark
$SPARK_HOME/bin/pyspark --packages com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.1
Beispiel
Nach erfolgreichem Start von SparkNLP und der Sparkshell können wir ein vortrainiertes Modell zu einem Named Entity Recognition-Task mit folgendem Beispielcode verwenden:
import json
import numpy as np
import sparknlp
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql.types import StringType, IntegerType
# If you change the model, re-run all the cells below
# Other applicable models: ner_dl, ner_dl_bert
MODEL_NAME = "ner_dl_bert"
text_list = ["""William Henry Gates III (born October 28, 1955) is an American business magnate, software developer, investor, and philanthropist. He is best known as the co-founder of Microsoft Corporation. During his career at Microsoft, Gates held the positions of chairman, chief executive officer (CEO), president and chief software architect, while also being the largest individual shareholder until May 2014. He is one of the best-known entrepreneurs and pioneers of the microcomputer revolution of the 1970s and 1980s. Born and raised in Seattle, Washington, Gates co-founded Microsoft with childhood friend Paul Allen in 1975, in Albuquerque, New Mexico; it went on to become the world's largest personal computer software company. Gates led the company as chairman and CEO until stepping down as CEO in January 2000, but he remained chairman and became chief software architect. During the late 1990s, Gates had been criticized for his business tactics, which have been considered anti-competitive. This opinion has been upheld by numerous court rulings. In June 2006, Gates announced that he would be transitioning to a part-time role at Microsoft and full-time work at the Bill & Melinda Gates Foundation, the private charitable foundation that he and his wife, Melinda Gates, established in 2000.[9] He gradually transferred his duties to Ray Ozzie and Craig Mundie. He stepped down as chairman of Microsoft in February 2014 and assumed a new post as technology adviser to support the newly appointed CEO Satya Nadella.""", """The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."""]
documentAssembler = DocumentAssembler().setInputCol('text').setOutputCol('document')
tokenizer = Tokenizer().setInputCols(['document']).setOutputCol('token')
# ner_dl and onto_100 model are trained with glove_100d, so the embeddings in
# the pipeline should match
if (MODEL_NAME == "ner_dl") or (MODEL_NAME == "onto_100"):
embeddings = WordEmbeddingsModel.pretrained('glove_100d').setInputCols(["document", 'token']).setOutputCol("embeddings")
# Bert model uses Bert embeddings
if MODEL_NAME == "ner_dl_bert":
embeddings = BertEmbeddings.pretrained(name='bert_base_cased', lang='en').setInputCols(['document', 'token']).setOutputCol('embeddings')
ner_model = NerDLModel.pretrained(MODEL_NAME, 'en').setInputCols(['document', 'token', 'embeddings']).setOutputCol('ner')
ner_converter = NerConverter().setInputCols(['document', 'token', 'ner']).setOutputCol('ner_chunk')
nlp_pipeline = Pipeline(stages=[documentAssembler, tokenizer, embeddings, ner_model, ner_converter])
df = spark.createDataFrame(text_list, StringType()).toDF("text")
result = nlp_pipeline.fit(df).transform(df)
Die Resultate des verwendeten Modells sind nun geordnet in einem verschachtelten RDD-Dataframe gespeichert. Die Inhalte dieses Dataframe können wir mit unterschiedlichsten Spark Methoden ausgeben:
#Gibt den vollständigen Inhalt als String aus
result.collect()
#Zeigt eine Übersicht zur RDD-Struktur
result.show()
#Zeigt die Token bzw. Wörter zusammen mit den vorausgesagten NER-Labels in einer Tabelle
result.select('token.result','ner.result').show(truncate=100)
tbc