본문 바로가기
BackEnd

eBUS SDK Python API를 활용한 Jai 카메라 다중 채널 스트리밍 구현

by E_van 2025. 1. 3.

이 글에서는 Python과 PyQt5를 사용하여 eBUS SDK Python API를 기반으로 Jai 카메라와의 다중 채널 카메라 스트리밍 애플리케이션을 개발하는 과정을 설명합니다. 이 프로젝트는 여러 카메라에서 실시간 스트리밍 데이터를 수집하고, 이를 PyQt5 기반 UI에서 표시하며, 각 채널의 노출 시간을 조정할 수 있는 기능을 포함합니다. Jai 카메라를 통해 수집한 이미지를 활용한 딥러닝 작업을 진행하기에 앞서 개발중에 있는 카메라 관련 초기 개발모델에 대해서 정의합니다.

 

프로젝트 구성

프로젝트는 다음 세 가지 주요 파일로 구성되어 있습니다:

  1. main.py: 애플리케이션의 진입점으로, PyQt5 애플리케이션을 초기화하고 주요 클래스와 UI를 실행합니다.
  2. device_manager.py: eBUS SDK를 활용해 장치를 연결하고 데이터를 수집하는 핵심 로직이 포함된 클래스입니다.
  3. multi_camera_stream_app.py: PyQt5 기반 UI를 제공하며, 여러 카메라 스트림을 표시하고 각 채널의 설정을 관리합니다.

주요 구현

1. main.py: 애플리케이션 초기화

def main():
    app = QApplication([])

    # eBUSDeviceManager를 초기화하고 장치 연결
    device_manager = eBUSDeviceManager()

    try:
        device_manager.connect(num_channels=3)
        device_manager.start_acquisition()

        # UI 초기화 및 실행
        stream_app = MultiChannelStreamApp(device_manager=device_manager, num_channels=3)
        stream_app.show()
        sys.exit(app.exec_())
    except Exception as e:
        print(f"오류 발생: {e}")
    finally:
        device_manager.cleanup()

이 코드는 애플리케이션을 초기화하고 device_manager를 통해 카메라를 연결 및 스트리밍을 시작합니다. UI는 MultiChannelStreamApp 클래스를 사용하여 생성됩니다.


2. device_manager.py: 장치 연결 및 데이터 관리

주요 클래스: eBUSDeviceManager

  • 장치 연결 (connect_to_device)
def connect_to_device(self, connection_ID):
    print("장치에 연결 중...")
    result, device = eb.PvDevice.CreateAndConnect(connection_ID)
    if not device:
        raise Exception(f"연결 실패: {result.GetCodeString()} ({result.GetDescription()})")
    print("장치가 연결되었습니다.")
    return device

connection_ID를 사용해 PvDevice.CreateAndConnect를 통해 장치를 연결합니다.

  • 채널 구성 (configure_channels)
def configure_channels(self, device, num_channels=3):
        params = device.GetParameters()
        channel_selector = params.Get("GevStreamChannelSelector")
        scp_port = params.Get("GevSCPHostPort")
        gev_scda = params.Get("GevSCDA")

        if not gev_scda or not scp_port or not channel_selector:
            raise Exception("매개변수 누락: GevSCDA, GevSCPHostPort, 또는 GevStreamChannelSelector.")

        for channel in range(num_channels):
            print(f"채널 {channel} 구성 중...")
            if not channel_selector.SetValue(channel).IsOK():
                raise Exception(f"채널 {channel} 선택 실패")

            assigned_port = 60000 + channel
            if scp_port.IsWritable() and not scp_port.SetValue(assigned_port).IsOK():
                raise Exception(f"채널 {channel}에 대한 GevSCPHostPort 설정 실패")

            ip_address = (255 << 24) | (255 << 16) | (100 << 8) | 100
            if gev_scda.IsWritable() and not gev_scda.SetValue(ip_address).IsOK():
                raise Exception(f"채널 {channel}에 대한 GevSCDA IP 설정 실패")

채널 선택(GevStreamChannelSelector), 호스트 포트(GevSCPHostPort)와 IP 주소(GevSCDA) 설정.

 

  • 스트림 열기 (open_stream)
def open_stream(self, connection_ID, channel):
    print(f"채널 {channel}에 대한 스트림 열기 중...")
    stream = eb.PvStreamGEV()
    if not stream.Open(connection_ID, 60000 + channel).IsOK():
        raise Exception(f"채널 {channel}에 대한 스트림 열기 실패")
    print(f"채널 {channel}에 대한 스트림이 열렸습니다.")
    return stream

PvStreamGEV를 사용해 각 채널의 스트림을 열고 포트를 기반으로 스트림을 구성합니다.
- PvStreamGEV는 eBUS SDK의 스트림 객체로, 스트림 데이터 전송을 처리합니다.

  • 파이프라인 구성 (configure_pipeline)
   def configure_pipeline(self, device, stream):
        print("파이프라인 초기화 중...")
        pipeline = eb.PvPipeline(stream)
        pipeline.SetBufferCount(16)
        pipeline.SetBufferSize(device.GetPayloadSize())
        pipeline.Start()
        print("파이프라인이 시작되었습니다.")
        return pipeline

스트림 데이터를 처리하기 위한 버퍼 크기와 버퍼 수를 설정합니다.

  • 프레임 수집 (fetch_frame)
 def fetch_frame(self, channel, timeout=5000):
        if not self.connected:
            print(f"장치가 연결되지 않았습니다. 채널={channel}")
            return None

        pipeline = self.pipelines[channel]
        for _ in range(3):
            result, buffer, op_result = pipeline.RetrieveNextBuffer(timeout)
            if result.IsOK() and op_result.IsOK() and buffer.GetPayloadType() == eb.PvPayloadTypeImage:
                image = buffer.GetImage()
                frame_data = np.array(image.GetDataPointer(), dtype=np.uint8).reshape((image.GetHeight(), image.GetWidth(), -1))
                pipeline.ReleaseBuffer(buffer)
                return frame_data
            pipeline.ReleaseBuffer(buffer)
        print(f"채널 {channel}에 대한 프레임 가져오기 실패.")
        return None

지정된 채널에서 스트림 데이터를 수집하고 이미지를 반환합니다.

  • 노출시간 수집/설정
def get_exposure(self, channel):
        if not self.connected:
            raise Exception("장치가 연결되지 않았습니다.")

        params = self.device.GetParameters()
        if not params.Get("GevStreamChannelSelector").SetValue(channel).IsOK():
            raise Exception(f"채널 {channel} 선택 실패")

        result, exposure_value = params.Get("ExposureTime").GetValue()
        if not result.IsOK():
            raise Exception(f"채널 {channel}에 대한 노출 시간 가져오기 실패")
        return exposure_value

    def set_exposure(self, channel, exposure_time):
        if not self.connected:
            raise Exception("장치가 연결되지 않았습니다.")

        params = self.device.GetParameters()
        if not params.Get("GevStreamChannelSelector").SetValue(channel).IsOK():
            raise Exception(f"채널 {channel} 선택 실패")

        exposure_param = params.Get("ExposureTime")
        if not exposure_param.SetValue(exposure_time).IsOK():
            raise Exception(f"채널 {channel}에 대한 노출 시간 설정 실패")

        result, current_exposure = exposure_param.GetValue()
        if result.IsOK():
            print(f"채널 {channel}에 대한 노출 시간이 {current_exposure}로 설정되었습니다.")

채널별 현재 노출값을 가져오거나 설정할 수 있습니다.


3. multi_camera_stream_app.py: UI 구현

주요 클래스: MultiChannelStreamApp

  • UI 초기화 (init_ui)
def init_ui(self):
    self.setWindowTitle("다중 채널 스트림 뷰어")
    self.central_widget = QWidget()
    self.grid_layout = QGridLayout(self.central_widget)
    self.image_labels = [self.create_image_label(i) for i in range(self.num_channels)]

각 카메라 채널에 대한 이미지 뷰어를 생성하고 UI에 배치합니다.

  • 노출 시간 설정
def update_exposure_settings(self, channel, input_field, current_label):
    exposure = float(input_field.text())
    self.device_manager.set_exposure(channel, exposure)
    current_label.setText(f"현재 노출 값: {exposure}")

사용자가 입력한 값을 장치에 전달하여 채널의 노출 시간을 설정합니다.

  • 프레임 업데이트
def update_frames(self):
    for i in range(self.num_channels):
        frame = self.device_manager.fetch_frame(i)
        if frame is not None:
            self.display_frame(frame, self.image_labels[i])

각 채널의 프레임을 수집하여 UI에 표시합니다.


실행 화면

  1. 다중 채널 스트림 뷰어
    • 각 채널의 스트림 데이터가 실시간으로 표시됩니다.
    • 각 채널의 노출 시간을 조정할 수 있는 입력 필드와 버튼이 제공됩니다.
  2. 노출 시간 조정
    • 사용자가 노출 시간을 입력하고 설정 버튼을 클릭하면 해당 채널의 설정이 업데이트됩니다.

마무리

이 프로젝트는 다중 채널 카메라 스트리밍을 하나의 UI에서 관리하고자 하는 목적으로 개발하게 되었으며, eBUS SDK를 활용한 개발에 있어서 어떤 방식으로 개발하면 될지에 대한 하나의 접근법에 대해서 설명하고자 정리해 보았습니다.

추가 자료