本投稿は、以下の本の実装例を学習用に写経したものになります。詳細について気になりましたら、本を読んでみてください。
Bitly
実装例は筆者らのgithubに公開されています。
GitHub - YutaroOgawa/causal_book: 書籍「作りながら学ぶ! PyTorchによる因果推論・因果探索」の実装コードのリポジトリです
書籍「作りながら学ぶ! PyTorchによる因果推論・因果探索」の実装コードのリポジトリです. Contribute to YutaroOgawa/causal_book development by creating an account on GitHub.
ライブラリインポート
# 乱数のシードを設定
import random
import numpy as np
np.random.seed(1234)
random.seed(1234)
# 使用するパッケージ(ライブラリと関数)を定義
import pandas as pd
from sklearn.decomposition import FastICA
from munkres import Munkres
from copy import deepcopy
データ作成
# データ数
num_data = 200
# 非ガウスのノイズ
ex1 = 2*(np.random.rand(num_data)-0.5) # -1.0から1.0
ex2 = 2*(np.random.rand(num_data)-0.5)
ex3 = 2*(np.random.rand(num_data)-0.5)
# データ生成
x2 = ex2
x1 = 4*x2 + ex1
x3 = 1*x1 + 4*x2 + ex3
# 表にまとめる
df = pd.DataFrame({"x1": x1, "x2": x2, "x3": x3})
df.head()
x1 x2 x3
0 1.659078 0.232279 2.390404
1 -1.825129 -0.561418 -3.209571
2 2.321453 0.745047 5.332314
3 1.893085 0.295730 3.695168
4 -1.176263 -0.053994 -0.628695
独立成分分析の実施
# 独立成分分析はscikit-learnの関数を使用します
from sklearn.decomposition import FastICA
# https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.FastICA.html
ica = FastICA(random_state=1234).fit(df)
# ICAで求めた行列A
A_ica = ica.mixing_
# 行列Aの逆行列を求める
A_ica_inv = np.linalg.pinv(A_ica)
print(A_ica_inv)
[[-0.11202027 -0.52360933 0.12237653]
[-0.01254258 0.07593409 0.01223273]
[-0.11910073 0.47901401 -0.00101996]]
行の順番と大きさを調整して、行列A_invを求める
# 絶対値の逆数にして対角成分の和を最小にする問題に置き換える
A_ica_inv_small = 1 / np.abs(A_ica_inv)
# 対角成分の和を最小にする行の入れ替え順を求める
m = Munkres() # ハンガリアン法
ixs = np.vstack(m.compute(deepcopy(A_ica_inv_small)))
# 求めた順番で変換
ixs = ixs[np.argsort(ixs[:, 0]), :]
ixs_perm = ixs[:, 1]
A_ica_inv_perm = np.zeros_like(A_ica_inv)
A_ica_inv_perm[ixs_perm] = A_ica_inv
print('A_ica_inv_perm=\n',A_ica_inv_perm)
# 並び替わった順番
print('ixs=\n',ixs)
# 「行の大きさを調整」
D = np.diag(A_ica_inv_perm)[:, np.newaxis] # 対角成分の抽出
A_ica_inv_perm_D = A_ica_inv_perm / D
print('A_ica_inv_perm_D=\n',A_ica_inv_perm_D)
A_ica_inv_perm=
[[-0.11910073 0.47901401 -0.00101996]
[-0.01254258 0.07593409 0.01223273]
[-0.11202027 -0.52360933 0.12237653]]
ixs=
[[0 2]
[1 1]
[2 0]]
A_ica_inv_perm_D=
[[ 1. -4.0219234 0.00856387]
[-0.16517717 1. 0.16109668]
[-0.9153738 -4.27867449 1. ]]
係数行列Bを求める
# 「B=I-A_inv」
B_est = np.eye(3) - A_ica_inv_perm_D
print('B_est=\n',B_est)
# 上側成分の0になるはずの数だけ、絶対値が小さい成分を0にする
# 変数の順番を入れ替えて、下三角行列になるかを確かめる、
def _slttestperm(b_i):
# b_iの行を並び替えて下三角行列にできるかどうかチェック
n = b_i.shape[0]
remnodes = np.arange(n)
b_rem = deepcopy(b_i)
p = list()
for i in range(n):
# 成分が全て0である行番号のリスト
ixs = np.where(np.sum(np.abs(b_rem), axis=1) < 1e-12)[0]
if len(ixs) == 0:
return None
else:
ix = ixs[0]
p.append(remnodes[ix])
# 成分が全て0である行を削除
remnodes = np.hstack((remnodes[:ix], remnodes[(ix + 1):]))
ixs = np.hstack((np.arange(ix), np.arange(ix + 1, len(b_rem))))
b_rem = b_rem[ixs, :]
b_rem = b_rem[:, ixs]
return np.array(p)
b = B_est
n = b.shape[0]
assert(b.shape == (n, n))
ixs = np.argsort(np.abs(b).ravel())
for i in range(int(n * (n + 1) / 2) - 1, (n * n) - 1):
b_i = deepcopy(b)
b_i.ravel()[ixs[:i]] = 0
ixs_perm = _slttestperm(b_i)
if ixs_perm is not None:
b_opt = deepcopy(b)
b_opt = b_opt[ixs_perm, :]
b_opt = b_opt[:, ixs_perm]
break
b_csl = np.tril(b_opt, -1)
b_csl[ixs_perm, :] = deepcopy(b_csl)
b_csl[:, ixs_perm] = deepcopy(b_csl)
B_est1 = b_csl
print('B_est1=\n',B_est1)
B_est=
[[ 0. 4.0219234 -0.00856387]
[ 0.16517717 0. -0.16109668]
[ 0.9153738 4.27867449 0. ]]
B_est1=
[[0. 4.0219234 0. ]
[0. 0. 0. ]
[0.9153738 4.27867449 0. ]]
線形回帰により係数行列を求め直す
# scikit-learnから線形回帰をimport
from sklearn.linear_model import LinearRegression
# 説明変数
X1 = df[["x2"]]
X3 = df[["x1", "x2"]]
# 被説明変数(目的変数)
# df["x1"]
# df["x3"]
# 回帰の実施
reg1 = LinearRegression().fit(X1, df["x1"])
reg3 = LinearRegression().fit(X3, df["x3"])
# 回帰した結果の係数を出力
print("係数:", reg1.coef_)
print("係数:", reg3.coef_)
係数: [3.96649192]
係数: [0.90836785 4.20694598]
コメント