새소식

Project

AI캐디 프로젝트 1 - Flask 실시간 webcam - SAM2

  • -

 

모바일로봇의 카메라를 실시간으로 입력받아서 GPU가 있는 컴퓨터에서 인공지능을 돌리고 

그 결과값을 모바일 로봇에게 보내 제어를 하려고 한다. 

 

실시간으로 모바일로봇 카메라를 수신받는 방법이 무엇이 있을까

Socket 방식에서 TCP와 UDP 방식이 있을 수 있고 

 

HTTP 방식이 있을 수 있다. 

나는 로봇이 보내는 영상을 휴먼 팔로잉할때도 써야하고 Action Recognition 할때도 써야하니깐 

쉽게 접근 가능한 HTTP 방식의 Flask를 사용하기로 하였다. 

 

로봇 / 보내는 쪽 (송신) 코드 

더보기

 

import cv2 from flask import Flask, Response import time # import request app = Flask(__name__) # OpenCV로 웹캠 캡처 객체 생성 camera = cv2.VideoCapture(1) def generate_frames(): """웹캠에서 프레임을 가져와 스트리밍""" while True: success, frame = camera.read() # 웹캠에서 프레임 읽기 if not success: break else: # 프레임 크기 조정 (640x480) height, width, channel = frame.shape # frame = cv2.resize(frame, (int(width//2), int(height//2))) # frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) # JPEG 인코딩 품질 조정 ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70]) frame = buffer.tobytes() # 프레임 데이터를 전송 yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') # FPS 제한 (30 FPS) time.sleep(1 / 30) @app.route('/video_feed') def video_feed(): """비디오 스트리밍""" return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') if __name__ == "__main__": # Flask 앱 실행 try: app.run(host='0.0.0.0', port=5000, threaded=True) finally: # 리소스 해제 if camera.isOpened(): camera.release()

 

받는 쪽 (수신) 코드 

더보기

 

import cv2 import requests import numpy as np # 모바일로봇 서버의 스트리밍 URL url = "http://192.168.0.127:5000/video_feed" # Flask 서버의 /video_feed URL # 스트리밍 데이터 읽기 stream = requests.get(url, stream=True) if stream.status_code == 200: print("Streaming 연결 성공") byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼 for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기 byte_data += chunk a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분 b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분 if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때 jpg = byte_data[a:b+2] # JPEG 이미지 추출 byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거 # JPEG 데이터를 OpenCV 이미지로 디코딩 frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR) # OpenCV로 이미지 표시 cv2.imshow("YOLO Object Detection", frame) # 'q'를 누르면 종료 if cv2.waitKey(1) & 0xFF == ord('q'): break else: print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}") # OpenCV 윈도우 닫기 cv2.destroyAllWindows()

 

다음으로 Yolo를 적용해보자. 

utils 폴더 안에 yolo_fuc.py 코드 

더보기
# YOLO 모델로 객체 탐지 from ultralytics import YOLO model = YOLO("./models/yolo11n.pt") def get_yolo(): return model def get_bbox(frame): results = model.track(source=frame, classes=[0], conf=0.5, show=False, stream=True, verbose=False) largest_box = None # 가장 큰 바운딩 박스를 저장할 변수 largest_area = 0 # 가장 큰 바운딩 박스의 면적 # 탐지 결과 처리 for result in results: boxes = result.boxes # 탐지된 객체의 박스 정보 for box in boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) # 바운딩 박스 좌표 area = (x2 - x1) * (y2 - y1) # 바운딩 박스 면적 계산 # 가장 큰 바운딩 박스 갱신 if area > largest_area: largest_area = area largest_box = (x1, y1, x2, y2, box.conf[0], int(box.cls[0])) # 좌표, 신뢰도, 클래스 저장 return largest_box

 

다음으로 영상 실시간으로 수신받고 yolo 적용 코드 

 

더보기
import cv2 import requests import numpy as np from utils.yolo_fuc import get_bbox # 모바일로봇 서버의 스트리밍 URL url = "http://192.168.0.127:5000/video_feed" # Flask 서버의 /video_feed URL # 스트리밍 데이터 읽기 stream = requests.get(url, stream=True) if stream.status_code == 200: print("Streaming 연결 성공") byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼 for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기 byte_data += chunk a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분 b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분 if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때 jpg = byte_data[a:b+2] # JPEG 이미지 추출 byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거 # JPEG 데이터를 OpenCV 이미지로 디코딩 frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR) frame = cv2.flip(frame, 1) largest_box = get_bbox(frame) # 가장 큰 바운딩 박스가 있는 경우 화면에 그리기 if largest_box: x1, y1, x2, y2, conf, cls = largest_box cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # OpenCV로 이미지 표시 cv2.imshow("YOLO Object Detection", frame) # 'q'를 누르면 종료 if cv2.waitKey(1) & 0xFF == ord('q'): break else: print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}") # OpenCV 윈도우 닫기 cv2.destroyAllWindows()

 

실시간으로 잘 작동한다.

 

그렇다면 이번에는 sam2를 적용해보자. 

 

sam2 환경을 먼저 구축해준다. 

 

https://github.com/khw11044/SAM2_streaming

 

GitHub - khw11044/SAM2_streaming

Contribute to khw11044/SAM2_streaming development by creating an account on GitHub.

github.com

 

다음은 utils 폴더안에 sam2_fuc.py 코드이다. 

 

더보기
import torch from sam2.build_sam import build_sam2_camera_predictor # use bfloat16 for the entire notebook torch.autocast(device_type="cuda", dtype=torch.bfloat16).__enter__() if torch.cuda.get_device_properties(0).major >= 8: # turn on tfloat32 for Ampere GPUs (https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices) torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True sam2_checkpoint = "./checkpoints/sam2_hiera_small.pt" model_cfg = "sam2_hiera_s.yaml" predictor = build_sam2_camera_predictor(model_cfg, sam2_checkpoint) def get_predictor(): return predictor

그리고 이 함수를 임포트해서 수신받는 컴퓨터 코드를 작성한다.

 

더보기
import cv2 import requests import numpy as np from utils.sam2_fuc import get_predictor from utils.yolo_fuc import get_yolo, get_bbox sam2 = get_predictor() yolo = get_yolo() # 모바일로봇 서버의 스트리밍 URL url = "http://192.168.0.127:5000/video_feed" # Flask 서버의 /video_feed URL # 스트리밍 데이터 읽기 stream = requests.get(url, stream=True, timeout=5) if_init = False largest_bbox=None if stream.status_code == 200: print("Streaming 연결 성공") byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼 for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기 byte_data += chunk a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분 b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분 if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때 jpg = byte_data[a:b+2] # JPEG 이미지 추출 byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거 # JPEG 데이터를 OpenCV 이미지로 디코딩 frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR) frame = cv2.flip(frame, 1) width, height = frame.shape[:2][::-1] # 중심점 계산 center_x, center_y = width // 2, height // 2 if not largest_bbox: largest_bbox = get_bbox(frame) # 가장 큰 바운딩 박스가 있는 경우 화면에 그리기 if largest_bbox: x1, y1, x2, y2, conf, cls = largest_bbox cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) if largest_bbox and not if_init: sam2.load_first_frame(frame) bbox = np.array([[largest_bbox[0], largest_bbox[1]], [largest_bbox[2], largest_bbox[3]]], dtype=np.float32) _, out_obj_ids, out_mask_logits = sam2.add_new_prompt(frame_idx=0, obj_id=1, bbox=bbox) if_init = True elif if_init: out_obj_ids, out_mask_logits = sam2.track(frame) all_mask = np.zeros((height, width, 1), dtype=np.uint8) for i in range(len(out_obj_ids)): out_mask = (out_mask_logits[i] > 0.0).permute(1, 2, 0).byte().cuda() all_mask = cv2.bitwise_or(all_mask, out_mask.cpu().numpy() * 255) # 마스크 적용 if all_mask is not None: all_mask = cv2.cvtColor(all_mask, cv2.COLOR_GRAY2RGB) frame = cv2.addWeighted(frame, 1, all_mask, 0.5, 0) # OpenCV로 이미지 표시 cv2.imshow("YOLO Object Detection", frame) # 'q'를 누르면 종료 if cv2.waitKey(1) & 0xFF == ord('q'): break else: print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}") # OpenCV 윈도우 닫기 cv2.destroyAllWindows()

 

역시나 느리다. 

어떻게 해야할까?

멀티 쓰레드를 사용하는거다.

 

더보기
import cv2 import requests import numpy as np import threading import queue import torch from utils.sam2_fuc import get_predictor from utils.yolo_fuc import get_yolo, get_bbox # 모델 및 장치 설정 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True # SAM2와 YOLO 모델 초기화 sam2 = get_predictor() yolo = get_yolo() # 스트리밍 URL 및 상태 변수 url = "http://192.168.0.127:5000/video_feed" frame_queue = queue.Queue(maxsize=5) # 프레임 큐 if_init = False largest_bbox = None running = True # 실시간 스트림을 수신하는 스레드 def stream_frames(): global running print("스트리밍 시작...") stream = requests.get(url, stream=True, timeout=5) if stream.status_code == 200: byte_data = b"" for chunk in stream.iter_content(chunk_size=1024): if not running: break byte_data += chunk a = byte_data.find(b'\xff\xd8') b = byte_data.find(b'\xff\xd9') if a != -1 and b != -1: jpg = byte_data[a:b+2] byte_data = byte_data[b+2:] frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR) frame = cv2.flip(frame, 1) if not frame_queue.full(): frame_queue.put(frame) else: print(f"스트리밍 실패: {stream.status_code}") # YOLO 및 SAM2로 프레임 처리하는 스레드 def process_frames(): global running, if_init, largest_bbox print("프레임 처리 시작...") while running: if not frame_queue.empty(): frame = frame_queue.get() height, width = frame.shape[:2] # YOLO를 통해 가장 큰 객체 감지 if not largest_bbox: largest_bbox = get_bbox(frame) if largest_bbox: x1, y1, x2, y2, _, _ = largest_bbox cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # SAM2 모델로 객체 세그멘테이션 with torch.autocast(device_type="cuda", dtype=torch.bfloat16): if largest_bbox and not if_init: sam2.load_first_frame(frame) bbox = np.array([[x1, y1], [x2, y2]], dtype=np.float32) _, out_obj_ids, out_mask_logits = sam2.add_new_prompt(frame_idx=0, obj_id=1, bbox=bbox) if_init = True elif if_init: out_obj_ids, out_mask_logits = sam2.track(frame) all_mask = torch.zeros((height, width), dtype=torch.uint8, device=device) for i in range(len(out_obj_ids)): out_mask = (out_mask_logits[i] > 0.0).byte() all_mask = torch.bitwise_or(all_mask, out_mask.squeeze(0)) all_mask = all_mask.cpu().numpy() * 255 all_mask = cv2.cvtColor(all_mask, cv2.COLOR_GRAY2BGR) frame = cv2.addWeighted(frame, 1, all_mask, 0.5, 0) # 최종 프레임 출력 cv2.imshow("YOLO Object Detection & SAM2 Segmentation", frame) if cv2.waitKey(1) & 0xFF == ord('q'): running = False break # 스레드 시작 stream_thread = threading.Thread(target=stream_frames) process_thread = threading.Thread(target=process_frames) stream_thread.start() process_thread.start() # 스레드 종료 대기 stream_thread.join() process_thread.join() # 리소스 정리 cv2.destroyAllWindows() print("프로그램 종료.")

 

최종 코드 보기 

 

https://github.com/khw11044/flask-SAM2-multi-thread

 

GitHub - khw11044/flask-SAM2-multi-thread

Contribute to khw11044/flask-SAM2-multi-thread development by creating an account on GitHub.

github.com

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.