
Apache NiFi Integration with Spark

Empty NiFi homepage

Apache NiFi is an enterprise-grade, open-source platform designed to automate the flow of data between disparate systems. It leverages a flow-based programming model, providing a highly configurable, visual interface for data routing, transformation, and system mediation.

When integrated with Ilum, NiFi acts as the orchestration layer, responsible for data ingestion and event triggering, while Ilum serves as the compute layer, executing heavy-duty Apache Spark jobs. This separation of concerns allows for highly scalable, event-driven architectures where data arrival or specific system events automatically trigger distributed processing tasks.

Architecture Overview

In a typical production deployment, the integration follows an Event-Driven Architecture (EDA) pattern:

  1. Data Ingestion: NiFi monitors external sources (S3, Kafka, FTP, HTTP) for new data.
  2. Event Detection: Upon detecting new data (e.g., a file landing in an Object Storage bucket), NiFi generates a FlowFile representing this event.
  3. Payload Construction: NiFi transforms the event attributes into a JSON payload compatible with Ilum's REST API.
  4. Job Submission: NiFi submits the job request to the Ilum Core API (/api/v1/group/{groupID}/job/execute).
  5. Execution: Ilum schedules and executes the Spark job on the cluster.
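In code terms, steps 3 and 4 boil down to building a small JSON body and POSTing it to the execute endpoint. A minimal Python sketch of the payload construction (field names mirror the example used later in this guide) might look like:

```python
import json

def build_payload(job_class: str, path: str) -> str:
    """Step 3: turn an S3 event attribute into Ilum's job-execution body."""
    return json.dumps({
        "type": "interactive_job_execute",   # interactive group execution
        "jobClass": job_class,               # the registered job class
        "jobConfig": {"path": path},         # parameters passed to the job
    })

print(build_payload("ilumJob.PathPrinter", "ilum-files/new-report.csv"))
```

The rest of this guide builds exactly this payload inside NiFi, with the path substituted at runtime.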

Prerequisites

Before implementing the pipeline, ensure the following technical requirements are met:

  • Ilum Core: A running instance of Ilum Core (v4.0+ recommended).
  • Apache NiFi : A deployed NiFi instance (v1.15+ recommended) with network connectivity to Ilum Core.
  • Network Access:
    • Connectivity from the NiFi environment to the Ilum Core service API (default port 9888 ).
    • Connectivity from NiFi to the Object Storage (e.g., MinIO/S3).
  • Service Accounts: Credentials for accessing the S3/MinIO buckets (Access Key & Secret Key).
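The network requirements can be verified up front with a plain TCP check before any flow is built. This sketch assumes the default service hostnames and ports; adjust them to your environment:

```python
import socket

def check_port(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failures, refusals, and timeouts
        return False

if __name__ == "__main__":
    # Hostnames match the bundled deployment; replace with your own addresses.
    for host, port in [("ilum-core", 9888), ("ilum-minio", 9000)]:
        state = "reachable" if check_port(host, port) else "UNREACHABLE"
        print(f"{host}:{port} -> {state}")
```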
Info

To learn how to enable the bundled NiFi deployment within the Ilum stack, refer to the Production Deployment Guide.


Phase 1: Configuring Data Ingestion (NiFi & MinIO)

This section details configuring NiFi to monitor an S3-compatible object storage (MinIO) for incoming files. This setup creates the trigger for our event-driven pipeline.

1. Create a Controller Service for AWS Credentials

Controller Services provide shared resources for processors. We will use the AWSCredentialsProviderControllerService to manage authentication securely.

  1. Right-click on the NiFi canvas and select Controller Services.
  2. Click the + icon and search for AWSCredentialsProviderControllerService.
  3. Click Add.

Controller Services

2. Configure Credentials

  1. Click the Edit icon (pencil) next to the new service.
  2. Navigate to the Properties tab.
  3. Enter your Access Key ID and Secret Access Key.
    • Default Ilum MinIO credentials: minioadmin / minioadmin
  4. Click Apply.

Placing the key of the credentials provider

3. Enable the Service

Click the Enable icon (lightning bolt) to start the service. Ensure the status changes to Enabled.

Enabling the credentials provider

4. Add the ListS3 Processor

The ListS3 processor monitors a bucket and emits a FlowFile for each object it finds. Unlike FetchS3Object, it retrieves only the metadata, not the content, which makes it highly efficient for triggering downstream jobs.

  1. Drag the Processor icon from the top toolbar onto the canvas.
  2. Search for and select ListS3.

Default S3 list processor on the canvas

5. Configure ListS3 Properties

Right-click the processor and select Configure. Set the following properties:

  • Bucket: ilum-files. The target bucket to monitor.
  • AWS Credentials Provider: AWSCredentials... Select the service created in Step 1.
  • Endpoint Override URL: http://ilum-minio:9000. Required for local/private S3-compatible stores.
  • Region: us-east-1. Default region for MinIO (often required even if ignored).
Tip

Use the Verify button in the configuration window to test connectivity immediately.

Verification of the values


Phase 2: Orchestrating Spark Jobs (NiFi & Ilum)

With the ingestion trigger ready, we will now configure the logic to transform the S3 event into an API call that launches an interactive Spark job in Ilum.

1. Create the Ilum Spark Job

First, ensure you have a registered Ilum Job capable of accepting parameters. Create a Python file (e.g., path_printer.py):

from ilum.api import IlumJob

class PathPrinter(IlumJob):
    def run(self, spark, config):
        # Extract the 'path' parameter passed from NiFi
        path = config.get("path", "unknown_path")
        print(f"PROCESSING EVENT: New file detected at {path}")

Upload this file to an Ilum Service (Group) and ensure the language is set to Python.
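The job logic can be smoke-tested locally before uploading. This sketch substitutes a placeholder base class for ilum.api's IlumJob (which is only available in the Ilum runtime) and returns the message instead of printing it, purely to make local assertions easy:

```python
class IlumJob:
    """Placeholder for ilum.api.IlumJob so the logic runs outside Ilum."""

class PathPrinter(IlumJob):
    def run(self, spark, config):
        # Same logic as the uploaded job; returning (not printing) aids testing.
        path = config.get("path", "unknown_path")
        return f"PROCESSING EVENT: New file detected at {path}"

# 'spark' is unused by this particular job, so None is a safe local placeholder.
print(PathPrinter().run(None, {"path": "ilum-files/report.csv"}))
```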

2. Retrieve the Execution Endpoint

Navigate to the Execute Job tab of your Ilum Service and copy the Job Execution URL. Format: http://<ilum-core-host>:<port>/api/v1/group/<group-id>/job/execute

Execute job tab

3. Dynamic Payload Construction (ReplaceText)

We need to convert the FlowFile (which currently contains S3 metadata) into a JSON payload for the Ilum API.

  1. Add a ReplaceText processor to the canvas.
  2. Connect the success relationship of ListS3 to ReplaceText.
  3. Configure Properties:
    • Replacement Strategy: Always Replace. Discards input content and replaces it with our JSON.
    • Evaluation Mode: Entire text.
    • Replacement Value: See the JSON below. The request body for Ilum.

Replacement Value JSON:

{
  "type": "interactive_job_execute",
  "jobClass": "ilumJob.PathPrinter",
  "jobConfig": {
    "path": "${filename}"
  }
}

Technical Note: The syntax ${filename} is NiFi Expression Language. At runtime, NiFi resolves this variable to the actual filename of the object detected by ListS3. This allows a single generic processor to trigger a unique job for every file.
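To sanity-check that the template yields valid JSON once the attribute is substituted, the substitution can be mimicked in plain Python. The naive token replacement below is only a stand-in for Expression Language, which is far richer:

```python
import json

TEMPLATE = """{
  "type": "interactive_job_execute",
  "jobClass": "ilumJob.PathPrinter",
  "jobConfig": {
    "path": "${filename}"
  }
}"""

def render(template: str, attributes: dict) -> str:
    """Naive stand-in for NiFi EL: substitute ${attr} tokens literally."""
    for key, value in attributes.items():
        template = template.replace("${" + key + "}", value)
    return template

payload = json.loads(render(TEMPLATE, {"filename": "sales/2024-01.csv"}))
print(payload["jobConfig"]["path"])  # -> sales/2024-01.csv
```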

The relationships tab

4. Job Submission (InvokeHTTP)

The InvokeHTTP processor acts as the REST client.

  1. Add an InvokeHTTP processor.
  2. Connect the success relationship of ReplaceText to InvokeHTTP.
  3. Configure Properties:
    • HTTP Method: POST.
    • Remote URL: http://ilum-core:9888... Paste the URL from Step 2.
    • Content-Type: application/json. Critical for the API to parse the body.
    • Connection Timeout: 5 secs. Fail fast if Ilum is unreachable.
    • Read Timeout: 15 secs. Wait for acknowledgment of job submission.
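For debugging outside NiFi, the same submission can be reproduced with a plain HTTP client. This standard-library sketch uses a hypothetical group ID and mirrors the method and Content-Type above; note that urllib exposes a single combined timeout rather than NiFi's separate connect/read values:

```python
import json
import urllib.request

def build_request(url: str, payload: dict) -> urllib.request.Request:
    """Equivalent of InvokeHTTP: a POST with a JSON body and Content-Type set."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

if __name__ == "__main__":
    # Hypothetical URL -- paste your actual Job Execution URL from Step 2.
    req = build_request(
        "http://ilum-core:9888/api/v1/group/my-group/job/execute",
        {
            "type": "interactive_job_execute",
            "jobClass": "ilumJob.PathPrinter",
            "jobConfig": {"path": "demo.csv"},
        },
    )
    with urllib.request.urlopen(req, timeout=15) as resp:
        print(resp.status, resp.read().decode())
```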

5. Finalize the Flow

  1. Route Relationships:
    • Response: This relationship contains the JSON response from Ilum (e.g., Job ID). Route this to a LogAttribute processor or valid downstream flow.
    • Retry/Failure: Route these back to the processor (loop) or to a failure handling path.
    • Original: Auto-terminate if not needed.
  2. Start the processors.

Final flow look


Advanced Configuration

Error Handling & Reliability

In production environments, network blips or temporary service unavailability can occur. Do not simply auto-terminate failure relationships.

  1. Retry Policies: In InvokeHTTP, set Retry on Response Status Codes to 500-599.
  2. Dead Letter Queues: Route the failure and No Retry relationships to a PutFile or a separate LogAttribute processor to archive failed requests for manual inspection.

Security Integration

If your Ilum instance is secured (using JWT or OAuth), you must include authentication headers in the InvokeHTTP processor.

  1. Static Token: Add a dynamic property to InvokeHTTP named Authorization with the value Bearer <your-token>.
  2. Dynamic Token: Use a separate flow to fetch a token from your IDP, store it in a distributed map cache, and retrieve it before the InvokeHTTP call.

Performance & Backpressure

When processing high volumes of data (thousands of files per minute), NiFi might overwhelm the Ilum submission API.

  • Backpressure: Right-click the connection between ReplaceText and InvokeHTTP. Configure the Back Pressure Object Threshold (e.g., 1000). If the queue fills up, ListS3 pauses automatically.
  • Concurrent Tasks: In the InvokeHTTP scheduling tab, increase Concurrent Tasks (e.g., 2-4) to submit multiple jobs in parallel, increasing throughput.
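These two knobs map naturally onto a bounded queue (backpressure) plus a small worker pool (concurrent tasks). This stand-alone sketch uses a stub submit function in place of the real API call:

```python
import queue
import threading

def run_submitters(files, submit, workers=4, queue_size=1000):
    """Bounded queue == backpressure; worker count == Concurrent Tasks."""
    q = queue.Queue(maxsize=queue_size)   # producer blocks when the queue is full
    results, lock = [], threading.Lock()

    def worker():
        while True:
            path = q.get()
            if path is None:              # poison pill: shut this worker down
                return
            r = submit(path)
            with lock:
                results.append(r)

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for path in files:
        q.put(path)                       # backpressure point
    for _ in threads:
        q.put(None)
    for t in threads:
        t.join()
    return results

if __name__ == "__main__":
    out = run_submitters([f"file-{i}.csv" for i in range(10)],
                         submit=lambda p: f"submitted {p}", workers=3)
    print(sorted(out))
```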

Troubleshooting

Verification

If the flow is configured correctly, checking the Data Provenance in NiFi will show SEND events to the Ilum URL.

List of job's requests

Inside Ilum, navigate to the Service's Job Logs to confirm execution:

The log of the job

Common Issues

  • 400 Bad Request: Usually indicates invalid JSON syntax in ReplaceText. Verify quotes and brackets.
  • 404 Not Found: Check the Remote URL. Ensure the Group ID matches exactly.
  • Connection Refused: Verify the network policy allows traffic from the NiFi pod/host to the Ilum Core pod/host on port 9888.