Source code for nodeworks.nodewidget

# -*- coding: utf-8 -*-
"""
This file is part of the nodeworks core library

Licence
-------
As a work of the United States Government, this project is in the public domain
within the United States. As such, this code is licensed under
CC0 1.0 Universal public domain.

Please see the LICENSE.md for more information.
"""

from functools import reduce
import atexit
import copy
import inspect
import json
import os
import re
import time
import warnings
from typing import Any, List, Mapping, Optional

from qtpy import QtWidgets, QtCore, QtGui
from PyQt5.QtCore import QEvent
from PyQt5.QtGui import QFocusEvent, QMouseEvent, QResizeEvent
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent

from nodeworks.node import Node, NodeGraphic, Connection, ControlPoint
from nodeworks.nodelibrary import NodeLibrary
from nodeworks.tools.general import (
    color_func, TABLEAU20, get_icon, isiterable, CustomEncoder, CustomDecoder,
    get_unique_name)
from nodeworks.tools import parworker, WorkerThread
from nodeworks.widgets.basewidgets import SearchWidget

# Zoom factor
ZOOM_IN_FACTOR = 1.25
ZOOM_OUT_FACTOR = 1 / ZOOM_IN_FACTOR


def is_node_class(cls):
    try:
        if not issubclass(cls, Node):
            return False
    except:
        return False
    return hasattr(cls, 'name')


[docs]class GraphicsScene(QtWidgets.QGraphicsScene): ''' Custom QGraphicsScene. ''' def __init__(self, parent=None): QtWidgets.QGraphicsScene.__init__(self, parent) self.parent = parent self.topLayer = 0 self.maintainSelection = False self.changed.connect(self.sceneChanged) self.dragging = False # def updateSceneBounds(self): # ''' # Update the scene bounds to fit all the Nodes. # ''' # rect = self.itemsBoundingRect() # rect.adjust( -1000, -1000, 1000, 1000) # self.setSceneRect(rect) # def addItem(self, item): # QtWidgets.QGraphicsScene.addItem(self, item) # #self.updateSceneBounds()
[docs] def focusOutEvent(self, event: QFocusEvent): if event.reason() != QtCore.Qt.PopupFocusReason: if not self.maintainSelection: self.clearSelection() self.parent.view.clearFocus() QtWidgets.QGraphicsScene.focusOutEvent(self, event)
[docs] def focusInEvent(self, event: QFocusEvent): if self.parent.parentNode: self.parent.parentNode.parent.deselectAll() QtWidgets.QGraphicsScene.focusInEvent(self, event)
[docs] def mousePressEvent(self, event: QGraphicsSceneMouseEvent): self.dragging = True QtWidgets.QGraphicsScene.mousePressEvent(self, event)
[docs] def mouseReleaseEvent(self, event: QGraphicsSceneMouseEvent): if not self.dragging: if self.selectedItems(): if event.modifiers() != QtCore.Qt.ControlModifier: self.clearSelection() self.dragging = False QtWidgets.QGraphicsScene.mouseReleaseEvent(self, event)
def sceneChanged(self) -> None: if self.parent.parentNode: self.parent.parentNode.nodeGraphic.update() def __getstate__(self): pass def __setstate__(self, state): pass
[docs]class StructureScene(GraphicsScene): ''' Custom QGraphicsScene for structures. ''' def __init__(self, parent=None): GraphicsScene.__init__(self, parent)
# def updateSceneBounds(self): # pass
[docs]class GraphicsView(QtWidgets.QGraphicsView): ''' Custom QGraphicsScene. ''' rightClickEvent = QtCore.Signal(object, object) keyboardPressEvent = QtCore.Signal(object, object) keyboardSearchEvent = QtCore.Signal(object, object) mouseDragEvent = QtCore.Signal() def __init__(self, scene: GraphicsScene, parent: None = None) -> None: QtWidgets.QGraphicsView.__init__(self, parent) self.setScene(scene) self.setRenderHint(QtGui.QPainter.Antialiasing) self.setViewportUpdateMode(QtWidgets.QGraphicsView.FullViewportUpdate) self.setDragMode(QtWidgets.QGraphicsView.RubberBandDrag) self.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff) self.setVerticalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff) self.setUpdatesEnabled(True) self.setMouseTracking(True) self.setCacheMode(QtWidgets.QGraphicsView.CacheBackground) self.setTransformationAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) self._isPanning = False self._mousePressed = False self._mousePressedPos = None self._screenMousePos = None self._viewMousePos = None self.conn: Optional[Any] = None self.currentScale = 1 self.scale(self.currentScale, self.currentScale) def __getstate__(self): pass def __setstate__(self, state): pass def showAll(self): self.fitInView( self.scene().itemsBoundingRect(), QtCore.Qt.KeepAspectRatio) def centerOnItem(self, item): self.centerOn(item.rect().center())
[docs] def mousePressEvent(self, event: QMouseEvent): ''' Catch mouse events. ''' self._mousePressed = True self._mousePressedPos = event.pos() self._screenMousePos = QtGui.QCursor.pos() if event.button() == QtCore.Qt.MiddleButton: self._isPanning = True self._dragPos = event.pos() return elif event.button() == QtCore.Qt.RightButton: if self.items(event.pos()): QtWidgets.QGraphicsView.mousePressEvent(self, event) else: self.rightClickEvent.emit( event.globalPos(), self.mapToScene(event.pos())) elif event.button() == QtCore.Qt.LeftButton: if self.itemAt(event.pos()): self.setDragMode(QtWidgets.QGraphicsView.NoDrag) else: self.setDragMode(QtWidgets.QGraphicsView.RubberBandDrag) QtWidgets.QGraphicsView.mousePressEvent(self, event) else: QtWidgets.QGraphicsView.mousePressEvent(self, event)
[docs] def wheelEvent(self, event): ''' Handle zooming with the mouse wheel. ''' self._mousePressed = False if not self._isPanning and event.modifiers() == QtCore.Qt.ControlModifier: # Set Anchors self.setTransformationAnchor(QtWidgets.QGraphicsView.NoAnchor) self.setResizeAnchor(QtWidgets.QGraphicsView.NoAnchor) # Save the scene pos oldPos = self.mapToScene(event.pos()) # Zoom if event.angleDelta().y() > 0: zoomFactor = ZOOM_IN_FACTOR else: zoomFactor = ZOOM_OUT_FACTOR self.scale(zoomFactor, zoomFactor) self.currentScale *= zoomFactor # Get the new position newPos = self.mapToScene(event.pos()) # Move scene to old position delta = newPos - oldPos self.translate(delta.x(), delta.y()) else: QtWidgets.QGraphicsView.wheelEvent(self, event)
def set_zoom(self, zoom=None): # reset zoom if zoom is None: self.resetTransform() return if zoom > 0: zoomFactor = ZOOM_IN_FACTOR else: zoomFactor = ZOOM_OUT_FACTOR self.scale(zoomFactor, zoomFactor) self.currentScale *= zoomFactor
[docs] def mouseReleaseEvent(self, event: QMouseEvent): self._mousePressed = False if self._isPanning: self._isPanning = False if self.conn is not None: for item in self.items(event.pos()): if hasattr(item, 'dropConnectionEvent'): item.dropConnectionEvent(self.mapToScene(event.pos())) if self.conn is not None: self.conn.removeConnections() self.parent().newConnect = None self.conn = None QtWidgets.QGraphicsView.mouseReleaseEvent(self, event)
[docs] def mouseMoveEvent(self, event: QMouseEvent): self._viewMousePos = event.pos() if self._isPanning: newPos = event.pos() diff = newPos - self._dragPos self._dragPos = newPos self.horizontalScrollBar().setValue(self.horizontalScrollBar().value() - diff.x()) self.verticalScrollBar().setValue(self.verticalScrollBar().value() - diff.y()) event.accept() elif self._mousePressed and self.conn is not None: self.conn.updateLine(mousePos=self.mapToScene(event.pos())) QtWidgets.QGraphicsView.mouseMoveEvent(self, event) else: QtWidgets.QGraphicsView.mouseMoveEvent(self, event)
[docs] def keyPressEvent(self, event): ''' Handle key press events ''' key = event.key() # delete key if (key == QtCore.Qt.Key_Delete) and self.scene().selectedItems(): self.keyboardPressEvent.emit(key, None) # Modifier elif event.modifiers() == QtCore.Qt.ControlModifier: self.keyboardPressEvent.emit(key, None) elif (self._mousePressedPos is not None and not self.items(self._mousePressedPos) and re.findall('[a-z]', event.text(), re.IGNORECASE)): self.keyboardSearchEvent.emit(self._mousePressedPos, event.text()) QtWidgets.QGraphicsView.keyPressEvent(self, event)
[docs] def leaveEvent(self, event: QEvent): self._viewMousePos = None QtWidgets.QGraphicsView.leaveEvent(self, event)
[docs] def save_to_image(self, fname, scale=1): """Save the image to a file. """ scene = self.scene() rect = scene.itemsBoundingRect() + QtCore.QMarginsF(10, 10, 10, 10) target_rect = QtCore.QRect(0, 0, rect.width()*scale, rect.height()*scale) image = QtGui.QImage(target_rect.size(), QtGui.QImage.Format_ARGB32) image.fill(QtCore.Qt.transparent) painter = QtGui.QPainter(image) painter.setRenderHint(QtGui.QPainter.Antialiasing) scene.render(painter, QtCore.QRectF(target_rect), # target QtCore.QRectF(rect), # source QtCore.Qt.KeepAspectRatio) painter.end() image.save(fname)
[docs]class PreviewView(QtWidgets.QGraphicsView): ''' Custom QGraphicsScene. ''' sizeChangeEvent = QtCore.Signal() def __init__(self, scene: GraphicsScene, parent: Optional['NodeWidget'] = None, view: Optional[GraphicsView] = None): QtWidgets.QGraphicsView.__init__(self, parent) self.setScene(scene) self.view = view self.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff) self.setVerticalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff)
[docs] def mouseDoubleClickEvent(self, event): event.ignore()
[docs] def mouseMoveEvent(self, event): event.ignore()
[docs] def mousePressEvent(self, event): event.ignore()
[docs] def mouseReleaseEvent(self, event): if self.view is not None: self.view.centerOn(self.mapToScene(event.pos()))
[docs] def resizeEvent(self, event): self.sizeChangeEvent.emit()
[docs]class SearchNodeWidget(SearchWidget): ''' Provide a search interface for the nodes ''' newNodeEvent = QtCore.Signal(object, object) def __init__(self, parent: Optional['NodeWidget'] = None, nodelist: List[Any] = []) -> None: SearchWidget.__init__(self, parent=parent, searchList=nodelist) self.additionalItemListOffset = QtCore.QPoint(-1, 5) self.searchBar.setMinimumWidth(300) self.itemListWidget.setTextElideMode(QtCore.Qt.ElideLeft) self.itemListWidget.horizontalScrollBar().setEnabled(False) self.itemListWidget.setUniformItemSizes(True) fm = self.itemListWidget.fontMetrics() self.itemListWidget.setGridSize(QtCore.QSize(100, fm.height())) def setNodeList(self, nodelist): self.setSearchList(nodelist) def newValue(self): item = self.itemListWidget.currentItem() if item or self.searchBar.text() in self.searchList: if item: self.searchBar.setText(item.text()) pos = self.parent().view.mapToScene(copy.deepcopy(self.pos())) value = str(self.searchBar.text()) if value in self.searchList: self.newNodeEvent.emit(value.split('.'), pos) self.hide() def onEscape(self): self.hide()
[docs] def hide(self): self.searchBar.clear() self.searchBar.hide() self.itemListWidget.hide() SearchWidget.hide(self) self.positionSet = False
[docs]class NodeWidget(QtWidgets.QWidget): ''' Main widget for the displaying and editing a Node Chart. ''' networkChangeEvent = QtCore.Signal() sizeChangeEvent = QtCore.Signal() needsSavedEvent = QtCore.Signal() def __init__(self, parent: None = None, parentNode: None = None, showtoolbar: bool = True, onlypositivecoords: bool = False, threaded: bool = False, prompt: bool = True) -> None: QtWidgets.QWidget.__init__(self, parent) # variables self.nodeDict: Mapping[str, Any] = {} self.mousePos = None self.newConnect = None self.autorun = False self.stop = False self.onlypositivecoords = onlypositivecoords self.name = None self.parentNode = parentNode self.threaded = threaded self.runThread = None self.running = False self.pause = True self.localClipboard = None self.runningLabel = QtWidgets.QLabel("running") self.hasForceRunNode = False self.dependencyDict: Mapping[str, Any] = {} self.prompt = prompt self.unsaved_flag = False self.save_path = './' self.file_path = './' # the key to the dictionary should match the type return from # the first value is a color from the palette defined in tools.y # the second value is the pen style - please restrict the style to # SolidLine, DashLine, DottedLine self.typeColorDict = { 'int': [color_func(TABLEAU20[0]), QtCore.Qt.SolidLine], 'float': [color_func(TABLEAU20[2]), QtCore.Qt.SolidLine], 'bool': [color_func(TABLEAU20[4]), QtCore.Qt.SolidLine], 'str': [color_func(TABLEAU20[6]), QtCore.Qt.SolidLine], 'list': [color_func(TABLEAU20[8]), QtCore.Qt.SolidLine], 'dict': [color_func(TABLEAU20[10]), QtCore.Qt.SolidLine], 'set': [color_func(TABLEAU20[12]), QtCore.Qt.SolidLine], 'numpy.ndarray': [color_func(TABLEAU20[14]), QtCore.Qt.SolidLine], 'pandas.core.frame.DataFrame': [color_func(TABLEAU20[16]), QtCore.Qt.SolidLine], 'default': [color_func(TABLEAU20[18]), QtCore.Qt.SolidLine], } self.style = { 'connection': { 'line': 'cubic', # either 'cubic', 'line' 'line-offset': 50, # any integer 'line-color': 'type', #Qt color or 'type' 'line-selected': QtCore.Qt.darkGray, 'line-width': 2, # Thickness of the line 'line-selected-width': 5, # Thickness of the selection 'endpoints-color': 'type', #Qt color or 'type' 'endpoints-shape': 'square', # either 'square', 'circle' 'endpoints-size': 10, # size of the end point }, 'terminal': { 'endpoints-size': 10, # size of the end point }, 'node': { 'color': 'grey', 'title-color': 'black', 'title-background': '#DCDCDC', 'title': True, 'shadow': True, } } # Add type colors to style for key in self.style: self.style[key].update(self.typeColorDict) # Create main objects self.toolbar = QtWidgets.QWidget(self) self.scene = GraphicsScene(self) self.view = GraphicsView(self.scene) self.view.rightClickEvent.connect(self.showRightClickMenu) self.view.keyboardPressEvent.connect(self.keyboardPress) self.view.keyboardSearchEvent.connect(self.keyboardSearch) # if you set the scene rect it won't resize dynamically # however if you add a rect and clear it will initialize to 10k # and resize dynamically # self.scene.setSceneRect(QtCore.QRectF(0, 0, 10000, 10000)) self.scene.addRect(0, 0, 10000, 10000) self.view.centerOn(self.scene.width()/2, self.scene.height()/2) self.scene.clear() self.mainLayout = QtWidgets.QGridLayout(self) self.mainLayout.setContentsMargins(0, 0, 0, 0) self.mainLayout.setSpacing(0) # grid self.snapToGrid = False self.gridSize = 10 self.debugMode = False self.step = 2 self.preview: Optional[Any] = None self.cut = False if showtoolbar: self.mainLayout.addWidget(self.toolbar, 1, 1, 1, 1) self.mainLayout.addWidget(self.view, 2, 1, 1, 1) self.mainLayout.addWidget(self.runningLabel, 3, 1, 1, 1) self.runningLabel.hide() # Node Library self.nodeLibrary = NodeLibrary() #self.exeCount = 0 self.firstRun = True # Build other items self.__initToolbar__() self.__initRightClickMenu__() self.__initSearchWidget__() self.parworker = None def launchQueue(self): # launch the queue - only on the top level nodewidget if self.parentNode is None: self.parworker = parworker.workerContainer() self.parworker._startWorkers() atexit.register(self.stopQueue) def stopQueue(self): if self.parworker: self.parworker._stopWorkers() self.parworker = None def __getstate__(self): return (self.nodeDict, self.threaded, self.firstRun, self.dependencyDict, self.debugMode, self.stop, self.pause, self.autorun, self.hasForceRunNode, self.parentNode) def __setstate__(self, state): (self.nodeDict, self.threaded, self.firstRun, self.dependencyDict, self.debugMode, self.stop, self.pause, self.autorun, self.hasForceRunNode, self.parentNode) = state def __initToolbar__(self) -> None: self.toolbarLayout = QtWidgets.QHBoxLayout(self.toolbar) self.toolbarLayout.setContentsMargins(0, 0, 0, 0) self.toolbarLayout.setSpacing(0) rb = self.runToolButton = QtWidgets.QToolButton() rb.setIcon(get_icon('play')) rb.setToolTip('run/pause') rb.clicked.connect(self.runPause) ar = self.autorunToolButton = QtWidgets.QToolButton() ar.setIcon(get_icon('repeat')) ar.setToolTip('autorun') ar.released.connect(self.autorunToggle) ar.setCheckable(True) st = self.stopToolButton = QtWidgets.QToolButton() st.setIcon(get_icon('stop')) st.setToolTip('stop') st.clicked.connect(self.stopRun) sp = self.stepToolButton = QtWidgets.QToolButton() sp.setIcon(get_icon('step')) sp.setToolTip('step (requires debug mode)') sp.clicked.connect(self.stepThrough) sp.setEnabled(False) db = self.debugToolButton = QtWidgets.QToolButton() db.setIcon(get_icon('debug')) db.setToolTip('enable/disable debug mode') db.clicked.connect(lambda: self.setDebugMode(db.isChecked())) db.setCheckable(True) db.setEnabled(True) for btn in [rb, sp, sp, ar, db]: btn.setAutoRaise(True) self.toolbarLayout.addWidget(btn) self.toolbarLayout.addStretch() def __initRightClickMenu__(self) -> None: ''' Create base actions. ''' self.rightClickMenu = QtWidgets.QMenu() self.showAllAction = QtWidgets.QAction('Show All', self) self.showAllAction.triggered.connect(self.showAllNodes) self.copyAction = QtWidgets.QAction('Copy', self) self.copyAction.setShortcut( QtGui.QKeySequence(QtCore.Qt.CTRL+QtCore.Qt.Key_C)) self.copyAction.triggered.connect(self.copyNodes) self.cutAction = QtWidgets.QAction('Cut', self) self.cutAction.setShortcut( QtGui.QKeySequence(QtCore.Qt.CTRL+QtCore.Qt.Key_X)) self.cutAction.triggered.connect(self.cutNodes) self.pasteAction = QtWidgets.QAction('Paste', self) self.pasteAction.setShortcut( QtGui.QKeySequence(QtCore.Qt.CTRL+QtCore.Qt.Key_V)) self.pasteAction.triggered.connect(self.pasteNodes) self.selectAllAction = QtWidgets.QAction('Select All', self) self.selectAllAction.setShortcut( QtGui.QKeySequence(QtCore.Qt.CTRL+QtCore.Qt.Key_A)) self.selectAllAction.triggered.connect(self.selectAll) self.deleteAction = QtWidgets.QAction('Delete', self) self.deleteAction.setShortcut( QtGui.QKeySequence(QtCore.Qt.Key_Delete)) self.deleteAction.triggered.connect(self.deleteNodes) self.groupAction = QtWidgets.QAction('Group', self) self.groupAction.setShortcut( QtGui.QKeySequence(QtCore.Qt.CTRL + QtCore.Qt.Key_G)) self.groupAction.triggered.connect(self.group) self.refreshAction = QtWidgets.QAction('Refresh Nodes', self) self.refreshAction.setShortcut( QtGui.QKeySequence(QtCore.Qt.CTRL + QtCore.Qt.Key_R)) self.refreshAction.triggered.connect(self.reloadLibrary) self.rightClickBaseList = [ self.showAllAction, self.copyAction, self.cutAction, self.pasteAction, self.selectAllAction, self.deleteAction, self.groupAction, self.refreshAction] def __initSearchWidget__(self) -> None: ''' Init search widget. ''' self.searchWidget = SearchNodeWidget(self) self.searchWidget.hide() self.searchWidget.newNodeEvent.connect(self.addNode) @staticmethod def __buildLibraryMenu(callback, tree, pathlist=[], parent=None, disabled=[]): menu = QtWidgets.QMenu(parent) for k in tree.keys(): templist = pathlist if is_node_class(tree[k]): action = QtWidgets.QAction(parent) action.setText(tree[k].name) path = templist+[tree[k].name] action.triggered.connect(lambda ignore, p=path: callback(path=p)) menu.addAction(action) if k in disabled: action.setEnabled(False) else: temp = NodeWidget.__buildLibraryMenu(callback, tree[k], templist+[k], menu, disabled) temp.setTitle(k) menu.addMenu(temp) return menu @staticmethod def __buildLibraryList(tree, pathlist=[]): nodelist = [] for k in tree.keys(): templist = pathlist if is_node_class(tree[k]): nodelist.append('.'.join(templist+[tree[k].name])) else: temp = NodeWidget.__buildLibraryList(tree[k], templist+[k]) nodelist.extend(temp) return nodelist
[docs] def keyboardSearch(self, pos, text=None): ''' Display a Node search tool when typing. ''' if not self.searchWidget.searchList: # Node List nodelist = self.__buildLibraryList(self.nodeLibrary.nodetree) # remove parallel loop from all except top level if self.parentNode is not None and 'structure.Parallel Loop' in nodelist: nodelist.remove('structure.Parallel Loop') self.searchWidget.setNodeList(nodelist) if not self.searchWidget.isVisible(): self.searchWidget.move( self.searchWidget.parent().view.mapTo( self.searchWidget.parent(), pos)) self.searchWidget.show() self.searchWidget.setFocus(text=text)
[docs] def keyboardPress(self, key, data): ''' Handle keyboard press events from the view. Only lets control modified keys through, except for delete. ''' focusItem = self.scene.focusItem() if key == QtCore.Qt.Key_Delete or key == QtCore.Qt.Key_Backspace: self.deleteNodes() elif key == QtCore.Qt.Key_U: self.run() elif key == QtCore.Qt.Key_C: self.copyNodes() elif key == QtCore.Qt.Key_X: self.cutNodes() elif key == QtCore.Qt.Key_V: # this patch only pastes to scenes where no items have focus # this enables pasting into structures without pasting into all # nodewidgets if focusItem is None: self.pasteNodes() # if you have selected or clicked in a structure node elif self.scene.selectedItems(): self.pasteNodes() elif key == QtCore.Qt.Key_G: self.group() elif key == QtCore.Qt.Key_A: self.selectAll()
def autorunToggle(self): self.autorun = self.autorunToolButton.isChecked() self.runToolButton.setEnabled(not self.autorun) self.stopToolButton.setEnabled(not self.autorun) self.stepToolButton.setEnabled(not self.autorun) self.networkChanged() def updateGraphics(self): for node in self.nodeDict.values(): if isinstance(node, Node): node.nodeGraphic.updateTerminalValues() if hasattr(node, 'nodeWidget'): node.nodeWidget.updateGraphics() elif hasattr(node, 'nodeWidgets'): for wid in node.nodeWidgets: wid.updateGraphics() node.nodeGraphic.update(node.nodeGraphic.rect()) def showRightClickMenu(self, globalpos, scenepos, otheractions=None): self.mousePos = scenepos self.rightClickMenu.clear() # Base menu Actions self.rightClickMenu.addActions(self.rightClickBaseList) if self.scene.selectedItems(): self.copyAction.setEnabled(True) self.cutAction.setEnabled(True) self.deleteAction.setEnabled(True) else: self.copyAction.setEnabled(False) self.cutAction.setEnabled(False) self.deleteAction.setEnabled(False) if hasattr(QtWidgets.QApplication.instance(), 'globalClipboard'): self.pasteAction.setEnabled(True) else: self.pasteAction.setEnabled(False) # Other Actions if otheractions: self.rightClickMenu.addActions(otheractions) self.rightClickMenu.addSeparator() # Add node library # remove parallel loop is not top level nodeWidget disabled: List[Any] = [] if self.parentNode is not None: disabled = ['Parallel Loop'] self.nodeMenu = self.__buildLibraryMenu(self.addNode, self.nodeLibrary.nodetree, parent=self.rightClickMenu, disabled=disabled) self.nodeMenu.setTitle('Add Node') self.rightClickMenu.addMenu(self.nodeMenu) self.rightClickMenu.popup(QtGui.QCursor.pos())
[docs] def copyNodes(self, recursive=False): ''' Copy the currently selected Nodes ''' # this is not a cut self.cut = False # if its the top level nodewidget, put it on the global clipboard if not recursive: QtWidgets.QApplication.instance().globalClipboard = self.scene.selectedItems() # otherwise put it on the local clipboard else: self.localClipboard = self.scene.selectedItems() # check for and copy structure nodes for item in self.scene.selectedItems(): if isinstance(item, NodeGraphic): if hasattr(item.node, 'nodeWidget'): item.node.nodeWidget.selectAll() item.node.nodeWidget.copyNodes(recursive=True) item.node.nodeWidget.deselectAll() elif hasattr(item.node, 'nodeWidgets'): for i in range(0, item.node.switch.count()): item.node.switch.widget(i).selectAll() item.node.switch.widget(i).copyNodes(recursive=True) item.node.switch.widget(i).deselectAll()
[docs] def cutNodes(self): ''' Copies and designates the currently selected Nodes for deletion ''' self.copyNodes() # this is a cut self.cut = True
[docs] def pasteNodes(self, recursive=False): ''' Paste the currently copied Nodes. ''' if not hasattr(QtWidgets.QApplication.instance(), 'globalClipboard')\ or not QtWidgets.QApplication.instance().globalClipboard: return # clear current selection self.scene.clearSelection() # terminal map nodeTermMap = {} # get all the copied items items: List[Any] = [] # if its a structure if recursive: items = self.localClipboard # if its a recursive copy add everything to the nodetermmap for key in self.nodeDict: nodeTermMap[key] = key # if its the top level else: items = QtWidgets.QApplication.instance().globalClipboard # sort into nodes and connections nodeList = [] connectList = [] for item in items: if isinstance(item, NodeGraphic): nodeList.append(item) elif isinstance(item, Connection): connectList.append(item) # for top level nodewidget offset the paste by 30 if recursive: offset = QtCore.QPointF(0.0, 0.0) else: minX = nodeList[0].pos().x() minY = nodeList[0].pos().y() for node in nodeList: if node.pos().x() < minX: minX = node.pos().x() if node.pos().y() < minY: minY = node.pos().y() if isinstance(self.scene, StructureScene) or self.view._viewMousePos is None: pos = self.view.mapToScene(QtCore.QPoint(30.0, 30.0)) offset = QtCore.QPoint(pos.x()-minX, pos.y()-minY) else: pos = self.view.mapToScene(self.view._viewMousePos) offset = QtCore.QPoint(pos.x()-minX, pos.y()-minY) # keep a list of the new nodes to hide their terminals as needed newNodeList = [] # create the new copies of the nodes for node in nodeList: newNode = self.addNode(node.node.path, node.pos()+offset) newNodeList.append(newNode) nodeTermMap[node.node.uniqueName] = newNode.uniqueName # if its a node is a structure node if hasattr(newNode, 'nodeWidget'): if node.node.nodeWidget.localClipboard: newNode.nodeWidget.localClipboard = node.node.nodeWidget.localClipboard newNode.nodeWidget.pasteNodes(recursive=True) if hasattr(newNode, 'nodeWidgets'): newNode.removeTab() for i in range(0, node.node.switch.count()-1): newNode.addTab() newNode.setState(copy.deepcopy(node.node.state())) if hasattr(newNode, 'nodeWidgets'): for i in range(0, newNode.switch.count()): if node.node.switch.widget(i).localClipboard: newNode.switch.widget(i).localClipboard = node.node.switch.widget(i).localClipboard newNode.switch.widget(i).pasteNodes(recursive=True) # only highlight nodes on the top level if isinstance(self.parent(), QtWidgets.QMainWindow): newNode.nodeGraphic.setSelected(True) # add connections for connect in connectList: if connect.startTerm is None or connect.stopTerm is None: continue startterm = connect.startTerm.name startnode = connect.startTerm.node.uniqueName stopterm = connect.stopTerm.name stopnode = connect.stopTerm.node.uniqueName # get the control points controlPoints = [] for controlPoint in connect.pathPoints: controlPoints.append([controlPoint.pos().x(), controlPoint.pos().y(), controlPoint.flipped]) if startnode in nodeTermMap and stopnode in nodeTermMap: conn = self.addConnection( self.nodeDict[nodeTermMap[startnode]][startterm], self.nodeDict[nodeTermMap[stopnode]][stopterm], controlPoints ) conn.setSelected(True) # select all pasted nodes and hide terminals of new nodes as needed for node in newNodeList: node.nodeGraphic.setSelected(True) if node.hidden: node.collapse(node.hidden) if self.cut: self.deleteCopyNodes()
[docs] def selectAll(self): ''' Select Everything ''' for item in self.scene.items(): item.setSelected(True)
[docs] def deselectAll(self): ''' Deselect Everything ''' for item in self.scene.items(): item.setSelected(False)
[docs] def deleteNodes(self): ''' Delete the currently selected Nodes. ''' if self.prompt: self.scene.maintainSelection = True items = self.scene.selectedItems() nodes = [item.name for item in items if not isinstance(item, ControlPoint)] msg = QtWidgets.QMessageBox() msg.setWindowTitle('Confirm') msg.setText('Delete the following?\n'+', '.join(nodes)) msg.setStandardButtons(QtWidgets.QMessageBox.Ok | QtWidgets.QMessageBox.Cancel) if msg.exec_() == QtWidgets.QMessageBox.Ok: self.remove(items) self.networkChanged() self.scene.maintainSelection = False else: self.remove(self.scene.selectedItems()) self.networkChanged()
[docs] def deleteCopyNodes(self): ''' Delete the cut Nodes. ''' self.remove(self.localClipboard) self.remove(QtWidgets.QApplication.instance().globalClipboard) self.networkChanged()
[docs] def deleteAllNodes(self, confirm=True): ''' Delete all Nodes. ''' if confirm: msg = QtWidgets.QMessageBox() msg.setText("Are you sure you want to clear the canvas?") msg.setWindowTitle("Confirm") msg.addButton(QtWidgets.QMessageBox.Yes) msg.addButton(QtWidgets.QMessageBox.No) if msg.exec_() == QtWidgets.QMessageBox.Yes: self.remove(self.scene.items()) self.networkChanged() else: self.remove(self.scene.items()) self.networkChanged()
[docs] def addNode(self, path=[], pos=None, geometry=None, uniquename=None, node=None): ''' Add a node to the scene. ''' if path: # Get the Node from the library node = self.nodeLibrary.getNode(path) if node is None: raise ValueError('No node provided') # inspect the arguments to see if parent is accepted call = inspect.signature(node.__init__).parameters # initialize Node if 'parent' in call: node = node(self) else: node = node() # Check if name in dict, coming from load event if uniquename is not None: name = get_unique_name(uniquename, self.nodeDict) # come from right-click or search else: self.set_unsaved_flag() name = get_unique_name(node.name, self.nodeDict) node.setUniqueName(name) node.setPath(path) node.setStyle(self.style) node.valueChanged.connect(self.networkChanged) # Check to see if the node has a stopEvent #if hasattr(node, 'stopEvent'): # node.stopEvent.connect(self.stopRun) # Add to node scene dict self.nodeDict[name] = node # node.nodeGraphic.onlypositivecoords = self.onlypositivecoords node.parent = self # Add the nodeGraphic to the scene self.scene.addItem(node.nodeGraphic) # Put the new node on top self.scene.topLayer += 1 node.setLayer(self.scene.topLayer) if pos is not None: if not isinstance(pos, QtCore.QPointF): if isinstance(pos, QtCore.QPoint): pos = QtCore.QPointF(pos.x(), pos.y()) elif isinstance(pos, (list, tuple)): pos = QtCore.QPointF(pos[0], pos[1]) else: raise ValueError("{} Not a valid position, must be QPointF, QPoint, [], or ()".format(pos)) node.nodeGraphic.moveTo(pos) elif self.mousePos is not None: node.nodeGraphic.moveTo(self.mousePos) if geometry is not None: if not isinstance(geometry, QtCore.QSizeF): if isinstance(geometry, QtCore.QSize): geometry = QtCore.QSizeF(geometry.x(), geometry.y()) elif isinstance(geometry, (list, tuple)): geometry = QtCore.QSizeF(geometry[0], geometry[1]) else: raise ValueError("{} Not a valid geometry, must be QPointF, QPoint, [], or ()".format(pos)) node.nodeGraphic.resize(geometry) node.newConnectionEvent.connect(self._newConnection) return node
def _newConnection(self, term, pos, inout): ''' Create a new connection from scene events. ''' if self.newConnect is None: self.newConnect = Connection(self, style=self.style['connection']) self.newConnect.name = get_unique_name('Connection', self.nodeDict) self.newConnect.uniqueName = get_unique_name('Connection', self.nodeDict) self.scene.addItem(self.newConnect) self.nodeDict[self.newConnect.uniqueName] = self.newConnect if inout == 'in': self.newConnect.setStopTerminal(term) else: self.newConnect.setStartTerminal(term) self.newConnect.updateLine(mousePos=pos) if self.newConnect.isConnected(): self.networkChanged() self.newConnect = None self.view.conn = self.newConnect
[docs] def addConnection(self, from_, to, controlpoints=None): ''' Add a new connection ''' conn = Connection(self, style=self.style['connection']) conn.setStartTerminal(from_) conn.setStopTerminal(to) if controlpoints is not None: for point in controlpoints: conn.addControlPoint(point=QtCore.QPointF(point[0], point[1]), flipped=point[2]) self.scene.addItem(conn) conn.uniqueName = get_unique_name('Connection', self.nodeDict) conn.name = get_unique_name('Connection', self.nodeDict) self.nodeDict[conn.uniqueName] = conn conn.updateLine() conn.update() self.networkChanged() return conn
def showAllNodes(self): self.view.showAll()
[docs] def remove(self, items): ''' Delete items from scene ''' # TODO: change this to isIterable from the tools.py if not isiterable(items): items = [items] for item in items: if isinstance(item, NodeGraphic): node = self.nodeDict.pop(item.node.uniqueName) node.removeConnections() self.scene.removeItem(item) node.deleted() elif isinstance(item, Connection) and item in self.scene.items(): item.removeConnections() # Clean up connections for item in self.scene.items(): if isinstance(item, Connection) and not item.isConnected(): item.removeConnections() self.networkChanged()
[docs] def buildRunOrder(self): ''' Process dependencies of Nodes, determine order. ''' # Collect dependencies self.dependencyDict = {} for key, node in self.nodeDict.items(): if hasattr(node, 'dependencies'): self.dependencyDict[key] = node.dependencies() # calculate order order: List[Any] = [] if self.dependencyDict: order = self.topologicalSort() for _ in range(order.count(None)): order.remove(None) self.runOrder = order
[docs] def topologicalSort(self): ''' Sort the dependency tree. Inspired by: http://code.activestate.com/recipes/578272-topological-sort/ ''' data = self.dependencyDict.copy() # Ignore self dependencies. for k, v in data.items(): v.discard(k) # Find all items that don't depend on anything. extra_items_in_deps = reduce(set.union, data.values()) - set(data.keys()) # Add empty dependences where needed data.update({item:set() for item in extra_items_in_deps}) order = [] while True: ordered = set(item for item, dep in data.items() if not dep) if not ordered: break order.append(ordered) data = {item: (dep - ordered) for item, dep in data.items() if item not in ordered} return order
def runPause(self): if self.step != 2: self.setStepMode(2) else: self.pauseRun(not self.pause) if self.pause: self.runToolButton.setIcon(get_icon('play')) self.runningLabel.setText('paused') self.updateGraphics() else: if self.nodeDict.items(): self.runToolButton.setIcon(get_icon('pause')) self.runningLabel.setText('running') QtWidgets.QApplication.processEvents() if not self.running: self.runningLabel.show() self.run() self.view.setEnabled(self.pause) def pauseRun(self, pause=True): self.pause = pause for node in self.nodeDict.values(): if isinstance(node, Node): if hasattr(node, 'nodeWidget'): node.nodeWidget.pauseRun(self.pause) elif hasattr(node, 'nodeWidgets'): for wid in node.nodeWidgets: wid.pauseRun(self.pause)
[docs] def stopRun(self): ''' Stop the run ''' self.stop = True for node in self.nodeDict.values(): if isinstance(node, Node): if hasattr(node, 'nodeWidget'): node.nodeWidget.stopRun() elif hasattr(node, 'nodeWidgets'): for wid in node.nodeWidgets: wid.stopRun() self.reinit()
def stepThrough(self): # if the sheet is paused if self.pause: self.pauseRun(False) if not self.running: self.runningLabel.show() self.running = True self.run() # if the sheet is running else: self.runToolButton.setIcon(get_icon('play')) self.updateGraphics() # take one step self.setStepMode(1) def setStepMode(self, step=1): self.step = step for node in self.nodeDict.values(): if isinstance(node, Node): if hasattr(node, 'nodeWidget'): node.nodeWidget.setStepMode(self.step) elif hasattr(node, 'nodeWidgets'): for i in range(0, node.switch.count()): node.widget(i).setStepMode(self.step) def setFirstRun(self): self.firstRun = True for node in self.nodeDict.values(): if isinstance(node, Node): if hasattr(node, 'nodeWidget'): node.nodeWidget.setFirstRun() elif hasattr(node, 'nodeWidgets'): for wid in node.nodeWidgets: wid.setFirstRun() def setDebugMode(self, debug=False): self.debugMode = debug self.stepToolButton.setEnabled(self.debugMode) for node in self.nodeDict.values(): if isinstance(node, Node): if hasattr(node, 'nodeWidget'): node.nodeWidget.setDebugMode(debug) elif hasattr(node, 'nodeWidgets'): for wid in node.nodeWidgets: wid.setDebugMode(debug) def reinit(self): self.runningLabel.hide() self.running = False self.firstRun = True self.runToolButton.setIcon(get_icon('play')) self.stop = False self.setFirstRun() self.updateGraphics() self.view.setEnabled(True) self.pauseRun(True) def run(self): # check to make sure we are not already running the sheet if self.threaded and self.running: return self.running = True if self.nodeDict.items(): # threaded if self.threaded: # if self.runThread is not None: # self.runThread.wait() # the top level sheet if self.parentNode is None: if self.debugMode: self.runThread = WorkerThread(self.execute) else: self.runThread = WorkerThread(self.executeThreaded) self.runThread.finished.connect(self.reinit) self.runThread.finished.connect(self.runThread.deleteLater) self.runThread.start() # structure nodes - future support # currently all structure nodes have threaded = False else: return self.executeThreaded() # not threaded else: error = self.execute() if self.parentNode is None: self.reinit() return error
[docs] def executeThreaded(self): ''' Run the nodes. ''' bigList = [] runList = [] self.buildRunOrder() for nodes in self.runOrder: for node in nodes: bigList.append(self.nodeDict[node]) runList.append(False) self.nodeDict[node].runState = 1 while False in runList: threads = [] for i, node in enumerate(bigList): if not runList[i]: # check nodes for force run if node.forceRun: self.hasForceRunNode = True if node.depsFinished() == 0: node.setRunning() thread = WorkerThread(node.runNode) threads.append(thread) thread.finished.connect(thread.deleteLater) thread.finished.connect(self.updateGraphics) thread.finished.connect(node.clearRunning) thread.start() runList[i] = True elif node.depsFinished() == 2: # center on error if node in self.nodeDict: self.view.centerOn(self.nodeDict[node].nodeGraphic) else: warnings.warn("{} raised an error, can't find in sheet to center on.".format(node)) return False for thread in threads: thread.wait() # if any node has force run - signal network changed # to make the autorun keep running sheets that have forcerun nodes if self.hasForceRunNode: self.networkChanged() self.hasForceRunNode = False return True
def execute(self): # loop through and process if self.firstRun: self.buildRunOrder() self.firstRun = False for nodes in self.runOrder: for node in nodes: # show the running outline self.nodeDict[node].setRunning() # check all nodes for forceRun if self.nodeDict[node].forceRun: self.hasForceRunNode = True # run the node result = self.nodeDict[node].runNode() if self.debugMode: # update the terminal values and wait self.nodeDict[node].updateTerminalValues.emit() # if you are stepping wait if self.step == 1: self.setStepMode(0) while self.step == 0: time.sleep(0.0001) else: # if not sleep to allow the graphics to update time.sleep(0.4) # if the user has chosen to stop if self.stop: return True # an error occurred if not result: # center on error self.view.centerOn(self.nodeDict[node].nodeGraphic) return False # pause operation of the sheet while self.pause and not self.autorun: time.sleep(0.0001) # clear the running outline self.nodeDict[node].clearRunning() # if any node has force run - signal network changed # to make the autorun keep running sheets that have forcerun nodes if self.hasForceRunNode: self.networkChanged() self.hasForceRunNode = False return True
[docs] def networkChanged(self, args=None): ''' Every time a new connection is made, or widget value changes, run this code. ''' #self.buildRunOrder() # Check errors #for nodes in self.runOrder: # for node in nodes: # if node in self.nodeDict: # self.nodeDict[node].errorCheck() if self.autorun: QtCore.QTimer.singleShot(0, self.run) self.networkChangeEvent.emit() self.set_unsaved_flag()
[docs] def open(self, event=None, path=None, nodeDict=None, customParams=None): ''' Open a node chart or nodeDict ''' if path is None and nodeDict is None: path = QtWidgets.QFileDialog.getOpenFileName( self, 'Open a Node Chart', self.save_path, ';;'.join(["Node Chart (*.nc)"]), ) # Qt 5.6 returns a tuple instead of a string if isinstance(path, tuple): path = str(path[0]) else: path = str(path) if not path: return False # set early so if there are errors, we know what the file is self.file_path = path if path is not None: # read file with open(path) as f: nodeDict = json.load(f, cls=CustomDecoder) elif nodeDict is not None: pass else: raise ValueError('Nothing to open') # set name if path is not None: self.setName(os.path.basename(path)) self.save_path = os.path.dirname(path) self.file_path = path # Populate Nodes first nodeTermMap = {} for key, nodeState in nodeDict.items(): if 'type' in nodeState and nodeState['type'] == 'Node': node = self.addNode(nodeState['path'], pos=nodeState['pos'], uniquename=nodeState['uniquename']) nodeTermMap[key] = node.uniqueName # load nodewidgets into case tabs if hasattr(node, 'nodeWidgets'): for key2, value in nodeState.items(): if key2.startswith('nodewidget_'): index = int(key2.split('_')[1]) temp = NodeWidget(node.nodeGraphic.w, parentNode=node, showtoolbar=False, onlypositivecoords=True, threaded=self.threaded) temp.nodeLibrary.buildDefaultLibrary() temp.view.setAlignment(QtCore.Qt.AlignLeft | QtCore.Qt.AlignTop) temp.scene = StructureScene(temp) temp.view.setScene(temp.scene) node.setTab(temp, index) node.nodeWidgets.append(temp) node.setState(nodeState, customParams=customParams) # open the nodedicts into the case node widgets # In order for the connections to populate correctly you have # to load the tab first then set the state and then populate # the nodewidget if hasattr(node, 'nodeWidgets'): for key2, value in nodeState.items(): if key2.startswith('nodewidget_'): index = int(key2.split('_')[1]) node.switch.widget(index).open(nodeDict=value) node.nodeGraphic.update() # load nodes in structure if hasattr(node, 'nodeWidget'): node.nodeWidget.open(nodeDict=nodeState['nodewidget']) node.nodeGraphic.update() elif 'tunnel' in nodeState and nodeState['tunnel']: nodeTermMap[key] = key # Populate the connections second for key, nodeState in nodeDict.items(): if 'type' in nodeState and nodeState['type'] == 'Connection': self.style['connection']['line'] = nodeState['line'] toTerm = self.nodeDict[nodeTermMap[nodeState['output'][0]]][nodeState['output'][1]] fromTerm = self.nodeDict[nodeTermMap[nodeState['input'][0]]][nodeState['input'][1]] conn = self.addConnection(fromTerm, toTerm, nodeState['controlpoints']) conn.set_feedback(nodeState.get('feedback', False)) # hide or expand terminals after the connections are created for key, nodeState in nodeDict.items(): if 'type' in nodeState and nodeState['type'] == 'Node': if 'hidden' in nodeState and nodeState['hidden']: self.nodeDict[nodeState['uniquename']].collapse(nodeState['hidden']) # emit the onLoad signal for all nodes for key, nodeState in nodeDict.items(): if 'type' in nodeState and nodeState['type'] == 'Node': self.nodeDict[nodeTermMap[key]].onLoad.emit() self.unsaved_flag = False return True
def set_unsaved_flag(self): self.unsaved_flag = True self.needsSavedEvent.emit()
[docs] def save(self, event=None, path=None, customParams=None): ''' Save the node chart ''' if path is None: path = QtWidgets.QFileDialog.getSaveFileName( self, 'Save the Node Chart', self.save_path, ';;'.join(["Node Chart (*.nc)"]), ) # Qt 5.6 returns a tuple instead of a string if isinstance(path, tuple): path = str(path[0]) else: path = str(path) if not path: return False self.save_path = os.path.dirname(path) if not path.endswith('.nc'): path += '.nc' saveDict = self.get_save_dict(customParams) # Set Name self.setName(os.path.basename(path)) # Save File with open(path, 'w') as f: json.dump(saveDict, f, indent=2, cls=CustomEncoder) self.unsaved_flag = False return True
def get_save_dict(self, customParams=None): # build save dictionary saveDict = {} for key, node in self.nodeDict.items(): saveDict[key] = node.state(customParams=customParams) return saveDict
[docs] def resizeEvent(self, event: QResizeEvent) -> None: self.sizeChangeEvent.emit()
def reloadLibrary(self): self.save(path='temp.nc') self.deleteAllNodes(False) self.nodeLibrary.reloadNodes() self.nodeLibrary.buildDefaultLibrary() self.open(path='temp.nc') os.remove('temp.nc') def setName(self, name='Untitled'): self.name = name def getName(self): if self.name is None: return 'Untitled' return self.name def toggleLines(self): style = 'cubic' if self.style['connection']['line'] == 'line' else 'line' self.setLineStyle(style) def setLineStyle(self, style: str = 'cubic') -> None: self.style['connection']['line'] = style for node in self.nodeDict.values(): if isinstance(node, Connection): node.style.update(self.style['connection']) node.updateLine() def centerSheet(self): self.view.fitInView(self.scene.itemsBoundingRect(), QtCore.Qt.KeepAspectRatio) #self.view.centerOn(self.scene.itemsBoundingRect().center()) def group(self): if self.scene.selectedItems(): #copy self.copyNodes() #create group groupNode = self.addNode(['structure', 'Group']) #paste Nodes - control where the nodes are pasted groupNode.nodeWidget.pasteNodes() # delete the cut nodes self.deleteCopyNodes() def toggleSnapToGrid(self): self.snapToGrid = not self.snapToGrid def setScrollbarEnabled(self, enabled: bool = True) -> None: if enabled: self.view.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOn) self.view.setVerticalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOn) else: self.view.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff) self.view.setVerticalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff) def getDebugMode(self) -> bool: return self.debugMode def setGridSize(self, gridSize: int) -> None: self.gridSize = gridSize def previewFitInView(self): #self.scene.updateSceneBounds() self.preview.fitInView(self.scene.itemsBoundingRect(), QtCore.Qt.KeepAspectRatio) def createPreviewView(self) -> None: if self.preview is None: self.preview = PreviewView(self.scene, self, self.view) self.scene.changed.connect(self.previewFitInView) self.preview.sizeChangeEvent.connect(self.previewFitInView) def isPreviewCreated(self) -> bool: return self.preview is not None def screenShot(self, path=None): if path is None: path = QtWidgets.QFileDialog.getSaveFileName( self, 'Save a screenshot', self.save_path, ';;'.join(["PNG (*.png)", "JPG (*.jpg)"]), ) ext = '.png' # Qt 5.6 returns a tuple instead of a string if isinstance(path, tuple): if 'jpg' in path[1]: ext = '.jpg' path = str(path[0]) else: path = str(path) if not path: return self.save_path = os.path.dirname(path) if not path.endswith(ext): path += ext self.view.save_to_image(path, 1)