Page tree

Основные функции

Скрипт для проверки доступности камер и других устройств посредством команды "Ping".

Установка

  • Перейти в автоматизацию, нажать «Загрузить пример» и выбрать «Из файла», указав путь к скрипту.
  • Снять галочку «Включить скрипт».
  • Выполнить настройку скрипта.
  • Нажать "Сохранить".
  • Активировать галочку "Включить скрипт".


Позволяет пропинговать устройства с сервера.

Результат ping сохраняется в файл ping.log в скриншота.


# Результат ping сохраняется в файл ping_log.txt в скриншотах
'''
<parameters>
    <company>DSSL</company>
    <title>ping</title>
    <version>2.3</version>
    <parameter>
        <id>IP</id>
        <name>Введите IP адрес</name>
        <type>string</type>
    </parameter>
    <parameter>
        <id>NUM_PAC</id>
        <name>Введите кол-во пакетов (0 - бесконечно)</name>
        <type>integer</type>
        <value>4</value>
        <min>0</min>
        <max>9999</max>
    </parameter>
    <parameter>
        <id>SIZE_PAC</id>
        <name>Введите величину пакета</name>
        <type>integer</type>
        <value>32</value>
        <min>1</min>
        <max>99999</max>
    </parameter>
    <parameter>
        <id>DELAY</id>
        <name>Чаcтота запроса Ping (сек)</name>
        <type>integer</type>
        <value>2</value>
        <min>1</min>
        <max>99999</max>
    </parameter>
</parameters>
'''


import os
import threading
import subprocess
from functools import wraps
from time import strftime, time, localtime


def _run_as_thread(fn):
    @wraps(fn)
    def run(*args, **kwargs):
        t = threading.Thread(target=fn, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
        return t
    return run


class Ping():
    def __init__(self, count, ip, size, delay):
        self.num = 1
        self.count_ping = count
        self.ip = ip
        self.size = size
        self.delay = delay
        self.path = os.path.join(settings("system_wide_options")["screenshots_folder"], '%s.log'%settings("scripts/%s"%script_guid)["name"])
        self.max_rows = 5000
        if not os.path.exists(settings("system_wide_options")["screenshots_folder"]):
            raise ValueError('Not found folder "screenshot"!')
        if os.name == 'nt':
            self.ping = 'ping -n %s %s -l %s' % (1, self.ip, self.size)
        else:
            self.ping = 'ping -s %s -c %s %s' % (self.size, 1, self.ip)
        self.work()

    @_run_as_thread
    def cmd(self):
        p = subprocess.Popen(self.ping, stdout=subprocess.PIPE, shell=True)
        p.wait()
        self.call = p.stdout.read().decode('cp866').encode('utf -8')
        self.parce_call()
        self.write()

    def write(self):
        message(self.call)
        with open(self.path, 'a') as f:
            f.write('%s: %s\n\n' % (strftime('%d.%m.%Y %H:%M:%S', localtime()), self.call))
        self.check_line()

    def parce_call(self):
        if not self.call:
            self.call = 'Specified node is not available!'
        else:
            s = self.call.replace('\r', '').split('\n')
            self.call = '\n' + '\n'.join(x for x in s if x != '')

    def check_line(self):
        with open(self.path, 'rb') as f:
            text = f.read().split("\n")
        if len(text) > self.max_rows:
            text = text[len(text) - self.max_rows:]
            with open(self.path, 'wb') as f:
                f.write("\n".join(text))

    def work(self):
        self.cmd()
        if self.num == self.count_ping:
            self.call = '\nWas done:\n\tcount: %s\n\tpacket size: %s\n\tip: %s' % (self.count_ping, self.size, self.ip)
            self.write()
            return
        self.num += 1
        timeout(1000 * self.delay, self.work)


if not IP:
    raise ValueError('Not ip for ping')

png = Ping(NUM_PAC, IP, SIZE_PAC, DELAY)



Скачать