이전 포스팅까지 keras layer를 통해 encoder를 만드는 것을 배웠으나 실제로 의미있는 데이터를 다루지는 않았다. 이번 포스팅에서는 실제 의미있는 데이터를 다루는 것을 실습한다.
기존과 같이 세팅을 하고, load_dataset을 이요해 imdb를 불러온다.
dataset = load_dataset("imdb")
미리 training 된 Tokenizer를 가져오고, 1024를 기준으로 padding을 맞춰 준 뒤, encoding해 준다.
tokenizer = Tokenizer.from_pretrained("bert-base-uncased")
tokenizer.enable_padding()
tokenizer.enable_truncation(1024)
temp = tokenizer.encode_batch(dataset["train"]["text"])
여기까지 진행하고 나면, 위 그림과 같이 num_tokens가 모두 1024로 맞춰져 있다.
지난 포스팅에서 했던 것처럼 Config class를 만들어 준다.
@dataclass
class Config:
vocab_size = tokenizer.get_vocab_size()
d_model = 512
block_size = 1024
num_heads = 4
key_dim = int(d_model/num_heads)
dff = 2048
dropout_rate = 0.1
num_layers = 2
batch_size = 32
Dataset을 이용해서(상속시켜서) Dataset class를 만들어 주고 앞서 만든 temp의 id들과 dataset의 train>label data를 묶어 준다.
#import torch 필요
class MyDataset(torch.utils.data.Dataset):
def __init__(self, x, y):
super().__init__()
self.x = x
self.y = y
def __getitem__(self, idx):
x = self.x[idx]
y = self.y[idx]
return torch.tensor(x), torch.tensor(y)
def __len__(self):
return len(self.x)
x = [i.ids for i in temp]
y = dataset["train"]["label"]
ds = MyDataset(x, y)
여기서 묶어 준 dataSet(ds)과 Config class에 정의된 batch_size를 이용해 Data Loader instance를 만든다.
dl = torch.utils.data.DataLoader(ds, batch_size=Config.batch_size, shuffle=True)
이제, 지난 포스팅에서 다룬 Layer들을 만들어서 data를 가공할 것이다.
class TokenAndPositionEmbedding(keras.layers.Layer):
def __init__(self, config):
super().__init__()
self.config = config
self.tok_embedding = keras.layers.Embedding(config.vocab_size, config.d_model)
self.pos_embedding = keras.layers.Embedding(config.block_size, config.d_model)
def call(self, x):
b, t = x.shape
tok_emb = self.tok_embedding(x)
pos_emb = self.pos_embedding(keras.ops.arange(self.config.block_size))
return tok_emb + pos_emb
class GlobalSelfAttention(keras.layers.Layer):
def __init__(self, config):
super().__init__()
self.config = config
self.mha = keras.layers.MultiHeadAttention(config.num_heads, config.key_dim)
self.layernorm = keras.layers.LayerNormalization()
self.add = keras.layers.Add()
def call(self, x):
attn_output = self.mha(
query=x,
value=x,
key=x)
x = self.add([x, attn_output])
x = self.layernorm(x)
return x
class FeedForward(keras.layers.Layer):
def __init__(self, config):
super().__init__()
self.config = config
self.seq = keras.Sequential([
keras.layers.Dense(config.dff, activation='relu'),
keras.layers.Dense(config.d_model),
keras.layers.Dropout(config.dropout_rate)
])
self.add = keras.layers.Add()
self.layer_norm = keras.layers.LayerNormalization()
def call(self, x):
x = self.add([x, self.seq(x)])
x = self.layer_norm(x)
return x
class EncoderLayer(keras.layers.Layer):
def __init__(self, config):
super().__init__()
self.config = config
self.gsa = GlobalSelfAttention(config)
self.ff = FeedForward(config)
def call(self, x):
x = self.gsa(x)
x = self.ff(x)
return x
class Encoder(keras.layers.Layer):
def __init__(self, config):
super().__init__()
self.config = config
self.embedding = TokenAndPositionEmbedding(config)
self.enc_layers = [EncoderLayer(config) for _ in range(config.num_layers)]
self.dropout = keras.layers.Dropout(config.dropout_rate)
def call(self, x):
x = self.embedding(x) # Shape `(batch_size, seq_len, d_model)`.
x = self.dropout(x)
for i in range(self.config.num_layers):
x = self.enc_layers[i](x)
return x
위 code들에 대한 설명은 지난 포스팅의 것으로 갈음한다.
이제 여기까지 구현한 것들을 바탕으로 진짜 모델을 만들어야 한다. keras로부터 model을 만들 때는 keras.Model을 상속시켜 만든다.
class MyModel(keras.Model):
def __init__(self, config):
super().__init__()
self.config = config
self.encoder = Encoder(config)
self.avg = keras.layers.GlobalAveragePooling1D()
self.final = keras.layers.Dense(1)
def call(self, x):
x = self.encoder(x)
x = self.avg(x)
x = self.final(x)
return x
앞서 구현한 encoder를 통과시키고 -> 그 값들을 GlobalAveragePooling1D layer에 통과시켜서 average를 구하고 -> 그 값을 다시 Dense Layer에 통과시킨다.
keras.layers.Dense(1)까지 통과시키면, 그 최종값은 input이 긍정적 의미의 영화평인지, 부정적 의미의 영화평인지 파악한 값이 된다.
위 이미지는 첫 번째 data(여기서는 영화평)의 예측 결과로, 우리는 이 숫자가 (부정적인 영화평은) 0에 가까운 값이 되도록, 또는 (긍정적인 영화평은) 1 값에 가깝게 되도록 layer들의 coefficient를 조정하는 것을 model에 맡기는 것이다.
이를 위해 목표값과 차이를 계산하기 위해, BinaryCrossentropy를 도입한다.
loss = keras.losses.BinaryCrossentropy(from_logits=True)
*여기서는 확률값이 아니기 때문에 from_logits =True이다. 확률값에 대해 loss를 계산할 때는 False로 두어야 한다.
optimizing을 위해 keras의 Adam optimizer를 도입한다. 이 역시 이전 포스팅에서 다루었다.
optimizer = keras.optimizers.Adam()
전체 중에서 몇%를 맞추고 있는지 측정하기 위해 BinaryAccuracy를 도입한다.
metrics = keras.metrics.BinaryAccuracy()
지금 도입한 이 3가지를 매개변수로 하여 model을 compile한다.
model.compile(optimizer=optimizer, loss=loss, metrics=[metrics])
여기까지 완료되면 training을 위한 준비가 완료된 것이다.
이제 model이 dataloader를 사용하여(epoch 만큼 반복하면서) 목표치를 맞추게 해야 한다.
model.fit(dl, epochs=5) #목표에 맞게 epoch 값 변경
여기까지가 우리가 지금까지의 포스팅에서 다뤘던 내용들을 통해 (encoder only) model을 만들고 training 시키는 과정이다. 이 training의 결과값이 1에 최대한 가깝게 나오도록 하는 것이 우리의 목표이다.
'경제학코딩 2023' 카테고리의 다른 글
[Attention Mask] (1) | 2023.12.17 |
---|---|
[Scaled dot product] (1) | 2023.12.17 |
[Encoder Only Model with Random Data] (1) | 2023.12.17 |
[Keras Layer - (2)] (0) | 2023.12.16 |
[Keras Layer-(1)] (0) | 2023.12.16 |