
Automating Spark Jobs with GitLab CI/CD

This guide demonstrates how to build a complete CI/CD pipeline for Apache Spark applications on Kubernetes using GitLab CI/CD and Ilum. By implementing this architecture, you establish a robust DataOps practice that automates the lifecycle of your data engineering code.

Key Benefits

  • Automated Deployments: Eliminate manual spark-submit commands.
  • Version Control: Ensure every job running in your cluster corresponds to a specific Git commit.
  • Consistency: Automatically synchronize environment configurations (Ilum Groups) and job definitions.
  • Feedback Loop: Get immediate status reporting on job submission success or failure directly in GitLab.

Prerequisites

  • A GitLab project (SaaS or Self-Managed).
  • Ilum API endpoint reachable from your GitLab Runner (e.g., via Ingress or internal network).
  • curl and jq installed in the runner image (or use an image like alpine and install them).

1. Architecture of a Spark CI/CD Pipeline

The pipeline uses the Ilum REST API to interact with the Kubernetes cluster.

  1. Check/Manage Groups: Ensures the target Ilum Group exists (and optionally recreates it to update group-level files/settings).
  2. Submit Job: Posts the Spark job definition and code to Ilum.
  3. Execute: The job runs on the Spark cluster, independent of the CI runner.

2. Automating Ilum Group Management & Environment Setup

This example pipeline stage demonstrates how to manage Ilum Groups (logical containers for interactive sessions and shared resources). It checks whether a group exists, deletes it to clean up old state, and recreates it with fresh configuration.

The Group Service (service.py)

This Python script defines an interactive Spark job that the group will execute. It inherits from IlumJob and provides details about a specified table.

service.py
from ilum.api import IlumJob
from pyspark.sql.functions import col, sum as spark_sum


class SparkInteractiveExample(IlumJob):
    def run(self, spark, config) -> str:
        table_name = config.get('table')
        database_name = config.get('database')  # optional
        report_lines = []

        if not table_name:
            raise ValueError("Config must provide a 'table' key")

        # Use specified database if provided
        if database_name:
            spark.catalog.setCurrentDatabase(database_name)
            report_lines.append(f"Using database: {database_name}")

        # Check if table exists in catalog
        if table_name not in [t.name for t in spark.catalog.listTables()]:
            raise ValueError(f"Table '{table_name}' not found in catalog")

        df = spark.table(table_name)

        report_lines.append(f"=== Details for table: {table_name} ===")

        # Total rows
        total_rows = df.count()
        report_lines.append(f"Total rows: {total_rows}")

        # Total columns
        total_columns = len(df.columns)
        report_lines.append(f"Total columns: {total_columns}")

        # Distinct counts per column
        report_lines.append("Distinct values per column:")
        for c in df.columns:
            distinct_count = df.select(c).distinct().count()
            report_lines.append(f"  {c}: {distinct_count}")

        # Schema info, reconstructed field by field as text lines
        report_lines.append("Schema:")
        for field in df.schema.fields:
            report_lines.append(f"  {field.name}: {field.dataType}")

        # Sample data
        report_lines.append("Sample data (first 5 rows):")
        sample_rows = df.take(5)
        for row in sample_rows:
            report_lines.append(str(row.asDict()))

        # Null counts per column
        report_lines.append("Null counts per column:")
        null_counts_df = df.select(
            [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
        )
        null_counts = null_counts_df.collect()[0].asDict()
        for c, v in null_counts.items():
            report_lines.append(f"  {c}: {v}")

        return "\n".join(report_lines)
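Once the group is running, a service like the one above is invoked through Ilum's JSON execute endpoint (mentioned later in this guide as /group/{groupId}/job/execute). The sketch below only builds the request payload; the table and database values are illustrative assumptions, and the jobClass follows the "filename.classname" convention used for interactive jobs:

```python
import json

# Hypothetical payload for POST /group/{groupId}/job/execute.
# On this JSON endpoint, jobConfig is a plain JSON object
# (not the semicolon-separated string used by multipart endpoints).
payload = {
    "jobClass": "service.SparkInteractiveExample",
    "jobConfig": {"table": "students", "database": "university"},
}

print(json.dumps(payload, indent=2))
```

With curl, this payload would be posted with -H "Content-Type: application/json" and -d.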

The CI Pipeline (Group Management)

This pipeline stage manages the lifecycle of the Ilum Group. It checks if the group exists, removes it if necessary to apply code updates, and recreates it with the latest service.py.

gitlab-ci-group.yml
stages:
  - manage_group

manage_group:
  stage: manage_group
  image: alpine:3.20
  before_script:
    - apk add --no-cache curl jq
  script:
    - echo "--- Checking Group Existence ---"
    - |
      GROUP_NAME="ILUM_COURSE"
      API_URL="http://ilum-core.default:9888/api/v1"

      # 1. Check if group exists
      RESPONSE=$(curl -s "$API_URL/group")
      GROUP_ID=$(echo "$RESPONSE" | jq -r ".content[] | select(.name==\"$GROUP_NAME\") | .id")

      # 2. Delete if exists (to update code/config)
      if [ -n "$GROUP_ID" ] && [ "$GROUP_ID" != "null" ]; then
        echo "Deleting existing group $GROUP_ID..."
        curl -s -X POST "$API_URL/group/$GROUP_ID/stop"
        curl -s -X DELETE "$API_URL/group/$GROUP_ID"
      fi

      # 3. Create new group with updated service.py
      echo "Creating new group..."
      CREATE_RESP=$(curl -s -X POST \
        -F "name=$GROUP_NAME" \
        -F "clusterName=default" \
        -F "language=PYTHON" \
        -F "pyFiles=@service.py" \
        -F "jobConfig=spark.executor.instances=2;spark.driver.memory=2g" \
        "$API_URL/group")

      NEW_ID=$(echo "$CREATE_RESP" | jq -r '.groupId // empty')
      if [ -n "$NEW_ID" ]; then
        echo "Group created successfully with ID: $NEW_ID"
      else
        echo "Failed to create group. Response: $CREATE_RESP"
        exit 1
      fi
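The jq lookup in step 1 can be sanity-checked offline. The sketch below mirrors the same selection in Python against a made-up /group response (the real response may carry additional fields, and the IDs here are invented):

```python
import json

# Simulated /group response; the "content" array shape matches what the
# jq filter in the pipeline expects (IDs fabricated for illustration).
response = json.loads(
    '{"content": [{"id": "grp-001", "name": "ILUM_COURSE"},'
    ' {"id": "grp-002", "name": "OTHER_GROUP"}]}'
)

# Equivalent of: jq -r '.content[] | select(.name=="ILUM_COURSE") | .id'
group_id = next(
    (g["id"] for g in response["content"] if g["name"] == "ILUM_COURSE"),
    None,
)
print(group_id)  # grp-001
```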

3. Continuous Deployment of Spark Jobs

This stage implements the Continuous Deployment (CD) logic by submitting a PySpark job to the Ilum cluster. It is configured to trigger automatically on a git push to the main branch.

The Spark Job (submit.py)

Create a file named submit.py in your repository root. This example job creates a university database with Hive tables and uses Delta Lake format.

submit.py
import logging
from pyspark.sql import SparkSession

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("""
=== Example Spark Job: Student Enrollments ===
This Spark job demonstrates a simple educational data pipeline using Hive tables.
It performs the following steps:
1. Creates a 'students' table with student information.
2. Creates a 'courses' table with available courses.
3. Creates an 'enrollments' table linking students to courses.
4. Joins the tables to calculate enrollment statistics and saves them into 'course_stats'.

All intermediate results are stored in the 'university' Hive database.
=============================================
""")

spark = SparkSession \
    .builder \
    .appName("Spark") \
    .getOrCreate()

logger.info("SparkSession initialized")

spark.sql("CREATE DATABASE IF NOT EXISTS university")
logger.info("Database 'university' ensured")

# --- Create Students Table ---
students_data = [
    (1, "Alice", "Computer Science"),
    (2, "Bob", "Mathematics"),
    (3, "Charlie", "Physics"),
    (4, "Diana", "Computer Science")
]

df_students = spark.createDataFrame(students_data, ["student_id", "name", "major"])
spark.sql("DROP TABLE IF EXISTS university.students")
df_students.write.format("delta").saveAsTable("university.students")
logger.info("Created table: university.students")

# --- Create Courses Table ---
courses_data = [
    (101, "Big Data"),
    (102, "Linear Algebra"),
    (103, "Quantum Mechanics")
]

df_courses = spark.createDataFrame(courses_data, ["course_id", "course_name"])
spark.sql("DROP TABLE IF EXISTS university.courses")
df_courses.write.format("delta").saveAsTable("university.courses")
logger.info("Created table: university.courses")

# --- Create Enrollments Table ---
enrollments_data = [
    (1, 101),  # Alice -> Big Data
    (2, 102),  # Bob -> Linear Algebra
    (3, 103),  # Charlie -> Quantum Mechanics
    (4, 101),  # Diana -> Big Data
    (2, 101)   # Bob -> Big Data
]

df_enrollments = spark.createDataFrame(enrollments_data, ["student_id", "course_id"])
spark.sql("DROP TABLE IF EXISTS university.enrollments")
df_enrollments.write.format("delta").saveAsTable("university.enrollments")
logger.info("Created table: university.enrollments")

# --- Join to calculate course enrollment counts ---
df_course_stats = spark.sql("""
SELECT
    c.course_id,
    c.course_name,
    COUNT(e.student_id) AS total_students
FROM university.courses c
LEFT JOIN university.enrollments e ON c.course_id = e.course_id
GROUP BY c.course_id, c.course_name
""")

spark.sql("DROP TABLE IF EXISTS university.course_stats")
df_course_stats.write.format("delta").saveAsTable("university.course_stats")
logger.info("Inserted final data into course_stats table")
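As a sanity check, the expected contents of course_stats can be computed from the sample data in plain Python, with no Spark required:

```python
# Sample data copied from submit.py
courses = {101: "Big Data", 102: "Linear Algebra", 103: "Quantum Mechanics"}
enrollments = [(1, 101), (2, 102), (3, 103), (4, 101), (2, 101)]

# LEFT JOIN + GROUP BY equivalent: count enrollments per course
stats = {name: 0 for name in courses.values()}
for _, course_id in enrollments:
    stats[courses[course_id]] += 1

print(stats)  # {'Big Data': 3, 'Linear Algebra': 1, 'Quantum Mechanics': 1}
```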

The CI Pipeline (Job Submission)

Add the job submission stage to your .gitlab-ci.yml. This configuration uses alpine:3.20 and includes error handling for the REST API response.

gitlab-ci-job.yml
stages:
  - submit_job

# Define variables globally or per job
variables:
  # In a real project, use CI/CD Variables for endpoints and tokens
  ILUM_API_URL: "http://ilum-core.default:9888/api/v1"

submit_job:
  stage: submit_job
  image: alpine:3.20
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
  before_script:
    - apk add --no-cache curl jq
  script:
    - echo "Creating job ILUM_JOB_SUBMIT with submit.py..."
    - |
      # Submit job request and capture HTTP status code + body
      # Note: jobClass is mandatory.
      # - For a script, use the filename without .py (e.g., "submit").
      # - For no specific class (main entry point), use an empty string.
      # - "filename.classname" is only for interactive jobs or packages.
      RESPONSE=$(curl -s -X POST \
        -F "name=gitlab_pipeline_job" \
        -F "pyFiles=@submit.py" \
        -F "clusterName=default" \
        -F "language=PYTHON" \
        -F "jobClass=submit" \
        -w "\nHTTP_STATUS:%{http_code}" \
        "$ILUM_API_URL/job/submit")

      # Extract Status and Body
      HTTP_STATUS=$(echo "$RESPONSE" | grep HTTP_STATUS | cut -d':' -f2)
      BODY=$(echo "$RESPONSE" | sed '/HTTP_STATUS/d')

      echo "HTTP Status: $HTTP_STATUS"
      echo "Response Body: $BODY"

      if [ "$HTTP_STATUS" -ne 200 ]; then
        echo "Error: Failed to create job (Status: $HTTP_STATUS)"
        exit 1
      fi

      JOB_ID=$(echo "$BODY" | jq -r '.jobId // empty')
      if [ -n "$JOB_ID" ]; then
        echo "✅ Job created successfully with ID $JOB_ID."
        echo "You can check status in Ilum UI."
      else
        echo "Warning: Job created but ID not returned."
      fi
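The status/body splitting in the script can also be verified offline. This sketch replays the same parsing on a fabricated curl response (the jobId value is invented):

```python
# Fabricated curl output: JSON body followed by the -w write-out marker line
response = '{"jobId": "job-42"}\nHTTP_STATUS:200'

# Equivalent of the grep/cut and sed pipeline in the CI script
lines = response.splitlines()
status = next(l.split(":", 1)[1] for l in lines if l.startswith("HTTP_STATUS"))
body = "\n".join(l for l in lines if not l.startswith("HTTP_STATUS"))

print(status)  # 200
print(body)    # {"jobId": "job-42"}
```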

Pipeline Variables

For security, avoid hardcoding URLs. Use GitLab CI/CD Variables (Settings -> CI/CD -> Variables):

  • ILUM_API_URL: e.g., https://ilum.example.com/api
  • ILUM_AUTH_TOKEN: If authentication is enabled, pass this header in curl (-H "Authorization: Bearer $ILUM_AUTH_TOKEN").

jobConfig format

When using a multipart/form-data endpoint (such as /job/submit or /group), jobConfig should be a semicolon-separated string (e.g., spark.key=value;spark.key2=value2).

However, when submitting an interactive execution via the JSON API (e.g., /group/{groupId}/job/execute), jobConfig must be a standard JSON object.
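Both jobConfig encodings can be derived from the same dictionary; a minimal sketch, using the Spark settings from the group-creation example earlier:

```python
import json

spark_conf = {
    "spark.executor.instances": "2",
    "spark.driver.memory": "2g",
}

# multipart/form-data endpoints (/job/submit, /group):
# semicolon-separated key=value pairs
multipart_value = ";".join(f"{k}={v}" for k, v in spark_conf.items())
print(multipart_value)  # spark.executor.instances=2;spark.driver.memory=2g

# JSON endpoint (/group/{groupId}/job/execute): a standard JSON object
json_value = json.dumps(spark_conf)
print(json_value)
```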

Verification

  1. Push your code to the main branch.
  2. Go to Build -> Pipelines in GitLab.
  3. Wait for the submit_job stage to pass.
  4. Open the Ilum UI -> Jobs tab.
  5. You should see gitlab_pipeline_job in the running or completed state.

Frequently Asked Questions (FAQ)

Why use the REST API instead of spark-submit?

The Ilum REST API provides a programmatic, language-agnostic way to interact with the cluster. Unlike spark-submit, which requires a complex client-side setup (Java, Hadoop configs, K8s credentials) in your CI runner, the REST API only requires curl and network access to the Ilum endpoint. This drastically simplifies your CI runner images.

How do I handle secrets in my pipeline?

Never hardcode secrets like S3 keys or database passwords in your submit.py. Instead:

  1. Store them as GitLab CI/CD Variables (masked and protected).
  2. Pass them to the Ilum job as Spark configuration properties or environment variables during the API call (e.g., -F "jobConfig=spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY...").

Can I run this on GitHub Actions?

Yes. The concepts are identical. You would replace the .gitlab-ci.yml syntax with GitHub Actions workflow syntax, using curl steps to hit the same Ilum API endpoints.