Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'watchdog' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def test_unschedule_self(observer):
    """
    Tests that unscheduling a watch from within an event handler
    correctly unregisters emitter and handler without deadlocking.
    """
    class EventHandler(FileSystemEventHandler):
        def on_modified(self, event):
            # Unschedule the watch from inside the handler itself, then
            # signal the test that the call returned.
            observer.unschedule(watch)
            unschedule_finished.set()

    unschedule_finished = Event()
    watch = observer.schedule(EventHandler(), '')
    observer.start()

    # Exactly one emitter is expected for the single scheduled watch.
    (emitter,) = observer.emitters
    emitter.queue_event(FileModifiedEvent(''))

    # Bounded wait: if unschedule() deadlocks, fail the test instead of
    # hanging the whole run forever.
    # NOTE(review): assumes Event is threading.Event-compatible
    # (wait() accepts a timeout) - confirm against the file's imports.
    assert unschedule_finished.wait(timeout=10), \
        'handler did not finish unscheduling in time'
    assert len(observer.emitters) == 0
def test_rename_file(self, sha_mock):
    """Renaming a project file is captured as exactly one FileMovedEvent."""
    sha_mock.return_value = '1234'

    first_project = self.PROJECT_STRUCTURE[0]
    rel_path = first_project['files'][0]['children'][0]['rel_path']
    # Leading separators are stripped before joining onto the root dir.
    source = self.root_dir.join(rel_path.lstrip(os.path.sep))

    # NOTE(review): the destination is relative to the current working
    # directory - confirm that is intended.
    os.rename(str(source), 'foobar.baz')
    self.sync_worker.flushed.wait()

    cache = self.sync_worker._event_cache
    assert len(cache.events) == 1, 'exactly one event captured'
    assert isinstance(cache.events[0], FileMovedEvent), 'the one captured event is a FileMovedEvent'
def test_detect_modify_for_moved_files(p):
    """
    A file that is touched and then moved after the reference snapshot is
    reported as moved, and its old path is reported as modified.
    """
    # Each path is built once with p() and reused below.
    old_path = p('a')
    new_path = p('b')

    touch(old_path)
    before = DirectorySnapshot(p(''))

    wait()
    touch(old_path)
    mv(old_path, new_path)

    after = DirectorySnapshot(p(''))
    diff = DirectorySnapshotDiff(before, after)
    assert diff.files_moved == [(old_path, new_path)]
    assert diff.files_modified == [old_path]
def test_dir_modify_on_create(p):
    """
    Creating a file after the reference snapshot reports the containing
    directory as modified.
    """
    root = p('')
    before = DirectorySnapshot(root)

    wait()
    touch(p('a'))

    after = DirectorySnapshot(root)
    assert DirectorySnapshotDiff(before, after).dirs_modified == [root]
def test_move_folder(self):
    """Moving a project folder with shutil.move is captured as exactly one DirMovedEvent."""
    project = self.PROJECT_STRUCTURE[0]
    dir_path = self.root_dir.join(
        project['files'][0]['rel_path'].lstrip(os.path.sep)
    )
    # Swap only the last path component. str.replace() on the whole path
    # would also rewrite any parent directory that happens to contain the
    # folder's basename, producing a wrong destination.
    new_dir_path = os.path.join(os.path.dirname(str(dir_path)), 'newdir')
    shutil.move(str(dir_path), new_dir_path)
    self.sync_worker.flushed.wait()

    cache = self.sync_worker._event_cache
    assert len(cache.events) == 1, 'exactly one event captured'
    assert isinstance(cache.events[0], DirMovedEvent), 'the one captured event is a DirMovedEvent'
def test_rename_folder(self):
    """Renaming a project folder with os.rename is captured as exactly one DirMovedEvent."""
    project = self.PROJECT_STRUCTURE[0]
    dir_path = self.root_dir.join(
        project['files'][0]['rel_path'].lstrip(os.path.sep)
    )
    # Swap only the last path component. str.replace() on the whole path
    # would also rewrite any parent directory that happens to contain the
    # folder's basename, producing a wrong destination.
    new_dir_path = os.path.join(os.path.dirname(str(dir_path)), 'newdir')
    os.rename(str(dir_path), new_dir_path)
    self.sync_worker.flushed.wait()

    cache = self.sync_worker._event_cache
    assert len(cache.events) == 1, 'exactly one event captured'
    assert isinstance(cache.events[0], DirMovedEvent), 'the one captured event is a DirMovedEvent'
def wait():
    """
    Sleep long enough for a file/folder mtime to change, so that a
    subsequent modification can be detected.
    """
    # on macOS resolution of stat.mtime is only 1 second; Windows gets the
    # same longer pause
    needs_long_pause = platform.is_darwin() or platform.is_windows()
    time.sleep(1.5 if needs_long_pause else 0.5)
# Every event fixture in one list: the four directory events first, then the
# four file events (names suggest modified / deleted / created / moved; the
# fixtures themselves are defined outside this view).
all_events = [
dir_mod_event,
dir_del_event,
dir_cre_event,
dir_mov_event,
file_mod_event,
file_del_event,
file_cre_event,
file_mov_event,
]
def assert_equal(a, b):
    """Forward to ``self.assertEqual`` so code without its own ``self`` can assert equality."""
    # ``self`` comes from the enclosing scope, which is outside this view.
    self.assertEqual(a, b)
class TestableEventHandler(FileSystemEventHandler):
    """Handler whose per-type hooks assert the received event carries the matching ``event_type``."""

    def on_any_event(self, event):
        # Nothing to verify in the catch-all hook.
        assert True

    def on_created(self, event):
        assert_equal(event.event_type, EVENT_TYPE_CREATED)

    def on_modified(self, event):
        assert_equal(event.event_type, EVENT_TYPE_MODIFIED)

    def on_moved(self, event):
        assert_equal(event.event_type, EVENT_TYPE_MOVED)

    def on_deleted(self, event):
        assert_equal(event.event_type, EVENT_TYPE_DELETED)
excluded_patterns=ignore_patterns,
case_sensitive=False)
self.assertTrue(filtered_paths)
# Event fixtures: for every event type (deleted / created / modified / moved,
# directory and file variants) there are three instances:
#   *_match      - source path ends in .py (dir events) or .txt (file events)
#   *_not_match  - source path is '/path/foobar' (no extension)
#   *_ignored    - source path ends in .pyc
# The patterns these are checked against are defined outside this view.

# Deleted
dir_del_event_match = DirDeletedEvent('/path/blah.py')
dir_del_event_not_match = DirDeletedEvent('/path/foobar')
dir_del_event_ignored = DirDeletedEvent('/path/foobar.pyc')
file_del_event_match = FileDeletedEvent('/path/blah.txt')
file_del_event_not_match = FileDeletedEvent('/path/foobar')
file_del_event_ignored = FileDeletedEvent('/path/blah.pyc')
# Created
dir_cre_event_match = DirCreatedEvent('/path/blah.py')
dir_cre_event_not_match = DirCreatedEvent('/path/foobar')
dir_cre_event_ignored = DirCreatedEvent('/path/foobar.pyc')
file_cre_event_match = FileCreatedEvent('/path/blah.txt')
file_cre_event_not_match = FileCreatedEvent('/path/foobar')
file_cre_event_ignored = FileCreatedEvent('/path/blah.pyc')
# Modified
dir_mod_event_match = DirModifiedEvent('/path/blah.py')
dir_mod_event_not_match = DirModifiedEvent('/path/foobar')
dir_mod_event_ignored = DirModifiedEvent('/path/foobar.pyc')
file_mod_event_match = FileModifiedEvent('/path/blah.txt')
file_mod_event_not_match = FileModifiedEvent('/path/foobar')
file_mod_event_ignored = FileModifiedEvent('/path/blah.pyc')
# Moved (second argument is the destination path, '/path/blah' throughout)
dir_mov_event_match = DirMovedEvent('/path/blah.py', '/path/blah')
dir_mov_event_not_match = DirMovedEvent('/path/foobar', '/path/blah')
dir_mov_event_ignored = DirMovedEvent('/path/foobar.pyc', '/path/blah')
file_mov_event_match = FileMovedEvent('/path/blah.txt', '/path/blah')
file_mov_event_not_match = FileMovedEvent('/path/foobar', '/path/blah')
file_mov_event_ignored = FileMovedEvent('/path/blah.pyc', '/path/blah')
from tests.sync.utils import TestSyncObserver
# Runs at import time; start_logging is defined outside this view.
start_logging()
# Lookup table from (action name, is-directory flag) to the event class in
# ``events``: True selects the Dir* class, False the File* class.
_map = {
('move', True): events.DirMovedEvent,
('move', False): events.FileMovedEvent,
('modify', True): events.DirModifiedEvent,
('modify', False): events.FileModifiedEvent,
('delete', True): events.DirDeletedEvent,
('delete', False): events.FileDeletedEvent,
('create', True): events.DirCreatedEvent,
('create', False): events.FileCreatedEvent,
}
def Event(type_, *src, sha=None):
    """Build an event of kind ``type_`` from one or two '/'-separated paths.

    A trailing '/' on the first path selects the directory class from
    ``_map``; otherwise the file class is used. When two paths are given,
    both must agree on being a directory or a file. Trailing slashes are
    stripped and remaining '/' are converted to ``os.path.sep`` before the
    paths are passed to the event constructor. ``sha`` is stored on the
    returned event as ``sha256``.
    """
    assert len(src) < 3
    if len(src) > 1:
        assert src[0].endswith('/') == src[1].endswith('/')

    is_directory = src[0].endswith('/')
    event_cls = _map[(type_, is_directory)]
    native_paths = [path.rstrip('/').replace('/', os.path.sep) for path in src]

    event = event_cls(*native_paths)
    event.sha256 = sha
    return event
CASES = [{
'input': [Event('modify', '/Foo/bar/')],
'output': []
}, {