Hi, in this article, I will show you how to use the EDA app for Frappe.
What is EDA ?
Event Driven Architecture.
Event-Driven Architecture (EDA) is a software architectural pattern that focuses on the flow and processing of events as the primary means of communication and coordination between different components or services in a system. In EDA, the system’s behavior is determined by the occurrence and handling of events, which represent significant occurrences or changes in the system or its external environment.
What is the Frappe Framework?
For detailed information about Frappe, you can visit this link: Frappe Official Website
What is RabbitMQ?
For detailed information about RabbitMQ, you can visit this link: RabbitMQ Official Website
What is Amazon MQ?
For detailed information about Amazon MQ, you can visit this link: Amazon MQ Documentation
Problem :
The problem is transferring data to a different Frappe server using API calls.
Issues :
- API calls fail when dealing with a large set of data.
- Internal developers often forget to handle errors and tracebacks.
Solution :
To create a message broker using RabbitMQ in Amazon MQ, we will follow the steps below:
1. Set up RabbitMQ in Amazon MQ.
2. Create a Producer program that determines the data flow and specifies the program to be executed on the consumer side.
3. Develop a Consumer program that runs every 10 minutes, retrieves new messages from the queue, and processes them.
By implementing these steps, we can establish a messaging system using RabbitMQ and Amazon MQ, where the Producer decides the data flow and the Consumer handles message processing at regular intervals.
Introduction :
RabbitMQ in Amazon MQ: Utilizing the Polling Method with Python
Amazon MQ is a cloud-based, fully managed message broker service that allows for the setup and operation of message brokers.
RabbitMQ is among the supported message brokers in Amazon MQ, offering reliable messaging capabilities.
This document delves into the notion of RabbitMQ in Amazon MQ and showcases the usage of the EDA Frappe app.
The concept employed in RabbitMQ involves utilizing the routing topic exchange method.
For further information, please refer to the following link: https://www.rabbitmq.com/tutorials/tutorial-four-python.html
Here is a basic producer code snippet to connect with AWS RabbitMQ and produce a message:
import pika
from pika.exchange_type import ExchangeType
import json
import ssl
exchange = "random_exchange"
message = "hello" #{"module":"eda_test_app.test","function":"test","argument":""}
# Set credentials
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')
url = "{AWS amps key}"
parameters = pika.URLParameters(url)
parameters.ssl_options = pika.SSLOptions(context=ssl_context)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Declare exchange
channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.topic)
# Publish message with topic routing
routing_key = "frappe.sever.1"
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
# Close connection
connection.close()
Here is a basic consumer code snippet to connect with AWS RabbitMQ and consume a message:
import pika
from pika.exchange_type import ExchangeType
import time
import json
import ssl
queue_name = "server_1"
exchange = "random_exchange"
routing_key = "frappe.sever.1"
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')
url = "{AWS amps key}"
parameters = pika.URLParameters(url)
parameters.ssl_options = pika.SSLOptions(context=ssl_context)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.topic)
channel.queue_declare(queue_name, exclusive=False)
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
constumer_used = 0
while True:
method_frame, header_frame, body = channel.basic_get(queue=queue_name, auto_ack=True)
if method_frame:
print(body)
else:
time.sleep(10)
Make sure to replace “{AWS amps key}” with the appropriate values specific to your AWS RabbitMQ setup. This code establishes a connection, declares a queue, and publishes a message to the specified queue.
The above Producer and Consumer programs will produce and consume the data.
The main role of the program's key is crucial for its functionality.
queue_name = "server_1" # Declare the queue name in RabbitM
exchange = "random_exchange" # This queue will be bound to this exchange
routing_key = "frappe.server.1" # This key will determine the destination queue for data flowQ
The Consumer program will connect to this exchange and receive the data using the routing_key. The routing_key acts as a URL inside RabbitMQ, determining the queue to which the data needs to flow.
------------------------------------------------------------------------------------------------------------
In this app, the Dynamic Import program is utilized to address the need for centralized error monitoring and capturing in one place. This approach ensures that errors are not overlooked, even if developers forget to handle them.
Sample Code :
import importlib
module_name = "eda.test.test" # Specify the file path starting from the module name and ending with the file name
function_name = "run" # Specify the function inside the module
function_argument = "dfsd" # Pass the arguments
my_module = importlib.import_module(module_name)
my_function = getattr(my_module, function_name)
my_function(function_argument)
The above program dynamically imports the function and passes the arguments to it. Instead of using the conventional approach of importing functions like "from test_module.test import test", the provided code allows for dynamic importing of multiple functions from other libraries.
------------------------------------------------------------------------------------------------------------
So, in the EDA app, the major roles are played by the three aforementioned programs: Producer, Consumer, and Dynamic Importing. By installing the app directly in the Frappe Framework and configuring the settings in the EDA app’s doctype, you can begin publishing messages and utilizing its functionalities.
To install the EDA app in the Frappe Framework, follow the steps below:
1. Navigate to the frappe-bench folder and activate the virtual environment.
2. Run the following command in the command prompt:
pip3 install pika
3. Execute the following commands one by one:
bench get-app https://github.com/venkatasidhartha/eda.git --branch develo
bench --site {your-site-name} install-app eda
bench enable-scheduler
bench --site {your-site-name} migratep
Note: Replace `{your-site-name}` with the actual name of your site.
After installing the app on the Frappe server, follow these steps:
1. Open your browser and access the Frappe server.
2. Search for the “EDA Settings” doctype.
3. In the settings, click on the checkbox labeled “Switch” to enable the Producer and Consumer programs to run.
4. Fill in the following fields:
a. URL: Enter the AWS AMQPS key.
b. Exchange: Specify the exchange to which the Producer and Consumer will connect.
c. Routing Key: Set the current server key to start listening for the Consumer.
d. Queue Name: Connect with a static queue to prevent data loss.
e. Server: Specify other server routing keys to be used while writing the Producer code.
By following these steps, you will successfully install and configure the EDA app in the Frappe Framework.
Now let’s start writing the Producer code.
In this scenario, the Consumer is located on another Frappe server, specifically Frappe server 6. It’s important to note that in the EDA Settings doctype, we have already specified the routing key in the server field, which we can use.
Here is a sample code snippet:
from eda.publisher import send_msg, site_routing_key
import random
def trigger():
route_key = site_routing_key()
send_msg(module="test_app.test_folder.test_file",
function="test_run",
argument={"data": "hello"},
doc_uuid=random.random(),
to_server=route_key.server_6)
In the given example scenario, this program will be triggered when a new user is inserted into the user doctype through a hook file.
For more information on Frappe hook files, you can visit the Frappe FrameworkHooksDocumentation.
Additionally, there is a “Producer Logs” doctype present in the EDA app. It is used to log all produced data and capture any errors that may occur during the production process. These errors are stored within the log for reference and analysis.
Now, on the Consumer side
The Producer will publish the data from Frappe server 1, and Frappe server 6 will receive the data through the routing key concept in RabbitMQ.
The Consumer program runs every 10 minutes. Its main task is to fetch data from the RabbitMQ queue and store it in the Consumer Logs doctype. Once the data is stored in the Consumer Logs doctype, it triggers the dynamic import program to run the code specified in the producer code’s module key, function key, and argument key.
The module key specifies the source from which the program needs to be imported. The function key determines which function should be executed. The argument key is used to pass parameters in dictionary format.
If the program runs successfully, the status in the Consumer Logs doctype is changed to “Processed”. However, if any errors occur during program execution, the Consumer Logs doctype captures the error, stores it in the respective document, and changes the status to “Error”.
Therefore, if anyone wants to use the app, they can follow the instructions provided in the above documentation.
If you have any queries or need further assistance, please let me know.