> ## Documentation Index
> Fetch the complete documentation index at: https://prod-mint.classiq.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Quantum Optimization Training - part 2

<Card title="View on GitHub" icon="github" href="https://github.com/Classiq/classiq-library/blob/main/tutorials/workshops/finance_workshops/combi_workshop_equality_constriants_PO.ipynb">
  Open this notebook in GitHub to run it yourself
</Card>

## Dealing with constraint using portfolio optimization

In this workshop, we will solve the Portfolio Optimization problem using the Quantum Approximate Optimization Algorithm (QAOA), by **introducing how to add various types of constraints of the problem to the QAOA algorithm**.

#

### Guidance for the workshop:

**The `# TODO` or `# Your code` is there for you to do yourself.**
\*\*The `# Solution start` and `# Solution end` are only for helping you.

Please delete the `Solution` and try doing it yourself...\*\*

## Portfolio Optimization with the Quantum Approximate Optimization Algorithm (QAOA)

#

## Introduction

Portfolio optimization is the process of allocating a portfolio of financial assets optimally, according to some predetermined goal. Usually, the goal is to maximize the potential return while minimizing the financial risk of the portfolio.

One can express this problem as a combinatorial optimization problem like many other real-world problems. In this demo, we'll show how the Quantum Approximate Optimization Algorithm (QAOA) can be employed on the Classiq platform to solve the problem of portfolio optimization.

#

## Modeling the Portfolio Optimization Problem

As a first step, we have to model the problem mathematically. We will use a simple yet powerful model, which captures the essence of portfolio optimization:

* A portfolio is built from a pool of $n$ financial assets, each asset labeled $i \in \{1,\ldots,n\}$.
* Every asset's return is a random variable, with expected value $\mu_i$ and variance $\Sigma_i$ (modeling the financial risk involved in the asset).
* Every two assets $i \neq j$ have covariance $\Sigma_{ij}$ (modeling market correlation between assets).
* Every asset $i$ has a weight $w_i \in D_i = \{0,\ldots,b_i\}$ in the portfolio, with $b_i$ defined as the budget for asset $i$ (modeling the maximum allowed weight of the asset).
* The return vector $\mu$, the covariance matrix $\Sigma$ and the weight vector $w$ are defined naturally from the above (with the domain $D = D_1 \times D_2 \times \ldots \times D_n$ for $w$).

With the above definitions, the total expected return of the portfolio is $\mu^T w$ and the total risk is $w^T \Sigma w$. We'll use a simple difference of the two as our cost function, with the additional constraint that the total sum of assets does not exceed a predefined budget $B$. We note that there are many other possibilities for defining a cost function (e.g. add a scaling factor to the risk/return or even some non-linear relation).

For reasons of simplicity we select the model below, and we assume all constants and variables are dimensionless.
Thus, the problem is, given the constant inputs $\mu, \Sigma, D, B$, to find optimal variable $w$ as follows:

$$
\begin{equation*}
\min_{w \in D}  w^T \Sigma w - \mu^T w,
\end{equation*}
$$

subject to $\Sigma_{i} w_i \leq B$.

The case presented above is called integer portfolio optimization, since the domains $D_i$ are over the (positive) integers.

Another variation of this problem defines weights over binary domains, and will not be discussed here.

```python theme={null}
import math

# import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize

from classiq import *
```

#

### First we will solve the problem without adding constraint.

Just:

$$
\begin{equation*}
\min_{w \in D}  w^T \Sigma w - \mu^T w
\end{equation*}
$$

#

### Then, we will add equality constraint:

$$
\begin{equation*}
\min_{w \in D}  w^T \Sigma w - \mu^T w
\end{equation*}
$$

subject to:

$$
\Sigma_{i} w_i == B
$$

#

### Finaly, we will add inequality constraints:

$$
\begin{equation*}
\min_{w \in D}  w^T \Sigma w - \mu^T w,
\end{equation*}
$$

subject to:

$$
\Sigma_{i} w_i \leq B
$$

## The Portfolio Optimization Problem Parameters

First we define the parameters of the optimization problem, which include the expected return vector, the covariance matrix, the total budget and the asset-specific budgets.

```python theme={null}
returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6
```

## Defining the variables

```python theme={null}
num_assets = 3

num_qubits_per_asset = 2  # Defines the possible values of choosing each asset.


class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]
```

#

## Define the expected return

Define a function that describes $\mu^T w$ where $\mu$ is the `return` vector.

```python theme={null}
def expected_return_cost(
    returns: np.ndarray, w_array: PortfolioOptimizationVars
) -> float:
    # Your code here

    # Solution start
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))
    # Solution end
```

#

## Define the risk term

Define a function that describes the risk term in the objective function $w^T \Sigma w$ where $\Sigma$ is the `covariances` matrix.

```python theme={null}
def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    # Your code here

    # hint:
    # risk_term =  sum(
    #     ... * sum(... for j in range(covariances.shape[0])) for i in range(covariances.shape[0])
    # )

    # Solution start
    risk_term = sum(
        w_array.a[i]
        * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0]))
        for i in range(covariances.shape[0])
    )
    # Solution end
    return risk_term
```

#

## Define the entire portfolio optimization objective function

Combine the risk term and the expected return functions.

There a a term called return coefficient `return_coeff` that defines how much you prefer certainly over return.

Higher values is more risky but can be more profitable.

Later try changing it to see how the result changes.

```python theme={null}
return_coeff = 1.5


def objective_portfolio(
    w_array: PortfolioOptimizationVars,
    returns: np.ndarray,
    covariances: np.ndarray,
    return_coeff: float,
) -> float:
    # Your code here

    # Solution start

    return risk_cost(covariances, w_array) - return_coeff * expected_return_cost(
        returns, w_array
    )
    # Solution end
```

## Build the QAOA circuit

```python theme={null}
@qfunc
def mixer_layer(beta: CReal, qba: QArray):
    # Your code here

    # Solution start
    apply_to_all(lambda q: RX(beta, q), qba)
    # Solution end
```

```python theme={null}

NUM_LAYERS = 4


@qfunc
def main(
    params: CArray[CReal, 2 * NUM_LAYERS], w_array: Output[PortfolioOptimizationVars]
) -> None:
    # Your code here

    # Allocating the qubits
    allocate(w_array)

    # Build the QAOA circuit similarly to the maxcut

    # Solution start
    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                objective_portfolio(w_array, returns, covariances, return_coeff),
                params[2 * i],
            ),
            mixer_layer(params[2 * i + 1], w_array),
        ),
    )
    # Solution end
```

## Synthesizing and visualizing

```python theme={null}
qprog = synthesize(main)
show(qprog)
```

<Info>
  **Output:**

  ```

  Quantum program link: https://platform.classiq.io/circuit/2yj5cH6Av1SoHt6eo5xO1mDKSWs
    

  ```
</Info>

## Execution and post processing

For the hybrid execution, we use `ExecutionSession`, which can evaluate the circuit in multiple methods, such as sampling the circuit, giving specific values for the parameters, and evaluating to a specific Hamiltonian, which is very common in chemical applications.

In QAOA, we will use the `estimate_cost` method, which samples the cost function and returns their average cost from all measurements.

That helps to optimize easily.

```python theme={null}
NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)


# Build `initial_params` list of np.array type.
# The gamma values should start from 0 and, in each layer, should approach closer to 1 linearly
# The beta values should start from 1 and in each layer, should approach closer to 0 linearly
# Then unify it to one list so scipy minimize can digest it.
# Your code here


# Solution start
def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)


# Solution end

initial_params = initial_qaoa_params(NUM_LAYERS)
```

## Define a callback function to track the optimization

```python theme={null}
# Record the steps of the optimization
intermediate_params = []
objective_values = []


# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)
```

## Define the objective function

```python theme={null}
# Your code with hints in the comments:

# cost_func = lambda state: objective_portfolio(
#     w_array = ...,
#     returns = ...,
#     covariances = ...,
#     return_coeff= ...
# )
# def estimate_cost_func(params: np.ndarray) -> float:
#     objective_value = es.estimate_cost(
#         cost_func = ...,
#         parameters = {"params": params.tolist()}
#     )
#     # Your code here
#     # Save the result for convergence graph
#
#     return objective_value


# Solution start
cost_func = lambda state: objective_portfolio(
    w_array=state["w_array"],
    returns=returns,
    covariances=covariances,
    return_coeff=return_coeff,
)


def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func=cost_func, parameters={"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value


# Solution end
```

## Optimize

```python theme={null}
# Your code with hints in the comments:

# optimization_res = minimize(
#     fun = ...,
#     x0=...,
#     method="COBYLA",
#     callback=...,
#     options={"maxiter": 10},
# )

# Solution start
optimization_res = minimize(
    fun=estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 30},
)
# Solution end
```

## Look at the results

```python theme={null}
res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(
    res.parsed_counts,
    key=lambda pc: objective_portfolio(
        pc.state["w_array"],
        returns=returns,
        covariances=covariances,
        return_coeff=return_coeff,
    ),
)

for sampled in sorted_counts:
    w_sample = sampled.state["w_array"]
    print(
        f"solution={w_sample} probability={sampled.shots/NUM_SHOTS} "
        f"cost={objective_portfolio(w_array=w_sample,returns = returns, covariances = covariances, return_coeff= return_coeff)}"
    )
```

## Convergence graph

```python theme={null}
# plt.plot(objective_values)
# plt.xlabel("Iteration")
# plt.ylabel("Objective Value")
# plt.title("Optimization Progress")
```

## The Entire Solution

```python theme={null}
import math
from typing import List

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from scipy.optimize import minimize

from classiq import *

NUM_LAYERS = 3

returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6
specific_budgets = 3

return_coeff = 1.7

num_assets = 3

num_qubits_per_asset = 2

# start with integer variables


class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]


def expected_return_cost(
    returns: np.ndarray, w_array: PortfolioOptimizationVars
) -> float:
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))


def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    risk_term = sum(
        w_array.a[i]
        * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0]))
        for i in range(covariances.shape[0])
    )
    return risk_term


def objective_portfolio(
    w_array: PortfolioOptimizationVars,
    returns: np.ndarray,
    covariances: np.ndarray,
    return_coeff: float,
) -> float:
    return risk_cost(covariances, w_array) - return_coeff * expected_return_cost(
        returns, w_array
    )


@qfunc
def mixer_layer(beta: CReal, qba: QArray):
    apply_to_all(lambda q: RX(beta, q), qba)


@qfunc
def main(
    params: CArray[CReal, 2 * NUM_LAYERS], w_array: Output[PortfolioOptimizationVars]
) -> None:

    allocate(w_array)

    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                objective_portfolio(w_array, returns, covariances, return_coeff),
                params[2 * i],
            ),
            mixer_layer(params[2 * i + 1], w_array),
        ),
    )


qprog = synthesize(main)
show(qprog)


NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)


def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)


initial_params = initial_qaoa_params(NUM_LAYERS)

# Record the steps of the optimization
intermediate_params = []
objective_values = []


# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)


cost_func = lambda state: objective_portfolio(
    w_array=state["w_array"],
    returns=returns,
    covariances=covariances,
    return_coeff=return_coeff,
)


def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func=cost_func, parameters={"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value


optimization_res = minimize(
    fun=estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 10},
)

res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(
    res.parsed_counts,
    key=lambda pc: objective_portfolio(
        pc.state["w_array"],
        returns=returns,
        covariances=covariances,
        return_coeff=return_coeff,
    ),
)

for sampled in sorted_counts:
    w_sample = sampled.state["w_array"]
    print(
        f"solution={w_sample} probability={sampled.shots/NUM_SHOTS} "
        f"cost={objective_portfolio(w_array=w_sample,returns = returns, covariances = covariances, return_coeff= return_coeff)}"
    )

plt.plot(objective_values)
plt.xlabel("Iteration")
plt.ylabel("Objective Value")
plt.title("Optimization Progress")
```

## Adding equality constraint

The method to deal with equality constraint, namely:

$$
\begin{equation*}
\min_{w \in D}  w^T \Sigma w - \mu^T w
\end{equation*}
$$

subject to:

$$
\Sigma_{i} w_i == B
$$

is to add a penalty term to lead us to the set of valid solutions. To do so, we will change the objective function as follows:

$$
\begin{equation*}
\min_{w \in D}  w^T \Sigma w - \mu^T w + P * (\Sigma_{i} w_i - B)^2
\end{equation*}
$$

Where $P$ is the penalty value you need to define.

## Define the objective value with a penalty term

```python theme={null}
PENALTY = 2.0


def objective_portfolio_equality(
    w_array: PortfolioOptimizationVars,
    returns: np.ndarray,
    covariances: np.ndarray,
    return_coeff: float,
) -> float:
    # Your code here

    # Solution start

    return (
        risk_cost(covariances, w_array)
        - return_coeff * expected_return_cost(returns, w_array)
        + Penalty * (sum(w_array.a[i] for i in range(len(returns))) - total_budget) ** 2
    )
    # Solution end
```

#

### Repeat the whole process all over again with `objective_portfolio_equality`

## Solution

```python theme={null}
import math
from typing import List

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from scipy.optimize import minimize

from classiq import *

NUM_LAYERS = 3

returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6
specific_budgets = 3

return_coeff = 10.0

num_assets = 3

num_qubits_per_asset = 2

Penalty = 2.0

# start with integer variables


class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]


def expected_return_cost(
    returns: np.ndarray, w_array: PortfolioOptimizationVars
) -> float:
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))


def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    risk_term = sum(
        w_array.a[i]
        * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0]))
        for i in range(covariances.shape[0])
    )
    return risk_term


def objective_portfolio_equality(
    w_array: PortfolioOptimizationVars,
    returns: np.ndarray,
    covariances: np.ndarray,
    return_coeff: float,
) -> float:
    return (
        risk_cost(covariances, w_array)
        - return_coeff * expected_return_cost(returns, w_array)
        + Penalty * (sum(w_array.a[i] for i in range(len(returns))) - total_budget) ** 2
    )


@qfunc
def mixer_layer(beta: CReal, qba: QArray):
    apply_to_all(lambda q: RX(beta, q), qba)


@qfunc
def main(
    params: CArray[CReal, 2 * NUM_LAYERS], w_array: Output[PortfolioOptimizationVars]
) -> None:
    allocate(w_array)

    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                objective_portfolio_equality(
                    w_array, returns, covariances, return_coeff
                ),
                params[2 * i],
            ),
            mixer_layer(params[2 * i + 1], w_array),
        ),
    )


qprog = synthesize(main)
show(qprog)


NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)


def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)


initial_params = initial_qaoa_params(NUM_LAYERS)

# Record the steps of the optimization
intermediate_params = []
objective_values = []


# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)


cost_func = lambda state: objective_portfolio_equality(
    state["w_array"],
    returns=returns,
    covariances=covariances,
    return_coeff=return_coeff,
)


def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func=cost_func, parameters={"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value


optimization_res = minimize(
    estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 40},
)

res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(
    res.parsed_counts,
    key=lambda pc: objective_portfolio_equality(
        pc.state["w_array"],
        returns=returns,
        covariances=covariances,
        return_coeff=return_coeff,
    ),
)

for sampled in sorted_counts:
    w = sampled.state["w_array"]
    print(
        f"solution={w} probability={sampled.shots/NUM_SHOTS} "
        f"cost={objective_portfolio_equality(w_array=w,returns = returns, covariances = covariances, return_coeff= return_coeff)}"
    )

plt.plot(objective_values)
plt.xlabel("Iteration")
plt.ylabel("Objective Value")
plt.title("Optimization Progress")
```
