Running Synergos Cluster in Distributed Mode

In some settings, where the parties in a collaboration have access to a large expanse of resources, it is possible to orchestrate multiple federated learning jobs in parallel across a cluster of multiple federated grids. One such setting is to build multiple models in parallel as part of hyper-parameter tuning.

The configuration to run Synergos with multiple TTP-Workers grids as a cluster is referred to as Synergos Cluster. Synergos supports this with the extended components (i.e. those components not in Federation & Federated Grid), e.g. the Synergos MQ and Synergos Director in the Orchestration block.

This chapter serves as a guide on how to run Synergos Cluster in the distributed mode.

Three main extended components used in this guide are:

  • Synergos Director - handle federated learning job creation and dispatchment to multiple TTP-Workers grids.
  • Synergos MQ - message queue to manage a queue of federated learning jobs in different statues. The current version is built on top of RabbitMQ.
  • Synergos Federated MLOps - monitor models, experiments and training performances. The current version is built on top of MLFlow.

The key difference between running with and without those extended components is that, a new role Director is created by Synergos Director. Director is the one creating multiple federated learning job and dispatching to different TTP-Workers federated grid. But within a federated grid, TTP is still the one coordinating different workers to complete the federated learning cycle. You, as the orchestrator of the parallel federated learning cycles, interact with the Director via the Synergos Director package, instead of TTP in the individual federated grids.

This guide only shows the setup with one Director and one TTP-Worker grid which is formed with one TTP and two Workers. It can be easily extended to more than one grid.

While it is possible to install Synergos TTP, Synergos Director and Synergos MQ on a single machine, we recommend separating them into individual VMs to avoid overloading a single computer and creating a single point of failure for the setup.

An additional requirement is that TTP, Director amd Federated MLOps should have access to a shared mount volume.

Setting up

1.1 Preparing data

In each of the Workers, prepare the data. Preprocess the data and format the directories as instructed here.

1.2 Installing Synergos

Follow the instructions here to install Synergos on different VMs. Since you are running Synergos Cluster configuration, install Synergos TTP, Synergos Worker, Syergos Director, Synergos MQ and Synergos Federated MLOps on the respective VMs. Additionally, install Synergos Driver python package in your virtual environment locally.

The VMs used in this guide are as follow:

VM ID Role VM IP used in this guide
A MQ <mq-vm-ip> e.g. 172.19.152.151
B Director <director-vm-ip> e.g. 172.19.152.152
C TTP <ttp-vm-ip> e.g. 172.19.152.153
D Worker 1 <worker-1-vm-ip> e.g. 172.19.152.154
E Worker 2 <worker-2-vm-ip> e.g. 172.19.152.155
F Federated MLOps <mlops-vm-ip> e.g 172.19.152.156

1.3 Setting up the environment

For each of the VMs, pull the respective docker images according the installation guide based on the role the VMs are playing. The mapping of docker images to VMs is shown below.

VM ID Requisite Docker image
A synergos_mq:v0.1.0
B synergos_director:v0.1.0
C synergos_ttp_cluster:v0.1.0
D synergos_worker:v0.1.0
E synergos_worker:v0.1.0
F synergos_mlops:v0.1.0
  1. In a terminal, ssh to VM A and link your available local port to the VM's port 15672. In this guide, we use local port 15672 as an example.

    ssh -L 15672:<mq-vm-ip>:15672 <user>@<mq-vm-ip>
    

    This allows you to view RabbitMQ management UI on your browser at localhost:15672.

  2. In a separate terminal, ssh to VM B and link your local port to the VM's port 5000. In this guide, we use local port 15000 as an example.

    ssh -L 15000:<director-vm-ip>:5000 <user>@<director-vm-ip>
    

    This allows you to send commands to the Director, from your local environment using Synergos Driver.

  3. In a separate terminal, ssh to VM F and link your available local port to the VM's port 5500. In tthis guide, we use local port 5500 as an example.

    ssh -L 5500:<mlops-vm-ip>:5500 <user>@<mlops-vm-ip>
    

    This allows you to interact with MLFlow, which is the underlying model management component of the Federated MLOps. You can view the MLFlow UI on your browser at localhost:5500.

Start the docker containers

Start the docker containers on the respective VMs.

On VM A

  docker run -it --rm -p 15672:15672 -p 5672:5672 \
    --name mq \
    synergos_mq:v0.1.0

Once completed, you can check if Synergos MQ is running by accessing its management UI by visiting port 15672 on VM A. The username and password are both guest.

On VM B

  docker run -p 5000:5000 -v /<director-ttp-shared-directory>/ttp_data/:/director/data \
    -v /<director-ttp-shared-directory>/ttp_outputs/:/director/outputs \
    -v /<director-ttp-shared-directory>/mlflow_test/:/director/mlflow \
    --name director synergos_director:v0.1.0 \
    --id director \
    --logging_variant basic \

Note that the volume mount source for Director needs to be shared with that for TTP and Federated MLOps.

On VM C

  docker run -it --rm -p 5000:5000 \
    -v /<director-ttp-shared-directory>/ttp_data:/ttp/data \
    -v /<director-ttp-shared-directory>/ttp_outputs:/ttp/outputs \
    -v /<director-ttp-shared-directory>/mlflow_test:/ttp/mlflow \
    --name ttp synergos_ttp_cluster:v0.1.0  \
    --id ttp \
    --logging_variant basic \
    --queue rabbitmq <mq-vm-ip> 5672 \
    --censored

On VM D

  docker run -v /<worker-1-dataset-directory>/data:/worker/data \
    -v /<worker-1-dataset-directory>/outputs_1:/worker/outputs \
    --name worker_1 synergos_worker:v0.1.0 \
    --id worker_1 \
    --logging_variant basic \
    --censored

On VM E

  docker run -v /<worker-2-dataset-directory>/data:/worker/data \
    -v /<worker-2-dataset-directory>/outputs_2:/worker/outputs \
    --name worker_2 synergos_worker:v0.1.0 \
    --id worker_2 \
    --logging_variant basic \
    --censored

On VM F

docker run --rm \
  -p 5500:5500 \
  -v /<director-ttp-shared-directory>/mlflow_test/:/mlflow \ 
  -e GUNICORN_CMD_ARGS="--bind=0.0.0.0" \
  --name synmlops \ 
  synergos_mlops:v0.1.0 \

Note that the volume mount source for Federated MLOps needs to be shared with that for TTP and Director.


Preparing the Synergos script

As mentioned here, there are three phases in users' interaction with Synergos. During the three phases, necessary meta-data is supplied by the users, including both the orchestrator and participants. In this guide, all the meta-data supplied by different parties, for different phases are included in a single script. However, in a real setup, different parties can run different scripts to supply their respective information.

As you are running Synergos Cluster, after all the meta-data has been collected, the script(s) leverages Synergos Driver package to send information and instructions to the Director, which then orchestrates all of the TTP in (multiple) grid(s), which will subsequently coordinate with other workers to complete the federated learning process.

This guide also assumes that the script is running from your local computer, which is configured to forward communicate to the Director via SSH tunneling. This can be done by the following command:

ssh -L 15000:<director-vm-ip>:5000 <user>@<director-vm-ip>

This command opens a connection to the Director, and forwards any communication on local port 15000 to port 5000 on the Director. We use 15000 as an example, while you can use any other available local port numbers.

Phase 1: Registration

In this phase, the orchestrator and participants will provide different information. Orchestrator defines Collaboration, Project, Experiment, and Run. Participants register to the orchestrator, their intention to join the collaboration and project, hence submitting information about the compute and data they are using.

Create a Python script, named it synergos.py. Let's start by importing Synergos Driver package.

from synergos import Driver

host = "0.0.0.0"
port = 15000

driver = Driver(host=host, port=port)

The port variable shown above is the local port 15000 that has been configured to tunnel to the Director.

1A. Orchestator creates a collaboration

collab_task = driver.collaborations
# use designated <mq-vm-ip> for 'host' parameter to configure mq
collab_task.configure_mq(host="172.19.152.151", port=5672) # configure MQ
collab_task.create("collaboration_1") #collaboration_1 is the ID of the collaboration

1B. Orchestator creates a project

There are two kinds of action currently supported - "classify" if you are building a classification model or "regress" for regression model.

driver.projects.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    action="classify",
    incentives={
        'tier_1': [],
        'tier_2': [],
        'tier_3': []
    }
)

incentives is related to the Contribution & Reward block, which is still under development. It will be ignored in current version of Synergos.

1C. Orchestator creates an experiment

In this section, you, as the orchestrator, create an experiment, and define the model architecture to be built. The model is defined as a list of dictionaries - each dictionary represents a layer; the order of elements in the list correspond to the order of model layers. Examples are shown below for a simple model with tabular data and a CNN model with image data.

A simple model with tabular data

driver.experiments.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    model=[
        {
            "activation": "sigmoid",
            "is_input": True,
            "l_type": "Linear",
            "structure": {
                "bias": True,
                "in_features": 18,
                "out_features": 1
            }
        }
    ]
)

A CNN model with image data

driver.experiments.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    model = [
        {
            "activation": "relu",
            "is_input": True,
            "l_type": "Conv2d",
            "structure": {
                "in_channels": 1,
                "out_channels": 4,
                "kernel_size": 3,
                "stride": 1,
                "padding": 1
            }
        },
        {
            "activation": None,
            "is_input": False,
            "l_type": "Flatten",
            "structure": {}
        },
        {
            "activation": "sigmoid",
            "is_input": False,
            "l_type": "Linear",
            "structure": {
                "bias": True,
                "in_features": 4 * 32 * 32,
                "out_features": 1
            }
        }
    ]
)

When defining each layer in the model argument, the parameters are:

  • activation - Any activation function found in the PyTorch's torch.nn.functional module
  • is_input - Indicates if the current layer is an input layer. If a layer is an input layer, it is considered to be "wobbly" layer, meaning that the in-features may be modified automatically to accommodate changes in input structure post-alignment.
  • l_type - Type of layer to be used, which can be found in PyTorch's torch.nn module.
  • structure - Any input parameters accepted in the layer class specified in l_type

Note: If you want to use FedGKT as the federated aggregation algorithm, the model has to be defined in a different manner due to implementation nuances. In this case, each activation layer in the neural network has to be defined as its own element in the model parameter list. The type of activation is defined in the l_type parameter. Suitable activations to use can be found in PyTorch's torch.nn module. For example, for a layer originally defined in model such as:

model=[
  {
      "activation": "sigmoid",
      "is_input": True,
      "l_type": "Linear",
      "structure": {
          "bias": True,
          "in_features": 18,
          "out_features": 1
      }
  }
]

will need to be in the following form, for FedGKT.

# model definition for FedGKT
model=[
  {
      "activation": None,
      "is_input": True,
      "l_type": "Linear",
      "structure": {
          "bias": True,
          "in_features": 18,
          "out_features": 1
      }
  },
  {
      "activation": None,
      "is_input": False,
      "l_type": "Sigmoid",
      "structure": {}
  }
]

1D. Orchestrator creates a run

A run is specific to a set of hyper-parameter values used.

driver.runs.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1",
    rounds=2,
    epochs=1,
    base_lr=0.0005,
    max_lr=0.005,
    criterion="L1Loss",
    optimizer="SGD",
    lr_scheduler="CyclicLR"
)

The arguments to this method first requires the run identification parameters. Next it requires the following arguments:

  • round: number of federated aggregated rounds
  • epoch: number of epochs for each worker, during each round
  • criterion: loss function
  • lr_scheduler: torch scheduler module (optional)
  • optimizer: torch optim module (optional)

Further keyword arguments are required to pass model and loss hyper-parameters values to use. These keywords must match the argument attribute name in PyTorch for each module used. From the code block above, base_lr and max_lr are the arguments for the torch "CyclicLR" scheduler module used.

Most of the criterions found in PyTorch's torch.nn Loss Function are supported, except MarginRankingLoss, CosineEmbeddingLoss, TripletMarginLoss and CTCLoss.

1E. Participants register to the collaboration created above

It is now the participants' turns to supply information.

First, let's create individual participants.

driver.participants.create(
    participant_id="participant_1",
)

driver.participants.create(
    participant_id="participant_2",
)

1F. Participants declare the compute resource and data they are using

After this, each participant provides information about its compute resource. When adding a compute resource, provide information about host, port, and fport. The host is IP address of the compute resource. port is the port that is used to communicate with other parties during the federated learning process to exchange intermediate training results, while fport is the port that is used to receive commands from Synergos Driver, e.g. to dismantle the grid after training completes. If a participant has more than one compute resource to declare, add them one by one by calling add_node(). All the compute resources one participant declares will be registered by calling create(). When calling create(), provide the corresponding collab_id, project_id, and participant_id.

registration_task = driver.registrations

registration_task.add_node(
    host='172.19.152.154', # <worker-1-vm-ip>
    port=8020,
    f_port=5000,
    log_msgs=True,
    verbose=True
)
registration_task.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_1",
    role="guest"
)

registration_task.add_node(
    host='172.19.152.155', # <worker-2-vm-ip>
    port=8020,
    f_port=5000,
    log_msgs=True,
    verbose=True
)
registration_task.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_2",
    role="host"
)

Although participants are contributing data, they are not exposing data to other parties. They declare the tags of their data, for the project which they registered above. For more information on how to define the data tag, refer to this.

 driver.tags.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_1",
    train=[["train"]], # data used in training
    evaluate=[["evaluate"]], # data used to evaluate model performance
    predict = [["predict"]] # data used for prediction/inference
)

driver.tags.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_2",
    train=[["train"]],
    evaluate=[["evaluate"]],
    predict=[["predict"]]
)

Phase 2: Training

Once all the meta-data is collected in the previous phase, you can move to start the federated training.

2A. Perform feature alignment to dynamically configure multiple datasets and models for cross-party compatibility

In machine learning, one-hot encoding is usually applied on categorical variables. In Federated Learning, since different parties do not expose data to one another, one-hot encoding is done locally without the knowledge of other parties' data. Due to the issue of non-IID data, it is possible that the features from different parties will not align, after one-hot encoding.

To illustrate this point, we use the example where a few hospitals are collaboratively trying to train a federated model, to predict mortality in ICUs. One of the predictor features used in the model is a patient's ethnicity. Assuming that there are 5 ethnicities recorded in total, for all the patients served across these hospitals. One of these hospitals, however, does not have patients from a particular ethnicity due to its geographic location. If each hospital applies one-hot encoding locally before federated training starts, all the hospitals will have 5 ethnicity-related features, except the one which has only 4 ethnicity-related features. This causes error in federated learning.

Therefore feature alignment is important. Synergos aligns the dataset, inputs and outputs to get the proper symmetry across all participants before federated training starts.

driver.alignments.create(
    collab_id = "collaboration_1",
    project_id="project_1"
)

Note that the guide only used one set of hyper-parameter in one run so far, even though Director is used. If you want to run multiple runs in parallel for hyper-parameter tuning to better utilize the value of Synergos Cluster configuration, please jump to this section.

2B. Start training

Once feature alignment has been completed, training can be started.

model_resp = driver.models.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1"
)

Phase 3: Evaluation

In Synergos, the orchestrator is not allowed see different parties' data, even though it is the one who coordinates the federated training. Only the participants are allowed to interact with the prediction results, and compare it with its own ground truth to derive the performance evaluation.

3A. Perform validation

The participants conduct performance evaluation of the federated model with the evaluate data they declared previously in Step 1F. The local evaluation by individual workers are then sent to TTP to aggregate and derive the final evaluation, which will be reported back to Director.

driver.validations.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1"
)

3B. Perform prediction(s)

The participants are also allowed to submit new data that was not declared for evaluation or training previously in Step 1F. The trained global model is used to obtain inferences on this new data. Do note that the predict tag declared here will override the one in Step 1F, if any.

driver.predictions.create(
    collab_id = "collaboration_1",
    tags={"project_1": [["predict"]]},
    participant_id="participant_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1"
)

Running the script

Now that the script has all the information and instructions needed to run the complete federated learning process, you can proceed to run the script. The script will be sent to Director, which then orchestrates with (multiple) TTP(s) to subsequently dispatch the instructions to all the workers in the respective grids.

#Activate <synergos_env> virtual environment
conda activate <synergos_env>

#Navigate into the repository
cd ./synergos

#Run synergos.py
python synergos.py

Hyper-parameter Optimization

This section will guide you on how to conduct hyper-parameter tuning with Synergos Cluster.

Additional instructions are needed after Phase 2A and before Phase 2B.

  1. After Phase 2A, define a search space for hyper-parameter tuning process.

    The search space is defined as a dictionary of <hyperparameter>: {"_type": <selection as string>, "_value": <selection field>} key value pairs. Here, _type is the hyper-parameter search sampling method, while _value is the list of values or range to search.

    Synergos' hyper-parameter optimization functionality is built on top of Ray Tune, you can refer to search_space_api for possible options to use for _type. Currently, only choice, uniform, loguniform and randint are supported.

    For hyper-parameters that have been fixed to single values, you can define their values directly in the key-value pair of the search_space dictionary.

    For example, in the search_space below, rounds is a hyper-parameter to be tuned with choice strategy with two possible values, 1 or 2. The rest of the hyper-parameters are fixed to single values.

     search_space = {
         'algorithm': 'FedProx',
         'rounds': {"_type": "choice", "_value": [1,2]},
         'epochs': 1,
         'lr': 0.001,
         'weight_decay': 0.0,
         'lr_decay': 0.1,
         'mu': 0.1,
         'l1_lambda': 0.0,
         'l2_lambda': 0.0,
         'optimizer': 'SGD',
         'criterion': 'MSELoss',
         'lr_scheduler': 'CyclicLR',
         'delta': 0.0,
         'patience': 10,
         'seed': 42,
         'is_snn': False,
         'precision_fractional': 5,
         'base_lr': 0.0005,
         'max_lr': 0.005,
     }
    
  2. After the search space has been defined, you can run the hyper-parameter optimization process to generate n_samples runs via Synergos Driver:

    driver.optimizations.create(
     collab_id="collaboration_1",
     project_id="project_1",
     expt_id="experiment_1",
     tuner=None,
     metric=None,
     optimize_mode=None,
     search_space=search_space,
     n_samples=3)
    

    This initiates the splitting of the search space into individual runs for each hyper-parameter setting, which will be pushed into the message queue. TTPs then pop the messages to retrieve the setting and send to the workers to start a run.

    Once these hyper-parameter optimizations commands are include in the script, jump back to this section to run the script.

  1. Once evaluation tasks are completed, the completed_<uuid> MQ consumer on Synergos Director will display the message:
    VALIDATION COMPLETED - <project_id>/<expt_id>/optim_run_<uuid>
    
    You can then refer to the MLFlow UI on your browser at localhost:5500, to view the various runs, their corresponding hyper-parameter settings, and validated model performance.

results matching ""

    No results matching ""