Running Synergos Basic in Distributed Mode

This chapter serves as a guide on how to run Synergos Basic in distribute mode, i.e. different parties running on different machines.

This is useful for the scenarios where we want to run a proper federated learning with multiple participants with minimum setup requirements (without hyper-parameter tuning and model tracking, etc.).

If you wish to run a more complex configuration, e.g. running with multiple federated grids as a cluster with the support of extended components (this configuration is referred to as Synergos Cluster), please refer to Running Synergos Cluster in distributed mode.

In this chapter, a three-party federated grid will be created, with one party acting as TTP and the other two as workers. Each will be located on a separate Virtual Machine (VM).

Setting up

1.1 Preparing data

In each of the workers, prepare the data. Preprocess the data and format the directories as instructed here.

1.2 Installing Synergos

Follow the instructions here to install Synergos on different VMs. Since you are running Synergos Basic configuration, install Synergos TTP and Synergos Worker on the respective VMs. Additionally, in a virtual environment on your local computer, install the Synergos Driver python package.

The three VMs used in this guide are as follows:

VM ID Role the VM is playing VM IP used in this guide
A TTP <ttp-vm-ip> e.g. 172.19.152.157
B Worker 1 <worker-1-vm-ip> e.g. 172.19.152.152
C Worker 2 <worker-2-vm-ip> e.g. 172.19.152.153

1.3 Setting up the environment

Launch the docker containers

Start the docker containers on the respective VMs.

On VM A

 docker run -p <ttp-vm-ip>:5000:5000 -p <ttp-vm-ip>:5678:5678 \
 -p <ttp-vm-ip>:8020:8020 \
 -v <ttp-directory>/mlflow_test:/orchestrator/mlflow \
 -v <ttp-directory>/ttp_outputs:/orchestrator/outputs \
 -v <ttp-directory>/ttp_data:/orchestrator/data \
 --name ttp synergos_ttp:v0.1.0 \
 --logging_variant basic \
 -c

On VM B

docker run -p <worker-1-vm-ip>:5000:5000 \
-p <worker-1-vm-ip>:8020:8020 \
-v /<path-to-dataset>/data:/worker/data \
-v /<path-to-dataset>/outputs:/worker/outputs \
--name worker_1 synergos_worker:v0.1.0 \
--logging_variant basic

ON VM C

docker run -p <worker-2-vm-ip>:5000:5000 \
-p <worker-2-vm-ip>:8020:8020 \
-v /<path-to-dataset>/data:/worker/data \
-v /<path-to-dataset>/outputs:/worker/outputs \
--name worker_2 synergos_worker:v0.1.0 \
--logging_variant basic

When the containers are running successfully, a message should be displayed, as shown below.
001_flask_running


Preparing the Synergos script

As mentioned here, there are three phases in users' interaction with Synergos. During the three phases, necessary meta-data is supplied by the users, including both the orchestrator and participants. In this guide, all the meta-data supplied by different parties, for different phases are included in a single script. However, in a real setup, different parties can run different scripts to supply their respective information.

As you are running Synergos Basic, after all the meta-data has been collected, the script(s) leverages Synergos Driver package to send information and instructions to the TTP, which then coordinates with other workers to complete the federated learning process.

This guide also assumes that the script is running from your local computer, which is configured to forward communicate to the TTP via SSH tunnelling. This can be done by the following command:

ssh -L 15000:<ttp-vm-ip>:5000 <user>@<ttp-vm-ip>

This command opens a connection to the TTP, and forwards any communication on local port 15000 to port 5000 on the TTP. We use 15000 as an example, while you can use any other available local port numbers.

Phase 1: Registration

In this phase, the orchestrator and participants will provide different information. Orchestrator defines Collaboration, Project, Experiment, and Run. Participants register to the orchestrator, their intention to join the collaboration and project, hence submitting information about the compute and data they are using.

Create a Python script, named it script.py. Let's start by importing Synergos Driver package.

from synergos import Driver

host = "0.0.0.0"
port = 15000

driver = Driver(host=host, port=port)

The port variable shown above is the local port 15000 that has been configured to tunnel to the TTP.

1A. Orchestator creates a collaboration

collab_task = driver.collaborations
collab_task.create("collaboration_1") #collaboration_1 is the ID of the collaboration

1B. Orchestator creates a project

There are two kinds of action currently supported - "classify" if you are building a classification model or "regress" for regression model.

driver.projects.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    action="classify",
    incentives={
        'tier_1': [],
        'tier_2': [],
        'tier_3': []
    }
)

incentives is related to the Contribution & Reward block, which is still under development. It will be ignored in current version of Synergos.

1C. Orchestator creates an experiment

In this section, you, as the orchestrator, create an experiment, and define the model architecture to be built. The model is defined as a list of dictionaries - each dictionary represents a layer; the order of elements in the list correspond to the order of model layers. Examples are shown below for a simple model with tabular data and a CNN model with image data.

A simple model with tabular data

driver.experiments.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    model=[
        {
            "activation": "sigmoid",
            "is_input": True,
            "l_type": "Linear",
            "structure": {
                "bias": True,
                "in_features": 18,
                "out_features": 1
            }
        }
    ]
)

A CNN model with image data

driver.experiments.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    model = [
        {
            "activation": "relu",
            "is_input": True,
            "l_type": "Conv2d",
            "structure": {
                "in_channels": 1,
                "out_channels": 4,
                "kernel_size": 3,
                "stride": 1,
                "padding": 1
            }
        },
        {
            "activation": None,
            "is_input": False,
            "l_type": "Flatten",
            "structure": {}
        },
        {
            "activation": "sigmoid",
            "is_input": False,
            "l_type": "Linear",
            "structure": {
                "bias": True,
                "in_features": 4 * 32 * 32,
                "out_features": 1
            }
        }
    ]
)

When defining each layer in the model argument, the parameters are:

  • activation - Any activation function found in the PyTorch's torch.nn.functional module
  • is_input - Indicates if the current layer is an input layer. If a layer is an input layer, it is considered to be "wobbly" layer, meaning that the in-features may be modified automatically to accommodate changes in input structure post-alignment.
  • l_type - Type of layer to be used, which can be found in PyTorch's torch.nn module.
  • structure - Any input parameters accepted in the layer class specified in l_type

Note: If you want to use FedGKT as the federated aggregation algorithm, the model has to be defined in a different manner due to implementation nuances. In this case, each activation layer in the neural network has to be defined as its own element in the model parameter list. The type of activation is defined in the l_type parameter. Suitable activations to use can be found in PyTorch's torch.nn module. For example, for a layer originally defined in model such as:

model=[
  {
      "activation": "sigmoid",
      "is_input": True,
      "l_type": "Linear",
      "structure": {
          "bias": True,
          "in_features": 18,
          "out_features": 1
      }
  }
]

will need to be in the following form, for FedGKT.

# model definition for FedGKT
model=[
  {
      "activation": None,
      "is_input": True,
      "l_type": "Linear",
      "structure": {
          "bias": True,
          "in_features": 18,
          "out_features": 1
      }
  },
  {
      "activation": None,
      "is_input": False,
      "l_type": "Sigmoid",
      "structure": {}
  }
]

1D. Orchestrator creates a run

A run is specific to a set of hyper-parameter values used.

driver.runs.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1",
    rounds=2,
    epochs=1,
    base_lr=0.0005,
    max_lr=0.005,
    criterion="L1Loss",
    optimizer="SGD",
    lr_scheduler="CyclicLR"
)

The arguments to this method first requires the run identification parameters. Next it requires the following arguments:

  • round: number of federated aggregated rounds
  • epoch: number of epochs for each worker, during each round
  • criterion: loss function
  • lr_scheduler: torch scheduler module (optional)
  • optimizer: torch optim module (optional)

Further keyword arguments are required to pass model and loss hyper-parameters values to use. These keywords must match the argument attribute name in PyTorch for each module used. From the code block above, base_lr and max_lr are the arguments for the torch "CyclicLR" scheduler module used.

Most of the criterions found in PyTorch's torch.nn Loss Function are supported, except MarginRankingLoss, CosineEmbeddingLoss, TripletMarginLoss and CTCLoss.

1E. Participants register to the collaboration created above

It is now the participants' turns to supply information.

First, let's create individual participants.

driver.participants.create(
    participant_id="participant_1",
)

driver.participants.create(
    participant_id="participant_2",
)

1F. Participants declare the compute resource and data they are using

After this, each participant provides information about its compute resource. When adding a compute resource, provide information about host, port, and fport. The host is IP address of the compute resource. port is the port that is used to communicate with other parties during the federated learning process to exchange intermediate training results, while fport is the port that is used to receive commands from Synergos Driver, e.g. to dismantle the grid after training completes. The compute resource one participant declares will be registered by calling create(). When calling create(), provide the corresponding collab_id, project_id, and participant_id.

registration_task = driver.registrations

registration_task.add_node(
    host='172.19.152.152', # <worker-1-vm-ip>
    port=8020,
    f_port=5000,
    log_msgs=True,
    verbose=True
)
registration_task.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_1",
    role="guest"
)

registration_task.add_node(
    host='172.19.152.153', # <worker-2-vm-ip>
    port=8020,
    f_port=5000,
    log_msgs=True,
    verbose=True
)
registration_task.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_2",
    role="host"
)

Although participants are contributing data, they are not exposing data to other parties. They declare the tags of their data, for the project which they registered above. For more information on how to define the data tag, refer to this.

 driver.tags.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_1",
    train=[["train"]], # data used in training
    evaluate=[["evaluate"]], # data used to evaluate model performance
    predict = [["predict"]] # data used for prediction/inference
)

driver.tags.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    participant_id="participant_2",
    train=[["train"]],
    evaluate=[["evaluate"]],
    predict=[["predict"]]
)

Phase 2: Training

Once all the meta-data is collected in the previous phase, you can move to start the federated training.

2A. Perform feature alignment to dynamically configure multiple datasets and models for cross-party compatibility

In machine learning, one-hot encoding is usually applied on categorical variables. In Federated Learning, since different parties do not expose data to one another, one-hot encoding is done locally without the knowledge of other parties' data. Due to the issue of non-IID data, it is possible that the features from different parties will not align, after one-hot encoding.

To illustrate this point, we use the example where a few hospitals are collaboratively trying to train a federated model, to predict mortality in ICUs. One of the predictor features used in the model is a patient's ethnicity. Assuming that there are 5 ethnicities recorded in total, for all the patients served across these hospitals. One of these hospitals, however, does not have patients from a particular ethnicity due to its geographic location. If each hospital applies one-hot encoding locally before federated training starts, all the hospitals will have 5 ethnicity-related features, except the one which has only 4 ethnicity-related features. This causes error in federated learning.

Therefore feature alignment is important. Synergos aligns the dataset, inputs and outputs to get the proper symmetry across all participants before federated training starts.

driver.alignments.create(
    collab_id = "collaboration_1",
    project_id="project_1"
)

2B. Start training

Once feature alignment has been completed, training can be started.

model_resp = driver.models.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1"
)

Phase 3: Evaluation

In Synergos, the orchestrator is not allowed see different parties' data, even though it is the one who coordinates the federated training. Only the participants are allowed to interact with the prediction results, and compare it with its own ground truth to derive the performance evaluation.

3A. Performance evaluation

The participants conduct performance evaluation of the federated model with the evaluate data they declared previously in Step 1F. The local evaluation by individual workers are then sent to TTP to aggregate and derive the final evaluation.

driver.validations.create(
    collab_id = "collaboration_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1"
)

3B. Perform prediction

The participants are also allowed to submit new data that was not declared for evaluation or training previously in Step 1F. The trained global model is used to obtain inferences on this new data. Do note that the predict tag declared here will override the one in Step 1F, if any.

driver.predictions.create(
    collab_id = "collaboration_1",
    tags={"project_1": [["predict"]]},
    participant_id="participant_1",
    project_id="project_1",
    expt_id="experiment_1",
    run_id="run_1"
)

Running the script

Now that the script has all the information and instructions needed to run the complete federated learning process, you can proceed to run the script. The script will be sent to the TTP, which then dispatches the instructions to all of the workers.

#Activate <synergos_env> virtual environment
conda activate <synergos_env>

#Navigate into the repository
cd ./synergos

#Run script.py
python script.py

The output for both workers and TTP should look like this if it is run successfully:

Worker

2020-10-01 09:31:10,238 - 172.19.152.152 - - [01/Oct/2020 09:31:10] "POST /worker/terminate/collaboration_1/project_1/experiment_1/run_1 HTTP/1.1" 200 -

TTP

2020-10-01 09:31:10,268 - 172.19.152.157 - - [01/Oct/2020 09:31:10] "POST /ttp/evaluate/participants/collaboration_1/participant_1/predictions/project_1/experiment_1/run_1 HTTP/1.1" 200 -

results matching ""

    No results matching ""