33import logging
44import sys
55import threading
6+ from contextlib import contextmanager
7+ from stat import S_IWGRP , S_IWOTH , S_IWUSR
68
79import pytest
810
911from filelock import FileLock , SoftFileLock , Timeout
12+ from filelock ._util import PermissionError
1013
1114
1215@pytest .mark .parametrize ("lock_type" , [FileLock , SoftFileLock ])
@@ -29,6 +32,52 @@ def test_simple(lock_type, tmp_path, caplog):
2932 assert [r .levelno for r in caplog .records ] == [logging .DEBUG , logging .DEBUG , logging .DEBUG , logging .DEBUG ]
3033
3134
35+ @contextmanager
36+ def make_ro (path ):
37+ write = S_IWUSR | S_IWGRP | S_IWOTH
38+ path .chmod (path .stat ().st_mode & ~ write )
39+ yield
40+ path .chmod (path .stat ().st_mode | write )
41+
42+
43+ @pytest .fixture ()
44+ def tmp_path_ro (tmp_path ):
45+ with make_ro (tmp_path ):
46+ yield tmp_path
47+
48+
49+ @pytest .mark .parametrize ("lock_type" , [FileLock , SoftFileLock ])
50+ @pytest .mark .skipif (sys .platform == "win32" , reason = "Windows does not have read only folders" )
51+ def test_ro_folder (lock_type , tmp_path_ro ):
52+ lock = lock_type (str (tmp_path_ro / "a" ))
53+ with pytest .raises (PermissionError , match = "Permission denied" ):
54+ lock .acquire ()
55+
56+
57+ @pytest .fixture ()
58+ def tmp_file_ro (tmp_path ):
59+ filename = tmp_path / "a"
60+ filename .write_text ("" )
61+ with make_ro (filename ):
62+ yield filename
63+
64+
65+ @pytest .mark .parametrize ("lock_type" , [FileLock , SoftFileLock ])
66+ def test_ro_file (lock_type , tmp_file_ro ):
67+ lock = lock_type (str (tmp_file_ro ))
68+ with pytest .raises (PermissionError , match = "Permission denied" ):
69+ lock .acquire ()
70+
71+
72+ @pytest .mark .parametrize ("lock_type" , [FileLock , SoftFileLock ])
73+ def test_missing_directory (lock_type , tmp_path_ro ):
74+ lock_path = tmp_path_ro / "a" / "b"
75+ lock = lock_type (str (lock_path ))
76+
77+ with pytest .raises (OSError , match = "No such file or directory:" ):
78+ lock .acquire ()
79+
80+
3281@pytest .mark .parametrize ("lock_type" , [FileLock , SoftFileLock ])
3382def test_nested_context_manager (lock_type , tmp_path ):
3483 # lock is not released before the most outer with statement that locked the lock, is left
@@ -93,8 +142,8 @@ def test_nested_forced_release(lock_type, tmp_path):
93142
94143
95144class ExThread (threading .Thread ):
96- def __init__ (self , target ):
97- super (ExThread , self ).__init__ (target = target )
145+ def __init__ (self , target , name ):
146+ super (ExThread , self ).__init__ (target = target , name = name )
98147 self .ex = None
99148
100149 def run (self ):
@@ -106,6 +155,7 @@ def run(self):
106155 def join (self , timeout = None ):
107156 super (ExThread , self ).join (timeout = timeout )
108157 if self .ex is not None :
158+ print ("fail from thread {}" .format (self .name ))
109159 if sys .version_info [0 ] == 2 :
110160 wrapper_ex = self .ex [1 ]
111161 raise (wrapper_ex .__class__ , wrapper_ex , self .ex [2 ])
@@ -124,7 +174,7 @@ def thread_work():
124174 with lock :
125175 assert lock .is_locked
126176
127- threads = [ExThread (target = thread_work ) for _ in range (100 )]
177+ threads = [ExThread (target = thread_work , name = "t{}" . format ( i )) for i in range (100 )]
128178 for thread in threads :
129179 thread .start ()
130180 for thread in threads :
@@ -138,21 +188,21 @@ def test_threaded_lock_different_lock_obj(lock_type, tmp_path):
138188 # Runs multiple threads, which acquire the same lock file with a different FileLock object. When thread group 1
139189 # acquired the lock, thread group 2 must not hold their lock.
140190
141- def thread_work_one ():
191+ def t_1 ():
142192 for _ in range (1000 ):
143193 with lock_1 :
144194 assert lock_1 .is_locked
145195 assert not lock_2 .is_locked
146196
147- def thread_work_two ():
197+ def t_2 ():
148198 for _ in range (1000 ):
149199 with lock_2 :
150200 assert not lock_1 .is_locked
151201 assert lock_2 .is_locked
152202
153203 lock_path = tmp_path / "a"
154204 lock_1 , lock_2 = lock_type (str (lock_path )), lock_type (str (lock_path ))
155- threads = [(ExThread (target = thread_work_one ) , ExThread (target = thread_work_two )) for i in range (10 )]
205+ threads = [(ExThread (t_1 , "t1_{}" . format ( i )) , ExThread (t_2 , "t2_{}" . format ( i ) )) for i in range (10 )]
156206
157207 for thread_1 , thread_2 in threads :
158208 thread_1 .start ()
0 commit comments