How to Write Interactive Spark Jobs in Python (IlumJob)
This guide teaches you how to develop interactive Spark jobs in Python using the IlumJob interface. You'll learn how to structure your code, pass parameters at execution time, and leverage the benefits of this approach for production workloads on Kubernetes.
What is the IlumJob Interface?
The IlumJob interface is a Python base class used to create reusable, parameterized Spark jobs that run on interactive Ilum services. Unlike traditional spark-submit scripts, IlumJob allows you to:
- Receive configuration at runtime: Parameters are passed as a dictionary, allowing the same job to handle different inputs without code changes.
- Return structured results: The `run` method returns a string, making it easy to extract and display results.
- Run on-demand: Jobs can be triggered via the UI, REST API, or CI/CD pipelines.
```python
from ilum.api import IlumJob

class MySparkJob(IlumJob):
    def run(self, spark, config) -> str:
        # Your Spark logic here
        return "Job completed successfully"
```
Structure of an Interactive Spark Job
Every interactive job consists of three essential parts:
- Import the interface: `from ilum.api import IlumJob`
- Define a class: Create a class that inherits from `IlumJob`.
- Implement `run`: Write your Spark logic inside the `run(self, spark, config)` method.
| Parameter | Type | Description |
|---|---|---|
| `spark` | SparkSession | Pre-initialized Spark session, ready to use. |
| `config` | dict | A dictionary containing parameters passed at execution time. |
| Return | str | A string result that will be displayed in the UI or returned via API. |
How to Pass Parameters to Spark Jobs
Parameters are passed as a JSON object when executing the job. Inside your `run` method, you access them using standard dictionary methods.
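For instance, the lookup pattern inside `run` is plain Python dictionary access. A minimal sketch (the key names and values below are illustrative, not fixed by Ilum):

```python
# Hypothetical config dictionary, as Ilum would pass it to run(self, spark, config)
config = {"table": "orders", "database": "sales"}

table_name = config.get("table")                  # required key
database_name = config.get("database")            # optional -> None if missing
batch_size = int(config.get("batch_size", 1000))  # optional with a default

if not table_name:
    raise ValueError("Config must provide a 'table' key")

print(table_name, database_name, batch_size)  # orders sales 1000
```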
Example: Table Inspector
This example demonstrates reading `database` and `table` parameters to inspect a Hive table.
```python
from ilum.api import IlumJob
from pyspark.sql.functions import col, sum as spark_sum

class TableInspector(IlumJob):
    def run(self, spark, config) -> str:
        # Read required parameters
        table_name = config.get('table')
        database_name = config.get('database')  # Optional

        if not table_name:
            raise ValueError("Config must provide a 'table' key")

        # Set database if provided
        if database_name:
            spark.catalog.setCurrentDatabase(database_name)

        # Check if table exists
        if table_name not in [t.name for t in spark.catalog.listTables()]:
            raise ValueError(f"Table '{table_name}' not found in catalog")

        df = spark.table(table_name)

        # Build report
        report = [
            f"=== Table: {table_name} ===",
            f"Total rows: {df.count()}",
            f"Total columns: {len(df.columns)}",
            "",
            "Schema:",
        ]
        for field in df.schema.fields:
            report.append(f"  {field.name}: {field.dataType}")

        report.append("")
        report.append("Sample (5 rows):")
        for row in df.take(5):
            report.append(str(row.asDict()))

        # Null counts
        report.append("")
        report.append("Null counts:")
        null_df = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
        for c, v in null_df.collect()[0].asDict().items():
            report.append(f"  {c}: {v}")

        return "\n".join(report)
```
Execution Parameters (JSON)
When executing via UI or API, provide parameters like this:
```json
{
  "database": "ilum_example_product_sales",
  "table": "products"
}
```
To run an interactive job, you first need to create and deploy a Job-type Service in Ilum. This service provides the Spark environment where your jobs execute.
When creating the service:
- Type: Select Job
- Language: Select Python
- Py Files: Upload your job file (e.g., `table_inspector.py`)
👉 Learn how to deploy a Job Service — step-by-step guide with UI screenshots and configuration options.
Executing Jobs
You can execute interactive jobs in three ways:
- Ilum UI
- REST API
- CI/CD Pipeline
- Go to Services → Select your Job service
- In the Execute section:
  - Class: `table_inspector.TableInspector`
  - Parameters: `{"database": "sales", "table": "orders"}`
- Click Execute
The result string is displayed immediately in the UI.
Before executing jobs via API:
- Expose the API: See Accessing the API for port forwarding, NodePort, or Ingress setup
- Get your Group ID: Run `curl http://localhost:9888/api/v1/group` and copy the `id` field of your Job Service
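If you prefer to grab the group ID programmatically, a small sketch is shown below. It assumes the endpoint returns a JSON array of group objects; only the `id` field is confirmed by this guide, so adjust to the actual payload your instance returns:

```python
import json
import urllib.request

def fetch_group_ids(base_url="http://localhost:9888"):
    """Return the id of every group reported by the Ilum API.

    Assumes a JSON array of objects each carrying an 'id' field.
    """
    with urllib.request.urlopen(f"{base_url}/api/v1/group") as resp:
        groups = json.load(resp)
    return [g["id"] for g in groups]

# Offline illustration with a hand-written sample payload:
sample = '[{"id": "20230101-abc", "name": "my-job-service"}]'
ids = [g["id"] for g in json.loads(sample)]
print(ids)  # ['20230101-abc']
```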
```bash
curl -X POST "http://ilum-core:9888/api/v1/group/{groupId}/job/execute" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "interactive_job_execute",
    "jobClass": "table_inspector.TableInspector",
    "jobConfig": {
      "database": "sales",
      "table": "orders"
    }
  }'
```
The response contains the result string and execution metadata.
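The same call can be scripted from Python with the standard library. In this sketch the payload fields mirror the curl example above, while `build_payload` and `execute_job` are helper names of our own:

```python
import json
import urllib.request

def build_payload(job_class, job_config):
    # Assemble the body expected by the /job/execute endpoint,
    # mirroring the curl example above.
    return json.dumps({
        "type": "interactive_job_execute",
        "jobClass": job_class,
        "jobConfig": job_config,
    })

def execute_job(base_url, group_id, job_class, job_config):
    req = urllib.request.Request(
        f"{base_url}/api/v1/group/{group_id}/job/execute",
        data=build_payload(job_class, job_config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)  # result string plus execution metadata

payload = build_payload("table_inspector.TableInspector",
                        {"database": "sales", "table": "orders"})
```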
Trigger job execution from GitLab CI/CD or similar:
```yaml
execute_interactive_job:
  stage: run
  script:
    - |
      curl -s -X POST \
        -H "Content-Type: application/json" \
        -d '{
          "type": "interactive_job_execute",
          "jobClass": "table_inspector.TableInspector",
          "jobConfig": {
            "database": "sales",
            "table": "orders"
          }
        }' \
        http://ilum-core:9888/api/v1/group/${GROUP_ID}/job/execute
  variables:
    GROUP_ID: "your-group-id-here"  # Get this from: curl http://ilum-core:9888/api/v1/group
```
See CI/CD with GitLab for a complete pipeline example including group creation.
Benefits of the IlumJob Approach
| Benefit | Description |
|---|---|
| Reusability | Write once, run many times with different parameters. |
| No Cold Starts | Interactive services keep Spark warm, so subsequent executions are instant. |
| Parameterization | Pass configuration at runtime—no need to hardcode values. |
| Observability | Results are captured and visible in the UI/API for easy debugging. |
| API-Driven | Execute jobs programmatically from orchestrators, CI/CD, or external systems. |
| Version Control | Store job code in Git and deploy via pipelines. |
Interactive Jobs vs. Batch Jobs (Spark Submit)
| Feature | Interactive Jobs (IlumJob) | Batch Jobs (spark-submit) |
|---|---|---|
| Startup Time | Instant (uses warm executors) | Slow (provisions new pods) |
| Context | Shared Spark Context | Isolated Spark Context |
| Use Case | Ad-hoc queries, API backends, quick reports | Long-running ETL, heavy processing |
| Result | Returns string result to API/UI | Logs to driver stdout/file |
| Resources | Shared within the service | Dedicated per job |
Best Practices
1. Validate Input Parameters
Always validate required parameters and provide helpful error messages.
```python
def run(self, spark, config) -> str:
    required_keys = ['table', 'output_path']
    for key in required_keys:
        if key not in config:
            raise ValueError(f"Missing required parameter: '{key}'")
```
2. Use Default Values
For optional parameters, use config.get('key', default_value).
```python
batch_size = int(config.get('batch_size', 1000))
```
3. Structure Your Output
Return a well-formatted string for readability in the UI.
```python
lines = ["=== Job Summary ==="]
lines.append(f"Processed: {count} records")
lines.append(f"Duration: {elapsed_time}s")
return "\n".join(lines)
```
4. Handle Errors Gracefully
Wrap risky operations in try/except and return meaningful messages.
```python
try:
    df.write.saveAsTable(output_table)
    return f"Successfully wrote to {output_table}"
except Exception as e:
    return f"Error writing table: {str(e)}"
```
Complete Example: Transaction Report Generator
This job generates a transaction summary report based on the `transaction_anomaly_detection.transactions` table.
```python
from ilum.api import IlumJob
from pyspark.sql.functions import sum as spark_sum, count, col

class TransactionReportGenerator(IlumJob):
    def run(self, spark, config) -> str:
        # Parameters
        merchant_filter = config.get('merchant')  # Optional filter

        # Load data from the default Ilum transactions table
        df = spark.table("transaction_anomaly_detection.transactions")

        if merchant_filter:
            df = df.filter(col("Merchant") == merchant_filter)

        # Aggregate by TransactionType
        summary = df.groupBy("TransactionType").agg(
            count("TransactionID").alias("transaction_count"),
            spark_sum("Amount").alias("total_amount")
        ).collect()

        # Build report
        report = [
            "=== Transaction Report ===",
            f"Merchant Filter: {merchant_filter or 'All'}",
            "",
            "Summary by Transaction Type:",
        ]
        for row in summary:
            report.append(f"  {row['TransactionType']}: {row['transaction_count']} txns, ${row['total_amount']:,.2f}")

        return "\n".join(report)
```
Execute with:
```json
{
  "merchant": "AcmeCorp"
}
```
Next Steps
- Interactive Job Service: Learn how to deploy and manage Job-type services.
- Interactive Code Service: For ad-hoc exploratory analysis with persistent sessions.
- CI/CD with GitLab: Automate job deployments via pipelines.
Frequently Asked Questions
Can I use Scala for interactive jobs?
Yes. Currently, the IlumJob interface is primarily documented for Python. Check the Interactive Job Service documentation for language support details.
How do I debug an interactive job?
Since interactive jobs run on a remote cluster, you can't use a local debugger directly. Instead:
- Use `print()` statements or a logger, which will appear in the driver logs.
- Return error messages as part of the string result in your try/except blocks.
- Check the Spark UI for the specific job execution to analyze tasks and stages.
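As a minimal sketch of the first point, a module-level logger whose output lands in the driver logs can be set up once per job file (the logger name and format here are our own choices, and `inspect` stands in for a real job body):

```python
import logging

# Configure once at module import; records go to stdout, which Spark
# captures in the driver log for the execution.
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
log = logging.getLogger("table_inspector")

def inspect(config):
    # Hypothetical job body: log progress instead of attaching a debugger.
    log.info("starting with config=%s", config)
    log.info("finished")
    return "ok"
```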
What happens if my job fails?
If your code raises an unhandled exception, the execution will fail, and the error trace will be returned in the API response. It is best practice to wrap your logic in a try/except block to return a user-friendly error message.
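One way to apply that advice is a generic wrapper around the job body; this is only a sketch, and the helper name `safe_run` is ours:

```python
import traceback

def safe_run(body, config):
    """Call the real job body; turn any unhandled exception into a
    readable string result instead of failing the execution."""
    try:
        return body(config)
    except Exception:
        return "Job failed:\n" + traceback.format_exc()

# A body that raises produces a friendly message in the returned string
# rather than a raw error trace in the API response:
result = safe_run(lambda cfg: 1 / 0, {})
```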