ホーム » Alibi Detect » Alibi Detect 0.7 : Examples : 外れ値、敵対的 & ドリフト検知 on CIFAR10

Alibi Detect 0.7 : Examples : 外れ値、敵対的 & ドリフト検知 on CIFAR10

Alibi Detect 0.7 : Examples : 外れ値、敵対的 & ドリフト検知 on CIFAR10 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 07/04/2021 (0.7.0)

* 本ページは、Alibi Detect の以下のドキュメントを翻訳した上で適宜、補足説明したものです:

* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

無料 Web セミナー開催中 クラスキャット主催 人工知能 & ビジネス Web セミナー

人工知能とビジネスをテーマに WEB セミナーを定期的に開催しています。
スケジュールは弊社 公式 Web サイト でご確認頂けます。
  • お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
  • ウェビナー運用には弊社製品「ClassCat® Webinar」を利用しています。
クラスキャットは人工知能・テレワークに関する各種サービスを提供しております :

人工知能研究開発支援 人工知能研修サービス テレワーク & オンライン授業を支援
PoC(概念実証)を失敗させないための支援 (本支援はセミナーに参加しアンケートに回答した方を対象としています。)

お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。

株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション
E-Mail:sales-info@classcat.com  ;  WebSite: https://www.classcat.com/  ;  Facebook

 

 

Alibi Detect 0.7 : Examples : 外れ値、敵対的 & ドリフト検知 on CIFAR10

0. データセット

CIFAR10 は 10 クラス : 飛行機、自動車、鳥、猫、鹿、犬、カエル、馬、船とトラック – に渡り均等に分配された 60,000 の 32 x 32 RGB 画像から成ります。

# imports and plot examples
import matplotlib.pyplot as plt
%matplotlib inline
import tensorflow as tf

(X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
y_train = y_train.astype('int64').reshape(-1,)
y_test = y_test.astype('int64').reshape(-1,)
print('Train: ', X_train.shape, y_train.shape)
print('Test: ', X_test.shape, y_test.shape)

plt.figure(figsize=(10, 10))
n = 4
for i in range(n ** 2):
    plt.subplot(n, n, i + 1)
    plt.imshow(X_train[i])
    plt.axis('off')
plt.show();
Train:  (50000, 32, 32, 3) (50000,)
Test:  (10000, 32, 32, 3) (10000,)

 

1. 変分オートエンコーダ (VAE) による外れ値検知

メソッド

簡単に言えば :

  • 正常データ上で VAE を訓練するのでそれは inlier を上手く再構築できます。
  • VAE が incoming リクエストを上手く再構築できないのであれば?外れ値です!

VAE のより多くのリソース: 論文優れたブログ投稿

画像ソース: https://lilianweng.github.io/lil-log/2018/08/12/from-autoencoder-to-beta-vae.html

# more imports
import logging
import numpy as np
import os

from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Dense
from tensorflow.keras.layers import Flatten, Layer, Reshape, InputLayer
from tensorflow.keras.regularizers import l1

from alibi_detect.od import OutlierVAE
from alibi_detect.utils.fetching import fetch_detector
from alibi_detect.utils.perturbation import apply_mask
from alibi_detect.utils.saving import save_detector, load_detector
from alibi_detect.utils.visualize import plot_instance_score, plot_feature_outlier_image

logger = tf.get_logger()
logger.setLevel(logging.ERROR)

 

検知器をロードまたはスクラッチから訓練する

ノートブックで使用される事前訓練済みの外れ値と敵対的検知器は ここ で見つかります。組込みの fetch_detector 関数を利用できます、これは事前訓練モデルをローカルディレクトリ filepath にセーブして検知器をロードします。代わりに、スクラッチから検知器を訓練することができます :

load_pretrained = False
%%time
filepath = os.path.join(os.getcwd(), 'outlier')

if load_pretrained:  # load pre-trained detector
    detector_type = 'outlier'
    dataset = 'cifar10'
    detector_name = 'OutlierVAE'
    od = fetch_detector(filepath, detector_type, dataset, detector_name)
    filepath = os.path.join(filepath, detector_name)
else:  # define model, initialize, train and save outlier detector

    # define encoder and decoder networks
    latent_dim = 1024
    encoder_net = tf.keras.Sequential(
      [
          InputLayer(input_shape=(32, 32, 3)),
          Conv2D(64, 4, strides=2, padding='same', activation=tf.nn.relu),
          Conv2D(128, 4, strides=2, padding='same', activation=tf.nn.relu),
          Conv2D(512, 4, strides=2, padding='same', activation=tf.nn.relu)
      ]
    )

    decoder_net = tf.keras.Sequential(
      [
          InputLayer(input_shape=(latent_dim,)),
          Dense(4*4*128),
          Reshape(target_shape=(4, 4, 128)),
          Conv2DTranspose(256, 4, strides=2, padding='same', activation=tf.nn.relu),
          Conv2DTranspose(64, 4, strides=2, padding='same', activation=tf.nn.relu),
          Conv2DTranspose(3, 4, strides=2, padding='same', activation='sigmoid')
      ]
    )

    # initialize outlier detector
    od = OutlierVAE(
        threshold=.015,  # threshold for outlier score
        encoder_net=encoder_net,  # can also pass VAE model instead
        decoder_net=decoder_net,  # of separate encoder and decoder
        latent_dim=latent_dim
    )

    # train
    od.fit(X_train, epochs=50, verbose=True)

    # save the trained outlier detector
    save_detector(od, filepath)
782/782 [=] - 32s 36ms/step - loss: 3328.0921
782/782 [=] - 29s 36ms/step - loss: -2477.8159
782/782 [=] - 29s 36ms/step - loss: -3505.9077
782/782 [=] - 29s 36ms/step - loss: -4018.4684
782/782 [=] - 29s 36ms/step - loss: -4339.1812
782/782 [=] - 29s 36ms/step - loss: -4564.4784
782/782 [=] - 29s 36ms/step - loss: -4752.0325
782/782 [=] - 29s 36ms/step - loss: -4909.1439
782/782 [=] - 29s 36ms/step - loss: -5027.7148
782/782 [=] - 29s 36ms/step - loss: -5108.3359
782/782 [=] - 29s 36ms/step - loss: -5183.5322
782/782 [=] - 29s 36ms/step - loss: -5221.8213
782/782 [=] - 29s 36ms/step - loss: -5286.5078
782/782 [=] - 29s 36ms/step - loss: -5336.4698
782/782 [=] - 29s 36ms/step - loss: -5390.1145
782/782 [=] - 29s 36ms/step - loss: -5435.4759
782/782 [=] - 29s 36ms/step - loss: -5469.5508
782/782 [=] - 29s 36ms/step - loss: -5513.7060
782/782 [=] - 29s 36ms/step - loss: -5546.3630
782/782 [=] - 29s 36ms/step - loss: -5586.9172
782/782 [=] - 29s 36ms/step - loss: -5604.6617
782/782 [=] - 29s 36ms/step - loss: -5638.2204
782/782 [=] - 29s 36ms/step - loss: -5657.4971
782/782 [=] - 29s 36ms/step - loss: -5684.9612
782/782 [=] - 29s 36ms/step - loss: -5706.7390
782/782 [=] - 29s 36ms/step - loss: -5719.2535
782/782 [=] - 29s 36ms/step - loss: -5742.7461
782/782 [=] - 29s 36ms/step - loss: -5760.9044
782/782 [=] - 29s 36ms/step - loss: -5777.8526
782/782 [=] - 29s 36ms/step - loss: -5793.0808
782/782 [=] - 29s 36ms/step - loss: -5803.5456
782/782 [=] - 29s 36ms/step - loss: -5822.1962
782/782 [=] - 29s 36ms/step - loss: -5821.3968
782/782 [=] - 29s 36ms/step - loss: -5847.5206
782/782 [=] - 29s 36ms/step - loss: -5855.4035
782/782 [=] - 29s 36ms/step - loss: -5866.9793
782/782 [=] - 29s 36ms/step - loss: -5879.4730
782/782 [=] - 29s 36ms/step - loss: -5885.7166
782/782 [=] - 29s 36ms/step - loss: -5895.9667
782/782 [=] - 29s 36ms/step - loss: -5905.9128
782/782 [=] - 29s 36ms/step - loss: -5913.2937
782/782 [=] - 29s 36ms/step - loss: -5922.4864
782/782 [=] - 29s 36ms/step - loss: -5930.4101
782/782 [=] - 29s 36ms/step - loss: -5935.5514
782/782 [=] - 29s 36ms/step - loss: -5942.9960
782/782 [=] - 29s 36ms/step - loss: -5953.9588
782/782 [=] - 29s 36ms/step - loss: -5959.2278
782/782 [=] - 29s 36ms/step - loss: -5962.3586
782/782 [=] - 29s 36ms/step - loss: -5967.2992
782/782 [=] - 29s 36ms/step - loss: -5975.1563
Directory /home/ubuntu/ws.alibi_detect/outlier does not exist and is now created.
CPU times: user 24min 6s, sys: 22.3 s, total: 24min 28s
Wall time: 24min 8s

モデルが in-distribution 訓練データを何とか再構築できるかを確認しましょう :

# plot original and reconstructed instance
idx = 8
X = X_train[idx].reshape(1, 32, 32, 3)
X_recon = od.vae(X)
plt.imshow(X.reshape(32, 32, 3)); plt.axis('off'); plt.show()
plt.imshow(X_recon.numpy().reshape(32, 32, 3)); plt.axis('off'); plt.show()

 

閾値を設定する

良い閾値を見つけることは技巧的であり得ます、何故ならばそれらは典型的には解釈することが容易でないからです。infer_threshold メソッドは sensible な値を見つけるのに役立ちます。インスタンスのバッチ X を渡してそれらの何パーセントを正常であると考えるかを threshold_perc を通して指定する必要があります。

print('Current threshold: {}'.format(od.threshold))
od.infer_threshold(X_train, threshold_perc=99, batch_size=128)  # assume 1% of the training data are outliers
print('New threshold: {}'.format(od.threshold))
Current threshold: 0.015
New threshold: 0.00969364859163762

 

外れ値を作成して検知する

元のインスタンスにランダムノイズマスクを適用することにより幾つかの外れ値を作成できます :

np.random.seed(0)

i = 1

# create masked instance
x = X_test[i].reshape(1, 32, 32, 3)
x_mask, mask = apply_mask(
    x,
    mask_size=(8,8),
    n_masks=1,
    channels=[0,1,2],
    mask_type='normal',
    noise_distr=(0,1),
    clip_rng=(0,1)
)

# predict outliers and reconstructions
sample = np.concatenate([x_mask, x])
preds = od.predict(sample)
x_recon = od.vae(sample).numpy()
# check if outlier and visualize outlier scores
labels = ['No!', 'Yes!']
print(f"Is original outlier? {labels[preds['data']['is_outlier'][1]]}")
print(f"Is perturbed outlier? {labels[preds['data']['is_outlier'][0]]}")
plot_feature_outlier_image(preds, sample, x_recon, max_instances=1)
Is original outlier? No!
Is perturbed outlier? Yes!

 

検知器を配備する

このサンプルのオープンソース配備プラットフォーム Seldon Core と (サーバレス・コンポーネントがイベントストリームに接続されることを可能にする) イベント・ベースのプロジェクト Knative を使用します。Seldon Core payload logger はモデルリクエストを含むイベントを Knative に送ります、これはこれらを外れ値、ドリフトや敵対的検知モジュールのようなサーバレス・コンポーネントに委託することができます。更に例えばアラート or ストレージ・モジュールに前方に送るためにこれらのコンポーネントにより生成されたイベントを供給するためにイベント・コンポーネントが追加できます。これらは非同期に発生します。

既に Seldon Core をインストールして DigitalOcean 上クラスタを構成しました。スクラッチから総てをセットアップする構成ステップは このサンプル・ノートブック で詳述されています。

最初に Istio Ingress Gatewaw の IP アドレスを取得します。これは Istio が LoadBalancer とともにインストールされていることを仮定しています。

CLUSTER_IPS=!(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP=CLUSTER_IPS[0]
print(CLUSTER_IP)
188.166.139.197
SERVICE_HOSTNAMES=!(kubectl get ksvc vae-outlier -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_VAEOD=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_VAEOD)
vae-outlier.default.example.com

配備されたモデルの予測のために幾つかのユティリティ関数を定義します。

import json
import requests
from typing import Union

classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')

def predict(x: np.ndarray) -> Union[str, list]:
    """ Model prediction. """
    formData = {
    'instances': x.tolist()
    }
    headers = {}
    res = requests.post(
        'http://'+CLUSTER_IP+'/seldon/default/tfserving-cifar10/v1/models/resnet32/:predict',
        json=formData,
        headers=headers
    )
    if res.status_code == 200:
        return classes[np.array(res.json()["predictions"])[0].argmax()]
    else:
        print("Failed with ",res.status_code)
        return []


def outlier(x: np.ndarray) -> Union[dict, list]:
    """ Outlier prediction. """
    formData = {
    'instances': x.tolist()
    }
    headers = {
        "Alibi-Detect-Return-Feature-Score": "true",
        "Alibi-Detect-Return-Instance-Score": "true"
    }
    headers["Host"] = SERVICE_HOSTNAME_VAEOD
    res = requests.post('http://'+CLUSTER_IP+'/', json=formData, headers=headers)
    if res.status_code == 200:
        od = res.json()
        od["data"]["feature_score"] = np.array(od["data"]["feature_score"])
        od["data"]["instance_score"] = np.array(od["data"]["instance_score"])
        return od
    else:
        print("Failed with ",res.status_code)
        return []


def show(x: np.ndarray) -> None:
    plt.imshow(x.reshape(32, 32, 3))
    plt.axis('off')
    plt.show()

元のインスタンス上で予測を行ないましょう :

show(x)
predict(x)

'ship'

外れ値検知器の出力のためにメッセージ dumper を確認しましょう :

res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0,len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[0]))
print("Outlier?",labels[j["data"]["is_outlier"]==[1]])
Outlier? No!

そして摂動されたインスタンスで予測を行ないます :

show(x_mask)
predict(x_mask)

'ship'

予測は依然として正しいですが、インスタンスは明らかに外れ値です :

res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data= []
for i in range(0,len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[1]))
print("Outlier?",labels[j["data"]["is_outlier"]==[1]])
Outlier? Yes!
preds = outlier(x_mask)
plot_feature_outlier_image(preds, x_mask, X_recon=None)

 

2. 予測確率のマッチングによる敵対的検知

メソッド

敵対的検知器は Adversarial Detection and Correction by Matching Prediction Distributions に基づいています。通常、オートエンコーダは平均二乗再構築誤差のような $x$ と $x’$ の間の類似性を捕捉するのに適する損失関数を用いて、入力インスタンス $x$ を出来る限り正確に再構築するような変換 $T$ を見つけるために訓練されます。敵対的オートエンコーダ (AE) 検知器の新奇性は、オートエンコーダネットワークを訓練するために、モデルの出力空間の距離尺度に基づいた分類モデル依存な損失関数の使用に依存していることです。分類モデル $M$ が与えられたとき、オートエンコーダの重みは $x$ と $x’$ のモデル予測間の KL-ダイバージェンス を最小化するように最適化されます。再構築損失項 $x’$ の存在がない場合には、$x$ と $x’$ の近接性を気に掛けることなく、単純に予測確率 $M(x’)$ と $M(x)$ が一致することを確かなものにしようとします。その結果、$x’$ はモデル $M$ に関する異なる決定境界 shape によって $x$ とは異なる入力特徴空間の異なる領域に存在することが可能になります。$x$ 周りで効果的な注意深く作成された敵対的摂動は特徴空間の $x’$ の新しい位置には転送されませんので、従って攻撃は無力化されます。オートエンコーダの訓練は教師なしです、何故ならばモデル予測確率と通常の訓練インスタンスへのアクセスだけを必要とするからです。基礎となる敵対的攻撃についての知識を必要としません、そして分類器の重みは訓練の間凍結されます。

検知器は以下のように利用できます :

  • 敵対的スコア $S$ が計算されます。$S$ は $x$ と $x’$ のモデル予測間の K-L ダイバージェンスに等しいです。
  • $S$ が (明示的に定義されたか訓練データから推論された) 閾値を越える場合、インスタンスは敵対的とフラグ立てされます。
  • 敵対的インスタンスについては、モデル $M$ は予測を行なうために再構築されたインスタンス $x’$ を使用します。敵対的スコアが閾値の下であれば、モデルは元のインスタンス $x$ 上で予測を行ないます。

この手順は下の図で図示されます :

この方法は非常に柔軟で、モデル性能にネガティブな影響を与える一般的なデータ破損と摂動を検出するためにも使用できます。

# more imports
from sklearn.metrics import roc_curve, auc
from alibi_detect.ad import AdversarialAE
from alibi_detect.datasets import fetch_attack
from alibi_detect.utils.fetching import fetch_tf_model
from alibi_detect.utils.prediction import predict_batch

 

ユティリティ関数

# instance scaling and plotting utility functions
def scale_by_instance(X: np.ndarray) -> np.ndarray:
    mean_ = X.mean(axis=(1, 2, 3)).reshape(-1, 1, 1, 1)
    std_ = X.std(axis=(1, 2, 3)).reshape(-1, 1, 1, 1)
    return (X - mean_) / std_, mean_, std_


def accuracy(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return (y_true == y_pred).astype(int).sum() / y_true.shape[0]


def plot_adversarial(idx: list,
                     X: np.ndarray,
                     y: np.ndarray,
                     X_adv: np.ndarray,
                     y_adv: np.ndarray,
                     mean: np.ndarray,
                     std: np.ndarray,
                     score_x: np.ndarray = None,
                     score_x_adv: np.ndarray = None,
                     X_recon: np.ndarray = None,
                     y_recon: np.ndarray = None,
                     figsize: tuple = (10, 5)) -> None:

    # category map from class numbers to names
    cifar10_map = {0: 'airplane', 1: 'automobile', 2: 'bird', 3: 'cat', 4: 'deer', 5: 'dog',
                   6: 'frog', 7: 'horse', 8: 'ship', 9: 'truck'}

    nrows = len(idx)
    ncols = 3 if isinstance(X_recon, np.ndarray) else 2
    fig, ax = plt.subplots(nrows=nrows, ncols=ncols, figsize=figsize)

    n_subplot = 1
    for i in idx:

        # rescale images in [0, 1]
        X_adj = (X[i] * std[i] + mean[i]) / 255
        X_adv_adj = (X_adv[i] * std[i] + mean[i]) / 255
        if isinstance(X_recon, np.ndarray):
            X_recon_adj = (X_recon[i] * std[i] + mean[i]) / 255

        # original image
        plt.subplot(nrows, ncols, n_subplot)
        plt.axis('off')
        if i == idx[0]:
            if isinstance(score_x, np.ndarray):
                plt.title('CIFAR-10 Image \n{}: {:.3f}'.format(cifar10_map[y[i]], score_x[i]))
            else:
                plt.title('CIFAR-10 Image \n{}'.format(cifar10_map[y[i]]))
        else:
            if isinstance(score_x, np.ndarray):
                plt.title('{}: {:.3f}'.format(cifar10_map[y[i]], score_x[i]))
            else:
                plt.title('{}'.format(cifar10_map[y[i]]))
        plt.imshow(X_adj)
        n_subplot += 1

        # adversarial image
        plt.subplot(nrows, ncols, n_subplot)
        plt.axis('off')
        if i == idx[0]:
            if isinstance(score_x_adv, np.ndarray):
                plt.title('Adversarial \n{}: {:.3f}'.format(cifar10_map[y_adv[i]], score_x_adv[i]))
            else:
                plt.title('Adversarial \n{}'.format(cifar10_map[y_adv[i]]))
        else:
            if isinstance(score_x_adv, np.ndarray):
                plt.title('{}: {:.3f}'.format(cifar10_map[y_adv[i]], score_x_adv[i]))
            else:
                plt.title('{}'.format(cifar10_map[y_adv[i]]))
        plt.imshow(X_adv_adj)
        n_subplot += 1

        # reconstructed image
        if isinstance(X_recon, np.ndarray):
            plt.subplot(nrows, ncols, n_subplot)
            plt.axis('off')
            if i == idx[0]:
                plt.title('AE Reconstruction \n{}'.format(cifar10_map[y_recon[i]]))
            else:
                plt.title('{}'.format(cifar10_map[y_recon[i]]))
            plt.imshow(X_recon_adj)
            n_subplot += 1

    plt.show()


def plot_roc(roc_data: dict, figsize: tuple = (10,5)):
    plot_labels = []
    scores_attacks = []
    labels_attacks = []
    for k, v in roc_data.items():
        if 'original' in k:
            continue
        score_x = roc_data[v['normal']]['scores']
        y_pred = roc_data[v['normal']]['predictions']
        score_v = v['scores']
        y_pred_v = v['predictions']
        labels_v = np.ones(score_x.shape[0])
        idx_remove = np.where(y_pred == y_pred_v)[0]
        labels_v = np.delete(labels_v, idx_remove)
        score_v = np.delete(score_v, idx_remove)
        scores = np.concatenate([score_x, score_v])
        labels = np.concatenate([np.zeros(y_pred.shape[0]), labels_v]).astype(int)
        scores_attacks.append(scores)
        labels_attacks.append(labels)
        plot_labels.append(k)

    for sc_att, la_att, plt_la in zip(scores_attacks, labels_attacks, plot_labels):
        fpr, tpr, thresholds = roc_curve(la_att, sc_att)
        roc_auc = auc(fpr, tpr)
        label = str('{}: AUC = {:.2f}'.format(plt_la, roc_auc))
        plt.plot(fpr, tpr, lw=1, label='{}: AUC={:.4f}'.format(plt_la, roc_auc))

    plt.plot([0, 1], [0, 1], color='black', lw=1, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('{}'.format('ROC curve'))
    plt.legend(loc="lower right", ncol=1)
    plt.grid()
    plt.show()

 

データをリスケールする

ResNet 分類モデルはインスタンスにより標準化されたデータ上で訓練されます :

# rescale data
X_train, mean_train, std_train = scale_by_instance(X_train * 255.)
X_test, mean_test, std_test = scale_by_instance(X_test * 255.)
scale = (mean_train, std_train), (mean_test, std_test)

 

事前訓練済みの分類器をロードする

dataset = 'cifar10'
model = 'resnet56'
clf = fetch_tf_model(dataset, model)

テスト上の予測を確認します :

y_pred = predict_batch(clf, X_test, batch_size=32, return_class=True)
acc_y_pred = accuracy(y_test, y_pred)
print('Accuracy: {:.4f}'.format(acc_y_pred))
Accuracy: 0.9315

 

敵対的攻撃

Carlini-Wagner (C&W)SLIDE 攻撃の両者を調査します。以前に見つかった敵対的なインスタンスを事前訓練済みの ResNet-56 モデル上に単純にロードできます。攻撃は Foolbox を使用して生成されます :

# C&W attack
data_cw = fetch_attack(dataset, model, 'cw')
X_train_cw, X_test_cw = data_cw['data_train'], data_cw['data_test']
meta_cw = data_cw['meta'] # metadata with hyperparameters of the attack
# SLIDE attack
data_slide = fetch_attack(dataset, model, 'slide')
X_train_slide, X_test_slide = data_slide['data_train'], data_slide['data_test']
meta_slide = data_slide['meta']

分類器の精度が殆ど 0% に低下することを検証できます :

y_pred_cw = predict_batch(clf, X_test_cw, batch_size=32, return_class=True)
y_pred_slide = predict_batch(clf, X_test_slide, batch_size=32, return_class=True)
acc_y_pred_cw = accuracy(y_test, y_pred_cw)
acc_y_pred_slide = accuracy(y_test, y_pred_slide)
print('Accuracy: cw {:.4f} -- SLIDE {:.4f}'.format(acc_y_pred_cw, acc_y_pred_slide))
Accuracy: cw 0.0000 -- SLIDE 0.0001

幾つかの敵対的インスタンスを可視化しましょう :

# plot attacked instances
idx = [3, 4]
print('C&W attack...')
plot_adversarial(idx, X_test, y_pred, X_test_cw, y_pred_cw,
                 mean_test, std_test, figsize=(10, 10))
print('SLIDE attack...')
plot_adversarial(idx, X_test, y_pred, X_test_slide, y_pred_slide,
                 mean_test, std_test, figsize=(10, 10))

C&W 攻撃 …

SLIDE 攻撃 …

 

敵対的検知器をロードまたは訓練して評価する

Google Cloud Bucket から事前訓練済みの検知器を再度取得するかスクラッチから訓練できます :

load_pretrained = False
filepath = os.path.join(os.getcwd(), 'adversarial')

if load_pretrained:
    detector_type = 'adversarial'
    detector_name = 'base'
    ad = fetch_detector(filepath, detector_type, dataset, detector_name, model=model)
    filepath = os.path.join(filepath, detector_name)
else:  # train detector from scratch

    # define encoder and decoder networks
    encoder_net = tf.keras.Sequential(
            [
                InputLayer(input_shape=(32, 32, 3)),
                Conv2D(32, 4, strides=2, padding='same',
                       activation=tf.nn.relu, kernel_regularizer=l1(1e-5)),
                Conv2D(64, 4, strides=2, padding='same',
                       activation=tf.nn.relu, kernel_regularizer=l1(1e-5)),
                Conv2D(256, 4, strides=2, padding='same',
                       activation=tf.nn.relu, kernel_regularizer=l1(1e-5)),
                Flatten(),
                Dense(40)
            ]
        )

    decoder_net = tf.keras.Sequential(
        [
                InputLayer(input_shape=(40,)),
                Dense(4 * 4 * 128, activation=tf.nn.relu),
                Reshape(target_shape=(4, 4, 128)),
                Conv2DTranspose(256, 4, strides=2, padding='same',
                                activation=tf.nn.relu, kernel_regularizer=l1(1e-5)),
                Conv2DTranspose(64, 4, strides=2, padding='same',
                                activation=tf.nn.relu, kernel_regularizer=l1(1e-5)),
                Conv2DTranspose(3, 4, strides=2, padding='same',
                                activation=None, kernel_regularizer=l1(1e-5))
            ]
        )

    # initialise and train detector
    ad = AdversarialAE(
        encoder_net=encoder_net,
        decoder_net=decoder_net,
        model=clf
    )
    ad.fit(X_train, epochs=40, batch_size=64, verbose=True)

    # save the trained adversarial detector
    save_detector(ad, filepath)

検知器は最初に敵対的であり得る入力インスタンスを再構築します。そして再構築された入力は敵対的スコアを計算するために分類器に供給されます。スコアが閾値より上の場合、インスタンスは敵対的と分類されて検知器は攻撃を正そうとします。攻撃されたインスタンスを再構築してその上で予測を行なうとき何が起きるかを調べましょう :

X_recon_cw = predict_batch(ad.ae, X_test_cw, batch_size=32)
X_recon_slide = predict_batch(ad.ae, X_test_slide, batch_size=32)
y_recon_cw = predict_batch(clf, X_recon_cw, batch_size=32, return_class=True)
y_recon_slide = predict_batch(clf, X_recon_slide, batch_size=32, return_class=True)

攻撃された (インスタンス) vs. 再構築されたインスタンスの精度 :

acc_y_recon_cw = accuracy(y_test, y_recon_cw)
acc_y_recon_slide = accuracy(y_test, y_recon_slide)
print('Accuracy after C&W attack {:.4f} -- reconstruction {:.4f}'.format(acc_y_pred_cw, acc_y_recon_cw))
print('Accuracy after SLIDE attack {:.4f} -- reconstruction {:.4f}'.format(acc_y_pred_slide, acc_y_recon_slide))
Accuracy after C&W attack 0.0000 -- reconstruction 0.8048
Accuracy after SLIDE attack 0.0001 -- reconstruction 0.8159

検知器は攻撃後の精度を殆ど 0 % から 80% を上手く越えるまでに復旧します!攻撃的スコアを計算して再構築されたインスタンスの幾つかを調査できます :

score_x = ad.score(X_test, batch_size=32)
score_cw = ad.score(X_test_cw, batch_size=32)
score_slide = ad.score(X_test_slide, batch_size=32)
# visualize original, attacked and reconstructed instances with adversarial scores
print('C&W attack...')
idx = [10, 13, 14, 16, 17]
plot_adversarial(idx, X_test, y_pred, X_test_cw, y_pred_cw, mean_test, std_test,
                 score_x=score_x, score_x_adv=score_cw, X_recon=X_recon_cw,
                 y_recon=y_recon_cw, figsize=(10, 15))
print('SLIDE attack...')
idx = [23, 25, 27, 29, 34]
plot_adversarial(idx, X_test, y_pred, X_test_slide, y_pred_slide, mean_test, std_test,
                 score_x=score_x, score_x_adv=score_slide, X_recon=X_recon_slide,
                 y_recon=y_recon_slide, figsize=(10, 15))

C&W 攻撃 …

SLIDE 攻撃 …

ROC カーブと AUC 値は敵対的インスタンスを検知するための敵対的スコアの有効性を示します :

# plot roc curve
roc_data = {
    'original': {'scores': score_x, 'predictions': y_pred},
    'C&W': {'scores': score_cw, 'predictions': y_pred_cw, 'normal': 'original'},
    'SLIDE': {'scores': score_slide, 'predictions': y_pred_slide, 'normal': 'original'}
}

plot_roc(roc_data)

敵対的スコアのための閾値は infer_threshold を通して設定できます。インスタンスのバッチ X を渡してそれらの何パーセントを正常であると考えるかを threshold_perc を通して指定する必要があります。Assume we have only normal instances some of which the model has misclassified leading to a higher score if the reconstruction picked up features from the correct class or some might look adversarial in the first place. その結果、閾値を 95% に設定します :

ad.infer_threshold(X_test, threshold_perc=95, margin=0., batch_size=32)
print('Adversarial threshold: {:.4f}'.format(ad.threshold))
Adversarial threshold: 2.6722

検知器の正しい方法は Figure 1 の図を実行します。最初に敵対的スコアが計算されます。スコアが閾値を越えるインスタンスについては、再構築されたインスタンスの分類器予測が返されます。そうでなければ元の予測が保持されます。このメソッドは検知器のメタデータ、(バッチのインスタンスが敵対的であろうとなかろうと、) 訂正メカニズムを使用した分類器予測、そして元の予測と再構築された予測の両者を含む辞書を返します。幾つかの敵対的 (インスタンス) そして元のテストセットのインスタンスを含むバッチでこれを示しましょう :

n_test = X_test.shape[0]
np.random.seed(0)
idx_normal = np.random.choice(n_test, size=1600, replace=False)
idx_cw = np.random.choice(n_test, size=400, replace=False)

X_mix = np.concatenate([X_test[idx_normal], X_test_cw[idx_cw]])
y_mix = np.concatenate([y_test[idx_normal], y_test[idx_cw]])
print(X_mix.shape, y_mix.shape)
(2000, 32, 32, 3) (2000,)

モデル性能を確認しましょう :

y_pred_mix = predict_batch(clf, X_mix, batch_size=32, return_class=True)
acc_y_pred_mix = accuracy(y_mix, y_pred_mix)
print('Accuracy {:.4f}'.format(acc_y_pred_mix))
Accuracy 0.7380

これは訂正メカニズムで改良できます :

preds = ad.correct(X_mix, batch_size=32)
acc_y_corr_mix = accuracy(y_mix, preds['data']['corrected'])
print('Accuracy {:.4f}'.format(acc_y_corr_mix))
Accuracy 0.8205

論文 でハイライトされていて (temperature スケーリング隠れ層 K-L ダイバージェンス) Alibi Detect で実装されている幾つかの他のトリックがあります、これは敵対的検知器の性能を更にブーストできます。より詳細については この example ノートブック を確認してください。

 

以上



ClassCat® Chatbot

人工知能開発支援
クラスキャットは 人工知能研究開発支援 サービスを提供しています :
  • テクニカルコンサルティングサービス
  • 実証実験 (プロトタイプ構築)
  • アプリケーションへの実装
  • 人工知能研修サービス
◆ お問合せ先 ◆
クラスキャット
セールス・インフォメーション
E-Mail:sales-info@classcat.com