新4年生用

Pythonを用いて測定機器を遠隔操作してみる

はじめに

このページではラップトップPCとオシロスコープをつないで、オシロスコープを遠隔操作する基本的な方法を紹介しています。

研究では大量のデータを連続的に取るという事がしばしば、必要とされます。

その際、測定機器をちまちま数時間あるいは数十時間いじりながら測定するのはさすがに苦なので、そんな単純作業はコンピュータに任せたいわけです。

でも、いざそれが必要となったときに0からやるとなると、何からやったらよいか分からない人も多いでしょうから、今回最初のきっかけになるようなことを皆さんにやってもらえればと思い、このページを作りました。

研究室には古い機器から新しい機器までありますから、すべて一様に操作できるというわけではありませんし、その機器ごとにマニュアルを少々見ながらコーディングする必要があります。

しかしながら、退屈な測定作業を自動化することで研究の効率を飛躍的に向上できますので、ぜひトライしてみて下さい。

今回は記事作成用に研究室に転がっていたRigolのDS1104Zを使いましたが、みなさんにはぜひ練習として別のオシロを使ってほしいです。

必要なモジュール、ドライバをインストールする

PyVISAのインストール

PyVISAとは?

「PyVISA」は、”VISA”という通信規格を用いて、Pythonコード上で計測器を制御するモジュールです。

インストールコマンドは「python -m pip install pyvisa」です。Anaconda Promptで実行します。

NI-VISAのインストール

NI-VISAとは?

VISA規格に沿って作成されたドライバソフトウェアは様々な団体からリリースされています。その中でも最も有名なNational Instruments社が開発した、NI-VISAをインストールします。

National Instrumentsの公式サイトへアクセスし、「バージョン」18.5を選択し、「エディション」はRuntimeを選択します。

バージョンを落としたのは、Full版ではなくRuntime版をインストールするためです。Runtime版がダウンロードできる可能な限り新しいバージョンを選択してください。

選択したらダウンロードしてインストールします。なお、ダウンロードにはログインが必要ですのでご注意ください。

遠隔操作の手順

以下が測定機器を遠隔操作する一般的な手順です。

  1. 測定機器をケーブル等でPCに接続する
  2. 接続アドレスを取得する
  3. 基本コマンドにて接続できているか確認する
  4. 機器のマニュアルを参照し、使用可能なSCPI(Standard Commands for Programmable Instruments)コマンドを調べる
  5. 使えるSCPIの範囲で制御を試みる

実際に遠隔操作してみる

PCとオシロスコープを接続する

オシロスコープの背面を見ると大抵USBの 2.0 Standard-B(Type-B)があるので、そこを使います。

ケーブルで接続ができたら、以下のコードをSpyderで実行します。

import pyvisa

rm = pyvisa.ResourceManager()
print(rm.list_resources())

ResourceManagerがPCと計測器をつなぐクラスです。

実行すると

('USB0::0x___::____::INSTR', 'ASRL1::INSTR', 'ASRL2::INSTR')

が帰ってくればOKです。

この「USB0::~::INSTR」をメモします。

それでは次に実際に接続します。以下のコードを実行します。

import pyvisa
import time
from datetime import datetime

inst = rm.open_resource('USB0::0x___::____::INSTR') #アドレスは先ほどメモしたものを用いる
print(inst.query('*IDN?'))

以下のように接続機器の情報が手に入ります。

RIGOL TECHNOLOGIES,DS1104Z Plus,DS1ZD243400937,00.04.05.SP2

使用可能なコマンドの確認

測定機器との接続が確立できましたので、続いてマニュアルを参照して、使用可能なSCPIコマンドを調べます。

今回用いたRIGOLのDS1104Zのマニュアルをネットで検索すると、DS1000Zシリーズ向けのマニュアルが見つかりましたので、pdfを落とし、これを参照してみます。

ブラウザでpdfを開いた場合「ctrl」+「G」でページ内検索できます。「scpi」で検索すると早いかも

マニュアルによると「:RUN」と「:STOP」、「:AUToscale」、「:CLEar」が使えるそうなので、実際にこれらが使えるか試します。

import pyvisa
import time
from datetime import datetime
import time

rm = pyvisa.ResourceManager()
inst = rm.open_resource('USB0::0x___::____::INSTR') #アドレスは先ほどメモしたものを用いる

inst.write(':STOP')
time.sleep(1)
inst.write(':RUN')

以上を実行すると、一度測定が停止し1秒後に測定を再開しました。

timeが見つからない場合、Anaconda promptで「python -m pip install time」でインストールします。

次にこれらを応用して少々複雑なものを作ってみます。

import pyvisa
import tkinter as tk
from tkinter import ttk, messagebox
import time
import threading


class OscilloscopeController:
    def __init__(self, resource_name=None):
        self.resource_name = resource_name
        self.instrument = None
        self.rm = pyvisa.ResourceManager()
        self.connected = False
        
    def connect(self):
        try:
            # 利用可能なリソースをリスト表示
            resources = self.rm.list_resources()
            if not resources:
                return False, "利用可能なVISAリソースが見つかりません"
            
            # リソース名が指定されていない場合は最初のリソースを使用
            if not self.resource_name:
                self.resource_name = resources[0]
                
            # オシロスコープに接続
            self.instrument = self.rm.open_resource(self.resource_name)
            self.instrument.timeout = 5000  # タイムアウトは5秒に設定
            
            # 接続確認
            idn = self.instrument.query("*IDN?")
                
            self.connected = True
            return True, f"正常に接続されました: {idn}"
        except Exception as e:
            return False, f"接続エラー: {str(e)}"
    
    def disconnect(self):
        if self.instrument:
            self.instrument.close()
        if self.rm:
            self.rm.close()
        self.connected = False
    
    def run(self):
        if not self.connected:
            return False, "オシロスコープに接続されていません"
        try:
            self.instrument.write(":RUN")
            return True, "測定開始"
        except Exception as e:
            return False, f"実行エラー: {str(e)}"
    
    def stop(self):
        if not self.connected:
            return False, "オシロスコープに接続されていません"
        try:
            self.instrument.write(":STOP")
            return True, "測定停止"
        except Exception as e:
            return False, f"停止エラー: {str(e)}"
    
    def autoscale(self):
        if not self.connected:
            return False, "オシロスコープに接続されていません"
        try:
            self.instrument.write(":AUToscale")
            return True, "自動スケール実行"
        except Exception as e:
            return False, f"自動スケールエラー: {str(e)}"
    
    def clear(self):
        if not self.connected:
            return False, "オシロスコープに接続されていません"
        try:
            self.instrument.write(":CLEar")
            return True, "画面クリア"
        except Exception as e:
            return False, f"クリアエラー: {str(e)}"
    
    # 複合コマンド
    def quick_check(self):
        """クイック波形チェックモード: RUN → AUToscale"""
        run_result = self.run()
        if not run_result[0]:
            return run_result
        time.sleep(0.5)  # コマンド間の遅延
        return self.autoscale()
    
    def freeze_and_clear(self):
        """フリーズ&クリアモード: STOP → CLEar"""
        stop_result = self.stop()
        if not stop_result[0]:
            return stop_result
        time.sleep(0.5)  # コマンド間の遅延
        return self.clear()
    
    def comparison_mode(self):
        """比較モード: STOP → 数秒待機 → RUN → AUToscale"""
        stop_result = self.stop()
        if not stop_result[0]:
            return stop_result
        return True, "比較モード: 現在の波形がフリーズされました。新しい波形を表示するにはクイック波形チェックを実行してください。"
    
    def continuous_monitoring(self, interval=5, callback=None):
        """連続モニタリングモード: 一定間隔でCLEar → RUN → AUToscale → STOP"""
        def monitoring_thread():
            while self.monitoring:
                self.clear()
                time.sleep(0.5)
                self.run()
                time.sleep(0.5)
                self.autoscale()
                time.sleep(1)
                self.stop()
                if callback:
                    callback(f"波形キャプチャ: {time.strftime('%H:%M:%S')}")
                # 次のサイクルまで待機
                for _ in range(interval):
                    if not self.monitoring:
                        break
                    time.sleep(1)
        
        self.monitoring = True
        thread = threading.Thread(target=monitoring_thread)
        thread.daemon = True
        thread.start()
        return True, f"連続モニタリング開始: {interval}秒間隔"
    
    def stop_monitoring(self):
        """連続モニタリングを停止"""
        self.monitoring = False
        return True, "連続モニタリングを停止しました"


class OscilloscopeGUI:
    def __init__(self, root):
        self.root = root
        self.root.title("DS-1000Z オシロスコープ コントローラー")
        self.root.geometry("600x400")
        self.root.resizable(True, True)
        
        self.controller = OscilloscopeController()
        self.monitoring_active = False
        
        self.create_widgets()
    
    def create_widgets(self):
        # メインフレーム
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)
        
        # 接続フレーム
        conn_frame = ttk.LabelFrame(main_frame, text="接続設定", padding="10")
        conn_frame.pack(fill=tk.X, padx=5, pady=5)
        
        ttk.Label(conn_frame, text="リソース名:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        self.resource_var = tk.StringVar()
        self.resource_entry = ttk.Entry(conn_frame, textvariable=self.resource_var, width=40)
        self.resource_entry.grid(row=0, column=1, sticky=tk.W+tk.E, padx=5, pady=5)
        
        self.connect_btn = ttk.Button(conn_frame, text="接続", command=self.connect)
        self.connect_btn.grid(row=0, column=2, padx=5, pady=5)
        
        self.disconnect_btn = ttk.Button(conn_frame, text="切断", command=self.disconnect, state=tk.DISABLED)
        self.disconnect_btn.grid(row=0, column=3, padx=5, pady=5)
        
        # リソース更新ボタン
        self.refresh_btn = ttk.Button(conn_frame, text="更新", command=self.refresh_resources)
        self.refresh_btn.grid(row=1, column=0, padx=5, pady=5)
        
        # リソースリスト
        self.resource_list = ttk.Combobox(conn_frame, width=40)
        self.resource_list.grid(row=1, column=1, columnspan=2, sticky=tk.W+tk.E, padx=5, pady=5)
        self.resource_list.bind("<<ComboboxSelected>>", self.on_resource_selected)
        
        # モードフレーム
        mode_frame = ttk.LabelFrame(main_frame, text="操作モード", padding="10")
        mode_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        
        # モードボタン
        self.quick_check_btn = ttk.Button(mode_frame, text="クイック波形チェック\n(RUN → AUToscale)", 
                                       command=self.quick_check, state=tk.DISABLED)
        self.quick_check_btn.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W+tk.E)
        
        self.freeze_clear_btn = ttk.Button(mode_frame, text="フリーズ&クリア\n(STOP → CLEar)", 
                                        command=self.freeze_and_clear, state=tk.DISABLED)
        self.freeze_clear_btn.grid(row=0, column=1, padx=10, pady=10, sticky=tk.W+tk.E)
        
        self.comparison_btn = ttk.Button(mode_frame, text="比較モード\n(STOP → 待機)", 
                                      command=self.comparison_mode, state=tk.DISABLED)
        self.comparison_btn.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W+tk.E)
        
        self.monitoring_btn = ttk.Button(mode_frame, text="連続モニタリング開始\n(CLEar → RUN → AUToscale → STOP)", 
                                      command=self.toggle_monitoring, state=tk.DISABLED)
        self.monitoring_btn.grid(row=1, column=1, padx=10, pady=10, sticky=tk.W+tk.E)
        
        # 個別コマンドフレーム
        cmd_frame = ttk.LabelFrame(main_frame, text="個別コマンド", padding="10")
        cmd_frame.pack(fill=tk.X, padx=5, pady=5)
        
        self.run_btn = ttk.Button(cmd_frame, text="RUN", command=self.run_cmd, state=tk.DISABLED)
        self.run_btn.grid(row=0, column=0, padx=10, pady=5, sticky=tk.W+tk.E)
        
        self.stop_btn = ttk.Button(cmd_frame, text="STOP", command=self.stop_cmd, state=tk.DISABLED)
        self.stop_btn.grid(row=0, column=1, padx=10, pady=5, sticky=tk.W+tk.E)
        
        self.autoscale_btn = ttk.Button(cmd_frame, text="AUToscale", command=self.autoscale_cmd, state=tk.DISABLED)
        self.autoscale_btn.grid(row=0, column=2, padx=10, pady=5, sticky=tk.W+tk.E)
        
        self.clear_btn = ttk.Button(cmd_frame, text="CLEar", command=self.clear_cmd, state=tk.DISABLED)
        self.clear_btn.grid(row=0, column=3, padx=10, pady=5, sticky=tk.W+tk.E)
        
        # ステータスフレーム
        status_frame = ttk.Frame(main_frame)
        status_frame.pack(fill=tk.X, padx=5, pady=5)
        
        ttk.Label(status_frame, text="ステータス:").pack(side=tk.LEFT, padx=5)
        self.status_var = tk.StringVar(value="未接続")
        ttk.Label(status_frame, textvariable=self.status_var).pack(side=tk.LEFT, padx=5)
        
        # 初期リソース一覧の取得
        self.refresh_resources()
    
    def refresh_resources(self):
        try:
            # 既存のリソースマネージャーがあれば閉じる
            if hasattr(self, 'temp_rm') and self.temp_rm:
                self.temp_rm.close()
                
            self.temp_rm = pyvisa.ResourceManager()
            resources = self.temp_rm.list_resources()
            # ここではcloseしない
            self.resource_list['values'] = resources
            if resources:
                self.resource_list.current(0)
                self.resource_var.set(resources[0])
        except Exception as e:
            messagebox.showerror("エラー", f"リソース一覧の取得に失敗しました: {str(e)}")
    
    def on_resource_selected(self, event):
        selected = self.resource_list.get()
        if selected:
            self.resource_var.set(selected)
    
    def connect(self):
        resource = self.resource_var.get()
        if not resource:
            messagebox.showwarning("警告", "リソース名を入力または選択してください")
            return
        
        # 既存の接続を確実に閉じる
        if self.controller.connected:
            self.controller.disconnect()
        
        # 新しいコントローラーを作成して接続
        self.controller = OscilloscopeController(resource)
        success, message = self.controller.connect()
        
        if success:
            self.status_var.set(message)
            self.connect_btn.config(state=tk.DISABLED)
            self.disconnect_btn.config(state=tk.NORMAL)
            self.enable_buttons()
        else:
            messagebox.showerror("接続エラー", message)
            self.status_var.set("未接続")
    
    def disconnect(self):
        self.controller.disconnect()
        self.status_var.set("切断されました")
        self.connect_btn.config(state=tk.NORMAL)
        self.disconnect_btn.config(state=tk.DISABLED)
        self.disable_buttons()
        
        # モニタリングがアクティブなら停止
        if self.monitoring_active:
            self.controller.stop_monitoring()
            self.monitoring_active = False
            self.monitoring_btn.config(text="連続モニタリング開始\n(CLEar → RUN → AUToscale → STOP)")
    
    def enable_buttons(self):
        for btn in [self.quick_check_btn, self.freeze_clear_btn, self.comparison_btn, 
                   self.monitoring_btn, self.run_btn, self.stop_btn, 
                   self.autoscale_btn, self.clear_btn]:
            btn.config(state=tk.NORMAL)
    
    def disable_buttons(self):
        for btn in [self.quick_check_btn, self.freeze_clear_btn, self.comparison_btn, 
                   self.monitoring_btn, self.run_btn, self.stop_btn, 
                   self.autoscale_btn, self.clear_btn]:
            btn.config(state=tk.DISABLED)
    
    def update_status(self, success, message):
        if success:
            self.status_var.set(message)
        else:
            messagebox.showerror("エラー", message)
            self.status_var.set("エラー発生")
    
    # モードボタンのコマンド
    def quick_check(self):
        success, message = self.controller.quick_check()
        self.update_status(success, message)
    
    def freeze_and_clear(self):
        success, message = self.controller.freeze_and_clear()
        self.update_status(success, message)
    
    def comparison_mode(self):
        success, message = self.controller.comparison_mode()
        self.update_status(success, message)
    
    def toggle_monitoring(self):
        if not self.monitoring_active:
            # モニタリング開始
            def status_callback(message):
                self.root.after(0, lambda: self.status_var.set(message))
            
            success, message = self.controller.continuous_monitoring(interval=5, callback=status_callback)
            if success:
                self.monitoring_active = True
                self.monitoring_btn.config(text="連続モニタリング停止")
                self.update_status(success, message)
        else:
            # モニタリング停止
            success, message = self.controller.stop_monitoring()
            self.monitoring_active = False
            self.monitoring_btn.config(text="連続モニタリング開始\n(CLEar → RUN → AUToscale → STOP)")
            self.update_status(success, message)
    
    # 個別コマンドボタン
    def run_cmd(self):
        success, message = self.controller.run()
        self.update_status(success, message)
    
    def stop_cmd(self):
        success, message = self.controller.stop()
        self.update_status(success, message)
    
    def autoscale_cmd(self):
        success, message = self.controller.autoscale()
        self.update_status(success, message)
    
    def clear_cmd(self):
        success, message = self.controller.clear()
        self.update_status(success, message)


if __name__ == "__main__":
    root = tk.Tk()
    app = OscilloscopeGUI(root)
    root.mainloop()

    
  • デバイスの検知および接続状況確認機能
  • 操作モード4つ
  • 各コマンドのボタン
  • 現在のステータス表示機能

これらを実装しました。

以上おおまかな流れでした。

皆さんも研究室にある色々な測定機器で試してみて下さい。かなり古いものでも意外と動きます。