#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Perceptrón para estimar precio de viviendas. Una capa oculta con 5 procesadores con activación tangente hiperbólica y error cuadrático. """ import math import torch import torch.nn as nn import random import scipy import scipy.cluster from scipy import stats import numpy as np from numpy import array import matplotlib.pyplot as plot from sklearn.metrics import mutual_info_regression from torch.utils.tensorboard import SummaryWriter def septorch(datos,tipo,donde): entradas=[ [col for col in fila[:-1]] for fila in datos] salidas=[ fila[-1:] for fila in datos] redent=torch.tensor(entradas,dtype=tipo,device=donde,requires_grad=True) redsal=torch.tensor(salidas,dtype=tipo,device=donde) return redent,redsal,entradas,salidas def grafini(numfils,numcols): return plot.subplots(numfils, numcols, sharex='col', sharey='row', figsize=(20, 15)) def grafhintini(numcapas): return plot.subplots(numcapas, 1, sharex='col', sharey='row', figsize=(20, 15)) def grafindiv(figtot,figind,f,c,x,y): figind[f,c].scatter(x, y) figind[f,c].minorticks_off() figind[f,c].locator_params(tight=True, nbins=4) def grafhintcada(pesos): fig,cada=plot.subplots(len(pesos), 1, sharex='col', sharey='row', figsize=(20, 15)) if len(pesos)>1: for filaproc in range(len(pesos)): cada[filaproc].bar(range(pesos.shape[1]), pesos[filaproc,:]) else: cada.bar(range(pesos.shape[1]), pesos[0,:]) plot.suptitle("Valores de los pesos") plot.show() def numhintcada(pesos): valpesos=np.absolute(pesos) maxpeso=np.maximum.reduce(pesos,axis=None) gordos=valpesos>0.8*maxpeso print( "Pesos gordos (procesador,variable,valor): ",gordos.nonzero(),pesos[gordos]) def grafconc(x,rotulo): plot.suptitle(rotulo) plot.show() def micini(numfils,numcols): result=np.zeros((numfils,numcols)) return result,None def micindiv(mat,mod,f,c,x,y): mat[f,c]=mutual_info_regression(x, y, discrete_features=[False]) def micconc(mat,rotulo): for umbral in np.arange(0.1,0.9,0.1): filtro=mat>umbral cuantos=len(mat[filtro]) if cuantos>=1 and cuantos<15: break if cuantos>=1: quienes=filtro.nonzero() print( rotulo) print ("Casos de dependencia clara: Cuál de las derivadas primeras, Con qué variable, Cuánta información mutua",np.hstack((np.array(quienes).T,np.atleast_2d(mat[filtro]).T))) def procesacapas(red,previo,sini): salcapa=[] guardar=sini#La primera es la salida de lo lineal, que no interesa. Por ese patrón, irán alternando. for capa in red.children(): previo=capa(previo) if guardar: salcapa.append(previo) guardar=not guardar return salcapa dtype = torch.float device = torch.device("cpu") #device = torch.device("cuda:0") #Cargamos los datos, todos fichdatos=open('casas.trn.txt','r') datos= [[float(cada) for cada in linea.strip().split()] for linea in fichdatos.readlines()] numentradas=len(datos[0])-1 #Separamos ajuste y prueba numuestras=len(datos) #Desordena conjunto random.shuffle(datos) #Llevarlos a escala unitaria??? datot=stats.zscore(array(datos,dtype=np.float32)) #Separa una parte para escoger por azar limazar=int(0.4*numuestras) # Cambia el 0.4 datazar=datot[:limazar,:] datgrup=datot[limazar:,:] #Separa un primer lote de ajuste y prueba por azar limajaz=int(limazar*0.7) # parte de ajuste= 0.7 Puede ser otra cosa limvalaz=int(limazar*0.85) # 0.85 o lo que quieras es ajuste+validación datajaz=datazar[:limajaz,:] datvalaz=datazar[limajaz:limvalaz,:] datpruaz=datazar[limvalaz:,:] #Separa un segundo lote de ajuste y prueba por agrupamiento limgrupaj=len(datgrup) numajgrup=int(limgrupaj*muesajuste) centros,grupos=scipy.cluster.vq.kmeans2(datgrup,numajgrup,minit='points') orgpuntos=scipy.spatial.KDTree(datgrup) dist,ind=orgpuntos.query(centros) datajgrup=datgrup[ind] indvalpru=np.setdiff1d(range(limgrupaj),ind) datvalprugrup=datgrup[indvalpru] numprugrup=int(limgrupaj*0.15) # La parte de prueba. Con ajuste+validación, debe sumar 1 centros,grupos=scipy.cluster.vq.kmeans2(datvalprugrup,numprugrup,minit='points') orgpuntos=scipy.spatial.KDTree(datvalprugrup) dist,ind=orgpuntos.query(centros) datprugrup=datvalprugrup[ind] indpru=np.setdiff1d(range(len(datvalprugrup)),ind) datvalgrup=datvalprugrup[indpru] dataj=np.vstack((datajaz,datajgrup)) datval=np.vstack((datvalaz,datvalgrup)) datpru=np.vstack((datpruaz,datprugrup)) #Pasarlo a tensores torch tea,tsa,ea,sa=septorch(dataj,dtype,device) tev,tsv,ev,sv=septorch(datval,dtype,device) tep,tsp,ep,sp=septorch(datpru,dtype,device) #Si interesa la referencia de regresión lineal, Torch no la tiene directamente, pero puedes hacer una red enteramente lineal y ajustarla #definir red red=nn.Sequential( nn.Linear(numentradas, 5), # ¿Cuántos ocultos quieres? nn.Tanh(),#Para un modelo lineal, suprimirías esto nn.Linear(5, 1), # Si cambias los ocultos, cambia también aquí ) #).cuda(device) #definir error a optimizar error = nn.MSELoss() #definir algoritmo de ajuste velocidad ajuste=torch.optim.LBFGS(red.parameters(),lr=0.01,max_iter=50,history_size=10) nvalfal=0 def evalua(): ajuste.zero_grad() s = red(tea) e = error(s, tsa) e.backward() return e print ("Iteración","Error de ajuste","Error de validación") for it in range(100): # 100 son las iteraciones ea=evalua() salval = red(tev) ev=error(salval,tsv) if 'evprevio' in dir(): if evprevio5: # 5 o el tope que pongas break evprevio=ev.item() print(it, math.sqrt(ea.item()),math.sqrt(evprevio)) ajuste.step(evalua) #Prueba: pasada directa, incluyendo derivadas de la red ajuste.zero_grad() salpru=red(tep) ep=error(salpru,tsp) print("Error de prueba",math.sqrt(ep.item())) #Tensorboard writer = SummaryWriter('runs/p2') writer.add_graph(red, tep) writer.add_embedding(datot) writer.close() ###Análisis adicionales ##Derivadas respecto a las entradas for salida in salpru: salida.backward(retain_graph=True) fungraf={'pesini':grafini,'pescada':grafindiv,'pesfin':grafconc,'hinton':grafhintcada} funnum={'pesini':micini,'pescada':micindiv,'pesfin':micconc,'hinton':numhintcada} numvartot=tep.shape[1] funanal=fungraf if numvartot>15: # Tope para desechar gráficos por demasiados funanal=funnum #Sacamos en pantalla las medias y las desviaciones típicas dermed=tep.grad.numpy().mean(axis=0) print( "Derivadas medias",dermed) print ("Desviación típica de derivadas",tep.grad.numpy().std(axis=0)) sal1,sal2=funanal['pesini'](numvartot,numvartot+1) for vardev in range(numvartot): for var in range(numvartot): funanal['pescada'](sal1,sal2,vardev,var,tep[:,var].detach().numpy(), tep.grad[:,vardev].detach().numpy()) funanal['pescada'](sal1,sal2,vardev,numvartot,tsp.detach().numpy().squeeze(), tep.grad[:,vardev].detach().numpy()) funanal['pesfin'](sal1,"Derivadas respecto a entradas frente a entradas") ##Actividad de procesadores ocultos: #pasada capa a capa previo=tep salcapa=procesacapas(red,previo,False) #salcapa.pop() Si hubiera que quitar la de salida #Graficarlo respecto a variables si es manejable. Si no, simplemente sacar las dependencias procesador/variable más significativas for capa in salcapa: #Sacar varianza y mostrar los que la tengan pequeña varian=capa.var() inutiles=varian<0.15 # Tope para marcar procesadores que se mueven poco if len(varian[inutiles])>0: print ("Procesadores poco activos: ",inutiles.nonzero()) numprocs=capa.shape[1] if numprocs>15: # Tope para desechar gráficos funanal=funnum sal1,sal2=funanal['pesini'](numprocs,numvartot+1) for proc in range(numprocs): for var in range(numvartot): funanal['pescada'](sal1,sal2,proc,var,tep[:,var].detach().numpy(), capa[:,proc].detach().numpy()) funanal['pescada'](sal1,sal2,proc,numvartot,tsp.detach().numpy().squeeze(), capa[:,proc].detach().numpy()) funanal['pesfin'](sal1,"Procesadores frente a variables") ##Mostrar pesos. Cada procesador un diagrama de barras respecto a las variables. Si son muchos, indicar sólo los más significativos tienepesos=True#La primera es la salida de lo lineal, que tiene pesos. Por ese patrón, irán alternandoLa de función de activación, no. Si el patrón se mantiene, irán alternando for capa in red.children(): if tienepesos: pesos=capa.parameters()#procesadores en filas, variables en columnas p= next(pesos)#Poner que sólo interesan capas salternas funanal['hinton'](p.detach().numpy()) tienepesos=not tienepesos