TenSEAL Study Notes (Part 2) (Code Snippets)

一只特立独行的猫     2022-12-04



Training and Evaluation of Logistic Regression on Encrypted Data

Download the coronary heart disease dataset

We use homomorphic encryption (HE) together with logistic regression to predict coronary heart disease (CHD).
The dataset comes from Kaggle and can be downloaded here.
The goal of the experiment is to compare training on HE ciphertexts with training on plaintext, so some shortcuts are taken in the data and the computation to keep the training cost manageable. If you don't want to use the CHD data, you can also run the experiment on random data.
The CHD dataset has the following format:

| male | age | education | currentSmoker | cigsPerDay | BPMeds | prevalentStroke | prevalentHyp | diabetes | totChol | sysBP | diaBP | BMI | heartRate | glucose | TenYearCHD |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | 39 | 4 | 0 | 0 | 0 | 0 | 0 | 0 | 195 | 106 | 70 | 26.97 | 80 | 77 | 0 |
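
To sanity-check the file before training, it helps to peek at the raw CSV. A small illustrative snippet (the path matches the code below; the exact row count of the Kaggle version may differ slightly):

import pandas as pd

df = pd.read_csv("./data/framingham.csv")
print(df.shape)             # roughly (4240, 16) for the Kaggle version
print(df.columns.tolist())  # the 16 columns from the table above
print(df.head(1))           # the first sample, i.e. the row shown above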

Initialize dataset

The examples below use the CHD data, but a random-data path is also provided; just switch which line is commented out.

import torch
import tenseal as ts
import pandas as pd
import random
from time import time

# those are optional and are not necessary for training
import numpy as np
import matplotlib.pyplot as plt

# set the random seeds used to initialize the model weights
torch.random.manual_seed(73)
random.seed(73)


def split_train_test(x, y, test_ratio=0.3):
    # split into training and test sets
    idxs = [i for i in range(len(x))]
    random.shuffle(idxs)
    # delimiter between test and train data
    delim = int(len(x) * test_ratio)
    test_idxs, train_idxs = idxs[:delim], idxs[delim:]
    return x[train_idxs], y[train_idxs], x[test_idxs], y[test_idxs]


def heart_disease_data():
    data = pd.read_csv("./data/framingham.csv")
    # drop samples with missing values
    data = data.dropna()
    # drop the "education", "currentSmoker", "BPMeds", "diabetes", "diaBP" and "BMI" features
    data = data.drop(columns=["education", "currentSmoker", "BPMeds", "diabetes", "diaBP", "BMI"])
    # balance the data by downsampling the majority class
    grouped = data.groupby('TenYearCHD')
    data = grouped.apply(lambda x: x.sample(grouped.size().min(), random_state=73).reset_index(drop=True))
    # extract the labels
    y = torch.tensor(data["TenYearCHD"].values).float().unsqueeze(1)
    data = data.drop(columns=["TenYearCHD"])
    # standardize the features
    data = (data - data.mean()) / data.std()
    x = torch.tensor(data.values).float()
    return split_train_test(x, y)


def random_data(m=1024, n=2):
    # data separable by the line `y = x`
    x_train = torch.randn(m, n)
    x_test = torch.randn(m // 2, n)
    y_train = (x_train[:, 0] >= x_train[:, 1]).float().unsqueeze(0).t()
    y_test = (x_test[:, 0] >= x_test[:, 1]).float().unsqueeze(0).t()
    return x_train, y_train, x_test, y_test


# You can use whatever data you want without modification to the tutorial
# x_train, y_train, x_test, y_test = random_data()
x_train, y_train, x_test, y_test = heart_disease_data()

print("############# Data summary #############")
print(f"x_train has shape: x_train.shape")
print(f"y_train has shape: y_train.shape")
print(f"x_test has shape: x_test.shape")
print(f"y_test has shape: y_test.shape")
print("#######################################")

The output is as follows (the classes were balanced by downsampling, so 780 + 334 = 1114 samples remain, 557 per class):

############# Data summary #############
x_train has shape: torch.Size([780, 9])
y_train has shape: torch.Size([780, 1])
x_test has shape: torch.Size([334, 9])
y_test has shape: torch.Size([334, 1])
#######################################

Training a Logistic Regression Model

Nothing special to note here; this is just a plain logistic-regression training loop.

class LR(torch.nn.Module):

    def __init__(self, n_features):
        super(LR, self).__init__()
        self.lr = torch.nn.Linear(n_features, 1)
        
    def forward(self, x):
        out = torch.sigmoid(self.lr(x))
        return out

n_features = x_train.shape[1]
model = LR(n_features)
# use gradient descent with a learning_rate=1
optim = torch.optim.SGD(model.parameters(), lr=1)
# use Binary Cross Entropy Loss
criterion = torch.nn.BCELoss()

# define the number of epochs for both plain and encrypted training
EPOCHS = 5

def train(model, optim, criterion, x, y, epochs=EPOCHS):
    for e in range(1, epochs + 1):
        optim.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optim.step()
        print(f"Loss at epoch e: loss.data")
    return model

model = train(model, optim, criterion, x_train, y_train)
def accuracy(model, x, y):
    out = model(x)
    correct = torch.abs(y - out) < 0.5
    return correct.float().mean()

plain_accuracy = accuracy(model, x_test, y_test)
print(f"Accuracy on plain test_set: plain_accuracy")


The final accuracy is 70.35%. Now let's see how evaluation under homomorphic encryption compares.

Encrypted Evaluation

We now evaluate the trained model under homomorphic encryption.
In pseudocode, the two evaluation modes look like this.

Plain:
out = model(x_test)
compare out with y_test

HE:
enc_x_test = encrypt(x_test)      # the test set is encrypted
enc_out = model(enc_x_test)       # inference runs on ciphertexts
out = decrypt(enc_out)            # only the output is decrypted
compare out with y_test
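The evaluation function below assumes that `eelr` (a wrapper that runs the trained weights on CKKS vectors) and `enc_x_test` (the encrypted test set) have already been created. A minimal sketch of that setup, in the same style as the context created later in this post; the class name `EvalLR` and the smaller encryption parameters are my own assumptions, sized for a single linear layer:

class EvalLR:
    # evaluates only the linear layer on ciphertexts; the sigmoid is
    # applied after decryption inside encrypted_evaluation() below
    def __init__(self, torch_lr):
        self.weight = torch_lr.lr.weight.data.tolist()[0]
        self.bias = torch_lr.lr.bias.data.tolist()

    def forward(self, enc_x):
        return enc_x.dot(self.weight) + self.bias

    def __call__(self, enc_x):
        return self.forward(enc_x)

# encryption parameters: a ciphertext-plaintext dot product needs little depth
poly_mod_degree = 4096
coeff_mod_bit_sizes = [40, 20, 40]
ctx_eval = ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes)
ctx_eval.global_scale = 2 ** 20
ctx_eval.generate_galois_keys()  # rotation keys, needed by dot()

enc_x_test = [ts.ckks_vector(ctx_eval, x.tolist()) for x in x_test]
eelr = EvalLR(model)

With that in place, the evaluation itself: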

def encrypted_evaluation(model, enc_x_test, y_test):
    t_start = time()
    
    correct = 0
    for enc_x, y in zip(enc_x_test, y_test):
        # encrypted evaluation
        enc_out = model(enc_x)
        # plain comparison
        out = enc_out.decrypt()
        out = torch.tensor(out)
        out = torch.sigmoid(out)
        if torch.abs(out - y) < 0.5:
            correct += 1
    
    t_end = time()
    print(f"Evaluated test_set of len(x_test) entries in int(t_end - t_start) seconds")
    print(f"Accuracy: correct/len(x_test) = correct / len(x_test)")
    return correct / len(x_test)
    

encrypted_accuracy = encrypted_evaluation(eelr, enc_x_test, y_test)
diff_accuracy = plain_accuracy - encrypted_accuracy
print(f"Difference between plain and encrypted accuracies: diff_accuracy")
if diff_accuracy < 0:
    print("Oh! We got a better accuracy on the encrypted test-set! The noise was on our side...")

The final accuracy is 62.57%, not far from the plaintext result, so evaluation under homomorphic encryption preserves the model's usefulness reasonably well.

Training an Encrypted Logistic Regression Model on Encrypted Data

Encrypted evaluation showed that HE can run the model's arithmetic on ciphertexts. The next step is to also encrypt the training data, train directly on ciphertexts, and then evaluate the resulting model.

Plain training:
pred = model(x_train)
loss = criterion(pred, y_train)
loss.backward()
model.update_parameters()
acc = compare model(x_test) with y_test

Encrypted training:
model.encrypt()
enc_pred = model(enc_x_train)
model.backward(enc_x_train, enc_pred, enc_y_train)   # gradient computed directly from (pred - y), no explicit loss
model.update_parameters()
model.decrypt()
acc = compare model(x_test) with y_test

The code below keeps only what is new; the imports, dataset preparation, plain LR model, and plain training loop are identical to the sections above.


class EncryptedLR:
    
    def __init__(self, torch_lr):
        self.weight = torch_lr.lr.weight.data.tolist()[0]
        self.bias = torch_lr.lr.bias.data.tolist()
        # we accumulate gradients and count the number of iterations
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0
        
    def forward(self, enc_x):
        enc_out = enc_x.dot(self.weight) + self.bias
        enc_out = EncryptedLR.sigmoid(enc_out)
        return enc_out
    
    def backward(self, enc_x, enc_out, enc_y):
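        # with a sigmoid output and BCE loss, the gradients reduce to
        # dL/dw = (sigmoid(w·x + b) - y) * x and dL/db = (sigmoid(w·x + b) - y),
        # so no explicit loss value ever needs to be computed on ciphertexts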
        out_minus_y = (enc_out - enc_y)
        self._delta_w += enc_x * out_minus_y
        self._delta_b += out_minus_y
        self._count += 1
        
    def update_parameters(self):
        if self._count == 0:
            raise RuntimeError("You should at least run one forward iteration")
        # update weights
        # We use a small regularization term to keep the output
        # of the linear layer in the range of the sigmoid approximation
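        # i.e. w <- w - mean_gradient - 0.05 * w (a weight-decay-style shrinkage)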
        self.weight -= self._delta_w * (1 / self._count) + self.weight * 0.05
        self.bias -= self._delta_b * (1 / self._count)
        # reset gradient accumulators and iterations count
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0
    
    @staticmethod
    def sigmoid(enc_x):
        # We use the polynomial approximation of degree 3
        # sigmoid(x) = 0.5 + 0.197 * x - 0.004 * x^3
        # from https://eprint.iacr.org/2018/462.pdf
        # which fits the function pretty well in the range [-5,5]
        return enc_x.polyval([0.5, 0.197, 0, -0.004])
    
    def plain_accuracy(self, x_test, y_test):
        # evaluate accuracy of the model on
        # the plain (x_test, y_test) dataset
        w = torch.tensor(self.weight)
        b = torch.tensor(self.bias)
        out = torch.sigmoid(x_test.matmul(w) + b).reshape(-1, 1)
        correct = torch.abs(y_test - out) < 0.5
        return correct.float().mean()    
    
    def encrypt(self, context):
        self.weight = ts.ckks_vector(context, self.weight)
        self.bias = ts.ckks_vector(context, self.bias)
        
    def decrypt(self):
        self.weight = self.weight.decrypt()
        self.bias = self.bias.decrypt()
        
    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)
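
As a quick sanity check (not part of the original tutorial), the degree-3 approximation can be compared against the real sigmoid on [-5, 5]:

import numpy as np

xs = np.linspace(-5, 5, 201)
poly = 0.5 + 0.197 * xs - 0.004 * xs ** 3   # same coefficients as EncryptedLR.sigmoid
ref = torch.sigmoid(torch.tensor(xs)).numpy()
print(f"max |poly - sigmoid| on [-5, 5]: {np.abs(poly - ref).max():.3f}")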

        

# parameters
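# the six 21-bit middle primes give roughly six rescalings of multiplicative
# depth, which (as I understand it) is enough for one epoch of forward and
# backward passes on ciphertexts; this is why the training loop below
# decrypts and re-encrypts the model at the start of every epoch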
poly_mod_degree = 8192
coeff_mod_bit_sizes = [40, 21, 21, 21, 21, 21, 21, 40]
# create TenSEALContext
ctx_training = ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes)
ctx_training.global_scale = 2 ** 21
ctx_training.generate_galois_keys()

t_start = time()
enc_x_train = [ts.ckks_vector(ctx_training, x.tolist()) for x in x_train]
enc_y_train = [ts.ckks_vector(ctx_training, y.tolist()) for y in y_train]
t_end = time()
print(f"Encryption of the training_set took int(t_end - t_start) seconds")

normal_dist = lambda x, mean, var: np.exp(- np.square(x - mean) / (2 * var)) / np.sqrt(2 * np.pi * var)

def plot_normal_dist(mean, var, rmin=-10, rmax=10):
    x = np.arange(rmin, rmax, 0.01)
    y = normal_dist(x, mean, var)
    fig = plt.plot(x, y)
    
# plain distribution
lr = LR(n_features)
data = lr.lr(x_test)
mean, var = map(float, [data.mean(), data.std() ** 2])
plot_normal_dist(mean, var)
print("Distribution on plain data:")
plt.show()

# encrypted distribution
def encrypted_out_distribution(eelr, enc_x_test):
    w = eelr.weight
    b = eelr.bias
    data = []
    for enc_x in enc_x_test:
        enc_out = enc_x.dot(w) + b
        data.append(enc_out.decrypt())
    data = torch.tensor(data)
    mean, var = map(float, [data.mean(), data.std() ** 2])
    plot_normal_dist(mean, var)
    print("Distribution on encrypted data:")
    plt.show()

eelr = EncryptedLR(lr)
eelr.encrypt(ctx_training)
encrypted_out_distribution(eelr, enc_x_train)

eelr = EncryptedLR(LR(n_features))
accuracy = eelr.plain_accuracy(x_test, y_test)
print(f"Accuracy at epoch #0 is accuracy")

times = []
for epoch in range(EPOCHS):
    eelr.encrypt(ctx_training)
    
    # if you want to keep an eye on the distribution to make sure
    # the function approximation is still working fine
    # WARNING: this operation is time consuming
    # encrypted_out_distribution(eelr, enc_x_train)
    
    t_start = time()
    for enc_x, enc_y in zip(enc_x_train, enc_y_train):
        enc_out = eelr.forward(enc_x)
        eelr.backward(enc_x, enc_out, enc_y)
    eelr.update_parameters()
    t_end = time()
    times.append(t_end - t_start)

    # decrypt the model to measure accuracy on the plain test set
    eelr.decrypt()
    accuracy = eelr.plain_accuracy(x_test, y_test)
    print(f"Accuracy at epoch #{epoch + 1} is {accuracy}")

print(f"\nAverage time per epoch: {int(sum(times) / len(times))} seconds")
