@@ -516,9 +516,13 @@ void TlsSocket::SetProactor(ProactorBase* p) {
 
 void TlsSocket::AsyncReq::MaybeSendOutputAsyncWithRead() {
   if (owner->engine_->OutputPending() != 0) {
-    // sync interface, works because we are still executing within a fiber
-    // used for "mocking" and shall be replaced in the next PR with an actual async op
-    owner->MaybeSendOutput();
+    // Once the network socket completes the write, it will start the read path;
+    // should_read signals this.
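+    // Flow: StartUpstreamWrite() arms an async write; CompleteAsyncWrite() re-arms it
+    // until the engine output drains, then sees should_read and starts the upstream read.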
+    should_read = true;
+    StartUpstreamWrite();
+    return;
   }
 
   // TODO handle WRITE_IN_PROGRESS here by adding pending_blocked_
@@ -535,16 +537,20 @@ void TlsSocket::AsyncReq::AsyncProgressCb(io::Result<size_t> read_result) {
       VLOG(1) << "sock[" << owner->native_handle() << "], state " << int(owner->state_)
               << ", write_total:" << owner->upstream_write_ << " "
               << "pending output: " << owner->engine_->OutputPending() << " "
-              << "StartUpstreamRead failed " << read_result.error();
+              << "AsyncProgressCb failed " << read_result.error();
     }
     // Erroneous path. Apply the completion callback and exit.
     CompleteAsyncReq(read_result);
     return;
   }
 
-  DVLOG(1) << "HandleUpstreamRead " << *read_result << " bytes";
+  DVLOG(1) << "AsyncProgressCb " << *read_result << " bytes";
   owner->engine_->CommitInput(*read_result);
-  Engine::OpResult engine_read = owner->MaybeReadFromEngine(vec, len);
+  Engine::OpResult engine_read =
+      owner->engine_->Read(reinterpret_cast<uint8_t*>(vec->iov_base), vec->iov_len);
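+  // A single, non-draining read: SSL_read is not idempotent, so we return whatever is
+  // available and let the completion callback drive any further reads (see
+  // https://github.com/romange/helio/pull/408#discussion_r2080998216).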
   if (engine_read > 0) {
     CompleteAsyncReq(engine_read);
     return;
@@ -579,42 +582,25 @@ void TlsSocket::AsyncReq::HandleOpAsync() {
     case Engine::NEED_READ_AND_MAYBE_WRITE:
       MaybeSendOutputAsyncWithRead();
       break;
-      // TODO handle NEED_WRITE
+    case Engine::NEED_WRITE:
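+      // The engine produced output that must be flushed before the operation can make
+      // progress (for example, during a TLS handshake or renegotiation).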
+      MaybeSendOutputAsync();
+      break;
     default:
       // EOF_STREAM should be handled earlier
       LOG(DFATAL) << "Unsupported " << op_val;
   }
 }
 
-Engine::OpResult TlsSocket::MaybeReadFromEngine(const iovec* v, uint32_t len) {
-  size_t read_len = std::min(v->iov_len, size_t(INT_MAX));
-  Engine::OpResult op_val = engine_->Read(reinterpret_cast<uint8_t*>(v->iov_base), read_len);
-  DVLOG(2) << "Engine::Read " << read_len << " bytes, got " << op_val;
-  // If read_len == op_val we could try to read more. However, the next read might require
-  // an async operation on the underlying socket because op_val < 0.
-  // The problem here is that SSL_read from engine_->Read is *not* idempotent, and we might
-  // end up in a situation where we need to do two things at the same time:
-  // 1. Call the caller's completion callback, which will start another async op because
-  //    we read fewer bytes than requested, i.e., read_total < sum_of_all(v->len).
-  // 2. Start another async operation to satisfy the protocol, because op_val < 0 and we
-  //    called engine_->Read which is *not* idempotent.
-  // Therefore, it's best to let it flow naturally. If there is some data in the engine, read
-  // it and call the completion callback, which will in turn try to read more from the engine.
-  // It will either read everything or reach a point where an async operation must be
-  // dispatched. That way, we get a linear view of the operations involved, at the cost of a
-  // few more function calls (since we don't try to drain the whole engine, as we don't know
-  // whether the next read can be satisfied or must be dispatched as an async operation).
-  // Last but not least, it was advised here:
-  // https://github.com/romange/helio/pull/408#discussion_r2080998216
-  // that we should remove engine reads from the AsyncRequest altogether and return
-  // to the caller if there was some data read.
-  return op_val;
-}
-
 void TlsSocket::AsyncReadSome(const iovec* v, uint32_t len, io::AsyncProgressCb cb) {
+  // First, try to satisfy the read directly from the engine.
   CHECK(!async_read_req_);
 
-  Engine::OpResult op_val = MaybeReadFromEngine(v, len);
+  Engine::OpResult op_val = engine_->Read(reinterpret_cast<uint8_t*>(v->iov_base), v->iov_len);
+  DVLOG(2) << "Engine::Read tried to read " << v->iov_len << " bytes, got " << op_val;
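+  // As in AsyncProgressCb, read once without draining the engine; if no plaintext is
+  // ready yet (op_val <= 0), we fall through and dispatch an async operation.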
   // We read some data from the engine. Satisfy the request and return.
   if (op_val > 0) {
     return cb(op_val);
@@ -632,5 +614,82 @@ void TlsSocket::AsyncReadSome(const iovec* v, uint32_t len, io::AsyncProgressCb
   async_read_req_->HandleOpAsync();
 }
 
+void TlsSocket::AsyncReq::CompleteAsyncWrite(io::Result<size_t> write_result) {
+  if (!write_result) {
+    owner->state_ &= ~WRITE_IN_PROGRESS;
+
+    // broken_pipe happens when the other side closes the connection; do not log it.
+    if (write_result.error() != errc::broken_pipe) {
+      VLOG(1) << "sock[" << owner->native_handle() << "], state " << int(owner->state_)
+              << ", write_total:" << owner->upstream_write_ << " "
+              << "pending output: " << owner->engine_->OutputPending()
+              << " CompleteAsyncWrite failed " << write_result.error();
+    }
+
+    // We are done. Erroneous exit.
+    CompleteAsyncReq(write_result);
+    return;
+  }
+
+  CHECK_GT(*write_result, 0u);
+  owner->upstream_write_ += *write_result;
+  owner->engine_->ConsumeOutputBuf(*write_result);
+  // We might have more data pending. Peek again.
+  Buffer buffer = owner->engine_->PeekOutputBuf();
+
+  // If we are not done, re-arm the async write until we drive it to completion or error.
+  // This also avoids fragmented socket writes, since we fully drain the buffer here.
+  if (!buffer.empty()) {
+    auto& scratch = scratch_iovec;
+    scratch.iov_base = const_cast<uint8_t*>(buffer.data());
+    scratch.iov_len = buffer.size();
+    owner->next_sock_->AsyncWriteSome(
+        &scratch, 1, [this](auto write_result) { CompleteAsyncWrite(write_result); });
+    return;
+  }
+
+  if (owner->engine_->OutputPending() > 0) {
+    LOG(DFATAL) << "ssl buffer is not empty with " << owner->engine_->OutputPending()
+                << " bytes. Async short write detected";
+  }
+
+  owner->state_ &= ~WRITE_IN_PROGRESS;
+
+  // We are done with the writes. Check whether we also need to read, because we may be
+  // in the NEED_READ_AND_MAYBE_WRITE state.
+  if (should_read) {
+    should_read = false;
+    StartUpstreamRead();
+  }
+}
+
+void TlsSocket::AsyncReq::StartUpstreamWrite() {
+  Engine::Buffer buffer = owner->engine_->PeekOutputBuf();
+  DCHECK(!buffer.empty());
+  DCHECK((owner->state_ & WRITE_IN_PROGRESS) == 0);
+
+  DVLOG(2) << "StartUpstreamWrite " << buffer.size();
+  // We do not allow concurrent writes from multiple fibers.
+  owner->state_ |= WRITE_IN_PROGRESS;
+
+  auto& scratch = scratch_iovec;
+  scratch.iov_base = const_cast<uint8_t*>(buffer.data());
+  scratch.iov_len = buffer.size();
+
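+  // CompleteAsyncWrite re-arms this write until the engine output buffer is fully
+  // drained, and then starts the upstream read if should_read was set.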
+  owner->next_sock_->AsyncWriteSome(
+      &scratch, 1, [this](auto write_result) { CompleteAsyncWrite(write_result); });
+}
+
+void TlsSocket::AsyncReq::MaybeSendOutputAsync() {
+  if (owner->engine_->OutputPending() == 0) {
+    return;
+  }
+
+  // TODO handle WRITE_IN_PROGRESS to avoid deadlock
+  StartUpstreamWrite();
+}
+
 }  // namespace tls
 }  // namespace util