헬창 개발자

밀도기반 클러스터링 DBSCAN [파이썬] 본문

데이터 분석

밀도기반 클러스터링 DBSCAN [파이썬]

찬배 2022. 5. 1. 14:51
알고리즘

K Means나 Hierarchical 클러스터링의 경우 군집간의 거리를 이용하여 클러스터링을 하는 방법인데, 밀도 기반의 클러스터링은 점이 세밀하게 몰려 있어서 밀도가 높은 부분을 클러스터링 하는 방식이다.

쉽게 설명하면, 어느점을 기준으로 반경 x내에 점이 n개 이상 있으면 하나의 군집으로 인식하는 방식이다.

 

먼저 점 p가 있다고 할때, 점 p에서 부터 거리 e (epsilon)내에 점이 m(minPts) 개 있으면 하나의 군집으로 인식한다고 하자. 이 조건 즉 거리 e 내에 점 m개를 가지고 있는 점 p를 core point (중심점) 이라고 한다.

DBSCAN 알고리즘을 사용하려면 기준점 부터의 거리 epsilon값과, 이 반경내에 있는 점의 수 minPts를 인자로 전달해야 한다.

인자의 반환값은 핵심(core), 경계(border), 이상치(noise)로 구분된다.

 

동작과정은 다음과 같다.

1. e(반경)과 m(minPts)를 설정한다.

2. 데이터로부터 코어점의 조건을 만족하는 임의의 점을 선택한다.

3. 밀도-도달가능한 점들을 뽑아서 코어점과 경계점을 구분하고, 이에 속하지 않은 점들을 잡음으로 구분한다.

4. e반경 안에 있는 코어점들을 서로 연결한다.

5. 연결된 코어점들을 하나의 군집으로 형성한다.

6. 모든 경계점들은 어느 하나의 군집에 할당한다. (여러 군집에 걸쳐있는 경우, 반복 과정에서 먼저 할당된 군집으로 할당)


출처 http://primo.ai/index.php?title=Density-Based_Spatial_Clustering_of_Applications_with_Noise_(DBSCAN)


소스코드
from matplotlib import pyplot as plt
import math

class DBSCANING():
    def __init__(self):
        self.cluster = []
        self.noise = []
        self.eps = 1.12 # 반지름 1.12
        self.minPoints = 2 # minPoints는 2개, 반지름 안에 2개가 있어야 됨
        self.data = [[5.95,5.55],[5.56,5.61],[3.14,4.07],
                    [4.99,5.14],[5.4,2.6],[5.05,4.61],
                    [6.36,5.14],[5.93,3.29],[6.16,4.32],
                    [3.5,3.46],[4.27,4.91],[5.66,5.0],
                    [5.68,5.09],[3.15,3.71],[5.6,4.43],
                    [2.37,4.33],[4.57,4.3],[5.62,4.31],
                    [6.57,4.95],[9.99,9.97]] # 입력받은 원데이터 [9.99,9.97]->노이즈값
        
    def Euclidean(self, v1, v2): # 유클리드 거리 계산 함수
        sum = 0
        for i in range(len(v1)):
            sum +=  (v1[i]-v2[i])**2
        return round(math.sqrt(sum), 2)
    
    def search(self, r, eps): # 근접한 클러스터 찾는 함수
        neighborP = [] # 근접 클러스터를 담을 리스트
        for i in range(len(r)): # 리스트 r의 길이만큼 반복
            if r[i] < eps: # 만약 r 리스트의 i번째 값이 1.12 보다 작으면
                neighborP.append(i) # 근접 클러스터에 추가
        return neighborP # neighborP 리턴

    def expansion(self, i, idx, dist, neighborP, visited,  C): # 클러스터 통합하는 함수
        idx[i] = C # C값 idx 리스트에 저장 (마이크로 클러스터)
        k = 0 # k값 0으로 초기화
        while len(neighborP) > k: # 이웃리스트 길이가 k보다 큰 경우
            j = neighborP[k] # 이웃리스트의 k번째 값을 j에 저장
            if(visited[j]) == False: # 방문 False인 경우
                visited[j] = True # True로 초기화
                neighbor_of_neighbor = self.search(dist[j], 1.12) # 반경 안에 있는 점들을 저장
                if len(neighbor_of_neighbor) > 2: # 만약 neighbor_of_neighbor의 길이가 2보다 큰 경우
                    neighborP += neighbor_of_neighbor # neighborP에 추가  
            if idx[j] == 0: # idx의 j번째 값이 0인 경우
                idx[j] = C # c로 초기화
            k+=1 # 계산 종료되면 K+1

    def DBSCAN(self, points): # DBSCAN
        n = len(points) # points(입력받는 data)의 길이 n에 저장
        dist = [[0] * n for _ in range(n)] # n의 길이만큼 이중리스트 dist 생성
        for i in range(n): # 행의 길이만큼 반복
            for j in range(n): # 열의 길이만큼 반복
                if i == j: # 만약 i==j인 경우 
                    continue # continue
                else: # 그 외의 경우
                    dist[i][j] = self.Euclidean(points[i], points[j]) # 거리를 구하고 해당 행열에 추가
        visited = [False for _ in range(n)] # n의 길이만큼 boolean형 리스트 생성
        noise = [False for _ in range(n)] # n의 길이만큼 boolean형 리스트 생성
        idx = [0 for _ in range(n)] # n의 길이만큼 리스트 생성

        C = 0 # c값 0으로 초기화
        
        for i, point in enumerate(points): # 튜플의 형태로 points의 길이만큼 반복
            if visited[i] == False: # 만약 i번째에 방문하지 않았다면
                visited[i] = True # True로 바꿈
                neighborP = self.search(dist[i], 1.12) # 클러스터가 해당 반경 내에 있으면 클러스터에 추가
                if len(neighborP) > 2: # 만약 neighborP가 minPoints보다 클 경우
                    C += 1 # C의 값이 1 증가
                    self.expansion(i, idx, dist, neighborP, visited, C) # 마이크로 클러스터 확장
                else: # 그 외의 경우
                    noise[i] = True # noise 값을 True로 변경
        return idx # idx 리턴
    
    def run(self):
        points = self.data 
        print("Input Data : ", points) # 데이터 출력

        print('-'*50) 
        idx = self.DBSCAN(points) # points를 DBSCAN실행 후 idx로 초기화

        for i in range(max(idx)): # idx의 최대값만큼 반복
            k = [] # k 리스트 초기화
            for j in range(len(idx)): # idx의 길이만큼 반복
                if idx[j] == i+1: # idx의 j번째 값이 i+1과 같으면
                    k.append(j) # k 리스트에 추가
            result = [] # result 리스트 초기화
            for p in range(len(k)): # k 리스트의 길이만큼 반복
                result.append(points[k[p]]) # result 리스트에 points의 k[p]번째 값 추가
            self.cluster.append(result) # cluster에 result 추가

        for i in range(len(idx)): # DBSCAN을 통해 구한 idx의 길이만큼 반복
            if idx[i] == 0: # 만약 idx의 i번째 값이 0인 경우
                self.noise.append(points[i]) # noise 리스트에 추가

        for i, c in enumerate(self.cluster): # 튜플의 형태로 cluster의 길이만큼 반복
            print('Cluster ', i , ': ', c) # 클러스터 출력
        print('-'*50)
        print('noise : ', self.noise) # 노이즈 출력

        fig, ax = plt.subplots() 
        colors = ['red', 'blue', 'yellow']
        for i, c in enumerate(self.cluster): # 튜플의 형태로 cluster의 길이만큼 반복
            for j, v in enumerate(c): # 튜플의 형태로 c의 길이만큼 반복
                ax.plot(v[0], v[1], marker='o', color=colors[i], linestyle='') # 그래프 초기화

        for i, v in enumerate(self.noise): # 튜플의 형태로 noise의 길이만큼 반복
            ax.plot(v[0], v[1], marker='x', linestyle='') # 그래프 초기화

        plt.title('DBSCAN', fontsize=10) # 타이틀 DBSCAN 설정, 폰트사이즈 10
        plt.xlabel('X', fontsize=10) # X 라벨값 부여, 폰트사이즈 10
        plt.ylabel('Y', fontsize=10) # Y 라벨값 부여, 폰트사이즈 10
        plt.show() # 그래프 출력
        
if __name__=='__main__':
    aaaaaaaaaaaaa = DBSCANING()
    aaaaaaaaaaaaa.run()
결과화면

Comments