LiNGAMを用いた因果探索

本投稿は、以下の本の実装例を学習用に写経したものになります。詳細について気になりましたら、本を読んでみてください。

つくりながら学ぶ! Pythonによる因果分析 ~因果推論・因果探索の実践入門 (Compass Data Science)
つくりながら学ぶ! Pythonによる因果分析 ~因果推論・因果探索の実践入門 (Compass Data Science)

実装例は筆者らのgithubに公開されています。

GitHub - YutaroOgawa/causal_book: 書籍「作りながら学ぶ! PyTorchによる因果推論・因果探索」の実装コードのリポジトリです
書籍「作りながら学ぶ! PyTorchによる因果推論・因果探索」の実装コードのリポジトリです. Contribute to YutaroOgawa/causal_book development by creating an account on GitHub.

 

ライブラリインポート
# 乱数のシードを設定
import random
import numpy as np

np.random.seed(1234)
random.seed(1234)

# 使用するパッケージ(ライブラリと関数)を定義
import pandas as pd
from sklearn.decomposition import FastICA
from munkres import Munkres
from copy import deepcopy
データ作成
# データ数
num_data = 200

# 非ガウスのノイズ
ex1 = 2*(np.random.rand(num_data)-0.5)  # -1.0から1.0
ex2 = 2*(np.random.rand(num_data)-0.5)
ex3 = 2*(np.random.rand(num_data)-0.5)

# データ生成
x2 = ex2
x1 = 4*x2 + ex1
x3 = 1*x1 + 4*x2 + ex3

# 表にまとめる
df = pd.DataFrame({"x1": x1, "x2": x2, "x3": x3})
df.head()

        x1	        x2	        x3
0	1.659078	0.232279	2.390404
1	-1.825129	-0.561418	-3.209571
2	2.321453	0.745047	5.332314
3	1.893085	0.295730	3.695168
4	-1.176263	-0.053994	-0.628695
独立成分分析の実施
# 独立成分分析はscikit-learnの関数を使用します
from sklearn.decomposition import FastICA
# https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.FastICA.html

ica = FastICA(random_state=1234).fit(df)

# ICAで求めた行列A
A_ica = ica.mixing_

# 行列Aの逆行列を求める
A_ica_inv = np.linalg.pinv(A_ica)
print(A_ica_inv)
[[-0.11202027 -0.52360933  0.12237653]
 [-0.01254258  0.07593409  0.01223273]
 [-0.11910073  0.47901401 -0.00101996]]
行の順番と大きさを調整して、行列A_invを求める
# 絶対値の逆数にして対角成分の和を最小にする問題に置き換える
A_ica_inv_small = 1 / np.abs(A_ica_inv)

# 対角成分の和を最小にする行の入れ替え順を求める
m = Munkres()  # ハンガリアン法
ixs = np.vstack(m.compute(deepcopy(A_ica_inv_small)))

# 求めた順番で変換
ixs = ixs[np.argsort(ixs[:, 0]), :]
ixs_perm = ixs[:, 1]
A_ica_inv_perm = np.zeros_like(A_ica_inv)
A_ica_inv_perm[ixs_perm] = A_ica_inv
print('A_ica_inv_perm=\n',A_ica_inv_perm)

# 並び替わった順番
print('ixs=\n',ixs)

# 「行の大きさを調整」
D = np.diag(A_ica_inv_perm)[:, np.newaxis]  # 対角成分の抽出
A_ica_inv_perm_D = A_ica_inv_perm / D
print('A_ica_inv_perm_D=\n',A_ica_inv_perm_D)
A_ica_inv_perm=
[[-0.11910073  0.47901401 -0.00101996]
 [-0.01254258  0.07593409  0.01223273]
 [-0.11202027 -0.52360933  0.12237653]]

ixs=
 [[0 2]
 [1 1]
 [2 0]]

A_ica_inv_perm_D=
 [[ 1.         -4.0219234   0.00856387]
 [-0.16517717  1.          0.16109668]
 [-0.9153738  -4.27867449  1.        ]]
係数行列Bを求める
# 「B=I-A_inv」
B_est = np.eye(3) - A_ica_inv_perm_D
print('B_est=\n',B_est)


# 上側成分の0になるはずの数だけ、絶対値が小さい成分を0にする
# 変数の順番を入れ替えて、下三角行列になるかを確かめる、

def _slttestperm(b_i):
# b_iの行を並び替えて下三角行列にできるかどうかチェック
    n = b_i.shape[0]
    remnodes = np.arange(n)
    b_rem = deepcopy(b_i)
    p = list() 

    for i in range(n):
        # 成分が全て0である行番号のリスト
        ixs = np.where(np.sum(np.abs(b_rem), axis=1) < 1e-12)[0]

        if len(ixs) == 0:
            return None
        else:
            ix = ixs[0]
            p.append(remnodes[ix])

            # 成分が全て0である行を削除
            remnodes = np.hstack((remnodes[:ix], remnodes[(ix + 1):]))
            ixs = np.hstack((np.arange(ix), np.arange(ix + 1, len(b_rem))))
            b_rem = b_rem[ixs, :]
            b_rem = b_rem[:, ixs]

    return np.array(p)

b = B_est
n = b.shape[0]
assert(b.shape == (n, n))

ixs = np.argsort(np.abs(b).ravel())

for i in range(int(n * (n + 1) / 2) - 1, (n * n) - 1):
    b_i = deepcopy(b)
    b_i.ravel()[ixs[:i]] = 0
    ixs_perm = _slttestperm(b_i)
    if ixs_perm is not None:
        b_opt = deepcopy(b)
        b_opt = b_opt[ixs_perm, :]
        b_opt = b_opt[:, ixs_perm]
        break
b_csl = np.tril(b_opt, -1)
b_csl[ixs_perm, :] = deepcopy(b_csl)
b_csl[:, ixs_perm] = deepcopy(b_csl)

B_est1 = b_csl
print('B_est1=\n',B_est1)
B_est=
 [[ 0.          4.0219234  -0.00856387]
 [ 0.16517717  0.         -0.16109668]
 [ 0.9153738   4.27867449  0.        ]]

B_est1=
 [[0.         4.0219234  0.        ]
 [0.         0.         0.        ]
 [0.9153738  4.27867449 0.        ]]
線形回帰により係数行列を求め直す
# scikit-learnから線形回帰をimport
from sklearn.linear_model import LinearRegression

# 説明変数
X1 = df[["x2"]]
X3 = df[["x1", "x2"]]

# 被説明変数(目的変数)
# df["x1"]
# df["x3"]

# 回帰の実施
reg1 = LinearRegression().fit(X1, df["x1"])
reg3 = LinearRegression().fit(X3, df["x3"])

# 回帰した結果の係数を出力
print("係数:", reg1.coef_)
print("係数:", reg3.coef_)
係数: [3.96649192]
係数: [0.90836785 4.20694598]

コメント

タイトルとURLをコピーしました