새소식

Project

AI캐디 프로젝트 1 - Flask 실시간 webcam - SAM2

  • -

 

모바일로봇의 카메라를 실시간으로 입력받아서 GPU가 있는 컴퓨터에서 인공지능을 돌리고 

그 결과값을 모바일 로봇에게 보내 제어를 하려고 한다. 

 

실시간으로 모바일로봇 카메라를 수신받는 방법이 무엇이 있을까

Socket 방식에서 TCP와 UDP 방식이 있을 수 있고 

 

HTTP 방식이 있을 수 있다. 

나는 로봇이 보내는 영상을 휴먼 팔로잉할때도 써야하고 Action Recognition 할때도 써야하니깐 

쉽게 접근 가능한 HTTP 방식의 Flask를 사용하기로 하였다. 

 

로봇 / 보내는 쪽 (송신) 코드 

더보기

 

import cv2
from flask import Flask, Response
import time
# import request

app = Flask(__name__)

# OpenCV로 웹캠 캡처 객체 생성
camera = cv2.VideoCapture(1)

def generate_frames():
    """웹캠에서 프레임을 가져와 스트리밍"""
    while True:
        success, frame = camera.read()  # 웹캠에서 프레임 읽기
        if not success:
            break
        else:
            # 프레임 크기 조정 (640x480)
            height, width, channel  = frame.shape
            # frame = cv2.resize(frame, (int(width//2), int(height//2)))
            # frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)

            # JPEG 인코딩 품질 조정
            ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
            frame = buffer.tobytes()

            # 프레임 데이터를 전송
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

        # FPS 제한 (30 FPS)
        time.sleep(1 / 30)

@app.route('/video_feed')
def video_feed():
    """비디오 스트리밍"""
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')


if __name__ == "__main__":
    # Flask 앱 실행
    try:
        app.run(host='0.0.0.0', port=5000, threaded=True)
    finally:
        # 리소스 해제
        if camera.isOpened():
            camera.release()

 

받는 쪽 (수신) 코드 

더보기

 

import cv2
import requests
import numpy as np

# 모바일로봇 서버의 스트리밍 URL
url = "http://192.168.0.127:5000/video_feed"  # Flask 서버의 /video_feed URL

# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True)

if stream.status_code == 200:
    print("Streaming 연결 성공")
    byte_data = b""  # 스트리밍 데이터를 저장할 바이트 버퍼
    
    for chunk in stream.iter_content(chunk_size=1024):  # 1KB 단위로 데이터 읽기
        byte_data += chunk
        a = byte_data.find(b'\xff\xd8')  # JPEG 시작 부분
        b = byte_data.find(b'\xff\xd9')  # JPEG 끝 부분
        if a != -1 and b != -1:  # JPEG 이미지의 시작과 끝이 존재할 때
            jpg = byte_data[a:b+2]  # JPEG 이미지 추출
            byte_data = byte_data[b+2:]  # 읽은 데이터 버퍼에서 제거
            
            # JPEG 데이터를 OpenCV 이미지로 디코딩
            frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
            
            # OpenCV로 이미지 표시
            cv2.imshow("YOLO Object Detection", frame)

            # 'q'를 누르면 종료
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
else:
    print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")

# OpenCV 윈도우 닫기
cv2.destroyAllWindows()

 

다음으로 Yolo를 적용해보자. 

utils 폴더 안에 yolo_fuc.py 코드 

더보기
# YOLO 모델로 객체 탐지
from ultralytics import YOLO    
model = YOLO("./models/yolo11n.pt")

def get_yolo():
    return model

def get_bbox(frame):

    results = model.track(source=frame, classes=[0], conf=0.5, show=False, stream=True, verbose=False)

    largest_box = None  # 가장 큰 바운딩 박스를 저장할 변수
    largest_area = 0  # 가장 큰 바운딩 박스의 면적

    # 탐지 결과 처리
    for result in results:
        boxes = result.boxes  # 탐지된 객체의 박스 정보
        for box in boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])  # 바운딩 박스 좌표
            area = (x2 - x1) * (y2 - y1)  # 바운딩 박스 면적 계산
            
            # 가장 큰 바운딩 박스 갱신
            if area > largest_area:
                largest_area = area
                largest_box = (x1, y1, x2, y2, box.conf[0], int(box.cls[0]))  # 좌표, 신뢰도, 클래스 저장

    return largest_box

 

다음으로 영상 실시간으로 수신받고 yolo 적용 코드 

 

더보기
import cv2
import requests
import numpy as np
from utils.yolo_fuc import get_bbox

# 모바일로봇 서버의 스트리밍 URL
url = "http://192.168.0.127:5000/video_feed"  # Flask 서버의 /video_feed URL

# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True)

if stream.status_code == 200:
    print("Streaming 연결 성공")
    byte_data = b""  # 스트리밍 데이터를 저장할 바이트 버퍼
    
    for chunk in stream.iter_content(chunk_size=1024):  # 1KB 단위로 데이터 읽기
        byte_data += chunk
        a = byte_data.find(b'\xff\xd8')  # JPEG 시작 부분
        b = byte_data.find(b'\xff\xd9')  # JPEG 끝 부분
        if a != -1 and b != -1:  # JPEG 이미지의 시작과 끝이 존재할 때
            jpg = byte_data[a:b+2]  # JPEG 이미지 추출
            byte_data = byte_data[b+2:]  # 읽은 데이터 버퍼에서 제거
            
            # JPEG 데이터를 OpenCV 이미지로 디코딩
            frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
            frame = cv2.flip(frame, 1)
            
            largest_box = get_bbox(frame)
            
            # 가장 큰 바운딩 박스가 있는 경우 화면에 그리기
            if largest_box:
                x1, y1, x2, y2, conf, cls = largest_box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            
            # OpenCV로 이미지 표시
            cv2.imshow("YOLO Object Detection", frame)

            # 'q'를 누르면 종료
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
else:
    print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")

# OpenCV 윈도우 닫기
cv2.destroyAllWindows()

 

실시간으로 잘 작동한다.

 

그렇다면 이번에는 sam2를 적용해보자. 

 

sam2 환경을 먼저 구축해준다. 

 

https://github.com/khw11044/SAM2_streaming

 

GitHub - khw11044/SAM2_streaming

Contribute to khw11044/SAM2_streaming development by creating an account on GitHub.

github.com

 

다음은 utils 폴더안에 sam2_fuc.py 코드이다. 

 

더보기
import torch 
from sam2.build_sam import build_sam2_camera_predictor

# use bfloat16 for the entire notebook
torch.autocast(device_type="cuda", dtype=torch.bfloat16).__enter__()

if torch.cuda.get_device_properties(0).major >= 8:
    # turn on tfloat32 for Ampere GPUs (https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    

sam2_checkpoint = "./checkpoints/sam2_hiera_small.pt"
model_cfg = "sam2_hiera_s.yaml"
predictor = build_sam2_camera_predictor(model_cfg, sam2_checkpoint)

def get_predictor():
    return predictor

그리고 이 함수를 임포트해서 수신받는 컴퓨터 코드를 작성한다.

 

더보기
import cv2
import requests
import numpy as np
from utils.sam2_fuc import get_predictor
from utils.yolo_fuc import get_yolo, get_bbox

sam2 = get_predictor()
yolo = get_yolo()

# 모바일로봇 서버의 스트리밍 URL
url = "http://192.168.0.127:5000/video_feed"  # Flask 서버의 /video_feed URL
# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True, timeout=5)  


if_init = False
largest_bbox=None


if stream.status_code == 200:
    print("Streaming 연결 성공")
    byte_data = b""  # 스트리밍 데이터를 저장할 바이트 버퍼
    
    for chunk in stream.iter_content(chunk_size=1024):  # 1KB 단위로 데이터 읽기
        byte_data += chunk
        a = byte_data.find(b'\xff\xd8')  # JPEG 시작 부분
        b = byte_data.find(b'\xff\xd9')  # JPEG 끝 부분
        if a != -1 and b != -1:  # JPEG 이미지의 시작과 끝이 존재할 때
            jpg = byte_data[a:b+2]  # JPEG 이미지 추출
            byte_data = byte_data[b+2:]  # 읽은 데이터 버퍼에서 제거
            
            # JPEG 데이터를 OpenCV 이미지로 디코딩
            frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
            frame = cv2.flip(frame, 1)
            width, height = frame.shape[:2][::-1]
            # 중심점 계산
            center_x, center_y = width // 2, height // 2
            
            if not largest_bbox:
                largest_bbox = get_bbox(frame)
            
            # 가장 큰 바운딩 박스가 있는 경우 화면에 그리기
            if largest_bbox:
                x1, y1, x2, y2, conf, cls = largest_bbox
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            
            if largest_bbox and not if_init:
                sam2.load_first_frame(frame)
                bbox = np.array([[largest_bbox[0], largest_bbox[1]],
                                [largest_bbox[2], largest_bbox[3]]], dtype=np.float32)
                
                _, out_obj_ids, out_mask_logits = sam2.add_new_prompt(frame_idx=0, obj_id=1, bbox=bbox)
                if_init = True
                
            elif if_init:
                out_obj_ids, out_mask_logits = sam2.track(frame)
                all_mask = np.zeros((height, width, 1), dtype=np.uint8)
                
                for i in range(len(out_obj_ids)):
                    out_mask = (out_mask_logits[i] > 0.0).permute(1, 2, 0).byte().cuda()
                    all_mask = cv2.bitwise_or(all_mask, out_mask.cpu().numpy() * 255)

                # 마스크 적용
                if all_mask is not None:
                    all_mask = cv2.cvtColor(all_mask, cv2.COLOR_GRAY2RGB)
                    frame = cv2.addWeighted(frame, 1, all_mask, 0.5, 0)
                
            
            # OpenCV로 이미지 표시
            cv2.imshow("YOLO Object Detection", frame)

            # 'q'를 누르면 종료
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
else:
    print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")

# OpenCV 윈도우 닫기
cv2.destroyAllWindows()

 

역시나 느리다. 

어떻게 해야할까?

멀티 쓰레드를 사용하는거다.

 

더보기
import cv2
import requests
import numpy as np
import threading
import queue
import torch
from utils.sam2_fuc import get_predictor
from utils.yolo_fuc import get_yolo, get_bbox

# 모델 및 장치 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# SAM2와 YOLO 모델 초기화
sam2 = get_predictor()
yolo = get_yolo()

# 스트리밍 URL 및 상태 변수
url = "http://192.168.0.127:5000/video_feed"
frame_queue = queue.Queue(maxsize=5)  # 프레임 큐
if_init = False
largest_bbox = None
running = True

# 실시간 스트림을 수신하는 스레드
def stream_frames():
    global running
    print("스트리밍 시작...")
    stream = requests.get(url, stream=True, timeout=5)
    if stream.status_code == 200:
        byte_data = b""
        for chunk in stream.iter_content(chunk_size=1024):
            if not running:
                break
            byte_data += chunk
            a = byte_data.find(b'\xff\xd8')
            b = byte_data.find(b'\xff\xd9')
            if a != -1 and b != -1:
                jpg = byte_data[a:b+2]
                byte_data = byte_data[b+2:]
                frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
                frame = cv2.flip(frame, 1)
                if not frame_queue.full():
                    frame_queue.put(frame)
    else:
        print(f"스트리밍 실패: {stream.status_code}")

# YOLO 및 SAM2로 프레임 처리하는 스레드
def process_frames():
    global running, if_init, largest_bbox
    print("프레임 처리 시작...")
    while running:
        if not frame_queue.empty():
            frame = frame_queue.get()
            height, width = frame.shape[:2]

            # YOLO를 통해 가장 큰 객체 감지
            if not largest_bbox:
                largest_bbox = get_bbox(frame)

            if largest_bbox:
                x1, y1, x2, y2, _, _ = largest_bbox
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # SAM2 모델로 객체 세그멘테이션
            with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
                if largest_bbox and not if_init:
                    sam2.load_first_frame(frame)
                    bbox = np.array([[x1, y1], [x2, y2]], dtype=np.float32)
                    _, out_obj_ids, out_mask_logits = sam2.add_new_prompt(frame_idx=0, obj_id=1, bbox=bbox)
                    if_init = True

                elif if_init:
                    out_obj_ids, out_mask_logits = sam2.track(frame)
                    all_mask = torch.zeros((height, width), dtype=torch.uint8, device=device)
                    
                    for i in range(len(out_obj_ids)):
                        out_mask = (out_mask_logits[i] > 0.0).byte()
                        all_mask = torch.bitwise_or(all_mask, out_mask.squeeze(0))
                    
                    all_mask = all_mask.cpu().numpy() * 255
                    all_mask = cv2.cvtColor(all_mask, cv2.COLOR_GRAY2BGR)
                    frame = cv2.addWeighted(frame, 1, all_mask, 0.5, 0)

            # 최종 프레임 출력
            cv2.imshow("YOLO Object Detection & SAM2 Segmentation", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                running = False
                break

# 스레드 시작
stream_thread = threading.Thread(target=stream_frames)
process_thread = threading.Thread(target=process_frames)

stream_thread.start()
process_thread.start()

# 스레드 종료 대기
stream_thread.join()
process_thread.join()

# 리소스 정리
cv2.destroyAllWindows()
print("프로그램 종료.")

 

최종 코드 보기 

 

https://github.com/khw11044/flask-SAM2-multi-thread

 

GitHub - khw11044/flask-SAM2-multi-thread

Contribute to khw11044/flask-SAM2-multi-thread development by creating an account on GitHub.

github.com

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.