프로그래밍/머신러닝

인공신경망을 사용한 WiFi 신호패턴 분류

RAVIN 2022. 12. 19. 17:25
  • WiFi 수신 신호 패턴으로부터 실내 사람 수 예측하기
  • 총 4가지 클래스의 데이터셋 제공
  • 데이터 Pn-Wm은 사람 n명 걸어다니는 사람 m명일 때 WiFi 신호의 세기
  • 신호 데이터 하나 당 3x3x56 개의 크기값으로 이루어짐.

1. 기본 패키지/데이터셋 로드 및 변환

import torchvision.datasets as dsets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import random

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 구글드라이브 연결
from google.colab import drive
drive.mount('/gdrive')

import pandas as pd
import numpy as np
import torch
import os

 

  • 하나의 클래스 당 9개의 데이터가 포함되어 있음.
  • 이 데이터들은 하나의 데이터(벡터)로 간주 가능하기 때문에, (56 by 1) 벡터를 transpose 한 후 하나로 합침. 최종적으로 하나의 데이터는(1 by 56 * 9) 벡터이며 최종적으로 하나의 데이터셋은 (데이터 개수 by (56*9)) 행렬이 됨.

 

 

#P0-W0 데이터
forders = os.listdir('/gdrive/My Drive/ANN_homework/data_set/P0-W0/')
print(forders)

df_00 = pd.DataFrame()
for i in range(0,len(forders)):
    if forders[i].split('.')[1] == 'csv':
        file = '/gdrive/My Drive/ANN_homework/data_set/P0-W0/'+forders[i]
        df= pd.read_csv(file,header=None
        df_00 = pd.concat([df_00, df])

df_00 = np.transpose(df_00) #각 행벡터가 데이터샘플 하나가 되도록 transpose
label_00 = 0 * np.ones(len(df_00))

label_00 = label_00.astype(np.int64)

print(df_00.shape, label_00.shape)

 

 

#P1-W1 데이터
forders = os.listdir('/gdrive/My Drive/ANN_homework/data_set/P1-W1/')
print(forders)

df_11 = pd.DataFrame()
for i in range(0,len(forders)):
    if forders[i].split('.')[1] == 'csv':
        file = '/gdrive/My Drive/ANN_homework/data_set/P1-W1/'+forders[i]
        df= pd.read_csv(file,header=None
        df_11 = pd.concat([df_11, df])

df_11 = np.transpose(df_11) #각 행벡터가 데이터샘플 하나가 되도록 transpose
label_11 = 1 * np.ones(len(df_11))

label_11 = label_11.astype(np.int64)

print(df_11.shape, label_11.shape)

 

 

#P9-W7 데이터
forders = os.listdir('/gdrive/My Drive/ANN_homework/data_set/P9-W7/')
print(forders)

df_97 = pd.DataFrame()
for i in range(0,len(forders)):
    if forders[i].split('.')[1] == 'csv':
        file = '/gdrive/My Drive/ANN_homework/data_set/P9-W7/'+forders[i]
        df= pd.read_csv(file,header=None
        df_97 = pd.concat([df_97, df])

df_97 = np.transpose(df_97) #각 행벡터가 데이터샘플 하나가 되도록 transpose
label_97 = 2 * np.ones(len(df_97))

label_97 = label_97.astype(np.int64)

print(df_97.shape, label_97.shape)

 

 

#P14-W0 데이터
forders = os.listdir('/gdrive/My Drive/ANN_homework/data_set/P14-W0/')
print(forders)

df_140 = pd.DataFrame()
for i in range(0,len(forders)):
    if forders[i].split('.')[1] == 'csv':
        file = '/gdrive/My Drive/ANN_homework/data_set/P14-W0/'+forders[i]
        df= pd.read_csv(file,header=None
        df_140 = pd.concat([df_140, df])

df_140 = np.transpose(df_140) #각 행벡터가 데이터샘플 하나가 되도록 transpose
label_140 = 3 * np.ones(len(df_140))

label_140 = label_140.astype(np.int64)

print(df_140.shape, label_140.shape)




 

각 샘플들을 합쳐서 신경망 모델 입력 가능하게 하기. 데이터샘플+레이블

x_np = np.concatenate((df_00, df_11, df_97, df_140), axis=0# 데이터샘플 합치기
y_np = np.concatenate((label_00, label_11, label_97, label_140), axis=0# 타겟레이블 합치기
print(x_np.shape, y_np.shape)

print(np.unique(y_np))

y_np = y_np.astype(np.int64)

 

셔플링 및 배치 생성

batch_size = 100
# 데이터셋을 학습셋과 테스트셋으로 분할함:
X_train, X_test, y_train, y_test = train_test_split(
    x_np, y_np, test_size=0.5, random_state=1, stratify=y_np)
# test_size : 전체 샘플에서 지정한 비율 만큼을 테스트 셋으로 분할함
# stratify : 각 분할 셋의 클래스 분포 비가 지정한 셋과 동일하도록 함

# 각 특성값을 표준화:
sc = StandardScaler()
sc.fit(X_train)
X_train_std = sc.transform(X_train)
X_test_std = sc.transform(X_test)

train = torch.utils.data.TensorDataset(torch.Tensor(X_train_std), torch.Tensor(y_train).type(torch.LongTensor))
train_loader = torch.utils.data.DataLoader(dataset=train,
                                          batch_size=batch_size,
                                          shuffle=True,
                                          drop_last=True)

2. 신경망 모델 생성 및 학습 준비

컴퓨팅 플랫폼 결정 및 랜덤 seed 선택
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# for reproducibility
random.seed(10)
torch.manual_seed(10)

if device == 'cuda':
    torch.cuda.manual_seed_all(10)

 

신경망 모델 생성

# WiFi 시그널 모델은 504열 벡터
# 각 Affine 층(입력데이터에 선형 트랜스폼 수행) 모듈을 생성

# 1번 인자: 입력 데이터 수
# 2번 인자: 출력 데이터 수
# bias: False이면 편향을 사용하지 않음

linear1 = torch.nn.Linear(50450, bias=True)
linear2 = torch.nn.Linear(50100, bias=True)
linear3 = torch.nn.Linear(1004, bias=True)

# 활성화함수 모듈을 생성

sigmoid = torch.nn.Sigmoid()
relu = torch.nn.ReLU()
#relu = torch.nn.LeakyReLU()

# 만들어진 층, 활성화함수 모듈들을 순서대로 연결해서 모델 생성
# 타겟 연산 플랫폼을 설정

model = torch.nn.Sequential(linear1, sigmoid, linear2, relu, linear3).to(device)

 

손실함수 및 옵티마이저 생성

criterion = torch.nn.CrossEntropyLoss().to(device)  # 내부에 Softmax를 포함함

# 옵티마이저가 최적화할 파라미터를 넘김

# Stochastic gradient descent
# 1번 인자: 최적화할 파라미터 그룹
# lr: learning rate

optimizer = torch.optim.SGD(model.parameters(), lr=1)

3. 신경망 학습

training_epochs = 100
for epoch in range(training_epochs):

    avg_cost = 0
    total_batch = len(train_loader)

    # data_loader의 데이터는 앞서 지정한 배치사이즈로 쪼개져 있음
    for X, T in train_loader:
        # T는 레이블 인코딩되어 있음
        X = X.view(-1504).to(device) #텐서 모양 바꿀때는 view 함수 사용. -1인 그냥 default. 행 열 순서
        T = T.to(device)

        optimizer.zero_grad()   # 기존 계산한 경사값 삭제
        output = model(X)       # 순방향 연산
        cost = criterion(output, T)   # 손실함수 설정
        cost.backward()         # 경사값 계산
        optimizer.step()        # 업데이트 1회 수행

        avg_cost += cost / total_batch     # 평균 손실함수값 계산

    print('Epoch:''%04d' % (epoch + 1), 'cost =''{:.9f}'.format(avg_cost))
    
print('Learning finished')

4. 테스트

labels = ['P0_W0''P1-W1''P9-W7' , 'P14-W0']
with torch.no_grad(): #no-grad: 그래디언트 계산X

    X_test_std = torch.tensor(X_test_std).float().to(device)
    T_test = torch.tensor(y_test).to(device)

    # 테스트셋에 대해 추론 수행
    output = model(X_test_std)

    # 출력값이 가장 높은 뉴런의 인덱스와 정답을 비교, 맞으면 1, 틀리면 0
    correct_prediction = torch.argmax(output, 1) == T_test #argmax? 아웃풋 중 가장 큰 값(likelihood)를 가지고 있는 인덱스 출력.

    print(output)
    #즉 추론값 출력

    # 정확도 계산
    accuracy = correct_prediction.float().mean()
    print('Accuracy:', accuracy.item())

    # 테스트셋 중에 임의로 하나를 선택
    r = random.randint(0len(X_test_std) - 1)    
    X_single_data = X_test_std[r]
    T_single_data = T_test[r]

    print('Label: ', labels[T_single_data.item()])
    single_output = model(X_single_data)
    print('Prediction: ', labels[torch.argmax(single_output).item()])    

5. Confusion matrix 구하기

from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay #파이토치랑 호환되게 히는거

cm = confusion_matrix(T_test.cpu(), torch.argmax(output, 1).cpu()) #이건 플러팅하는게 아니라 계산만 하는거
print(cm)

# 노멀라이즈하고 싶으면:(likelihood)
cm = cm / np.repeat(np.bincount(T_test.cpu()), 4).reshape(4,4)

cm_display = ConfusionMatrixDisplay(cm, display_labels=labels).plot(cmap=plt.cm.Blues) #여기서 그리는거임 ㅇㅇ


인공신경망을 사용한 WiFi 신호패턴 분석.ipynb
0.03MB