# PySpark auf dem HPC Cluster In diesem Beispiel verwenden wir Magpie um einen Sparkcluster innerhalb des Lithium HPC Clusters aufzubauen. ## Zielpublikum Dieses Dokument ist für fortgeschrittene Anwender gedacht. Du solltest daher mit den Grundsätzen unseres HPC Clusters vertraut sein (siehe [Cluster Howto][1]) und Übung im Umgang mit der Linux Konsole und SSH haben. Falls Du Unterstützung beim Einrichten von PySpark brauchst wende dich an den DAViS Sysadmin. Diese Anleitung ist eine ausgedeutsche und auf unsere Umgebung zugeschnittene Version der offiziellen [Magpie Dokumentation][2] ## Grundsätzliches Vorgehen - Installation der Softwarekomponenten für Spark - Konfiguration dieser Softwarekomponenten - Mit Slurm ein Magpie Batchskript aufgeben - Zugriff auf den Cluster mit SSH ## Installation und Konfiguration von Spark Klonen der neusten Magpie Skripte ab Github: ``` mkdir spark && cd spark git clone https://github.com/LLNL/magpie.git cd magpie/misc/ vim magpie-download-and-setup.sh ``` Passe im File `magpie-download-and-setup.sh` die folgenden Parameter an und lösche "#" bei Bedarf vor diesen Einträgen: ``` SPARK_DOWNLOAD="Y" INSTALL_PATH="$HOME/spark" PRESET_LAUNCH_SCRIPT_CONFIGS="Y" LOCAL_DIR_PATH="/tmp/$USER" NETWORKFS_DIR_PATH="/scratch/$USER" ``` Danach das Skript mit dem Befehl `./magpie-download-and-setup.sh` ausführen. Während des eher gemächlichen Downloads von Spark kann die Zeit genutzt werden, um herauszufinden welche Version von Python und Java die verwendete Spark Version (in unserem Fall 3.5.0) voraussetzt. Während des Downloads wird die Sparkversion angezeigt. Diese Informationen können wir am Zuverlässigsten aus der offiziellen Spark Webseite entnehmen. Im Falle von Spark 3.5.0 ist diese Information unter https://spark.apache.org/docs/3.5.0/ zu finden. Daher können wir folgende Prerequisits notieren: * Java 17 * Python 3.8 und neuer Diese Softwarepakete installieren wir mit Miniconda - Lizenzvereinbarungen akzeptieren und Installationsorte übernehmen: ``` wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh bash Miniconda3-latest-Linux-x86_64.sh rm Miniconda3-latest-Linux-x86_64.sh ``` Nach Abschluss der Installation die Datei `.bashrc` neu laden ``` . ~/.bashrc ``` Jetzt können wir eine Conda Environment anlegen und die entsprechenden Versionen von Java und Python installieren. Ìn unserem Beispiel mit **Spark 3.3.2** sieht das so aus: ``` conda create -y -n spark conda activate spark conda install -y python=3.12.2 conda install -y -c conda-forge openjdk=17.0.10 ``` Danach mit den Befehlen (innerhalb der conda env) ``` type -p python dirname $(dirname $(readlink -f $(type -p java))) ``` die **jeweiligen Speicherorte des Python und Java executable anzeigen lassen und notieren**. Die Pfade sollten die Verzeichniselement miniconda3/envs enthalten. Nachdem das Skript `./magpie-download-and-setup.sh` seine Arbeit abgeschlossen hat, wechseln wir mit folgenden Befehlen zurück ins Verzeichnis `~/spark` und führen "magpie.sbatch-srun-spark" aus: ``` cd $HOME/spark cp magpie/submission-scripts/script-sbatch-srun/magpie.sbatch-srun-spark . ``` Im Submission-Skript `magpie.sbatch-srun-spark` passen wir nun die folgenden Zeilen an; je nach benötigten Resourcen, können die Parameter zu den SBATCH Befehlen individuell angepasst werden. **Achtung: die Rautenzeichen for SBATCH müssen bestehen bleiben.** ``` #SBATCH --nodes=5 #SBATCH --time=180:00 #SBATCH --job-name="spark-test" #SBATCH --partition=staff ODER #SBATCH --partition=students export MAGPIE_JOB_TYPE="interactive" export JAVA_HOME="" export MAGPIE_PYTHON="" export SPARK_LOCAL_SCRATCH_DIR="/tmp/${USER}/sparkscratch/" ``` Als nächstes können wir nun das Magpie Skript als Slurmjob an den HPC Cluster übergeben. Zuerst überprüfen wir nochmals, ob das Cluster Lithium online ist und Ressourcen frei sind: ``` sinfo --all ``` Falls genügend Nodes (in unserem Beispiel fünf) im Status 'Idle' erscheinen, können wir sofern wir uns noch im Verzeichnis "$HOME/spark" befinden, den Slurmjob mit dem Befehl ``` sbatch -k magpie.sbatch-srun-spark ``` dem Slurm Scheduler übergeben. Mit dem Befehl `squeue` können wir den Status unseres Batchjobs in Erfahrung bringen. Idealerweise ist dieser auf 'R' (running) oder 'PD' (pending). Ob unser Sparkcluster bereits gestartet ist, erfahren wir im Slurmjob Logfile im Verzeichnis `spark`. Zum Beispiel mit folgendem Befehl mit der Nummer des Bashjobs für *: ``` cat $HOME/spark/slurm-*.out ``` Im Erfolgsfalle sollten unter anderem die untenstehenden Angaben im Logfile angezeigt werden: ``` . . . * ssh computenode1 * export JAVA_HOME="$HOME/miniconda3/envs/spark/bin" * export SPARK_HOME="$HOME/spark/spark-3.4.1-bin-hadoop3" * export SPARK_CONF_DIR="/tmp/$USER/spark/spark-test/*/spark/conf" * * Then you can do as you please. For example to run a job: * * $SPARK_HOME/bin/spark-submit --class * ******************************************************* ******************************************************* * Executing Pre Run Scripts ******************************************************* ******************************************************* * Entering Magpie interactive mode ******************************************************* ******************************************************* * Run * * ssh computenode1 kill -s 10 6636 * * to exit 'interactive' mode early. ******************************************************* ``` In diesem Logfile interessiert uns hauptsächlich wie wir nun auf die PySpark Shell zugreifen können. Wie in der `slurm-*. out` Datei erwähnt, können wir uns nun mit den folgenden Befehlen aus dem Logfile einloggen: ``` ssh computenode1 export JAVA_HOME="$HOME/miniconda3/envs/spark/bin" export SPARK_HOME="$HOME/spark/spark-3.4.1-bin-hadoop3" export SPARK_CONF_DIR="/tmp/$USER/spark/spark-test/*/spark/conf" ``` Danach überprüfen ob die Spark Conda Environment bereits gestartet ist, ansonsten mit dem Befehl ``` conda activate spark ``` aktivieren. Danach kann ein beliebiger PySpark Befehl ausgeführt werden. Zum Beispiel eine Sparkshell: ``` $SPARK_HOME/bin/pyspark ``` ## Installation von SparkNLP auf PySpark Wie auch andere Python-Bibliotheken kann beispielsweise auch die auf Spark-basierende NLP-Library [SparkNLP][3] über die aufgesetzte virtuelle Umgebung "spark" via pip oder conda installiert werden. Für SparkNLP werden als Ergänzung der Miniconda-Basisumgebung noch folgende Bibliotheken benötigt: * NumPy Nach der Installation von Numpy kann nun auch SparkNLP in der virtuellen Umgebung "spark" mit folgendem Befehl installiert werden: ``` pip install spark-nlp ``` ### SparkNLP mit einem vortrainierten Modell Starten des Batchjobs: ``` sbatch -k magpie.sbatch-srun-spark ``` Danach folgt der übliche Ablauf um Pyspark zu starten, jedoch müssen wir gleichzeitig die [jar-packages][3] von SparkNLP beim Aufstarten von Pyspark laden: **Achtung: Die Version des Package muss mit der installierten SparkNLP Version übereinstimmen. (hier SparkNLP 5.1.1)** ``` ssh computenode1 export JAVA_HOME="$HOME/miniconda3/envs/spark/bin" export SPARK_HOME="$HOME/spark/spark-3.4.1-bin-hadoop3" export SPARK_CONF_DIR="/tmp/$USER/spark/spark-test/*/spark/conf" conda activate spark $SPARK_HOME/bin/pyspark --packages com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.1 ``` #### Beispiel Nach erfolgreichem Start von SparkNLP und der Sparkshell können wir ein vortrainiertes Modell zu einem Named Entity Recognition-Task mit folgendem [Beispielcode][4] verwenden: ``` import json import numpy as np import sparknlp import pyspark.sql.functions as F from pyspark.ml import Pipeline from pyspark.sql import SparkSession from sparknlp.annotator import * from sparknlp.base import * from sparknlp.pretrained import PretrainedPipeline from pyspark.sql.types import StringType, IntegerType # If you change the model, re-run all the cells below # Other applicable models: ner_dl, ner_dl_bert MODEL_NAME = "ner_dl_bert" text_list = ["""William Henry Gates III (born October 28, 1955) is an American business magnate, software developer, investor, and philanthropist. He is best known as the co-founder of Microsoft Corporation. During his career at Microsoft, Gates held the positions of chairman, chief executive officer (CEO), president and chief software architect, while also being the largest individual shareholder until May 2014. He is one of the best-known entrepreneurs and pioneers of the microcomputer revolution of the 1970s and 1980s. Born and raised in Seattle, Washington, Gates co-founded Microsoft with childhood friend Paul Allen in 1975, in Albuquerque, New Mexico; it went on to become the world's largest personal computer software company. Gates led the company as chairman and CEO until stepping down as CEO in January 2000, but he remained chairman and became chief software architect. During the late 1990s, Gates had been criticized for his business tactics, which have been considered anti-competitive. This opinion has been upheld by numerous court rulings. In June 2006, Gates announced that he would be transitioning to a part-time role at Microsoft and full-time work at the Bill & Melinda Gates Foundation, the private charitable foundation that he and his wife, Melinda Gates, established in 2000.[9] He gradually transferred his duties to Ray Ozzie and Craig Mundie. He stepped down as chairman of Microsoft in February 2014 and assumed a new post as technology adviser to support the newly appointed CEO Satya Nadella.""", """The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."""] documentAssembler = DocumentAssembler().setInputCol('text').setOutputCol('document') tokenizer = Tokenizer().setInputCols(['document']).setOutputCol('token') # ner_dl and onto_100 model are trained with glove_100d, so the embeddings in # the pipeline should match if (MODEL_NAME == "ner_dl") or (MODEL_NAME == "onto_100"): embeddings = WordEmbeddingsModel.pretrained('glove_100d').setInputCols(["document", 'token']).setOutputCol("embeddings") # Bert model uses Bert embeddings if MODEL_NAME == "ner_dl_bert": embeddings = BertEmbeddings.pretrained(name='bert_base_cased', lang='en').setInputCols(['document', 'token']).setOutputCol('embeddings') ner_model = NerDLModel.pretrained(MODEL_NAME, 'en').setInputCols(['document', 'token', 'embeddings']).setOutputCol('ner') ner_converter = NerConverter().setInputCols(['document', 'token', 'ner']).setOutputCol('ner_chunk') nlp_pipeline = Pipeline(stages=[documentAssembler, tokenizer, embeddings, ner_model, ner_converter]) df = spark.createDataFrame(text_list, StringType()).toDF("text") result = nlp_pipeline.fit(df).transform(df) ``` Die Resultate des verwendeten Modells sind nun geordnet in einem verschachtelten RDD-Dataframe gespeichert. Die Inhalte dieses Dataframe können wir mit unterschiedlichsten Spark Methoden ausgeben: ``` #Gibt den vollständigen Inhalt als String aus result.collect() #Zeigt eine Übersicht zur RDD-Struktur result.show() #Zeigt die Token bzw. Wörter zusammen mit den vorausgesagten NER-Labels in einer Tabelle result.select('token.result','ner.result').show(truncate=100) ``` tbc [1]: https://gitea.fhgr.ch/kellerthomas/docs-cds/src/branch/master/Cluster-Howto.md [2]: https://github.com/LLNL/magpie/blob/master/doc/README.spark [3]: https://sparknlp.org/api/python/getting_started/index.html [4]: https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/streamlit_notebooks/NER_EN.ipynb