Skip to content

Commit 5b6d389

Browse files
authored
Add locking to File interface (#87)
1 parent 7f8b287 commit 5b6d389

16 files changed

+548
-88
lines changed

composer-require-check.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
"deleteFile",
2121
"filesystem",
2222
"openFile",
23+
"lock",
24+
"tryLock",
25+
"unlock",
2326
"eio_cancel",
2427
"eio_chmod",
2528
"eio_chown",
@@ -83,7 +86,8 @@
8386
"uv_fs_utime",
8487
"uv_fs_write",
8588
"uv_fs_close",
86-
"uv_strerror"
89+
"uv_strerror",
90+
"Amp\\Process\\IS_WINDOWS"
8791
],
8892
"php-core-extensions": [
8993
"Core",

composer.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@
5454
"psr-4": {
5555
"Amp\\File\\": "src"
5656
},
57-
"files": ["src/functions.php"]
57+
"files": [
58+
"src/functions.php",
59+
"src/Internal/functions.php"
60+
]
5861
},
5962
"autoload-dev": {
6063
"psr-4": {

src/Driver/BlockingFile.php

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
use Amp\Cancellation;
99
use Amp\DeferredFuture;
1010
use Amp\File\File;
11+
use Amp\File\Internal;
12+
use Amp\File\LockType;
1113
use Amp\File\Whence;
1214

1315
/**
@@ -24,6 +26,8 @@ final class BlockingFile implements File, \IteratorAggregate
2426

2527
private readonly DeferredFuture $onClose;
2628

29+
private ?LockType $lockType = null;
30+
2731
/**
2832
* @param resource $handle An open filesystem descriptor.
2933
* @param string $path File path.
@@ -55,18 +59,46 @@ public function __destruct()
5559
}
5660
}
5761

58-
public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
62+
/**
63+
* Returns the currently active lock mode, or null if the file is not locked.
64+
*/
65+
public function getLockType(): ?LockType
5966
{
60-
if ($this->handle === null) {
61-
throw new ClosedException("The file '{$this->path}' has been closed");
67+
return $this->lockType;
68+
}
69+
70+
public function lock(LockType $type, ?Cancellation $cancellation = null): void
71+
{
72+
Internal\lock($this->path, $this->getFileHandle(), $type, $cancellation);
73+
$this->lockType = $type;
74+
}
75+
76+
public function tryLock(LockType $type): bool
77+
{
78+
$locked = Internal\tryLock($this->path, $this->getFileHandle(), $type);
79+
if ($locked) {
80+
$this->lockType = $type;
6281
}
6382

83+
return $locked;
84+
}
85+
86+
public function unlock(): void
87+
{
88+
Internal\unlock($this->path, $this->getFileHandle());
89+
$this->lockType = null;
90+
}
91+
92+
public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
93+
{
94+
$handle = $this->getFileHandle();
95+
6496
try {
6597
\set_error_handler(function (int $type, string $message): never {
6698
throw new StreamException("Failed reading from file '{$this->path}': {$message}");
6799
});
68100

69-
$data = \fread($this->handle, $length);
101+
$data = \fread($handle, $length);
70102
if ($data === false) {
71103
throw new StreamException("Failed reading from file '{$this->path}'");
72104
}
@@ -79,16 +111,14 @@ public function read(?Cancellation $cancellation = null, int $length = self::DEF
79111

80112
public function write(string $bytes): void
81113
{
82-
if ($this->handle === null) {
83-
throw new ClosedException("The file '{$this->path}' has been closed");
84-
}
114+
$handle = $this->getFileHandle();
85115

86116
try {
87117
\set_error_handler(function (int $type, string $message): never {
88118
throw new StreamException("Failed writing to file '{$this->path}': {$message}");
89119
});
90120

91-
$length = \fwrite($this->handle, $bytes);
121+
$length = \fwrite($handle, $bytes);
92122
if ($length === false) {
93123
throw new StreamException("Failed writing to file '{$this->path}'");
94124
}
@@ -131,6 +161,7 @@ public function close(): void
131161
throw new StreamException("Failed closing file '{$this->path}'");
132162
} finally {
133163
\restore_error_handler();
164+
$this->lockType = null;
134165
}
135166
}
136167

@@ -146,16 +177,14 @@ public function onClose(\Closure $onClose): void
146177

147178
public function truncate(int $size): void
148179
{
149-
if ($this->handle === null) {
150-
throw new ClosedException("The file '{$this->path}' has been closed");
151-
}
180+
$handle = $this->getFileHandle();
152181

153182
try {
154183
\set_error_handler(function (int $type, string $message): never {
155184
throw new StreamException("Could not truncate file '{$this->path}': {$message}");
156185
});
157186

158-
if (!\ftruncate($this->handle, $size)) {
187+
if (!\ftruncate($handle, $size)) {
159188
throw new StreamException("Could not truncate file '{$this->path}'");
160189
}
161190
} finally {
@@ -165,9 +194,7 @@ public function truncate(int $size): void
165194

166195
public function seek(int $position, Whence $whence = Whence::Start): int
167196
{
168-
if ($this->handle === null) {
169-
throw new ClosedException("The file '{$this->path}' has been closed");
170-
}
197+
$handle = $this->getFileHandle();
171198

172199
$mode = match ($whence) {
173200
Whence::Start => SEEK_SET,
@@ -181,7 +208,7 @@ public function seek(int $position, Whence $whence = Whence::Start): int
181208
throw new StreamException("Could not seek in file '{$this->path}': {$message}");
182209
});
183210

184-
if (\fseek($this->handle, $position, $mode) === -1) {
211+
if (\fseek($handle, $position, $mode) === -1) {
185212
throw new StreamException("Could not seek in file '{$this->path}'");
186213
}
187214

@@ -193,20 +220,12 @@ public function seek(int $position, Whence $whence = Whence::Start): int
193220

194221
public function tell(): int
195222
{
196-
if ($this->handle === null) {
197-
throw new ClosedException("The file '{$this->path}' has been closed");
198-
}
199-
200-
return \ftell($this->handle);
223+
return \ftell($this->getFileHandle());
201224
}
202225

203226
public function eof(): bool
204227
{
205-
if ($this->handle === null) {
206-
throw new ClosedException("The file '{$this->path}' has been closed");
207-
}
208-
209-
return \feof($this->handle);
228+
return \feof($this->getFileHandle());
210229
}
211230

212231
public function getPath(): string
@@ -238,4 +257,18 @@ public function getId(): int
238257
{
239258
return $this->id;
240259
}
260+
261+
/**
262+
* @return resource
263+
*
264+
* @throws ClosedException
265+
*/
266+
private function getFileHandle()
267+
{
268+
if ($this->handle === null) {
269+
throw new ClosedException("The file '{$this->path}' has been closed");
270+
}
271+
272+
return $this->handle;
273+
}
241274
}

src/Driver/EioFile.php

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,36 @@ final class EioFile extends Internal\QueuedWritesFile
1414
{
1515
private readonly Internal\EioPoll $poll;
1616

17-
/** @var resource eio file handle. */
18-
private $fh;
17+
/** @var int eio file handle resource ID. */
18+
private int $fh;
19+
20+
/** @var resource|closed-resource */
21+
private $fd;
1922

2023
private ?Future $closing = null;
2124

2225
private readonly DeferredFuture $onClose;
2326

24-
/**
25-
* @param resource $fh
26-
*/
27-
public function __construct(Internal\EioPoll $poll, $fh, string $path, string $mode, int $size)
27+
public function __construct(Internal\EioPoll $poll, int $fh, string $path, string $mode, int $size)
2828
{
2929
parent::__construct($path, $mode, $size);
3030

3131
$this->poll = $poll;
3232
$this->fh = $fh;
33+
$this->fd = \fopen('php://fd/' . $this->fh, 'r');
3334

3435
$this->onClose = new DeferredFuture;
3536
}
3637

38+
protected function getFileHandle()
39+
{
40+
if (!\is_resource($this->fd)) {
41+
throw new ClosedException("The file has been closed");
42+
}
43+
44+
return $this->fd;
45+
}
46+
3747
public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
3848
{
3949
if ($this->isReading || !$this->queue->isEmpty()) {
@@ -99,6 +109,10 @@ public function close(): void
99109
return;
100110
}
101111

112+
if (\is_resource($this->fd)) {
113+
\fclose($this->fd);
114+
}
115+
102116
$this->closing = $this->onClose->getFuture();
103117
$this->poll->listen();
104118

@@ -111,6 +125,7 @@ public function close(): void
111125
$this->closing->await();
112126
} finally {
113127
$this->poll->done();
128+
$this->lockType = null;
114129
}
115130
}
116131

src/Driver/ParallelFile.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Amp\DeferredFuture;
1010
use Amp\File\File;
1111
use Amp\File\Internal;
12+
use Amp\File\LockType;
1213
use Amp\File\PendingOperationError;
1314
use Amp\File\Whence;
1415
use Amp\Future;
@@ -42,6 +43,8 @@ final class ParallelFile implements File, \IteratorAggregate
4243

4344
private readonly DeferredFuture $onClose;
4445

46+
private ?LockType $lockType = null;
47+
4548
public function __construct(
4649
private readonly Internal\FileWorker $worker,
4750
int $id,
@@ -93,6 +96,7 @@ public function close(): void
9396
$this->closing->await();
9497
} finally {
9598
$this->onClose->complete();
99+
$this->lockType = null;
96100
}
97101
}
98102

@@ -142,6 +146,52 @@ public function eof(): bool
142146
return $this->pendingWrites === 0 && $this->size <= $this->position;
143147
}
144148

149+
public function lock(LockType $type, ?Cancellation $cancellation = null): void
150+
{
151+
$this->flock('lock', $type, $cancellation);
152+
$this->lockType = $type;
153+
}
154+
155+
public function tryLock(LockType $type): bool
156+
{
157+
$locked = $this->flock('try-lock', $type);
158+
if ($locked) {
159+
$this->lockType = $type;
160+
}
161+
162+
return $locked;
163+
}
164+
165+
public function unlock(): void
166+
{
167+
$this->flock('unlock');
168+
$this->lockType = null;
169+
}
170+
171+
public function getLockType(): ?LockType
172+
{
173+
return $this->lockType;
174+
}
175+
176+
private function flock(string $action, ?LockType $type = null, ?Cancellation $cancellation = null): bool
177+
{
178+
if ($this->id === null) {
179+
throw new ClosedException("The file has been closed");
180+
}
181+
182+
$this->busy = true;
183+
184+
try {
185+
return $this->worker->execute(new Internal\FileTask('flock', [$type, $action], $this->id), $cancellation);
186+
} catch (TaskFailureException $exception) {
187+
throw new StreamException("Attempting to lock the file failed", 0, $exception);
188+
} catch (WorkerException $exception) {
189+
throw new StreamException("Sending the task to the worker failed", 0, $exception);
190+
} finally {
191+
$this->busy = false;
192+
}
193+
}
194+
145195
public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
146196
{
147197
if ($this->id === null) {

src/Driver/StatusCachingFile.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Amp\ByteStream\ReadableStreamIteratorAggregate;
66
use Amp\Cancellation;
77
use Amp\File\File;
8+
use Amp\File\LockType;
89
use Amp\File\Whence;
910

1011
/**
@@ -53,6 +54,26 @@ public function end(): void
5354
}
5455
}
5556

57+
public function lock(LockType $type, ?Cancellation $cancellation = null): void
58+
{
59+
$this->file->lock($type, $cancellation);
60+
}
61+
62+
public function tryLock(LockType $type): bool
63+
{
64+
return $this->file->tryLock($type);
65+
}
66+
67+
public function unlock(): void
68+
{
69+
$this->file->unlock();
70+
}
71+
72+
public function getLockType(): ?LockType
73+
{
74+
return $this->file->getLockType();
75+
}
76+
5677
public function close(): void
5778
{
5879
$this->file->close();

0 commit comments

Comments
 (0)